<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          怎樣實現(xiàn)一個分布式的公平鎖?

          共 1654字,需瀏覽 4分鐘

           ·

          2022-04-16 09:51

          在并發(fā)的場景下,很多時候,我們的業(yè)務開發(fā)中會有加鎖的操作,以此來保證執(zhí)行的互斥,保障業(yè)務邏輯。比如在 Java 里就有多種基于 AQS 的組件,方便使用。創(chuàng)建鎖的時候,還可以特別的指定一下,當前這個鎖,是否需要公平。

              /**     * Creates an instance of {@code ReentrantLock} with the     * given fairness policy.     *     * @param fair {@code true} if this lock should use a fair ordering policy     */    public ReentrantLock(boolean fair) {        sync = fair ? new FairSync() : new NonfairSync();    }


          對于需要公平的場景,和我們真實生活一樣,這里的 FairSync 會通過一個 CLH 隊列將請求線程排隊。

          在單實例應用中,本機的 CLH 排隊就足夠了,我們現(xiàn)在切換到分布式的場景。

          在分布式場景下,為了實現(xiàn)鎖的功能,就出現(xiàn)了各種分布式鎖。相比單實例場景下的鎖只能鎖定自己的實例,分布式的鎖由于統(tǒng)一的外部中間件的介入,將鎖的信息提取到獨立的外部,所以可以將多個應用實例做到互斥。


          那分布式的場景下,怎么樣能保證公平呢?


          和我們從單實例到分布式加鎖的思路一樣,要公平,就排隊,在統(tǒng)一的第三方處進行排隊


          來第三方這里排隊,也有一些需要注意的點。比如你在判斷當前隊列里有沒有等待,如果沒有就取鎖成功,執(zhí)行,有等待就入隊,而判斷的這個邏輯,仍然可能是并發(fā)操作,也需要做到加鎖處理。就好像你看了一眼某個飯店沒什么人,開心的去買了杯奶茶,回來一看滿了。


          對于分布式鎖,基于 Redis 的 Redission 用得不少。如果換成Redis 分布式公平鎖,那基本就只有 Redission 了。

          下面我通過兩段代碼,以及部分文字,描述下 Redission 的公平鎖的實現(xiàn)。


          簡要概括下:

          Redission 的公平鎖,是通過「 Redis + Lua?腳本」來實現(xiàn)的。在拿到一個 Redission 的 Redis 連接之后,通過?「eval()」可以執(zhí)行一段 Lua script,同時傳入一些 key 和 args。因為不管有多少 Lua 的邏輯,都是在同一個連接內,所以不會存在買完奶茶發(fā)現(xiàn)人滿了的情況。這里應用到了 Redis 的 pub/sub 功能,等待的線程,會在輪到自己時收到 Redis 的提醒,前提是需要訂閱了相應的通知。


          來看加鎖的 Lua 邏輯,代碼寫的比較清楚,我也加了些對應的Redis操作以及參數(shù)的注釋。通過 list 來存儲排隊信息,同時每個等待線程都有一個超時時間,超時退出隊列。?所以eval 執(zhí)行這個的時候,返回的是一個 ttl Long 類型。表示過期時間。

          --[[
          用于 lock 操作。
          KEYS[1] = lockName KEYS[2] = waitQueueName????KEYS[3]?=?timeoutName????? ARGV[1] = waitTime ARGV[2] = lockName ARGV[3] = leaseTime????ARGV[4]?=?currentTime--]]
          while true do local firstThreadId2 = redis.call("lindex", KEYS[2], 0) if firstThreadId2 == false then break end local timeout = tonumber(redis.call("zscore", KEYS[3], firstThreadId2)) if timeout <= tonumber(ARGV[4]) then redis.call("zrem", KEYS[3], firstThreadId2) redis.call("lpop", KEYS[2]) else break endendif (redis.call("exists", KEYS[1]) == 0) and ((redis.call("exists", KEYS[2]) == 0) or (redis.call("lindex", KEYS[2], 0) == ARGV[2])) then redis.call("lpop", KEYS[2]) redis.call("zrem", KEYS[3], ARGV[2]) -- 移除有序集合中的一個或多個成員 redis.call("hset", KEYS[1], ARGV[2], 1) -- 將哈希表 key 中的字段 field 的值設為 value 。 redis.call("pexpire", KEYS[1], ARGV[1]) return nilendif (redis.call("hexists", KEYS[1], ARGV[2]) == 1) then -- HEXISTS key field 查看哈希表 key 中,指定的字段是否存在。 redis.call("hincrby", KEYS[1], ARGV[2], 1) -- HINCRBY key field increment 為哈希表 key 中的指定字段的整數(shù)值加上增量 increment 。 redis.call("pexpire", KEYS[1], ARGV[1]) -- Redis PEXPIRE 命令和 EXPIRE 命令的作用類似,但是它以毫秒為單位設置 key 的生存時間,而不像 EXPIRE 命令那樣,以秒為單位。 return nilendlocal firstThreadId = redis.call("lindex", KEYS[2], 0)local ttlif firstThreadId ~= false and firstThreadId ~= ARGV[2] then ttl = tonumber(redis.call("zscore", KEYS[3], firstThreadId)) - tonumber(ARGV[4])else ttl = redis.call("pttl", KEYS[1]) -- Redis Pttl 命令以毫秒為單位返回 key 的剩余過期時間。endlocal?timeout?=?ttl??+?tonumber(ARGV[3])?+?tonumber(ARGV[4])if redis.call("zadd", KEYS[3], timeout, ARGV[2]) == 1 then redis.call("rpush", KEYS[2], ARGV[2])endreturn?ttl


          再來看解鎖的邏輯。我前面加鎖的一些內容對應著看,重點在于?「publish」這里,在輪到某個線程時,nextThreadId 這個 channel 會收到通知。

          這里的 threadId 是我們在加鎖和解鎖的時候都需要傳入的。如果你留意過 Java 的線程 Id 就會發(fā)現(xiàn),不同實例之間有很大概率會重復的。為了避免,各個 Client 在傳入 ThradId 的時候,除了真實的 id 外,還需要加入各個 client 對應的信息加以區(qū)分


          --[[    用于 unlock 操作。       KEYS[1] = lockName    KEYS[2] = waitQueueName    KEYS[3] = timeoutName    KEYS[4] = channelName        ARGV[1] = message    ARGV[2] = leaseTime    ARGV[3] = lockName    ARGV[3] = currentTime--]]
          while true do local firstThreadId2 = redis.call("lindex", KEYS[2], 0) if firstThreadId2 == false then break end local timeout = tonumber(redis.call("zscore", KEYS[3], firstThreadId2)) if timeout <= tonumber(ARGV[4]) then redis.call("zrem", KEYS[3], firstThreadId2) redis.call("lpop", KEYS[2]) else break endend
          if (redis.call("exists", KEYS[1]) == 0) then local nextThreadId = redis.call("lindex", KEYS[2], 0) if nextThreadId ~= false then redis.call("publish", KEYS[4] .. ":" .. nextThreadId, ARGV[1]) end return 1endif (redis.call("hexists", KEYS[1], ARGV[3]) == 0) then return nilendlocal counter = redis.call("hincrby", KEYS[1], ARGV[3], -1)if (counter > 0) then redis.call("pexpire", KEYS[1], ARGV[2]) return 0end
          redis.call("del", KEYS[1])local nextThreadId = redis.call("lindex", KEYS[2], 0)if nextThreadId ~= false then redis.call("publish", KEYS[4] .. ":" .. nextThreadId, ARGV[1])endreturn 1


          看過了解鎖邏輯后,外面eval 執(zhí)行加鎖的時候,需要對應有 sub ,才會收到這里解鎖的 pub 信息,否則就卡住了。


          這里需要注意下,sub 這個功能是個阻塞操作,需要單獨的線程里執(zhí)行,通過一個 Future 來實現(xiàn)一定等待時間的 sub 功能,超時再 unsub。這塊邏輯 Redission 封裝的比較多,感興趣的可以到源碼里點點。(都不用下載代碼,直接在 GitHub 上 按個. 就行在線閱讀源碼的新方法)



          相關閱讀

          大白話描述并發(fā)編程重要概念

          圖解分布式一致性算法

          瀏覽 39
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  五月天婷婷网站 | 五月婷婷六月色 | 亚洲国产夫妻自拍 | 人妻熟女一二三区夜夜爱 | 五月天婷婷丁香蜜桃91 |