如何利用 JavaScript 實現(xiàn)并發(fā)控制
一、前言
??在開發(fā)過程中,有時會遇到需要控制任務并發(fā)執(zhí)行數(shù)量的需求。
??例如一個爬蟲程序,可以通過限制其并發(fā)任務數(shù)量來降低請求頻率,從而避免由于請求過于頻繁被封禁問題的發(fā)生。
??接下來,本文介紹如何實現(xiàn)一個并發(fā)控制器。
二、示例
??const?task?=?timeout?=>?new?Promise((resolve)?=>?setTimeout(()?=>?{
????resolve(timeout);
??},?timeout))
??const?taskList?=?[1000,?3000,?200,?1300,?800,?2000];
??async?function?startNoConcurrentControl()?{
????console.time(NO_CONCURRENT_CONTROL_LOG);
????await?Promise.all(taskList.map(item?=>?task(item)));
????console.timeEnd(NO_CONCURRENT_CONTROL_LOG);
??}
??startNoConcurrentControl();
??上述示例代碼利用 Promise.all 方法模擬6個任務并發(fā)執(zhí)行的場景,執(zhí)行完所有任務的總耗時為 3000 毫秒。
??下面會采用該示例來驗證實現(xiàn)方法的正確性。
三、實現(xiàn)
??由于任務并發(fā)執(zhí)行的數(shù)量是有限的,那么就需要一種數(shù)據(jù)結(jié)構(gòu)來管理不斷產(chǎn)生的任務。
??隊列的「先進先出」特性可以保證任務并發(fā)執(zhí)行的順序,在 JavaScript 中可以通過「數(shù)組來模擬隊列」:
??class?Queue?{
????constructor()?{
??????this._queue?=?[];
????}
????push(value)?{
??????return?this._queue.push(value);
????}
????shift()?{
??????return?this._queue.shift();
????}
????isEmpty()?{
??????return?this._queue.length?===?0;
????}
??}
??對于每一個任務,需要管理其執(zhí)行函數(shù)和參數(shù):
??class?DelayedTask?{
????constructor(resolve,?fn,?args)?{
??????this.resolve?=?resolve;
??????this.fn?=?fn;
??????this.args?=?args;
????}
??}
??接下來實現(xiàn)核心的 TaskPool 類,該類主要用來控制任務的執(zhí)行:
??class?TaskPool?{
????constructor(size)?{
??????this.size?=?size;
??????this.queue?=?new?Queue();
????}
????addTask(fn,?args)?{
??????return?new?Promise((resolve)?=>?{
????????this.queue.push(new?DelayedTask(resolve,?fn,?args));
????????if?(this.size)?{
??????????this.size--;
??????????const?{?resolve:?taskResole,?fn,?args?}?=?this.queue.shift();
??????????taskResole(this.runTask(fn,?args));
????????}
??????})
????}
????pullTask()?{
??????if?(this.queue.isEmpty())?{
????????return;
??????}
??????if?(this.size?===?0)?{
????????return;
??????}
??????this.size++;
??????const?{?resolve,?fn,?args?}?=?this.queue.shift();
??????resolve(this.runTask(fn,?args));
????}
????runTask(fn,?args)?{
??????const?result?=?Promise.resolve(fn(...args));
??????result.then(()?=>?{
????????this.size--;
????????this.pullTask();
??????}).catch(()?=>?{
????????this.size--;
????????this.pullTask();
??????})
??????return?result;
????}
??}
??TaskPool 包含三個關鍵方法:
addTask: 將新的任務放入隊列當中,并觸發(fā)任務池狀態(tài)檢測,如果當前任務池非滿載狀態(tài),則從隊列中取出任務放入任務池中執(zhí)行。 runTask: 執(zhí)行當前任務,任務執(zhí)行完成之后,更新任務池狀態(tài),此時觸發(fā)主動拉取新任務的機制。 pullTask: 如果當前隊列不為空,且任務池不滿載,則主動取出隊列中的任務執(zhí)行。

