大文件的分片上傳、斷點(diǎn)續(xù)傳及其相關(guān)拓展
點(diǎn)擊上方?前端Q,關(guān)注公眾號
回復(fù)加群,加入前端Q技術(shù)交流群
大文件的分片上傳、斷點(diǎn)續(xù)傳及其相關(guān)拓展
大文件分片上傳核心方法
在JavaScript中,文件FIle對象是Blob對象的子類,Blob對象包含一個(gè)重要的方法slice通過這個(gè)方法,我們就可以對二進(jìn)制文件進(jìn)行拆分 使用 FormData 格式進(jìn)行上傳 服務(wù)端接口接受到數(shù)據(jù),通過 multiparty 庫對數(shù)據(jù)進(jìn)行處理 區(qū)分 files 和 fields,通過 fse.move 將上傳的文件移動到目標(biāo)路徑下 客戶端使用 Promise.all 方法,當(dāng)監(jiān)聽到所有切片已上傳完,調(diào)用 merge 接口,通知服務(wù)端進(jìn)行切片的合并 使用 Stream 對切片邊讀邊寫,設(shè)置可寫流的 start Promise.all判斷所有切片是否寫入完畢
進(jìn)度條
使用瀏覽器 XMLHttpRequest 的 onprogress 的方法對進(jìn)度進(jìn)行監(jiān)聽
//?作為request的入?yún)?br>const?xhr?=?new?XMLHttpRequest();
xhr.upload.onprogress?=?onProgress;
//?回調(diào)方法
onProgress:?this.createProgressHandler(this.data[index])
//?接受回調(diào),通過?e.loaded?和?e.total?獲取進(jìn)度
createProgressHandler(item)?{
??return?(e)?=>?{
????item.percentage?=?parseInt(String((e.loaded?/?e.total)?*?100));
??};
},
復(fù)制代碼
斷點(diǎn)續(xù)傳核心方法
1、通過xhr的 abort 方法,主動放棄當(dāng)前請求
this.requestList.forEach((xhr)?=>?xhr?.abort());
復(fù)制代碼
2、番外篇:斷點(diǎn)續(xù)傳服務(wù)端做法
當(dāng)用戶在聽一首歌的時(shí)候,如果聽到一半(網(wǎng)絡(luò)下載了一半),網(wǎng)絡(luò)斷掉了,用戶需要繼續(xù)聽的時(shí)候,文件服務(wù)器不支持?jǐn)帱c(diǎn)的話,則用戶需要重新下載這個(gè)文件。而Range支持的話,客戶端應(yīng)該記錄了之前已經(jīng)讀取的文件范圍,網(wǎng)絡(luò)恢復(fù)之后,則向服務(wù)器發(fā)送讀取剩余Range的請求,服務(wù)端只需要發(fā)送客戶端請求的那部分內(nèi)容,而不用整個(gè)文件發(fā)送回客戶端,以此節(jié)省網(wǎng)絡(luò)帶寬。 如果Server支持Range,首先就要告訴客戶端,咱支持Range,之后客戶端才可能發(fā)起帶Range的請求。這里套用唐僧的一句話,你不說我怎么知道呢。response.setHeader('Accept-Ranges', 'bytes'); Server通過請求頭中的Range: bytes=0-xxx來判斷是否是做Range請求,如果這個(gè)值存在而且有效,則只發(fā)回請求的那部分文件內(nèi)容,響應(yīng)的狀態(tài)碼變成206,表示Partial Content,并設(shè)置Content-Range。如果無效,則返回416狀態(tài)碼,表明Request Range Not Satisfiable(www.w3.org/Protocols/r…[1] )。如果不包含Range的請求頭,則繼續(xù)通過常規(guī)的方式響應(yīng)。
getStream(req,?res,?filepath,?fileStat)?{
????res.setHeader('Accept-Range',?'bytes');?//告訴客戶端服務(wù)器支持Range
????let?range?=?req.headers['range'];
????let?start?=?0;
????let?end?=?fileStat.size;
????if?(range)?{
????????let?reg?=?/bytes=(\d*)-(\d*)/;
????????let?result?=?range.match(reg);
????????if?(result)?{
????????????start?=?isNaN(result[1])???0?:?parseInt(result[1]);
????????????end?=?isNaN(result[2])???0?:?parseInt(result[2]);
????????}
????};
????debug(`start=${start},end=${end}`);
????return?fs.createReadStream(filepath,?{
????????start,
????????end
????});
}
復(fù)制代碼
提高篇
時(shí)間切片計(jì)算文件hash:計(jì)算hash耗時(shí)的問題,不僅可以通過web-workder,還可以參考React的Fiber架構(gòu),通過requestIdleCallback來利用瀏覽器的空閑時(shí)間計(jì)算,也不會卡死主線程抽樣hash:文件hash的計(jì)算,是為了判斷文件是否存在,進(jìn)而實(shí)現(xiàn)秒傳的功能,所以我們可以參考布隆過濾器的理念, 犧牲一點(diǎn)點(diǎn)的識別率來換取時(shí)間,比如我們可以抽樣算hash根據(jù)文件名 + 文件修改時(shí)間 + size 生成hash網(wǎng)絡(luò)請求并發(fā)控制:大文件由于切片過多,過多的HTTP鏈接過去,也會把瀏覽器打掛, 我們可以通過控制異步請求的并發(fā)數(shù)來解決,這也是頭條的一個(gè)面試題慢啟動策略:由于文件大小不一,我們每個(gè)切片的大小設(shè)置成固定的也有點(diǎn)略顯笨拙,我們可以參考TCP協(xié)議的慢啟動策略, 設(shè)置一個(gè)初始大小,根據(jù)上傳任務(wù)完成的時(shí)候,來動態(tài)調(diào)整下一個(gè)切片的大小, 確保文件切片的大小和當(dāng)前網(wǎng)速匹配并發(fā)重試+報(bào)錯(cuò):并發(fā)上傳中,報(bào)錯(cuò)如何重試,比如每個(gè)切片我們允許重試兩次,三次再終止文件碎片清理
1、時(shí)間切片計(jì)算文件hash
其實(shí)就是time-slice概念,React中Fiber架構(gòu)的核心理念,利用瀏覽器的空閑時(shí)間,計(jì)算大的diff過程,中途又任何的高優(yōu)先級任務(wù),比如動畫和輸入,都會中斷diff任務(wù), 雖然整個(gè)計(jì)算量沒有減小,但是大大提高了用戶的交互體驗(yàn)
requestIdleCallback

