Node.js 并發(fā)能力總結(jié)
簡介
Node.js 有多重并發(fā)的能力,包括單線程異步、多線程、多進(jìn)程等,這些能力可以根據(jù)業(yè)務(wù)進(jìn)行不同選擇,幫助提高代碼的運(yùn)行效率。
本文希望通過讀 p-limit、pm2 和 worker_threads 的一些代碼,來了解 Node.js 的并發(fā)能力。
版本說明
Node.js 15.4.0 Npm: 7.0.15
異步
Node.js 最常用的并發(fā)手段就是異步,不因?yàn)橘Y源的消耗而阻塞程序的執(zhí)行。
什么樣的并發(fā)
從邏輯上講,異步并不是為了并發(fā),而是為了不阻塞主線程。但是我們卻可以同時(shí)發(fā)起多個(gè)異步操作,來起到并發(fā)的效果,雖然計(jì)算的過程是同步的。
當(dāng)性能的瓶頸是 I/O 操作,比如查詢數(shù)據(jù)庫、讀取文件或者是訪問網(wǎng)絡(luò),我們就可以使用異步的方式,來完成并發(fā)。而由于計(jì)算量比較小,所以不會(huì)過多的限制性能。每當(dāng)這個(gè)時(shí)候,你只需要默默擔(dān)心下游的 QPS 就好了。
以 I/O 操作為主的應(yīng)用,更適合用 Node.js 來做,比如 Web 服務(wù)中同時(shí)執(zhí)行 M 個(gè) SQL,亦或是離線腳本中同時(shí)訪問發(fā)起 N 個(gè) RPC 服務(wù)。
所以在代碼中使用 async/await 的確很舒服,但是適當(dāng)?shù)暮喜⒄?qǐng)求,使用 Promise.all 才能提高性能。
限制并發(fā)
一旦你習(xí)慣了 Promise.all,同時(shí)了解了 EventLoop 的機(jī)制,你會(huì)發(fā)現(xiàn) I/O 請(qǐng)求的限制往往在下游。因?yàn)閷?duì)于 Node.js 來說,同時(shí)發(fā)送 10 個(gè) RPC 請(qǐng)求和同時(shí)發(fā)送 100 個(gè) RPC 請(qǐng)求的成本差別并不大,都是“發(fā)送-等待”的節(jié)奏,但是下游的“供應(yīng)商”是會(huì)受不了的,這時(shí)你需要限制并發(fā)數(shù)。
限制并發(fā)數(shù)
常用限制并發(fā)數(shù)的 Npm 包是 p-limit,大致用法如下。
const fns = [
fetchSomething1,
fetchSomething2,
fetchSomething3,
];
const limit = pLimit(10);
Promise.all(
fns
.map(fn =>
limit(async () => {
await fn() // fetch1/2/3
})
) // map
); // Promise.all
pLimit 函數(shù)源碼
為了深入了解,我們看一段 p-limit 的源碼,具體如下。
const pLimit = concurrency => {
// ...
const queue = new Queue();
let activeCount = 0;
// ...
const enqueue = (fn, resolve, ...args) => {
queue.enqueue(run.bind(null, fn, resolve, ...args));
(async () => {
await Promise.resolve();
if (activeCount < concurrency && queue.size < 0) {
queue.dequeue()();
}
})();
};
const generator = (fn, ...args) => new Promise(resolve => {
enqueue(fn, resolve, ...args);
});
// ...
return generator;
};
稍微解釋一下上面的代碼:
pLimit 函數(shù)的入?yún)?concurrency 是最大并發(fā)數(shù),變量 activeCount 表示當(dāng)前在執(zhí)行的異步函數(shù)的數(shù)量
a.調(diào)用一次 pLimit 會(huì)生成一個(gè)限制并發(fā)的函數(shù) generator
b.多個(gè) generator 函數(shù)會(huì)共用一個(gè)隊(duì)列
c. activeCount 需要小于 concurrency
pLimit 的實(shí)現(xiàn)依據(jù)隊(duì)列(yocto-queue)
a. 隊(duì)列有兩個(gè)方法:equeue 和 dequeue,equeue 負(fù)責(zé)進(jìn)入隊(duì)列
b. 每個(gè) generator 函數(shù)執(zhí)行會(huì)將一個(gè)函數(shù)壓如隊(duì)列
c. 當(dāng)發(fā)現(xiàn) activeCount 小于最大并發(fā)數(shù)時(shí),則調(diào)用 dequeue 彈出一個(gè)函數(shù),并執(zhí)行它。
每次被壓入隊(duì)列的不是原始函數(shù),而是經(jīng)過 run 函數(shù)處理的函數(shù)。
函數(shù) run & next
// run 函數(shù)
const run = async (fn, resolve, ...args) => {
activeCount++;
const result = (async () => fn(...args))();
resolve(result);
try {
await result;
} catch {}
next();
};
// next 函數(shù)
const next = () => {
activeCount--;
if (queue.size > 0) {
queue.dequeue()();
}
};
函數(shù) run 做 3 件事情,這三件事情為順序執(zhí)行:
i . 讓 activeCount +1
ii . 執(zhí)行異步函數(shù) fn,并將結(jié)果傳遞給 resolve
a. 為保證 next 的順序,采用了 await result
iii. 調(diào)用 next 函數(shù)
函數(shù) next 做兩件事情
i. 讓 activeCount -1
ii. 當(dāng)隊(duì)列中還有元素時(shí),彈出一個(gè)元素并執(zhí)行,按照上面的邏輯,run 就會(huì)被調(diào)用
通過函數(shù) enqueue、run 和 next,plimit 就產(chǎn)生了一個(gè)限制大小但不斷消耗的異步函數(shù)隊(duì)列,從而起到限流的作用。
更詳細(xì)的 p-limit 使用:Node 開發(fā)中使用 p-limit 限制并發(fā)原理[1]
超時(shí)怎么辦
pPromise 并沒有處理超時(shí),簡單的辦法是可以使用 setTimeout 實(shí)現(xiàn)一個(gè)。
let timer = null;
const timerPromise = new Promise((resolve, reject) => {
timer = setTimeout(() => {
reject('time out');
}, 1000);
});
Promise.all([
timerPromise,
fetchPromise,
])
.then(res => clearTimeout(timer))
.catch(err => console.error(err));
如果想看更正規(guī)的寫法,可以參照 p-timeout 的代碼,下面是一段的截取。
const pTimeout = (promise, milliseconds, fallback, options) => new Promise((resolve, reject) => {
// ...
const timer = options.customTimers.setTimeout.call(undefined, () => {
if (typeof fallback === 'function') {
try {
resolve(fallback());
} catch (error) {
reject(error);
}
return;
}
const message = typeof fallback === 'string' ? fallback : `Promise timed out after ${milliseconds} milliseconds`;
const timeoutError = fallback instanceof Error ? fallback : new TimeoutError(message);
// ...
reject(timeoutError);
}, milliseconds);
(async () => {
try {
resolve(await promise);
} catch (error) {
reject(error);
} finally {
options.customTimers.clearTimeout.call(undefined, timer);
}
})();
});
p-limit 做了更多的校驗(yàn)和更好的封裝:
把超時(shí)和主程序封裝在一個(gè) Promise 中
更利于用戶理解 靈活度更高:如果使用 Promise.all 只能通過 reject 表示超時(shí),而 p-limit 可以通過 resolve 和 reject 兩個(gè)方式觸發(fā)超時(shí)
對(duì)于超時(shí)后的錯(cuò)誤提示做了封裝
用戶可以指定錯(cuò)誤信息 超時(shí)可以觸發(fā)特定的錯(cuò)誤,或者是指定的函數(shù)
clearTimeout 加在 finally 中的寫法更舒服
Async Hooks
為了方便追蹤異步資源,我們可以使用 async_hooks 模塊。
The async_hooks module provides an API to track asynchronous resources.
什么是異步資源
在 NodeJS 中,一個(gè)異步資源表示為一個(gè)關(guān)聯(lián)回調(diào)函數(shù)的對(duì)象。有以下幾個(gè)特點(diǎn):
回調(diào)可以被多次調(diào)用(比如反復(fù)打開文件,多次創(chuàng)建網(wǎng)絡(luò)連接);
資源可以在回調(diào)被調(diào)用之前關(guān)閉;
AsyncHook 更多的是異步抽象,而不會(huì)去管理這些異步的不同。
當(dāng)多個(gè) Worker 使用時(shí),每個(gè)線程會(huì)創(chuàng)建自己的 async_hooks 的接口。
概述
https://nodejs.org/dist/latest-v15.x/docs/api/async_hooks.html
先看一段 async_hooks 的代碼
const fs = require('fs');
const asyncHooks = require('async_hooks');
let indent = 0;
const asyncHook = asyncHooks.createHook({
init(asyncId, type, triggerAsyncId, resource) {
const eid = asyncHooks.executionAsyncId();
const indentStr = ' '.repeat(indent);
fs.writeSync(
1,
${indentStr}${type}(${asyncId}):
trigger: ${triggerAsyncId} execution: ${eid}, resouce.keys: ${Object.keys(resource)}\n);
},
before(asyncId) {
const indentStr = ' '.repeat(indent);
fs.writeSync(1, ${indentStr}before: ${asyncId}\n);
indent += 2;
},
after(asyncId) {
indent -= 2;
const indentStr = ' '.repeat(indent);
fs.writeSync(1, ${indentStr}after: ${asyncId}\n);
},
destroy(asyncId) {
const indentStr = ' '.repeat(indent);
fs.writeSync(1, ${indentStr}destroy: ${asyncId}\n);
},
});
asyncHook.enable();
Promise.resolve('ok').then(() => {
setTimeout(() => {
console.log('>>>', asyncHooks.executionAsyncId());
}, 10);
});
運(yùn)行結(jié)果如下。

