那些去請(qǐng)求鎖的線程都怎么樣了?
“?寫給自己看,說(shuō)給別人聽。你好,這是think123的第78篇原創(chuàng)文章”
不知道你有沒有想過(guò),那些去申請(qǐng)鎖的線程都怎樣了?有些可能申請(qǐng)到了鎖,馬上就能執(zhí)行業(yè)務(wù)代碼。但是如果有一個(gè)鎖被很多個(gè)線程需要,那么這些線程是如何被處理的呢?
ps: 如果你不想看分析結(jié)果,可以拉到最后,末尾有一張總結(jié)圖,一圖勝千言
之前文章分析過(guò)synchroinzed中鎖的優(yōu)化,但是如果存在大量競(jìng)爭(zhēng)的情況下,那么最終還是都會(huì)變成重量級(jí)鎖。所以我們這里開始直接分析重量級(jí)鎖的代碼。
申請(qǐng)鎖
在ObjectMonitor::enter函數(shù)中,有很多判斷和優(yōu)化執(zhí)行的邏輯,但是核心還是通過(guò)EnterI函數(shù)實(shí)際進(jìn)入隊(duì)列將將當(dāng)前線程阻塞
void?ObjectMonitor::EnterI(TRAPS)?{
??Thread?*?const?Self?=?THREAD;
??//?CAS嘗試將當(dāng)前線程設(shè)置為持有鎖的線程
??if?(TryLock?(Self)?>?0)?{
????assert(_succ?!=?Self,?"invariant");
????assert(_owner?==?Self,?"invariant");
????assert(_Responsible?!=?Self,?"invariant");
????return;
??}
??//?通過(guò)自旋方式調(diào)用tryLock再次嘗試,操作系統(tǒng)認(rèn)為會(huì)有一些微妙影響
??if?(TrySpin(Self)?>?0)?{
????assert(_owner?==?Self,?"invariant");
????assert(_succ?!=?Self,?"invariant");
????assert(_Responsible?!=?Self,?"invariant");
????return;
??}
??...
??//?將當(dāng)前線程構(gòu)建成ObjectWaiter
??ObjectWaiter?node(Self);
??Self->_ParkEvent->reset();
??node._prev???=?(ObjectWaiter?*)?0xBAD;
??node.TState??=?ObjectWaiter::TS_CXQ;
??ObjectWaiter?*?nxt;
??for?(;;)?{
????//?通過(guò)CAS方式將ObjectWaiter對(duì)象插入CXQ隊(duì)列頭部中
????node._next?=?nxt?=?_cxq;
????if?(Atomic::cmpxchg(&node,?&_cxq,?nxt)?==?nxt)?break;
????//?由于cxq改變,導(dǎo)致CAS失敗,這里進(jìn)行tryLock重試
????if?(TryLock?(Self)?>?0)?{
??????assert(_succ?!=?Self,?"invariant");
??????assert(_owner?==?Self,?"invariant");
??????assert(_Responsible?!=?Self,?"invariant");
??????return;
????}
??}
??//?阻塞當(dāng)前線程
??for?(;;)?{
????if?(TryLock(Self)?>?0)?break;
????assert(_owner?!=?Self,?"invariant");
????//?park?self
????if?(_Responsible?==?Self)?{
??????Self->_ParkEvent->park((jlong)?recheckInterval);
??????recheckInterval?*=?8;
??????if?(recheckInterval?>?MAX_RECHECK_INTERVAL)?{
????????recheckInterval?=?MAX_RECHECK_INTERVAL;
??????}
????}?else?{
??????Self->_ParkEvent->park();
????}
????...
????
????if?(TryLock(Self)?>?0)?break;
????++nWakeups;
????if?(TrySpin(Self)?>?0)?break;
????...
??}
??...
??//?Self已經(jīng)獲取到鎖了,需要將它從CXQ或者EntryList中移除
??UnlinkAfterAcquire(Self,?&node);
??...
}
在入隊(duì)之前,會(huì)調(diào)用tryLock嘗試通過(guò)CAS操作將_owner(當(dāng)前ObjectMonitor對(duì)象鎖持有的線程指針)字段設(shè)置為Self(指向當(dāng)前執(zhí)行的線程),如果設(shè)置成功,表示當(dāng)前線程獲得了鎖,否則沒有。
int?ObjectMonitor::TryLock(Thread?*?Self)?{
??void?*?own?=?_owner;
??if?(own?!=?NULL)?return?0;
??if?(Atomic::replace_if_null(Self,?&_owner))?{
????return?1;
??}
??return?-1;
}
如果tryLock沒有成功,又會(huì)再次調(diào)用tryLock(trySpin中調(diào)用了tryLock)去嘗試獲取鎖,因?yàn)檫@樣可以告訴操作系統(tǒng)我迫切需要這個(gè)資源,希望能盡量分配給我。不過(guò)這種親和力并不是一定能得到保證的協(xié)議,只是一種積極的操作。
通過(guò) ObjectWaiter對(duì)象將當(dāng)前線程包裹起來(lái),入到 CXQ 隊(duì)列的頭部
阻塞當(dāng)前線程(通過(guò)pthread_cond_wait)
當(dāng)線程被喚醒而獲取了鎖,調(diào)用UnlinkAfterAcquire方法將ObjectWaiter從CXQ或者EntryList中移除
核心數(shù)據(jù)結(jié)構(gòu)
ObjectMonitor對(duì)象中保存了 sychronized 阻塞的線程的隊(duì)列,以及實(shí)現(xiàn)了不同的隊(duì)列調(diào)度策略,因此我們有必須先來(lái)認(rèn)識(shí)下這個(gè)對(duì)象的一些重要屬性
class?ObjectMonitor?{
??//?mark?word
??volatile?markOop?_header;
??//?指向擁有線程或BasicLock的指針?????????????????
??void?*?volatile?_owner;?
??//?monitor的先前所有者的線程ID
??volatile?jlong?_previous_owner_tid;
??//?重入次數(shù),第一次為0
??volatile?intptr_t?_recursions;
??//?下一個(gè)被喚醒的線程
??Thread?*?volatile?_succ;
??//?線程在進(jìn)入或者重新進(jìn)入時(shí)被阻塞的列表,由ObjectWaiter組成,相當(dāng)于對(duì)線程的一個(gè)封裝對(duì)象
??ObjectWaiter?*?volatile?_EntryList;
??//?CXQ隊(duì)列存儲(chǔ)的是enter的時(shí)候因?yàn)殒i已經(jīng)被別的線程阻塞而進(jìn)不來(lái)的線程
??ObjectWaiter?*?volatile?_cxq;
??//?處于wait狀態(tài)(調(diào)用了wait())的線程,會(huì)被加入到waitSet
??ObjectWaiter?*?volatile?_WaitSet;
??//?省略其他屬性以及方法
}
class?ObjectWaiter?:?public?StackObj?{
?public:
??enum?TStates?{?TS_UNDEF,?TS_READY,?TS_RUN,?TS_WAIT,?TS_ENTER,?TS_CXQ?};
??//?后一個(gè)節(jié)點(diǎn)
??ObjectWaiter?*?volatile?_next;
??//?前一個(gè)節(jié)點(diǎn)
??ObjectWaiter?*?volatile?_prev;
??//?線程
??Thread*???????_thread;
??//?線程狀態(tài)
??volatile?TStates?TState;
?public:
??ObjectWaiter(Thread*?thread);
};
看到ObjectWaiter中的_next和_prev你就會(huì)明白,這是使用了雙向隊(duì)列實(shí)現(xiàn)等待隊(duì)列的的,但是實(shí)際上我們上面的入隊(duì)操作并沒有形成雙向列表,形成雙向列表是在exit鎖的時(shí)候。
wait
Java Object 類提供了一個(gè)基于 native 實(shí)現(xiàn)的 wait 和 notify 線程間通訊的方式,JDK中wait/notify/notifyAll全部是通過(guò)native實(shí)現(xiàn)的,當(dāng)然到了JVM,它的實(shí)現(xiàn)還是在 src/hotspot/share/runtime/objectMonitor.cpp 中。
void?ObjectMonitor::wait(jlong?millis,?bool?interruptible,?TRAPS)?{
?
??Thread?*?const?Self?=?THREAD;
??JavaThread?*jt?=?(JavaThread?*)THREAD;
??...
??//?如果線程被中斷,需要拋出異常
??if?(interruptible?&&?Thread::is_interrupted(Self,?true)?&&?!HAS_PENDING_EXCEPTION)?{
????THROW(vmSymbols::java_lang_InterruptedException());
????return;
??}
??
??jt->set_current_waiting_monitor(this);
??//?構(gòu)造?ObjectWaiter節(jié)點(diǎn)
??ObjectWaiter?node(Self);
??node.TState?=?ObjectWaiter::TS_WAIT;
??...
??//?將ObjectWaiter加入WaitSet的尾部
??AddWaiter(&node);
??//?讓出鎖
??exit(true,?Self);????????????????????
?
??...
??//?調(diào)研park(),阻塞當(dāng)前線程
??if?(interruptible?&&?(Thread::is_interrupted(THREAD,?false)?||?HAS_PENDING_EXCEPTION))?{
????????//?Intentionally?empty
??}?else?if?(node._notified?==?0)?{
????if?(millis?<=?0)?{
??????Self->_ParkEvent->park();
????}?else?{
??????ret?=?Self->_ParkEvent->park(millis);
????}
??}
??...
}
//?將node插入雙向列表_WaitSet的尾部
inline?void?ObjectMonitor::AddWaiter(ObjectWaiter*?node)?{
??if?(_WaitSet?==?NULL)?{
????_WaitSet?=?node;
????node->_prev?=?node;
????node->_next?=?node;
??}?else?{
????ObjectWaiter*?head?=?_WaitSet;
????ObjectWaiter*?tail?=?head->_prev;
????tail->_next?=?node;
????head->_prev?=?node;
????node->_next?=?head;
????node->_prev?=?tail;
??}
上面我把wait的主要方法邏輯列出來(lái)了,主要會(huì)執(zhí)行以下步驟
首先判斷當(dāng)前線程是否被中斷,如果被中斷了需要拋出InterruptedException 如果沒有被中斷,則會(huì)使用當(dāng)前線程構(gòu)造ObjectWaiter節(jié)點(diǎn),將其插入雙向鏈表WaitSet的尾部 調(diào)用exit,讓出鎖(讓出鎖的邏輯會(huì)在后面分析) 調(diào)用park(實(shí)際上是調(diào)用pthread_cond_wait)阻塞當(dāng)前線程
notify
同樣的notify的邏輯也是在ObjectMonitory.cpp中
void?ObjectMonitor::notify(TRAPS)?{
??CHECK_OWNER();
??//?waitSet為空,直接返回
??if?(_WaitSet?==?NULL)?{
????TEVENT(Empty-Notify);
????return;
??}
??DTRACE_MONITOR_PROBE(notify,?this,?object(),?THREAD);
??
??//?喚醒某個(gè)線程
??INotify(THREAD);
??OM_PERFDATA_OP(Notifications,?inc(1));
}
在notify中首先會(huì)判斷waitSet是否為空,如果為空,表示沒有線程在等待,則直接返回。否則則調(diào)用INotify方法。
notifyAll方法實(shí)際上是循環(huán)調(diào)用INotify
void?ObjectMonitor::INotify(Thread?*?Self)?{
??//?notify之前需要獲取一個(gè)鎖,保證并發(fā)安全
??Thread::SpinAcquire(&_WaitSetLock,?"WaitSet?-?notify");
??//?移除并返回WaitSet中的第一個(gè)元素,比如之前waitSet中是1?<-->?2?<-->?3,現(xiàn)在是返回1,然后waitSet變成?2<-->3
??ObjectWaiter?*?iterator?=?DequeueWaiter();
??if?(iterator?!=?NULL)?{
???
????//?Disposition?-?what?might?we?do?with?iterator??
????//?a.??add?it?directly?to?the?EntryList?-?either?tail?(policy?==?1)
????//?????or?head?(policy?==?0).
????//?b.??push?it?onto?the?front?of?the?_cxq?(policy?==?2).
????//?For?now?we?use?(b).
????//?設(shè)置線程狀態(tài)
????iterator->TState?=?ObjectWaiter::TS_ENTER;
????iterator->_notified?=?1;
????iterator->_notifier_tid?=?JFR_THREAD_ID(Self);
????ObjectWaiter?*?list?=?_EntryList;
????if?(list?!=?NULL)?{
??????assert(list->_prev?==?NULL,?"invariant");
??????assert(list->TState?==?ObjectWaiter::TS_ENTER,?"invariant");
??????assert(list?!=?iterator,?"invariant");
????}
????//?prepend?to?cxq
????if?(list?==?NULL)?{
??????iterator->_next?=?iterator->_prev?=?NULL;
??????_EntryList?=?iterator;
????}?else?{
??????iterator->TState?=?ObjectWaiter::TS_CXQ;
??????for?(;;)?{
????????//?將需要喚醒的node放到CXQ的頭部
????????ObjectWaiter?*?front?=?_cxq;
????????iterator->_next?=?front;
????????if?(Atomic::cmpxchg(iterator,?&_cxq,?front)?==?front)?{
??????????break;
????????}
??????}
????}
????iterator->wait_reenter_begin(this);
??}
??//?notify執(zhí)行完成之后釋放waitSet鎖,注意這里并不是釋放線程持有的鎖
??Thread::SpinRelease(&_WaitSetLock);
}
notify的邏輯比較簡(jiǎn)單,就是將WaitSet的頭節(jié)點(diǎn)從隊(duì)列中移除,如果EntryList為空,則將出隊(duì)節(jié)點(diǎn)放入到EntryList中,如果EntryList不為空,則將節(jié)點(diǎn)插入到CXQ列表的頭節(jié)點(diǎn)。
需要注意的是,notify并沒有釋放鎖,釋放鎖的邏輯是在exit中
exit
當(dāng)一個(gè)線程獲得對(duì)象鎖成功之后,就可以執(zhí)行自定義的同步代碼塊了。執(zhí)行完成之后會(huì)執(zhí)行到 ObjectMonitor 的 exit 函數(shù)中,釋放當(dāng)前對(duì)象鎖,方便下一個(gè)線程來(lái)獲取這個(gè)鎖。
void?ObjectMonitor::exit(bool?not_suspended,?TRAPS)?{
??Thread?*?const?Self?=?THREAD;
??if?(THREAD?!=?_owner)?{
????//?鎖的持有者是當(dāng)前線程
????if?(THREAD->is_lock_owned((address)?_owner))?{
??????assert(_recursions?==?0,?"invariant");
??????_owner?=?THREAD;
??????_recursions?=?0;
????}?else?{
??????assert(false,?"Non-balanced?monitor?enter/exit!?Likely?JNI?locking");
??????return;
????}
??}
??//?重入次數(shù)減去1
??if?(_recursions?!=?0)?{
????_recursions--;????????//?this?is?simple?recursive?enter
????return;
??}
??for?(;;)?{
????...
????w?=?_EntryList;
????//?如果entryList不為空,則將
????if?(w?!=?NULL)?{
??????assert(w->TState?==?ObjectWaiter::TS_ENTER,?"invariant");
??????//?執(zhí)行unpark,讓出鎖
??????ExitEpilog(Self,?w);
??????return;
????}
????w?=?_cxq;
????...
????_EntryList?=?w;
????ObjectWaiter?*?q?=?NULL;
????ObjectWaiter?*?p;
????//?這里將_cxq或者說(shuō)_EntryList從單向鏈表變成了一個(gè)雙向鏈表
????for?(p?=?w;?p?!=?NULL;?p?=?p->_next)?{
??????guarantee(p->TState?==?ObjectWaiter::TS_CXQ,?"Invariant");
??????p->TState?=?ObjectWaiter::TS_ENTER;
??????p->_prev?=?q;
??????q?=?p;
????}
????w?=?_EntryList;
????if?(w?!=?NULL)?{
??????guarantee(w->TState?==?ObjectWaiter::TS_ENTER,?"invariant");
??????//?執(zhí)行unpark,讓出鎖
??????ExitEpilog(Self,?w);
??????return;
????}
????...
??}
??...
}
void?ObjectMonitor::ExitEpilog(Thread?*?Self,?ObjectWaiter?*?Wakee)?{
??//?Exit?protocol:
??//?1.?ST?_succ?=?wakee
??//?2.?membar?#loadstore|#storestore;
??//?2.?ST?_owner?=?NULL
??//?3.?unpark(wakee)
??_succ?=?Wakee->_thread;
??ParkEvent?*?Trigger?=?Wakee->_event;
??Wakee??=?NULL;
??//?Drop?the?lock
??OrderAccess::release_store(&_owner,?(void*)NULL);
??OrderAccess::fence();
??...
??
??//?釋放鎖
??Trigger->unpark();
}
exit的邏輯還是比較簡(jiǎn)單的
如果當(dāng)前是當(dāng)前線程要讓出鎖,那么則查看其重入次數(shù)是否為0,不為0則將重入次數(shù)減去1,然后直接退出。
如果EntryList不為空,則將EntryList的頭元素中的線程喚醒
將cxq指針賦值給EntryList,然后通過(guò)循環(huán)將cxq鏈表變成雙向鏈表,然后調(diào)用ExitEpilog將CXQ鏈表的頭結(jié)點(diǎn)喚醒(實(shí)際是通過(guò)pthread_cond_signal)
從這里之后,EntryList和CXQ就是同一個(gè)了,因?yàn)閷XQ賦值給了EntryList了。
需要注意的是這里喚醒的線程會(huì)繼續(xù)執(zhí)行文章開頭的EnterI方法,此時(shí)會(huì)將ObjectWaiter從EntryList或者CXQ中移除。
實(shí)戰(zhàn)演示
上面的源碼均是基于JDK12,JDK8中的代碼關(guān)于exit和notify都還有其他策略(選擇哪個(gè)線程),而從JDK9開始就只保留了默認(rèn)策略了。
所以下面的Java代碼的運(yùn)行結(jié)果無(wú)論是在jdk8還是jdk12,得到的結(jié)果都是一樣的。
Object?lock?=?new?Object();
Thread?t1?=?new?Thread(()?->?{
??System.out.println("Thread?1?start!!!!!!");
??synchronized?(lock)?{
????try?{
??????lock.wait();
????}?catch?(Exception?e)?{
????}
????System.out.println("Thread?1?end!!!!!!");
??}
});
Thread?t2?=?new?Thread(()?->?{
??System.out.println("Thread?2?start!!!!!!");
??synchronized?(lock)?{
??????try?{
????????lock.wait();
??????}?catch?(Exception?e)?{
??????}
??????System.out.println("Thread?2?end!!!!!!");
??}
});
Thread?t3?=?new?Thread(()?->?{
??System.out.println("Thread?3?start!!!!!!");
??synchronized?(lock)?{
??????try?{
????????lock.wait();
??????}?catch?(Exception?e)?{
??????}
??????System.out.println("Thread?3?end!!!!!!");
??}
});
Thread?t4?=?new?Thread(()?->?{
??System.out.println("Thread?4?start!!!!!!");
??synchronized?(lock)?{
????try?{
??????System.in.read();
????}?catch?(Exception?e)?{
????}
????lock.notify();
????lock.notify();
????lock.notify();
????System.out.println("Thread?4?end!!!!!!");
??}
});
Thread?t5?=?new?Thread(()?->?{
??System.out.println("Thread?5?start!!!!!!");
??synchronized?(lock)?{
??????System.out.println("Thread?5?end!!!!!!");
??}
});
Thread?t6?=?new?Thread(()?->?{
??System.out.println("Thread?6?start!!!!!!");
??synchronized?(lock)?{
??????System.out.println("Thread?6?end!!!!!!");
??}
});
Thread?t7?=?new?Thread(()?->?{
??System.out.println("Thread?7?start!!!!!!");
??synchronized?(lock)?{
??????System.out.println("Thread?7?end!!!!!!");
??}
});
t1.start();
sleep_1_second();
t2.start();
sleep_1_second();
t3.start();
sleep_1_second();
t4.start();
sleep_1_second();
t5.start();
sleep_1_second();
t6.start();
sleep_1_second();
t7.start();
上面的代碼很簡(jiǎn)單,我們來(lái)分析一下。
線程1,2,3都調(diào)用了wait,所以會(huì)阻塞,然后WaitSet的鏈表結(jié)構(gòu)如下:

