結(jié)合異步迭代器實(shí)現(xiàn) Node.js 流式數(shù)據(jù)復(fù)制
實(shí)現(xiàn)可讀流到可寫流數(shù)據(jù)復(fù)制,就是不斷的讀取->寫入這個過程,那么你首先想到的是不是下面這樣呢?代碼看似很簡單,結(jié)果卻是很糟糕的,沒有任何的數(shù)據(jù)積壓處理。如果讀取的文件很大了,造成的后果就是緩沖區(qū)數(shù)據(jù)溢出,程序會占用過多的系統(tǒng)內(nèi)存,拖垮服務(wù)器上的其它應(yīng)用,如果不明白的回顧下這篇文章 Node.js Stream 背壓 — 消費(fèi)端數(shù)據(jù)積壓來不及處理會怎么樣?。
//?糟糕的示例,沒有數(shù)據(jù)積壓處理
readable.on('data',?data?=>?{
??writable.write(data)
});
類似以上的需求,推薦你用 pipe() 方法以流的形式完成數(shù)據(jù)的復(fù)制。
作為學(xué)習(xí),結(jié)合異步迭代器以一種簡單的方式實(shí)現(xiàn)一個類似于 pipe 一樣的方法完成數(shù)據(jù)源到目標(biāo)源的數(shù)據(jù)復(fù)制。
數(shù)據(jù)寫入方法實(shí)現(xiàn)
_write 方法目的是控制可寫流的數(shù)據(jù)寫入,它返回一個 Promise 對象,如果可寫流的 dest.write() 方法返回 true,表示內(nèi)部緩沖區(qū)未滿,繼續(xù)寫入。
當(dāng) dest.write() 方法返回 false 表示向流中寫入數(shù)據(jù)超過了它所能處理的最大能力限制,此時暫停向流中寫入數(shù)據(jù),直到 drain 事件觸發(fā),表示緩沖區(qū)中的數(shù)據(jù)已排空了可以繼續(xù)寫入,再將 Promise 對象變?yōu)榻鉀Q。
function?_write(dest,?chunk)?{
??return?new?Promise(resolve?=>?{
????if?(dest.write(chunk))?{
??????return?resolve(null);
????}
????dest.once('drain',?resolve);
??})??
}
結(jié)合異步迭代器實(shí)現(xiàn)
異步迭代器使從可讀流對象讀取數(shù)據(jù)變得更簡單,異步的讀取數(shù)據(jù)并調(diào)用我們封裝的 _write(chunk) 方法寫入數(shù)據(jù),如果緩沖區(qū)空間已滿,這里 await _write(dest, chunk) 也會等待,當(dāng)緩沖區(qū)有空間可以繼續(xù)寫入了,再次進(jìn)行讀取 -> 寫入。
function?myCopy(src,?dest)?{
??return?new?Promise(async?(resolve,?reject)?=>?{
????dest.on('error',?reject);
????try?{
??????for?await?(const?chunk?of?src)?{
????????await?_write(dest,?chunk);
??????}
??????resolve();
????}?catch?(err)?{
??????reject(err);
????}
??});
}
使用如下所示:
const?readable?=?fs.createReadStream('text.txt');
const?writable?=?fs.createWriteStream('dest-text.txt');
await?myCopy(readable,?writable);

往期推薦



最后
歡迎加我微信,拉你進(jìn)技術(shù)群,長期交流學(xué)習(xí)...
歡迎關(guān)注「前端Q」,認(rèn)真學(xué)前端,做個專業(yè)的技術(shù)人...


