Java高并发之设计模式,设计思想
点击上方“码农突围”,马上关注
这里是码农充电第一站,回复“666”,获取一份专属大礼包 真爱,请设置“星标”或点个“在看”


单例
懒汉式: 方法上加synchronized
public?static?synchronized?Singleton?getInstance()?{??
?????????if?(single?==?null)?{????
?????????????single?=?new?Singleton();??
?????????}????
????????return?single;??
}??
懒汉式: 使用双检锁 + volatile
private?volatile?Singleton?singleton?=?null;
????public?static?Singleton?getInstance()?{
????????if?(singleton?==?null)?{
????????????synchronized?(Singleton.class)?{
????????????????if?(singleton?==?null)?{
????????????????????singleton?=?new?Singleton();
????????????????}
????????????}
????????}
????????return?singleton;
????}
懒汉式: 使用静态内部类
/**
 * Lazy singleton built through a private static nested holder class.
 *
 * <p>The instance lives in a static final field of {@code LazyHolder}, so it
 * is created when that nested class is initialized, which happens the first
 * time {@link #getInstance()} touches it. No explicit locking is written here.
 */
public class Singleton {

    /** Holds the single instance; only referenced from getInstance(). */
    private static class LazyHolder {
        private static final Singleton INSTANCE = new Singleton();
    }

    /** Private so the class cannot be instantiated from outside. */
    private Singleton() {
    }

    /**
     * @return the shared {@code Singleton} instance
     */
    public static final Singleton getInstance() {
        return LazyHolder.INSTANCE;
    }
}
饿汉式
/**
 * Eager singleton: the instance is created in a static initializer, i.e.
 * when the class itself is initialized, rather than on first request.
 */
public class Singleton1 {

    private static final Singleton1 single = new Singleton1();

    /** Private so the class cannot be instantiated from outside. */
    private Singleton1() {
    }

    /**
     * @return the shared {@code Singleton1} instance
     */
    public static Singleton1 getInstance() {
        return single;
    }
}
Future模式


通过FutureTask实现
如果doOtherThing耗时2s, 则整个函数耗时2s左右.
如果doOtherThing耗时0.2s, 则整个函数耗时取决于RealData.costTime, 即1s左右结束.
public?class?FutureDemo1?{
????public?static?void?main(String[]?args)?throws?InterruptedException,?ExecutionException?{
????????FutureTask?future?=?new?FutureTask (new?Callable ()?{
????????????@Override
????????????public?String?call()?throws?Exception?{
????????????????return?new?RealData().costTime();
????????????}
????????});
????????ExecutorService?service?=?Executors.newCachedThreadPool();
????????service.submit(future);
????????System.out.println("RealData方法調(diào)用完畢");
????????//?模擬主函數(shù)中其他耗時(shí)操作
????????doOtherThing();
????????//?獲取RealData方法的結(jié)果
????????System.out.println(future.get());
????}
????private?static?void?doOtherThing()?throws?InterruptedException?{
????????Thread.sleep(2000L);
????}
}
/** Stand-in for a slow data source used by the Future demo. */
class RealData {

    /**
     * Sleeps for one second to simulate an expensive operation.
     *
     * @return {@code "result"} after the sleep completes, or
     *         {@code "exception"} if the thread was interrupted; in that case
     *         the interrupt flag is set again before returning
     */
    public String costTime() {
        try {
            // Simulate the time-consuming RealData operation
            Thread.sleep(1000L);
            return "result";
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Catching InterruptedException clears the flag; restore it so the
            // caller (e.g. a pool worker being shut down) can still see it.
            Thread.currentThread().interrupt();
        }
        return "exception";
    }
}
通过Future实现
public?class?FutureDemo2?{
????public?static?void?main(String[]?args)?throws?InterruptedException,?ExecutionException?{
????????ExecutorService?service?=?Executors.newCachedThreadPool();
????????Future?future?=?service.submit(new?RealData2());
????????System.out.println("RealData2方法調(diào)用完畢");
????????//?模擬主函數(shù)中其他耗時(shí)操作
????????doOtherThing();
????????//?獲取RealData2方法的結(jié)果
????????System.out.println(future.get());
????}
????private?static?void?doOtherThing()?throws?InterruptedException?{
????????Thread.sleep(2000L);
????}
}
/** Slow data source packaged as a {@link Callable} so it can be submitted to an executor. */
class RealData2 implements Callable<String> {