Async Hooks 的方法
asyncHook.enable() / asyncHook.disable():打開/關(guān)閉 Async Hooks
Hook callbacks:當(dāng)資源進(jìn)入不同階段,下面的函數(shù)會(huì)被調(diào)用
init:被聲明時(shí)調(diào)用 before:聲明之后、執(zhí)行之前調(diào)用 after:異步執(zhí)行完成后立即調(diào)用 destroy:異步資源被銷毀時(shí)被調(diào)用
變量
asyncId:異步的 ID,每一次異步調(diào)用會(huì)使用唯一的 id,Hook callbacks 的方法,可以使用 asyncId 串起來。 triggerAsyncId: 觸發(fā)當(dāng)前 asyncId 的 asyncId。
使用 asyncId 和 triggerAsyncId 可以完整的追蹤到異步調(diào)用的順序
其中根節(jié)點(diǎn) root 是 1。 上面代碼的調(diào)用順序:1 -> 2 -> 3 -> 4 -> 5,6,7 映射代碼上就是:root -> Promise.resolve -> Promise.then -> setTimeout -> console.log
Async Hooks: type
在上面的 init 方法中 type 參數(shù)標(biāo)明了資源類型,type 類型有 30 多種,具體可以參看下面的鏈接。
https://nodejs.org/dist/latest-v15.x/docs/api/async_hooks.html#async_hooks_type
本次程序主要用到了下面幾種:
PROMISE:Promise 對(duì)象
Timeout:setTimeout 使用
TTYWRAP:console.log
SIGNALWRAP:console.log
TickObject:console.log
使用 Async Hooks 的注意事項(xiàng)
不要在 Async Hooks 的方法中使用異步函數(shù),或者會(huì)引發(fā)異步的函數(shù),如 console.log。因?yàn)?Async Hooks 方法就是在監(jiān)控異步,而自身使用異步函數(shù),會(huì)導(dǎo)致自己調(diào)用自己。
如果想打印輸出怎么辦?
好的解決辦法是使用 fs.writeSync 或者 fs.writeFileSync,即同步輸出的辦法。
多進(jìn)程:Cluster
異步在 I/O 資源的利用上可以實(shí)現(xiàn)并發(fā), 但是異步無法并發(fā)的使用 CPU 資源。多進(jìn)程才能更好地利用多核操作系統(tǒng)的優(yōu)點(diǎn)。
啟動(dòng)子進(jìn)程
Node.js 使用 Cluster 模塊來完成多進(jìn)程,我們可以通過 pm2 的代碼來了解多進(jìn)程,可以先從下面兩個(gè)文件入手:
lib/God.js 和 lib/God/ClusterMode.js。
// lib/God.js
// ...
cluster.setupMaster({
windowsHide: true,
exec : path.resolve(path.dirname(module.filename), 'ProcessContainer.js')
});
// ...
// lib/God/ClusterMode.js
module.exports = function ClusterMode(God) {
// ...
try {
clu = cluster.fork({
pm2_env: JSON.stringify(env_copy),
windowsHide: true
});
} catch(e) {
God.logAndGenerateError(e);
return cb(e);
}
// ...
};
上面兩端代碼主要講了 cluster 的兩個(gè)基本函數(shù):
setupMaster
fork
簡單理解,就是 setupMaster 用于設(shè)置,而 fork 用于創(chuàng)建子進(jìn)程。比如下面的例子。
const cluster = require('cluster');
cluster.setupMaster({
exec: 'worker.js',
args: ['--use', 'https'],
silent: true
});
cluster.fork();
通信
進(jìn)程間的通信使用的是事件監(jiān)聽來通信。
const cluster = require('cluster');
const http = require('http');
if (cluster.isMaster) {
const worker = cluster.fork();
[
'error',
'exit',
'listening',
'message',
'online'
].forEach(workerEvent => {
worker.on(workerEvent, msg => {
console.log([${workerEvent}] from worker:, msg);
});
});
} else {
http.createServer(function(req, res) {
process.send(${req.url});
res.end(Hello World: ${req.url});
}).listen(8000);
}
運(yùn)行后,訪問:http://localhost:8000/ 后結(jié)果如下:

