bobolink輕量級(jí) JS 任務(wù)調(diào)度工具
___. ___. .__ .__ __ \_ |__ ____\_ |__ ____ | | |__| ____ | | __ | __ \ / _ \| __ \ / _ \| | | |/ \| |/ / | \_\ ( <_> ) \_\ ( <_> ) |_| | | \ < |___ /\____/|___ /\____/|____/__|___| /__|_ \ \/ \/ \/ \/
項(xiàng)目介紹
bobolink 是一個(gè)輕量級(jí) JS 任務(wù)調(diào)度工具,允許并行數(shù)控制,超時(shí),重試,錯(cuò)誤抓取,運(yùn)行狀態(tài)統(tǒng)計(jì)等,同時(shí)支持多種調(diào)度模式,包括立即調(diào)度、按頻率調(diào)度等。
為什么說任務(wù)執(zhí)行隊(duì)列對(duì) JS 來說比其它熱門語言更為重要?由于 JS 是事件驅(qū)動(dòng)型的動(dòng)態(tài)腳本語言,我們最常見到的就是在代碼里各種各樣的異步回調(diào),NodeJs 標(biāo)榜的輕量高效也是基于 Javascript 非阻塞 I/O 模型而談的,可以說,基于回調(diào)的編程思維是 JS 的一大特色,但有時(shí)它會(huì)出問題,例如在 for 循環(huán)中執(zhí)行異步任務(wù),由于每個(gè)任務(wù)都是瞬間返回的,也就是說,循環(huán)會(huì)很快遍歷完,但任務(wù)卻是堆積的,輕則服務(wù)繁忙,重則直接把服務(wù)壓垮,如何輕巧地控制任務(wù)并行量很多時(shí)候是一個(gè)痛點(diǎn)。
安裝
npm i bobolink
使用說明
-
創(chuàng)建一個(gè)默認(rèn)配置的Bobolink
按照需要?jiǎng)?chuàng)建一個(gè)Bobolink實(shí)例(Bobolink實(shí)例之間互不影響, 所以可以多種場景使用多個(gè)Bobolink,甚至可以通過多個(gè)Bobolink合從而應(yīng)對(duì)一些復(fù)雜場景)
const Bobolink = require('bobolink'); // 每個(gè)Bobolink實(shí)例都有一個(gè)大的隊(duì)列用于存放任務(wù),所以可以很放心地將任務(wù)扔給它,適當(dāng)?shù)臅r(shí)機(jī)下Bobolink會(huì)很可靠地調(diào)度這些任務(wù)。 let q = new Bobolink(); -
put單個(gè)任務(wù)
由于Promise的執(zhí)行代碼在創(chuàng)建的時(shí)刻就已經(jīng)被執(zhí)行(then和catch內(nèi)的代碼則通過回調(diào)執(zhí)行),所以簡單把Promise扔進(jìn)Bobolink是不可行的
// 下面的打印序是 1 2 3 new Promise(resolve => { console.log(1); resolve(3) }).then(res => { console.log(res); }) console.log(2)通過將Promise扔進(jìn)一個(gè)function可以達(dá)到延期執(zhí)行的效果
function p() { // 返回promise任務(wù) return new Promise(resolve => { console.log(1); resolve(3) }).then(res => { console.log(res); }) } console.log(2);此時(shí)p必須等待調(diào)用才會(huì)執(zhí)行內(nèi)部的Promise代碼,且p返回的是該P(yáng)romise,值便可以繼續(xù)傳遞。 每個(gè)放置到Bobolink的Promise任務(wù)都應(yīng)該以這種方式封裝
function p() { return new Promise(resolve => { console.log(1); resolve(2) }).then(res => { console.log(res); return 3; }) } // 由于隊(duì)列很空閑, 可以立即調(diào)度本任務(wù), // 所以很快就成功打印出了1, 之后的then則需要等待合適的時(shí)機(jī)回調(diào), // 如果Promise及其上面的所有then都執(zhí)行完了, 最終會(huì)傳遞到put.then q.put(p).then(task => { // 打印最終值3 console.log(task.res) });當(dāng)然,如果在put的時(shí)候,隊(duì)列執(zhí)行中的任務(wù)數(shù)已經(jīng)到達(dá)最大并行量,則需要等待有任務(wù)執(zhí)行完成時(shí)騰出空間,并且排在當(dāng)前任務(wù)之前的任務(wù)已經(jīng)都被調(diào)度完了才會(huì)得到執(zhí)行。
-
put一組任務(wù)
Bobolink允許同時(shí)put多個(gè)任務(wù),且put.then會(huì)在該組任務(wù)都被執(zhí)行完畢時(shí)才被調(diào)用
function getP(flag) { return function p() { return new Promise(resolve => { resolve(flag) }); } } q.put([getP(1), getP(2), getP(3)]).then(tasks => { // 打印每個(gè)任務(wù)的返回值, 按放入順序一一對(duì)應(yīng) for (let i = 0; i < tasks.length; i++) { console.log(tasks[i].res); } }) -
配置
目前支持的參數(shù)如下:
let q = new Bobolink({ // 最大并行數(shù),最小為1 concurrency: 5, // 任務(wù)超時(shí)時(shí)間ms,0不超時(shí) timeout: 15000, // 任務(wù)失敗重試次數(shù),0不重試 retry: 0, // 是否優(yōu)先處理失敗重試的任務(wù),為true則失敗的任務(wù)會(huì)被放置到隊(duì)列頭 retryPrior: false, // 是否優(yōu)先處理新任務(wù),為true則新任務(wù)會(huì)被放置到隊(duì)列頭 newPrior: false, // 最大可排隊(duì)的任務(wù)數(shù), -1為無限制, 超過最大限制時(shí)添加任務(wù)將返回錯(cuò)誤'bobolink_exceeded_maximum_task_number' max: -1, // 指定任務(wù)的調(diào)度模式,僅在初始化時(shí)設(shè)置有效 scheduling: { // 默認(rèn)為'immediately',任務(wù)將在隊(duì)列空閑時(shí)立即得到調(diào)度。 // 你也可以將它設(shè)置為'frequency', 并且指定countPerSecond, Bobolink將嚴(yán)格地按照設(shè)定的頻率去調(diào)度任務(wù)。 enable: 'frequency', frequency: { // 每秒需要調(diào)度的任務(wù)數(shù),僅在任務(wù)隊(duì)列有空閑時(shí)才會(huì)真正調(diào)度。 countPerSecond: 10000 } }, // 任務(wù)失敗的handler函數(shù),如果設(shè)置了重試,同個(gè)任務(wù)失敗多次會(huì)執(zhí)行catch多次 catch: (err) => { } });參數(shù)可以在運(yùn)行期更改, 對(duì)后續(xù)生效
q.setOptions({ concurrency: 5, timeout: 15000, retry: 0, retryPrior: false, newPrior: false, catch: null }); -
任務(wù)運(yùn)行狀態(tài)
使用Bobolink執(zhí)行的Promise任務(wù)所有錯(cuò)誤會(huì)被catch并包裝,所以只存在put.then而不存在put.catch(除非put.then自身出錯(cuò))。任務(wù)執(zhí)行之后獲取到的響應(yīng)有一些有用的值可以用于服務(wù)統(tǒng)計(jì)
taskRes = { // 執(zhí)行是否遇到錯(cuò)誤, 判斷任務(wù)是否執(zhí)行成功的判斷依據(jù)是err === undefined, err為任何其它值都代表了運(yùn)行失敗。 // 任務(wù)出錯(cuò)時(shí), 如果不重試, 那么catch到的錯(cuò)誤會(huì)直接放入err, 超時(shí)時(shí)err為'bobolink_timeout' // 如果重試, 且在最大重試次數(shù)之后依然錯(cuò)誤的話, 會(huì)將最后一次的錯(cuò)誤放入err // 如果重試, 且在重試期間成功的話, 被認(rèn)為是成功的, 所以err為空 err: undefined, // 執(zhí)行Promise返回的結(jié)果 res: Object, // 從任務(wù)放入隊(duì)列到該任務(wù)最后一次被調(diào)度, 所經(jīng)過的時(shí)間(ms) waittingTime: 20, // 該任務(wù)最后一次運(yùn)行的時(shí)間(ms) runTime: 1, // 該任務(wù)出錯(cuò)重試的次數(shù) retry: 2 } -
插隊(duì)
除了隊(duì)列控制參數(shù)newPrior和retryPrior之外,也允許在put的時(shí)候指定當(dāng)前任務(wù)是否優(yōu)先處理
Bobolink.ptototype.put(tasks, prior)
默認(rèn)情況下,任務(wù)是放入隊(duì)尾的,但如果指定了prior為true,則會(huì)被放置到隊(duì)頭,put任務(wù)組時(shí)會(huì)維持組任務(wù)原本的順序,并整個(gè)放入隊(duì)頭。
-
更多
-
q.options:獲取當(dāng)前隊(duì)列的配置。
-
q.queueTaskSize:獲取隊(duì)列排隊(duì)中的任務(wù)數(shù)。
-
q.runningTaskCount:獲取隊(duì)列執(zhí)行中的任務(wù)數(shù)。
-