    /**
     * Sleeps for one second to simulate an expensive operation.
     *
     * @return {@code "result"} after the sleep completes, or
     *         {@code "exception"} if the thread was interrupted; in that case
     *         the interrupt flag is set again before returning
     */
    public String costTime() {
        try {
            // Simulate the time-consuming RealData operation
            Thread.sleep(1000L);
            return "result";
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Catching InterruptedException clears the flag; restore it so the
            // executor thread running this task can still see it.
            Thread.currentThread().interrupt();
        }
        return "exception";
    }

    /**
     * Executor entry point; delegates to {@link #costTime()}.
     *
     * @return whatever {@code costTime()} returns
     */
    @Override
    public String call() throws Exception {
        return costTime();
    }
}
//?取消任務(wù)
????boolean?cancel(boolean?mayInterruptIfRunning);
????//?是否已經(jīng)取消
????boolean?isCancelled();
????//?是否已經(jīng)完成
????boolean?isDone();
????//?取得返回對(duì)象
????V?get()?throws?InterruptedException,?ExecutionException;
????//?取得返回對(duì)象,?并可以設(shè)置超時(shí)時(shí)間
????V?get(long?timeout,?TimeUnit?unit)
????????????throws?InterruptedException,?ExecutionException,?TimeoutException;
生产消费者模式

生产者核心代码
// Producer loop: runs until isRunning is cleared. Each pass sleeps a random
// interval, builds one PCData from the next counter value, and offers it to
// the queue.
while (isRunning) {
    Thread.sleep(r.nextInt(SLEEP_TIME));
    // Build the task payload; incrementAndGet is a method and must be called
    data = new PCData(count.incrementAndGet());
    System.out.println(data + " is put into queue");
    // Put the payload into the queue buffer, waiting at most 2 seconds for room
    if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
        System.out.println("faild to put data : " + data);
    }
}
消费者核心代码
// Consumer loop: repeatedly takes one PCData from the queue, multiplies its
// value by 10, prints the result, and then sleeps a random interval.
while (true) {
    // Fetch the next task
    PCData data = queue.take();
    // NOTE(review): if queue is a java.util.concurrent.BlockingQueue, take()
    // blocks instead of returning null and this guard is redundant - confirm
    // the queue type before removing it.
    if (data != null) {
        // Got the data; perform the calculation
        int re = data.getData() * 10;
        System.out.println("after cal, value is : " + re);
        Thread.sleep(r.nextInt(SLEEP_TIME));
    }
}
分而治之
Master-Worker模式

public?class?MasterDemo?{
????//?盛裝任務(wù)的集合
????private?ConcurrentLinkedQueue?workQueue?=?new?ConcurrentLinkedQueue ();
????//?所有worker
????private?HashMap?workers?=?new?HashMap<>();
????//?每一個(gè)worker并行執(zhí)行任務(wù)的結(jié)果
????private?ConcurrentHashMap?resultMap?=?new?ConcurrentHashMap<>();
????public?MasterDemo(WorkerDemo?worker,?int?workerCount)?{
????????//?每個(gè)worker對(duì)象都需要持有queue的引用,?用于領(lǐng)任務(wù)與提交結(jié)果
????????worker.setResultMap(resultMap);
????????worker.setWorkQueue(workQueue);
????????for?(int?i?=?0;?i?????????????workers.put("子節(jié)點(diǎn):?"?+?i,?new?Thread(worker));
????????}
????}
????//?提交任務(wù)
????public?void?submit(TaskDemo?task)?{
????????workQueue.add(task);
????}
????//?啟動(dòng)所有的子任務(wù)
????public?void?execute(){
????????for?(Map.Entry?entry?:?workers.entrySet())?{
????????????entry.getValue().start();
????????}
????}
????//?判斷所有的任務(wù)是否執(zhí)行結(jié)束
????public?boolean?isComplete()?{
????????for?(Map.Entry?entry?:?workers.entrySet())?{
????????????if?(entry.getValue().getState()?!=?Thread.State.TERMINATED)?{
????????????????return?false;
????????????}
????????}
????????return?true;
????}
????//?獲取最終匯總的結(jié)果
????public?int?getResult()?{
????????int?result?=?0;
????????for?(Map.Entry?entry?:?resultMap.entrySet())?{
????????????result?+=?Integer.parseInt(entry.getValue().toString());
????????}
????????return?result;
????}
}
public?class?WorkerDemo?implements?Runnable{
????private?ConcurrentLinkedQueue?workQueue;
????private?ConcurrentHashMap?resultMap;
????@Override
????public?void?run()?{
????????while?(true)?{
????????????TaskDemo?input?=?this.workQueue.poll();
????????????//?所有任務(wù)已經(jīng)執(zhí)行完畢
????????????if?(input?==?null)?{
????????????????break;
????????????}
????????????//?模擬對(duì)task進(jìn)行處理,?返回結(jié)果
????????????int?result?=?input.getPrice();
????????????this.resultMap.put(input.getId()?+?"",?result);
????????????System.out.println("任務(wù)執(zhí)行完畢,?當(dāng)前線程:?"?+?Thread.currentThread().getName());
????????}
????}
????public?ConcurrentLinkedQueue?getWorkQueue()? {
????????return?workQueue;
????}
????public?void?setWorkQueue(ConcurrentLinkedQueue?workQueue) ?{
????????this.workQueue?=?workQueue;
????}
????public?ConcurrentHashMap?getResultMap()? {
????????return?resultMap;
????}
????public?void?setResultMap(ConcurrentHashMap?resultMap) ?{
????????this.resultMap?=?resultMap;
????}
}
/** Plain mutable task record with an id, a name, and a price. */
public class TaskDemo {

