<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          用 MySQL 實現(xiàn)一個分布式鎖,這也太強了...

          共 1920字,需瀏覽 4分鐘

           ·

          2021-11-03 22:30

          點擊“藍(lán)字”,關(guān)注,置頂公眾號

          每日技術(shù)干貨,第一時間送達!

          以前參加過一個庫存系統(tǒng),由于其業(yè)務(wù)復(fù)雜性,搞了很多個應(yīng)用來支撐。這樣的話一份庫存數(shù)據(jù)就有可能同時有多個應(yīng)用來修改庫存數(shù)據(jù)。

          比如說,有定時任務(wù)域xx.cron,和SystemA域和SystemB域這幾個JAVA應(yīng)用,可能同時修改同一份庫存數(shù)據(jù)。如果不做協(xié)調(diào)的話,就會有臟數(shù)據(jù)出現(xiàn)。

          對于跨JAVA進程的線程協(xié)調(diào),可以借助外部環(huán)境,例如DB或者Redis。

          下文介紹一下如何使用DB來實現(xiàn)分布式鎖。

          設(shè)計

          本文設(shè)計的分布式鎖的交互方式如下:

          1、根據(jù)業(yè)務(wù)字段生成transaction_id,并線程安全的創(chuàng)建鎖資源

          2、根據(jù)transaction_id申請鎖

          3、釋放鎖

          動態(tài)創(chuàng)建鎖資源

          在使用synchronized關(guān)鍵字的時候,必須指定一個鎖對象。

          synchronized(obj)?{
          ...
          }?

          進程內(nèi)的線程可以基于obj來實現(xiàn)同步。obj在這里可以理解為一個鎖對象。如果線程要進入synchronized代碼塊里,必須先持有obj對象上的鎖。這種鎖是JAVA里面的內(nèi)置鎖,創(chuàng)建的過程是線程安全的。

          那么借助DB,如何保證創(chuàng)建鎖的過程是線程安全的呢?可以利用DB中的UNIQUE KEY特性,一旦出現(xiàn)了重復(fù)的key,由于UNIQUE KEY的唯一性,會拋出異常的。

          在JAVA里面,是SQLIntegrityConstraintViolationException異常。

          create?table?distributed_lock
          (
          ?id?BIGINT?UNSIGNED?PRIMARY?KEY?AUTO_INCREMENT?COMMENT?'自增主鍵',
          ?transaction_id?varchar(128)?NOT?NULL?DEFAULT?''?COMMENT?'事務(wù)id',
          ?last_update_time?TIMESTAMP?DEFAULT?CURRENT_TIMESTAMP?ON?UPDATE?CURRENT_TIMESTAMP?NOT?NULL?COMMENT?'最后更新時間',
          ?create_time?TIMESTAMP?DEFAULT?'0000-00-00?00:00:00'?NOT?NULL?COMMENT?'創(chuàng)建時間',
          ?UNIQUE?KEY?`idx_transaction_id`?(`transaction_id`)
          )

          transaction_id是事務(wù)Id,比如說,可以用

          倉庫 + 條碼 + 銷售模式

          來組裝一個transaction_id,表示某倉庫某銷售模式下的某個條碼資源。不同條碼,當(dāng)然就有不同的transaction_id。如果有兩個應(yīng)用,拿著相同的transaction_id來創(chuàng)建鎖資源的時候,只能有一個應(yīng)用創(chuàng)建成功。

          一條distributed_lock記錄插入成功了,就表示一份鎖資源創(chuàng)建成功了。

          DB連接池列表設(shè)計

          在寫操作頻繁的業(yè)務(wù)系統(tǒng)中,通常會進行分庫,以降低單數(shù)據(jù)庫寫入的壓力,并提高寫操作的吞吐量。

          如果使用了分庫,那么業(yè)務(wù)數(shù)據(jù)自然也都分配到各個數(shù)據(jù)庫上了。在這種水平切分的多數(shù)據(jù)庫上使用DB分布式鎖,可以自定義一個DataSouce列表。并暴露一個getConnection(String transactionId)方法,按照transactionId找到對應(yīng)的Connection

          實現(xiàn)代碼如下:

          @Component
          public?class?DataSourcePool?{
          ????private?List?dlockDataSources?=?new?ArrayList<>();

          ????@PostConstruct
          ????private?void?initDataSourceList()?throws?IOException?{
          ????????Properties?properties?=?new?Properties();
          ????????FileInputStream?fis?=?new?FileInputStream("db.properties");
          ????????properties.load(fis);

          ????????Integer?lockNum?=?Integer.valueOf(properties.getProperty("DLOCK_NUM"));
          ????????for?(int?i?=?0;?i?????????????String?user?=?properties.getProperty("DLOCK_USER_"?+?i);
          ????????????String?password?=?properties.getProperty("DLOCK_PASS_"?+?i);
          ????????????Integer?initSize?=?Integer.valueOf(properties.getProperty("DLOCK_INIT_SIZE_"?+?i));
          ????????????Integer?maxSize?=?Integer.valueOf(properties.getProperty("DLOCK_MAX_SIZE_"?+?i));
          ????????????String?url?=?properties.getProperty("DLOCK_URL_"?+?i);

          ????????????DruidDataSource?dataSource?=?createDataSource(user,password,initSize,maxSize,url);
          ????????????dlockDataSources.add(dataSource);
          ????????}
          ????}

          ????private?DruidDataSource?createDataSource(String?user,?String?password,?Integer?initSize,?Integer?maxSize,?String?url)?{
          ????????DruidDataSource?dataSource?=?new?DruidDataSource();
          ????????dataSource.setDriverClassName("com.mysql.jdbc.Driver");
          ????????dataSource.setUsername(user);
          ????????dataSource.setPassword(password);
          ????????dataSource.setUrl(url);
          ????????dataSource.setInitialSize(initSize);
          ????????dataSource.setMaxActive(maxSize);

          ????????return?dataSource;
          ????}

          ????public?Connection?getConnection(String?transactionId)?throws?Exception?{
          ????????if?(dlockDataSources.size()?<=?0)?{
          ????????????return?null;
          ????????}

          ????????if?(transactionId?==?null?||?"".equals(transactionId))?{
          ????????????throw?new?RuntimeException("transactionId是必須的");
          ????????}

          ????????int?hascode?=?transactionId.hashCode();
          ????????if?(hascode?????????????hascode?=?-?hascode;
          ????????}

          ????????return?dlockDataSources.get(hascode?%?dlockDataSources.size()).getConnection();
          ????}
          }

          首先編寫一個initDataSourceList方法,并利用Spring的PostConstruct注解初始化一個DataSource?列表。相關(guān)的DB配置從db.properties讀取。

          DLOCK_NUM=2

          DLOCK_USER_0="user1"
          DLOCK_PASS_0="pass1"
          DLOCK_INIT_SIZE_0=2
          DLOCK_MAX_SIZE_0=10
          DLOCK_URL_0="jdbc:mysql://localhost:3306/test1"

          DLOCK_USER_1="user1"
          DLOCK_PASS_1="pass1"
          DLOCK_INIT_SIZE_1=2
          DLOCK_MAX_SIZE_1=10
          DLOCK_URL_1="jdbc:mysql://localhost:3306/test2"

          DataSource使用阿里的DruidDataSource

          接著最重要的一個實現(xiàn)getConnection(String transactionId)方法。實現(xiàn)原理很簡單,獲取transactionId的hashcode,并對DataSource的長度取模即可。

          連接池列表設(shè)計好后,就可以實現(xiàn)往distributed_lock表插入數(shù)據(jù)了。

          @Component
          public?class?DistributedLock?{

          ????@Autowired
          ????private?DataSourcePool?dataSourcePool;

          ????/**
          ?????*?根據(jù)transactionId創(chuàng)建鎖資源
          ?????*/
          ????public?String?createLock(String?transactionId)?throws?Exception{
          ????????if?(transactionId?==?null)?{
          ????????????throw?new?RuntimeException("transactionId是必須的");
          ????????}
          ????????Connection?connection?=?null;
          ????????Statement?statement?=?null;
          ????????try?{
          ????????????connection?=?dataSourcePool.getConnection(transactionId);
          ????????????connection.setAutoCommit(false);
          ????????????statement?=?connection.createStatement();
          ????????????statement.executeUpdate("INSERT?INTO?distributed_lock(transaction_id)?VALUES?('"?+?transactionId?+?"')");
          ????????????connection.commit();
          ????????????return?transactionId;
          ????????}
          ????????catch?(SQLIntegrityConstraintViolationException?icv)?{
          ????????????//說明已經(jīng)生成過了。
          ????????????if?(connection?!=?null)?{
          ????????????????connection.rollback();
          ????????????}
          ????????????return?transactionId;
          ????????}
          ????????catch?(Exception?e)?{
          ????????????if?(connection?!=?null)?{
          ????????????????connection.rollback();
          ????????????}
          ????????????throw??e;
          ????????}
          ????????finally?{
          ????????????if?(statement?!=?null)?{
          ????????????????statement.close();
          ????????????}

          ????????????if?(connection?!=?null)?{
          ????????????????connection.close();
          ????????????}
          ????????}
          ????}
          }

          根據(jù)transactionId鎖住線程

          接下來利用DB的select for update特性來鎖住線程。當(dāng)多個線程根據(jù)相同的transactionId并發(fā)同時操作select for update的時候,只有一個線程能成功,其他線程都block住,直到select for update成功的線程使用commit操作后,block住的所有線程的其中一個線程才能開始干活。我們在上面的DistributedLock類中創(chuàng)建一個lock法。

          public?boolean?lock(String?transactionId)?throws?Exception?{
          ????Connection?connection?=?null;
          ????PreparedStatement?preparedStatement?=?null;
          ????ResultSet?resultSet?=?null;
          ????try?{
          ????????connection?=?dataSourcePool.getConnection(transactionId);
          ????????preparedStatement?=?connection.prepareStatement("SELECT?*?FROM?distributed_lock?WHERE?transaction_id?=???FOR?UPDATE?");
          ????????preparedStatement.setString(1,transactionId);
          ????????resultSet?=?preparedStatement.executeQuery();
          ????????if?(!resultSet.next())?{
          ????????????connection.rollback();
          ????????????return?false;
          ????????}
          ????????return?true;
          ????}?catch?(Exception?e)?{
          ????????if?(connection?!=?null)?{
          ????????????connection.rollback();
          ????????}
          ????????throw??e;
          ????}
          ????finally?{
          ????????if?(preparedStatement?!=?null)?{
          ????????????preparedStatement.close();
          ????????}

          ????????if?(resultSet?!=?null)?{
          ????????????resultSet.close();
          ????????}

          ????????if?(connection?!=?null)?{
          ????????????connection.close();
          ????????}
          ????}
          }

          實現(xiàn)解鎖操作

          當(dāng)線程執(zhí)行完任務(wù)后,必須手動的執(zhí)行解鎖操作,之前被鎖住的線程才能繼續(xù)干活。在我們上面的實現(xiàn)中,其實就是獲取到當(dāng)時select for update成功的線程對應(yīng)的Connection,并實行commit操作即可。

          那么如何獲取到呢?我們可以利用ThreadLocal。首先在DistributedLock類中定義

          private?ThreadLocal?threadLocalConn?=?new?ThreadLocal<>();

          每次調(diào)用lock方法的時候,把Connection放置到ThreadLocal里面。我們修改lock方法。

          public?boolean?lock(String?transactionId)?throws?Exception?{
          ????Connection?connection?=?null;
          ????PreparedStatement?preparedStatement?=?null;
          ????ResultSet?resultSet?=?null;
          ????try?{
          ????????connection?=?dataSourcePool.getConnection(transactionId);
          ????????threadLocalConn.set(connection);
          ????????preparedStatement?=?connection.prepareStatement("SELECT?*?FROM?distributed_lock?WHERE?transaction_id?=???FOR?UPDATE?");
          ????????preparedStatement.setString(1,transactionId);
          ????????resultSet?=?preparedStatement.executeQuery();
          ????????if?(!resultSet.next())?{
          ????????????connection.rollback();
          ????????????threadLocalConn.remove();
          ????????????return?false;
          ????????}
          ????????return?true;
          ????}?catch?(Exception?e)?{
          ????????if?(connection?!=?null)?{
          ????????????connection.rollback();
          ????????????threadLocalConn.remove();
          ????????}
          ????????throw??e;
          ????}
          ????finally?{
          ????????if?(preparedStatement?!=?null)?{
          ????????????preparedStatement.close();
          ????????}

          ????????if?(resultSet?!=?null)?{
          ????????????resultSet.close();
          ????????}

          ????????if?(connection?!=?null)?{
          ????????????connection.close();
          ????????}
          ????}
          }

          這樣子當(dāng)獲取到Connection后,將其設(shè)置到ThreadLocal中,如果lock方法出現(xiàn)異常,則將其從ThreadLocal中移除掉。

          有了這幾步后,我們可以來實現(xiàn)解鎖操作了。我們在DistributedLock添加一個unlock方法。

          public?void?unlock()?throws?Exception?{
          ????Connection?connection?=?null;
          ????try?{
          ????????connection?=?threadLocalConn.get();
          ????????if?(!connection.isClosed())?{
          ????????????connection.commit();
          ????????????connection.close();
          ????????????threadLocalConn.remove();
          ????????}
          ????}?catch?(Exception?e)?{
          ????????if?(connection?!=?null)?{
          ????????????connection.rollback();
          ????????????connection.close();
          ????????}
          ????????threadLocalConn.remove();
          ????????throw?e;
          ????}
          }

          缺點

          畢竟是利用DB來實現(xiàn)分布式鎖,對DB還是造成一定的壓力。當(dāng)時考慮使用DB做分布式的一個重要原因是,我們的應(yīng)用是后端應(yīng)用,平時流量不大的,反而關(guān)鍵的是要保證庫存數(shù)據(jù)的正確性。

          對于像前端庫存系統(tǒng),比如添加購物車占用庫存等操作,最好別使用DB來實現(xiàn)分布式鎖了。

          進一步思考

          如果想鎖住多份數(shù)據(jù)該怎么實現(xiàn)?

          比如說,某個庫存操作,既要修改物理庫存,又要修改虛擬庫存,想鎖住物理庫存的同時,又鎖住虛擬庫存。

          其實也不是很難,參考lock方法,寫一個multiLock方法,提供多個transactionId的入?yún)ⅲ琭or循環(huán)處理就可以了。

          來源:https://blog.csdn.net/linsongbin1/article/details/79444274



          往期推薦



          為什么高級程序員都不用 “ ! = null ' 做判空

          聊聊 Java 泛型通配符 T,E,K,V,?

          Redis 單例、主從、sentinel 以及集群的配置方式及優(yōu)缺點對比

          使用Java 這幾個常用工具類庫,助你告別996,建議收藏!

          數(shù)據(jù)庫分表分庫策略

          你對索引的理解有多少?



          瀏覽 45
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  久久亚洲免费 | 玖玖国产精品视频 | 亚洲人网站 | 女人18片毛片120分钟 | 欧美黄片免费播放 |