<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Java多線程之Semaphore信號(hào)量

          共 5411字,需瀏覽 11分鐘

           ·

          2021-11-30 14:22

          這里就JUC包中的Semaphore類做相關(guān)介紹

          abstract.jpeg

          概述

          JUC包中的Semaphore信號(hào)量作為一個(gè)并發(fā)工具類。其基本思想很簡(jiǎn)單,對(duì)于一個(gè)信號(hào)量實(shí)例而言,其含有指定數(shù)量的許可。每當(dāng)訪問資源前,需先向其申請(qǐng)?jiān)S可。并在處理完畢后釋放許可,以供后續(xù)申請(qǐng)。其實(shí),這個(gè)使用方式就很像現(xiàn)實(shí)世界的停車場(chǎng),即停車場(chǎng)有空余車位,車才可以進(jìn)車;否則要么等待要么離開(尋找下一個(gè)停車場(chǎng))。當(dāng)車從停車場(chǎng)的車位駛離時(shí),則會(huì)將相應(yīng)的車位就會(huì)空余出來。在整個(gè)過程停車場(chǎng)的車位資源是有限的固定的。常見的使用場(chǎng)景是對(duì)業(yè)務(wù)所使用的線程數(shù)進(jìn)行控制,即所謂基于線程數(shù)的限流方式。其常用方法及功能如下所示

          //?創(chuàng)建一個(gè)指定許可數(shù)的非公平信號(hào)量
          public?Semaphore(int?permits);

          //?創(chuàng)建一個(gè)指定許可數(shù)的公平/非公平信號(hào)量
          public?Semaphore(int?permits,?boolean?fair);

          //?釋放一個(gè)許可
          public?void?release();

          //?釋放指定數(shù)量的許可
          public?void?release(int?permits);

          //?當(dāng)前剩余可用的許可數(shù)量
          public?int?availablePermits();

          /***************************?獲取許可?******************************/

          //?阻塞等待,直到獲取一個(gè)許可
          public?void?acquire()?throws?InterruptedException;

          //?阻塞等待,直到獲取全部所需數(shù)量的許可
          public?void?acquire(int?permits)?throws?InterruptedException;

          //?阻塞等待(忽略InterruptedException異常),直到獲取一個(gè)許可
          public?void?acquireUninterruptibly();

          //?阻塞等待(忽略InterruptedException異常),直到獲取全部所需數(shù)量的許可
          public?void?acquireUninterruptibly(int?permits);

          //?非阻塞式獲取一個(gè)許可,?ture:?獲取成功;?false:?獲取失敗
          public?boolean?tryAcquire();

          //?非阻塞式獲取全部所需數(shù)量的許可,?ture:?獲取成功;?false:?獲取失敗
          public?boolean?tryAcquire(int?permits);

          //?支持超時(shí)機(jī)制的tryAcquire方法,?獲取一個(gè)許可,?ture:?獲取成功;?false:?獲取失敗
          public?boolean?tryAcquire(long?timeout,?TimeUnit?unit)?throws?InterruptedException;

          //?支持超時(shí)機(jī)制的tryAcquire方法,?獲取全部所需數(shù)量的許可,?ture:?獲取成功;?false:?獲取失敗
          public?boolean?tryAcquire(int?permits,?long?timeout,?TimeUnit?unit)?throws?InterruptedException;

          //?一次性獲取所有剩余可用的許可,?返回成功獲取的許可數(shù)
          public?int?drainPermits();

          /******************************************************************/

          可以看到,對(duì)于信號(hào)量而言,其支持公平和非公平兩種類型。默認(rèn)為非公平的。值得一提的是,對(duì)于tryAcquire()方法而言,其是非阻塞的。并且一旦存在可用的許可,會(huì)立即分配給它。不論是否存在其他正在等待許可的線程。即使當(dāng)前這個(gè)信號(hào)量實(shí)例是公平的,換言之tryAcquire()方法會(huì)破壞公平信號(hào)量實(shí)例的公平性。如果既期望使用非阻塞方式,又期望不破壞公平信號(hào)量的公平性,可以使用它的超時(shí)機(jī)制版本,同時(shí)將超時(shí)時(shí)間設(shè)為0。即 tryAcquire(0, TimeUnit.SECONDS) 。方法tryAcquire(int permits)同理,此處不再贅述

          基本實(shí)踐

          這里通過一個(gè)簡(jiǎn)單的實(shí)例,來進(jìn)行展示其基本的使用流程

          public?class?SemaphoreTest?{

          ????private?static?DateTimeFormatter?formatter?=?DateTimeFormatter.ofPattern("HH:mm:ss");

          ????//?系統(tǒng)最大的并發(fā)處理量
          ????private?static?Integer?maxLimit?=?5;

          ????@Test
          ????public?void?test1()?{
          ????????System.out.println("----------------------?系統(tǒng)上線?----------------------");
          ????????Semaphore?semaphore?=?new?Semaphore(maxLimit,?true);
          ????????ExecutorService?threadPool?=?Executors.newFixedThreadPool(10);

          ????????IntStream.rangeClosed(1,8)
          ????????????.mapToObj(?num?->?new?UserReq("用戶#"+num,?semaphore)?)
          ????????????.forEach(?threadPool::execute?);

          ????????//?主線程等待所有任務(wù)執(zhí)行完畢
          ????????try{?Thread.sleep(?120*1000?);?}?catch?(Exception?e)?{}
          ????????System.out.println("----------------------?系統(tǒng)下線?----------------------");
          ????}
          ????

          ????/**
          ?????*?打印信息
          ?????*?@param?msg
          ?????*/

          ????private?static?void?info(String?msg)?{
          ????????String?time?=?formatter.format(LocalTime.now());
          ????????String?log?=?"["+time+"]?"+?msg;
          ????????System.out.println(log);
          ????}

          ????@AllArgsConstructor
          ????private?static?class?UserReq?implements?Runnable{

          ????????private?String?name;

          ????????private?Semaphore?semaphore;

          ????????@Override
          ????????public?void?run()?{
          ????????????//?模擬用戶不定時(shí)發(fā)起請(qǐng)求
          ????????????try{?Thread.sleep(RandomUtils.nextLong(500,?2000));?}?catch?(Exception?e)?{}
          ????????????String?msg?=?name?+?":?發(fā)起請(qǐng)求,?系統(tǒng)可用資源數(shù):?"?+?semaphore.availablePermits();
          ????????????info(msg);

          ????????????//?阻塞等待,直到獲取許可
          ????????????try?{
          ????????????????semaphore.acquire();
          ????????????}catch?(InterruptedException?e)?{
          ????????????????System.out.println(?"Happen?Exception:?"?+?e.getMessage());
          ????????????}

          ????????????info(name?+?":?系統(tǒng)開始處理請(qǐng)求");
          ????????????//?模擬業(yè)務(wù)耗時(shí)
          ????????????try{?Thread.sleep(RandomUtils.nextInt(5,?20)*1000);?}?catch?(Exception?e)?{}

          ????????????//?用戶請(qǐng)求處理完畢,釋放許可
          ????????????semaphore.release();
          ????????????info(name?+?":?系統(tǒng)處理完畢");
          ????????}
          ????}
          }

          測(cè)試結(jié)果如下,符合預(yù)期

          figure 1.jpeg

          實(shí)現(xiàn)原理

          構(gòu)造器

          Semaphore信號(hào)量類的實(shí)現(xiàn)過程同樣依賴于AQS。具體地,其是對(duì)AQS中共享鎖的使用。在構(gòu)建Semaphore實(shí)例過程時(shí),一方面,通過sync變量持有AQS的實(shí)現(xiàn)類Sync,同時(shí)按公平性與否進(jìn)一步地可細(xì)分為NonfairSync、FairSync;另一方面,通過AQS的state字段來存儲(chǔ)許可的數(shù)量

          public?class?Semaphore?implements?java.io.Serializable?{

          ????private?final?Sync?sync;

          ????public?Semaphore(int?permits)?{
          ????????sync?=?new?NonfairSync(permits);
          ????}

          ????public?Semaphore(int?permits,?boolean?fair)?{
          ????????sync?=?fair???new?FairSync(permits)?:?new?NonfairSync(permits);
          ????}

          ????abstract?static?class?Sync?extends?AbstractQueuedSynchronizer?{
          ????????Sync(int?permits)?{
          ????????????setState(permits);
          ????????}
          ????}

          ????static?final?class?NonfairSync?extends?Sync?{
          ????????NonfairSync(int?permits)?{
          ????????????super(permits);
          ????????}
          ????}

          ????static?final?class?FairSync?extends?Sync?{
          ????????FairSync(int?permits)?{
          ????????????super(permits);
          ????????}
          ????}

          }

          acquire方法

          首先來看Semaphore的acquire()方法。其委托sync調(diào)用AQS的acquireSharedInterruptibly方法。而在AQS中通過調(diào)用tryAcquireShared方法判斷是否需要阻塞調(diào)用線程。具體地,在Semaphore的NonfairSync、FairSync內(nèi)部類分別實(shí)現(xiàn)了該tryAcquireShared方法的兩個(gè)版本:非公平、公平??梢钥吹絻煞N實(shí)現(xiàn)基本一致。tryAcquireShared如果返回負(fù)值,則說明當(dāng)前許可數(shù)不夠,當(dāng)前線程需要進(jìn)入AQS阻塞隊(duì)列;反之則獲取成功。只是在公平版本的實(shí)現(xiàn)中,會(huì)調(diào)用AQS的hasQueuedPredecessors方法來判斷是否有其他線程已經(jīng)在AQS隊(duì)列中進(jìn)行排隊(duì)。如果有,則tryAcquireShared直接返回-1,即當(dāng)前調(diào)用線程放棄獲取,轉(zhuǎn)而準(zhǔn)備進(jìn)入AQS隊(duì)列以保障公平性

          public?class?Semaphore?implements?java.io.Serializable?{

          ????public?void?acquire()?throws?InterruptedException?{
          ????????sync.acquireSharedInterruptibly(1);
          ????}

          ????abstract?static?class?Sync?extends?AbstractQueuedSynchronizer?{
          ????????//?非公平信號(hào)量獲取許可
          ????????final?int?nonfairTryAcquireShared(int?acquires)?{
          ????????????for?(;;)?{
          ????????????????int?available?=?getState();
          ????????????????int?remaining?=?available?-?acquires;
          ????????????????if?(remaining?0?||
          ????????????????????compareAndSetState(available,?remaining))
          ????????????????????return?remaining;
          ????????????}
          ????????}
          ????}????

          ????static?final?class?NonfairSync?extends?Sync?{
          ????????protected?int?tryAcquireShared(int?acquires)?{
          ????????????return?nonfairTryAcquireShared(acquires);
          ????????}
          ????}

          ????static?final?class?FairSync?extends?Sync?{
          ????????//?公平信號(hào)量獲取許可
          ????????protected?int?tryAcquireShared(int?acquires)?{
          ????????????for?(;;)?{
          ????????????????if?(hasQueuedPredecessors())
          ????????????????//?對(duì)于公平性實(shí)現(xiàn)而言,?如果AQS隊(duì)列存在排隊(duì)的節(jié)點(diǎn)
          ????????????????//?則直接返回-1,?即進(jìn)入AQS隊(duì)列進(jìn)行排隊(duì)以保證公平性
          ????????????????????return?-1;
          ????????????????//?通過訪問AQS的state字段,?獲取當(dāng)前可用的許可數(shù)量????
          ????????????????int?available?=?getState();
          ????????????????//?計(jì)算剩余可用的許可數(shù)量
          ????????????????int?remaining?=?available?-?acquires;
          ????????????????if?(remaining?0?||
          ????????????????????compareAndSetState(available,?remaining))
          ????????????????????return?remaining;
          ????????????}
          ????????}
          ????}
          }

          ...

          public?abstract?class?AbstractQueuedSynchronizer?extends?AbstractOwnableSynchronizer?implements?java.io.Serializable?{

          ????public?final?void?acquireSharedInterruptibly(int?arg)?throws?InterruptedException?{
          ????????//?線程被中斷則直接拋出異常
          ????????if?(Thread.interrupted())
          ????????????throw?new?InterruptedException();

          ????????if?(tryAcquireShared(arg)?0)
          ????????????doAcquireSharedInterruptibly(arg);
          ????}
          ????
          ????//?需要子類去實(shí)現(xiàn)
          ????protected?int?tryAcquireShared(int?arg)?{
          ????????throw?new?UnsupportedOperationException();
          ????}
          }

          release方法

          Semaphore的release()方法類似。其同樣是委托sync調(diào)用AQS的releaseShared方法。然后AQS執(zhí)行tryReleaseShared方法,如果該方法返回true,則會(huì)進(jìn)一步調(diào)用AQS的doReleaseShared方法來喚醒AQS隊(duì)列中其他線程??梢钥吹皆赟emaphore的Sync內(nèi)部類中,tryReleaseShared總是會(huì)返回true。其實(shí)現(xiàn)過程也很簡(jiǎn)單,如下所示

          public?class?Semaphore?implements?java.io.Serializable?{

          ????public?void?release()?{
          ????????sync.releaseShared(1);
          ????}

          ????abstract?static?class?Sync?extends?AbstractQueuedSynchronizer?{
          ????????protected?final?boolean?tryReleaseShared(int?releases)?{
          ????????????for?(;;)?{
          ????????????????//?通過訪問AQS的state字段,?獲取當(dāng)前可用的許可數(shù)量????
          ????????????????int?current?=?getState();
          ????????????????//?將釋放的許可數(shù)添加到當(dāng)前可用許可數(shù)量上
          ????????????????int?next?=?current?+?releases;
          ????????????????if?(next?//?overflow
          ????????????????????throw?new?Error("Maximum?permit?count?exceeded");
          ????????????????//?通過CAS的方式更新state字段
          ????????????????if?(compareAndSetState(current,?next))
          ????????????????????return?true;
          ????????????}
          ????????}
          ????}

          }

          ...

          public?abstract?class?AbstractQueuedSynchronizer?extends?AbstractOwnableSynchronizer?implements?java.io.Serializable?{
          ????public?final?boolean?releaseShared(int?arg)?{
          ????????if?(tryReleaseShared(arg))?{
          ????????????doReleaseShared();
          ????????????return?true;
          ????????}
          ????????return?false;
          ????}
          ????
          ????//?需要子類去實(shí)現(xiàn)
          ????protected?boolean?tryReleaseShared(int?arg)?{
          ????????throw?new?UnsupportedOperationException();
          ????}
          }

          參考文獻(xiàn)

          1. Java并發(fā)編程之美 翟陸續(xù)、薛賓田著
          瀏覽 35
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  婬乱A片欧美大片无码芳芳 | 台湾中文字幕娱乐网 | 黄色一类片 | 三级片AV网站 | 就爱搞AⅤ|