<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          【Nodejs】897- 使用 Node.js 實現(xiàn)文件流轉(zhuǎn)存服務

          共 20635字,需瀏覽 42分鐘

           ·

          2021-03-14 08:45

          作者:董天成

          原文地址: https://zhuanlan.zhihu.com/p/25367269


          本文你能學到的有關Javascript和Node.js的知識點
          1. Buffer

          2. Stream

          3. setTimeout/setInterval

          4. promise

          5. 使用遞歸的Promise來進行流程控制

          本文章所有的例子都采用 ES6 編寫,可以直接用node version 6.x 以上直接運行,低版本的 node 可以使用 babel 或者 typescript 編譯器編譯之后再運行。

          本文相關的轉(zhuǎn)存服務代碼,單元測試代碼,以及測試服務代碼都在文章底部。

          什么是轉(zhuǎn)存服務

          相信很多同學都用過一個服務叫做圖片轉(zhuǎn)存服務:即向服務器發(fā)送一個圖片的url地址,服務負責去將圖片下載到服務器上,之后再將這個圖片上傳到存儲服務,得到一個可以訪問(通常情況都是CDN服務)的圖片地址。




          但是類似這樣架構的服務有一個軟肋—— 對于超大的文件,性能會明顯不足。

          轉(zhuǎn)存服務在下載文件的時候,二進制會先寫入本地硬盤上的緩存文件中,當文件下載完成之后,再進行上傳操作。但是對于大文件上傳和轉(zhuǎn)存,這個過程將會非常耗時。而且,大文件如果直接一次性上傳,也會導致非常高的失敗率。

          在上傳這地方,業(yè)內(nèi)通常是采用分片上傳來進行解決。

          分片上傳一般是將一個大文件劃分成多個分片,然后通過并行或者串行的方式依次上傳至服務器端。

          如果文件上傳失敗,只需要再重新上傳失敗的分片即可。

          什么是文件流轉(zhuǎn)存服務



          分片上傳解決了上傳可靠性和性能上的問題,但是上傳依然需要等待整個文件都下載完成才能觸發(fā),

          而一個大文件的下載需要很多時間,這依然會造成轉(zhuǎn)存一個大文件時間過長的問題。

          如果能夠在下載到的數(shù)據(jù)量滿足上傳一個分片的時候就直接將分片上傳到接收分片的存儲服務,那是不是就可以達到速度最快,實現(xiàn)文件流轉(zhuǎn)存服務。


          捕獲下載到的數(shù)據(jù)內(nèi)容

          流轉(zhuǎn)存服務實現(xiàn)的第一步即是捕獲下載到的內(nèi)容。Node.js中的stream模塊可以很方便的進行文件的處理,Readable的Stream在接收到數(shù)據(jù)之后,會不斷的觸發(fā)data事件。通過監(jiān)聽Readable的Stream的data事件即可準確獲取到每一次通過Stream進行傳輸?shù)臄?shù)據(jù)。

          由于數(shù)據(jù)來源是通過網(wǎng)絡下載,而且request模塊返回的同樣也是Readable的Stream對象。

          當具備了Readable的Stream之后,只需要創(chuàng)建一個Writable就能數(shù)據(jù)流動起來,它們之間通過pipe函數(shù)進行聯(lián)接。

          'use strict';
          let request = require('request');
          let fs = require('fs');
          let httpStream = request({
          method: 'GET',
          url: 'https://baobao-3d.bj.bcebos.com/16-0-205.shuimian.mp4'
          });
          // 由于不需要獲取最終的文件,所以直接丟掉
          let writeStream = fs.createWriteStream('/dev/null');
          // 聯(lián)接Readable和Writable
          httpStream.pipe(writeStream);
          let totalLength = 0;
          // 當獲取到第一個HTTP請求的響應獲取
          httpStream.on('response', (response) => {
          console.log('response headers is: ', response.headers);
          });
          httpStream.on('data', (chunk) => {
          totalLength += chunk.length;
          console.log('recevied data size: ' + totalLength + 'KB');
          });
          // 下載完成
          writeStream.on('close', () => {
          console.log('download finished');
          });

          運行過程中會看到命令行不斷刷出 recevied data size: xxx KB,直到文件下載完畢,觸發(fā)close事件。

          分片的緩沖

          每次data事件的觸發(fā),獲取到的chunk大小取決于當前的網(wǎng)絡環(huán)境,假設我們設置分片上傳的時候每個分片的固定大小為2MB。但是每次捕獲到的分片有可能會大于2M,也有可能會遠遠小余2M。有的時候下載會比上傳速度要快,如何能更穩(wěn)定更可控的讓上傳能持續(xù)下去,而不會收到下載速度的影響。所以我們需要在上傳和下載之間加一個緩沖區(qū)。

          讓下載到的數(shù)據(jù)無論大小,快慢,統(tǒng)統(tǒng)扔到緩沖區(qū)中。而上傳只需要定時定量從緩沖區(qū)獲取數(shù)據(jù),

          這樣雙方之間就互不影響了。




          Node.js使用Buffer對象來描述一塊數(shù)據(jù)對象,上一節(jié)中每次data事件觸發(fā)的時候,回調(diào)函數(shù)的第一個參數(shù)的值就是一個Buffer對象。Buffer對象的prototype屬性中有一些非常類型數(shù)據(jù)方法的函數(shù),如slice,concat,使用方式也和數(shù)組方法類型。

          于是我們可以實現(xiàn)下面這個類,用于控制緩沖區(qū),方面塞入數(shù)據(jù)和獲取切片。

          /**
          * @file 視頻下載緩沖區(qū)
          */
          class BufferCache {
          constructor (cutSize = 2097152) {
          this._cache = Buffer.alloc(0);
          this.cutSize = cutSize;
          this.readyCache = []; // 緩沖區(qū)
          }
          // 放入不同大小的buffer
          pushBuf (buf) {
          let cacheLength = this._cache.length;
          let bufLength = buf.length;
          this._cache = Buffer.concat([this._cache, buf], cacheLength + bufLength);
          this.cut();
          }
          /**
          * 切分分片,小分片拼成大分片,超大分片切成小分片
          */
          cut () {
          if (this._cache.length >= this.cutSize) {
          let totalLen = this._cache.length;
          let cutCount = Math.floor(totalLen / this.cutSize);
          for (let i = 0; i < cutCount; i++) {
          let newBuf = Buffer.alloc(this.cutSize);
          this._cache.copy(newBuf, 0, i * this.cutSize, (i + 1) * this.cutSize);
          this.readyCache.push(newBuf);
          }

          this._cache = this._cache.slice(cutCount * this.cutSize);
          }
          }
          /**
          * 獲取等長的分片
          * @returns {Array}
          */
          getChunks () {
          return this.readyCache;
          }
          /**
          * 獲取數(shù)據(jù)包的最后一小節(jié)
          * @returns {*}
          */
          getRemainChunks () {
          if (this._cache.length <= this.cutSize) {
          return this._cache;
          }
          else {
          this.cut();
          return this.getRemainChunks();
          }
          }
          }
          module.exports = BufferCache;

          對于下載收到的不定長的buffer,都統(tǒng)統(tǒng)調(diào)用pushBuf方法保存,pushBuf方法每次都會將緩存拼接成一個原始的數(shù)據(jù)段,并每次調(diào)用cut方法,從整個數(shù)據(jù)段中切分出一塊塊規(guī)整的數(shù)據(jù)塊,存儲在一個棧中,等待獲取。

          如何連續(xù)寫入緩存

          由于Readable的Stream的data事件會在stream收到數(shù)據(jù)的時候反復進行觸發(fā),數(shù)據(jù)下載完畢又會觸發(fā)close事件。所以我們通過Javascript的函數(shù)將捕獲下載內(nèi)容的代碼封裝成一個函數(shù)。

          'use strict';
          let request = require('request');
          let fs = require('fs');
          // 引入緩存模塊
          let BufferCache = require('./bufferCache');
          const chunkSplice = 2097152; // 2MB
          let bufferCache = new BufferCache(chunkSplice);
          function getChunks(url, onStartDownload, onDownloading, onDownloadClose) {
          'use strict';
          let totalLength = 0;
          let httpStream = request({
          method: 'GET',
          url: url
          });
          // 由于不需要獲取最終的文件,所以直接丟掉
          let writeStream = fs.createWriteStream('/dev/null');
          // 聯(lián)接Readable和Writable
          httpStream.pipe(writeStream);
          httpStream.on('response', (response) => {
          onStartDownload(response.headers);
          }).on('data', (chunk) => {
          totalLength += chunk.length;
          onDownloading(chunk, totalLength);
          });
          writeStream.on('close', () => {
          onDownloadClose(totalLength);
          });
          }
          function onStart(headers) {
          console.log('start downloading, headers is :', headers);
          }
          function onData(chunk, downloadedLength) {
          console.log('write ' + chunk.length + 'KB into cache');
          // 都寫入緩存中
          bufferCache.pushBuf(chunk);
          }
          function onFinished(totalLength) {
          let chunkCount = Math.ceil(totalLength / chunkSplice);
          console.log('total chunk count is:' + chunkCount);
          }
          getChunks('https://baobao-3d.bj.bcebos.com/16-0-205.shuimian.mp4', onStart, onData, onFinished);

          通過3個傳入的回調(diào)函數(shù),我們就能很容易的掌控:第一個收到請求時觸發(fā)的操作,連續(xù)不斷收到數(shù)據(jù)時觸發(fā)的操作和下載完畢時觸發(fā)的操作。有個這個函數(shù),我們就只需要在接收數(shù)據(jù)的回調(diào)函數(shù)中將buffer都通過pushBuf函數(shù)寫入緩存即可。

          準備發(fā)送

          目前下載數(shù)據(jù)包和緩存都已經(jīng)準備就緒,接下來就是準備進行發(fā)送分片的操作了。但是,還依然存在以下問題:

          1. 如何連續(xù)不斷的從緩存中獲取分片

          2. 如何發(fā)送分片

          3. 單個分片如果上傳失敗,如何重試

          4. 如何在所有分片都上傳完成之后觸發(fā)一個回調(diào)

          5. 如何實現(xiàn)多個分片并行上傳

          下面將逐步講解思路,并提供相關實現(xiàn)代碼。

          連續(xù)不斷獲取分片

          連續(xù)不斷的獲取分片,實現(xiàn)上需要一個定時器來不斷的從緩存中獲取分片。

          Javascript為我們提供好了簡單易用的定時器,setTimeout和setInterval。每次回調(diào)函數(shù)的觸發(fā)都是在上一個時間周期完成之后運行。這樣的機制能保證每次觸發(fā)setTimeout的時候,緩存中或少能塞進一部分數(shù)據(jù)進去。

          當onStart函數(shù)觸發(fā)時,就預示著下載已經(jīng)開始了,這個時候就可以開始進行獲取分片了。

          通過setInterval,設定一個200ms的時間間隔,在每一次時間間隔內(nèi)通過bufferCache.getChunks()方法獲取已經(jīng)切分好的分片。

          最后一個分片是個特殊的情況,一個文件在經(jīng)過多次相同大小的切割之后,總會遺留下小的一塊分片,因此我們還需要對最后一個分片進行特殊處理。當readyCache的長度為0的時候,而且下載已經(jīng)完成,不會再調(diào)用pushBuf函數(shù),就是獲取最后一段分片的時機。

          function onStart(headers) {
          console.log('start downloading, headers is :', headers);
          let readyCache = bufferCache.getChunks();
          let sendTimer = setInterval(() => {
          if (readyCache.length > 0) {
          let receivedChunk = readyCache.shift();
          console.log('received Chunk', receivedChunk);
          }
          else if (isFinished) {
          clearTimeout(sendTimer);
          console.log('got last chunk');
          let lastChunk = bufferCache.getRemainChunks();
          console.log('the last chunk', lastChunk);
          }
          }, 200);
          }

          如何發(fā)送分片

          使用HTTP進行文件上傳,文件在傳輸過程中為一個byte序列,其content-type為multipart/form-data; boundary=----WebKitFormBoundarymqmPgKAUm2XuWnXu, boundary 是作為一個特殊的字符串來對發(fā)送的數(shù)據(jù)包進行分割。上傳的數(shù)據(jù)中即可以包含二進制文件的byte流,也可以包含常規(guī)的字符串鍵值對。

          在瀏覽器上,上傳一個圖片的數(shù)據(jù)格式:


          同樣,nodejs的request模塊也實現(xiàn)了和瀏覽器一樣的上傳文件協(xié)議,所以我們可以先通過Promise封裝一個上傳函數(shù)

          function upload(url, data) {
          return new Promise((resolve, reject) => {
          request.post({
          url: url,
          formData: data
          }, function (err, response, body) {
          if (!err && response.statusCode === 200) {
          resolve(body);
          }
          else {
          reject(err);
          }
          });
          });
          }

          發(fā)送分片的時候,需要間歇不斷的處理2件事情:

          1. 從緩存中拿出分片,直到拿完了,告知發(fā)送端已經(jīng)到底了

          2. 發(fā)送分片,發(fā)送成功,還有分片,繼續(xù)發(fā)送,直到分片都拿完了

          對于這樣的邏輯,我們可以考慮使用遞歸來發(fā)送分片,函數(shù)的參數(shù)傳入readyCache的引用。

          每次調(diào)用函數(shù),都通過引用從readyCache中把隊列最前面的分片拿出,再進行分片發(fā)送,如果分片上傳成功,

          再進行遞歸,依然傳入readyCache的引用,直到readyCache的長度為0。

          由于我們在發(fā)送的時候,使用了setInterval不斷輪詢,當前輪詢的周期為200ms。

          假設當前網(wǎng)絡環(huán)境擁堵,會導致上傳一個分片的時間 > 200ms, 200ms之后下一次輪詢開始運行時,原先的分片還沒上傳完畢,由于沒有一個狀態(tài)值進行判斷,依然會調(diào)用上傳函數(shù),又再一次進行分片上傳,就會更加劇的網(wǎng)絡擁堵環(huán)境,導致分片上傳時間更短。如此反復,時間一長就會導致崩潰,造成分片上傳全部大面積失敗。

          為了避免這樣的情況,我們就需要一個變量來表示當前這個上傳流程的狀態(tài),目前我們只關心單個流程進行上傳,可以只需要保證最大同時上傳的值為1即可。

          function sendChunks() {
          let chunkId = 0; // 給每個分片劃分ID
          let sending = 0; // 當前并行上傳的數(shù)量
          let MAX_SENDING = 1; // 最大并行上傳數(shù)

          function send(readyCache) {
          // 在并行上傳會用到,
          if (readyCache.length === 0) {
          return;
          }
          let chunk = readyCache.shift();
          // 測試使用的服務,用于接收分片
          let sendP = upload('http://localhost:3000', {
          chunk: {
          value: chunk,
          options: {
          // 在文件名稱上添加chunkId,可以方便后端服務進行分片整理
          filename: 'example.mp4_IDSPLIT_' + chunkId
          }
          }
          });
          sending++;
          sendP.then((response) => {
          sending--;
          if (response.errno === 0 && readyCache.length > 0) {
          // 成功上傳,繼續(xù)遞歸
          send(readyCache);
          }
          });
          chunkId++;
          }
          return new Promise((resolve, reject) => {
          let readyCache = bufferCache.getChunks();
          let sendTimer = setInterval(() => {
          let readyCache = bufferCache.getChunks();
          if (sending < MAX_SENDING && readyCache.length > 0) {
          send(readyCache);
          }
          // 如果isFinished 不為true的話,有可能是正在下載分片
          else if (isFinished && readyCache.length === 0) {
          clearTimeout(sendTimer);
          let lastChunk = bufferCache.getRemainChunks();
          readyCache.push(lastChunk);
          send(readyCache);
          }
          // 到這里是為分片正在下載,同時又正在上傳
          // 或者上傳比下載快,已經(jīng)下載好的分片都傳完了,等待下載完成
          }, 200);
          });
          }
          function onStart(headers) {
          // console.log('start downloading, headers is :', headers);
          sendChunks();
          }

          單個分片如果上傳失敗,如何重試

          到目前為止,分片上傳已經(jīng)初步完成,但僅僅是初步完成。因為如果上面的代碼能連續(xù)穩(wěn)定運行十幾年不出bug,是建立在以下情況的:接收端超穩(wěn)定,無論多少分片,多大速率,返回一律成功

          但是現(xiàn)實是殘酷的,當數(shù)量和頻率增加的時候,總會有分片上傳失敗,從而導致正在文件都上傳失敗。

          因此我們需要讓分片上傳都具備重試功能。

          在發(fā)送分片的時候,send函數(shù)可以當成是發(fā)送單個分片的一個控制器,如果分片發(fā)送失敗,最容易捕獲并重試的地方就應該在send函數(shù)內(nèi)部,所以當錯誤發(fā)生時,只需將原先的數(shù)據(jù)保存下來,然后再一次調(diào)用send函數(shù)就能進行重試操作。

          這樣的邏輯,我們可以簡化成下面這段JS代碼。

          let max = 4;

          function send() {
          return new Promise((resolve, reject) => {
          if (max > 2) {
          reject(new Error('error!!'));
          }
          else {
          resolve('ok');
          }
          }).catch(() => {
          max--;
          return send();
          });
          }

          send().then(() => {
          console.log('finished');
          }).catch(() => {
          console.log('error');
          });

          當Max > 2的時候,Promise就會返回異常,所以在最初的2次調(diào)用,Promise都會觸發(fā)catch函數(shù)。不過在每次catch的時候,再遞歸函數(shù),之前錯誤的Promise就能夠被遞歸創(chuàng)建的新的Promise處理,直到這個Promise能夠成功返回。我們只需簡單控制max的值,就能控制處理錯誤的次數(shù)。這樣就能將錯誤重試控制都包含在send函數(shù)內(nèi)部。

          所以我們也可以使用這樣的邏輯來進行分片的發(fā)送,當請求出現(xiàn)錯誤的時候,在catch函數(shù)內(nèi)判斷重試次數(shù),次數(shù)若大于0,則再返回一個遞歸的send函數(shù),直到次數(shù)等于0,直接用Promise.reject將異常拋出Promise。

          如果接收服務一直存在問題,導致多次上傳全部失敗的話,需要直接終止當前問題,于是我們還需要一個變量stopSend,用于在多次錯誤之后,直接停止上傳。

          function sendChunks() {
          let chunkId = 0;
          let sending = 0; // 當前并行上傳的數(shù)量
          let MAX_SENDING = 1; // 最大并行上傳數(shù)
          let stopSend = false;

          function send(options) {
          let readyCache = options.readyCache;
          let fresh = options.fresh;
          let retryCount = options.retry;
          let chunkIndex;

          let chunk = null;

          // 新的數(shù)據(jù)
          if (fresh) {
          if (readyCache.length === 0) {
          return;
          }

          chunk = readyCache.shift();
          chunkIndex = chunkId;
          chunkId++;
          }
          // 失敗重試的數(shù)據(jù)
          else {
          chunk = options.data;
          chunkIndex = options.index;
          }

          sending++;
          let sendP = upload('http://localhost:3000', {
          chunk: {
          value: chunk,
          options: {
          filename: 'example.mp4_IDSPLIT_' + chunkIndex
          }
          }
          }).then((response) => {
          sending--;
          let json = JSON.parse(response);

          if (json.errno === 0 && readyCache.length > 0) {
          return send({
          retry: RETRY_COUNT,
          fresh: true,
          readyCache: readyCache
          });
          }

          // 這里一直返回成功
          return Promise.resolve(json);
          }).catch(err => {
          if (retryCount > 0) {
          // 這里遞歸下去,如果成功的話,就等同于錯誤已經(jīng)處理
          return send({
          retry: retryCount - 1,
          index: chunkIndex,
          fresh: false,
          data: chunk,
          readyCache: readyCache
          });
          }
          else {
          console.log(`upload failed of chunkIndex: ${chunkIndex}`);
          // 停止上傳標識,會直接停止上傳
          stopSend = true;
          // 返回reject,異常拋出
          return Promise.reject(err);
          }
          });
          }

          return new Promise((resolve, reject) => {
          let readyCache = bufferCache.getChunks();

          let sendTimer = setInterval(() => {
          if (sending < MAX_SENDING && readyCache.length > 0) {
          // 改用傳入對象
          send({
          retry: 3, // 最大重試3次
          fresh: true, // 用這個字段來區(qū)分是新的分片,還是由于失敗重試的
          readyCache: readyCache
          }).catch(err => {
          console.log('upload failed, errmsg: ', err);
          });
          }
          else if (isFinished && readyCache.length === 0 || stopSend) {
          clearTimeout(sendTimer);

          // 已經(jīng)成功走到最后一個分片了。
          if (!stopSend) {
          let lastChunk = bufferCache.getRemainChunks();
          readyCache.push(lastChunk);

          send({
          retry: 3,
          fresh: true,
          readyCache: readyCache
          }).catch(err => {
          console.log('upload failed, errmsg: ', err);
          });
          }
          }

          // 到這里是為分片正在下載,同時又正在上傳
          // 或者上傳比下載快,已經(jīng)下載好的分片都傳完了,等待下載完成
          }, 200);
          });
          }

          在錯誤模擬上面,我們可以在在測試的server上加了幾行代碼來模擬上傳失敗的情況,當?shù)诙€分片到達的時候,一定會失敗。之后我們得到的日志如下:

          // 錯誤處理測試使用
          if (chunkIndex == 1) {
          console.log(`set failed of ${chunkIndex}`);
          this.status = 500;
          return;
          }

          server端得到的日志如下:

            <-- POST /
          uploading example.mp4_IDSPLIT_0 -> /Users/baidu/baiduYun/learn/koa-example/receive/example.mp4/0
          --> POST / 200 93ms 25b
          <-- POST /
          set failed of 1
          --> POST / 500 9ms -
          <-- POST /
          set failed of 1
          --> POST / 500 15ms -
          <-- POST /
          set failed of 1
          --> POST / 500 7ms -
          <-- POST /
          set failed of 1
          --> POST / 500 14ms -

          可見,在上傳失敗之后,當前分片會自動進行重試上傳,直到超出重試次數(shù),再直接拋出異常。

          如何在所有分片都上傳完成之后觸發(fā)一個回調(diào)

          到目前為止,整個服務的核心部分已經(jīng)差不多了,send函數(shù)無論怎么調(diào)用,都會返回Promise對象,所以在所有分片都上傳完成之后觸發(fā)一個回調(diào)也就很容易了,只需要將所有的send函數(shù)返回的Promise對象放進數(shù)組,然后通過Promise.all函數(shù)來捕獲即可,可見,基建搭的好,上層建筑建設也就輕而易舉了。

          所以我們只需要更改sendTimer這個定時器內(nèi)部的代碼即可。

          let readyCache = bufferCache.getChunks();
          let sendPromise = [];
          let sendTimer = setInterval(() => {
          if (sending < MAX_SENDING && readyCache.length > 0) {
          // 把Promise塞進數(shù)組
          sendPromise.push(send({
          retry: RETRY_COUNT,
          fresh: true,
          readyCache: readyCache
          }));
          }
          else if ((isFinished && readyCache.length === 0) || stopSend) {
          clearTimeout(sendTimer);
          if (!stopSend) {
          console.log('got last chunk');
          let lastChunk = bufferCache.getRemainChunks();
          readyCache.push(lastChunk);
          // 把Promise塞進數(shù)組
          sendPromise.push(send({
          retry: RETRY_COUNT,
          fresh: true,
          readyCache: readyCache
          }));
          }
          // 當所有的分片都發(fā)送之后觸發(fā),
          Promise.all(sendPromise).then(() => {
          console.log('send success');
          }).catch(err => {
          console.log('send failed');
          });
          }
          // not ready, wait for next interval
          }, 200);

          如何實現(xiàn)多個分片并行上傳

          Node.js提供事件驅(qū)動和非阻塞I/O可不是用來寫callbackHell的。有了這2個利器,我們可以輕松在一個進程上使用一個線程調(diào)度,控制多個I/O操作。這樣的設計就無需使用多線程編程,也就不用關心鎖之類的東西了。

          實現(xiàn)多個分片上傳,所以只需要直接創(chuàng)建多個HTTP連接進行上傳,多個上傳操作同享一個readyCache。

          而目前我們實現(xiàn)的send函數(shù)可以讓一個分片上傳自我控制,同樣,同時調(diào)用多次send函數(shù)也就等同于讓多個分片進行自我控制。而且多個send函數(shù)運行在同一個node.js進程上,所以對共享的reayCache的獲取是一個串行的操作(nodejs進程在一個事件輪詢周期中會依次執(zhí)行多個send函數(shù))。也就不可能出現(xiàn)多個send函數(shù)對readyCache的競爭造成死鎖這樣的情況。

          可見,單進程異步輪詢這樣的設計方案,能完全避免死鎖這樣的情況。

          所以直接把調(diào)用send函數(shù)平行擴展:

          let readyCache = bufferCache.getChunks();
          let threadPool = [];
          let sendTimer = setInterval(() => {
          if (sending < MAX_SENDING && readyCache.length > 0) {
          // 這個例子同時開啟4個分片上傳
          for (let i = 0; i < MAX_SENDING; i++) {
          let thread = send({
          retry: RETRY_COUNT,
          fresh: true,
          readyCache: readyCache
          });
          threadPool.push(thread);
          }
          }
          else if ((isFinished && readyCache.length === 0) || stopSend) {
          clearTimeout(sendTimer);
          if (!stopSend) {
          console.log('got last chunk');
          let lastChunk = bufferCache.getRemainChunks();
          readyCache.push(lastChunk);
          threadPool.push(send({
          retry: RETRY_COUNT,
          fresh: true,
          readyCache: readyCache
          }));
          }
          Promise.all(threadPool).then(() => {
          console.log('send success');
          }).catch(err => {
          console.log('send failed');
          });
          }
          // not ready, wait for next interval
          }, 200);

          測試

          不能穩(wěn)定運行的代碼不是好代碼,寫不出穩(wěn)定運行的程序員不是好的程序員。保證軟件質(zhì)量穩(wěn)定可靠,測試是必不可少的。

          文件流轉(zhuǎn)存服務的單元測試需要覆蓋2個方面:

          1. BufferCache的單元測試

          2. 將文件都上傳到測試服務,并驗證上傳前和上傳后的md5值。

          BufferCache.js單元測試

          BufferCache最主要的目的就是進行分片的緩存與切割,所以我們可以在測試內(nèi)制造一些測試數(shù)據(jù)。

          由于緩存和獲取是同步進行的,所以我們可以用2個setInterval函數(shù)來同步插入和獲取。設置一個時間長度,來讓setInterval停下來。最后再將沒有push到bufferCache內(nèi)的數(shù)據(jù)和從push到bufferCache內(nèi)的數(shù)據(jù)值進行對比。

          it('bufferCache Test', function (done) {
          let bufferCache = new BufferCache(1024 * 10);
          var startTime = Date.now();
          var originalBuffer = []; // 保存生成的數(shù)據(jù),不放進bufferCache
          let compiledBuffer = []; // 保存從bufferCache取出的數(shù)據(jù)
          let isFinished = false; // 是否結(jié)束
          // 寫入的定時器
          let pushTimer = setInterval(() => {
          var randomString = [];
          // 構造模擬數(shù)據(jù)
          for (let i = 0; i < 1024; i ++) {
          let arr = [];
          for (let j = 0; j < 1024; j ++) {
          arr.push(j % 10);
          }
          randomString.push(arr.join(''));
          }
          let buffer = Buffer.from(randomString.join(''));

          // 拷貝buffer對象,消除對象引用
          let bufferCopy = Buffer.alloc(buffer.length);
          buffer.copy(bufferCopy);

          originalBuffer.push(bufferCopy);
          bufferCache.pushBuf(buffer);
          // 該停下來了
          if (Date.now() - startTime > 1000) {
          isFinished = true;
          clearTimeout(pushTimer);
          }
          }, 5);
          // 讀取的定時器
          let outputTimer = setInterval(() => {
          let readyCache = bufferCache.getChunks();
          while (readyCache.length > 0) {
          let chunk = readyCache.shift();
          compiledBuffer.push(chunk);
          }
          if (isFinished) {
          let lastChunk = bufferCache.getRemainChunks();
          compiledBuffer.push(lastChunk);
          clearTimeout(outputTimer);
          // 把2個buffer都合并
          let originBuf = originalBuffer.reduce((total, next) => {
          return Buffer.concat([total, next], total.length + next.length);
          }, Buffer.alloc(0));
          let compiledBuf = compiledBuffer.reduce((total, next) => {
          return Buffer.concat([total, next], total.length + next.length);
          }, Buffer.alloc(0));
          assert.equal(originBuf.length, compiledBuf.length);
          assert.equal(originBuf.compare(compiledBuf), 0);
          done();
          }
          }, 10);
          });

          批量上傳測試

          bluebird模塊的Promise.map函數(shù)可以同時執(zhí)行多條異步任務,所以只需要簡單使用Promise.map函數(shù),就能批量調(diào)用getChunks函數(shù),將數(shù)據(jù)發(fā)送到測試server。

          it('upload test', function(done) {
          Promise.map(exampleData, (item, index) => {
          let md5 = item.md5;
          let url = item.url;
          return getChunks(url, uploadURL, md5);
          }).then(() => {
          done();
          }).catch(err => {
          done(err);
          });
          });

          文件完整性驗證

          為了驗證文件合法性,我在測試server上專門實現(xiàn)了一個接口,傳入上傳時附帶的filename參數(shù),就能按照分片順序?qū)⒍鄠€分片合并,并返回整個文件的md5值。

          通過這個接口,測試只需要對比發(fā)送之前的md5和獲取到的md5是否相同就能判斷文件有沒有在上傳時候出錯誤。

          所以測試用例就只需要連續(xù)調(diào)接口獲取數(shù)據(jù)即可:

          // 用Promise把request包裝一下
          function getData(url, data) {
          return new Promise((resolve, reject) => {
          request({
          url: url,
          method: 'POST',
          form: data
          }, function (err, response, data) {
          if (!err && response.statusCode === 200) {
          resolve(data);
          }
          else {
          reject(data);
          }
          });
          });
          }
          it('download data md5sum test', (done) => {
          Promise.each(exampleData, (item, index) => {
          let md5 = item.md5;
          let url = item.url;
          return getData(getMD5URL, {
          filename: md5
          }).then((serverResponse) => {
          serverResponse = JSON.parse(serverResponse);
          let serverMd5 = serverResponse.data;
          assert.equal(serverMd5, md5);
          });
          }).then(() => {
          done();
          }).catch(err => {
          done(err);
          })
          });

          server端源碼

          總結(jié)

          通過靈活使用Promise和遞歸,我們就能夠很輕松實現(xiàn)一些非異步模型看來很復雜的事情。

          沒有了多線程編程,也就沒有了線程調(diào)度,線程狀態(tài)監(jiān)控,死鎖監(jiān)控,讀寫鎖設計等復雜的功能。不過,能做到這一切也都得歸功于Node.js出色的設計以及Node.js的幕后英雄 —— libuv 跨平臺異步I/O庫

          本文章所涉及的源代碼:GitHub - andycall/file-stream-upload-example

          本文章測試需要的服務端源碼: GitHub - andycall/file-upload-example-server


          1. JavaScript 重溫系列(22篇全)
          2. ECMAScript 重溫系列(10篇全)
          3. JavaScript設計模式 重溫系列(9篇全)
          4. 正則 / 框架 / 算法等 重溫系列(16篇全)
          5. Webpack4 入門(上)|| Webpack4 入門(下)
          6. MobX 入門(上) ||  MobX 入門(下)
          7. 100+篇原創(chuàng)系列匯總

          回復“加群”與大佬們一起交流學習~

          點擊“閱讀原文”查看 100+ 篇原創(chuàng)文章


          瀏覽 43
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  在线波多野结衣 | 99视频在线观看6 | 激情乱伦中文字幕 | 激情五月天视频 | a片一级富二代表兄妹淫乱新春 |