多線程設(shè)計(jì)模式:保護(hù)性暫停模式詳解以及其在DUBBO應(yīng)用源碼分析
JAVA前線?
互聯(lián)網(wǎng)技術(shù)人思考與分享,歡迎長按關(guān)注
1 文章概述
在多線程編程實(shí)踐中,我們肯定會面臨線程間數(shù)據(jù)交互的問題。在處理這類問題時(shí)需要使用一些設(shè)計(jì)模式,從而保證程序的正確性和健壯性。
保護(hù)性暫停設(shè)計(jì)模式就是解決多線程間數(shù)據(jù)交互問題的一種模式。本文先從基礎(chǔ)案例介紹保護(hù)性暫停基本概念和實(shí)踐,再由淺入深,最終分析DUBBO源碼中保護(hù)性暫停設(shè)計(jì)模式使用場景。
2 什么是保護(hù)性暫停
我們設(shè)想這樣一種場景:線程A生產(chǎn)數(shù)據(jù),線程B讀取數(shù)據(jù)這個(gè)數(shù)據(jù)。
但是有一種情況:線程B準(zhǔn)備讀取數(shù)據(jù)時(shí),此時(shí)線程A還沒有生產(chǎn)出數(shù)據(jù)。
在這種情況下線程B不能一直空轉(zhuǎn),也不能立即退出,線程B要等到生產(chǎn)數(shù)據(jù)完成并拿到數(shù)據(jù)之后才退出。
那么在數(shù)據(jù)沒有生產(chǎn)出這段時(shí)間,線程B需要執(zhí)行一種等待機(jī)制,這樣可以達(dá)到對系統(tǒng)保護(hù)目的,這就是保護(hù)性暫停。
保護(hù)性暫停有多種實(shí)現(xiàn)方式,本文我們用synchronized/wait/notify的方式實(shí)現(xiàn)。
@Getter
@Setter
public?class?MyData?implements?Serializable?{
????private?static?final?long?serialVersionUID?=?1L;
????private?String?message;
????public?MyData(String?message)?{
????????this.message?=?message;
????}
}
class?Resource1?{
????private?MyData?data;
????private?Object?lock?=?new?Object();
????public?MyData?getData()?{
????????synchronized?(lock)?{
????????????while?(data?==?null)?{
????????????????try?{
????????????????????//?沒有數(shù)據(jù)則釋放鎖并暫停等待被喚醒
????????????????????lock.wait();
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????????return?data;
????????}
????}
????public?void?sendData(MyData?data)?{
????????synchronized?(lock)?{
????????????//?生產(chǎn)數(shù)據(jù)后喚醒消費(fèi)線程
????????????this.data?=?data;
????????????lock.notifyAll();
????????}
????}
}
/**
?*?保護(hù)性暫停實(shí)例一
?*
?*?@author?微信公眾號「JAVA前線」
?*/
public?class?ProtectDesignTest1?{
????public?static?void?main(String[]?args)?{
????????Resource1?resource?=?new?Resource1();
????????new?Thread(()?->?{
????????????try?{
????????????????MyData?data?=?new?MyData("hello");
????????????????System.out.println(Thread.currentThread().getName()?+?"生產(chǎn)數(shù)據(jù)="?+?data);
????????????????//?模擬發(fā)送耗時(shí)
????????????????TimeUnit.SECONDS.sleep(3);
????????????????resource.sendData(data);
????????????}?catch?(InterruptedException?e)?{
????????????????e.printStackTrace();
????????????}
????????},?"t1").start();
????????new?Thread(()?->?{
????????????MyData?data?=?resource.getData();
????????????System.out.println(Thread.currentThread().getName()?+?"接收到數(shù)據(jù)="?+?data);
????????},?"t2").start();
????}
}
在上述實(shí)例中線程1生產(chǎn)數(shù)據(jù),線程2消費(fèi)數(shù)據(jù)。Resource1類中通過wait/notify實(shí)現(xiàn)了保護(hù)性暫停設(shè)計(jì)模式。
3 加一個(gè)超時(shí)時(shí)間
上述實(shí)例中如果線程2沒有獲取到數(shù)據(jù),那么線程2直到拿到數(shù)據(jù)才會退出。現(xiàn)在我們給獲取數(shù)據(jù)指定一個(gè)超時(shí)時(shí)間,如果在這個(gè)時(shí)間內(nèi)沒有獲取到數(shù)據(jù)則拋出超時(shí)異常。雖然只是加一個(gè)參數(shù),但是其中有很多細(xì)節(jié)需要注意。
3.1 一段有問題的代碼
我們分析下面這段代碼
class?Resource2?{
????private?MyData?data;
????private?Object?lock?=?new?Object();
????public?MyData?getData(int?timeOut)?{
????????synchronized?(lock)?{
????????????while?(data?==?null)?{
????????????????try?{
????????????????????//?代碼1
????????????????????lock.wait(timeOut);
????????????????????break;
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????????if?(data?==?null)?{
????????????????throw?new?RuntimeException("超時(shí)未獲取到結(jié)果");
????????????}
????????????return?data;
????????}
????}
????public?void?sendData(MyData?data)?{
????????synchronized?(lock)?{
????????????this.data?=?data;
????????????lock.notifyAll();
????????}
????}
}
/**
?*?保護(hù)性暫停實(shí)例二
?*
?*?@author?微信公眾號「JAVA前線」
?*/
public?class?ProtectDesignTest2?{
????public?static?void?main(String[]?args)?{
????????Resource2?resource?=?new?Resource2();
????????new?Thread(()?->?{
????????????try?{
????????????????MyData?data?=?new?MyData("hello");
????????????????System.out.println(Thread.currentThread().getName()?+?"生產(chǎn)數(shù)據(jù)="?+?data);
????????????????//?模擬發(fā)送耗時(shí)
????????????????TimeUnit.SECONDS.sleep(3);
????????????????resource.sendData(data);
????????????}?catch?(InterruptedException?e)?{
????????????????e.printStackTrace();
????????????}
????????},?"t1").start();
????????new?Thread(()?->?{
????????????MyData?data?=?resource.getData(1000);
????????????System.out.println(Thread.currentThread().getName()?+?"接收到數(shù)據(jù)="?+?data);
????????},?"t2").start();
????}
}
這段代碼看似沒有問題,使用的也是wait帶有超時(shí)時(shí)間的參數(shù),那么問題可能出在哪里呢?
問題是線程虛假喚醒帶來的。如果還沒有到超時(shí)時(shí)間代碼1就被虛假喚醒,此時(shí)data還沒有值就會直接跳出循環(huán),這樣沒有達(dá)到我們預(yù)期的超時(shí)時(shí)間才跳出循環(huán)的預(yù)期。
關(guān)于虛假喚醒這個(gè)概念,我們看看JDK官方文檔相關(guān)介紹。
A thread can also wake up without being notified, interrupted, or timing out, a so-called spurious wakeup. While this will rarely occur in practice, applications must guard against it by testing for the condition that should have caused the thread to be awakened, and continuing to wait if the condition is not satisfied. In other words, waits should always occur in loops, like this one:
synchronized?(obj)?{
????while?()
????????obj.wait(timeout);
}
官方文檔告訴我們一個(gè)線程可能會在沒有被notify時(shí)虛假喚醒,所以判斷是否繼續(xù)wait時(shí)必須用while循環(huán)。我們在寫代碼時(shí)一定也要注意線程虛假喚醒問題。
3.2 正確實(shí)例
上面我們明白了虛假喚醒問題,現(xiàn)在我們對代碼進(jìn)行修改,說明參看代碼注釋。
class?Resource3?{
????private?MyData?data;
????private?Object?lock?=?new?Object();
????public?MyData?getData(int?timeOut)?{
????????synchronized?(lock)?{
????????????//?運(yùn)行時(shí)長
????????????long?timePassed?=?0;
????????????//?開始時(shí)間
????????????long?begin?=?System.currentTimeMillis();
????????????//?如果結(jié)果為空
????????????while?(data?==?null)?{
????????????????try?{
????????????????????//?如果運(yùn)行時(shí)長大于超時(shí)時(shí)間退出循環(huán)
????????????????????if?(timePassed?>?timeOut)?{
????????????????????????break;
????????????????????}
????????????????????//?如果運(yùn)行時(shí)長小于超時(shí)時(shí)間表示虛假喚醒?->?只需再等待時(shí)間差值
????????????????????long?waitTime?=?timeOut?-?timePassed;
????????????????????//?等待時(shí)間差值
????????????????????lock.wait(waitTime);
????????????????????//?結(jié)果不為空直接返回
????????????????????if?(data?!=?null)?{
????????????????????????break;
????????????????????}
????????????????????//?被喚醒后計(jì)算運(yùn)行時(shí)長
????????????????????timePassed?=?System.currentTimeMillis()?-?begin;
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????????if?(data?==?null)?{
????????????????throw?new?RuntimeException("超時(shí)未獲取到結(jié)果");
????????????}
????????????return?data;
????????}
????}
????public?void?sendData(MyData?data)?{
????????synchronized?(lock)?{
????????????this.data?=?data;
????????????lock.notifyAll();
????????}
????}
}
/**
?*?保護(hù)性暫停實(shí)例三
?*
?*?@author?微信公眾號「JAVA前線」
?*/
public?class?ProtectDesignTest3?{
????public?static?void?main(String[]?args)?{
????????Resource3?resource?=?new?Resource3();
????????new?Thread(()?->?{
????????????try?{
????????????????MyData?data?=?new?MyData("hello");
????????????????System.out.println(Thread.currentThread().getName()?+?"生產(chǎn)數(shù)據(jù)="?+?data);
????????????????//?模擬發(fā)送耗時(shí)
????????????????TimeUnit.SECONDS.sleep(3);
????????????????resource.sendData(data);
????????????}?catch?(InterruptedException?e)?{
????????????????e.printStackTrace();
????????????}
????????},?"t1").start();
????????new?Thread(()?->?{
????????????MyData?data?=?resource.getData(1000);
????????????System.out.println(Thread.currentThread().getName()?+?"接收到數(shù)據(jù)="?+?data);
????????},?"t2").start();
????}
}
4 加一個(gè)編號
現(xiàn)在再來設(shè)想一個(gè)場景:現(xiàn)在有三個(gè)生產(chǎn)數(shù)據(jù)的線程1、2、3,三個(gè)獲取數(shù)據(jù)的線程4、5、6,我們希望每個(gè)獲取數(shù)據(jù)線程都只拿到其中一個(gè)生產(chǎn)線程的數(shù)據(jù),不能多拿也不能少拿。
這里引入一個(gè)Futures模型,這個(gè)模型為每個(gè)資源進(jìn)行編號并存儲在容器中,例如線程1生產(chǎn)的數(shù)據(jù)被拿走則從容器中刪除,一直到容器為空結(jié)束。
@Getter
@Setter
public?class?MyNewData?implements?Serializable?{
????private?static?final?long?serialVersionUID?=?1L;
????private?static?final?AtomicLong?ID?=?new?AtomicLong(0);
????private?Long?id;
????private?String?message;
????public?MyNewData(String?message)?{
????????this.id?=?newId();
????????this.message?=?message;
????}
????/**
?????*?自增到最大值會回到最小值(負(fù)值可以作為識別ID)
?????*/
????private?static?long?newId()?{
????????return?ID.getAndIncrement();
????}
????public?Long?getId()?{
????????return?this.id;
????}
}
class?MyResource?{
????private?MyNewData?data;
????private?Object?lock?=?new?Object();
????public?MyNewData?getData(int?timeOut)?{
????????synchronized?(lock)?{
????????????long?timePassed?=?0;
????????????long?begin?=?System.currentTimeMillis();
????????????while?(data?==?null)?{
????????????????try?{
????????????????????if?(timePassed?>?timeOut)?{
????????????????????????break;
????????????????????}
????????????????????long?waitTime?=?timeOut?-?timePassed;
????????????????????lock.wait(waitTime);
????????????????????if?(data?!=?null)?{
????????????????????????break;
????????????????????}
????????????????????timePassed?=?System.currentTimeMillis()?-?begin;
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????????if?(data?==?null)?{
????????????????throw?new?RuntimeException("超時(shí)未獲取到結(jié)果");
????????????}
????????????return?data;
????????}
????}
????public?void?sendData(MyNewData?data)?{
????????synchronized?(lock)?{
????????????this.data?=?data;
????????????lock.notifyAll();
????????}
????}
}
class?MyFutures?{
????private?static?final?Map?FUTURES?=?new?ConcurrentHashMap<>();
????public?static?MyResource?newResource(MyNewData?data)?{
????????final?MyResource?future?=?new?MyResource();
????????FUTURES.put(data.getId(),?future);
????????return?future;
????}
????public?static?MyResource?getResource(Long?id)?{
????????return?FUTURES.remove(id);
????}
????public?static?Set?getIds()? {
????????return?FUTURES.keySet();
????}
}
/**
?*?保護(hù)性暫停實(shí)例四
?*
?*?@author?微信公眾號「JAVA前線」
?*/
public?class?ProtectDesignTest4?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????for?(int?i?=?0;?i?3;?i++)?{
????????????final?int?index?=?i;
????????????new?Thread(()?->?{
????????????????try?{
????????????????????MyNewData?data?=?new?MyNewData("hello_"?+?index);
????????????????????MyResource?resource?=?MyFutures.newResource(data);
????????????????????//?模擬發(fā)送耗時(shí)
????????????????????TimeUnit.SECONDS.sleep(1);
????????????????????resource.sendData(data);
????????????????????System.out.println("生產(chǎn)數(shù)據(jù)data="?+?data);
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}).start();
????????}
????????TimeUnit.SECONDS.sleep(1);
????????for?(Long?i?:?MyFutures.getIds())?{
????????????final?long?index?=?i;
????????????new?Thread(()?->?{
????????????????MyResource?resource?=?MyFutures.getResource(index);
????????????????int?timeOut?=?3000;
????????????????System.out.println("接收數(shù)據(jù)data="?+?resource.getData(timeOut));
????????????}).start();
????????}
????}
}
5 DUBBO應(yīng)用實(shí)例
我們順著這一個(gè)鏈路跟蹤代碼:消費(fèi)者發(fā)送請求 > 提供者接收請求并執(zhí)行,并且將運(yùn)行結(jié)果發(fā)送給消費(fèi)者 >消費(fèi)者接收結(jié)果。
(1) 消費(fèi)者發(fā)送請求
消費(fèi)者發(fā)送的數(shù)據(jù)包含請求ID,并且將關(guān)系維護(hù)進(jìn)FUTURES容器
final?class?HeaderExchangeChannel?implements?ExchangeChannel?{
????@Override
????public?ResponseFuture?request(Object?request,?int?timeout)?throws?RemotingException?{
????????if?(closed)?{
????????????throw?new?RemotingException(this.getLocalAddress(),?null,?"Failed?to?send?request?"?+?request?+?",?cause:?The?channel?"?+?this?+?"?is?closed!");
????????}
????????Request?req?=?new?Request();
????????req.setVersion(Version.getProtocolVersion());
????????req.setTwoWay(true);
????????req.setData(request);
????????//?代碼1
????????DefaultFuture?future?=?DefaultFuture.newFuture(channel,?req,?timeout);
????????try?{
????????????channel.send(req);
????????}?catch?(RemotingException?e)?{
????????????future.cancel();
????????????throw?e;
????????}
????????return?future;
????}
}
class?DefaultFuture?implements?ResponseFuture?{
????//?FUTURES容器
????private?static?final?Map?FUTURES?=?new?ConcurrentHashMap<>();
????private?DefaultFuture(Channel?channel,?Request?request,?int?timeout)?{
????????this.channel?=?channel;
????????this.request?=?request;
????????//?請求ID
????????this.id?=?request.getId();
????????this.timeout?=?timeout?>?0???timeout?:?channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY,?Constants.DEFAULT_TIMEOUT);
????????FUTURES.put(id,?this);
????????CHANNELS.put(id,?channel);
????}
}
(2) 提供者接收請求并執(zhí)行,并且將運(yùn)行結(jié)果發(fā)送給消費(fèi)者
public?class?HeaderExchangeHandler?implements?ChannelHandlerDelegate?{
????void?handleRequest(final?ExchangeChannel?channel,?Request?req)?throws?RemotingException?{
????????//?response與請求ID對應(yīng)
????????Response?res?=?new?Response(req.getId(),?req.getVersion());
????????if?(req.isBroken())?{
????????????Object?data?=?req.getData();
????????????String?msg;
????????????if?(data?==?null)?{
????????????????msg?=?null;
????????????}?else?if?(data?instanceof?Throwable)?{
????????????????msg?=?StringUtils.toString((Throwable)?data);
????????????}?else?{
????????????????msg?=?data.toString();
????????????}
????????????res.setErrorMessage("Fail?to?decode?request?due?to:?"?+?msg);
????????????res.setStatus(Response.BAD_REQUEST);
????????????channel.send(res);
????????????return;
????????}
????????//?message?=?RpcInvocation包含方法名、參數(shù)名、參數(shù)值等
????????Object?msg?=?req.getData();
????????try?{
????????????//?DubboProtocol.reply執(zhí)行實(shí)際業(yè)務(wù)方法
????????????CompletableFuture(3) 消費(fèi)者接收結(jié)果
以下DUBBO源碼很好體現(xiàn)了保護(hù)性暫停這個(gè)設(shè)計(jì)模式,說明參看注釋
class?DefaultFuture?implements?ResponseFuture?{
????private?final?Lock?lock?=?new?ReentrantLock();
????private?final?Condition?done?=?lock.newCondition();
????public?static?void?received(Channel?channel,?Response?response)?{
????????try?{
????????????//?取出對應(yīng)的請求對象
????????????DefaultFuture?future?=?FUTURES.remove(response.getId());
????????????if?(future?!=?null)?{
????????????????future.doReceived(response);
????????????}?else?{
????????????????logger.warn("The?timeout?response?finally?returned?at?"
????????????????????????????+?(new?SimpleDateFormat("yyyy-MM-dd?HH:mm:ss.SSS").format(new?Date()))
????????????????????????????+?",?response?"?+?response
????????????????????????????+?(channel?==?null???""?:?",?channel:?"?+?channel.getLocalAddress()
???????????????????????????????+?"?->?"?+?channel.getRemoteAddress()));
????????????}
????????}?finally?{
????????????CHANNELS.remove(response.getId());
????????}
????}
????@Override
????public?Object?get(int?timeout)?throws?RemotingException?{
????????if?(timeout?<=?0)?{
????????????timeout?=?Constants.DEFAULT_TIMEOUT;
????????}
????????if?(!isDone())?{
????????????long?start?=?System.currentTimeMillis();
????????????lock.lock();
????????????try?{
????????????????while?(!isDone())?{
????????????????????//?放棄鎖并使當(dāng)前線程阻塞,直到發(fā)出信號中斷它或者達(dá)到超時(shí)時(shí)間
????????????????????done.await(timeout,?TimeUnit.MILLISECONDS);
????????????????????//?阻塞結(jié)束后再判斷是否完成
????????????????????if?(isDone())?{
????????????????????????break;
????????????????????}
????????????????????//?阻塞結(jié)束后判斷是否超時(shí)
????????????????????if(System.currentTimeMillis()?-?start?>?timeout)?{
????????????????????????break;
????????????????????}
????????????????}
????????????}?catch?(InterruptedException?e)?{
????????????????throw?new?RuntimeException(e);
????????????}?finally?{
????????????????lock.unlock();
????????????}
????????????//?response對象仍然為空則拋出超時(shí)異常
????????????if?(!isDone())?{
????????????????throw?new?TimeoutException(sent?>?0,?channel,?getTimeoutMessage(false));
????????????}
????????}
????????return?returnFromResponse();
????}
????private?void?doReceived(Response?res)?{
????????lock.lock();
????????try?{
????????????//?接收到服務(wù)器響應(yīng)賦值response
????????????response?=?res;
????????????if?(done?!=?null)?{
????????????????//?喚醒get方法中處于等待的代碼塊
????????????????done.signal();
????????????}
????????}?finally?{
????????????lock.unlock();
????????}
????????if?(callback?!=?null)?{
????????????invokeCallback(callback);
????????}
????}
}
6 文章總結(jié)
本文我們從基礎(chǔ)案例介紹保護(hù)性暫停基本概念和實(shí)踐,最終分析DUBBO源碼中保護(hù)性暫停設(shè)計(jì)模式使用場景。我們在設(shè)計(jì)并發(fā)框架時(shí)要注意虛假喚醒問題,以及請求和響應(yīng)關(guān)系對應(yīng)問題,希望本文對大家有所幫助。
JAVA前線?
互聯(lián)網(wǎng)技術(shù)人思考與分享,歡迎長按關(guān)注