requestIdelCallback(myNonEssentialWork);
function?myNonEssentialWork?(deadline)?{
??//?deadline.timeRemaining()可以獲取到當(dāng)前幀剩余時(shí)間
??//?當(dāng)前幀還有時(shí)間?并且任務(wù)隊(duì)列不為空
??while?(deadline.timeRemaining()?>?0?&&?tasks.length?>?0)?{
????doWorkIfNeeded();
??}
??if?(tasks.length?>?0){
????requestIdleCallback(myNonEssentialWork);
??}
}
復(fù)制代碼
2、抽樣hash
計(jì)算文件md5值的作用,無非就是為了判定文件是否存在,我們可以考慮設(shè)計(jì)一個(gè)抽樣的hash,犧牲一些命中率的同時(shí),提升效率,設(shè)計(jì)思路如下
文件切成大小為 XXX Mb的切片 第一個(gè)和最后一個(gè)切片全部內(nèi)容,其他切片的取 首中尾三個(gè)地方各2個(gè)字節(jié) 合并后的內(nèi)容,計(jì)算md5,稱之為影分身Hash 這個(gè)hash的結(jié)果,就是文件存在,有小概率誤判,但是如果不存在,是100%準(zhǔn)的的 ,和 布隆過濾器的思路有些相似, 可以考慮兩個(gè)hash配合使用我在自己電腦上試了下1.5G的文件,全量大概要20秒,抽樣大概1秒還是很不錯(cuò)的, 可以先用來判斷文件是不是不存在

3、根據(jù)文件名 + 文件修改時(shí)間 + size 生成hash
可根據(jù)File的lastModified、name、size生成hash,避免通過spark-md5對大文件進(jìn)行hash計(jì)算,大大的節(jié)省時(shí)間
lastModified:?1633436262311
lastModifiedDate:?Tue?Oct?05?2021?20:17:42?GMT+0800?(中國標(biāo)準(zhǔn)時(shí)間)?{}
name:?"2021.docx"
size:?1696681
type:?"application/vnd.openxmlformats-officedocument.wordprocessingml.document"
復(fù)制代碼
4、網(wǎng)絡(luò)請求并發(fā)控制
大文件hash計(jì)算后,一次發(fā)幾百個(gè)http請求,計(jì)算哈希沒卡,結(jié)果TCP建立的過程就把瀏覽器弄死了
思路其實(shí)也不難,就是我們把異步請求放在一個(gè)隊(duì)列里,比如并發(fā)數(shù)是3,就先同時(shí)發(fā)起3個(gè)請求,然后有請求結(jié)束了,再發(fā)起下一個(gè)請求即可
我們通過并發(fā)數(shù)max來管理并發(fā)數(shù),發(fā)起一個(gè)請求max--,結(jié)束一個(gè)請求max++即可
async?sendRequest(forms,?max=4)?{
??return?new?Promise(resolve?=>?{
????const?len?=?forms.length;
????let?idx?=?0;
????let?counter?=?0;
????const?start?=?async?()=>?{
??????//?有請求,有通道
??????while?(idx??0)?{
????????max--;?//?占用通道
????????console.log(idx,?"start");
????????const?form?=?forms[idx].form;
????????const?index?=?forms[idx].index;
????????idx++
????????request({
??????????url:?'/upload',
??????????data:?form,
??????????onProgress:?this.createProgresshandler(this.chunks[index]),
??????????requestList:?this.requestList
????????}).then(()?=>?{
??????????max++;?//?釋放通道
??????????counter++;
??????????if?(counter?===?len)?{
????????????resolve();
??????????}?else?{
????????????start();
??????????}
????????});
??????}
????}
????start();
??});
}
復(fù)制代碼
5、慢啟動策略實(shí)現(xiàn)
chunk中帶上size值,不過進(jìn)度條數(shù)量不確定了,修改createFileChunk, 請求加上時(shí)間統(tǒng)計(jì) 比如我們理想是30秒傳遞一個(gè) 初始大小定為1M,如果上傳花了10秒,那下一個(gè)區(qū)塊大小變成3M 如果上傳花了60秒,那下一個(gè)區(qū)塊大小變成500KB 以此類推
6、并發(fā)重試+報(bào)錯(cuò)
請求出錯(cuò).catch 把任務(wù)重新放在隊(duì)列中 出錯(cuò)后progress設(shè)置為-1 進(jìn)度條顯示紅色 數(shù)組存儲每個(gè)文件hash請求的重試次數(shù),做累加 比如[1,0,2],就是第0個(gè)文件切片報(bào)錯(cuò)1次,第2個(gè)報(bào)錯(cuò)2次 超過3的直接reject
7、服務(wù)器碎片文件清理
如果很多人傳了一半就離開了,這些切片存在就沒意義了,可以考慮定期清理
我們可以使用 node-schedule 來管理定時(shí)任務(wù) 比如我們每天掃一次存放文件目錄,如果文件的修改時(shí)間是一個(gè)月以前了,就直接刪除把
//?為了方便測試,我改成每5秒掃一次,?過期1鐘的刪除做演示
const?fse?=?require('fs-extra')
const?path?=?require('path')
const?schedule?=?require('node-schedule')
//?空目錄刪除
function?remove(file,stats){
????const?now?=?new?Date().getTime()
????const?offset?=?now?-?stats.ctimeMs?
????if(offset>1000*60){
????????//?大于60秒的碎片
????????console.log(file,'過期了,浪費(fèi)空間的玩意,刪除')
????????fse.unlinkSync(file)
????}
}
async?function?scan(dir,callback){
????const?files?=?fse.readdirSync(dir)
????files.forEach(filename=>{
????????const?fileDir?=?path.resolve(dir,filename)
????????const?stats?=?fse.statSync(fileDir)
????????if(stats.isDirectory()){
????????????return?scan(fileDir,remove)
????????}
????????if(callback){
????????????callback(fileDir,stats)
????????}
????})
}
//?*????*????*????*????*????*
//?┬????┬????┬????┬????┬????┬
//?│????│????│????│????│????│
//?│????│????│????│????│????└?day?of?week?(0?-?7)?(0?or?7?is?Sun)
//?│????│????│????│????└─────?month?(1?-?12)
//?│????│????│????└──────────?day?of?month?(1?-?31)
//?│????│????└───────────────?hour?(0?-?23)
//?│????└────────────────────?minute?(0?-?59)
//?└─────────────────────────?second?(0?-?59,?OPTIONAL)
let?start?=?function(UPLOAD_DIR){
????//?每5秒
????schedule.scheduleJob("*/5?*?*?*?*?*",function(){
????????console.log('開始掃描')
????????scan(UPLOAD_DIR)
????})
}
exports.start?=?start
復(fù)制代碼
客戶端核心代碼
??"app">
????
??????????????type="file"
????????:disabled="status?!==?Status.wait"
????????@change="handleFileChange"
??????/>
??????"handleUpload"?:disabled="uploadDisabled"
????????>上傳 ??????>
??????"handleResume"?v-if="status?===?Status.pause"
????????>恢復(fù) ??????>
??????????????v-else
????????:disabled="status?!==?Status.uploading?||?!container.hash"
????????@click="handlePause"
????????>暫停 ??????>
????
????
??????計(jì)算文件?hash
??????"hashPercentage">
??????總進(jìn)度
??????"fakeUploadPercentage">
????
????"data">
??????????????prop="hash"
????????label="切片hash"
????????align="center"
??????>
??????"大小(KB)"?align="center"?width="120">
????????"{?row?}">
??????????{{?row.size?|?transformByte?}}
????????
??????
??????"進(jìn)度"?align="center">
????????"{?row?}">
??????????????????????:percentage="row.percentage"
????????????color="#909399"
??????????>
????????
??????
????
??
復(fù)制代碼
服務(wù)端核心代碼
index.js
const?Controller?=?require("./controller");
const?http?=?require("http");
const?server?=?http.createServer();
const?controller?=?new?Controller();
server.on("request",?async?(req,?res)?=>?{
??res.setHeader("Access-Control-Allow-Origin",?"*");
??res.setHeader("Access-Control-Allow-Headers",?"*");
??if?(req.method?===?"OPTIONS")?{
????res.status?=?200;
????res.end();
????return;
??}
??if?(req.url?===?"/verify")?{
????await?controller.handleVerifyUpload(req,?res);
????return;
??}
??if?(req.url?===?"/merge")?{
????await?controller.handleMerge(req,?res);
????return;
??}
??if?(req.url?===?"/")?{
????await?controller.handleFormData(req,?res);
??}
});
server.listen(3000,?()?=>?console.log("正在監(jiān)聽?3000?端口"));
復(fù)制代碼
controller.js
const?multiparty?=?require("multiparty");
const?path?=?require("path");
const?fse?=?require("fs-extra");
const?extractExt?=?(filename)?=>
??filename.slice(filename.lastIndexOf("."),?filename.length);?//?提取后綴名
const?UPLOAD_DIR?=?path.resolve(__dirname,?"..",?"target");?//?大文件存儲目錄
const?pipeStream?=?(path,?writeStream)?=>
??new?Promise((resolve)?=>?{
????const?readStream?=?fse.createReadStream(path);
????readStream.on("end",?()?=>?{
??????fse.unlinkSync(path);
??????resolve();
????});
????readStream.pipe(writeStream);
??});
//?合并切片
const?mergeFileChunk?=?async?(filePath,?fileHash,?size)?=>?{
??const?chunkDir?=?path.resolve(UPLOAD_DIR,?fileHash);
??const?chunkPaths?=?await?fse.readdir(chunkDir);
??//?根據(jù)切片下標(biāo)進(jìn)行排序
??//?否則直接讀取目錄的獲得的順序可能會錯(cuò)亂
??chunkPaths.sort((a,?b)?=>?a.split("-")[1]?-?b.split("-")[1]);
??await?Promise.all(
????chunkPaths.map((chunkPath,?index)?=>
??????pipeStream(
????????path.resolve(chunkDir,?chunkPath),
????????//?指定位置創(chuàng)建可寫流
????????fse.createWriteStream(filePath,?{
??????????start:?index?*?size,
??????????end:?(index?+?1)?*?size,
????????})
??????)
????)
??);
??fse.rmdirSync(chunkDir);?//?合并后刪除保存切片的目錄
};
const?resolvePost?=?(req)?=>
??new?Promise((resolve)?=>?{
????let?chunk?=?"";
????req.on("data",?(data)?=>?{
??????chunk?+=?data;
????});
????req.on("end",?()?=>?{
??????resolve(JSON.parse(chunk));
????});
??});
//?返回已經(jīng)上傳切片名
const?createUploadedList?=?async?(fileHash)?=>
??fse.existsSync(path.resolve(UPLOAD_DIR,?fileHash))
??????await?fse.readdir(path.resolve(UPLOAD_DIR,?fileHash))
????:?[];
module.exports?=?class?{
??//?合并切片
??async?handleMerge(req,?res)?{
????const?data?=?await?resolvePost(req);
????const?{?fileHash,?filename,?size?}?=?data;
????const?ext?=?extractExt(filename);
????const?filePath?=?path.resolve(UPLOAD_DIR,?`${fileHash}${ext}`);
????await?mergeFileChunk(filePath,?fileHash,?size);
????res.end(
??????JSON.stringify({
????????code:?0,
????????message:?"file?merged?success",
??????})
????);
??}
??//?處理切片
??async?handleFormData(req,?res)?{
????const?multipart?=?new?multiparty.Form();
????multipart.parse(req,?async?(err,?fields,?files)?=>?{
??????if?(err)?{
????????console.error(err);
????????res.status?=?500;
????????res.end("process?file?chunk?failed");
????????return;
??????}
??????const?[chunk]?=?files.chunk;
??????const?[hash]?=?fields.hash;
??????const?[fileHash]?=?fields.fileHash;
??????const?[filename]?=?fields.filename;
??????const?filePath?=?path.resolve(
????????UPLOAD_DIR,
????????`${fileHash}${extractExt(filename)}`
??????);
??????const?chunkDir?=?path.resolve(UPLOAD_DIR,?fileHash);
??????//?文件存在直接返回
??????if?(fse.existsSync(filePath))?{
????????res.end("file?exist");
????????return;
??????}
??????//?切片目錄不存在,創(chuàng)建切片目錄
??????if?(!fse.existsSync(chunkDir))?{
????????await?fse.mkdirs(chunkDir);
??????}
??????//?fs-extra?專用方法,類似?fs.rename?并且跨平臺
??????//?fs-extra?的?rename?方法?windows?平臺會有權(quán)限問題
??????//?https://github.com/meteor/meteor/issues/7852#issuecomment-255767835
??????await?fse.move(chunk.path,?path.resolve(chunkDir,?hash));
??????res.end("received?file?chunk");
????});
??}
??//?驗(yàn)證是否已上傳/已上傳切片下標(biāo)
??async?handleVerifyUpload(req,?res)?{
????const?data?=?await?resolvePost(req);
????const?{?fileHash,?filename?}?=?data;
????const?ext?=?extractExt(filename);
????const?filePath?=?path.resolve(UPLOAD_DIR,?`${fileHash}${ext}`);
????if?(fse.existsSync(filePath))?{
??????res.end(
????????JSON.stringify({
??????????shouldUpload:?false,
????????})
??????);
????}?else?{
??????res.end(
????????JSON.stringify({
??????????shouldUpload:?true,
??????????uploadedList:?await?createUploadedList(fileHash),
????????})
??????);
????}
??}
};
文章來由掘金@火車頭授權(quán)分享,https://juejin.cn/post/7071877982574346277

往期推薦



最后
歡迎加我微信,拉你進(jìn)技術(shù)群,長期交流學(xué)習(xí)...
歡迎關(guān)注「前端Q」,認(rèn)真學(xué)前端,做個(gè)專業(yè)的技術(shù)人...


