<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Nodejs Stream pipe 的使用與實(shí)現(xiàn)原理分析

          共 9866字,需瀏覽 20分鐘

           ·

          2021-01-14 23:05


          今日文章由 “Node.js技術(shù)棧@五月君” 授權(quán)分享,正文從下面開(kāi)始~


          通過(guò)流我們可以將一大塊數(shù)據(jù)拆分為一小部分一點(diǎn)一點(diǎn)的流動(dòng)起來(lái),而無(wú)需一次性全部讀入,在 Linux 下我們可以通過(guò) | 符號(hào)實(shí)現(xiàn),類似的在 Nodejs 的 Stream 模塊中同樣也為我們提供了 pipe() 方法來(lái)實(shí)現(xiàn)。

          1. Nodejs Stream pipe 基本示例

          選擇 Koa 來(lái)實(shí)現(xiàn)這個(gè)簡(jiǎn)單的 Demo,因?yàn)橹坝腥嗽?“Nodejs技術(shù)棧” 交流群?jiǎn)栠^(guò)一個(gè)問(wèn)題,怎么在 Koa 中返回一個(gè) Stream,順便在下文借此機(jī)會(huì)提下。

          1.1 未使用 Stream pipe 情況

          在 Nodejs 中 I/O 操作都是異步的,先用 util 模塊的 promisify 方法將 fs.readFile 的 callback 形式轉(zhuǎn)為 Promise 形式,這塊代碼看似沒(méi)問(wèn)題,但是它的體驗(yàn)不是很好,因?yàn)樗菍?shù)據(jù)一次性讀入內(nèi)存再進(jìn)行的返回,當(dāng)數(shù)據(jù)文件很大的時(shí)候也是對(duì)內(nèi)存的一種消耗,因此不推薦它。

          const?Koa?=?require('koa');
          const?fs?=?require('fs');
          const?app?=?new?Koa();
          const?{?promisify?}?=?require('util');
          const?{?resolve?}?=?require('path');
          const?readFile?=?promisify(fs.readFile);

          app.use(async?ctx?=>?{
          ??try?{
          ????ctx.body?=?await?readFile(resolve(__dirname,?'test.json'));
          ??}?catch(err)?{?ctx.body?=?err?};
          });

          app.listen(3000);

          1.2 使用 Stream pipe 情況

          下面,再看看怎么通過(guò) Stream 的方式在 Koa 框架中響應(yīng)數(shù)據(jù)

          ...
          app.use(async?ctx?=>?{
          ??try?{
          ????const?readable?=?fs.createReadStream(resolve(__dirname,?'test.json'));
          ????ctx.body?=?readable;
          ??}?catch(err)?{?ctx.body?=?err?};
          });

          以上在 Koa 中直接創(chuàng)建一個(gè)可讀流賦值給 ctx.body 就可以了,你可能疑惑了為什么沒(méi)有 pipe 方法,因?yàn)榭蚣芙o你封裝好了,不要被表象所迷惑了,看下相關(guān)源碼:

          //?https://github.com/koajs/koa/blob/master/lib/application.js#L256
          function?respond(ctx)?{
          ??...
          ??let?body?=?ctx.body;
          ??if?(body?instanceof?Stream)?return?body.pipe(res);
          ??...
          }

          沒(méi)有神奇之處,框架在返回的時(shí)候做了層判斷,因?yàn)?res 是一個(gè)可寫(xiě)流對(duì)象,如果 body 也是一個(gè) Stream 對(duì)象(此時(shí)的 Body 是一個(gè)可讀流),則使用 body.pipe(res) 以流的方式進(jìn)行響應(yīng)。

          1.3 使用 Stream VS 不使用 Stream


          看到一個(gè)圖片,不得不說(shuō)畫(huà)的實(shí)在太萌了,來(lái)源 https://www.cnblogs.com/vajoy/p/6349817.html


          2 pipe 的調(diào)用過(guò)程與實(shí)現(xiàn)原理分析

          以上最后以流的方式響應(yīng)數(shù)據(jù)最核心的實(shí)現(xiàn)就是使用 pipe 方法來(lái)實(shí)現(xiàn)的輸入、輸出,本節(jié)的重點(diǎn)也是研究 pipe 的實(shí)現(xiàn),最好的打開(kāi)方式通過(guò)閱讀源碼實(shí)現(xiàn)吧。

          2.1 順藤摸瓜

          在應(yīng)用層我們調(diào)用了 fs.createReadStream() 這個(gè)方法,順藤摸瓜找到這個(gè)方法創(chuàng)建的可讀流對(duì)象的 pipe 方法實(shí)現(xiàn),以下僅列舉核心代碼實(shí)現(xiàn),基于 Nodejs v12.x 源碼。

          2.1.1 /lib/fs.js

          導(dǎo)出一個(gè) createReadStream 方法,在這個(gè)方法里面創(chuàng)建了一個(gè) ReadStream 可讀流對(duì)象,且 ReadStream 來(lái)自 internal/fs/streams 文件,繼續(xù)向下找。

          //?https://github.com/nodejs/node/blob/v12.x/lib/fs.js
          //?懶加載,主要在用到的時(shí)候用來(lái)實(shí)例化?ReadStream、WriteStream?...?等對(duì)象
          function?lazyLoadStreams()?{
          ??if?(!ReadStream)?{
          ????({?ReadStream,?WriteStream?}?=?require('internal/fs/streams'));
          ????[?FileReadStream,?FileWriteStream?]?=?[?ReadStream,?WriteStream?];
          ??}
          }

          function?createReadStream(path,?options)?{
          ??lazyLoadStreams();
          ??return?new?ReadStream(path,?options);?//?創(chuàng)建一個(gè)可讀流
          }

          module.exports?=?fs?=?{
          ??createReadStream,?//?導(dǎo)出?createReadStream?方法
          ??...
          }

          2.1.2 /lib/internal/fs/streams.js

          這個(gè)方法里定義了構(gòu)造函數(shù) ReadStream,且在原型上定義了 open、_read、_destroy 等方法,并沒(méi)有我們要找的 pipe 方法。

          但是呢通過(guò) ObjectSetPrototypeOf 方法實(shí)現(xiàn)了繼承,ReadStream 繼承了 Readable 在原型中定義的函數(shù),接下來(lái)繼續(xù)查找 Readable 的實(shí)現(xiàn)

          //?https://github.com/nodejs/node/blob/v12.x/lib/internal/fs/streams.js
          const?{?Readable,?Writable?}?=?require('stream');

          function?ReadStream(path,?options)?{
          ??if?(!(this?instanceof?ReadStream))
          ????return?new?ReadStream(path,?options);

          ??...
          ??Readable.call(this,?options);
          ??...
          }
          ObjectSetPrototypeOf(ReadStream.prototype,?Readable.prototype);
          ObjectSetPrototypeOf(ReadStream,?Readable);

          ReadStream.prototype.open?=?function()?{?...?};

          ReadStream.prototype._read?=?function(n)?{?...?};;

          ReadStream.prototype._destroy?=?function(err,?cb)?{?...?};
          ...

          module.exports?=?{
          ??ReadStream,
          ??WriteStream
          };

          2.1.3 /lib/stream.js

          在 stream.js 的實(shí)現(xiàn)中,有條注釋:在 Readable/Writable/Duplex/... 之前導(dǎo)入 Stream,原因是為了避免 cross-reference(require),為什么會(huì)這樣?

          第一步 stream.js 這里將 require('internal/streams/legacy') 導(dǎo)出復(fù)制給了 Stream。

          在之后的 _stream_readable、Writable、Duplex ... 模塊也會(huì)反過(guò)來(lái)引用 stream.js 文件,具體實(shí)現(xiàn)下面會(huì)看到。

          Stream 導(dǎo)入了 internal/streams/legacy

          上面 /lib/internal/fs/streams.js 文件從 stream 模塊獲取了一個(gè) Readable 對(duì)象,就是下面的 Stream.Readable 的定義。

          //?https://github.com/nodejs/node/blob/v12.x/lib/stream.js
          //?Note:?export?Stream?before?Readable/Writable/Duplex/...
          //?to?avoid?a?cross-reference(require)?issues
          const?Stream?=?module.exports?=?require('internal/streams/legacy');

          Stream.Readable?=?require('_stream_readable');
          Stream.Writable?=?require('_stream_writable');
          Stream.Duplex?=?require('_stream_duplex');
          Stream.Transform?=?require('_stream_transform');
          Stream.PassThrough?=?require('_stream_passthrough');
          ...

          2.1.4 /lib/internal/streams/legacy.js

          上面的 Stream 等于 internal/streams/legacy,首先繼承了 Events 模塊,之后呢在原型上定義了 pipe 方法,剛開(kāi)始看到這里的時(shí)候以為實(shí)現(xiàn)是在這里了,但后來(lái)看 _stream_readable 的實(shí)現(xiàn)之后,發(fā)現(xiàn) _stream_readable 繼承了 Stream 之后自己又重新實(shí)現(xiàn)了 pipe 方法,那么疑問(wèn)來(lái)了這個(gè)模塊的 pipe 方法是干嘛的?什么時(shí)候會(huì)被用?翻譯文件名 “l(fā)egacy=遺留”?有點(diǎn)沒(méi)太理解,難道是遺留了?有清楚的大佬可以指點(diǎn)下,也歡迎在公眾號(hào) “Nodejs技術(shù)棧” 后臺(tái)加我微信一塊討論下!

          //?https://github.com/nodejs/node/blob/v12.x/lib/internal/streams/legacy.js
          const?{
          ??ObjectSetPrototypeOf,
          }?=?primordials;
          const?EE?=?require('events');
          function?Stream(opts)?{
          ??EE.call(this,?opts);
          }
          ObjectSetPrototypeOf(Stream.prototype,?EE.prototype);
          ObjectSetPrototypeOf(Stream,?EE);

          Stream.prototype.pipe?=?function(dest,?options)?{
          ??...
          };

          module.exports?=?Stream;

          2.1.5 /lib/_stream_readable.js

          在 _stream_readable.js 的實(shí)現(xiàn)里面定義了 Readable 構(gòu)造函數(shù),且繼承于 Stream,這個(gè) Stream 正是我們上面提到的 /lib/stream.js 文件,而在 /lib/stream.js 文件里加載了 internal/streams/legacy 文件且重寫(xiě)了里面定義的 pipe 方法。

          經(jīng)過(guò)上面一系列的分析,終于找到可讀流的 pipe 在哪里,同時(shí)也更進(jìn)一步的認(rèn)識(shí)到了在創(chuàng)建一個(gè)可讀流時(shí)的執(zhí)行調(diào)用過(guò)程,下面將重點(diǎn)來(lái)看這個(gè)方法的實(shí)現(xiàn)。

          module.exports?=?Readable;
          Readable.ReadableState?=?ReadableState;

          const?EE?=?require('events');
          const?Stream?=?require('stream');

          ObjectSetPrototypeOf(Readable.prototype,?Stream.prototype);
          ObjectSetPrototypeOf(Readable,?Stream);

          function?Readable(options)?{
          ??if?(!(this?instanceof?Readable))
          ????return?new?Readable(options);

          ??...
          ??Stream.call(this,?options);?//?繼承自?Stream?構(gòu)造函數(shù)的定義
          }
          ...

          2.2 _stream_readable 實(shí)現(xiàn)分析

          2.2.1 聲明構(gòu)造函數(shù) Readable

          聲明構(gòu)造函數(shù) Readable 繼承 Stream 的構(gòu)造函數(shù)和原型。

          Stream 是 /lib/stream.js 文件,上面分析了,這個(gè)文件繼承了 events 事件,此時(shí)也就擁有了 events 在原型中定義的屬性,例如 on、emit 等方法。

          const?Stream?=?require('stream');
          ObjectSetPrototypeOf(Readable.prototype,?Stream.prototype);
          ObjectSetPrototypeOf(Readable,?Stream);

          function?Readable(options)?{
          ??if?(!(this?instanceof?Readable))
          ????return?new?Readable(options);

          ??...

          ??Stream.call(this,?options);
          }

          2.2.2 聲明 pipe 方法,訂閱 data 事件

          在 Stream 的原型上聲明 pipe 方法,訂閱 data 事件,src 為可讀流對(duì)象,dest 為可寫(xiě)流對(duì)象。

          我們?cè)谑褂?pipe 方法的時(shí)候也是監(jiān)聽(tīng)的 data 事件,一邊讀取數(shù)據(jù)一邊寫(xiě)入數(shù)據(jù)。

          看下 ondata() 方法里的幾個(gè)核心實(shí)現(xiàn):

          • dest.write(chunk):接收 chunk 寫(xiě)入數(shù)據(jù),如果內(nèi)部的緩沖小于創(chuàng)建流時(shí)配置的 highWaterMark,則返回 true,否則返回 false 時(shí)應(yīng)該停止向流寫(xiě)入數(shù)據(jù),直到 'drain' 事件被觸發(fā)
          • src.pause():可讀流會(huì)停止 data 事件,意味著此時(shí)暫停數(shù)據(jù)寫(xiě)入了。

          之所以調(diào)用 src.pause() 是為了防止讀入數(shù)據(jù)過(guò)快來(lái)不及寫(xiě)入,什么時(shí)候知道來(lái)不及寫(xiě)入呢,要看 dest.write(chunk) 什么時(shí)候返回 false,是根據(jù)創(chuàng)建流時(shí)傳的 highWaterMark 屬性,默認(rèn)為 16384 (16kb),對(duì)象模式的流默認(rèn)為 16。

          Readable.prototype.pipe?=?function(dest,?options)?{
          ??const?src?=?this;
          ??src.on('data',?ondata);
          ??function?ondata(chunk)?{
          ????const?ret?=?dest.write(chunk);
          ????if?(ret?===?false)?{
          ??????...
          ??????src.pause();
          ????}
          ??}
          ??...
          };

          2.2.3 訂閱 drain 事件,繼續(xù)流動(dòng)數(shù)據(jù)

          上面提到在 data 事件里,如果調(diào)用 dest.write(chunk) 返回 false,就會(huì)調(diào)用 src.pause() 停止數(shù)據(jù)流動(dòng),什么時(shí)候再次開(kāi)啟呢?

          如果說(shuō)可以繼續(xù)寫(xiě)入事件到流時(shí)會(huì)觸發(fā) drain 事件,也是在 dest.write(chunk) 等于 false 時(shí),如果 ondrain 不存在則注冊(cè) drain 事件。

          Readable.prototype.pipe?=?function(dest,?options)?{
          ??const?src?=?this;
          ??src.on('data',?ondata);
          ??function?ondata(chunk)?{
          ????const?ret?=?dest.write(chunk);
          ????if?(ret?===?false)?{
          ??????...
          ??????if?(!ondrain)?{
          ????????//?When?the?dest?drains,?it?reduces?the?awaitDrain?counter
          ????????//?on?the?source.??This?would?be?more?elegant?with?a?.once()
          ????????//?handler?in?flow(),?but?adding?and?removing?repeatedly?is
          ????????//?too?slow.
          ????????ondrain?=?pipeOnDrain(src);
          ????????dest.on('drain',?ondrain);
          ??????}
          ??????src.pause();
          ????}
          ??}
          ??...
          };

          //?當(dāng)可寫(xiě)入流?dest?耗盡時(shí),它將會(huì)在可讀流對(duì)象?source?上減少?awaitDrain?計(jì)數(shù)器
          //?為了確保所有需要緩沖的寫(xiě)入都完成,即?state.awaitDrain?===?0?和?src?可讀流上的?data?事件存在,切換流到流動(dòng)模式
          function?pipeOnDrain(src)?{
          ??return?function?pipeOnDrainFunctionResult()?{
          ????const?state?=?src._readableState;
          ????debug('pipeOnDrain',?state.awaitDrain);
          ????if?(state.awaitDrain)
          ??????state.awaitDrain--;
          ????if?(state.awaitDrain?===?0?&&?EE.listenerCount(src,?'data'))?{
          ??????state.flowing?=?true;
          ??????flow(src);
          ????}
          ??};
          }

          // stream.read()?從內(nèi)部緩沖拉取并返回?cái)?shù)據(jù)。如果沒(méi)有可讀的數(shù)據(jù),則返回 null。在可讀流上 src 還有一個(gè) readable 屬性,如果可以安全地調(diào)用 readable.read(),則為 true
          function?flow(stream)?{
          ??const?state?=?stream._readableState;
          ??debug('flow',?state.flowing);
          ??while?(state.flowing?&&?stream.read()?!==?null);
          }

          2.2.4 觸發(fā) data 事件

          調(diào)用 readable 的 resume() 方法,觸發(fā)可讀流的 'data' 事件,進(jìn)入流動(dòng)模式。

          Readable.prototype.pipe?=?function(dest,?options)?{
          ??const?src?=?this;
          ??//?Start?the?flow?if?it?hasn't?been?started?already.
          ??if?(!state.flowing)?{
          ????debug('pipe?resume');
          ????src.resume();
          ??}
          ??...

          然后實(shí)例上的 resume(Readable 原型上定義的)會(huì)在調(diào)用 resume() 方法,在該方法內(nèi)部又調(diào)用了 resume_(),最終執(zhí)行了 stream.read(0) 讀取了一次空數(shù)據(jù)(size 設(shè)置的為 0),將會(huì)觸發(fā)實(shí)例上的 _read() 方法,之后會(huì)在觸發(fā) data 事件。

          function?resume(stream,?state)?{
          ??...
          ??process.nextTick(resume_,?stream,?state);
          }

          function?resume_(stream,?state)?{
          ??debug('resume',?state.reading);
          ??if?(!state.reading)?{
          ????stream.read(0);
          ??}

          ??...
          }

          2.2.5 訂閱 end 事件

          end 事件:當(dāng)可讀流中沒(méi)有數(shù)據(jù)可供消費(fèi)時(shí)觸發(fā),調(diào)用 onend 函數(shù),執(zhí)行 dest.end() 方法,表明已沒(méi)有數(shù)據(jù)要被寫(xiě)入可寫(xiě)流,進(jìn)行關(guān)閉(關(guān)閉可寫(xiě)流的 fd),之后再調(diào)用 stream.write() 會(huì)導(dǎo)致錯(cuò)誤。

          Readable.prototype.pipe?=?function(dest,?options)?{
          ??...
          ??const?doEnd?=?(!pipeOpts?||?pipeOpts.end?!==?false)?&&
          ??????????????dest?!==?process.stdout?&&
          ??????????????dest?!==?process.stderr;

          ??const?endFn?=?doEnd???onend?:?unpipe;
          ??if?(state.endEmitted)
          ????process.nextTick(endFn);
          ??else
          ????src.once('end',?endFn);

          ??dest.on('unpipe',?onunpipe);
          ??...

          ??function?onend()?{
          ????debug('onend');
          ????dest.end();
          ??}
          }

          2.2.6 觸發(fā) pipe 事件

          在 pipe 方法里面最后還會(huì)觸發(fā)一個(gè) pipe 事件,傳入可讀流對(duì)象

          Readable.prototype.pipe?=?function(dest,?options)?{
          ??...
          ??const?source?=?this;
          ??dest.emit('pipe',?src);
          ??...
          };

          在應(yīng)用層使用的時(shí)候可以在可寫(xiě)流上訂閱 pipe 事件,做一些判斷,具體可參考官網(wǎng)給的這個(gè)示例 stream_event_pipe

          2.2.7 支持鏈?zhǔn)秸{(diào)用

          最后返回 dest,支持類似 unix 的用法:A.pipe(B).pipe(C)

          Stream.prototype.pipe?=?function(dest,?options)?{
          ??return?dest;
          };

          3. 總結(jié)

          本文總體分為兩部分:

          • 第一部分相對(duì)較基礎(chǔ),講解了 Nodejs Stream 的 pipe 方法在 Koa2 中是怎么去應(yīng)用的。
          • 第二部分仍以 Nodejs Stream pipe 方法為題,查找它的實(shí)現(xiàn),以及對(duì)源碼的一個(gè)簡(jiǎn)單分析,其實(shí) pipe 方法核心還是要去監(jiān)聽(tīng) data 事件,向可寫(xiě)流寫(xiě)入數(shù)據(jù),如果內(nèi)部緩沖大于創(chuàng)建流時(shí)配置的 highWaterMark,則要停止數(shù)據(jù)流動(dòng),直到 drain 事件觸發(fā)或者結(jié)束,當(dāng)然還要監(jiān)聽(tīng) end、error 等事件做一些處理。

          4. Reference

          • nodejs.cn/api/stream.html
          • cnodejs.org/topic/56ba030271204e03637a3870
          • github.com/nodejs/node/blob/master/lib/_stream_readable.js
          ??愛(ài)心三連擊

          1.看到這里了就點(diǎn)個(gè)在看支持下吧,你的點(diǎn)贊在看是我創(chuàng)作的動(dòng)力。

          2.關(guān)注公眾號(hào)程序員成長(zhǎng)指北,回復(fù)「1」加入高級(jí)前端交流群!「在這里有好多 前端?開(kāi)發(fā)者,會(huì)討論?前端 Node 知識(shí),互相學(xué)習(xí)」!

          3.也可添加微信【ikoala520】,一起成長(zhǎng)。

          “在看轉(zhuǎn)發(fā)”是最大的支持

          瀏覽 37
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  久久青青草香蕉手机视频在线 | 国产青娱视频在线观看 | 色噜噜人妻av中文字幕 | 日韩成人三级片 | 日本中文字幕在线 |