<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Linux同步原語系列-spinlock及其演進(jìn)優(yōu)化

          共 19004字,需瀏覽 39分鐘

           ·

          2021-12-29 01:56

          1. 引言

          通常我們的說的同步其實(shí)有兩個(gè)層面的意思:

          1. 一個(gè)是線程間的同步,主要是為了按照編程者指定的特定順序執(zhí)行;

          2. 另外一個(gè)是數(shù)據(jù)的同步,主要是為了保存數(shù)據(jù)。

          為了高效解決同步問題,前人抽象出同步原語供開發(fā)者使用。不僅多核,單核也需要同步原語,核心的問題要保證共享資源訪問的正確性。如果共享資源劃分不合理,同步原語的開銷會(huì)制約多核性能。

          常見的同步原語有:互斥鎖、條件變量、信號(hào)量、讀寫鎖、RCU等。

          本文主要聚焦于互斥鎖,對(duì)應(yīng)linux的spinlock,我們?cè)噲D沿著時(shí)間的脈絡(luò)去梳理spinlock的不斷改進(jìn)的進(jìn)程,如果涉及CPU體系結(jié)構(gòu),我們主要關(guān)注ARM體系結(jié)構(gòu)的實(shí)現(xiàn)。如果后續(xù)有時(shí)間我們會(huì)繼續(xù)分析其他同步原語的演進(jìn)和優(yōu)化歷程。

          2. 演化進(jìn)程

          linux內(nèi)核spinlock是互斥機(jī)制的最底層基礎(chǔ)設(shè)施,它的性能直接關(guān)系到內(nèi)核的性能,主要分為這么幾個(gè)階段:

          1. linux-2.6.25之前,我們稱之為原始spinlock。

            對(duì)鎖的實(shí)現(xiàn),是使用原子操作去無序競爭全局的鎖資源。

            這個(gè)階段對(duì)鎖的爭用處于無序競爭的狀態(tài)。如果CPU核心數(shù)不多,資源相對(duì)充裕,好比我們?nèi)ャy行柜臺(tái)辦理業(yè)務(wù),一共就1-2個(gè)人,無非你在前還是我在前的問題,公平性的問題并不突出,性能也沒什么大的影響,但是一旦cpu核心數(shù)和鎖的競爭者相對(duì)較多時(shí),就會(huì)出現(xiàn)有些人因?yàn)槟承﹥?yōu)勢(shì)(如CPU算力強(qiáng),鎖正好落在當(dāng)前CPU的cacheline中等)總是能搶到鎖,其他人總是搶不到的情況出現(xiàn)。

          2. linux-2.6.25,在x86下實(shí)現(xiàn)了ticket spinlock,替換原始spinlock。

            隨著CPU核數(shù)的增多以及對(duì)共享資源的爭用越發(fā)激烈,公平性問題就顯現(xiàn)出來了。保證公平一個(gè)很自然的思路就是大家都來排隊(duì)。如果對(duì)鎖的爭用比較激烈再加上如果此時(shí)核比較多,此時(shí)一旦釋放鎖,其實(shí)只有1個(gè)CPU能搶到鎖,但是因?yàn)榇蠹矣^察都是全局的一個(gè)鎖,那其他CPU的cacheline會(huì)因此失效,會(huì)有相當(dāng)程度的性能損耗。還是就以去銀行柜臺(tái)辦理業(yè)務(wù)為例,它的實(shí)現(xiàn)相當(dāng)于去銀行取號(hào)、排隊(duì)、等叫號(hào)這么一個(gè)過程,問題在于叫號(hào)相當(dāng)是一個(gè)廣播機(jī)制,所有人都要偵聽,還是有點(diǎn)浪費(fèi)時(shí)間精力。

          3. linux-3.5,ARM體系結(jié)構(gòu)用ticket spinlock替換了原始spinlock。

            過了幾個(gè)版本,ARM才替換,原因作者沒有去考證,不得而知。

          4. linux-3.15,實(shí)現(xiàn)了mcs_spinlock,但未替換ticket spinlock。

            它把對(duì)全局鎖轉(zhuǎn)換成per-cpu的鎖,減少爭用的損耗,然后各個(gè)鎖之間通過鏈表串起來排隊(duì),解決了公平性和性能損耗的問題。然后它卻沒有替代ticket spinlock成為內(nèi)核默認(rèn)實(shí)現(xiàn),因?yàn)閟pinlock太底層了,已經(jīng)嵌入了內(nèi)核的各種關(guān)鍵數(shù)據(jù)結(jié)構(gòu)中,它的數(shù)據(jù)結(jié)構(gòu)要比spinlock大,這是內(nèi)核鎖不能接受的,但是最終它還是合入了內(nèi)核,只是沒有人去用它。但是它的存在為后一步的優(yōu)化,仍然起到了非常重要的作用。它的實(shí)現(xiàn)思路是把ticket spinlock的廣播機(jī)制轉(zhuǎn)變?yōu)閾艄膫骰?,也就是?shí)際上可以我并需要偵聽廣播,主要在我前面排隊(duì)的人在使用完鎖以后告訴我可以了。

          5. linux-4.2,實(shí)現(xiàn)了queued spinlock(簡稱qspinlock),替換了ticket spinlock

            它首先肯定要解決mcs_spinlock占用大小,實(shí)際上它結(jié)合了ticket spinlockmcs_spinlock的優(yōu)點(diǎn),大小保持一致,如果只有1個(gè)或2個(gè)CPU試圖獲取鎖,只用這個(gè)新的spinlock,當(dāng)有3個(gè)以上的CPU試圖獲取鎖,才需要mcs_spinlock。它的數(shù)據(jù)結(jié)構(gòu)有表示當(dāng)前鎖的持有情況、是否有還有一個(gè)競爭者,已經(jīng)需要快速找到對(duì)應(yīng)CPU的per-cpu結(jié)構(gòu)的mcs_spinlock節(jié)點(diǎn),這3個(gè)大的域被塞在ticket spinlock同樣大小數(shù)據(jù)結(jié)構(gòu)中。這種遵守原先架構(gòu)約束的情況而做出的改進(jìn),非常值得我們學(xué)習(xí)。

          6. linux-4.19,ARM體系結(jié)構(gòu)將queued spinlock替換成默認(rèn)實(shí)現(xiàn)。

            原因是什么,作者同樣沒去考證。

          3. 原始spinlock實(shí)現(xiàn)

          3.1 關(guān)鍵數(shù)據(jù)結(jié)構(gòu)和公共代碼

          typedef struct {
          volatile unsigned int lock;
          } raw_spinlock_t;

          typedef struct {
          raw_spinlock_t raw_lock;
          } spinlock_t;

          #define spin_lock(lock) _spin_lock(lock)

          void __lockfunc _spin_lock(spinlock_t *lock)
          {
          preempt_disable();
          _raw_spin_lock(lock);
          }
          # define _raw_spin_lock(lock) __raw_spin_lock(&(lock)->raw_lock)

          3.2 ARM體系結(jié)構(gòu)的加鎖實(shí)現(xiàn)

          //arm32(那時(shí)候還沒arm64)的實(shí)現(xiàn),這個(gè)時(shí)期的內(nèi)核大體對(duì)應(yīng)ARMV6
          static inline void __raw_spin_lock(raw_spinlock_t *lock)
          {
          unsigned long tmp;

          __asm__ __volatile__(
          "1: ldrex %0, [%1]\n" //1.
          " teq %0, #0\n" //2.
          " strexeq %0, %2, [%1]\n" //3.
          " teqeq %0, #0\n" //4.
          #ifdef CONFIG_CPU_32v6K
          " wfene\n" //5.
          #endif
          " bne 1b" //6.
          : "=&r" (tmp)
          : "r" (&lock->lock), "r" (1)
          : "cc");

          smp_mb(); //7.
          }

          通過數(shù)據(jù)結(jié)構(gòu),可以看出,此時(shí)的lock還是一個(gè)unsigned int類型的數(shù)據(jù),加鎖的時(shí)候,首先會(huì)關(guān)閉搶占,然后會(huì)轉(zhuǎn)到各個(gè)體系結(jié)構(gòu)的實(shí)現(xiàn),我們關(guān)注ARM的實(shí)現(xiàn),__raw_spin_lock的分析如下:

          1. 讀取lock的狀態(tài)值給tmp,并將&lock->lock標(biāo)記為獨(dú)占。

          2. 判斷l(xiāng)ock的狀態(tài)是否為0。如果是0說明可以繼續(xù)往下走(跳到第3步);如果不為0,說明自旋鎖處于上鎖狀態(tài),不能訪問,跳到第5步(如果不支持WFE則直接跳到第6步)自旋,最后回到第1步自旋。teq執(zhí)行會(huì)影響標(biāo)志寄存器中Z標(biāo)志位,后面帶eq或者ne后綴的執(zhí)行都受該標(biāo)志位影響。

          3. 執(zhí)行strex執(zhí)行,只有從上一次ldrex執(zhí)行到本次strex這個(gè)被標(biāo)記為獨(dú)占的地址(&lock->lock)沒有改變,才會(huì)執(zhí)行成功(lock的狀態(tài)改寫為1)。通過strex執(zhí)和ldrex實(shí)現(xiàn)原子性操作。

          4. 繼續(xù)判斷l(xiāng)ock的狀態(tài)是否為0,為0說明獲得鎖;不為0說明沒有獲得鎖,跳到第5步(如果支持WFE的話)。

          5. 執(zhí)行WFE指令(如果支持的話),CPU進(jìn)低功耗狀態(tài),省點(diǎn)功耗。

          6. 如果收到SEV指令(如果有第五步的話),繼續(xù)判斷l(xiāng)ock的狀態(tài)是否為0,不為0跳到第1步,繼續(xù)循環(huán);如果lock為0,繼續(xù)跳到第7步

          7. 執(zhí)行barrier(多核情況下為)DMB指令,保證訪存順序按我們的編程順序執(zhí)行(即后面的load/store絕不允許越過smp_mb()屏障亂序到前面執(zhí)行)。

          3.3 ARM體系結(jié)構(gòu)的解鎖實(shí)現(xiàn)

          static inline void __raw_spin_unlock(raw_spinlock_t *lock)
          {
          smp_mb();

          __asm__ __volatile__(
          " str %1, [%0]\n"
          #ifdef CONFIG_CPU_32v6K
          " mcr p15, 0, %1, c7, c10, 4\n" /* DSB */
          " sev"
          #endif
          :
          : "r" (&lock->lock), "r" (0)
          : "cc");
          }

          解鎖的操作相對(duì)簡單,str將lock->lock賦值,然后使用DSB保序,使用sev通知持鎖cpu得到鎖。

          4. ticket spinlock

          4.1 關(guān)鍵數(shù)據(jù)結(jié)構(gòu)和公共代碼

          typedef struct {
          union {
          u32 slock;
          struct __raw_tickets { //只考慮小端
          u16 owner;
          u16 next;
          } tickets;
          };
          } arch_spinlock_t;

          typedef struct raw_spinlock {
          arch_spinlock_t raw_lock;
          } raw_spinlock_t;

          typedef struct spinlock {
          union {
          struct raw_spinlock rlock;
          };
          } spinlock_t;

          static inline void spin_lock(spinlock_t *lock)
          {
          raw_spin_lock(&lock->rlock);
          }
          #define raw_spin_lock(lock) _raw_spin_lock(lock)
          void __lockfunc _raw_spin_lock(raw_spinlock_t *lock)
          {
          __raw_spin_lock(lock);
          }
          static inline void __raw_spin_lock(raw_spinlock_t *lock)
          {
          preempt_disable();
          do_raw_spin_lock();
          }
          static inline void do_raw_spin_lock(raw_spinlock_t *lock) __acquires(lock)
          {
          arch_spin_lock(&lock->raw_lock);
          }

          4.2 AArch32的加鎖實(shí)現(xiàn)

          static inline void arch_spin_lock(arch_spinlock_t *lock)
          {
          unsigned long tmp;
          u32 newval;
          arch_spinlock_t lockval;

          __asm__ __volatile__(
          "1: ldrex %0, [%3]\n" //1.
          " add %1, %0, %4\n" //2.
          " strex %2, %1, [%3]\n" //3.
          " teq %2, #0\n" //4.
          " bne 1b" //5.
          : "=&r" (lockval), "=&r" (newval), "=&r" (tmp)
          : "r" (&lock->slock), "I" (1 << TICKET_SHIFT)
          : "cc");

          while (lockval.tickets.next != lockval.tickets.owner) { //6.
          wfe(); //7.
          lockval.tickets.owner = ACCESS_ONCE(lock->tickets.owner);//8.
          }

          smp_mb(); //9.
          }
          1. 把lock->slock值保存到lock_val

          2. newval = lockval + (1 << TICKET_SHIFT)= lockval + (1 << 16),等價(jià)于newval =lockval.tickets.next++,相當(dāng)于從銀行取號(hào)機(jī)取號(hào)。

          3. strex tmp, newval, [&lock->slock],將新的值newval 存在lock中,strex將是否成功結(jié)果存入在tmp中

          4. 檢查是否寫入成功lockval.tickets.next

          5. 成功則跳到第6步,否則返回第1步重試,同上文類似也是實(shí)現(xiàn)原子操作

          6. lockval.tickets.next和owner成員是否相等,相等跳到第9步,成功獲得鎖;沒有則跳到第7步。成功的話,相當(dāng)于銀行柜臺(tái)已經(jīng)叫我的號(hào)了,我就可以去辦理業(yè)務(wù)了,沒有的話,我還要繼續(xù)等。

          7. 執(zhí)行WFE指令,CPU進(jìn)低功耗狀態(tài),省點(diǎn)功耗。

          8. 如果收到SEV指令,從低功耗狀態(tài)恢復(fù),重新獲得新的owner值,因?yàn)橐话闶莿e人釋放了鎖才會(huì)發(fā)送SEV指令,這時(shí)owner的值已經(jīng)發(fā)生了變化,需要重新從內(nèi)存中獲取(ACCESS_ONCE本身的實(shí)現(xiàn)就是增加了volatile這個(gè)關(guān)鍵字,它確保編譯器每次訪問的變量都是從內(nèi)存中獲取,防止編譯器優(yōu)化)。

          9. 執(zhí)行barrier,同上文描述,不再贅述。

          4.3 AArch32的解鎖實(shí)現(xiàn)

          static inline void arch_spin_unlock(arch_spinlock_t *lock)
          {
          unsigned long tmp;
          u32 slock;

          smp_mb();

          __asm__ __volatile__(
          " mov %1, #1\n" //1.
          "1: ldrex %0, [%2]\n" //2.
          " uadd16 %0, %0, %1\n" //3.
          " strex %1, %0, [%2]\n" //4.
          " teq %1, #0\n" //5.
          " bne 1b" //6.
          : "=&r" (slock), "=&r" (tmp)
          : "r" (&lock->slock)
          : "cc");

          dsb_sev(); //7.
          }
          1. 將tmp賦值為1

          2. 將lock->slock的值賦值給slock。

          3. 將slock的低16bit,也就是owner成員的值加1。

          4. 將新的值新的ower,使用strex寫入中lock->slock,將是否成功結(jié)果存入在tmp中

          5. 檢查是否寫入成功,成功,跳到第7步,實(shí)現(xiàn)了原子操作;不成功跳到第6步;

          6. tmp不等于0(不成功),繼續(xù)返回label 1重試,即跳回第2步

          4.4 AArch64的加鎖實(shí)現(xiàn)

          static inline void arch_spin_lock(arch_spinlock_t *lock)
          {
          unsigned int tmp;
          arch_spinlock_t lockval, newval;

          asm volatile(
          " prfm pstl1strm, %3\n" //1.
          "1: ldaxr %w0, %3\n" //2.
          " add %w1, %w0, %w5\n" //3.
          " stxr %w2, %w1, %3\n" //4.
          " cbnz %w2, 1b\n" //5.
          /* Did we get the lock? */
          " eor %w1, %w0, %w0, ror #16\n" //6.
          " cbz %w1, 3f\n" //7.
          " sevl\n" //8.
          "2: wfe\n" //9.
          " ldaxrh %w2, %4\n" //10.
          " eor %w1, %w2, %w0, lsr #16\n" //11.
          " cbnz %w1, 2b\n" //12.
          "3:" //13.
          : "=&r" (lockval), "=&r" (newval), "=&r" (tmp), "+Q" (*lock)
          : "Q" (lock->owner), "I" (1 << TICKET_SHIFT)
          : "memory")
          ;
          }

          核心邏輯與AArch32類似,匯編實(shí)現(xiàn)會(huì)有不一樣,這里不再展開。

          1. 從lock(memory)預(yù)取數(shù)據(jù)到L1cache中,加速執(zhí)行。

          2. 使用ldaxr指令(Load-acquire exclusive register,帶Exclusive和Acquire-Release 兩種語義),將lock的值賦值給lockval。

          3. newval = lockval + (1 << TICKET_SHIFT)= lockval + (1 << 16),等價(jià)于newval =lockval.tickets.next++,相當(dāng)于從銀行取號(hào)機(jī)取號(hào)。

          4. 使用stxr指令,將newval賦值給lock,并將exclusive是否設(shè)置成功結(jié)果存放到tmp中。

          5. 如果tmp != 0,說明exclusive失敗,需要重新跳到開始處(第2步)重試,因?yàn)檫@時(shí)候其他CPU核心可能有是執(zhí)行流插入,搶在我們前面執(zhí)行。否則繼續(xù)。

          6. eor 異或運(yùn)算實(shí)現(xiàn)lockval.tickets.next和owner成員是否相等的判斷

          7. 如果相等,跳到label3(對(duì)應(yīng)13步),獲得鎖進(jìn)入臨界區(qū)。否則往下,執(zhí)行自旋。

          8. 使用SEVL指令(發(fā)送本地事件,Send Event Locally),喚醒本CPU核心,防止有丟失unlock的情況出現(xiàn)。

          9. 執(zhí)行WFE指令,CPU進(jìn)低功耗狀態(tài),省點(diǎn)功耗。

          10. 獲取當(dāng)前的Owner值存放在tmp中

          11. 判斷l(xiāng)ockval.tickets.next和owner成員的值是否相等

          12. 如果不相等就回跳到label2(對(duì)應(yīng)第9步)。相等繼續(xù)往下。

          13. 結(jié)束退出。

          4.5 AArch64的解鎖實(shí)現(xiàn)

          static inline void arch_spin_unlock(arch_spinlock_t *lock)
          {
          asm volatile(
          " stlrh %w1, %0\n"
          : "=Q" (lock->owner)
          : "r" (lock->owner + 1)
          : "memory")
          ;
          }

          解鎖的操作相對(duì)簡單,stlrh除了將lock->owner++(相當(dāng)于銀行柜臺(tái)叫下一個(gè)排隊(duì)者的號(hào)碼),將會(huì)兼有SEVDSB的功能。

          5. mcs spinlock

          5.1 關(guān)鍵數(shù)據(jù)結(jié)構(gòu)和變量

          struct mcs_spinlock {
          ????struct mcs_spinlock *next;
          //1.
          ????in
          t nt locked; //2.
          };

          1. 當(dāng)一個(gè)CPU試圖獲取一個(gè)spinlock時(shí),它就會(huì)將自己的MCS lock加到這個(gè)spinlock的等待隊(duì)列的隊(duì)尾,然后next指向這個(gè)新的MCS lock。

          2. locked的值為1表示已經(jīng)獲得spinlock,為0則表示還沒有持有該鎖。

          5.2 加鎖的實(shí)現(xiàn)

          static inline void mcs_spin_lock(struct mcs_spinlock **lock, struct mcs_spinlock *node)
          {
          struct mcs_spinlock *prev;

          node->locked = 0; //1.
          node->next = NULL;

          prev = xchg(lock, node); //2
          if (likely(prev == NULL)) { //3
          return;
          }
          ACCESS_ONCE(prev->next) = node; //4

          arch_mcs_spin_lock_contended(&node->locked); //5.
          }

          先看下兩個(gè)參數(shù):

          • 第1個(gè)參數(shù)lock:是指向指針的指針(二級(jí)指針),是因?yàn)樗赶虻氖悄┪补?jié)點(diǎn)里的next域,而next本身是一個(gè)指向struct mcs_spinlock的指針。

          • 第2個(gè)參數(shù)node:試圖加鎖的CPU對(duì)應(yīng)的MCS lock節(jié)點(diǎn)。

          接下來看代碼邏輯

          1. 初始化node節(jié)點(diǎn)

          2. 找隊(duì)列末尾的那個(gè)mcs lock。xchg完成兩件事,一是給一個(gè)指針賦值,二是獲取了這個(gè)指針在賦值前的值,相當(dāng)于下面兩句:

            prev = *lock; //隊(duì)尾的lock
            *lock = node; //將lock指向新的node
          3. 如果隊(duì)列為空,CPU可以立即獲得鎖,直接返回;否則繼續(xù)往下。不需要基于"locked"的值進(jìn)行spin,所以此時(shí)locked的值不需要關(guān)心。

          4. 等價(jià)于prev->next = node,把自己這個(gè)node加入等待隊(duì)列的末尾。

          5. 調(diào)用arch_mcs_spin_lock_contended,等待當(dāng)前鎖的持有者將鎖釋放給我。

          #define arch_mcs_spin_lock_contended(l)	 \
          do { \
          while (!(smp_load_acquire(l))) \ //1.

          arch_mutex_cpu_relax(); \ //2.
          } while (0)
          1. 上文中的node->locked==0,說明沒有獲得鎖,需要繼續(xù)往下執(zhí)行;說明已經(jīng)獲得鎖,直接退出

          2. ARM64中arch_mutex_cpu_relax調(diào)用cpu_relax函數(shù)的,有一個(gè)內(nèi)存屏障指令防止編譯器優(yōu)化。從4.1開始還存一個(gè)yield指令。該指令,為了提高性能,占用cpu的線程可以使用該給其他線程。

          5.3 解鎖的實(shí)現(xiàn)

          static inline void mcs_spin_unlock(struct mcs_spinlock **lock, struct mcs_spinlock *node)
          {
          struct mcs_spinlock *next = ACCESS_ONCE(node->next); //1.

          if (likely(!next)) { //2.
          if (likely(cmpxchg(lock, node, NULL) == node)) //3
          return;
          while (!(next = ACCESS_ONCE(node->next))) //4.
          arch_mutex_cpu_relax(); //5.
          }

          arch_mcs_spin_unlock_contended(&next->locked); //6.
          }
          1. 找到等待隊(duì)列中的下一個(gè)節(jié)點(diǎn)

          2. 當(dāng)前沒有其他CPU試圖獲得鎖

          3. 直接釋放鎖。如果*lock == node,將*lock = NULL,然后直接返回;反之,說明當(dāng)前隊(duì)列中有等待獲取鎖的CPU。繼續(xù)往下。cmpxchg作用翻譯大致如下所示:

            cmpxchg(lock, node, NULL)
            {
            ret = *lock;
            if (*lock == node)
            *lock = NULL;
            return ret;
            }
          4. 距離函數(shù)開頭獲得"next"指針的值已經(jīng)過去一段時(shí)間了,在這個(gè)時(shí)間間隔里,可能又有CPU把自己添加到隊(duì)列里來了。需要重新獲得next指針的值。于是,待新的node添加成功后,才可以通過arch_mcs_spin_unlock_contended()將spinlock傳給下一個(gè)CPU

          5. 如果next為空,調(diào)用arch_mutex_cpu_relax,作用同上文。

          6. arch_mcs_spin_unlock_contended(l),實(shí)際上是調(diào)用smp_store_release((l), 1),將next->locked設(shè)置為1。將spinlock傳給下一個(gè)CPU

          6. queued spinlock

          6.1 概述

          它首先肯定要解決mcs_spinlock的占用空間問題,否則設(shè)計(jì)再好,也無法合入主線。它是這樣的:大部分情況用的鎖大小控制在跟以前ticket spinlock一樣的水平,設(shè)計(jì)了兩個(gè)域:分別是lockedpending,分別表示鎖當(dāng)前是否被持有,已經(jīng)在持有時(shí),是否又來了一個(gè)申請(qǐng)者競爭。爭鎖好比搶皇位,皇位永遠(yuǎn)只有1個(gè)(對(duì)應(yīng)locked域),除此之外還有1個(gè)太子位(對(duì)應(yīng)pending域),防止皇帝出現(xiàn)意外能隨時(shí)候補(bǔ)上,不至于出現(xiàn)群龍無首的狀態(tài),他們可以住在紫禁城內(nèi)(使用qspinlock)。這兩個(gè)位置被占后,其他人還想來競爭皇位,只有等皇帝和太子都移交各自的位子以后才可以,在等待的時(shí)候你需要在紫禁城外待在自己的府邸里(使用mcs_spinlock),減小紫禁城的擁擠(減少系統(tǒng)損耗)。

          即當(dāng)鎖的申請(qǐng)者小于或等于2時(shí),只需要1個(gè)qspinlock就可以了,其所占內(nèi)存的大小和ticket spinlock一樣。當(dāng)鎖的競爭者大于2個(gè)時(shí)候,才需要(N-2)個(gè)mcs_spinlock。qspinlock還是全局的,為降低鎖的競爭,使用退化到per-cpu的mcs_spinlock鎖,所有的mcs_spinlock鎖串行構(gòu)成一個(gè)等待隊(duì)列,這樣cacheline invalide帶來的損耗減輕了很多。這是它的基本設(shè)計(jì)思想。

          在大多數(shù)情況下,per-cpu的mcs lock只有1個(gè),除非發(fā)生了嵌套task上下文被中斷搶占,因?yàn)橹袛嗌舷挛闹挥?種類(softirq、hardirq和nmi),所有每個(gè)CPU核心至多有4個(gè)mcs_spinlock鎖競爭。而且,所有mcs_spinlock會(huì)串聯(lián)到一個(gè)等待隊(duì)列里的。

          上圖展示的是:qspinlocklockedpending位都被占,需要進(jìn)入mcs_spinlock等待隊(duì)列,而CPU(2)是第1個(gè)進(jìn)入等待隊(duì)列的,qspinlocktail.cpu則被賦值成2,CPU(2)的mcs_spinlock數(shù)組的第3個(gè)成員空閑,則qspinlocktail.index被賦值成3。入隊(duì)的是CPU(3)、CPU(1)和CPU(0),通過mcs_spinlocknext域?qū)⒋蠹疫B成隊(duì)列。

          假如等到競爭qspinlock的2個(gè)鎖的持有者,都釋放了,則CPU(2)的第3個(gè)空閑成員則獲得鎖,完成它的臨界區(qū)訪問后,通過qspinlocktail.cpu(類似于頁表基址)和tail.index(類似于頁表內(nèi)的偏移),快速找到下個(gè)mcs_spinlock的node節(jié)點(diǎn)。

          下面要進(jìn)入細(xì)節(jié)分析了,本文只考慮小端模式、NR_CPUS < 16K的情況,不考慮虛擬化這塊,去掉qstats統(tǒng)計(jì),力圖聚焦在該鎖實(shí)現(xiàn)的核心邏輯上。

          6.2 關(guān)鍵數(shù)據(jù)結(jié)構(gòu)和變量

          struct qspinlock


          數(shù)據(jù)簡化如下:

          typedef struct qspinlock {
          union {
          atomic_t val;
          struct {
          u8 locked;
          u8 pending;
          };
          struct {
          u16 locked_pending;
          u16 tail;
          };
          };
          } arch_spinlock_t;

          struct qspinlock包含了三個(gè)主要部分:

          1. locked(0- 7bit):表示是否持有鎖,只有1和0兩個(gè)值,1表示某個(gè)CPU已經(jīng)持有鎖,0則表示沒有持有鎖。

          2. pending(8bit):作為競爭鎖的第一候補(bǔ),第1個(gè)等待自旋鎖的CPU只需要簡單地設(shè)置它為1,則表示它成為第一候補(bǔ),后面再有CPU來競爭鎖,則需要?jiǎng)?chuàng)建mcs lock節(jié)點(diǎn)了;為0則表示該候補(bǔ)位置是空閑的。

          3. tail(16-31bit): 通過這個(gè)域可以找到自旋鎖隊(duì)列的最后一個(gè)節(jié)點(diǎn)。又細(xì)分為:

            • tail cpu(18-31bit):來記錄和快速索引需要訪問的mcs_spinlock位于哪個(gè)CPU上,作用類似于頁表基址。

            • tail index(16-17bit):用來標(biāo)識(shí)當(dāng)前處在哪種context中。Linux中一共有4種context,分別是task, softirq, hardirq和nmi,1個(gè)CPU在1種context下,至多試圖獲取一個(gè)spinlock,1個(gè)CPU至多同時(shí)試圖獲取4個(gè)spinlock。當(dāng)然也表示嵌套的層數(shù)。對(duì)應(yīng)per-cpu的mcs_spinlock的數(shù)組(對(duì)應(yīng)下文的struct mcs_spinlock mcs_nodes[4])的下標(biāo),作用類似于類似于頁表內(nèi)的偏移。

          struct mcs_spinlock

          mcs_spinlock的數(shù)據(jù)結(jié)構(gòu)如下:

          struct mcs_spinlock {
          struct mcs_spinlock *next;
          int locked; /* 1 if lock acquired */
          int count; /* nesting count, see qspinlock.c */
          };

          struct qnode {
          struct mcs_spinlock mcs;
          };

          static DEFINE_PER_CPU_ALIGNED(struct mcs_spinlock, mcs_nodes[4]);

          使用per-cpu的struct mcs_spinlock mcs_nodes[4],可以用來減少對(duì)cacheline的競爭。數(shù)組數(shù)量為4前文已經(jīng)解釋過了。struct mcs_spinlock具體含義如下:

          1. locked:用來通知這個(gè)CPU你可以持鎖了,通過該域完成擊鼓傳花,當(dāng)然這個(gè)動(dòng)作是上一個(gè)申請(qǐng)者釋放的時(shí)候通知的。

          2. count:嵌套的計(jì)數(shù)。只有第0個(gè)節(jié)點(diǎn)這個(gè)域才有用,用來索引空閑節(jié)點(diǎn)的。

          3. next:指向下一個(gè)鎖的申請(qǐng)者,構(gòu)成串行的等待隊(duì)列的鏈表。

          6.3 加鎖實(shí)現(xiàn)

          核心邏輯概述

          我們把一個(gè)qspinlock對(duì)應(yīng)的**( tail, pending, locked)稱為一個(gè)三元組(x,y,z)**,以此描述鎖狀態(tài)及其遷移,有2中特殊狀態(tài):

          1. 初始狀態(tài)(無申請(qǐng)者競爭):(0, 0, 0)

          2. 過渡狀態(tài)(類似于皇帝正在傳位給太子,處于交接期):(0, 1, 0)

          按加鎖原有代碼只有慢速路徑和非慢速路徑,我們?yōu)榱诵形姆奖?,將源代碼的慢速路徑又分為中速路徑和慢速路徑,這樣就有以下三組狀態(tài):

          不同路徑出現(xiàn)情況核心功能所在的函數(shù)和使用鎖的種類
          快速路徑鎖沒有持有者,locked位(皇位)空缺queued_spin_lock,使用qspinlock
          中速路徑鎖已經(jīng)被持有,但pending位(太子位)空缺queued_spin_lock_slowpath,使用qspinlock
          慢速路徑鎖已經(jīng)被持有,locked位和pending位都沒空缺queued_spin_lock_slowpath,使用mcs_spinlock

          核心邏輯就變成

          1. 如果沒有人持有鎖,那我們進(jìn)入快速路徑,直接拿到鎖走人,走之前把locked位(皇位)標(biāo)記成已搶占狀態(tài)。期間,只需要使用qspinlock。

          2. 如果有人持有鎖(搶到了皇位成為皇帝),但pending位(太子位)空缺,那我們先搶這個(gè)位置,進(jìn)入的是中速路徑,等這個(gè)人(皇帝)釋放鎖(傳位)了,我們就可以拿到鎖了。期間,只需要用到qspinlock。

          3. 如果這兩個(gè)位置我們都搶不到,則進(jìn)入慢速路徑,需要使用per-cpu的mcs_spinlock。

          總體狀態(tài)流程圖如下:

          梳理一下對(duì)應(yīng)偽碼描述:

          static __always_inline void queued_spin_lock(struct qspinlock *lock)
          {
          if (初始狀態(tài)){ //沒人持鎖
          //進(jìn)入快速路徑
          return;}
          queued_spin_lock_slowpath(lock, val); //進(jìn)入中速或者慢速路徑
          }

          void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val) //lock是原始鎖,val是之前拿到的lock->val的值
          {
          //代碼片段1:過渡狀態(tài)判斷:看是否處于過渡狀態(tài),嘗試等這個(gè)狀態(tài)完成
          //代碼片段2:慢速路徑判斷:看是否有其他申請(qǐng)者競爭,則直接進(jìn)入慢速路徑的核心片段
          //代碼片段3:中速路徑

          queue:
          //代碼判斷4:慢速路徑
          //1. 調(diào)用__this_cpu_inc(mcs_nodes[0].count)將當(dāng)前cpu競爭鎖的數(shù)量+1
          //2. mcs node的尋址和初始化
          //3. 調(diào)用queued_spin_trylock(),看看準(zhǔn)備期間spinlock是不是已經(jīng)被釋放了
          //4. 處理等待隊(duì)列已經(jīng)有人的情況,重點(diǎn)是arch_mcs_spin_lock_contended();
          //5. 處理我們加入隊(duì)列就是隊(duì)首的情況
          locked:// 已經(jīng)等到鎖了
          //6.處理我們是隊(duì)列最后一個(gè)節(jié)點(diǎn)的情況
          //7.處理我們前面還有節(jié)點(diǎn)的情況
          //8.調(diào)用arch_mcs_spin_unlock_contended()通知下一個(gè)節(jié)點(diǎn)
          release:
          //9.__this_cpu_dec(mcs_nodes[0].count);
          }

          下面我們開始分析

          queued_spin_lockqueued_spin_lock_slowpath函數(shù)的實(shí)現(xiàn)細(xì)節(jié)。

          實(shí)現(xiàn)細(xì)節(jié)分析

          static __always_inline void queued_spin_lock(struct qspinlock *lock)
          {
          u32 val;

          val = atomic_cmpxchg_acquire(&lock->val, 0, _Q_LOCKED_VAL); //1
          if (likely(val == 0)) //2
          return;
          queued_spin_lock_slowpath(lock, val); //3
          }
          1. 通過atomic_cmpxchg_acquire,與之前的cmpxchg類似,用原子操作實(shí)現(xiàn):

            • 將val賦值為lock->val。

            • lock->val值如果為0(沒人拿到鎖),將lock->val的值設(shè)為1(即*_Q_LOCKED_VAL*),三元組狀態(tài)由**( tail, pending, locked)**= (0, 0, 0)遷移為(0, 0, 1)。

            • lock->val值如果不為0,保持lock->val的值不變。

          2. 如果當(dāng)前沒有人獲得鎖,直接拿到鎖返回。三元組的狀態(tài)遷移已經(jīng)在上一步完成了。否則需要繼續(xù)往下,走中速/慢速路徑。

          3. 進(jìn)入中速/慢速路徑,調(diào)用queued_spin_lock_slowpath函數(shù)

          快速路徑

          lock->val值==0(沒人拿到鎖),三元組狀態(tài)由**( tail, pending, locked)**= (0, 0, 0)遷移為(0, 0, 1)??焖俜祷兀挥玫却?。

          下面進(jìn)入queued_spin_lock_slowpath函數(shù)的分析,我們先分析過渡狀態(tài)判斷中速路徑兩個(gè)代碼片段:

          void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
          {

          /* 過渡狀態(tài)判斷 */
          if (val == _Q_PENDING_VAL) { //0.
          int cnt = _Q_PENDING_LOOPS;
          val = atomic_cond_read_relaxed(&lock->val,(VAL != _Q_PENDING_VAL) || !cnt--);
          }
          /*其他部分代碼*/

          }

          過渡狀態(tài)判斷

          1. 如果三元組( tail,pending,locked)狀態(tài)如果是(0, 1, 0),則嘗試等待狀態(tài)變?yōu)?(0, 0, 1),但是只會(huì)循環(huán)等待1次。

            簡單翻譯一下atomic_cond_read_relaxed語句的意思為:如果(lock->val != _Q_PENDING_VAL) || !cnt--)則跳出循環(huán),繼續(xù)往下。

          中速路徑

          進(jìn)入中速路徑的前提是當(dāng)前沒有其他競爭者在等待隊(duì)列中排隊(duì)以及pending位空缺,之后我們先將pending位占住。如果已經(jīng)這段期間已經(jīng)有人釋放了,那直接獲取鎖并將pending位重置為空閑,反正則要自旋等持鎖者釋放鎖再做其他動(dòng)作。

          void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
          {

          /* ... 過渡狀態(tài)判斷及處理,代碼省略 */

          /* ... 判斷是否有pending和tail是否有競爭者,有競爭者直接進(jìn)入慢速路徑排隊(duì),代碼省略 */

          /* 中速路徑開始 */
          val = atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val); //1.
          if (!(val & ~_Q_LOCKED_MASK)) { //2.
          if (val & _Q_LOCKED_MASK) { //3.
          atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_MASK)); //4.
          }

          clear_pending_set_locked(lock); //5.
          return; //6.
          }

          if (!(val & _Q_PENDING_MASK)) //7.
          clear_pending(lock);
          /* 中速路徑結(jié)束 */

          /*其他部分代碼*/

          }

          這部分代碼的核心邏輯是在競爭qspinlocpending,即競爭太子位。進(jìn)入中速路徑的前提是三元組( tail,pending,locked)=(0, 0, *)

          1. 離“是否是過渡狀態(tài)”和“除鎖的持有者者之外是否競爭者”的判斷已經(jīng)過了一段時(shí)間了。需要用atomic_fetch_or_acquire函數(shù),通過原子操作,重新看一下鎖的狀態(tài),執(zhí)行了兩個(gè)動(dòng)作:

            執(zhí)行完,三元組狀態(tài)會(huì)從(0, 0, *)改為(0, 1, *)。

            • val = lock->val(成為“原始的lock->val”)

            • lock->val的peding域被置位。

          2. 如果原始的lock->valpendingtail域都為0,則表明沒有pending位沒有其他競爭者,即太子位空閑可以去競爭;否則,則表明有其他競爭者,不滿足中速路徑的條件,需要進(jìn)入慢速路徑。lock->val的和tail域就不用關(guān)心了。所以此時(shí)的三元狀態(tài)可以使(*, 1, *)

          3. 如果locked域?yàn)?,表明位置被占著,已經(jīng)有人在持鎖了,我們需要跳到第4步,等鎖被釋放;否則,沒有人持鎖,皇位是空著的,我們跳到第5步,直接去上位就好了。

          4. spin等待,直到lock->vallocked域變回0,也就是等皇位空出來。三元組狀態(tài)會(huì)從(*, 1, 1) 轉(zhuǎn)變?yōu)? *, 1, 0)

          5. 拿到鎖了,可以登基上位了,但還要做些清理工作,調(diào)用clear_pending_set_locked,將lock->valpending域清零以及將locked置1。即三元組狀態(tài)由(*, 1, 0) 轉(zhuǎn)變?yōu)? *, 0, 1)。

          6. 結(jié)束,提前返回。

          7. 如果沒有進(jìn)入中速路徑,第1步開始時(shí)獲得的鎖的狀態(tài)是(n, 0, *) ,需要把在第一步設(shè)置的現(xiàn)在鎖的pending域恢復(fù)成0。

          慢速路徑

          上面流程圖有些地方不太準(zhǔn)確,但沒有關(guān)系,可以先幫我們建立起總體流程印象,細(xì)節(jié)后續(xù)我們會(huì)展開。

          在拿到mcs lock的空閑節(jié)點(diǎn)之后,我們先用queued_spin_trylock函數(shù),檢查一下在我們準(zhǔn)備的這段時(shí)間里,是不是已經(jīng)有人釋放了該鎖了,如果有人釋放了,就提前返回了。

          獲得鎖之前,要分兩種情況分別處理:

          • 如果隊(duì)列中只有我們,那只用自旋等lock的pending位空出來即可。

          • 如果隊(duì)列中還有其他競爭者在排在我們前面,則需要自旋等前序節(jié)點(diǎn)釋放該鎖。

          獲得鎖之后,也要分兩種情況處理:

          • 如果隊(duì)列中只有我們,將鎖的locked位置位,表明鎖已經(jīng)被我們持有了,不需要再做其他處理。

          • 如果隊(duì)列中還有其他競爭者在排在我們后面,則需要將鎖傳遞這位競爭者,當(dāng)然需要要等它正式加入。

          慢速路徑的簡化代碼如下所示:

          void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
          {
          struct mcs_spinlock *prev, *next, *node;
          u32 old, tail;
          int idx;

          /* ... 過渡狀態(tài)判斷及處理,省略 */

          /* 先判斷是否有pending和tail是否有競爭者,有競爭者直接進(jìn)入慢速路徑排隊(duì) */
          if (val & ~_Q_LOCKED_MASK)
          goto queue;

          /* 中速路徑代碼開始 */
          val = atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val);
          if (!(val & ~_Q_LOCKED_MASK)) {
          // ...中速路徑處理,省略
          return;
          }
          /* 中速路徑代碼結(jié)束 */

          /* 之前代碼的收尾清理工作 */
          if (!(val & _Q_PENDING_MASK))
          clear_pending(lock); //將pending狀態(tài)恢復(fù)。


          //慢速路徑代碼開始位置
          queue://進(jìn)入mcs lock隊(duì)列
          node = this_cpu_ptr(&mcs_nodes[0]); //1.
          idx = node->count++;
          tail = encode_tail(smp_processor_id(), idx);

          node += idx; //2.
          barrier();
          node->locked = 0;
          node->next = NULL;

          if (queued_spin_trylock(lock)) //3.
          goto release;
          smp_wmb();

          old = xchg_tail(lock, tail); //4.
          next = NULL;

          if (old & _Q_TAIL_MASK) { //5.
          prev = decode_tail(old); //6.
          WRITE_ONCE(prev->next, node); //7.
          arch_mcs_spin_lock_contended(&node->locked); //8.
          next = READ_ONCE(node->next); //9.
          if (next)
          prefetchw(next);
          }

          val = atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_PENDING_MASK)); //10.

          locked:
          if (((val & _Q_TAIL_MASK) == tail) && //11.
          atomic_try_cmpxchg_relaxed(&lock->val, &val, _Q_LOCKED_VAL))
          goto release; /* No contention */

          set_locked(lock); //12.

          if (!next) //13.
          next = smp_cond_load_relaxed(&node->next, (VAL));

          arch_mcs_spin_unlock_contended(&next->locked); //14.

          release:
          __this_cpu_dec(mcs_nodes[0].count); //15.
          }

          之前是進(jìn)入queued_spin_lock_slowpath函數(shù),有“是否過渡狀態(tài)的判斷”及處理,“是否要進(jìn)入慢速路徑的判斷”及處理,如果進(jìn)了中速路徑的處理。下面正式進(jìn)入慢速路徑的處理。因?yàn)橄到y(tǒng)可能有多個(gè)cpu并發(fā),即使在同一個(gè)cpu上也可能切換了4種context上下文的中一種,說不定大家競爭的鎖已經(jīng)都被釋放了也說不準(zhǔn),所以此時(shí)三元組的狀態(tài)是未知的。

          1. 獲得local CPU上的第0個(gè)節(jié)點(diǎn)mcs_spinlock類型的node節(jié)點(diǎn),并將節(jié)點(diǎn)的count值+1,count值記錄了local CPU上空閑節(jié)點(diǎn)的起始下標(biāo),該值也只在第0個(gè)有意義。encode_tail函數(shù),節(jié)點(diǎn)編號(hào)編碼成tail(2bit的tail index和14bit的tail cpu),同時(shí)還完成:

            • 區(qū)分到底是tail域沒有指向任何節(jié)點(diǎn),還是指向了第0個(gè)CPU的第0個(gè)節(jié)點(diǎn)。

            • 檢查當(dāng)前CPU上自旋鎖嵌套的層數(shù)是否超過4層。

          2. 根據(jù)idx取到取到空閑節(jié)點(diǎn),相當(dāng)于node = mcs_nodes[idx]。這里有barrier保序,防止編譯器優(yōu)化。然后對(duì)空閑節(jié)點(diǎn)進(jìn)行初始化。

          3. 調(diào)用queued_spin_trylock函數(shù),檢查一下在我們準(zhǔn)備的這段時(shí)間里,是不是已經(jīng)有人釋放了該鎖了。如果成功獲得鎖,則直接跳到lable release處;否則,繼續(xù)往下。queued_spin_trylock函數(shù)的作用具體來講是這樣:

            連續(xù)兩次使用原子操作檢查鎖的狀態(tài):如果val為0(對(duì)應(yīng)三元組(0, 0, 0))并且再次讀取val的locked域也為0,則表示可以獲得鎖。上文說過,此時(shí)鎖的狀態(tài)是未知的。

          4. 在調(diào)用xchg_tail函數(shù)之前,有smp_wmb內(nèi)存屏障,保證tail值獲得的正確性。該函數(shù)將lock->tail設(shè)置為新生成的tail值,并將舊的值存在old中。

          5. 如果舊的tail為0,說明隊(duì)列里只有我們這個(gè)新生成的節(jié)點(diǎn),直接跳到第10步;否則繼續(xù)往下執(zhí)行

          6. 通過舊的tail拿到等待隊(duì)列的尾結(jié)點(diǎn)prev。

          7. 將當(dāng)前節(jié)點(diǎn)插入等待隊(duì)列,作為新的尾節(jié)點(diǎn)。

          8. 自旋等待本節(jié)點(diǎn)的locked域變?yōu)?。在local CPU上自旋等待自己的locked成員被置位。arch_mcs_spin_lock_contended在arm64上最終會(huì)調(diào)用__cmpwait_case_##name宏,展開后同arm64的tick spinlock的arch_spin_lock的實(shí)現(xiàn)類似。核心功能都是在自旋時(shí)執(zhí)行WFE節(jié)省部分能耗。

          9. 后面3行代碼,是看下在我們后面是否還有其他人在排隊(duì),如果有的話,使用prfm指令,提前將相關(guān)值預(yù)取到local CPU的cache中,加速后續(xù)的執(zhí)行。

          10. 使用原子操作重新獲取lock->val的值,并且循環(huán)等到直到pendinglocked都為0。三元組( tail, pending, locked)的值為(*, 0, 0),也就是說需要等到皇位和太子位都是空閑的狀態(tài)才是我們真正獲得了鎖的條件。

          11. 如果tail還是我們?cè)O(shè)置的,說明我們同時(shí)是等待隊(duì)列的最后一個(gè)節(jié)點(diǎn)或者說是唯一節(jié)點(diǎn),后面沒人在排隊(duì)了。通過atomic_try_cmpxchg_relaxed原子的將locked域設(shè)置為1,至此才真正完成了鎖的合法擁有,直接跳到最后1步。三元組的狀態(tài)遷移是(n, 0, 0) --> (0, 0, 1)。否則,如果tail發(fā)生了改變,說明后面還有節(jié)點(diǎn)或者pending位被占,則繼續(xù)往下處理。

          12. 先將locked設(shè)置為1。三元組的狀態(tài)遷移是(*, *, 0) --> (*, 0, 1)。

          13. 等待的下一個(gè)節(jié)點(diǎn)還有正式加入進(jìn)來,則需要等next變成非空(非空才真正了完成加入)。

          14. 調(diào)用arch_mcs_spin_unlock_contended,將下一個(gè)節(jié)點(diǎn)的locked值設(shè)置成1 ,完成了鎖的傳遞。也就是完成了擊鼓傳花。

          15. 釋放本節(jié)點(diǎn)。

          6.4 解鎖實(shí)現(xiàn)

          static __always_inline void queued_spin_unlock(struct qspinlock *lock)
          {
          smp_store_release(&lock->locked, 0);
          }

          qspinlocklocked清零即可。


          【參考資料】

          1. 【原創(chuàng)】linux spinlock/rwlock/seqlock原理剖析(基于ARM64)

          2.? 宋寶華:幾個(gè)人一起搶spinlock,到底誰先搶到?

          3.?Linux內(nèi)核同步機(jī)制之(四):spin lock,蝸窩科技

          4.?Linux中的spinlock機(jī)制[一] - CAS和ticket spinlock,蘭新宇,知乎






          推薦閱讀:

          專輯|Linux文章匯總
          專輯|程序人生
          專輯|C語言
          我的知識(shí)小密圈

          關(guān)注公眾號(hào),后臺(tái)回復(fù)「1024」獲取學(xué)習(xí)資料網(wǎng)盤鏈接。

          歡迎點(diǎn)贊,關(guān)注,轉(zhuǎn)發(fā),在看,您的每一次鼓勵(lì),我都將銘記于心~



          瀏覽 310
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  影音先锋日韩精品 | 北条麻妃喷水 | 欧美一区二区三区在线 | 亚洲精品成人无码在线观看 | 婷婷在线伊人 |