使用 Node.js 實(shí)現(xiàn)文件流轉(zhuǎn)存服務(wù)
作者:董天成
原文地址: https://zhuanlan.zhihu.com/p/25367269
Buffer
Stream
setTimeout/setInterval
promise
使用遞歸的Promise來(lái)進(jìn)行流程控制
本文章所有的例子都采用 ES6 編寫,可以直接用node version 6.x 以上直接運(yùn)行,低版本的 node 可以使用 babel 或者 typescript 編譯器編譯之后再運(yùn)行。
本文相關(guān)的轉(zhuǎn)存服務(wù)代碼,單元測(cè)試代碼,以及測(cè)試服務(wù)代碼都在文章底部。
什么是轉(zhuǎn)存服務(wù)
相信很多同學(xué)都用過(guò)一個(gè)服務(wù)叫做圖片轉(zhuǎn)存服務(wù):即向服務(wù)器發(fā)送一個(gè)圖片的url地址,服務(wù)負(fù)責(zé)去將圖片下載到服務(wù)器上,之后再將這個(gè)圖片上傳到存儲(chǔ)服務(wù),得到一個(gè)可以訪問(wèn)(通常情況都是CDN服務(wù))的圖片地址。

但是類似這樣架構(gòu)的服務(wù)有一個(gè)軟肋—— 對(duì)于超大的文件,性能會(huì)明顯不足。
轉(zhuǎn)存服務(wù)在下載文件的時(shí)候,二進(jìn)制會(huì)先寫入本地硬盤上的緩存文件中,當(dāng)文件下載完成之后,再進(jìn)行上傳操作。但是對(duì)于大文件上傳和轉(zhuǎn)存,這個(gè)過(guò)程將會(huì)非常耗時(shí)。而且,大文件如果直接一次性上傳,也會(huì)導(dǎo)致非常高的失敗率。
在上傳這地方,業(yè)內(nèi)通常是采用分片上傳來(lái)進(jìn)行解決。
分片上傳一般是將一個(gè)大文件劃分成多個(gè)分片,然后通過(guò)并行或者串行的方式依次上傳至服務(wù)器端。
如果文件上傳失敗,只需要再重新上傳失敗的分片即可。
什么是文件流轉(zhuǎn)存服務(wù)

分片上傳解決了上傳可靠性和性能上的問(wèn)題,但是上傳依然需要等待整個(gè)文件都下載完成才能觸發(fā),
而一個(gè)大文件的下載需要很多時(shí)間,這依然會(huì)造成轉(zhuǎn)存一個(gè)大文件時(shí)間過(guò)長(zhǎng)的問(wèn)題。
如果能夠在下載到的數(shù)據(jù)量滿足上傳一個(gè)分片的時(shí)候就直接將分片上傳到接收分片的存儲(chǔ)服務(wù),那是不是就可以達(dá)到速度最快,實(shí)現(xiàn)文件流轉(zhuǎn)存服務(wù)。
捕獲下載到的數(shù)據(jù)內(nèi)容
流轉(zhuǎn)存服務(wù)實(shí)現(xiàn)的第一步即是捕獲下載到的內(nèi)容。Node.js中的stream模塊可以很方便的進(jìn)行文件的處理,Readable的Stream在接收到數(shù)據(jù)之后,會(huì)不斷的觸發(fā)data事件。通過(guò)監(jiān)聽Readable的Stream的data事件即可準(zhǔn)確獲取到每一次通過(guò)Stream進(jìn)行傳輸?shù)臄?shù)據(jù)。
由于數(shù)據(jù)來(lái)源是通過(guò)網(wǎng)絡(luò)下載,而且request模塊返回的同樣也是Readable的Stream對(duì)象。
當(dāng)具備了Readable的Stream之后,只需要?jiǎng)?chuàng)建一個(gè)Writable就能數(shù)據(jù)流動(dòng)起來(lái),它們之間通過(guò)pipe函數(shù)進(jìn)行聯(lián)接。
'use strict';
let request = require('request');
let fs = require('fs');
let httpStream = request({
method: 'GET',
url: 'https://baobao-3d.bj.bcebos.com/16-0-205.shuimian.mp4'
});
// 由于不需要獲取最終的文件,所以直接丟掉
let writeStream = fs.createWriteStream('/dev/null');
// 聯(lián)接Readable和Writable
httpStream.pipe(writeStream);
let totalLength = 0;
// 當(dāng)獲取到第一個(gè)HTTP請(qǐng)求的響應(yīng)獲取
httpStream.on('response', (response) => {
console.log('response headers is: ', response.headers);
});
httpStream.on('data', (chunk) => {
totalLength += chunk.length;
console.log('recevied data size: ' + totalLength + 'KB');
});
// 下載完成
writeStream.on('close', () => {
console.log('download finished');
});
運(yùn)行過(guò)程中會(huì)看到命令行不斷刷出 recevied data size: xxx KB,直到文件下載完畢,觸發(fā)close事件。
分片的緩沖
每次data事件的觸發(fā),獲取到的chunk大小取決于當(dāng)前的網(wǎng)絡(luò)環(huán)境,假設(shè)我們?cè)O(shè)置分片上傳的時(shí)候每個(gè)分片的固定大小為2MB。但是每次捕獲到的分片有可能會(huì)大于2M,也有可能會(huì)遠(yuǎn)遠(yuǎn)小余2M。有的時(shí)候下載會(huì)比上傳速度要快,如何能更穩(wěn)定更可控的讓上傳能持續(xù)下去,而不會(huì)收到下載速度的影響。所以我們需要在上傳和下載之間加一個(gè)緩沖區(qū)。
讓下載到的數(shù)據(jù)無(wú)論大小,快慢,統(tǒng)統(tǒng)扔到緩沖區(qū)中。而上傳只需要定時(shí)定量從緩沖區(qū)獲取數(shù)據(jù),
這樣雙方之間就互不影響了。

