<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Java 高并發(fā)之設(shè)計模式

          共 31611字,需瀏覽 64分鐘

           ·

          2020-07-27 18:34



          作者:大道方圓
          鏈接:cnblogs.com/xdecode/p/9137793.html

          本文主要講解幾種常見并行模式, 具體目錄結(jié)構(gòu)如下圖.

          單例

          單例是最常見的一種設(shè)計模式, 一般用于全局對象管理, 比如xml配置讀寫之類的.

          一般分為懶漢式, 餓漢式.

          我公眾號 Java 相關(guān)的文章整理成了 PDF ,關(guān)注微信公眾號 Java后端 回復 666 下載。

          懶漢式: 方法上加synchronized

          1 public static synchronized Singleton getInstance() {
          2          if (single == null) {
          3              single = new Singleton();
          4          }
          5         return single;
          6 }

          這種方式, 由于每次獲取示例都要獲取鎖, 不推薦使用, 性能較差

          懶漢式: 使用雙檢鎖 + volatile

          1     private volatile Singleton singleton = null;
           2     public static Singleton getInstance() {
           3         if (singleton == null) {
           4             synchronized (Singleton.class) {
           5                 if (singleton == null) {
           6                     singleton = new Singleton();
           7                 }
           8             }
           9         }
          10         return singleton;
          11     }

          本方式是對直接在方法上加鎖的一個優(yōu)化, 好處在于只有第一次初始化獲取了鎖.

          后續(xù)調(diào)用getInstance已經(jīng)是無鎖狀態(tài). 只是寫法上稍微繁瑣點.

          至于為什么要volatile關(guān)鍵字, 主要涉及到j(luò)dk指令重排, 詳見之前的博文: Java內(nèi)存模型與指令重排

          懶漢式: 使用靜態(tài)內(nèi)部類

          1 public class Singleton {
          2     private static class LazyHolder {
          3        private static final Singleton INSTANCE = new Singleton();
          4     }
          5     private Singleton (){}
          6     public static final Singleton getInstance() {
          7        return LazyHolder.INSTANCE;
          8     }
          9 }

          該方式既解決了同步問題, 也解決了寫法繁瑣問題. 推薦使用改寫法.

          缺點在于無法響應(yīng)事件來重新初始化INSTANCE.

          餓漢式

          1 public class Singleton1 {
          2     private Singleton1() {}
          3     private static final Singleton1 single = new Singleton1();
          4     public static Singleton1 getInstance() {
          5         return single;
          6     }
          7 }

          缺點在于對象在一開始就直接初始化了.

          Future模式

          該模式的核心思想是異步調(diào)用. 有點類似于異步的ajax請求.

          當調(diào)用某個方法時, 可能該方法耗時較久, 而在主函數(shù)中也不急于立刻獲取結(jié)果.

          因此可以讓調(diào)用者立刻返回一個憑證, 該方法放到另外線程執(zhí)行,

          后續(xù)主函數(shù)拿憑證再去獲取方法的執(zhí)行結(jié)果即可, 其結(jié)構(gòu)圖如下

          jdk中內(nèi)置了Future模式的支持, 其接口如下:

          通過FutureTask實現(xiàn)

          注意其中兩個耗時操作.

          • 如果doOtherThing耗時2s, 則整個函數(shù)耗時2s左右.
          • 如果doOtherThing耗時0.2s, 則整個函數(shù)耗時取決于RealData.costTime, 即1s左右結(jié)束.
          1 public class FutureDemo1 {
           2
           3     public static void main(String[] args) throws InterruptedException, ExecutionException {
           4         FutureTask future = new FutureTask ( new Callable () {
            5              @Override
            6              public String call() throws Exception {
            7                  return  new RealData().costTime();
            8             }
            9         });
          10         ExecutorService service = Executors.newCachedThreadPool();
          11         service.submit(future);
          12
          13         System.out.println( "RealData方法調(diào)用完畢");
          14          // 模擬主函數(shù)中其他耗時操作
          15         doOtherThing();
          16          // 獲取RealData方法的結(jié)果
          17         System.out.println(future.get());
          18     }
          19
          20      private static void doOtherThing() throws InterruptedException {
          21         Thread.sleep( 2000L);
          22     }
          23 }
          24
          25  class RealData {
          26
          27      public String costTime() {
          28          try {
          29              // 模擬RealData耗時操作
          30             Thread.sleep( 1000L);
          31              return  "result";
          32         } catch (InterruptedException e) {
          33             e.printStackTrace();
          34         }
          35          return  "exception";
          36     }
          37
          38 }


          通過Future實現(xiàn)
          與上述FutureTask不同的是, RealData需要實現(xiàn)Callable接口
          1 public class FutureDemo2 {
           2
           3     public static void main(String[] args) throws InterruptedException, ExecutionException {
           4         ExecutorService service = Executors.newCachedThreadPool();
           5         Future future = service.submit( new RealData2());
            6
            7         System.out.println( "RealData2方法調(diào)用完畢");
            8          // 模擬主函數(shù)中其他耗時操作
            9         doOtherThing();
          10          // 獲取RealData2方法的結(jié)果
          11         System.out.println(future.get());
          12     }
          13
          14      private static void doOtherThing() throws InterruptedException {
          15         Thread.sleep( 2000L);
          16     }
          17 }
          18
          19  class RealData2 implements Callable<String>{
          20
          21      public String costTime() {
          22          try {
          23              // 模擬RealData耗時操作
          24             Thread.sleep( 1000L);
          25              return  "result";
          26         } catch (InterruptedException e) {
          27             e.printStackTrace();
          28         }
          29          return  "exception";
          30     }
          31
          32      @Override
          33      public String call() throws Exception {
          34          return costTime();
          35     }
          36 }
          另外Future本身還提供了一些額外的簡單控制功能, 其API如下
          1     // 取消任務(wù)
           2     boolean cancel(boolean mayInterruptIfRunning);
           3     // 是否已經(jīng)取消
           4     boolean isCancelled();
           5     // 是否已經(jīng)完成
           6     boolean isDone();
           7     // 取得返回對象
           8     V get() throws InterruptedException, ExecutionException;
           9     // 取得返回對象, 并可以設(shè)置超時時間
          10     V get(long timeout, TimeUnit unit)
          11 throws InterruptedException, ExecutionException, TimeoutException
          ;
          生產(chǎn)消費者模式
          生產(chǎn)者-消費者模式是一個經(jīng)典的多線程設(shè)計模式. 它為多線程間的協(xié)作提供了良好的解決方案。
          在生產(chǎn)者-消費者模式中,通常由兩類線程,即若干個生產(chǎn)者線程和若干個消費者線程。
          生產(chǎn)者線程負責提交用戶請求,消費者線程則負責具體處理生產(chǎn)者提交的任務(wù)。
          生產(chǎn)者和消費者之間則通過共享內(nèi)存緩沖區(qū)進行通信, 其結(jié)構(gòu)圖如下

          PCData為我們需要處理的元數(shù)據(jù)模型, 生產(chǎn)者構(gòu)建PCData, 并放入緩沖隊列.
          消費者從緩沖隊列中獲取數(shù)據(jù), 并執(zhí)行計算.
          生產(chǎn)者核心代碼
          1         while(isRunning) {
           2             Thread.sleep(r.nextInt(SLEEP_TIME));
           3             data = new PCData(count.incrementAndGet);
           4             // 構(gòu)造任務(wù)數(shù)據(jù)
           5             System.out.println(data + " is put into queue");
           6             if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
           7                 // 將數(shù)據(jù)放入隊列緩沖區(qū)中
           8                 System.out.println("faild to put data : " + data);
           9             }
          10         }
          消費者核心代碼
          1         while (true) {
           2             PCData data = queue.take();
           3             // 提取任務(wù)
           4             if (data != null) {
           5                 // 獲取數(shù)據(jù), 執(zhí)行計算操作
           6                 int re = data.getData() * 10;
           7                 System.out.println("after cal, value is : " + re);
           8                 Thread.sleep(r.nextInt(SLEEP_TIME));
           9             }
          10         }
          生產(chǎn)消費者模式可以有效對數(shù)據(jù)解耦, 優(yōu)化系統(tǒng)結(jié)構(gòu).
          降低生產(chǎn)者和消費者線程相互之間的依賴與性能要求.
          一般使用BlockingQueue作為數(shù)據(jù)緩沖隊列, 他是通過鎖和阻塞來實現(xiàn)數(shù)據(jù)之間的同步, 
          如果對緩沖隊列有性能要求, 則可以使用基于CAS無鎖設(shè)計的ConcurrentLinkedQueue.

          分而治之

          嚴格來講, 分而治之不算一種模式, 而是一種思想.
          它可以將一個大任務(wù)拆解為若干個小任務(wù)并行執(zhí)行, 提高系統(tǒng)吞吐量.
          我們主要講兩個場景, Master-Worker模式, ForkJoin線程池.

          Master-Worker模式

          該模式核心思想是系統(tǒng)由兩類進行協(xié)助工作: Master進程, Worker進程.
          Master負責接收與分配任務(wù), Worker負責處理任務(wù). 當各個Worker處理完成后, 
          將結(jié)果返回給Master進行歸納與總結(jié).

          假設(shè)一個場景, 需要計算100個任務(wù), 并對結(jié)果求和, Master持有10個子進程.
          Master代碼
          1 public class MasterDemo {
           2     // 盛裝任務(wù)的集合
           3     private ConcurrentLinkedQueue workQueue = new ConcurrentLinkedQueue ();
            4      // 所有worker
            5      private HashMap workers = new HashMap<>();
            6      // 每一個worker并行執(zhí)行任務(wù)的結(jié)果
            7      private ConcurrentHashMap resultMap = new ConcurrentHashMap<>();
            8
            9      public MasterDemo(WorkerDemo worker, int workerCount) {
          10          // 每個worker對象都需要持有queue的引用, 用于領(lǐng)任務(wù)與提交結(jié)果
          11         worker.setResultMap(resultMap);
          12         worker.setWorkQueue(workQueue);
          13          for ( int i = 0; i < workerCount; i++) {
          14             workers.put( "子節(jié)點: " + i, new Thread(worker));
          15         }
          16     }
          17
          18      // 提交任務(wù)
          19      public void submit(TaskDemo task) {
          20         workQueue. add(task);
          21     }
          22
          23      // 啟動所有的子任務(wù)
          24      public void execute(){
          25          for (Map.Entry entry : workers.entrySet()) {
          26             entry.getValue().start();
          27         }
          28     }
          29
          30      // 判斷所有的任務(wù)是否執(zhí)行結(jié)束
          31      public boolean isComplete() {
          32          for (Map.Entry entry : workers.entrySet()) {
          33              if (entry.getValue().getState() != Thread.State.TERMINATED) {
          34                  return  false;
          35             }
          36         }
          37
          38          return  true;
          39     }
          40
          41      // 獲取最終匯總的結(jié)果
          42      public int getResult() {
          43          int result = 0;
          44          for (Map.Entry entry : resultMap.entrySet()) {
          45             result += Integer.parseInt(entry.getValue().toString());
          46         }
          47
          48          return result;
          49     }
          50
          51 }
          Worker代碼
          1 public class WorkerDemo implements Runnable{
           2
           3     private ConcurrentLinkedQueue workQueue;
            4      private ConcurrentHashMap resultMap;
            5
            6      @Override
            7      public void run() {
            8          while ( true) {
            9             TaskDemo input = this.workQueue.poll();
          10              // 所有任務(wù)已經(jīng)執(zhí)行完畢
          11              if (input == null) {
          12                  break;
          13             }
          14              // 模擬對task進行處理, 返回結(jié)果
          15              int result = input.getPrice();
          16              this.resultMap.put(input.getId() + "", result);
          17             System.out.println( "任務(wù)執(zhí)行完畢, 當前線程: " + Thread.currentThread().getName());
          18         }
          19     }
          20
          21      public ConcurrentLinkedQueue getWorkQueue ()  {
          22          return workQueue;
          23     }
          24
          25      public void setWorkQueue(ConcurrentLinkedQueue workQueue)  {
          26          this.workQueue = workQueue;
          27     }
          28
          29      public ConcurrentHashMap getResultMap ()  {
          30          return resultMap;
          31     }
          32
          33      public void setResultMap(ConcurrentHashMap resultMap)  {
          34          this.resultMap = resultMap;
          35     }
          36 }
          1 public class TaskDemo {
           2
           3     private int id;
           4     private String name;
           5     private int price;
           6
           7     public int getId() {
           8         return id;
           9     }
          10
          11     public void setId(int id) {
          12         this.id = id;
          13     }
          14
          15     public String getName() {
          16         return name;
          17     }
          18
          19     public void setName(String name) {
          20         this.name = name;
          21     }
          22
          23     public int getPrice() {
          24         return price;
          25     }
          26
          27     public void setPrice(int price) {
          28         this.price = price;
          29     }
          30 }
          主函數(shù)測試
          1         MasterDemo master = new MasterDemo(new WorkerDemo(), 10);
           2         for (int i = 0; i < 100; i++) {
           3             TaskDemo task = new TaskDemo();
           4             task.setId(i);
           5             task.setName("任務(wù)" + i);
           6             task.setPrice(new Random().nextInt(10000));
           7             master.submit(task);
           8         }
           9
          10         master.execute();
          11
          12         while (true) {
          13             if (master.isComplete()) {
          14                 System.out.println("執(zhí)行的結(jié)果為: " + master.getResult());
          15                 break;
          16             }
          17         }


          ForkJoin線程池

          該線程池是jdk7之后引入的一個并行執(zhí)行任務(wù)的框架, 其核心思想也是將任務(wù)分割為子任務(wù), 
          有可能子任務(wù)還是很大, 還需要進一步拆解, 最終得到足夠小的任務(wù).
          將分割出來的子任務(wù)放入雙端隊列中, 然后幾個啟動線程從雙端隊列中獲取任務(wù)執(zhí)行.
          子任務(wù)執(zhí)行的結(jié)果放到一個隊列里, 另起線程從隊列中獲取數(shù)據(jù), 合并結(jié)果.

          假設(shè)我們的場景需要計算從0到20000000L的累加求和. CountTask繼承自RecursiveTask, 可以攜帶返回值.
          每次分解大任務(wù), 簡單的將任務(wù)劃分為100個等規(guī)模的小任務(wù), 并使用fork()提交子任務(wù).
          在子任務(wù)中通過THRESHOLD設(shè)置子任務(wù)分解的閾值, 如果當前需要求和的總數(shù)大于THRESHOLD, 則子任務(wù)需要再次分解,
          如果子任務(wù)可以直接執(zhí)行, 則進行求和操作, 返回結(jié)果. 最終等待所有的子任務(wù)執(zhí)行完畢, 對所有結(jié)果求和.
          1 public class CountTask extends RecursiveTask<Long>{
           2     // 任務(wù)分解的閾值
           3     private static final int THRESHOLD = 10000;
           4     private long start;
           5     private long end;
           6
           7
           8     public CountTask(long start, long end) {
           9         this.start = start;
          10         this.end = end;
          11     }
          12
          13     public Long compute() {
          14         long sum = 0;
          15         boolean canCompute = (end - start) < THRESHOLD;
          16         if (canCompute) {
          17             for (long i = start; i <= end; i++) {
          18                 sum += i;
          19             }
          20         } else {
          21             // 分成100個小任務(wù)
          22             long step = (start + end) / 100;
          23             ArrayList subTasks = new ArrayList ();
          24              long pos = start;
          25              for ( int i = 0; i < 100; i++) {
          26                  long lastOne = pos + step;
          27                  if (lastOne > end) {
          28                     lastOne = end;
          29                 }
          30                 CountTask subTask = new CountTask(pos, lastOne);
          31                 pos += step + 1;
          32                  // 將子任務(wù)推向線程池
          33                 subTasks.add(subTask);
          34                 subTask.fork();
          35             }
          36
          37              for (CountTask task : subTasks) {
          38                  // 對結(jié)果進行join
          39                 sum += task.join();
          40             }
          41         }
          42          return sum;
          43     }
          44
          45      public static void main(String[] args) throws ExecutionException, InterruptedException {
          46         ForkJoinPool pool = new ForkJoinPool();
          47          // 累加求和 0 -> 20000000L
          48         CountTask task = new CountTask( 0, 20000000L);
          49         ForkJoinTask result = pool.submit(task);
          50         System.out.println( "sum result : " + result.get());
          51     }
          52 }
          ForkJoin線程池使用一個無鎖的棧來管理空閑線程, 如果一個工作線程暫時取不到可用的任務(wù), 則可能被掛起.
          掛起的線程將被壓入由線程池維護的棧中, 待將來有任務(wù)可用時, 再從棧中喚醒這些線程. 

          最后免費給大家分享50個Java項目實戰(zhàn)資料,涵蓋入門、進階各個階段學習內(nèi)容,可以說非常全面了。大部分視頻還附帶源碼,學起來還不費勁!


          附上截圖。(下面有下載方式)。


          項目領(lǐng)取方式:

          掃描下方公眾號回復:50

          可獲取下載鏈接

          ???

                
          ?長按上方二維碼 2 秒
          回復「50」即可獲取資料


          點贊是最大的支持 

          瀏覽 41
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚韩一区二区三区 | 国产高清日韩无码 | 日本黄色视频免费在线 | 国产性爱无码视频 | 亚洲精品国产精品国自产网站 |