線程4獲取了鎖,在等待一個(gè)輸入
線程5,6,7也在等待鎖,所以他們也會(huì)把阻塞,所以CXQ鏈表結(jié)構(gòu)如下:

當(dāng)線程4輸入任意內(nèi)容,并回車結(jié)束后(調(diào)用了其中的3個(gè)notify方法,但還未釋放鎖)


線程4讓出鎖之后,由于EntryList不為空,所以會(huì)先喚醒EntryList中的線程1,然后接下來(lái)會(huì)喚醒CXQ隊(duì)列中的線程(后面你可以認(rèn)為CXQ就是EntryList)
所以最終線程執(zhí)行順序?yàn)?code style="font-size: 14px;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;color: rgb(0, 150, 136);">4 1 3 2 7 6 5,我們的輸出結(jié)果也能驗(yàn)證我們的結(jié)論
Thread?1?start!!!!!!
Thread?2?start!!!!!!
Thread?3?start!!!!!!
Thread?4?start!!!!!!
Thread?5?start!!!!!!
Thread?6?start!!!!!!
Thread?7?start!!!!!!
think123
Thread?4?end!!!!!!
Thread?1?end!!!!!!
Thread?3?end!!!!!!
Thread?2?end!!!!!!
Thread?7?end!!!!!!
Thread?6?end!!!!!!
Thread?5?end!!!!!!
一圖勝千言

