<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          用 MySQL 實現(xiàn)分布式鎖,你聽過嗎?

          共 2338字,需瀏覽 5分鐘

           ·

          2021-10-22 08:31


          點擊上方藍色字體,選擇“設為星標”


          回復”學習資料“獲取學習寶典



          來源 |?blog.csdn.net/linsongbin1/article/details/79444274


          概述

          以前參加過一個庫存系統(tǒng),由于其業(yè)務復雜性,搞了很多個應用來支撐。這樣的話一份庫存數(shù)據(jù)就有可能同時有多個應用來修改庫存數(shù)據(jù)。比如說,有定時任務域xx.cron,和SystemA域和SystemB域這幾個JAVA應用,可能同時修改同一份庫存數(shù)據(jù)。如果不做協(xié)調(diào)的話,就會有臟數(shù)據(jù)出現(xiàn)。對于跨JAVA進程的線程協(xié)調(diào),可以借助外部環(huán)境,例如DB或者Redis。下文介紹一下如何使用DB來實現(xiàn)分布式鎖。

          設計

          本文設計的分布式鎖的交互方式如下:1、根據(jù)業(yè)務字段生成transaction_id,并線程安全的創(chuàng)建鎖資源 2、根據(jù)transaction_id申請鎖 3、釋放鎖

          動態(tài)創(chuàng)建鎖資源

          在使用synchronized關鍵字的時候,必須指定一個鎖對象。

          synchronized(obj)?{

          }?

          進程內(nèi)的線程可以基于obj來實現(xiàn)同步。obj在這里可以理解為一個鎖對象。如果線程要進入synchronized代碼塊里,必須先持有obj對象上的鎖。這種鎖是JAVA里面的內(nèi)置鎖,創(chuàng)建的過程是線程安全的。那么借助DB,如何保證創(chuàng)建鎖的過程是線程安全的呢?可以利用DB中的UNIQUE KEY特性,一旦出現(xiàn)了重復的key,由于UNIQUE KEY的唯一性,會拋出異常的。在JAVA里面,是SQLIntegrityConstraintViolationException異常。

          create?table?distributed_lock
          (
          ?id?BIGINT?UNSIGNED?PRIMARY?KEY?AUTO_INCREMENT?COMMENT?'自增主鍵',
          ?transaction_id?varchar(128)?NOT?NULL?DEFAULT?''?COMMENT?'事務id',
          ?last_update_time?TIMESTAMP?DEFAULT?CURRENT_TIMESTAMP?ON?UPDATE?CURRENT_TIMESTAMP?NOT?NULL?COMMENT?'最后更新時間',
          ?create_time?TIMESTAMP?DEFAULT?'0000-00-00?00:00:00'?NOT?NULL?COMMENT?'創(chuàng)建時間',
          ?UNIQUE?KEY?`idx_transaction_id`?(`transaction_id`)
          )

          transaction_id是事務Id,比如說,可以用

          倉庫 + 條碼 + 銷售模式

          來組裝一個transaction_id,表示某倉庫某銷售模式下的某個條碼資源。不同條碼,當然就有不同的transaction_id。如果有兩個應用,拿著相同的transaction_id來創(chuàng)建鎖資源的時候,只能有一個應用創(chuàng)建成功。

          一條distributed_lock記錄插入成功了,就表示一份鎖資源創(chuàng)建成功了。

          DB連接池列表設計

          在寫操作頻繁的業(yè)務系統(tǒng)中,通常會進行分庫,以降低單數(shù)據(jù)庫寫入的壓力,并提高寫操作的吞吐量。如果使用了分庫,那么業(yè)務數(shù)據(jù)自然也都分配到各個數(shù)據(jù)庫上了。在這種水平切分的多數(shù)據(jù)庫上使用DB分布式鎖,可以自定義一個DataSouce列表。并暴露一個getConnection(String transactionId)方法,按照transactionId找到對應的Connection

          實現(xiàn)代碼如下:

          package?dlock;

          import?com.alibaba.druid.pool.DruidDataSource;
          import?org.springframework.stereotype.Component;

          import?javax.annotation.PostConstruct;
          import?java.io.FileInputStream;
          import?java.io.IOException;
          import?java.sql.Connection;
          import?java.util.ArrayList;
          import?java.util.List;
          import?java.util.Properties;

          @Component
          public?class?DataSourcePool?{
          ????private?List?dlockDataSources?=?new?ArrayList<>();

          ????@PostConstruct
          ????private?void?initDataSourceList()?throws?IOException?{
          ????????Properties?properties?=?new?Properties();
          ????????FileInputStream?fis?=?new?FileInputStream("db.properties");
          ????????properties.load(fis);

          ????????Integer?lockNum?=?Integer.valueOf(properties.getProperty("DLOCK_NUM"));
          ????????for?(int?i?=?0;?i?????????????String?user?=?properties.getProperty("DLOCK_USER_"?+?i);
          ????????????String?password?=?properties.getProperty("DLOCK_PASS_"?+?i);
          ????????????Integer?initSize?=?Integer.valueOf(properties.getProperty("DLOCK_INIT_SIZE_"?+?i));
          ????????????Integer?maxSize?=?Integer.valueOf(properties.getProperty("DLOCK_MAX_SIZE_"?+?i));
          ????????????String?url?=?properties.getProperty("DLOCK_URL_"?+?i);

          ????????????DruidDataSource?dataSource?=?createDataSource(user,password,initSize,maxSize,url);
          ????????????dlockDataSources.add(dataSource);
          ????????}
          ????}

          ????private?DruidDataSource?createDataSource(String?user,?String?password,?Integer?initSize,?Integer?maxSize,?String?url)?{
          ????????DruidDataSource?dataSource?=?new?DruidDataSource();
          ????????dataSource.setDriverClassName("com.mysql.jdbc.Driver");
          ????????dataSource.setUsername(user);
          ????????dataSource.setPassword(password);
          ????????dataSource.setUrl(url);
          ????????dataSource.setInitialSize(initSize);
          ????????dataSource.setMaxActive(maxSize);

          ????????return?dataSource;
          ????}

          ????public?Connection?getConnection(String?transactionId)?throws?Exception?{
          ????????if?(dlockDataSources.size()?<=?0)?{
          ????????????return?null;
          ????????}

          ????????if?(transactionId?==?null?||?"".equals(transactionId))?{
          ????????????throw?new?RuntimeException("transactionId是必須的");
          ????????}

          ????????int?hascode?=?transactionId.hashCode();
          ????????if?(hascode?0)?{
          ????????????hascode?=?-?hascode;
          ????????}

          ????????return?dlockDataSources.get(hascode?%?dlockDataSources.size()).getConnection();
          ????}
          }

          首先編寫一個initDataSourceList方法,并利用Spring的PostConstruct注解初始化一個DataSource 列表。相關的DB配置從db.properties讀取。

          DLOCK_NUM=2

          DLOCK_USER_0="user1"
          DLOCK_PASS_0="pass1"
          DLOCK_INIT_SIZE_0=2
          DLOCK_MAX_SIZE_0=10
          DLOCK_URL_0="jdbc:mysql://localhost:3306/test1"

          DLOCK_USER_1="user1"
          DLOCK_PASS_1="pass1"
          DLOCK_INIT_SIZE_1=2
          DLOCK_MAX_SIZE_1=10
          DLOCK_URL_1="jdbc:mysql://localhost:3306/test2"

          DataSource使用阿里的DruidDataSource

          接著最重要的一個實現(xiàn)getConnection(String transactionId)方法。實現(xiàn)原理很簡單,獲取transactionId的hashcode,并對DataSource的長度取模即可。

          連接池列表設計好后,就可以實現(xiàn)往distributed_lock表插入數(shù)據(jù)了。

          package?dlock;

          import?org.springframework.beans.factory.annotation.Autowired;
          import?org.springframework.stereotype.Component;

          import?java.sql.*;

          @Component
          public?class?DistributedLock?{

          ????@Autowired
          ????private?DataSourcePool?dataSourcePool;

          ????/**
          ?????*?根據(jù)transactionId創(chuàng)建鎖資源
          ?????*/
          ????public?String?createLock(String?transactionId)?throws?Exception{
          ????????if?(transactionId?==?null)?{
          ????????????throw?new?RuntimeException("transactionId是必須的");
          ????????}
          ????????Connection?connection?=?null;
          ????????Statement?statement?=?null;
          ????????try?{
          ????????????connection?=?dataSourcePool.getConnection(transactionId);
          ????????????connection.setAutoCommit(false);
          ????????????statement?=?connection.createStatement();
          ????????????statement.executeUpdate("INSERT?INTO?distributed_lock(transaction_id)?VALUES?('"?+?transactionId?+?"')");
          ????????????connection.commit();
          ????????????return?transactionId;
          ????????}
          ????????catch?(SQLIntegrityConstraintViolationException?icv)?{
          ????????????//說明已經(jīng)生成過了。
          ????????????if?(connection?!=?null)?{
          ????????????????connection.rollback();
          ????????????}
          ????????????return?transactionId;
          ????????}
          ????????catch?(Exception?e)?{
          ????????????if?(connection?!=?null)?{
          ????????????????connection.rollback();
          ????????????}
          ????????????throw??e;
          ????????}
          ????????finally?{
          ????????????if?(statement?!=?null)?{
          ????????????????statement.close();
          ????????????}

          ????????????if?(connection?!=?null)?{
          ????????????????connection.close();
          ????????????}
          ????????}
          ????}
          }

          根據(jù)transactionId鎖住線程

          接下來利用DB的select for update特性來鎖住線程。當多個線程根據(jù)相同的transactionId并發(fā)同時操作select for update的時候,只有一個線程能成功,其他線程都block住,直到select for update成功的線程使用commit操作后,block住的所有線程的其中一個線程才能開始干活。我們在上面的DistributedLock類中創(chuàng)建一個lock方法。

          ?public?boolean?lock(String?transactionId)?throws?Exception?{
          ????????Connection?connection?=?null;
          ????????PreparedStatement?preparedStatement?=?null;
          ????????ResultSet?resultSet?=?null;
          ????????try?{
          ????????????connection?=?dataSourcePool.getConnection(transactionId);
          ????????????preparedStatement?=?connection.prepareStatement("SELECT?*?FROM?distributed_lock?WHERE?transaction_id?=???FOR?UPDATE?");
          ????????????preparedStatement.setString(1,transactionId);
          ????????????resultSet?=?preparedStatement.executeQuery();
          ????????????if?(!resultSet.next())?{
          ????????????????connection.rollback();
          ????????????????return?false;
          ????????????}
          ????????????return?true;
          ????????}?catch?(Exception?e)?{
          ????????????if?(connection?!=?null)?{
          ????????????????connection.rollback();
          ????????????}
          ????????????throw??e;
          ????????}
          ????????finally?{
          ????????????if?(preparedStatement?!=?null)?{
          ????????????????preparedStatement.close();
          ????????????}

          ????????????if?(resultSet?!=?null)?{
          ????????????????resultSet.close();
          ????????????}

          ????????????if?(connection?!=?null)?{
          ????????????????connection.close();
          ????????????}
          ????????}
          ????}

          實現(xiàn)解鎖操作

          當線程執(zhí)行完任務后,必須手動的執(zhí)行解鎖操作,之前被鎖住的線程才能繼續(xù)干活。在我們上面的實現(xiàn)中,其實就是獲取到當時select for update成功的線程對應的Connection,并實行commit操作即可。

          那么如何獲取到呢?我們可以利用ThreadLocal。首先在DistributedLock類中定義

          private?ThreadLocal?threadLocalConn?=?new?ThreadLocal<>();

          每次調(diào)用lock方法的時候,把Connection放置到ThreadLocal里面。我們修改lock方法。

          ?public?boolean?lock(String?transactionId)?throws?Exception?{
          ????????Connection?connection?=?null;
          ????????PreparedStatement?preparedStatement?=?null;
          ????????ResultSet?resultSet?=?null;
          ????????try?{
          ????????????connection?=?dataSourcePool.getConnection(transactionId);
          ????????????threadLocalConn.set(connection);
          ????????????preparedStatement?=?connection.prepareStatement("SELECT?*?FROM?distributed_lock?WHERE?transaction_id?=???FOR?UPDATE?");
          ????????????preparedStatement.setString(1,transactionId);
          ????????????resultSet?=?preparedStatement.executeQuery();
          ????????????if?(!resultSet.next())?{
          ????????????????connection.rollback();
          ????????????????threadLocalConn.remove();
          ????????????????return?false;
          ????????????}
          ????????????return?true;
          ????????}?catch?(Exception?e)?{
          ????????????if?(connection?!=?null)?{
          ????????????????connection.rollback();
          ????????????????threadLocalConn.remove();
          ????????????}
          ????????????throw??e;
          ????????}
          ????????finally?{
          ????????????if?(preparedStatement?!=?null)?{
          ????????????????preparedStatement.close();
          ????????????}

          ????????????if?(resultSet?!=?null)?{
          ????????????????resultSet.close();
          ????????????}

          ????????????if?(connection?!=?null)?{
          ????????????????connection.close();
          ????????????}
          ????????}
          ????}

          這樣子,當獲取到Connection后,將其設置到ThreadLocal中,如果lock方法出現(xiàn)異常,則將其從ThreadLocal中移除掉。

          有了這幾步后,我們可以來實現(xiàn)解鎖操作了。我們在DistributedLock添加一個unlock方法。

          ?public?void?unlock()?throws?Exception?{
          ????????Connection?connection?=?null;
          ????????try?{
          ????????????connection?=?threadLocalConn.get();
          ????????????if?(!connection.isClosed())?{
          ????????????????connection.commit();
          ????????????????connection.close();
          ????????????????threadLocalConn.remove();
          ????????????}
          ????????}?catch?(Exception?e)?{
          ????????????if?(connection?!=?null)?{
          ????????????????connection.rollback();
          ????????????????connection.close();
          ????????????}
          ????????????threadLocalConn.remove();
          ????????????throw?e;
          ????????}
          ????}

          缺點

          畢竟是利用DB來實現(xiàn)分布式鎖,對DB還是造成一定的壓力。當時考慮使用DB做分布式的一個重要原因是,我們的應用是后端應用,平時流量不大的,反而關鍵的是要保證庫存數(shù)據(jù)的正確性。對于像前端庫存系統(tǒng),比如添加購物車占用庫存等操作,最好別使用DB來實現(xiàn)分布式鎖了。

          進一步思考

          如果想鎖住多份數(shù)據(jù)該怎么實現(xiàn)?比如說,某個庫存操作,既要修改物理庫存,又要修改虛擬庫存,想鎖住物理庫存的同時,又鎖住虛擬庫存。其實也不是很難,參考lock方法,寫一個multiLock方法,提供多個transactionId的入?yún)ⅲ琭or循環(huán)處理就可以了。這個后續(xù)有時間再補上。

          END



          后臺回復?學習資料?領取學習視頻



          瀏覽 53
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲男人天堂2025 | 天天日天天撸天天干 | 人人干人人摸人人爱 | 精品国产a∨一区天美传媒 | re在线精品视频 |