<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          牛逼!用 MySQL 實現(xiàn)一個分布式鎖,這也太強了。。。

          共 18389字,需瀏覽 37分鐘

           ·

          2021-09-30 23:01

          上一篇:再見!LayUI !

          以前參加過一個庫存系統(tǒng),由于其業(yè)務復雜性,搞了很多個應用來支撐。這樣的話一份庫存數(shù)據(jù)就有可能同時有多個應用來修改庫存數(shù)據(jù)。

          比如說,有定時任務域xx.cron,和SystemA域和SystemB域這幾個JAVA應用,可能同時修改同一份庫存數(shù)據(jù)。如果不做協(xié)調的話,就會有臟數(shù)據(jù)出現(xiàn)。

          對于跨JAVA進程的線程協(xié)調,可以借助外部環(huán)境,例如DB或者Redis。

          下文介紹一下如何使用DB來實現(xiàn)分布式鎖。

          設計

          本文設計的分布式鎖的交互方式如下:

          1、根據(jù)業(yè)務字段生成transaction_id,并線程安全的創(chuàng)建鎖資源

          2、根據(jù)transaction_id申請鎖

          3、釋放鎖

          動態(tài)創(chuàng)建鎖資源

          在使用synchronized關鍵字的時候,必須指定一個鎖對象。

          synchronized(obj) {
          ...

          進程內的線程可以基于obj來實現(xiàn)同步。obj在這里可以理解為一個鎖對象。如果線程要進入synchronized代碼塊里,必須先持有obj對象上的鎖。這種鎖是JAVA里面的內置鎖,創(chuàng)建的過程是線程安全的。

          那么借助DB,如何保證創(chuàng)建鎖的過程是線程安全的呢?可以利用DB中的UNIQUE KEY特性,一旦出現(xiàn)了重復的key,由于UNIQUE KEY的唯一性,會拋出異常的。

          在JAVA里面,是SQLIntegrityConstraintViolationException異常。

          create table distributed_lock
          (
           id BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT COMMENT '自增主鍵',
           transaction_id varchar(128) NOT NULL DEFAULT '' COMMENT '事務id',
           last_update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP NOT NULL COMMENT '最后更新時間',
           create_time TIMESTAMP DEFAULT '0000-00-00 00:00:00' NOT NULL COMMENT '創(chuàng)建時間',
           UNIQUE KEY `idx_transaction_id` (`transaction_id`)
          )

          transaction_id是事務Id,比如說,可以用

          倉庫 + 條碼 + 銷售模式

          來組裝一個transaction_id,表示某倉庫某銷售模式下的某個條碼資源。不同條碼,當然就有不同的transaction_id。如果有兩個應用,拿著相同的transaction_id來創(chuàng)建鎖資源的時候,只能有一個應用創(chuàng)建成功。

          一條distributed_lock記錄插入成功了,就表示一份鎖資源創(chuàng)建成功了。另外,關注公眾號互聯(lián)網(wǎng)架構師,在后臺回復:2,可以獲取我整理的 Java 系列面試題和答案,非常齊全。

          DB連接池列表設計

          在寫操作頻繁的業(yè)務系統(tǒng)中,通常會進行分庫,以降低單數(shù)據(jù)庫寫入的壓力,并提高寫操作的吞吐量。最新 Java 核心技術教程,都在這了。

          如果使用了分庫,那么業(yè)務數(shù)據(jù)自然也都分配到各個數(shù)據(jù)庫上了。在這種水平切分的多數(shù)據(jù)庫上使用DB分布式鎖,可以自定義一個DataSouce列表。并暴露一個getConnection(String transactionId)方法,按照transactionId找到對應的Connection。

          實現(xiàn)代碼如下:

          package dlock;

          import com.alibaba.druid.pool.DruidDataSource;
          import org.springframework.stereotype.Component;

          import javax.annotation.PostConstruct;
          import java.io.FileInputStream;
          import java.io.IOException;
          import java.sql.Connection;
          import java.util.ArrayList;
          import java.util.List;
          import java.util.Properties;

          @Component
          public class DataSourcePool {
              private List<DruidDataSource> dlockDataSources = new ArrayList<>();

              @PostConstruct
              private void initDataSourceList() throws IOException {
                  Properties properties = new Properties();
                  FileInputStream fis = new FileInputStream("db.properties");
                  properties.load(fis);

                  Integer lockNum = Integer.valueOf(properties.getProperty("DLOCK_NUM"));
                  for (int i = 0; i < lockNum; i++) {
                      String user = properties.getProperty("DLOCK_USER_" + i);
                      String password = properties.getProperty("DLOCK_PASS_" + i);
                      Integer initSize = Integer.valueOf(properties.getProperty("DLOCK_INIT_SIZE_" + i));
                      Integer maxSize = Integer.valueOf(properties.getProperty("DLOCK_MAX_SIZE_" + i));
                      String url = properties.getProperty("DLOCK_URL_" + i);

                      DruidDataSource dataSource = createDataSource(user,password,initSize,maxSize,url);
                      dlockDataSources.add(dataSource);
                  }
              }

              private DruidDataSource createDataSource(String user, String password, Integer initSize, Integer maxSize, String url) {
                  DruidDataSource dataSource = new DruidDataSource();
                  dataSource.setDriverClassName("com.mysql.jdbc.Driver");
                  dataSource.setUsername(user);
                  dataSource.setPassword(password);
                  dataSource.setUrl(url);
                  dataSource.setInitialSize(initSize);
                  dataSource.setMaxActive(maxSize);

                  return dataSource;
              }

              public Connection getConnection(String transactionId) throws Exception {
                  if (dlockDataSources.size() <= 0) {
                      return null;
                  }

                  if (transactionId == null || "".equals(transactionId)) {
                      throw new RuntimeException("transactionId是必須的");
                  }

                  int hascode = transactionId.hashCode();
                  if (hascode < 0) {
                      hascode = - hascode;
                  }

                  return dlockDataSources.get(hascode % dlockDataSources.size()).getConnection();
              }
          }

          首先編寫一個initDataSourceList方法,并利用Spring的PostConstruct注解初始化一個DataSource 列表。相關的DB配置從db.properties讀取。

          DLOCK_NUM=2

          DLOCK_USER_0="user1"
          DLOCK_PASS_0="pass1"
          DLOCK_INIT_SIZE_0=2
          DLOCK_MAX_SIZE_0=10
          DLOCK_URL_0="jdbc:mysql://localhost:3306/test1"

          DLOCK_USER_1="user1"
          DLOCK_PASS_1="pass1"
          DLOCK_INIT_SIZE_1=2
          DLOCK_MAX_SIZE_1=10
          DLOCK_URL_1="jdbc:mysql://localhost:3306/test2"

          DataSource使用阿里的DruidDataSource。

          接著最重要的一個實現(xiàn)getConnection(String transactionId)方法。實現(xiàn)原理很簡單,獲取transactionId的hashcode,并對DataSource的長度取模即可。

          連接池列表設計好后,就可以實現(xiàn)往distributed_lock表插入數(shù)據(jù)了。
          package dlock;

          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.stereotype.Component;

          import java.sql.*;

          @Component
          public class DistributedLock {

              @Autowired
              private DataSourcePool dataSourcePool;

              /**
               * 根據(jù)transactionId創(chuàng)建鎖資源
               */
              public String createLock(String transactionId) throws Exception{
                  if (transactionId == null) {
                      throw new RuntimeException("transactionId是必須的");
                  }
                  Connection connection = null;
                  Statement statement = null;
                  try {
                      connection = dataSourcePool.getConnection(transactionId);
                      connection.setAutoCommit(false);
                      statement = connection.createStatement();
                      statement.executeUpdate("INSERT INTO distributed_lock(transaction_id) VALUES ('" + transactionId + "')");
                      connection.commit();
                      return transactionId;
                  }
                  catch (SQLIntegrityConstraintViolationException icv) {
                      //說明已經(jīng)生成過了。
                      if (connection != null) {
                          connection.rollback();
                      }
                      return transactionId;
                  }
                  catch (Exception e) {
                      if (connection != null) {
                          connection.rollback();
                      }
                      throw  e;
                  }
                  finally {
                      if (statement != null) {
                          statement.close();
                      }

                      if (connection != null) {
                          connection.close();
                      }
                  }
              }
          }

          根據(jù)transactionId鎖住線程

          接下來利用DB的select for update特性來鎖住線程。當多個線程根據(jù)相同的transactionId并發(fā)同時操作select for update的時候,只有一個線程能成功,其他線程都block住,直到select for update成功的線程使用commit操作后,block住的所有線程的其中一個線程才能開始干活。我們在上面的DistributedLock類中創(chuàng)建一個lock法。

          public boolean lock(String transactionId) throws Exception {
              Connection connection = null;
              PreparedStatement preparedStatement = null;
              ResultSet resultSet = null;
              try {
                  connection = dataSourcePool.getConnection(transactionId);
                  preparedStatement = connection.prepareStatement("SELECT * FROM distributed_lock WHERE transaction_id = ? FOR UPDATE ");
                  preparedStatement.setString(1,transactionId);
                  resultSet = preparedStatement.executeQuery();
                  if (!resultSet.next()) {
                      connection.rollback();
                      return false;
                  }
                  return true;
              } catch (Exception e) {
                  if (connection != null) {
                      connection.rollback();
                  }
                  throw  e;
              }
              finally {
                  if (preparedStatement != null) {
                      preparedStatement.close();
                  }

                  if (resultSet != null) {
                      resultSet.close();
                  }

                  if (connection != null) {
                      connection.close();
                  }
              }
          }

          實現(xiàn)解鎖操作

          當線程執(zhí)行完任務后,必須手動的執(zhí)行解鎖操作,之前被鎖住的線程才能繼續(xù)干活。在我們上面的實現(xiàn)中,其實就是獲取到當時select for update成功的線程對應的Connection,并實行commit操作即可。

          那么如何獲取到呢?我們可以利用ThreadLocal。首先在DistributedLock類中定義

          private ThreadLocal<Connection> threadLocalConn = new ThreadLocal<>();

          每次調用lock方法的時候,把Connection放置到ThreadLocal里面。我們修改lock方法。

          public boolean lock(String transactionId) throws Exception {
              Connection connection = null;
              PreparedStatement preparedStatement = null;
              ResultSet resultSet = null;
              try {
                  connection = dataSourcePool.getConnection(transactionId);
                  threadLocalConn.set(connection);
                  preparedStatement = connection.prepareStatement("SELECT * FROM distributed_lock WHERE transaction_id = ? FOR UPDATE ");
                  preparedStatement.setString(1,transactionId);
                  resultSet = preparedStatement.executeQuery();
                  if (!resultSet.next()) {
                      connection.rollback();
                      threadLocalConn.remove();
                      return false;
                  }
                  return true;
              } catch (Exception e) {
                  if (connection != null) {
                      connection.rollback();
                      threadLocalConn.remove();
                  }
                  throw  e;
              }
              finally {
                  if (preparedStatement != null) {
                      preparedStatement.close();
                  }

                  if (resultSet != null) {
                      resultSet.close();
                  }

                  if (connection != null) {
                      connection.close();
                  }
              }
          }

          這樣子,當獲取到Connection后,將其設置到ThreadLocal中,如果lock方法出現(xiàn)異常,則將其從ThreadLocal中移除掉。

          有了這幾步后,我們可以來實現(xiàn)解鎖操作了。我們在DistributedLock添加一個unlock方法。

          public void unlock() throws Exception {
              Connection connection = null;
              try {
                  connection = threadLocalConn.get();
                  if (!connection.isClosed()) {
                      connection.commit();
                      connection.close();
                      threadLocalConn.remove();
                  }
              } catch (Exception e) {
                  if (connection != null) {
                      connection.rollback();
                      connection.close();
                  }
                  threadLocalConn.remove();
                  throw e;
              }
          }

          缺點

          畢竟是利用DB來實現(xiàn)分布式鎖,對DB還是造成一定的壓力。當時考慮使用DB做分布式的一個重要原因是,我們的應用是后端應用,平時流量不大的,反而關鍵的是要保證庫存數(shù)據(jù)的正確性。

          對于像前端庫存系統(tǒng),比如添加購物車占用庫存等操作,最好別使用DB來實現(xiàn)分布式鎖了。

          進一步思考

          如果想鎖住多份數(shù)據(jù)該怎么實現(xiàn)?

          比如說,某個庫存操作,既要修改物理庫存,又要修改虛擬庫存,想鎖住物理庫存的同時,又鎖住虛擬庫存。

          其實也不是很難,參考lock方法,寫一個multiLock方法,提供多個transactionId的入?yún)?,for循環(huán)處理就可以了。

          來源:https://blog.csdn.net/linsongbin1/article/details/79444274


          感謝您的閱讀,也歡迎您發(fā)表關于這篇文章的任何建議,關注我,技術不迷茫!小編到你上高速。

              · END ·
          最后,關注公眾號互聯(lián)網(wǎng)架構師,在后臺回復:2T,可以獲取我整理的 Java 系列面試題和答案,非常齊全。


          正文結束


          推薦閱讀 ↓↓↓

          1.不認命,從10年流水線工人,到谷歌上班的程序媛,一位湖南妹子的勵志故事

          2.如何才能成為優(yōu)秀的架構師?

          3.從零開始搭建創(chuàng)業(yè)公司后臺技術棧

          4.程序員一般可以從什么平臺接私活?

          5.37歲程序員被裁,120天沒找到工作,無奈去小公司,結果懵了...

          6.IntelliJ IDEA 2019.3 首個最新訪問版本發(fā)布,新特性搶先看

          7.這封“領導痛批95后下屬”的郵件,句句扎心!

          8.15張圖看懂瞎忙和高效的區(qū)別!

          一個人學習、工作很迷茫?


          點擊「閱讀原文」加入我們的小圈子!

          瀏覽 46
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  人妻丰满熟妇av无码区蜜桃 | 琪琪在线视频 | 黄色av网| 啊啊啊啊91 | 操操操无码 |