Node.js使用Buffer對(duì)象來(lái)描述一塊數(shù)據(jù)對(duì)象,上一節(jié)中每次data事件觸發(fā)的時(shí)候,回調(diào)函數(shù)的第一個(gè)參數(shù)的值就是一個(gè)Buffer對(duì)象。Buffer對(duì)象的prototype屬性中有一些非常類型數(shù)據(jù)方法的函數(shù),如slice,concat,使用方式也和數(shù)組方法類型。
于是我們可以實(shí)現(xiàn)下面這個(gè)類,用于控制緩沖區(qū),方面塞入數(shù)據(jù)和獲取切片。
/**
* @file 視頻下載緩沖區(qū)
*/
class BufferCache {
constructor (cutSize = 2097152) {
this._cache = Buffer.alloc(0);
this.cutSize = cutSize;
this.readyCache = []; // 緩沖區(qū)
}
// 放入不同大小的buffer
pushBuf (buf) {
let cacheLength = this._cache.length;
let bufLength = buf.length;
this._cache = Buffer.concat([this._cache, buf], cacheLength + bufLength);
this.cut();
}
/**
* 切分分片,小分片拼成大分片,超大分片切成小分片
*/
cut () {
if (this._cache.length >= this.cutSize) {
let totalLen = this._cache.length;
let cutCount = Math.floor(totalLen / this.cutSize);
for (let i = 0; i < cutCount; i++) {
let newBuf = Buffer.alloc(this.cutSize);
this._cache.copy(newBuf, 0, i * this.cutSize, (i + 1) * this.cutSize);
this.readyCache.push(newBuf);
}
this._cache = this._cache.slice(cutCount * this.cutSize);
}
}
/**
* 獲取等長(zhǎng)的分片
* @returns {Array}
*/
getChunks () {
return this.readyCache;
}
/**
* 獲取數(shù)據(jù)包的最后一小節(jié)
* @returns {*}
*/
getRemainChunks () {
if (this._cache.length <= this.cutSize) {
return this._cache;
}
else {
this.cut();
return this.getRemainChunks();
}
}
}
module.exports = BufferCache;
對(duì)于下載收到的不定長(zhǎng)的buffer,都統(tǒng)統(tǒng)調(diào)用pushBuf方法保存,pushBuf方法每次都會(huì)將緩存拼接成一個(gè)原始的數(shù)據(jù)段,并每次調(diào)用cut方法,從整個(gè)數(shù)據(jù)段中切分出一塊塊規(guī)整的數(shù)據(jù)塊,存儲(chǔ)在一個(gè)棧中,等待獲取。
如何連續(xù)寫入緩存
由于Readable的Stream的data事件會(huì)在stream收到數(shù)據(jù)的時(shí)候反復(fù)進(jìn)行觸發(fā),數(shù)據(jù)下載完畢又會(huì)觸發(fā)close事件。所以我們通過(guò)Javascript的函數(shù)將捕獲下載內(nèi)容的代碼封裝成一個(gè)函數(shù)。
'use strict';
let request = require('request');
let fs = require('fs');
// 引入緩存模塊
let BufferCache = require('./bufferCache');
const chunkSplice = 2097152; // 2MB
let bufferCache = new BufferCache(chunkSplice);
function getChunks(url, onStartDownload, onDownloading, onDownloadClose) {
'use strict';
let totalLength = 0;
let httpStream = request({
method: 'GET',
url: url
});
// 由于不需要獲取最終的文件,所以直接丟掉
let writeStream = fs.createWriteStream('/dev/null');
// 聯(lián)接Readable和Writable
httpStream.pipe(writeStream);
httpStream.on('response', (response) => {
onStartDownload(response.headers);
}).on('data', (chunk) => {
totalLength += chunk.length;
onDownloading(chunk, totalLength);
});
writeStream.on('close', () => {
onDownloadClose(totalLength);
});
}
function onStart(headers) {
console.log('start downloading, headers is :', headers);
}
function onData(chunk, downloadedLength) {
console.log('write ' + chunk.length + 'KB into cache');
// 都寫入緩存中
bufferCache.pushBuf(chunk);
}
function onFinished(totalLength) {
let chunkCount = Math.ceil(totalLength / chunkSplice);
console.log('total chunk count is:' + chunkCount);
}
getChunks('https://baobao-3d.bj.bcebos.com/16-0-205.shuimian.mp4', onStart, onData, onFinished);
通過(guò)3個(gè)傳入的回調(diào)函數(shù),我們就能很容易的掌控:第一個(gè)收到請(qǐng)求時(shí)觸發(fā)的操作,連續(xù)不斷收到數(shù)據(jù)時(shí)觸發(fā)的操作和下載完畢時(shí)觸發(fā)的操作。有個(gè)這個(gè)函數(shù),我們就只需要在接收數(shù)據(jù)的回調(diào)函數(shù)中將buffer都通過(guò)pushBuf函數(shù)寫入緩存即可。
準(zhǔn)備發(fā)送
目前下載數(shù)據(jù)包和緩存都已經(jīng)準(zhǔn)備就緒,接下來(lái)就是準(zhǔn)備進(jìn)行發(fā)送分片的操作了。但是,還依然存在以下問(wèn)題:
如何連續(xù)不斷的從緩存中獲取分片
如何發(fā)送分片
單個(gè)分片如果上傳失敗,如何重試
如何在所有分片都上傳完成之后觸發(fā)一個(gè)回調(diào)
如何實(shí)現(xiàn)多個(gè)分片并行上傳
下面將逐步講解思路,并提供相關(guān)實(shí)現(xiàn)代碼。
連續(xù)不斷獲取分片
連續(xù)不斷的獲取分片,實(shí)現(xiàn)上需要一個(gè)定時(shí)器來(lái)不斷的從緩存中獲取分片。
Javascript為我們提供好了簡(jiǎn)單易用的定時(shí)器,setTimeout和setInterval。每次回調(diào)函數(shù)的觸發(fā)都是在上一個(gè)時(shí)間周期完成之后運(yùn)行。這樣的機(jī)制能保證每次觸發(fā)setTimeout的時(shí)候,緩存中或少能塞進(jìn)一部分?jǐn)?shù)據(jù)進(jìn)去。
當(dāng)onStart函數(shù)觸發(fā)時(shí),就預(yù)示著下載已經(jīng)開始了,這個(gè)時(shí)候就可以開始進(jìn)行獲取分片了。
通過(guò)setInterval,設(shè)定一個(gè)200ms的時(shí)間間隔,在每一次時(shí)間間隔內(nèi)通過(guò)bufferCache.getChunks()方法獲取已經(jīng)切分好的分片。
最后一個(gè)分片是個(gè)特殊的情況,一個(gè)文件在經(jīng)過(guò)多次相同大小的切割之后,總會(huì)遺留下小的一塊分片,因此我們還需要對(duì)最后一個(gè)分片進(jìn)行特殊處理。當(dāng)readyCache的長(zhǎng)度為0的時(shí)候,而且下載已經(jīng)完成,不會(huì)再調(diào)用pushBuf函數(shù),就是獲取最后一段分片的時(shí)機(jī)。
function onStart(headers) {
console.log('start downloading, headers is :', headers);
let readyCache = bufferCache.getChunks();
let sendTimer = setInterval(() => {
if (readyCache.length > 0) {
let receivedChunk = readyCache.shift();
console.log('received Chunk', receivedChunk);
}
else if (isFinished) {
clearTimeout(sendTimer);
console.log('got last chunk');
let lastChunk = bufferCache.getRemainChunks();
console.log('the last chunk', lastChunk);
}
}, 200);
}
如何發(fā)送分片
使用HTTP進(jìn)行文件上傳,文件在傳輸過(guò)程中為一個(gè)byte序列,其content-type為multipart/form-data; boundary=----WebKitFormBoundarymqmPgKAUm2XuWnXu, boundary 是作為一個(gè)特殊的字符串來(lái)對(duì)發(fā)送的數(shù)據(jù)包進(jìn)行分割。上傳的數(shù)據(jù)中即可以包含二進(jìn)制文件的byte流,也可以包含常規(guī)的字符串鍵值對(duì)。
在瀏覽器上,上傳一個(gè)圖片的數(shù)據(jù)格式:
同樣,nodejs的request模塊也實(shí)現(xiàn)了和瀏覽器一樣的上傳文件協(xié)議,所以我們可以先通過(guò)Promise封裝一個(gè)上傳函數(shù)
function upload(url, data) {
return new Promise((resolve, reject) => {
request.post({
url: url,
formData: data
}, function (err, response, body) {
if (!err && response.statusCode === 200) {
resolve(body);
}
else {
reject(err);
}
});
});
}
發(fā)送分片的時(shí)候,需要間歇不斷的處理2件事情:
從緩存中拿出分片,直到拿完了,告知發(fā)送端已經(jīng)到底了
發(fā)送分片,發(fā)送成功,還有分片,繼續(xù)發(fā)送,直到分片都拿完了
對(duì)于這樣的邏輯,我們可以考慮使用遞歸來(lái)發(fā)送分片,函數(shù)的參數(shù)傳入readyCache的引用。
每次調(diào)用函數(shù),都通過(guò)引用從readyCache中把隊(duì)列最前面的分片拿出,再進(jìn)行分片發(fā)送,如果分片上傳成功,
再進(jìn)行遞歸,依然傳入readyCache的引用,直到readyCache的長(zhǎng)度為0。
由于我們?cè)诎l(fā)送的時(shí)候,使用了setInterval不斷輪詢,當(dāng)前輪詢的周期為200ms。
假設(shè)當(dāng)前網(wǎng)絡(luò)環(huán)境擁堵,會(huì)導(dǎo)致上傳一個(gè)分片的時(shí)間 > 200ms, 200ms之后下一次輪詢開始運(yùn)行時(shí),原先的分片還沒上傳完畢,由于沒有一個(gè)狀態(tài)值進(jìn)行判斷,依然會(huì)調(diào)用上傳函數(shù),又再一次進(jìn)行分片上傳,就會(huì)更加劇的網(wǎng)絡(luò)擁堵環(huán)境,導(dǎo)致分片上傳時(shí)間更短。如此反復(fù),時(shí)間一長(zhǎng)就會(huì)導(dǎo)致崩潰,造成分片上傳全部大面積失敗。
為了避免這樣的情況,我們就需要一個(gè)變量來(lái)表示當(dāng)前這個(gè)上傳流程的狀態(tài),目前我們只關(guān)心單個(gè)流程進(jìn)行上傳,可以只需要保證最大同時(shí)上傳的值為1即可。
function sendChunks() {
let chunkId = 0; // 給每個(gè)分片劃分ID
let sending = 0; // 當(dāng)前并行上傳的數(shù)量
let MAX_SENDING = 1; // 最大并行上傳數(shù)
function send(readyCache) {
// 在并行上傳會(huì)用到,
if (readyCache.length === 0) {
return;
}
let chunk = readyCache.shift();
// 測(cè)試使用的服務(wù),用于接收分片
let sendP = upload('http://localhost:3000', {
chunk: {
value: chunk,
options: {
// 在文件名稱上添加chunkId,可以方便后端服務(wù)進(jìn)行分片整理
filename: 'example.mp4_IDSPLIT_' + chunkId
}
}
});
sending++;
sendP.then((response) => {
sending--;
if (response.errno === 0 && readyCache.length > 0) {
// 成功上傳,繼續(xù)遞歸
send(readyCache);
}
});
chunkId++;
}
return new Promise((resolve, reject) => {
let readyCache = bufferCache.getChunks();
let sendTimer = setInterval(() => {
let readyCache = bufferCache.getChunks();
if (sending < MAX_SENDING && readyCache.length > 0) {
send(readyCache);
}
// 如果isFinished 不為true的話,有可能是正在下載分片
else if (isFinished && readyCache.length === 0) {
clearTimeout(sendTimer);
let lastChunk = bufferCache.getRemainChunks();
readyCache.push(lastChunk);
send(readyCache);
}
// 到這里是為分片正在下載,同時(shí)又正在上傳
// 或者上傳比下載快,已經(jīng)下載好的分片都傳完了,等待下載完成
}, 200);
});
}
function onStart(headers) {
// console.log('start downloading, headers is :', headers);
sendChunks();
}
單個(gè)分片如果上傳失敗,如何重試
到目前為止,分片上傳已經(jīng)初步完成,但僅僅是初步完成。因?yàn)槿绻厦娴拇a能連續(xù)穩(wěn)定運(yùn)行十幾年不出bug,是建立在以下情況的:接收端超穩(wěn)定,無(wú)論多少分片,多大速率,返回一律成功
但是現(xiàn)實(shí)是殘酷的,當(dāng)數(shù)量和頻率增加的時(shí)候,總會(huì)有分片上傳失敗,從而導(dǎo)致正在文件都上傳失敗。
因此我們需要讓分片上傳都具備重試功能。
在發(fā)送分片的時(shí)候,send函數(shù)可以當(dāng)成是發(fā)送單個(gè)分片的一個(gè)控制器,如果分片發(fā)送失敗,最容易捕獲并重試的地方就應(yīng)該在send函數(shù)內(nèi)部,所以當(dāng)錯(cuò)誤發(fā)生時(shí),只需將原先的數(shù)據(jù)保存下來(lái),然后再一次調(diào)用send函數(shù)就能進(jìn)行重試操作。
這樣的邏輯,我們可以簡(jiǎn)化成下面這段JS代碼。
let max = 4;
function send() {
return new Promise((resolve, reject) => {
if (max > 2) {
reject(new Error('error!!'));
}
else {
resolve('ok');
}
}).catch(() => {
max--;
return send();
});
}
send().then(() => {
console.log('finished');
}).catch(() => {
console.log('error');
});
當(dāng)Max > 2的時(shí)候,Promise就會(huì)返回異常,所以在最初的2次調(diào)用,Promise都會(huì)觸發(fā)catch函數(shù)。不過(guò)在每次catch的時(shí)候,再遞歸函數(shù),之前錯(cuò)誤的Promise就能夠被遞歸創(chuàng)建的新的Promise處理,直到這個(gè)Promise能夠成功返回。我們只需簡(jiǎn)單控制max的值,就能控制處理錯(cuò)誤的次數(shù)。這樣就能將錯(cuò)誤重試控制都包含在send函數(shù)內(nèi)部。
所以我們也可以使用這樣的邏輯來(lái)進(jìn)行分片的發(fā)送,當(dāng)請(qǐng)求出現(xiàn)錯(cuò)誤的時(shí)候,在catch函數(shù)內(nèi)判斷重試次數(shù),次數(shù)若大于0,則再返回一個(gè)遞歸的send函數(shù),直到次數(shù)等于0,直接用Promise.reject將異常拋出Promise。
如果接收服務(wù)一直存在問(wèn)題,導(dǎo)致多次上傳全部失敗的話,需要直接終止當(dāng)前問(wèn)題,于是我們還需要一個(gè)變量stopSend,用于在多次錯(cuò)誤之后,直接停止上傳。
function sendChunks() {
let chunkId = 0;
let sending = 0; // 當(dāng)前并行上傳的數(shù)量
let MAX_SENDING = 1; // 最大并行上傳數(shù)
let stopSend = false;
function send(options) {
let readyCache = options.readyCache;
let fresh = options.fresh;
let retryCount = options.retry;
let chunkIndex;
let chunk = null;
// 新的數(shù)據(jù)
if (fresh) {
if (readyCache.length === 0) {
return;
}
chunk = readyCache.shift();
chunkIndex = chunkId;
chunkId++;
}
// 失敗重試的數(shù)據(jù)
else {
chunk = options.data;
chunkIndex = options.index;
}
sending++;
let sendP = upload('http://localhost:3000', {
chunk: {
value: chunk,
options: {
filename: 'example.mp4_IDSPLIT_' + chunkIndex
}
}
}).then((response) => {
sending--;
let json = JSON.parse(response);
if (json.errno === 0 && readyCache.length > 0) {
return send({
retry: RETRY_COUNT,
fresh: true,
readyCache: readyCache
});
}
// 這里一直返回成功
return Promise.resolve(json);
}).catch(err => {
if (retryCount > 0) {
// 這里遞歸下去,如果成功的話,就等同于錯(cuò)誤已經(jīng)處理
return send({
retry: retryCount - 1,
index: chunkIndex,
fresh: false,
data: chunk,
readyCache: readyCache
});
}
else {
console.log(`upload failed of chunkIndex: ${chunkIndex}`);
// 停止上傳標(biāo)識(shí),會(huì)直接停止上傳
stopSend = true;
// 返回reject,異常拋出
return Promise.reject(err);
}
});
}
return new Promise((resolve, reject) => {
let readyCache = bufferCache.getChunks();
let sendTimer = setInterval(() => {
if (sending < MAX_SENDING && readyCache.length > 0) {
// 改用傳入對(duì)象
send({
retry: 3, // 最大重試3次
fresh: true, // 用這個(gè)字段來(lái)區(qū)分是新的分片,還是由于失敗重試的
readyCache: readyCache
}).catch(err => {
console.log('upload failed, errmsg: ', err);
});
}
else if (isFinished && readyCache.length === 0 || stopSend) {
clearTimeout(sendTimer);
// 已經(jīng)成功走到最后一個(gè)分片了。
if (!stopSend) {
let lastChunk = bufferCache.getRemainChunks();
readyCache.push(lastChunk);
send({
retry: 3,
fresh: true,
readyCache: readyCache
}).catch(err => {
console.log('upload failed, errmsg: ', err);
});
}
}
// 到這里是為分片正在下載,同時(shí)又正在上傳
// 或者上傳比下載快,已經(jīng)下載好的分片都傳完了,等待下載完成
}, 200);
});
}
在錯(cuò)誤模擬上面,我們可以在在測(cè)試的server上加了幾行代碼來(lái)模擬上傳失敗的情況,當(dāng)?shù)诙€(gè)分片到達(dá)的時(shí)候,一定會(huì)失敗。之后我們得到的日志如下:
// 錯(cuò)誤處理測(cè)試使用
if (chunkIndex == 1) {
console.log(`set failed of ${chunkIndex}`);
this.status = 500;
return;
}
server端得到的日志如下:
<-- POST /
uploading example.mp4_IDSPLIT_0 -> /Users/baidu/baiduYun/learn/koa-example/receive/example.mp4/0
--> POST / 200 93ms 25b
<-- POST /
set failed of 1
--> POST / 500 9ms -
<-- POST /
set failed of 1
--> POST / 500 15ms -
<-- POST /
set failed of 1
--> POST / 500 7ms -
<-- POST /
set failed of 1
--> POST / 500 14ms -
可見,在上傳失敗之后,當(dāng)前分片會(huì)自動(dòng)進(jìn)行重試上傳,直到超出重試次數(shù),再直接拋出異常。
如何在所有分片都上傳完成之后觸發(fā)一個(gè)回調(diào)
到目前為止,整個(gè)服務(wù)的核心部分已經(jīng)差不多了,send函數(shù)無(wú)論怎么調(diào)用,都會(huì)返回Promise對(duì)象,所以在所有分片都上傳完成之后觸發(fā)一個(gè)回調(diào)也就很容易了,只需要將所有的send函數(shù)返回的Promise對(duì)象放進(jìn)數(shù)組,然后通過(guò)Promise.all函數(shù)來(lái)捕獲即可,可見,基建搭的好,上層建筑建設(shè)也就輕而易舉了。
所以我們只需要更改sendTimer這個(gè)定時(shí)器內(nèi)部的代碼即可。
let readyCache = bufferCache.getChunks();
let sendPromise = [];
let sendTimer = setInterval(() => {
if (sending < MAX_SENDING && readyCache.length > 0) {
// 把Promise塞進(jìn)數(shù)組
sendPromise.push(send({
retry: RETRY_COUNT,
fresh: true,
readyCache: readyCache
}));
}
else if ((isFinished && readyCache.length === 0) || stopSend) {
clearTimeout(sendTimer);
if (!stopSend) {
console.log('got last chunk');
let lastChunk = bufferCache.getRemainChunks();
readyCache.push(lastChunk);
// 把Promise塞進(jìn)數(shù)組
sendPromise.push(send({
retry: RETRY_COUNT,
fresh: true,
readyCache: readyCache
}));
}
// 當(dāng)所有的分片都發(fā)送之后觸發(fā),
Promise.all(sendPromise).then(() => {
console.log('send success');
}).catch(err => {
console.log('send failed');
});
}
// not ready, wait for next interval
}, 200);
如何實(shí)現(xiàn)多個(gè)分片并行上傳
Node.js提供事件驅(qū)動(dòng)和非阻塞I/O可不是用來(lái)寫callbackHell的。有了這2個(gè)利器,我們可以輕松在一個(gè)進(jìn)程上使用一個(gè)線程調(diào)度,控制多個(gè)I/O操作。這樣的設(shè)計(jì)就無(wú)需使用多線程編程,也就不用關(guān)心鎖之類的東西了。
實(shí)現(xiàn)多個(gè)分片上傳,所以只需要直接創(chuàng)建多個(gè)HTTP連接進(jìn)行上傳,多個(gè)上傳操作同享一個(gè)readyCache。
而目前我們實(shí)現(xiàn)的send函數(shù)可以讓一個(gè)分片上傳自我控制,同樣,同時(shí)調(diào)用多次send函數(shù)也就等同于讓多個(gè)分片進(jìn)行自我控制。而且多個(gè)send函數(shù)運(yùn)行在同一個(gè)node.js進(jìn)程上,所以對(duì)共享的reayCache的獲取是一個(gè)串行的操作(nodejs進(jìn)程在一個(gè)事件輪詢周期中會(huì)依次執(zhí)行多個(gè)send函數(shù))。也就不可能出現(xiàn)多個(gè)send函數(shù)對(duì)readyCache的競(jìng)爭(zhēng)造成死鎖這樣的情況。
可見,單進(jìn)程異步輪詢這樣的設(shè)計(jì)方案,能完全避免死鎖這樣的情況。
所以直接把調(diào)用send函數(shù)平行擴(kuò)展:
let readyCache = bufferCache.getChunks();
let threadPool = [];
let sendTimer = setInterval(() => {
if (sending < MAX_SENDING && readyCache.length > 0) {
// 這個(gè)例子同時(shí)開啟4個(gè)分片上傳
for (let i = 0; i < MAX_SENDING; i++) {
let thread = send({
retry: RETRY_COUNT,
fresh: true,
readyCache: readyCache
});
threadPool.push(thread);
}
}
else if ((isFinished && readyCache.length === 0) || stopSend) {
clearTimeout(sendTimer);
if (!stopSend) {
console.log('got last chunk');
let lastChunk = bufferCache.getRemainChunks();
readyCache.push(lastChunk);
threadPool.push(send({
retry: RETRY_COUNT,
fresh: true,
readyCache: readyCache
}));
}
Promise.all(threadPool).then(() => {
console.log('send success');
}).catch(err => {
console.log('send failed');
});
}
// not ready, wait for next interval
}, 200);
測(cè)試
不能穩(wěn)定運(yùn)行的代碼不是好代碼,寫不出穩(wěn)定運(yùn)行的程序員不是好的程序員。保證軟件質(zhì)量穩(wěn)定可靠,測(cè)試是必不可少的。
文件流轉(zhuǎn)存服務(wù)的單元測(cè)試需要覆蓋2個(gè)方面:
BufferCache的單元測(cè)試
將文件都上傳到測(cè)試服務(wù),并驗(yàn)證上傳前和上傳后的md5值。
BufferCache.js單元測(cè)試
BufferCache最主要的目的就是進(jìn)行分片的緩存與切割,所以我們可以在測(cè)試內(nèi)制造一些測(cè)試數(shù)據(jù)。
由于緩存和獲取是同步進(jìn)行的,所以我們可以用2個(gè)setInterval函數(shù)來(lái)同步插入和獲取。設(shè)置一個(gè)時(shí)間長(zhǎng)度,來(lái)讓setInterval停下來(lái)。最后再將沒有push到bufferCache內(nèi)的數(shù)據(jù)和從push到bufferCache內(nèi)的數(shù)據(jù)值進(jìn)行對(duì)比。
it('bufferCache Test', function (done) {
let bufferCache = new BufferCache(1024 * 10);
var startTime = Date.now();
var originalBuffer = []; // 保存生成的數(shù)據(jù),不放進(jìn)bufferCache
let compiledBuffer = []; // 保存從bufferCache取出的數(shù)據(jù)
let isFinished = false; // 是否結(jié)束
// 寫入的定時(shí)器
let pushTimer = setInterval(() => {
var randomString = [];
// 構(gòu)造模擬數(shù)據(jù)
for (let i = 0; i < 1024; i ++) {
let arr = [];
for (let j = 0; j < 1024; j ++) {
arr.push(j % 10);
}
randomString.push(arr.join(''));
}
let buffer = Buffer.from(randomString.join(''));
// 拷貝buffer對(duì)象,消除對(duì)象引用
let bufferCopy = Buffer.alloc(buffer.length);
buffer.copy(bufferCopy);
originalBuffer.push(bufferCopy);
bufferCache.pushBuf(buffer);
// 該停下來(lái)了
if (Date.now() - startTime > 1000) {
isFinished = true;
clearTimeout(pushTimer);
}
}, 5);
// 讀取的定時(shí)器
let outputTimer = setInterval(() => {
let readyCache = bufferCache.getChunks();
while (readyCache.length > 0) {
let chunk = readyCache.shift();
compiledBuffer.push(chunk);
}
if (isFinished) {
let lastChunk = bufferCache.getRemainChunks();
compiledBuffer.push(lastChunk);
clearTimeout(outputTimer);
// 把2個(gè)buffer都合并
let originBuf = originalBuffer.reduce((total, next) => {
return Buffer.concat([total, next], total.length + next.length);
}, Buffer.alloc(0));
let compiledBuf = compiledBuffer.reduce((total, next) => {
return Buffer.concat([total, next], total.length + next.length);
}, Buffer.alloc(0));
assert.equal(originBuf.length, compiledBuf.length);
assert.equal(originBuf.compare(compiledBuf), 0);
done();
}
}, 10);
});
批量上傳測(cè)試
bluebird模塊的Promise.map函數(shù)可以同時(shí)執(zhí)行多條異步任務(wù),所以只需要簡(jiǎn)單使用Promise.map函數(shù),就能批量調(diào)用getChunks函數(shù),將數(shù)據(jù)發(fā)送到測(cè)試server。
it('upload test', function(done) {
Promise.map(exampleData, (item, index) => {
let md5 = item.md5;
let url = item.url;
return getChunks(url, uploadURL, md5);
}).then(() => {
done();
}).catch(err => {
done(err);
});
});
文件完整性驗(yàn)證
為了驗(yàn)證文件合法性,我在測(cè)試server上專門實(shí)現(xiàn)了一個(gè)接口,傳入上傳時(shí)附帶的filename參數(shù),就能按照分片順序?qū)⒍鄠€(gè)分片合并,并返回整個(gè)文件的md5值。
通過(guò)這個(gè)接口,測(cè)試只需要對(duì)比發(fā)送之前的md5和獲取到的md5是否相同就能判斷文件有沒有在上傳時(shí)候出錯(cuò)誤。
所以測(cè)試用例就只需要連續(xù)調(diào)接口獲取數(shù)據(jù)即可:
// 用Promise把request包裝一下
function getData(url, data) {
return new Promise((resolve, reject) => {
request({
url: url,
method: 'POST',
form: data
}, function (err, response, data) {
if (!err && response.statusCode === 200) {
resolve(data);
}
else {
reject(data);
}
});
});
}
it('download data md5sum test', (done) => {
Promise.each(exampleData, (item, index) => {
let md5 = item.md5;
let url = item.url;
return getData(getMD5URL, {
filename: md5
}).then((serverResponse) => {
serverResponse = JSON.parse(serverResponse);
let serverMd5 = serverResponse.data;
assert.equal(serverMd5, md5);
});
}).then(() => {
done();
}).catch(err => {
done(err);
})
});
server端源碼
總結(jié)
通過(guò)靈活使用Promise和遞歸,我們就能夠很輕松實(shí)現(xiàn)一些非異步模型看來(lái)很復(fù)雜的事情。
沒有了多線程編程,也就沒有了線程調(diào)度,線程狀態(tài)監(jiān)控,死鎖監(jiān)控,讀寫鎖設(shè)計(jì)等復(fù)雜的功能。不過(guò),能做到這一切也都得歸功于Node.js出色的設(shè)計(jì)以及Node.js的幕后英雄 —— libuv 跨平臺(tái)異步I/O庫(kù)
本文章所涉及的源代碼:GitHub - andycall/file-stream-upload-example
本文章測(cè)試需要的服務(wù)端源碼: GitHub - andycall/file-upload-example-server
最后
歡迎加我微信(winty230),拉你進(jìn)技術(shù)群,長(zhǎng)期交流學(xué)習(xí)...
歡迎關(guān)注「前端Q」,認(rèn)真學(xué)前端,做個(gè)專業(yè)的技術(shù)人...