    private int id;
    private String name;
    private int price;

    /** @return the task id */
    public int getId() {
        return id;
    }

    /** @param id the task id */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the task name */
    public String getName() {
        return name;
    }

    /** @param name the task name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the task price */
    public int getPrice() {
        return price;
    }

    /** @param price the task price */
    public void setPrice(int price) {
        this.price = price;
    }
}
MasterDemo?master?=?new?MasterDemo(new?WorkerDemo(),?10);
????????for?(int?i?=?0;?i?100;?i++)?{
????????????TaskDemo?task?=?new?TaskDemo();
????????????task.setId(i);
????????????task.setName("任務(wù)"?+?i);
????????????task.setPrice(new?Random().nextInt(10000));
????????????master.submit(task);
????????}
????????master.execute();
????????while?(true)?{
????????????if?(master.isComplete())?{
????????????????System.out.println("執(zhí)行的結(jié)果為:?"?+?master.getResult());
????????????????break;
????????????}
????????}
ForkJoin线程池

/**
 * Fork/join task that sums every long in the inclusive range
 * {@code [start, end]}.
 *
 * <p>Ranges narrower than {@link #THRESHOLD} are summed directly; wider
 * ranges are cut into at most 100 consecutive sub-ranges that are forked and
 * then joined.
 */
public class CountTask extends RecursiveTask<Long> {

    // Ranges with (end - start) below this are summed without splitting
    private static final int THRESHOLD = 10000;

    private final long start;
    private final long end;

    /**
     * @param start first value to add (inclusive)
     * @param end   last value to add (inclusive)
     */
    public CountTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /**
     * Computes the sum for this task's range.
     *
     * @return the sum of all values from {@code start} to {@code end}; 0 when
     *         {@code start > end}
     */
    @Override
    public Long compute() {
        long sum = 0;
        boolean canCompute = (end - start) < THRESHOLD;
        if (canCompute) {
            for (long i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // Split into up to 100 sub-tasks. The step must come from the
            // range WIDTH: deriving it from (start + end) makes it exceed the
            // width for ranges far from zero, so the first sub-task would be
            // identical to this task and the recursion would never terminate.
            long step = (end - start) / 100;
            ArrayList<CountTask> subTasks = new ArrayList<CountTask>();
            long pos = start;
            // Stop early once the range is covered rather than forking empty tasks
            for (int i = 0; i < 100 && pos <= end; i++) {
                long lastOne = pos + step;
                if (lastOne > end) {
                    lastOne = end;
                }
                CountTask subTask = new CountTask(pos, lastOne);
                pos += step + 1;
                // Push the sub-task to the pool
                subTasks.add(subTask);
                subTask.fork();
            }
            for (CountTask task : subTasks) {
                // Join each sub-result into the total
                sum += task.join();
            }
        }
        return sum;
    }

    /**
     * Sums 0..20000000 on a new {@link ForkJoinPool} and prints the result.
     *
     * @param args unused
     * @throws ExecutionException   if the task threw
     * @throws InterruptedException if interrupted while waiting for the result
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool pool = new ForkJoinPool();
        // Accumulate 0 -> 20000000L
        CountTask task = new CountTask(0, 20000000L);
        ForkJoinTask<Long> result = pool.submit(task);
        System.out.println("sum result : " + result.get());
    }
}
- END - 最近热文
武大94年博士年薪201万入职华为!学霸日程表曝光,简直降维打击!   我在美团的八年   为什么下载小电影时,经常会卡在99%?   朝阳群众盯上了望京A座?举报996造成交通严重堵塞
评论
图片
表情