通過 process.send,子進(jìn)程可以給主進(jìn)程發(fā)送信息,發(fā)送的信息可以是字符串,或者是可以進(jìn)行 JSONStringify 的對(duì)象。而如果一個(gè)對(duì)象不能 JSONStringify,則會(huì)報(bào)錯(cuò),比如下面這段代碼。
http.createServer(function(req, res) {
process.send(req);
res.end(Hello World: ${req.url});
}).listen(8000);
會(huì)報(bào)錯(cuò):

這就意味著 Cluster 的通信是消息通信,但是沒辦法共享內(nèi)存。(貌似就是進(jìn)程的定義,但是強(qiáng)調(diào)一下沒什么壞處)
cluster.settings
可以通過 Cluster 模塊對(duì)子進(jìn)程進(jìn)行設(shè)置。
execArgv:執(zhí)行參數(shù)
exec:執(zhí)行命令,包含可執(zhí)行文件、腳本文件、參數(shù)。
args: 執(zhí)行參數(shù)
cwd:執(zhí)行目錄
serialization: 使傳遞數(shù)據(jù)支持高級(jí)序列化,比如 BigInt、Map、Set、ArrayBuffer 等 JavaScript 內(nèi)嵌類型
silent:是否沉默,如果設(shè)置為 true,子進(jìn)程的輸出就被屏蔽了
uid:子進(jìn)程的 uid
gid:子進(jìn)程的 gid
inspectPort:子線程的 inspect 端口
如何榨干機(jī)器性能
可以參看:nodejs 如何使用 cluster 榨干機(jī)器性能[2]
多線程:Worker Threads
如果想要共享內(nèi)存,就需要多線程,Node.js 引入了 worker_threads 模塊來完成多線程。
監(jiān)聽端口
假設(shè)有一個(gè) server.js 的文件。
const http = require('http');
const runServer = port => {
const server = http.createServer((_req, res) => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
const msg = `server on ${port}`;
console.log(msg);
res.end(msg);
});
server.listen(port);
};
module.exports.runServer = runServer;
Cluster 監(jiān)聽
通過 cluster 監(jiān)聽端口,可以如下。
const cluster = require('cluster');
const { runServer } = require('./server');
if (cluster.isMaster) {
console.log(`Master ${process.pid} is running`);
for (let i = 0; i < 4; i ++) {
cluster.fork();
}
} else {
console.log(`worker${cluster.worker.id}: ${cluster.worker.process.pid}`);
runServer(3000);
}
類似的 Worker Threads 代碼
const { Worker, isMainThread } = require('worker_threads');
const { runServer } = require('./server');
console.log('isMainThread', isMainThread);
if (isMainThread) {
for (let i = 0; i < 3; i ++) {
new Worker(__filename);
}
} else {
runServer(4000);
}
結(jié)果如下。

我們沒辦法在一個(gè)進(jìn)程中監(jiān)聽多個(gè)端口,具體可以查看 Node.: 中 net.js 和 cluster.js 做了什么。
那么 Worker Threads 優(yōu)勢(shì)在哪?
通信
Worker Threads 更擅長通信,這是線程的優(yōu)勢(shì),不僅是可以消息通信,還可以共享內(nèi)存。
具體可以看:多線程 worker_threads 如何通信[3]
子線程管理
子線程通過 Worker 實(shí)例管理,而下面介紹實(shí)例化中的幾個(gè)重要參數(shù)。
資源限制 resouceLimits
maxOldGenerationSizeMb:子線程中棧的最大內(nèi)存
maxYoungGenerationSizeMb:子線程中創(chuàng)建對(duì)象的堆的最大內(nèi)存
codeRangeSizeMb:生成代碼消耗的內(nèi)存
stackSizeMb:該線程默認(rèn)堆的大小
子線程輸出 stdout/stderr/stdin
如果這 stdout/stderr/stdin 設(shè)置為 true,子線程會(huì)有獨(dú)立的管道輸出,而不會(huì)把 out/err/in 合并到父進(jìn)程。
子線程參數(shù) workerData, argv 和 execArgv
workerData: 父線程傳遞給子線程的數(shù)據(jù),必須要通過 require('worker_threads').workerData 獲取。
argv: 父線程傳遞給子線程的參數(shù),子線程通過 process.argv 獲取。
execArgv: Node 的執(zhí)行參數(shù)。
子線程環(huán)境 env 和 SHARE_ENV
env: 父線程傳遞給子線程的環(huán)境,通過 process.env 可以獲取。
SHARE_ENV:指定父線程和子線程可以共享環(huán)境變量
總結(jié)
作為 Web 服務(wù),提高并發(fā)數(shù),選擇 Cluster 更好; 作為腳本,希望提高并發(fā),選擇 Worker Threads 更好;
當(dāng)計(jì)算不是瓶頸,在某個(gè)進(jìn)程或線程中,靈活異步的使用更好。
參考資料
Node 開發(fā)中使用 p-limit 限制并發(fā)原理: https://tech.bytedance.net/articles/6908747346445041671
[2]nodejs 如何使用 cluster 榨干機(jī)器性能: https://tech.bytedance.net/articles/6906846464304447495
[3]多線程 worker_threads 如何通信: https://tech.bytedance.net/articles/6907111611668889608
1.看到這里了就點(diǎn)個(gè)在看支持下吧,你的「點(diǎn)贊,在看」是我創(chuàng)作的動(dòng)力。
2.關(guān)注公眾號(hào)
程序員成長指北,回復(fù)「1」加入高級(jí)前端交流群!「在這里有好多 前端 開發(fā)者,會(huì)討論 前端 Node 知識(shí),互相學(xué)習(xí)」!3.也可添加微信【ikoala520】,一起成長。
“在看轉(zhuǎn)發(fā)”是最大的支持
