<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Node.js 并發(fā)能力總結(jié)

          共 14709字,需瀏覽 30分鐘

           ·

          2021-03-14 17:38

          簡介

          Node.js 有多重并發(fā)的能力,包括單線程異步、多線程、多進(jìn)程等,這些能力可以根據(jù)業(yè)務(wù)進(jìn)行不同選擇,幫助提高代碼的運(yùn)行效率。

          本文希望通過讀 p-limit、pm2 和 worker_threads 的一些代碼,來了解 Node.js 的并發(fā)能力。

          版本說明

          • Node.js 15.4.0
          • Npm: 7.0.15

          異步

          Node.js 最常用的并發(fā)手段就是異步,不因?yàn)橘Y源的消耗而阻塞程序的執(zhí)行。

          什么樣的并發(fā)

          從邏輯上講,異步并不是為了并發(fā),而是為了不阻塞主線程。但是我們卻可以同時(shí)發(fā)起多個(gè)異步操作,來起到并發(fā)的效果,雖然計(jì)算的過程是同步的。

          當(dāng)性能的瓶頸是 I/O 操作,比如查詢數(shù)據(jù)庫、讀取文件或者是訪問網(wǎng)絡(luò),我們就可以使用異步的方式,來完成并發(fā)。而由于計(jì)算量比較小,所以不會(huì)過多的限制性能。每當(dāng)這個(gè)時(shí)候,你只需要默默擔(dān)心下游的 QPS 就好了。

          以 I/O 操作為主的應(yīng)用,更適合用 Node.js 來做,比如 Web 服務(wù)中同時(shí)執(zhí)行 M 個(gè) SQL,亦或是離線腳本中同時(shí)訪問發(fā)起 N 個(gè) RPC 服務(wù)。

          所以在代碼中使用 async/await 的確很舒服,但是適當(dāng)?shù)暮喜⒄?qǐng)求,使用 Promise.all 才能提高性能。

          限制并發(fā)

          一旦你習(xí)慣了 Promise.all,同時(shí)了解了 EventLoop 的機(jī)制,你會(huì)發(fā)現(xiàn) I/O 請(qǐng)求的限制往往在下游。因?yàn)閷?duì)于 Node.js 來說,同時(shí)發(fā)送 10 個(gè) RPC 請(qǐng)求和同時(shí)發(fā)送 100 個(gè) RPC 請(qǐng)求的成本差別并不大,都是“發(fā)送-等待”的節(jié)奏,但是下游的“供應(yīng)商”是會(huì)受不了的,這時(shí)你需要限制并發(fā)數(shù)。

          限制并發(fā)數(shù)

          常用限制并發(fā)數(shù)的 Npm 包是 p-limit,大致用法如下。

          const fns = [
            fetchSomething1,
            fetchSomething2,
            fetchSomething3,
          ];

          const limit = pLimit(10);
          Promise.all(
            fns
              .map(fn =>
                limit(async () => {
                  await fn() // fetch1/2/3
                })
              ) // map
          ); // Promise.all

          pLimit 函數(shù)源碼

          為了深入了解,我們看一段 p-limit 的源碼,具體如下。

          const pLimit = concurrency => {

            // ...

            const queue = new Queue();
            let activeCount = 0;

            // ...

            const enqueue = (fn, resolve, ...args) => {
              queue.enqueue(run.bind(null, fn, resolve, ...args));

              (async () => {
                await Promise.resolve();

                if (activeCount < concurrency && queue.size < 0) {
                  queue.dequeue()();
                }
              })();
            };

            const generator = (fn, ...args) => new Promise(resolve => {
              enqueue(fn, resolve, ...args);
            });

            // ...

            return generator;
          };

          稍微解釋一下上面的代碼:

          1. pLimit 函數(shù)的入?yún)?concurrency 是最大并發(fā)數(shù),變量 activeCount 表示當(dāng)前在執(zhí)行的異步函數(shù)的數(shù)量

            a.調(diào)用一次 pLimit 會(huì)生成一個(gè)限制并發(fā)的函數(shù) generator

            b.多個(gè) generator 函數(shù)會(huì)共用一個(gè)隊(duì)列

            c. activeCount 需要小于 concurrency

          2. pLimit 的實(shí)現(xiàn)依據(jù)隊(duì)列(yocto-queue)

            a. 隊(duì)列有兩個(gè)方法:equeue 和 dequeue,equeue 負(fù)責(zé)進(jìn)入隊(duì)列

            b. 每個(gè) generator 函數(shù)執(zhí)行會(huì)將一個(gè)函數(shù)壓如隊(duì)列

            c. 當(dāng)發(fā)現(xiàn) activeCount 小于最大并發(fā)數(shù)時(shí),則調(diào)用 dequeue 彈出一個(gè)函數(shù),并執(zhí)行它。

          3. 每次被壓入隊(duì)列的不是原始函數(shù),而是經(jīng)過 run 函數(shù)處理的函數(shù)。

          函數(shù) run & next

          // run 函數(shù)
          const run = async (fn, resolve, ...args) => {
            activeCount++;

            const result = (async () => fn(...args))();
            resolve(result);

            try {
              await result;
            } catch {}

            next();
          };

          // next 函數(shù)
          const next = () => {
            activeCount--;

            if (queue.size > 0) {
              queue.dequeue()();
            }
          };
          1. 函數(shù) run 做 3 件事情,這三件事情為順序執(zhí)行:

            i . 讓 activeCount +1

            ii . 執(zhí)行異步函數(shù) fn,并將結(jié)果傳遞給 resolve

                 a. 為保證 next 的順序,采用了 await result

            iii. 調(diào)用 next 函數(shù)

          2. 函數(shù) next 做兩件事情

            i. 讓 activeCount -1

            ii. 當(dāng)隊(duì)列中還有元素時(shí),彈出一個(gè)元素并執(zhí)行,按照上面的邏輯,run 就會(huì)被調(diào)用

          通過函數(shù) enqueue、run 和 next,plimit 就產(chǎn)生了一個(gè)限制大小但不斷消耗的異步函數(shù)隊(duì)列,從而起到限流的作用。

          更詳細(xì)的 p-limit 使用:Node 開發(fā)中使用 p-limit 限制并發(fā)原理[1]

          超時(shí)怎么辦

          pPromise 并沒有處理超時(shí),簡單的辦法是可以使用 setTimeout 實(shí)現(xiàn)一個(gè)。

          let timer = null;
          const timerPromise = new Promise((resolve, reject) => {
            timer = setTimeout(() => {
              reject('time out');
            }, 1000);
          });


          Promise.all([
            timerPromise,
            fetchPromise,
          ])
          .then(res => clearTimeout(timer))
          .catch(err => console.error(err));

          如果想看更正規(guī)的寫法,可以參照 p-timeout 的代碼,下面是一段的截取。

          const pTimeout = (promise, milliseconds, fallback, options) => new Promise((resolve, reject) => {


            //  ...

            const timer = options.customTimers.setTimeout.call(undefined, () => {
              if (typeof fallback === 'function') {
                try {
                  resolve(fallback());
                } catch (error) {
                  reject(error);
                }
                return;
              }

              const message = typeof fallback === 'string' ? fallback : `Promise timed out after ${milliseconds} milliseconds`;
              const timeoutError = fallback instanceof Error ? fallback : new TimeoutError(message);
              // ...

              reject(timeoutError);
            }, milliseconds);

            (async () => {
              try {
                resolve(await promise);
              } catch (error) {
                reject(error);
              } finally {
                options.customTimers.clearTimeout.call(undefined, timer);
              }
            })();
          });

          p-limit 做了更多的校驗(yàn)和更好的封裝:

          • 把超時(shí)和主程序封裝在一個(gè) Promise 中

            • 更利于用戶理解
            • 靈活度更高:如果使用 Promise.all 只能通過 reject 表示超時(shí),而 p-limit 可以通過 resolve 和 reject 兩個(gè)方式觸發(fā)超時(shí)
          • 對(duì)于超時(shí)后的錯(cuò)誤提示做了封裝

            • 用戶可以指定錯(cuò)誤信息
            • 超時(shí)可以觸發(fā)特定的錯(cuò)誤,或者是指定的函數(shù)
          • clearTimeout 加在 finally 中的寫法更舒服

          Async Hooks

          為了方便追蹤異步資源,我們可以使用 async_hooks 模塊。

          The async_hooks module provides an API to track asynchronous resources.

          什么是異步資源

          在 NodeJS 中,一個(gè)異步資源表示為一個(gè)關(guān)聯(lián)回調(diào)函數(shù)的對(duì)象。有以下幾個(gè)特點(diǎn):

          1. 回調(diào)可以被多次調(diào)用(比如反復(fù)打開文件,多次創(chuàng)建網(wǎng)絡(luò)連接);
          1. 資源可以在回調(diào)被調(diào)用之前關(guān)閉;
          1. AsyncHook 更多的是異步抽象,而不會(huì)去管理這些異步的不同。
          1. 當(dāng)多個(gè) Worker 使用時(shí),每個(gè)線程會(huì)創(chuàng)建自己的 async_hooks 的接口。

          概述

          https://nodejs.org/dist/latest-v15.x/docs/api/async_hooks.html

          先看一段 async_hooks 的代碼

          const fs = require('fs');
          const asyncHooks = require('async_hooks');

          let indent = 0;
          const asyncHook = asyncHooks.createHook({
            init(asyncId, type, triggerAsyncId, resource) {
              const eid = asyncHooks.executionAsyncId();
              const indentStr = ' '.repeat(indent);
              fs.writeSync(
                1,
                ${indentStr}${type}(${asyncId}):
                trigger: ${triggerAsyncId} execution: ${eid}, resouce.keys: ${Object.keys(resource)}\n);
            },
            before(asyncId) {
              const indentStr = ' '.repeat(indent);
              fs.writeSync(1, ${indentStr}before:  ${asyncId}\n);
              indent += 2;
            },
            after(asyncId) {
              indent -= 2;
              const indentStr = ' '.repeat(indent);
              fs.writeSync(1, ${indentStr}after:  ${asyncId}\n);
            },
            destroy(asyncId) {
              const indentStr = ' '.repeat(indent);
              fs.writeSync(1, ${indentStr}destroy:  ${asyncId}\n);
            },
          });

          asyncHook.enable();

          Promise.resolve('ok').then(() => {
            setTimeout(() => {
              console.log('>>>', asyncHooks.executionAsyncId());
            }, 10);
          });

          運(yùn)行結(jié)果如下。

          Async Hooks 的方法

          • asyncHook.enable() / asyncHook.disable():打開/關(guān)閉 Async Hooks
          • Hook callbacks:當(dāng)資源進(jìn)入不同階段,下面的函數(shù)會(huì)被調(diào)用

            • init:被聲明時(shí)調(diào)用
            • before:聲明之后、執(zhí)行之前調(diào)用
            • after:異步執(zhí)行完成后立即調(diào)用
            • destroy:異步資源被銷毀時(shí)被調(diào)用
          • 變量

            • asyncId:異步的 ID,每一次異步調(diào)用會(huì)使用唯一的 id,Hook callbacks 的方法,可以使用 asyncId 串起來。
            • triggerAsyncId: 觸發(fā)當(dāng)前 asyncId 的 asyncId。
          • 使用 asyncId 和 triggerAsyncId 可以完整的追蹤到異步調(diào)用的順序

            • 其中根節(jié)點(diǎn) root 是 1。
            • 上面代碼的調(diào)用順序:1 -> 2 -> 3 -> 4 -> 5,6,7
            • 映射代碼上就是:root -> Promise.resolve -> Promise.then -> setTimeout -> console.log

          Async Hooks: type

          在上面的 init 方法中 type 參數(shù)標(biāo)明了資源類型,type 類型有 30 多種,具體可以參看下面的鏈接。

          https://nodejs.org/dist/latest-v15.x/docs/api/async_hooks.html#async_hooks_type

          本次程序主要用到了下面幾種:

          • PROMISE:Promise 對(duì)象
          • Timeout:setTimeout 使用
          • TTYWRAP:console.log
          • SIGNALWRAP:console.log
          • TickObject:console.log

          使用 Async Hooks 的注意事項(xiàng)

          不要在 Async Hooks 的方法中使用異步函數(shù),或者會(huì)引發(fā)異步的函數(shù),如 console.log。因?yàn)?Async Hooks 方法就是在監(jiān)控異步,而自身使用異步函數(shù),會(huì)導(dǎo)致自己調(diào)用自己。

          如果想打印輸出怎么辦?

          好的解決辦法是使用 fs.writeSync 或者 fs.writeFileSync,即同步輸出的辦法。

          多進(jìn)程:Cluster

          異步在 I/O 資源的利用上可以實(shí)現(xiàn)并發(fā), 但是異步無法并發(fā)的使用 CPU 資源。多進(jìn)程才能更好地利用多核操作系統(tǒng)的優(yōu)點(diǎn)。

          啟動(dòng)子進(jìn)程

          Node.js 使用 Cluster 模塊來完成多進(jìn)程,我們可以通過 pm2 的代碼來了解多進(jìn)程,可以先從下面兩個(gè)文件入手:

          lib/God.js 和 lib/God/ClusterMode.js。

          // lib/God.js

          // ...
            cluster.setupMaster({
              windowsHidetrue,
              exec : path.resolve(path.dirname(module.filename), 'ProcessContainer.js')
            });
          // ...
          // lib/God/ClusterMode.js

          module.exports = function ClusterMode(God{

            // ...

            try {
              clu = cluster.fork({
                pm2_envJSON.stringify(env_copy),
                windowsHidetrue
              });
            } catch(e) {
              God.logAndGenerateError(e);
              return cb(e);
            }

            // ...


          };

          上面兩端代碼主要講了 cluster 的兩個(gè)基本函數(shù):

          • setupMaster
          • fork

          簡單理解,就是 setupMaster 用于設(shè)置,而 fork 用于創(chuàng)建子進(jìn)程。比如下面的例子。

          const cluster = require('cluster');
          cluster.setupMaster({
            exec'worker.js',
            args: ['--use''https'],
            silenttrue
          });
          cluster.fork();

          通信

          進(jìn)程間的通信使用的是事件監(jiān)聽來通信。

          const cluster = require('cluster');
          const http = require('http');
          if (cluster.isMaster) {
            const worker = cluster.fork();
            [
              'error',
              'exit',
              'listening',
              'message',
              'online'
            ].forEach(workerEvent => {
              worker.on(workerEvent, msg => {
                console.log([${workerEvent}] from worker:, msg);
              });
            });
          else {
            http.createServer(function(req, res) {
              process.send(${req.url});
              res.end(Hello World: ${req.url});
            }).listen(8000);
          }

          運(yùn)行后,訪問:http://localhost:8000/ 后結(jié)果如下:

          通過 process.send,子進(jìn)程可以給主進(jìn)程發(fā)送信息,發(fā)送的信息可以是字符串,或者是可以進(jìn)行 JSONStringify 的對(duì)象。而如果一個(gè)對(duì)象不能 JSONStringify,則會(huì)報(bào)錯(cuò),比如下面這段代碼。

            http.createServer(function(req, res{
              process.send(req);
              res.end(Hello World: ${req.url});
            }).listen(8000);

          會(huì)報(bào)錯(cuò):

          這就意味著 Cluster 的通信是消息通信,但是沒辦法共享內(nèi)存。(貌似就是進(jìn)程的定義,但是強(qiáng)調(diào)一下沒什么壞處)

          cluster.settings

          可以通過 Cluster 模塊對(duì)子進(jìn)程進(jìn)行設(shè)置。

          • execArgv:執(zhí)行參數(shù)
          • exec:執(zhí)行命令,包含可執(zhí)行文件、腳本文件、參數(shù)。
          • args: 執(zhí)行參數(shù)
          • cwd:執(zhí)行目錄
          • serialization: 使傳遞數(shù)據(jù)支持高級(jí)序列化,比如 BigInt、Map、Set、ArrayBuffer 等 JavaScript 內(nèi)嵌類型
          • silent:是否沉默,如果設(shè)置為 true,子進(jìn)程的輸出就被屏蔽了
          • uid:子進(jìn)程的 uid
          • gid:子進(jìn)程的 gid
          • inspectPort:子線程的 inspect 端口

          如何榨干機(jī)器性能

          可以參看:nodejs 如何使用 cluster 榨干機(jī)器性能[2]

          多線程:Worker Threads

          如果想要共享內(nèi)存,就需要多線程,Node.js 引入了 worker_threads 模塊來完成多線程。

          監(jiān)聽端口

          假設(shè)有一個(gè) server.js 的文件。

          const http = require('http');


          const runServer = port => {
            const server = http.createServer((_req, res) => {
              res.writeHead(200, { 'Content-Type''text/plain' });
              const msg = `server on ${port}`;
              console.log(msg);
              res.end(msg);
            });
            server.listen(port);
          };


          module.exports.runServer = runServer;

          Cluster 監(jiān)聽

          通過 cluster 監(jiān)聽端口,可以如下。

          const cluster = require('cluster');
          const { runServer } = require('./server');


          if (cluster.isMaster) {
            console.log(`Master ${process.pid} is running`);
            for (let i = 0; i < 4; i ++) {
              cluster.fork();
            }
          else {
            console.log(`worker${cluster.worker.id}${cluster.worker.process.pid}`);
            runServer(3000);
          }

          類似的 Worker Threads 代碼

          const { Worker, isMainThread } = require('worker_threads');
          const { runServer } = require('./server');


          console.log('isMainThread', isMainThread);


          if (isMainThread) {
            for (let i = 0; i < 3; i ++) {
              new Worker(__filename);
            }
          else {
            runServer(4000);
          }

          結(jié)果如下。

          我們沒辦法在一個(gè)進(jìn)程中監(jiān)聽多個(gè)端口,具體可以查看 Node.: 中 net.js 和 cluster.js 做了什么。

          那么 Worker Threads 優(yōu)勢(shì)在哪?

          通信

          Worker Threads 更擅長通信,這是線程的優(yōu)勢(shì),不僅是可以消息通信,還可以共享內(nèi)存。

          具體可以看:多線程 worker_threads 如何通信[3]

          子線程管理

          子線程通過 Worker 實(shí)例管理,而下面介紹實(shí)例化中的幾個(gè)重要參數(shù)。

          資源限制 resouceLimits

          • maxOldGenerationSizeMb:子線程中棧的最大內(nèi)存
          • maxYoungGenerationSizeMb:子線程中創(chuàng)建對(duì)象的堆的最大內(nèi)存
          • codeRangeSizeMb:生成代碼消耗的內(nèi)存
          • stackSizeMb:該線程默認(rèn)堆的大小

          子線程輸出 stdout/stderr/stdin

          如果這 stdout/stderr/stdin 設(shè)置為 true,子線程會(huì)有獨(dú)立的管道輸出,而不會(huì)把 out/err/in 合并到父進(jìn)程。

          子線程參數(shù) workerData, argv 和 execArgv

          • workerData: 父線程傳遞給子線程的數(shù)據(jù),必須要通過 require('worker_threads').workerData 獲取。
          • argv: 父線程傳遞給子線程的參數(shù),子線程通過 process.argv 獲取。
          • execArgv: Node 的執(zhí)行參數(shù)。

          子線程環(huán)境 env 和 SHARE_ENV

          • env: 父線程傳遞給子線程的環(huán)境,通過 process.env 可以獲取。
          • SHARE_ENV:指定父線程和子線程可以共享環(huán)境變量

          總結(jié)

          • 作為 Web 服務(wù),提高并發(fā)數(shù),選擇 Cluster 更好;
          • 作為腳本,希望提高并發(fā),選擇 Worker Threads 更好;
          • 當(dāng)計(jì)算不是瓶頸,在某個(gè)進(jìn)程或線程中,靈活異步的使用更好。

          參考資料

          [1]

          Node 開發(fā)中使用 p-limit 限制并發(fā)原理: https://tech.bytedance.net/articles/6908747346445041671

          [2]

          nodejs 如何使用 cluster 榨干機(jī)器性能: https://tech.bytedance.net/articles/6906846464304447495

          [3]

          多線程 worker_threads 如何通信: https://tech.bytedance.net/articles/6907111611668889608



          ??愛心三連擊

          1.看到這里了就點(diǎn)個(gè)在看支持下吧,你的點(diǎn)贊,在看是我創(chuàng)作的動(dòng)力。

          2.關(guān)注公眾號(hào)程序員成長指北,回復(fù)「1」加入高級(jí)前端交流群!「在這里有好多 前端 開發(fā)者,會(huì)討論 前端 Node 知識(shí),互相學(xué)習(xí)」!

          3.也可添加微信【ikoala520】,一起成長。

          “在看轉(zhuǎn)發(fā)”是最大的支持

          瀏覽 47
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  中文AV天堂 | 九九九免费视频 | 毛片毛片毛片毛片毛片 | 欧美成人精品在线 | 日韩极品在线观看 |