??接下來,將前面示例的并發(fā)數(shù)控制為2個:
??const?cc?=?new?ConcurrentControl(2);
??async?function?startConcurrentControl()?{
????console.time(CONCURRENT_CONTROL_LOG);
????await?Promise.all(taskList.map(item?=>?cc.addTask(task,?[item])))
????console.timeEnd(CONCURRENT_CONTROL_LOG);
??}
??startConcurrentControl();
??執(zhí)行流程如下:

??最終執(zhí)行任務的總耗時為 5000 毫秒。
四、高階函數(shù)優(yōu)化參數(shù)傳遞
??await?Promise.all(taskList.map(item?=>?cc.addTask(task,?[item])))
??手動傳遞每個任務的參數(shù)的方式顯得非常繁瑣,這里可以通過「高階函數(shù)實現(xiàn)參數(shù)的自動透傳」:
??addTask(fn)?{
????return?(...args)?=>?{
??????return?new?Promise((resolve)?=>?{
????????this.queue.push(new?DelayedTask(resolve,?fn,?args));
????????if?(this.size)?{
??????????this.size--;
??????????const?{?resolve:?taskResole,?fn:?taskFn,?args:?taskArgs?}?=?this.queue.shift();
??????????taskResole(this.runTask(taskFn,?taskArgs));
????????}
??????})
????}
??}
??改造之后的代碼顯得簡潔了很多:
??await?Promise.all(taskList.map(cc.addTask(task)))
五、優(yōu)化出隊操作
??數(shù)組一般都是基于一塊「連續(xù)內(nèi)存」來存儲,當調(diào)用數(shù)組的 shift 方法時,首先是刪除頭部元素(時間復雜度 O(1)),然后需要將未刪除元素左移一位(時間復雜度 O(n)),所以 shift 操作的時間復雜度為 O(n)。

??由于 JavaScript 語言的特性,V8 在實現(xiàn) JSArray 的時候給出了一種空間和時間權(quán)衡的解決方案,在不同的場景下,JSArray 會在 FixedArray 和 HashTable 兩種模式間切換。
??在 hashTable 模式下,shift 操作省去了左移的時間復雜度,其時間復雜度可以降低為 O(1),即使如此,shift 仍然是一個耗時的操作。
??在數(shù)組元素比較多且需要頻繁執(zhí)行 shift 操作的場景下,可以通過 「reverse + pop」 的方式優(yōu)化。
??const?Benchmark?=?require('benchmark');
??const?suite?=?new?Benchmark.Suite;
??suite.add('shift',?function()?{
????let?count?=?10;
????const?arr?=?generateArray(count);
????while?(count--)?{
??????arr.shift();
????}
??})
??.add('reverse?+?pop',?function()?{
????let?count?=?10;
????const?arr?=?generateArray(count);
????arr.reverse();
????while?(count--)?{
??????arr.pop();
????}
??})
??.on('cycle',?function(event)?{
????console.log(String(event.target));
??})
??.on('complete',?function()?{
????console.log('Fastest?is?'?+?this.filter('fastest').map('name'));
????console.log('\n')
??})
??.run({
????async:?true
??})
??通過 benchmark.js 跑出的基準測試數(shù)據(jù),可以很容易地看出哪種方式的效率更高:

??回顧之前 Queue 類的實現(xiàn),由于只有一個數(shù)組來存儲任務,直接使用 reverse + pop 的方式,必然會影響任務執(zhí)行的次序。
??這里就需要引入雙數(shù)組的設計,一個數(shù)組負責入隊操作,一個數(shù)組負責出隊操作。
??class?HighPerformanceQueue?{
????constructor()?{
??????this.q1?=?[];?//?用于?push?數(shù)據(jù)
??????this.q2?=?[];?//?用于?shift?數(shù)據(jù)
????}
????push(value)?{
??????return?this.q1.push(value);
????}
????shift()?{
??????let?q2?=?this.q2;
??????if?(q2.length?===?0)?{
????????const?q1?=?this.q1;
????????if?(q1.length?===?0)?{
??????????return;
????????}
????????q2?=?this.q2?=?q1.reverse();
??????}
??????return?q2.pop();
????}
????isEmpty()?{
??????if?(this.q1.length?===?0?&&?this.q2.length?===?0)?{
????????return?true;
??????}
??????return?false;
????}
??}
??最后通過基準測試來驗證優(yōu)化的效果:

