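How File Upload and Download Work: HTTP Protocol Analysis and Implementation
We download files from the internet all the time, and it feels very intuitive. There is a download button, I click it, and the file gradually arrives on the local disk. It looks just like a copy operation.
But since this is the internet, the data must travel over the network. So how exactly is it transferred?
There are two ways to serve a download: one is to download a file resource directly, which requires no application code; the other is for application code to generate the content on the fly and output it to the downloading client.
Downloading a resource file directly feels like an operation on the file itself, no different from copying. But when application code serves the download, how should it be handled?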
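1. File upload and download demo
You can easily find template code online and get the job done. It basically comes down to setting a few headers and then writing the data into the response.
demo1. The server receives an uploaded file and sends a file back to the client in the same request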
@PostMapping("fileUpDownTest")
@ResponseBody
public Object fileUpDownTest(@ModelAttribute EncSingleDocFileReqModel reqModel,
                             MultipartFile file,
                             HttpServletResponse response) {
    // Two things happen here: 1. receive the uploaded file; 2. send a file back to the uploader.
    // In other words, files travel in both directions; the downloaded file can be anything you produced.
    String tmpPath = saveMultipartToLocalFile(file);
    outputEncFileStream(tmpPath, response);
    System.out.println("path:" + tmpPath);
    return null;
}

/**
 * Save the uploaded file to a local path.
 *
 * @param file the uploaded file
 * @return local storage path
 */
private String saveMultipartToLocalFile(MultipartFile file) {
    try (InputStream inputStream = file.getInputStream()) {
        // Write the file into the temp directory
        // (note: this assumes the original filename contains a '.')
        String fileSuffix = file.getOriginalFilename().substring(file.getOriginalFilename().lastIndexOf('.'));
        File tmpFile = File.createTempFile(file.getName(), ".tmp" + fileSuffix);
        FileUtils.copyInputStreamToFile(inputStream, tmpFile);
        return tmpFile.getCanonicalPath();
    }
    catch (Exception e) {
        log.error("[encrypt file] failed to process the file stream: " + file.getName(), e);
        throw new EncryptSysException("0011110", "Failed to receive the file");
    }
}

/**
 * Output the file stream.
 *
 * @param encFileLocalPath path of the file to output
 * @param response servlet response whose output stream is written to
 */
private void outputEncFileStream(String encFileLocalPath, HttpServletResponse response) {
    File outFile = new File(encFileLocalPath);
    OutputStream os = null;
    InputStream inputStream = null;
    try {
        response.reset();
        response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate");
        // response.setHeader("Content-Length", file.getContentLength()+"");
        // File.getName() works for both '/' and '\' separators, unlike lastIndexOf('/')
        String outputFileName = outFile.getName();
        response.setHeader("Content-Disposition", String.format("attachment; filename=%s", URLEncoder.encode(outputFileName, "UTF-8")));
        response.setContentType("application/octet-stream; charset=utf-8");
        response.setHeader("Pragma", "no-cache");
        response.setHeader("Expires", "0");
        inputStream = new FileInputStream(outFile);
        // Write the data
        os = CommonUtil.readInputStream(inputStream, response.getOutputStream());
    }
    catch (Exception re) {
        log.error("Failed to output the file stream,", re);
        throw new RuntimeException("0011113: failed to output the encrypted file");
    }
    finally {
        if (os != null) {
            try {
                os.flush();
                os.close();
            }
            catch (IOException e) {
                log.error("Failed to close the output stream", e);
            }
        }
        if (inputStream != null) {
            try {
                inputStream.close();
            }
            catch (IOException e) {
                log.error("Failed to close the encrypted file input stream", e);
            }
        }
    }
}
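During development we only deal with all-in-one objects such as Request and Response and simply ask them for whatever we need. That convenience also costs us the chance to see what is really going on.
demo2. The server forwards a file to another server and handles the file stream that comes back in the response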
/**
 * Use a local file to request encryption from the encryption server, and output the result to the client.
 *
 * @param localFilePath the file to download
 * @return file stream
 */
@GetMapping("transLocalFileToEnc")
public Object transLocalFileToEnc(@ModelAttribute EncSingleDocFileReqModel reqModel,
                                  @RequestParam String localFilePath,
                                  HttpServletResponse response) {
    File localFileIns = new File(localFilePath);
    if (!localFileIns.exists()) {
        return ResponseInfoBuilderUtil.fail("The specified file was not found");
    }
    try (InputStream sourceFileInputStream = new FileInputStream(localFileIns)) {
        // This URL is the endpoint on the other server to upload to; here we simulate an encryption request to the local machine
        String url = "http://localhost:8082/encrypt/testEnc";
        int lastFileSeparatorIndex = localFilePath.lastIndexOf('/');
        // "+ 1" skips the separator itself and also covers a path without any separator
        String filename = lastFileSeparatorIndex == -1
                ? localFilePath.substring(localFilePath.lastIndexOf('\\') + 1)
                : localFilePath.substring(lastFileSeparatorIndex + 1);
        Object object = null;
        // Create the HttpClient instance
        CloseableHttpClient aDefault = HttpClients.createDefault();
        try {
            HttpPost httpPost = new HttpPost(url);
            MultipartEntityBuilder builder = MultipartEntityBuilder.create();
            // With this, the other service can receive the file under the name "file"
            builder.addBinaryBody("file", sourceFileInputStream,
                    ContentType.create("multipart/form-data"), URLEncoder.encode(filename, "utf-8"));
            builder.addTextBody("systemCode", "self");
            String encOutputFilename = filename;
            builder.addTextBody("encOutputFileName", encOutputFilename);
            HttpEntity entity = builder.build();
            httpPost.setEntity(entity);
            ResponseHandler<Object> rh = new ResponseHandler<Object>() {
                @Override
                public Object handleResponse(HttpResponse re) throws IOException {
                    HttpEntity entity = re.getEntity();
                    if (entity.getContentType().toString().contains("application/json")) {
                        // Deciding by the response type whether a file stream came back; not a rigorous approach
                        String retMsg = EntityUtils.toString(entity, "UTF-8");
                        return JSONObject.parseObject(retMsg, ResponseInfo.class);
                    }
                    InputStream input = entity.getContent();
                    // String result = EntityUtils.toString(entity, "UTF-8");
                    // Write the response stream
                    OutputStream os = null;
                    try {
                        response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate");
                        // response.setHeader("Content-Length", file.getContentLength()+"");
                        response.setHeader("Content-Disposition", String.format("attachment; filename=%s", URLEncoder.encode(filename, "UTF-8")));
                        response.setContentType("application/octet-stream; charset=utf-8");
                        response.setHeader("Pragma", "no-cache");
                        response.setHeader("Expires", "0");
                        // Write the file into the temp directory
                        File tmpFile = File.createTempFile(filename, "");
                        FileUtils.copyInputStreamToFile(input, tmpFile);
                        String encFilePathTmp = tmpFile.getCanonicalPath();
                        File encFileIns = new File(encFilePathTmp);
                        if (encFileIns.exists()) {
                            FileInputStream zipStream = new FileInputStream(encFileIns);
                            os = CommonUtil.readInputStream(zipStream, response.getOutputStream());
                        }
                    }
                    finally {
                        if (os != null) {
                            os.flush();
                            os.close();
                        }
                    }
                    // The file stream has been written to the client
                    return Boolean.TRUE;
                }
            };
            object = aDefault.execute(httpPost, rh);
            return object == Boolean.TRUE ? "Encrypted successfully, go download the file!" : object;
        }
        catch (Exception e) {
            log.error("", e);
        }
        finally {
            try {
                aDefault.close();
            } catch (IOException e) {
                log.error("Failed to close the client", e);
            }
        }
    }
    catch (FileNotFoundException e) {
        log.error("The file to encrypt does not exist", e);
    }
    catch (IOException e) {
        log.error("Failed to read the file to encrypt", e);
    }
    return "Processing failed";
}

// The socket-stream writing logic is extracted here so it can be controlled in one place
/**
 * Copy an input stream to an output stream.
 *
 * @param inputStream input stream
 * @return the output stream; it is flushed to the network once every 5000 buffer reads
 * @throws IOException
 */
public static OutputStream readInputStream(InputStream inputStream, OutputStream os) throws IOException {
    byte[] bytes = new byte[2048];
    int i = 0;
    int read = 0;
    // Copy in 2048-byte chunks to keep memory usage low
    while ((read = inputStream.read(bytes)) != -1) {
        os.write(bytes, 0, read);
        i++;
        // every 5000 reads
        if (i % 5000 == 0) {
            os.flush();
        }
    }
    inputStream.close();
    return os;
}
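This merely reproduces a front-end form submission in back-end code; there is no trick to it. It does illustrate one point, though: file streams can flow freely between servers, as long as both sides follow the protocol. (Note: the code above may need these pom dependencies: org.apache.httpcomponents:httpclient:4.5.6 and org.apache.httpcomponents:httpmime:4.5.6)
2. How HTTP handles files
Generally, the file uploads and downloads we deal with on the internet are almost all based on HTTP. So to understand them at the root, we just need to look at the HTTP protocol.
Using the demo above, we can look at what an upload looks like on the wire. Capturing the traffic with Fiddler gives the following: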
POST http://localhost:8082/test/fileUpDownTest?systemCode=1111&outputFileName=111 HTTP/1.1
Host: localhost:8082
Connection: keep-alive
Content-Length: 197
Accept: */*
X-Requested-With: XMLHttpRequest
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.129 Safari/537.36 OPR/68.0.3618.63
Content-Type: multipart/form-data; boundary=----WebKitFormBoundaryen2ZJyNfx7WhA3yO
Origin: http://localhost:8082
Sec-Fetch-Site: same-origin
Sec-Fetch-Mode: cors
Sec-Fetch-Dest: empty
Referer: http://localhost:8082/swagger-ui.html
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7
Cookie: JSESSIONID=40832A6766FB11E105717690AEF826AA

------WebKitFormBoundaryen2ZJyNfx7WhA3yO
Content-Disposition: form-data; name="file"; filename="123.txt"
Content-Type: text/plain

123content
over
------WebKitFormBoundaryen2ZJyNfx7WhA3yO
Content-Disposition: form-data; name="file2"; filename="123-2.txt"
Content-Type: text/plain

2222content2over
------WebKitFormBoundaryen2ZJyNfx7WhA3yO--
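Because Fiddler decodes the traffic, and HTTP/1.1 messages are text-based (and this file happens to be plain text), everything we see is readable. Here I simulated the upload with a file named 123.txt containing a few characters: "123content\nover".
As we know, an HTTP request starts with a request line whose three parts (method, URL, protocol version) are fixed, followed by one header per line. No need to say more about that.
The ones relevant to us are: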
Content-Type: multipart/form-data; boundary=----WebKitFormBoundaryen2ZJyNfx7WhA3yO
Content-Type is an important identifying field. For a file upload, multipart/form-data says that this is a multi-part request, which is what a file upload is. The boundary that follows is the delimiter between the parts in the body; it should be generated randomly for each request and must not collide with the payload data.
Content-Length: 197
This value is the length of the payload in bytes, computed by the browser. Once the server has it, it reads exactly that many bytes and then considers the transfer complete.
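To see where 197 comes from, the sketch below rebuilds a multipart body by hand and measures it. It assumes the uploaded text uses a CRLF line break (so the file is 16 bytes, which agrees with the Content-Length: 16 shown for the download response later in this post); the class and method names are made up for illustration. Under that assumption, a body holding only the first file part comes to exactly 197 bytes, which suggests the second part in the capture was not part of the body that was counted.

```java
import java.nio.charset.StandardCharsets;

class MultipartBodySketch {
    static final String CRLF = "\r\n";

    /** Builds a multipart/form-data body that contains a single file part. */
    static byte[] singleFilePart(String boundary, String field, String filename,
                                 String contentType, String content) {
        String body = "--" + boundary + CRLF                       // opening delimiter
                + "Content-Disposition: form-data; name=\"" + field
                + "\"; filename=\"" + filename + "\"" + CRLF       // part headers
                + "Content-Type: " + contentType + CRLF
                + CRLF                                             // blank line ends the part headers
                + content + CRLF                                   // part content
                + "--" + boundary + "--" + CRLF;                   // closing delimiter
        return body.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = singleFilePart("----WebKitFormBoundaryen2ZJyNfx7WhA3yO",
                "file", "123.txt", "text/plain", "123content\r\nover");
        System.out.println("Content-Length: " + body.length);
    }
}
```

Content-Length counts bytes, not characters, so the body is encoded first and measured afterwards.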
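The body of an HTTP message starts right after the first occurrence of two consecutive line breaks (CRLF CRLF, i.e. an empty line). (So if a header value would contain that sequence, you have to encode it yourself before sending the request, otherwise it conflicts with the protocol.)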
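A minimal sketch of that rule, assuming the whole message is already in a byte array; the class and method names are hypothetical:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class HeadBodySplitSketch {
    /** Returns the index of the first body byte, or -1 if there is no blank line. */
    static int bodyStart(byte[] message) {
        for (int i = 0; i + 3 < message.length; i++) {
            if (message[i] == '\r' && message[i + 1] == '\n'
                    && message[i + 2] == '\r' && message[i + 3] == '\n') {
                return i + 4;
            }
        }
        return -1;
    }

    /** Start line plus headers, without the terminating blank line. */
    static String head(byte[] message) {
        int start = bodyStart(message);
        int end = start < 0 ? message.length : start - 4;
        return new String(message, 0, end, StandardCharsets.ISO_8859_1);
    }

    /** Everything after the blank line; empty if the message has no body. */
    static byte[] body(byte[] message) {
        int start = bodyStart(message);
        return start < 0 ? new byte[0] : Arrays.copyOfRange(message, start, message.length);
    }
}
```

The body is kept as bytes on purpose: only the head is guaranteed to be text, while the body may be arbitrary binary data.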
Each part is delimited by the boundary. A part can carry a file, a stream, or a plain key-value pair.
Given this data format, the server simply parses it in the reverse direction to recover the content.
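The reverse parsing can be sketched as follows. This is a simplified illustration rather than what any particular server does: it assumes the complete body is in memory and well-formed, and the class, its nested Part type, and the method name are all hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class MultipartParseSketch {
    /** One part: its raw header block and its content. */
    static final class Part {
        final String headers;
        final String content;
        Part(String headers, String content) {
            this.headers = headers;
            this.content = content;
        }
    }

    static List<Part> parse(byte[] body, String boundary) {
        // ISO-8859-1 maps every byte to exactly one char, so offsets stay byte-accurate.
        String text = new String(body, StandardCharsets.ISO_8859_1);
        String delimiter = "--" + boundary;
        List<Part> parts = new ArrayList<>();
        int pos = text.indexOf(delimiter);
        while (pos >= 0) {
            int afterDelimiter = pos + delimiter.length();
            if (text.startsWith("--", afterDelimiter)) {
                break;                                   // closing delimiter "--boundary--"
            }
            int headerStart = afterDelimiter + 2;        // skip the CRLF after the delimiter
            int headerEnd = text.indexOf("\r\n\r\n", headerStart);
            if (headerEnd < 0) {
                break;                                   // truncated part headers
            }
            int next = text.indexOf("\r\n" + delimiter, headerEnd);
            if (next < 0) {
                break;                                   // no delimiter after the content
            }
            parts.add(new Part(text.substring(headerStart, headerEnd),
                               text.substring(headerEnd + 4, next)));
            pos = next + 2;                              // position of the next delimiter
        }
        return parts;
    }
}
```

A real container would not buffer the whole body like this; it would scan the stream for the delimiter as the bytes arrive.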
If the server responds with a file download, the response looks like this:
HTTP/1.1 200
Cache-Control: no-cache, no-store, must-revalidate
Content-Disposition: attachment; filename=file5983940017135638617.tmp.txt
Pragma: no-cache
Expires: 0
Content-Type: application/octet-stream;charset=utf-8
Transfer-Encoding: chunked
Date: Sun, 17 May 2020 05:30:57 GMT

10
123content
over
0

Key fields:
Content-Disposition: attachment; filename=file5983940017135638617.tmp.txt
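This field says that the response should be downloaded and saved locally as an attachment, which virtually all browsers support. If you write the receiving code yourself, you can treat it however you like; it is only a hint. filename is the default name offered when the user saves the file; if a file with that name already exists locally, browsers usually append a suffix such as (xxx) to avoid overwriting it.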
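The demo above puts a URL-encoded name directly into filename=. As an alternative (not what the demo does), RFC 6266 defines a filename* parameter for non-ASCII names, usually sent together with a plain ASCII filename as a fallback. A minimal sketch of building such a value follows; the class and method names are hypothetical:

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class ContentDispositionSketch {
    /** Builds an attachment header value with an ASCII fallback and a UTF-8 encoded name. */
    static String attachment(String filename) {
        String encoded;
        try {
            // URLEncoder targets form encoding: it turns spaces into '+' and leaves '*' alone,
            // so both are adjusted to plain percent-encoding here.
            encoded = URLEncoder.encode(filename, "UTF-8")
                    .replace("+", "%20")
                    .replace("*", "%2A");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
        // Fallback for clients that ignore filename*: replace anything outside printable ASCII,
        // as well as quotes and backslashes, with '_'.
        String fallback = filename.replaceAll("[^\\x20-\\x7E]", "_")
                .replace("\"", "_")
                .replace("\\", "_");
        return "attachment; filename=\"" + fallback + "\"; filename*=UTF-8''" + encoded;
    }
}
```

The returned string would be passed to something like response.setHeader("Content-Disposition", ...) in the demo code.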
Content-Type: application/octet-stream;charset=utf-8
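This means the content is a binary file, i.e. the browser generally can't do anything with it other than save it. Again, it is only a hint; what you actually output doesn't matter much, because writing the received bytes to a file is enough to restore the content. As before, the actual file content starts after the first blank line.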
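The output above has no Content-Length field, so the size of the download can't be known up front. Instead the response uses Transfer-Encoding: chunked, which adds a few extra characters around the data to mark where it ends: each chunk is preceded by its length in hexadecimal (10 here, i.e. 16 bytes), and a chunk of length 0 ends the body. As a result the browser can't tell what fraction has been downloaded, i.e. it can't show a progress bar. It doesn't affect the data that finally arrives, but it is generally better not to do this.
After adding Content-Length, the response looks like this: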
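A minimal sketch of how a receiver could undo the chunked framing, assuming the whole encoded body is already in a byte array and contains no trailer headers; the class and method names are hypothetical:

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

class ChunkedDecodeSketch {
    /** Decodes a chunked body: hex size line, CRLF, data, CRLF, repeated until size 0. */
    static byte[] decode(byte[] chunked) {
        String text = new String(chunked, StandardCharsets.ISO_8859_1); // one char per byte
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int pos = 0;
        while (true) {
            int lineEnd = text.indexOf("\r\n", pos);
            if (lineEnd < 0) {
                throw new IllegalArgumentException("missing chunk-size line");
            }
            String sizeField = text.substring(pos, lineEnd);
            int semicolon = sizeField.indexOf(';');          // ignore optional chunk extensions
            if (semicolon >= 0) {
                sizeField = sizeField.substring(0, semicolon);
            }
            int size = Integer.parseInt(sizeField.trim(), 16);
            if (size == 0) {
                break;                                       // last chunk
            }
            int dataStart = lineEnd + 2;
            out.write(chunked, dataStart, size);             // copy the chunk data
            pos = dataStart + size + 2;                      // skip the data and its trailing CRLF
        }
        return out.toByteArray();
    }
}
```

Fed the body from the response above ("10", the 16 bytes of file content, then "0"), this returns the original 16 bytes.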
HTTP/1.1 200
Cache-Control: no-cache, no-store, must-revalidate
Content-Disposition: attachment; filename=file4383190990004865558.tmp.txt
Pragma: no-cache
Expires: 0
Content-Type: application/octet-stream;charset=utf-8
Content-Length: 16
Date: Sun, 17 May 2020 07:26:47 GMT

123content
over
That is how HTTP handles files. As long as your request follows the protocol, the peer can accept your upload; as long as the server writes its response according to the protocol, the browser can download the file.
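To illustrate that nothing beyond the protocol is needed, here is a small sketch that serves a download without Spring or a servlet container. It uses the JDK's built-in com.sun.net.httpserver.HttpServer instead of the stack used in the demos; the class name, the method name, and the /download path are assumptions made for this example.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;

class DownloadServerSketch {
    /** Starts a server on a free local port that offers fileContent at /download. */
    static HttpServer start(byte[] fileContent, String filename) {
        try {
            HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
            server.createContext("/download", exchange -> {
                exchange.getResponseHeaders().set("Content-Type", "application/octet-stream");
                exchange.getResponseHeaders().set("Content-Disposition",
                        "attachment; filename=\"" + filename + "\"");
                // A positive length makes the server send Content-Length;
                // passing 0 here would switch it to chunked encoding instead.
                exchange.sendResponseHeaders(200, fileContent.length);
                try (OutputStream os = exchange.getResponseBody()) {
                    os.write(fileContent);
                }
            });
            server.start();
            return server;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling start(...), reading the port from server.getAddress().getPort(), and opening http://127.0.0.1:PORT/download in a browser should trigger a save dialog; server.stop(0) shuts it down again.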
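For more about HTTP headers, see: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers
3. What lies behind HTTP upload and download?
As we know, HTTP is an application-layer protocol built on top of TCP. What the previous section said about uploading and downloading files was also at the application layer. Put plainly: if we treat the network as a black box, we assume the box hands us correct data, and from that data we parse out the file.
In fact, TCP is a reliable transport protocol. As for how it is reliable, well, put it this way: traffic on the network is complex and unordered. When you send data from one endpoint to another, it goes out over the network via the IP protocol, and that delivery is one-way and split across many packets. It is affected by the environment: some packets may be lost, some may arrive out of order, and so on. If packet loss, reordering, and duplication are not handled properly, the data that arrives is unusable (basically, it is corrupted).
TCP was designed specifically to deal with these problems. The details are complicated. In short, once you use TCP you no longer need to care about the messy network; you can trust that the data the operating system's TCP layer hands you is ordered and complete. You can read a book or look up more material online (books are more reliable, they just cost more time and effort). This article is a good reference: http://www.ruanyifeng.com/blog/2017/06/tcp-protocol.html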
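4. How is file upload handling implemented in Java?
We have now seen how HTTP handles file uploads, but how is it actually implemented? If you were given nothing but a socket library as the entry point, how would you handle these HTTP requests?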
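Roughly, you could think of it like this:
1. Receive the headers and determine that this is a file upload;
2. Extract the boundary and the Content-Length for later use;
3. Keep reading the network stream. When a part turns out to be key-value data, buffer it in memory; when it is file data, create a temporary file and write the incoming bytes into it until that part is complete, then record the temp file's information;
4. After reading all the data the HTTP message specifies, wrap it into a request and hand it to the application code, then send the response to the client once the application has finished.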
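The steps above can be sketched as follows. This is a deliberately simplified illustration, not how Tomcat does it: it buffers the whole body before splitting it (a real container streams each file part to disk as it arrives), it assumes an unquoted boundary as the last Content-Type parameter, and every name in it is hypothetical.

```java
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Pattern;

class UploadHandlingSketch {
    final Map<String, String> fields = new LinkedHashMap<>(); // key-value parts, kept in memory
    final Map<String, Path> files = new LinkedHashMap<>();    // file parts, written to temp files

    static UploadHandlingSketch handle(InputStream in) {
        try {
            UploadHandlingSketch result = new UploadHandlingSketch();
            // Steps 1 and 2: read the head line by line, remember boundary and Content-Length.
            String boundary = null;
            int contentLength = 0;
            for (String line = readLine(in); !line.isEmpty(); line = readLine(in)) {
                String lower = line.toLowerCase(Locale.ROOT);
                if (lower.startsWith("content-type:") && lower.contains("multipart/form-data")) {
                    boundary = line.substring(line.indexOf("boundary=") + "boundary=".length()).trim();
                } else if (lower.startsWith("content-length:")) {
                    contentLength = Integer.parseInt(line.substring("content-length:".length()).trim());
                }
            }
            if (boundary == null) {
                throw new IllegalArgumentException("not a multipart upload");
            }
            // Step 3: read exactly Content-Length bytes, then route each part.
            byte[] body = new byte[contentLength];
            new DataInputStream(in).readFully(body);
            String text = new String(body, StandardCharsets.ISO_8859_1); // one char per byte
            for (String raw : text.split(Pattern.quote("--" + boundary))) {
                int split = raw.indexOf("\r\n\r\n");
                if (split < 0) {
                    continue; // empty preamble or the closing "--"
                }
                String headers = raw.substring(0, split);
                // The last two chars are the CRLF that precedes the next delimiter.
                String content = raw.substring(split + 4, raw.length() - 2);
                byte[] bytes = content.getBytes(StandardCharsets.ISO_8859_1);
                if (attribute(headers, "filename") != null) {
                    Path tmp = Files.createTempFile("upload-", ".tmp");
                    Files.write(tmp, bytes);
                    result.files.put(attribute(headers, "name"), tmp);
                } else {
                    result.fields.put(attribute(headers, "name"), new String(bytes, StandardCharsets.UTF_8));
                }
            }
            // Step 4 would hand `result` to the application code.
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads one CRLF-terminated line; returns "" for the blank line or at end of stream. */
    static String readLine(InputStream in) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        int b;
        while ((b = in.read()) != -1 && b != '\n') {
            if (b != '\r') {
                buf.write(b);
            }
        }
        return buf.toString("ISO-8859-1");
    }

    /** Extracts key="value" from a part's header block; the leading space keeps name= from matching filename=. */
    static String attribute(String headers, String key) {
        String marker = " " + key + "=\"";
        int start = headers.indexOf(marker);
        if (start < 0) {
            return null;
        }
        start += marker.length();
        return headers.substring(start, headers.indexOf('"', start));
    }
}
```

Given a socket, UploadHandlingSketch.handle(socket.getInputStream()) would return the parsed fields and the paths of the temp files.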
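Take Tomcat as an example: it parses each parameter value in turn.
If you are interested, first look at how it accepts HTTP requests (based on NIO sockets). The rough flow is (the figure below shows its threading model): Acceptor -> Poller -> SocketProcessor.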
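Before reading the Tomcat source, it may help to see the same three roles in a few dozen lines. The sketch below is not Tomcat code; it is a minimal echo endpoint written against plain java.nio, with one acceptor thread, one poller thread, and a small worker pool. All names are hypothetical, and details such as keep-alive, timeouts, and partial reads are left out.

```java
import java.io.*;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;

class MiniEndpointSketch {
    private final ServerSocketChannel server;
    private final Selector selector;
    private final Queue<SocketChannel> pending = new ConcurrentLinkedQueue<>();
    private final ExecutorService workers = Executors.newFixedThreadPool(2, r -> daemon(r, "worker"));
    private volatile boolean running = true;

    private MiniEndpointSketch() throws IOException {
        server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: the OS picks a free port
        selector = Selector.open();
    }

    static MiniEndpointSketch start() {
        try {
            MiniEndpointSketch endpoint = new MiniEndpointSketch();
            daemon(endpoint::acceptLoop, "acceptor").start();
            daemon(endpoint::pollLoop, "poller").start();
            return endpoint;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static Thread daemon(Runnable task, String name) {
        Thread t = new Thread(task, name);
        t.setDaemon(true);
        return t;
    }

    int port() { return server.socket().getLocalPort(); }

    // Acceptor: block in accept(), then queue the connection for the poller.
    private void acceptLoop() {
        try {
            while (running) {
                SocketChannel channel = server.accept();
                channel.configureBlocking(false);
                pending.offer(channel);
                selector.wakeup();
            }
        } catch (IOException | RuntimeException closed) { /* server or selector closed: stop */ }
    }

    // Poller: register queued connections, wait for readiness, dispatch to workers.
    private void pollLoop() {
        try {
            while (running) {
                for (SocketChannel c; (c = pending.poll()) != null; ) {
                    c.register(selector, SelectionKey.OP_READ);
                }
                selector.select(500);
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove();
                    if (key.isValid() && key.isReadable()) {
                        key.interestOps(0); // stop polling this socket while a worker owns it
                        SocketChannel ready = (SocketChannel) key.channel();
                        workers.execute(() -> process(ready));
                    }
                }
            }
        } catch (IOException | RuntimeException closed) { /* selector or pool shut down: stop */ }
    }

    // Worker: read what has arrived, answer with "echo: " + data, close the connection.
    private void process(SocketChannel channel) {
        try (SocketChannel c = channel) {
            ByteBuffer in = ByteBuffer.allocate(1024);
            if (c.read(in) > 0) {
                in.flip();
                ByteBuffer out = ByteBuffer.allocate(in.remaining() + 6);
                out.put("echo: ".getBytes(StandardCharsets.US_ASCII)).put(in);
                out.flip();
                while (out.hasRemaining()) {
                    c.write(out);
                }
            }
        } catch (IOException gone) { /* the client went away; nothing more to do here */ }
    }

    void stop() {
        running = false;
        workers.shutdown();
        try { server.close(); selector.close(); } catch (IOException ignored) { }
    }
}
```

The hand-offs mirror the flow named above: the acceptor only accepts, the poller only watches for readiness, and the actual reading and writing happens on a worker thread.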

// org.apache.tomcat.util.net.NioEndpoint.Acceptor
@Override
public void run() {
    int errorDelay = 0;
    // Loop until we receive a shutdown command
    while (running) {
        // Loop if endpoint is paused
        while (paused && running) {
            state = AcceptorState.PAUSED;
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                // Ignore
            }
        }
        if (!running) {
            break;
        }
        state = AcceptorState.RUNNING;
        try {
            //if we have reached max connections, wait
            countUpOrAwaitConnection();
            SocketChannel socket = null;
            try {
                // Accept the next incoming connection from the server
                // socket
                // NIO's ServerSocketChannelImpl: block until a socket accept event arrives
                socket = serverSock.accept();
            } catch (IOException ioe) {
                // We didn't get a socket
                countDownConnection();
                if (running) {
                    // Introduce delay if necessary
                    errorDelay = handleExceptionWithDelay(errorDelay);
                    // re-throw
                    throw ioe;
                } else {
                    break;
                }
            }
            // Successful accept, reset the error delay
            errorDelay = 0;
            // Configure the socket
            if (running && !paused) {
                // setSocketOptions() will hand the socket off to
                // an appropriate processor if successful
                // handle the socket event
                if (!setSocketOptions(socket)) {
                    closeSocket(socket);
                }
            } else {
                closeSocket(socket);
            }
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            log.error(sm.getString("endpoint.accept.fail"), t);
        }
    }
    state = AcceptorState.ENDED;
}

/**
 * Process the specified connection.
 * @param socket The socket channel
 * @return <code>true</code> if the socket was correctly configured
 *  and processing may continue, <code>false</code> if the socket needs to be
 *  close immediately
 */
protected boolean setSocketOptions(SocketChannel socket) {
    // Process the connection
    try {
        //disable blocking, APR style, we are gonna be polling it
        // assemble the channel and hand it to the Poller
        socket.configureBlocking(false);
        Socket sock = socket.socket();
        socketProperties.setProperties(sock);
        NioChannel channel = nioChannels.pop();
        if (channel == null) {
            SocketBufferHandler bufhandler = new SocketBufferHandler(
                    socketProperties.getAppReadBufSize(),
                    socketProperties.getAppWriteBufSize(),
                    socketProperties.getDirectBuffer());
            if (isSSLEnabled()) {
                channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
            } else {
                channel = new NioChannel(socket, bufhandler);
            }
        } else {
            channel.setIOChannel(socket);
            channel.reset();
        }
        // add it to the Poller's queue; the Poller is picked round-robin
        getPoller0().register(channel);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        try {
            log.error("",t);
        } catch (Throwable tt) {
            ExceptionUtils.handleThrowable(tt);
        }
        // Tell to close the socket
        return false;
    }
    return true;
}

/**
 * Return an available poller in true round robin fashion.
 *
 * @return The next poller in sequence
 */
public Poller getPoller0() {
    // round robin: with two pollers and a counter starting at 0, the index goes 1, 0, 1, ...
    int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
    return pollers[idx];
}

// org.apache.tomcat.util.net.NioEndpoint.Poller#register
/**
 * Registers a newly created socket with the poller.
 *
 * @param socket    The newly created socket
 */
public void register(final NioChannel socket) {
    socket.setPoller(this);
    NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
    socket.setSocketWrapper(ka);
    ka.setPoller(this);
    ka.setReadTimeout(getSocketProperties().getSoTimeout());
    ka.setWriteTimeout(getSocketProperties().getSoTimeout());
    ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
    ka.setSecure(isSSLEnabled());
    ka.setReadTimeout(getConnectionTimeout());
    ka.setWriteTimeout(getConnectionTimeout());
    PollerEvent r = eventCache.pop();
    // register interest in OP_READ, for the selector to use
    ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
    // put the socket information into a PollerEvent
    if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
    else r.reset(socket,ka,OP_REGISTER);
    addEvent(r);
}

// add the event and wake up the selector
// org.apache.tomcat.util.net.NioEndpoint.Poller#addEvent
private void addEvent(PollerEvent event) {
    events.offer(event);
    // a selector that is blocked in select() has wakeupCounter == -1, i.e. it can be woken up
    if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
}

// step2. The Poller uses the selector pool to handle read-ready events
/**
 * The background thread that adds sockets to the Poller, checks the
 * poller for triggered events and hands the associated socket off to an
 * appropriate processor as events occur.
 */
@Override
public void run() {
    // Loop until destroy() is called
    while (true) {
        boolean hasEvents = false;
        try {
            if (!close) {
                // events() checks whether the acceptor has submitted any PollerEvent; if so, it initializes the event first,
                // registering read interest with the selector and so on, so that the following select() takes effect
                hasEvents = events();
                if (wakeupCounter.getAndSet(-1) > 0) {
                    //if we are here, means we have other stuff to do
                    //do a non blocking select
                    keyCount = selector.selectNow();
                } else {
                    keyCount = selector.select(selectorTimeout);
                }
                wakeupCounter.set(0);
            }
            if (close) {
                events();
                timeout(0, false);
                try {
                    selector.close();
                } catch (IOException ioe) {
                    log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                }
                break;
            }
        } catch (Throwable x) {
            ExceptionUtils.handleThrowable(x);
            log.error("",x);
            continue;
        }
        //either we timed out or we woke up, process events first
        if ( keyCount == 0 ) hasEvents = (hasEvents | events());
        Iterator<SelectionKey> iterator =
            keyCount > 0 ? 
                selector.selectedKeys().iterator() : null;
        // Walk through the collection of ready keys and dispatch
        // any active event.
        while (iterator != null && iterator.hasNext()) {
            SelectionKey sk = iterator.next();
            NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
            // Attachment may be null if another thread has called
            // cancelledKey()
            if (attachment == null) {
                iterator.remove();
            } else {
                // remove the key from the selected set, then process it; the connection has come in successfully
                iterator.remove();
                processKey(sk, attachment);
            }
        }//while
        //process timeouts
        timeout(keyCount,hasEvents);
    }//while
    getStopLatch().countDown();
}

// org.apache.tomcat.util.net.NioEndpoint.Poller#processKey
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
    try {
        if ( close ) {
            cancelledKey(sk);
        } else if ( sk.isValid() && attachment != null ) {
            if (sk.isReadable() || sk.isWritable() ) {
                // sendfile
                if ( attachment.getSendfileData() != null ) {
                    processSendfile(sk,attachment, false);
                } else {
                    // Interest in the ready events is cancelled here, so how is the data read afterwards?
                    // The selector (epoll) stops reporting these events for this socket, which does not affect the reads that follow;
                    // the subsequent read() calls proceed in a BIO-like fashion
                    unreg(sk, attachment, sk.readyOps());
                    boolean closeSocket = false;
                    // Read goes before write
                    // handle the read event first, then the write event
                    if (sk.isReadable()) {
                        if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                            closeSocket = true;
                        }
                    }
                    if (!closeSocket && sk.isWritable()) {
                        if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                            closeSocket = true;
                        }
                    }
                    if (closeSocket) {
                        cancelledKey(sk);
                    }
                }
            }
        } else {
            //invalid key
            cancelledKey(sk);
        }
    } catch ( CancelledKeyException ckx ) {
        cancelledKey(sk);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        log.error("",t);
    }
}

// org.apache.tomcat.util.net.AbstractEndpoint#processSocket
/**
 * Process the given SocketWrapper with the given status. Used to trigger
 * processing as if the Poller (for those endpoints that have one)
 * selected the socket.
 *
 * @param socketWrapper The socket wrapper to process
 * @param event         The socket event to be processed
 * @param dispatch      Should the processing be performed on a new
 *                          container thread
 *
 * @return if processing was triggered successfully
 */
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
        SocketEvent event, boolean dispatch) {
    try {
        if (socketWrapper == null) {
            return false;
        }
        // use the thread pool to handle a single read event
        SocketProcessorBase<S> sc = processorCache.pop();
        if (sc == null) {
            sc = createSocketProcessor(socketWrapper, event);
        } else {
            sc.reset(socketWrapper, event);
        }
        // the pool has 10 core threads by default
        // this is not the stock JDK ThreadPoolExecutor but Tomcat's own subclass, org.apache.tomcat.util.threads.ThreadPoolExecutor, mainly used for some bookkeeping
        // in the end the socket is handled by SocketProcessor, which dispatches to the concrete Handler
        Executor executor = getExecutor();
        if (dispatch && executor != null) {
            executor.execute(sc);
        } else {
            sc.run();
        }
    } catch (RejectedExecutionException ree) {
        getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
        return false;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        // This means we got an OOM or similar creating a thread, or that
        // the pool and its queue are full
        getLog().error(sm.getString("endpoint.process.fail"), t);
        return false;
    }
    return true;
}

// With the steps above, the request has been handed from the Poller to SocketProcessor, which takes it from here
// org.apache.tomcat.util.net.NioEndpoint.SocketProcessor#doRun
@Override
protected void doRun() {
    NioChannel socket = socketWrapper.getSocket();
    SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
    try {
        int handshake = -1;
        try {
            if (key != null) {
                if (socket.isHandshakeComplete()) {
                    // No TLS handshaking required. Let the handler
                    // process this socket / event combination.
                    handshake = 0;
                } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
                        event == SocketEvent.ERROR) {
                    // Unable to complete the TLS handshake. Treat it as
                    // if the handshake failed.
                    handshake = -1;
                } else {
                    handshake = socket.handshake(key.isReadable(), key.isWritable());
                    // The handshake process reads/writes from/to the
                    // socket. status may therefore be OPEN_WRITE once
                    // the handshake completes. However, the handshake
                    // happens when the socket is opened so the status
                    // must always be OPEN_READ after it completes. It
                    // is OK to always set this as it is only used if
                    // the handshake completes.
                    event = SocketEvent.OPEN_READ;
                }
            }
        } catch (IOException x) {
            handshake = -1;
            if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);
        } catch (CancelledKeyException ckx) {
            handshake = -1;
        }
        if (handshake == 0) {
            SocketState state = SocketState.OPEN;
            // Process the request from this socket
            if (event == null) {
                state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
            } else {
                // org.apache.coyote.AbstractProtocol$ConnectionHandler
                // a different processor is created depending on the protocol, e.g. Http11Processor
                // the handler called here is the one from the outer class's parent: org.apache.coyote.AbstractProtocol$ConnectionHandler
                state = getHandler().process(socketWrapper, event);
            }
            // if the protocol processing result is CLOSED, close the socket
            // in that sense, ordinary requests are in fact kept as long-lived connections (of course, the client usually calls close() itself, which can't be helped)
            if (state == SocketState.CLOSED) {
                close(socket, key);
            }
        } else if (handshake == -1 ) {
            close(socket, key);
        } else if (handshake == SelectionKey.OP_READ){
            socketWrapper.registerReadInterest();
        } else if (handshake == SelectionKey.OP_WRITE){
            socketWrapper.registerWriteInterest();
        }
    } catch (CancelledKeyException cx) {
        socket.getPoller().cancelledKey(key);
    } catch (VirtualMachineError vme) {
        ExceptionUtils.handleThrowable(vme);
    } catch (Throwable t) {
        log.error("", t);
        socket.getPoller().cancelledKey(key);
    } finally {
        socketWrapper = null;
        event = 
                null;
        //return to cache
        // reuse the processor
        if (running && !paused) {
            processorCache.push(this);
        }
    }
}
}

// That is the overall flow by which an HTTP request is handed over to the application.
// One question remains: after the request has been processed, how is it decided whether to close the connection or keep it open?
// It is decided by the SocketState returned once protocol processing finishes; if you are interested, read on:
// org.apache.coyote.AbstractProtocol.ConnectionHandler#process
// ConnectionHandler manages the reusable Processors and returns SocketState.CLOSED directly for invalid requests so that the session is closed right away
@Override
public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
    if (getLog().isDebugEnabled()) {
        getLog().debug(sm.getString("abstractConnectionHandler.process",
                wrapper.getSocket(), status));
    }
    if (wrapper == null) {
        // Nothing to do. Socket has been closed.
        return SocketState.CLOSED;
    }
    S socket = wrapper.getSocket();
    // for socket handling, ConnectionHandler keeps the processors in a reusable container to avoid the cost of creating many of them
    Processor processor = connections.get(socket);
    if (getLog().isDebugEnabled()) {
        getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet",
                processor, socket));
    }
    // Async timeouts are calculated on a dedicated thread and then
    // dispatched. Because of delays in the dispatch process, the
    // timeout may no longer be required. Check here and avoid
    // unnecessary processing.
    if (SocketEvent.TIMEOUT == status && (processor == null ||
            !processor.isAsync() || !processor.checkAsyncTimeoutGeneration())) {
        // This is effectively a NO-OP
        return SocketState.OPEN;
    }
    if (processor != null) {
        // Make sure an async timeout doesn't fire
        getProtocol().removeWaitingProcessor(processor);
    } else if (status == SocketEvent.DISCONNECT || status == SocketEvent.ERROR) {
        // Nothing to do. Endpoint requested a close and there is no
        // longer a processor associated with this socket.
        return SocketState.CLOSED;
    }
    ContainerThreadMarker.set();
    try {
        if (processor == null) {
            String negotiatedProtocol = wrapper.getNegotiatedProtocol();
            if (negotiatedProtocol != null) {
                UpgradeProtocol upgradeProtocol =
                        getProtocol().getNegotiatedProtocol(negotiatedProtocol);
                if (upgradeProtocol != null) {
                    processor = upgradeProtocol.getProcessor(
                            wrapper, getProtocol().getAdapter());
                } else if (negotiatedProtocol.equals("http/1.1")) {
                    // Explicitly negotiated the default protocol.
                    // Obtain a processor below.
                } else {
                    // TODO:
                    // OpenSSL 1.0.2's ALPN callback doesn't support
                    // failing the handshake with an error if no
                    // protocol can be negotiated. Therefore, we need to
                    // fail the connection here. Once this is fixed,
                    // replace the code below with the commented out
                    // block.
                    if (getLog().isDebugEnabled()) {
                        getLog().debug(sm.getString(
                            "abstractConnectionHandler.negotiatedProcessor.fail",
                            negotiatedProtocol));
                    }
                    return SocketState.CLOSED;
                    /*
                     * To replace the code above once OpenSSL 1.1.0 is
                     * used.
                    // Failed to create processor. This is a bug.
                    throw new IllegalStateException(sm.getString(
                            "abstractConnectionHandler.negotiatedProcessor.fail",
                            negotiatedProtocol));
                    */
                }
            }
        }
        if (processor == null) {
            processor = recycledProcessors.pop();
            if (getLog().isDebugEnabled()) {
                getLog().debug(sm.getString("abstractConnectionHandler.processorPop",
                        processor));
            }
        }
        if (processor == null) {
            processor = getProtocol().createProcessor();
            register(processor);
        }
        processor.setSslSupport(
                wrapper.getSslSupport(getProtocol().getClientCertProvider()));
        // Associate the processor with the connection
        connections.put(socket, processor);
        SocketState state = SocketState.CLOSED;
        do {
            // this state is ultimately decided by the concrete processor, e.g. Http11Processor
            state = processor.process(wrapper, status);
            if (state == SocketState.UPGRADING) {
                // Get the HTTP upgrade handler
                UpgradeToken upgradeToken = processor.getUpgradeToken();
                // Retrieve leftover input
                ByteBuffer leftOverInput = processor.getLeftoverInput();
                if (upgradeToken == null) {
                    // Assume direct HTTP/2 connection
                    UpgradeProtocol upgradeProtocol = getProtocol().getUpgradeProtocol("h2c");
                    if (upgradeProtocol != null) {
                        processor = upgradeProtocol.getProcessor(
                                wrapper, getProtocol().getAdapter());
                        wrapper.unRead(leftOverInput);
                        // Associate with the processor with the connection
                        connections.put(socket, processor);
                    } else {
                        if (getLog().isDebugEnabled()) {
                            getLog().debug(sm.getString(
                                "abstractConnectionHandler.negotiatedProcessor.fail",
                                "h2c"));
                        }
                        return SocketState.CLOSED;
                    }
                } else {
                    HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
                    // Release the Http11 processor to be re-used
                    release(processor);
                    // Create the upgrade processor
                    processor = getProtocol().createUpgradeProcessor(wrapper, upgradeToken);
                    if (getLog().isDebugEnabled()) {
                        getLog().debug(sm.getString("abstractConnectionHandler.upgradeCreate",
                                processor, wrapper));
                    }
                    wrapper.unRead(leftOverInput);
                    // Mark the connection as upgraded
                    wrapper.setUpgraded(true);
                    // Associate with the processor with the connection
                    connections.put(socket, processor);
                    // Initialise the upgrade handler (which may trigger
                    // some IO using the new protocol which is why the lines
                    // above are necessary)
                    // This cast should be safe. If it fails the error
                    // handling for the surrounding try/catch will deal with
                    // it.
                    if (upgradeToken.getInstanceManager() == null) {
                        httpUpgradeHandler.init((WebConnection) processor);
                    } else {
                        ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
                        try {
                            httpUpgradeHandler.init((WebConnection) processor);
                        } finally {
                            upgradeToken.getContextBind().unbind(false, oldCL);
                        }
                    }
                }
            }
        } while ( state == SocketState.UPGRADING);
        if (state == SocketState.LONG) {
            // In the middle of processing a request/response. Keep the
            // socket associated with the processor. Exact requirements
            // depend on type of long poll
            longPoll(wrapper, processor);
            if (processor.isAsync()) {
                getProtocol().addWaitingProcessor(processor);
            }
        } else if (state == SocketState.OPEN) {
            // In keep-alive but between requests. OK to recycle
            // processor. Continue to poll for the next request.
            connections.remove(socket);
            release(processor);
            wrapper.registerReadInterest();
        } else if (state == SocketState.SENDFILE) {
            // Sendfile in progress. If it fails, the socket will be
            // closed. If it works, the socket either be added to the
            // poller (or equivalent) to await more data or processed
            // if there are any pipe-lined requests remaining.
        } else if (state == SocketState.UPGRADED) {
            // Don't add sockets back to the poller if this was a
            // non-blocking write otherwise the poller may trigger
            // multiple read events which may lead to thread starvation
            // in the connector. The write() method will add this socket
            // to the poller if necessary.
            if (status != SocketEvent.OPEN_WRITE) {
                longPoll(wrapper, processor);
            }
        } else if (state == SocketState.SUSPENDED) {
            // Don't add sockets back to the poller.
            // The resumeProcessing() method will add this socket
            // to the poller.
        } else {
            // Connection closed. OK to recycle the processor. Upgrade
            // processors are not recycled.
            connections.remove(socket);
            if (processor.isUpgrade()) {
                UpgradeToken upgradeToken = processor.getUpgradeToken();
                HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
                InstanceManager instanceManager = upgradeToken.getInstanceManager();
                if (instanceManager == null) {
                    httpUpgradeHandler.destroy();
                } else {
                    ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
                    try {
                        httpUpgradeHandler.destroy();
                    } finally {
                        try {
                            instanceManager.destroyInstance(httpUpgradeHandler);
                        } catch (Throwable e) {
                            ExceptionUtils.handleThrowable(e);
                            getLog().error(sm.getString("abstractConnectionHandler.error"), e);
                        }
                        upgradeToken.getContextBind().unbind(false, oldCL);
                    }
                }
            } else {
                release(processor);
            }
        }
        // return the resulting state to the controlling SocketProcessor so it can decide whether to close the socket
        return state;
    } catch(java.net.SocketException e) {
        // SocketExceptions are normal
        getLog().debug(sm.getString(
                "abstractConnectionHandler.socketexception.debug"), e);
    } catch (java.io.IOException e) {
        // IOExceptions are normal
        getLog().debug(sm.getString(
                "abstractConnectionHandler.ioexception.debug"), e);
    } catch (ProtocolException e) {
        // Protocol exceptions normally mean the client sent invalid or
        // incomplete data.
        getLog().debug(sm.getString(
                "abstractConnectionHandler.protocolexception.debug"), e);
    }
    // Future developers: if you discover any other
    // rare-but-nonfatal exceptions, catch them here, and log as
    // above.
    catch (Throwable e) {
        ExceptionUtils.handleThrowable(e);
        // any other exception or error is odd. Here we log it
        // with "ERROR" level, so it will show up even on
        // less-than-verbose logs.
        getLog().error(sm.getString("abstractConnectionHandler.error"), e);
    } finally {
        ContainerThreadMarker.clear();
    }
    // Make sure socket/processor is removed from the list of current
    // connections
    connections.remove(socket);
    release(processor);
    return SocketState.CLOSED;
}

// org.apache.coyote.AbstractProcessorLight#process
// Http11Processor extends AbstractProcessorLight, which uses the template method pattern to handle the protocol-specific details
@Override
public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
        throws IOException {
    SocketState state = SocketState.CLOSED;
    Iterator<DispatchType> dispatches = null;
    do {
        if (dispatches != null) {
            DispatchType nextDispatch = dispatches.next();
            state = dispatch(nextDispatch.getSocketStatus());
        } else if (status == SocketEvent.DISCONNECT) {
            // Do nothing here, just wait for it to get recycled
        } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
            state = dispatch(status);
            if (state == SocketState.OPEN) {
                // There may be pipe-lined data to read. If the data isn't
                // processed now, execution will exit this loop and call
                // release() which will recycle the processor (and input
                // buffer) deleting any pipe-lined data. To avoid this,
                // process it now.
                state = service(socketWrapper);
            }
        } else if (status == SocketEvent.OPEN_WRITE) {
            // Extra write event likely after async, ignore
            state = SocketState.LONG;
        } else if (status == SocketEvent.OPEN_READ){
            // an ordinary HTTP request arrives as OPEN_READ
            state = service(socketWrapper);
        } else {
            // Default to closing the socket if the SocketEvent passed in
            // is not consistent with the current state of the Processor
            state = SocketState.CLOSED;
        }
        if (getLog().isDebugEnabled()) {
            getLog().debug("Socket: [" + socketWrapper +
                    "], Status in: [" + status +
                    "], State out: [" + state + "]");
        }
        if (state != SocketState.CLOSED && isAsync()) {
            state = asyncPostProcess();
            if (getLog().isDebugEnabled()) {
                getLog().debug("Socket: [" + socketWrapper +
                        "], State after async post processing: [" + state + "]");
            }
        }
        if (dispatches == null || !dispatches.hasNext()) {
            // Only returns non-null iterator if there are
            // dispatches to process.
            dispatches = getIteratorAndClearDispatches();
        }
    // keep looping while the state is ASYNC_END, or while dispatches remain and the state is not CLOSED
    } while (state == SocketState.ASYNC_END ||
            dispatches != null && state != SocketState.CLOSED);
    return state;
}

// org.apache.coyote.http11.Http11Processor#service
// the protocol-specific processing method; the state it returns decides whether the socket is closed
@Override
public SocketState service(SocketWrapperBase<?> socketWrapper)
    throws IOException {
    // RequestInfo tracks the lifecycle of the whole processing
    // STAGE_PARSE -> STAGE_PREPARE -> STAGE_SERVICE -> STAGE_ENDINPUT -> STAGE_ENDOUTPUT -> STAGE_KEEPALIVE -> STAGE_ENDED
    RequestInfo rp = request.getRequestProcessor();
    rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
    // Setting up the I/O
    setSocketWrapper(socketWrapper);
    inputBuffer.init(socketWrapper);
    outputBuffer.init(socketWrapper);
    // Flags
    keepAlive = true;
    openSocket = false;
    readComplete = true;
    boolean keptAlive = false;
    SendfileState sendfileState = SendfileState.DONE;
    // This loop keeps reading the input stream. If there is only a single HTTP request, then after the first one is processed
    // the second iteration produces an IOException, which triggers the closing of the socket
    while (!getErrorState().isError() && keepAlive && 
!isAsync() && upgradeToken == null &&sendfileState == SendfileState.DONE && !endpoint.isPaused()) {// Parsing the request headertry {if (!inputBuffer.parseRequestLine(keptAlive)) {if (inputBuffer.getParsingRequestLinePhase() == -1) {return SocketState.UPGRADING;} else if (handleIncompleteRequestLineRead()) {break;}}if (endpoint.isPaused()) {// 503 - Service unavailableresponse.setStatus(503);setErrorState(ErrorState.CLOSE_CLEAN, null);} else {keptAlive = true;// Set this every time in case limit has been changed via JMXrequest.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount());if (!inputBuffer.parseHeaders()) {// We've read part of the request, don't recycle it// instead associate it with the socketopenSocket = true;readComplete = false;break;}if (!disableUploadTimeout) {socketWrapper.setReadTimeout(connectionUploadTimeout);}}} catch (IOException e) {if (log.isDebugEnabled()) {log.debug(sm.getString("http11processor.header.parse"), e);}setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);break;} catch (Throwable t) {ExceptionUtils.handleThrowable(t);UserDataHelper.Mode logMode = userDataHelper.getNextMode();if (logMode != null) {String message = sm.getString("http11processor.header.parse");switch (logMode) {case INFO_THEN_DEBUG:message += sm.getString("http11processor.fallToDebug");//$FALL-THROUGH$case INFO:log.info(message, t);break;case DEBUG:log.debug(message, t);}}// 400 - Bad Requestresponse.setStatus(400);setErrorState(ErrorState.CLOSE_CLEAN, t);getAdapter().log(request, response, 0);}// Has an upgrade been requested?Enumeration<String> connectionValues = request.getMimeHeaders().values("Connection");boolean foundUpgrade = false;while (connectionValues.hasMoreElements() && !foundUpgrade) {foundUpgrade = connectionValues.nextElement().toLowerCase(Locale.ENGLISH).contains("upgrade");}if (foundUpgrade) {// Check the protocolString requestedProtocol = request.getHeader("Upgrade");UpgradeProtocol upgradeProtocol = httpUpgradeProtocols.get(requestedProtocol);if 
(upgradeProtocol != null) {if (upgradeProtocol.accept(request)) {// TODO Figure out how to handle request bodies at this// point.response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);response.setHeader("Connection", "Upgrade");response.setHeader("Upgrade", requestedProtocol);action(ActionCode.CLOSE, null);getAdapter().log(request, response, 0);InternalHttpUpgradeHandler upgradeHandler =upgradeProtocol.getInternalUpgradeHandler(getAdapter(), cloneRequest(request));UpgradeToken upgradeToken = new UpgradeToken(upgradeHandler, null, null);action(ActionCode.UPGRADE, upgradeToken);return SocketState.UPGRADING;}}}if (!getErrorState().isError()) {// Setting up filters, and parse some request headersrp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);try {prepareRequest();} catch (Throwable t) {ExceptionUtils.handleThrowable(t);if (log.isDebugEnabled()) {log.debug(sm.getString("http11processor.request.prepare"), t);}// 500 - Internal Server Errorresponse.setStatus(500);setErrorState(ErrorState.CLOSE_CLEAN, t);getAdapter().log(request, response, 0);}}if (maxKeepAliveRequests == 1) {keepAlive = false;} else if (maxKeepAliveRequests > 0 &&socketWrapper.decrementKeepAlive() <= 0) {keepAlive = false;}// Process the request in the adapterif (!getErrorState().isError()) {try {rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);// 使用適配器處理 request, 并忽略其返回狀態(tài)值, 此處真正的協(xié)議處理服務(wù),后續(xù)就是許多的 filterChain 處理了getAdapter().service(request, response);// Handle when the response was committed before a serious// error occurred. 
Throwing a ServletException should both// set the status to 500 and set the errorException.// If we fail here, then the response is likely already// committed, so we can't try and set headers.if(keepAlive && !getErrorState().isError() && !isAsync() &&statusDropsConnection(response.getStatus())) {setErrorState(ErrorState.CLOSE_CLEAN, null);}} catch (InterruptedIOException e) {setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);} catch (HeadersTooLargeException e) {log.error(sm.getString("http11processor.request.process"), e);// The response should not have been committed but check it// anyway to be safeif (response.isCommitted()) {setErrorState(ErrorState.CLOSE_NOW, e);} else {response.reset();response.setStatus(500);setErrorState(ErrorState.CLOSE_CLEAN, e);response.setHeader("Connection", "close"); // TODO: Remove}} catch (Throwable t) {ExceptionUtils.handleThrowable(t);log.error(sm.getString("http11processor.request.process"), t);// 500 - Internal Server Errorresponse.setStatus(500);setErrorState(ErrorState.CLOSE_CLEAN, t);getAdapter().log(request, response, 0);}}// Finish the handling of the requestrp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);if (!isAsync()) {// If this is an async request then the request ends when it has// been completed. 
The AsyncContext is responsible for calling// endRequest() in that case.// 非異步請求,則處理 input 上下文// 同時(shí)處理outputendRequest();}rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);// If there was an error, make sure the request is counted as// and error, and update the statistics counterif (getErrorState().isError()) {response.setStatus(500);}if (!isAsync() || getErrorState().isError()) {request.updateCounters();// 更換request,response 為空,以便proccor池安全復(fù)用if (getErrorState().isIoAllowed()) {inputBuffer.nextRequest();outputBuffer.nextRequest();}}if (!disableUploadTimeout) {int soTimeout = endpoint.getConnectionTimeout();if(soTimeout > 0) {socketWrapper.setReadTimeout(soTimeout);} else {socketWrapper.setReadTimeout(0);}}rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);// 正常處理完成一次請求,接著會(huì)進(jìn)入下一次處理流程,一般會(huì)以 IOException 結(jié)束sendfileState = processSendfile(socketWrapper);}rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);// 所以,普通http請求,一般都會(huì)得到一次 CLOSE_CONNECTION_NOW 的狀態(tài),觸發(fā)立即關(guān)閉 socketif (getErrorState().isError() || endpoint.isPaused()) {return SocketState.CLOSED;} else if (isAsync()) {return SocketState.LONG;} else if (isUpgrade()) {return SocketState.UPGRADING;} else {if (sendfileState == SendfileState.PENDING) {return SocketState.SENDFILE;} else {if (openSocket) {if (readComplete) {return SocketState.OPEN;} else {return SocketState.LONG;}} else {return SocketState.CLOSED;}}}}
Once the network request has been accepted, it first passes through a series of filterChain steps, and one of the Filters then parses the concrete parameters.
For file handling, we start from the branch that handles multipart/form-data:
// org.apache.catalina.connector.Request#parseParameters
/**
 * Parse request parameters.
 */
protected void parseParameters() {
    parametersParsed = true;
    Parameters parameters = coyoteRequest.getParameters();
    boolean success = false;
    try {
        ...
        String contentType = getContentType();
        if (contentType == null) {
            contentType = "";
        }
        int semicolon = contentType.indexOf(';');
        if (semicolon >= 0) {
            contentType = contentType.substring(0, semicolon).trim();
        } else {
            contentType = contentType.trim();
        }
        // If the data is multipart, parse each part
        if ("multipart/form-data".equals(contentType)) {
            parseParts(false);
            success = true;
            return;
        }
        ...
        success = true;
    } finally {
        if (!success) {
            parameters.setParseFailedReason(FailReason.UNKNOWN);
        }
    }
}

// Implementation of the multipart handling
// org.apache.catalina.connector.Request#parseParts
private void parseParts(boolean explicit) {
    // Return immediately if the parts have already been parsed
    if (parts != null || partsParseException != null) {
        return;
    }
    ...
    boolean success = false;
    // Create a new file upload handler
    DiskFileItemFactory factory = new DiskFileItemFactory();
    try {
        factory.setRepository(location.getCanonicalFile());
    } catch (IOException ioe) {
        parameters.setParseFailedReason(FailReason.IO_ERROR);
        partsParseException = ioe;
        return;
    }
    factory.setSizeThreshold(mce.getFileSizeThreshold());
    // Use the ServletFileUpload component to parse the file information
    ServletFileUpload upload = new ServletFileUpload();
    upload.setFileItemFactory(factory);
    upload.setFileSizeMax(mce.getMaxFileSize());
    upload.setSizeMax(mce.getMaxRequestSize());
    parts = new ArrayList<>();
    try {
        // step1. Use ServletFileUpload to parse the parameters into FileItems
        List<FileItem> items =
                upload.parseRequest(new ServletRequestContext(this));
        int maxPostSize = getConnector().getMaxPostSize();
        int postSize = 0;
        Charset charset = getCharset();
        // step2. Wrap each parsed FileItem in an ApplicationPart and add it
        // to the parts container of the current Request
        for (FileItem item : items) {
            ApplicationPart part = new ApplicationPart(item, location);
            parts.add(part);
            // File name handling: a part without a submitted file name
            // is added as an ordinary parameter
            if (part.getSubmittedFileName() == null) {
                String name = part.getName();
                String value = null;
                try {
                    value = part.getString(charset.name());
                } catch (UnsupportedEncodingException uee) {
                    // Not possible
                }
                if (maxPostSize >= 0) {
                    // Have to calculate equivalent size. Not completely
                    // accurate but close enough.
                    postSize += name.getBytes(charset).length;
                    if (value != null) {
                        // Equals sign
                        postSize++;
                        // Value length
                        postSize += part.getSize();
                    }
                    // Value separator
                    postSize++;
                    if (postSize > maxPostSize) {
                        parameters.setParseFailedReason(FailReason.POST_TOO_LARGE);
                        throw new IllegalStateException(
                                sm.getString("coyoteRequest.maxPostSizeExceeded"));
                    }
                }
                parameters.addParameter(name, value);
            }
        }
        success = true;
    }
    ...
    } finally {
        // This might look odd but is correct. setParseFailedReason() only
        // sets the failure reason if none is currently set. This code could
        // be more efficient but it is written this way to be robust with
        // respect to changes in the remainder of the method.
        if (partsParseException != null || !success) {
            parameters.setParseFailedReason(FailReason.UNKNOWN);
        }
    }
}
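That is the rough logic for handling a file upload: 1. use ServletFileUpload to parse the parameters into a List<FileItem>; 2. wrap each FileItem in an ApplicationPart and put it into the parts field of the Request for later use. As you can see, the actual parsing of the file is done in ServletFileUpload.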
// ServletFileUpload, handles the file parsing
// org.apache.tomcat.util.http.fileupload.FileUploadBase#parseRequest
/**
 * Processes an RFC 1867
 * compliant <code>multipart/form-data</code> stream.
 *
 * @param ctx The context for the request to be parsed.
 *
 * @return A list of <code>FileItem</code> instances parsed from the
 *         request, in the order that they were transmitted.
 *
 * @throws FileUploadException if there are problems reading/parsing
 *                             the request or storing files.
 */
public List<FileItem> parseRequest(RequestContext ctx)
        throws FileUploadException {
    List<FileItem> items = new ArrayList<>();
    boolean successful = false;
    try {
        // Iterate over the data of each part in turn
        FileItemIterator iter = getItemIterator(ctx);
        FileItemFactory fac = getFileItemFactory();
        if (fac == null) {
            throw new NullPointerException("No FileItemFactory has been set.");
        }
        while (iter.hasNext()) {
            // Each iteration creates a FileItemStreamImpl, and with it a new InputStream
            final FileItemStream item = iter.next();
            // Don't use getName() here to prevent an InvalidFileNameException.
            final String fileName = ((FileItemIteratorImpl.FileItemStreamImpl) item).name;
            // Create one FileItem per key, used as the destination of the data stream
            FileItem fileItem = fac.createItem(item.getFieldName(), item.getContentType(),
                    item.isFormField(), fileName);
            items.add(fileItem);
            try {
                // Write the data stream from the socket into the temporary file created
                // by fileItem; this is how the framework performs the upload.
                // fileItem.getOutputStream() creates the temporary file.
                // item.openStream() uses the network I/O stream as its data source;
                // reading -1 means the input has ended.
                // In the end, all data of a single part is written into the current temporary file.
                Streams.copy(item.openStream(), fileItem.getOutputStream(), true);
            } catch (FileUploadIOException e) {
                throw (FileUploadException) e.getCause();
            } catch (IOException e) {
                throw new IOFileUploadException(String.format("Processing of %s request failed. %s",
                        MULTIPART_FORM_DATA, e.getMessage()), e);
            }
            final FileItemHeaders fih = item.getHeaders();
            fileItem.setHeaders(fih);
        }
        successful = true;
        return items;
    } catch (FileUploadIOException e) {
        throw (FileUploadException) e.getCause();
    } catch (IOException e) {
        throw new FileUploadException(e.getMessage(), e);
    } finally {
        if (!successful) {
            // If any part failed, delete everything that was already uploaded
            for (FileItem fileItem : items) {
                try {
                    fileItem.delete();
                } catch (Exception ignored) {
                    // ignored TODO perhaps add to tracker delete failure list somehow?
                }
            }
        }
    }
}
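That is the whole framework-side parsing process for an uploaded file. The rough steps are:
1. Based on the boundary, use the iterator pattern to create an InputStream for each part in turn;
2. On each iteration, create a DiskFileItem instance to hold the data stream that is read;
3. Write the network InputStream into a temporary file on disk;
4. Return all the DiskFileItems together to the application as input parameters.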
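The four steps above can be sketched in a few lines of plain Java. This is only a simplified illustration, not Tomcat's implementation: the class `MultipartSketch` and the method `saveParts` are hypothetical names, the whole body is assumed to be in memory already (Tomcat streams it instead), and part headers are skipped rather than parsed.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: split a multipart body on its boundary and
// store the content of each part in its own temporary file.
public class MultipartSketch {

    static List<Path> saveParts(byte[] body, String boundary) throws IOException {
        // ISO-8859-1 maps every byte to exactly one char, so string
        // indexes stay byte-accurate even for binary content.
        String text = new String(body, StandardCharsets.ISO_8859_1);
        String delimiter = "--" + boundary;
        List<Path> files = new ArrayList<>();

        int pos = text.indexOf(delimiter);
        while (pos >= 0) {
            int start = pos + delimiter.length();
            if (text.startsWith("--", start)) {
                break; // closing delimiter "--boundary--": no more parts
            }
            // The part ends at the CRLF that precedes the next delimiter.
            int next = text.indexOf("\r\n" + delimiter, start);
            if (next < 0) {
                break; // truncated body: stop instead of guessing
            }
            // Headers and content are separated by an empty line.
            int headerEnd = text.indexOf("\r\n\r\n", start);
            if (headerEnd >= 0 && headerEnd + 4 <= next) {
                String content = text.substring(headerEnd + 4, next);
                Path tmp = Files.createTempFile("upload_", ".tmp");
                Files.write(tmp, content.getBytes(StandardCharsets.ISO_8859_1));
                files.add(tmp);
            }
            pos = next + 2; // skip the CRLF, land on the next delimiter
        }
        return files;
    }
}
```

The returned temporary files play the role of the DiskFileItems handed back to the application; the caller is responsible for deleting them afterwards.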
Next, let's look at how FileItemIteratorImpl iterates based on the boundary:
// 1. 單個(gè)文件流的迭代準(zhǔn)備工作// FileItemIteratorImpl 會(huì)解析出content-Type, boundary 等信息,為后續(xù)迭代準(zhǔn)備// org.apache.tomcat.util.http.fileupload.FileUploadBase.FileItemIteratorImpl#FileItemIteratorImpl/*** Creates a new instance.** @param ctx The request context.* @throws FileUploadException An error occurred while* parsing the request.* @throws IOException An I/O error occurred.*/FileItemIteratorImpl(RequestContext ctx)throws FileUploadException, IOException {if (ctx == null) {throw new NullPointerException("ctx parameter");}String contentType = ctx.getContentType();if ((null == contentType)|| (!contentType.toLowerCase(Locale.ENGLISH).startsWith(MULTIPART))) {throw new InvalidContentTypeException(String.format("the request doesn't contain a %s or %s stream, content type header is %s",MULTIPART_FORM_DATA, MULTIPART_MIXED, contentType));}final long requestSize = ((UploadContext) ctx).contentLength();InputStream input; // N.B. this is eventually closed in MultipartStream processingif (sizeMax >= 0) {if (requestSize != -1 && requestSize > sizeMax) {throw new SizeLimitExceededException(String.format("the request was rejected because its size (%s) exceeds the configured maximum (%s)",Long.valueOf(requestSize), Long.valueOf(sizeMax)),requestSize, sizeMax);}// N.B. 
this is eventually closed in MultipartStream processinginput = new LimitedInputStream(ctx.getInputStream(), sizeMax) {@Overrideprotected void raiseError(long pSizeMax, long pCount)throws IOException {FileUploadException ex = new SizeLimitExceededException(String.format("the request was rejected because its size (%s) exceeds the configured maximum (%s)",Long.valueOf(pCount), Long.valueOf(pSizeMax)),pCount, pSizeMax);throw new FileUploadIOException(ex);}};} else {// 從 ServletRequestContext 中獲取inputStreaminput = ctx.getInputStream();}String charEncoding = headerEncoding;if (charEncoding == null) {charEncoding = ctx.getCharacterEncoding();}// 解析 boundary 參數(shù),如: ----WebKitFormBoundarydyG19jnnVWC9U1zYboundary = getBoundary(contentType);if (boundary == null) {IOUtils.closeQuietly(input); // avoid possible resource leakthrow new FileUploadException("the request was rejected because no multipart boundary was found");}notifier = new MultipartStream.ProgressNotifier(listener, requestSize);try {multi = new MultipartStream(input, boundary, notifier);} catch (IllegalArgumentException iae) {IOUtils.closeQuietly(input); // avoid possible resource leakthrow new InvalidContentTypeException(String.format("The boundary specified in the %s header is too long", CONTENT_TYPE), iae);}multi.setHeaderEncoding(charEncoding);skipPreamble = true;// 同樣會(huì)初始化第一個(gè)可被迭代的對象,后續(xù)的初始化動(dòng)作則是由 hasNext() 觸發(fā)。findNextItem();}// 在迭代輸入數(shù)據(jù)時(shí),會(huì)調(diào)用迭代方法 FileItemIteratorImpl next(); 讀取數(shù)據(jù)// org.apache.tomcat.util.http.fileupload.FileUploadBase.FileItemIteratorImpl#hasNext/*** Returns, whether another instance of {@link FileItemStream}* is available.** @throws FileUploadException Parsing or processing the* file item failed.* @throws IOException Reading the file item failed.* @return True, if one or more additional file items* are available, otherwise false.*/@Overridepublic boolean hasNext() throws FileUploadException, IOException {if (eof) {return false;}if 
(itemValid) {return true;}try {// 迭代出下一個(gè)part文件return findNextItem();} catch (FileUploadIOException e) {// unwrap encapsulated SizeExceptionthrow (FileUploadException) e.getCause();}}// org.apache.tomcat.util.http.fileupload.FileUploadBase.FileItemIteratorImpl#findNextItem/*** Called for finding the next item, if any.** @return True, if an next item was found, otherwise false.* @throws IOException An I/O error occurred.*/private boolean findNextItem() throws IOException {if (eof) {return false;}if (currentItem != null) {currentItem.close();currentItem = null;}for (;;) {boolean nextPart;// 首次讀取時(shí),使用新的 boundary 跳過// 后續(xù)則基于首個(gè)連續(xù)換行符進(jìn)行數(shù)據(jù)獲取, \r\n\r\nif (skipPreamble) {nextPart = multi.skipPreamble();} else {nextPart = multi.readBoundary();}// 如果沒有讀取到更多的part, 則返回if (!nextPart) {// 讀取第一個(gè)沒有 boundary 的數(shù)據(jù),再搜索一次,如果還是沒有 boundary, 則認(rèn)為數(shù)據(jù)已結(jié)束if (currentFieldName == null) {// Outer multipart terminated -> No more dataeof = true;return false;}// Inner multipart terminated -> Return to parsing the outermulti.setBoundary(boundary);// 將當(dāng)前字段信息置空,下次如果再讀取不到 boundary, 則讀取結(jié)束currentFieldName = null;continue;}// 以下有更多的 part 輸入FileItemHeaders headers = getParsedHeaders(multi.readHeaders());if (currentFieldName == null) {// We're parsing the outer multipart// 如: file, file2 ...String fieldName = getFieldName(headers);if (fieldName != null) {String subContentType = headers.getHeader(CONTENT_TYPE);if (subContentType != null&& subContentType.toLowerCase(Locale.ENGLISH).startsWith(MULTIPART_MIXED)) {currentFieldName = fieldName;// Multiple files associated with this field namebyte[] subBoundary = getBoundary(subContentType);multi.setBoundary(subBoundary);skipPreamble = true;continue;}// 獲取文件名稱從 header 中// Content-Disposition: form-data; name="file2"; filename="123-2.txt"String fileName = getFileName(headers);// 創(chuàng)建 FileItemStreamImpl, 以備后續(xù)迭代輸出,其構(gòu)造方法將會(huì)創(chuàng)建 stream 實(shí)例// FileItemStreamImpl 
是迭代器的一個(gè)內(nèi)部類,共享 multi 對象// FileItemStreamImpl 會(huì)將網(wǎng)絡(luò)io流封裝為單個(gè)可讀取的inputStream, 備用currentItem = new FileItemStreamImpl(fileName,fieldName, headers.getHeader(CONTENT_TYPE),fileName == null, getContentLength(headers));currentItem.setHeaders(headers);notifier.noteItem();itemValid = true;return true;}} else {String fileName = getFileName(headers);if (fileName != null) {currentItem = new FileItemStreamImpl(fileName,currentFieldName,headers.getHeader(CONTENT_TYPE),false, getContentLength(headers));currentItem.setHeaders(headers);notifier.noteItem();itemValid = true;return true;}}// 其他情況,直接丟棄當(dāng)前 body 數(shù)據(jù)multi.discardBodyData();}}// org.apache.tomcat.util.http.fileupload.FileUploadBase.FileItemIteratorImpl.FileItemStreamImpl#FileItemStreamImpl/*** Creates a new instance.** @param pName The items file name, or null.* @param pFieldName The items field name.* @param pContentType The items content type, or null.* @param pFormField Whether the item is a form field.* @param pContentLength The items content length, if known, or -1* @throws IOException Creating the file item failed.*/FileItemStreamImpl(String pName, String pFieldName,String pContentType, boolean pFormField,long pContentLength) throws IOException {name = pName;fieldName = pFieldName;contentType = pContentType;formField = pFormField;// 創(chuàng)建一個(gè)inputStream 的流,備讀// 讀到哪里算是結(jié)束呢?當(dāng) read() 返回-1時(shí),認(rèn)為輸入結(jié)束了final ItemInputStream itemStream = multi.newInputStream();InputStream istream = itemStream;if (fileSizeMax != -1) {if (pContentLength != -1&& pContentLength > fileSizeMax) {FileSizeLimitExceededException e =new FileSizeLimitExceededException(String.format("The field %s exceeds its maximum permitted size of %s bytes.",fieldName, Long.valueOf(fileSizeMax)),pContentLength, fileSizeMax);e.setFileName(pName);e.setFieldName(pFieldName);throw new FileUploadIOException(e);}// 限制最大大小,該值用 spring.http.multipart.max-file-size 配置, 但往往還會(huì)同時(shí)有一個(gè)參數(shù)需要配置: 
spring.http.multipart.max-request-size,// 因?yàn)楫?dāng)文件很大時(shí),請求也會(huì)非常大,所以一般會(huì)要求同時(shí)設(shè)置這兩個(gè)參數(shù),否則都會(huì)拋出超出最大允許大小限制異常// 使用 LimitedInputStream 包裝提供的讀寫大小判定功能,原始istream接收網(wǎng)絡(luò)io輸入istream = new LimitedInputStream(istream, fileSizeMax) {@Overrideprotected void raiseError(long pSizeMax, long pCount)throws IOException {itemStream.close(true);FileSizeLimitExceededException e =new FileSizeLimitExceededException(String.format("The field %s exceeds its maximum permitted size of %s bytes.",fieldName, Long.valueOf(pSizeMax)),pCount, pSizeMax);e.setFieldName(fieldName);e.setFileName(name);throw new FileUploadIOException(e);}};}stream = istream;}// 為每個(gè)part創(chuàng)建一個(gè) DiskFileItem, 用于存儲(chǔ)網(wǎng)絡(luò)io的文件流,備用// org.apache.tomcat.util.http.fileupload.disk.DiskFileItemFactory#createItem/*** Create a new {@link DiskFileItem}* instance from the supplied parameters and the local factory* configuration.** @param fieldName The name of the form field.* @param contentType The content type of the form field.* @param isFormField <code>true</code> if this is a plain form field;* <code>false</code> otherwise.* @param fileName The name of the uploaded file, if any, as supplied* by the browser or other client.** @return The newly created file item.*/@Overridepublic FileItem createItem(String fieldName, String contentType,boolean isFormField, String fileName) {DiskFileItem result = new DiskFileItem(fieldName, contentType,isFormField, fileName, sizeThreshold, repository);result.setDefaultCharset(defaultCharset);return result;}// next() 進(jìn)行迭代, 使用 itemValid 來控制單次迭代使用/*** Returns the next available {@link FileItemStream}.** @throws java.util.NoSuchElementException No more items are* available. 
Use {@link #hasNext()} to prevent this exception.* @throws FileUploadException Parsing or processing the* file item failed.* @throws IOException Reading the file item failed.* @return FileItemStream instance, which provides* access to the next file item.*/@Overridepublic FileItemStream next() throws FileUploadException, IOException {if (eof || (!itemValid && !hasNext())) {throw new NoSuchElementException();}itemValid = false;return currentItem;}
// 2. 從網(wǎng)絡(luò)io流到臨時(shí)文件流的寫入// 主要分為三步:網(wǎng)絡(luò)io流的獲?。槐镜匚募敵隽鞯墨@??;流的對接;// 網(wǎng)絡(luò)io流即是在創(chuàng)建 FileItemStreamImpl 時(shí)生成的 stream;// org.apache.tomcat.util.http.fileupload.FileUploadBase.FileItemIteratorImpl.FileItemStreamImpl#openStream/*** Returns an input stream, which may be used to* read the items contents.** @return Opened input stream.* @throws IOException An I/O error occurred.*/@Overridepublic InputStream openStream() throws IOException {if (((Closeable) stream).isClosed()) {throw new FileItemStream.ItemSkippedException();}return stream;}// 而本地文件輸出流則是一個(gè)本地臨時(shí)文件,用于對接任意大小的輸入流// org.apache.tomcat.util.http.fileupload.disk.DiskFileItem#getOutputStream/*** Returns an {@link java.io.OutputStream OutputStream} that can* be used for storing the contents of the file.** @return An {@link java.io.OutputStream OutputStream} that can be used* for storing the contents of the file.** @throws IOException if an error occurs.*/@Overridepublic OutputStream getOutputStream()throws IOException {if (dfos == null) {// 創(chuàng)建臨時(shí)文件輸出File outputFile = getTempFile();// 使用 DeferredFileOutputStream 包裝臨時(shí)文件,dfos = new DeferredFileOutputStream(sizeThreshold, outputFile);}return dfos;}// org.apache.tomcat.util.http.fileupload.disk.DiskFileItem#getTempFile/*** Creates and returns a {@link java.io.File File} representing a uniquely* named temporary file in the configured repository path. 
The lifetime of* the file is tied to the lifetime of the <code>FileItem</code> instance;* the file will be deleted when the instance is garbage collected.* <p>* <b>Note: Subclasses that override this method must ensure that they return the* same File each time.</b>** @return The {@link java.io.File File} to be used for temporary storage.*/protected File getTempFile() {if (tempFile == null) {File tempDir = repository;if (tempDir == null) {tempDir = new File(System.getProperty("java.io.tmpdir"));}// uid 同進(jìn)程相同, getUniqueId() 會(huì)返回一個(gè)自增的id, 保證進(jìn)程唯一,加上 uid 后,就可以確定臨時(shí)文件唯一了String tempFileName =String.format("upload_%s_%s.tmp", UID, getUniqueId());tempFile = new File(tempDir, tempFileName);}return tempFile;}/*** Returns an identifier that is unique within the class loader used to* load this class, but does not have random-like appearance.** @return A String with the non-random looking instance identifier.*/private static String getUniqueId() {final int limit = 100000000;int current = COUNTER.getAndIncrement();String id = Integer.toString(current);// If you manage to get more than 100 million of ids, you'll// start getting ids longer than 8 characters.if (current < limit) {id = ("00000000" + id).substring(id.length());}return id;}// 最后,將網(wǎng)絡(luò)io流對接到臨時(shí)文件流中,完成數(shù)據(jù)的接收// org.apache.tomcat.util.http.fileupload.util.Streams#copy/*** Copies the contents of the given {@link InputStream}* to the given {@link OutputStream}. Shortcut for* <pre>* copy(pInputStream, pOutputStream, new byte[8192]);* </pre>** @param inputStream The input stream, which is being read.* It is guaranteed, that {@link InputStream#close()} is called* on the stream.* @param outputStream The output stream, to which data should* be written. May be null, in which case the input streams* contents are simply discarded.* @param closeOutputStream True guarantees, that {@link OutputStream#close()}* is called on the stream. 
False indicates, that only* {@link OutputStream#flush()} should be called finally.** @return Number of bytes, which have been copied.* @throws IOException An I/O error occurred.*/public static long copy(InputStream inputStream, OutputStream outputStream, boolean closeOutputStream)throws IOException {// 8096return copy(inputStream, outputStream, closeOutputStream, new byte[DEFAULT_BUFFER_SIZE]);}/*** Copies the contents of the given {@link InputStream}* to the given {@link OutputStream}.** @param inputStream The input stream, which is being read.* It is guaranteed, that {@link InputStream#close()} is called* on the stream.* @param outputStream The output stream, to which data should* be written. May be null, in which case the input streams* contents are simply discarded.* @param closeOutputStream True guarantees, that {@link OutputStream#close()}* is called on the stream. False indicates, that only* {@link OutputStream#flush()} should be called finally.* @param buffer Temporary buffer, which is to be used for* copying data.* @return Number of bytes, which have been copied.* @throws IOException An I/O error occurred.*/public static long copy(InputStream inputStream,OutputStream outputStream, boolean closeOutputStream,byte[] buffer)throws IOException {OutputStream out = outputStream;InputStream in = inputStream;try {long total = 0;for (;;) {// 阻塞式獲取文件數(shù)據(jù)流,寫入到臨時(shí)文件中// 所以,如果我們上傳超大文件時(shí),實(shí)際上有相當(dāng)大部分的時(shí)間,只是框架和客戶端在交互,應(yīng)用代碼則不會(huì)感知到// 直到所有上傳已完成int res = in.read(buffer);if (res == -1) {break;}if (res > 0) {total += res;if (out != null) {out.write(buffer, 0, res);}}}if (out != null) {// 關(guān)閉輸出流,即關(guān)閉臨時(shí)文件實(shí)例if (closeOutputStream) {out.close();} else {out.flush();}out = null;}in.close();in = null;return total;} finally {IOUtils.closeQuietly(in);if (closeOutputStream) {IOUtils.closeQuietly(out);}}}
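Summary: the key point is parsing the boundary, and the hard part is connecting the I/O streams. Both receiving the request input stream and calling os.flush() when responding to the client involve complex I/O operations.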
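5. Cleaning up after use?
Files uploaded by external requests are generally received into temporary files, as in the implementation above. They are called temporary, but if nothing cleans them up and every upload produces another one, they will take up a lot of disk space and sooner or later exhaust it. A good framework should not behave that way.
For example, in the Spring framework, DispatcherServlet proactively performs some cleanup once it has finished using the MultipartFile.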
// org.springframework.web.servlet.DispatcherServlet#doDispatch/*** Process the actual dispatching to the handler.* <p>The handler will be obtained by applying the servlet's HandlerMappings in order.* The HandlerAdapter will be obtained by querying the servlet's installed HandlerAdapters* to find the first that supports the handler class.* <p>All HTTP methods are handled by this method. It's up to HandlerAdapters or handlers* themselves to decide which methods are acceptable.* @param request current HTTP request* @param response current HTTP response* @throws Exception in case of any kind of processing failure*/protected void doDispatch(HttpServletRequest request, HttpServletResponse response) throws Exception {HttpServletRequest processedRequest = request;HandlerExecutionChain mappedHandler = null;boolean multipartRequestParsed = false;WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);try {ModelAndView mv = null;Exception dispatchException = null;try {// 如果是文件類型,則會(huì)用 StandardMultipartHttpServletRequest 再包裝一層的處理processedRequest = checkMultipart(request);multipartRequestParsed = (processedRequest != request);// Determine handler for the current request.mappedHandler = getHandler(processedRequest);if (mappedHandler == null || mappedHandler.getHandler() == null) {noHandlerFound(processedRequest, response);return;}// Determine handler adapter for the current request.HandlerAdapter ha = getHandlerAdapter(mappedHandler.getHandler());// Process last-modified header, if supported by the handler.String method = request.getMethod();boolean isGet = "GET".equals(method);if (isGet || "HEAD".equals(method)) {long lastModified = ha.getLastModified(request, mappedHandler.getHandler());if (logger.isDebugEnabled()) {logger.debug("Last-Modified value for [" + getRequestUri(request) + "] is: " + lastModified);}if (new ServletWebRequest(request, response).checkNotModified(lastModified) && isGet) {return;}}if (!mappedHandler.applyPreHandle(processedRequest, 
response)) {return;}// Actually invoke the handler.mv = ha.handle(processedRequest, response, mappedHandler.getHandler());if (asyncManager.isConcurrentHandlingStarted()) {return;}applyDefaultViewName(processedRequest, mv);mappedHandler.applyPostHandle(processedRequest, response, mv);}catch (Exception ex) {dispatchException = ex;}catch (Throwable err) {// As of 4.3, we're processing Errors thrown from handler methods as well,// making them available for @ExceptionHandler methods and other scenarios.dispatchException = new NestedServletException("Handler dispatch failed", err);}processDispatchResult(processedRequest, response, mappedHandler, mv, dispatchException);}catch (Exception ex) {triggerAfterCompletion(processedRequest, response, mappedHandler, ex);}catch (Throwable err) {triggerAfterCompletion(processedRequest, response, mappedHandler,new NestedServletException("Handler processing failed", err));}finally {if (asyncManager.isConcurrentHandlingStarted()) {// Instead of postHandle and afterCompletionif (mappedHandler != null) {mappedHandler.applyAfterConcurrentHandlingStarted(processedRequest, response);}}else {// Clean up any resources used by a multipart request.// 如果本次有解析文件,則會(huì)做清理操作if (multipartRequestParsed) {cleanupMultipart(processedRequest);}}}}// org.springframework.web.servlet.DispatcherServlet#cleanupMultipart/*** Clean up any resources used by the given multipart request (if any).* @param request current HTTP request* @see MultipartResolver#cleanupMultipart*/protected void cleanupMultipart(HttpServletRequest request) {// 獲取 MultipartHttpServletRequest 實(shí)例MultipartHttpServletRequest multipartRequest =WebUtils.getNativeRequest(request, MultipartHttpServletRequest.class);if (multipartRequest != null) {// 清理文件this.multipartResolver.cleanupMultipart(multipartRequest);}}// org.springframework.web.multipart.support.StandardServletMultipartResolver#cleanupMultipart@Overridepublic void cleanupMultipart(MultipartHttpServletRequest request) {if 
(!(request instanceof AbstractMultipartHttpServletRequest) ||((AbstractMultipartHttpServletRequest) request).isResolved()) {// To be on the safe side: explicitly delete the parts,// but only actual file parts (for Resin compatibility)try {for (Part part : request.getParts()) {if (request.getFile(part.getName()) != null) {// 調(diào)用各part部分的接口,delete()part.delete();}}}catch (Throwable ex) {LogFactory.getLog(getClass()).warn("Failed to perform cleanup of multipart items", ex);}}}// org.apache.catalina.core.ApplicationPart#delete@Overridepublic void delete() throws IOException {fileItem.delete();}// org.apache.tomcat.util.http.fileupload.disk.DiskFileItem#delete/*** Deletes the underlying storage for a file item, including deleting any* associated temporary disk file. Although this storage will be deleted* automatically when the <code>FileItem</code> instance is garbage* collected, this method can be used to ensure that this is done at an* earlier time, thus preserving system resources.*/@Overridepublic void delete() {cachedContent = null;// 獲取臨時(shí)文件實(shí)例,如果存在則刪除File outputFile = getStoreLocation();if (outputFile != null && !isInMemory() && outputFile.exists()) {outputFile.delete();}}// org.apache.tomcat.util.http.fileupload.disk.DiskFileItem#getStoreLocation/*** Returns the {@link java.io.File} object for the <code>FileItem</code>'s* data's temporary location on the disk. Note that for* <code>FileItem</code>s that have their data stored in memory,* this method will return <code>null</code>. When handling large* files, you can use {@link java.io.File#renameTo(java.io.File)} to* move the file to new location without copying the data, if the* source and destination locations reside within the same logical* volume.** @return The data file, or <code>null</code> if the data is stored in* memory.*/public File getStoreLocation() {if (dfos == null) {return null;}if (isInMemory()) {return null;}return dfos.getFile();}
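The rough idea is: 1. determine whether the request contained a file upload; 2. take the ApplicationPart instances out of the request; 3. iterate over each FileItem and delete its file in turn.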
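The same "use it, then delete it" pattern can be shown without any framework. The sketch below is an assumption-laden illustration rather than Spring's or Tomcat's code: `TempFileCleanupSketch`, `Handler` and `receiveAndHandle` are hypothetical names, and the `finally` block stands in for the cleanup that DispatcherServlet performs after the handler returns.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch: receive data into a temporary file, let the
// application use it, and always delete it afterwards.
public class TempFileCleanupSketch {

    // Stand-in for the application code that consumes the upload.
    interface Handler {
        void handle(Path file) throws IOException;
    }

    static Path receiveAndHandle(InputStream in, Handler handler) throws IOException {
        Path tmp = Files.createTempFile("upload_", ".tmp");
        try {
            // createTempFile already created the file, so overwrite it.
            Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING);
            handler.handle(tmp);
        } finally {
            // Runs whether the handler succeeded or threw.
            Files.deleteIfExists(tmp);
        }
        return tmp; // the path no longer exists on disk at this point
    }
}
```

Putting the delete in `finally` means a failing handler cannot leak the temporary file, which is the property the framework cleanup is there to guarantee.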
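6. Resumable transfer
HTTP/1.1 added standard headers for requesting only part of a file from the server. The client sends a request header such as Range: bytes=0-2000, and the server answers, normally with status 206 Partial Content, with a response header such as Content-Range: bytes 0-2000/5000 (the total of 5000 is just an illustrative value). The server only needs to read the requested bytes and return them.
The benefit is that it gives the client more control. As long as the client records the offset when needed, after a network failure or similar problem it can resume requesting from where the connection broke and continue the unfinished download, which is how resumable transfer is achieved.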
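As a rough sketch of the server side, the code below parses a Range value, reads exactly those bytes from a file and builds the matching Content-Range value. The names `RangeSketch`, `parseRange`, `readRange` and `contentRange` are hypothetical, only the single-range forms `bytes=start-end` and `bytes=start-` are handled, and this is not how Tomcat implements range requests.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Path;

// Hypothetical sketch of serving one byte range of a file.
public class RangeSketch {

    /**
     * Parses "bytes=start-end" or "bytes=start-" into {start, end}
     * (both inclusive), clamping end to the last byte of the file.
     * Returns null for anything this sketch does not support or for
     * a range that cannot be satisfied.
     */
    static long[] parseRange(String header, long fileLength) {
        if (header == null || !header.startsWith("bytes=")) {
            return null;
        }
        String spec = header.substring("bytes=".length()).trim();
        int dash = spec.indexOf('-');
        if (dash <= 0 || spec.indexOf(',') >= 0) {
            return null; // suffix ranges and multiple ranges are not handled
        }
        try {
            long start = Long.parseLong(spec.substring(0, dash).trim());
            String endText = spec.substring(dash + 1).trim();
            long end = endText.isEmpty()
                    ? fileLength - 1
                    : Math.min(Long.parseLong(endText), fileLength - 1);
            return start > end ? null : new long[] {start, end};
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /** Reads the bytes from start to end (inclusive) of the file. */
    static byte[] readRange(Path file, long start, long end) throws IOException {
        byte[] buffer = new byte[(int) (end - start + 1)];
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            raf.seek(start);       // jump straight to the requested offset
            raf.readFully(buffer); // read exactly the requested number of bytes
        }
        return buffer;
    }

    /** Builds the Content-Range response header value. */
    static String contentRange(long start, long end, long fileLength) {
        return "bytes " + start + "-" + end + "/" + fileLength;
    }
}
```

A client that already holds the first N bytes would send `Range: bytes=N-` and append the returned bytes to its partial file. When `parseRange` returns null because the range cannot be satisfied, a real server would typically answer with 416 Range Not Satisfiable.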
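Taking the HTTP protocol as the starting point, we have explained the basic principles of file upload and download, and used Tomcat's processing flow as a model to see how the protocol is implemented. In practice there are many other protocols: IM tools and download tools generally have their own transfer protocols. They may not be identical, but the general direction should be similar.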

Author: 等你歸去來
Source: https://www.cnblogs.com/yougewe/p/12916211.html
