
          How file upload and download work: HTTP protocol analysis and implementation

          2021-03-13 01:09


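          Downloading files over the Internet is something we do all the time, and it feels very intuitive: there is a download button, I click it, and the file gradually arrives on the local disk. It looks just like a copy operation.

          But since this is the Internet, the data must travel over the network. So how exactly is it transferred?

          There are two ways to serve a download: either a file resource is downloaded directly, with no application code involved, or application code generates the content on the fly and writes it to the downloading client.

          When a resource file is downloaded directly, the download feels like an operation on the file itself, no different from a copy. But how is it handled when application code produces the download?

          1. File upload and download demo

          Template code for this is easy to find online, and it gets the job done. It basically amounts to setting a few headers and then writing the data into the response.

          demo1. The server receives an uploaded file and, in the same request, writes a file back to the client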

          @PostMapping("fileUpDownTest")
          @ResponseBody
          public Object fileUpDownTest(@ModelAttribute EncSingleDocFileReqModel reqModel,
                                       MultipartFile file,
                                       HttpServletResponse response) {
              // Does two things: 1. receives the uploaded file; 2. sends a file back to the uploader.
              // Files travel in both directions; the downloaded file can be anything you produce after processing.
              String tmpPath = saveMultipartToLocalFile(file);
              outputEncFileStream(tmpPath, response);
              System.out.println("path:" + tmpPath);
              return null;
          }

          /**
           * Saves the uploaded file to a local path.
           *
           * @param file the uploaded file
           * @return local storage path
           */
          private String saveMultipartToLocalFile(MultipartFile file) {
              try (InputStream inputStream = file.getInputStream()) {
                  // Write the file to the temp directory; tolerate names without an extension
                  String originalName = file.getOriginalFilename();
                  int dot = originalName == null ? -1 : originalName.lastIndexOf('.');
                  String fileSuffix = dot == -1 ? "" : originalName.substring(dot);
                  File tmpFile = File.createTempFile(file.getName(), ".tmp" + fileSuffix);
                  FileUtils.copyInputStreamToFile(inputStream, tmpFile);
                  return tmpFile.getCanonicalPath();
              }
              catch (Exception e) {
                  log.error("[Encrypted file] failed to process the file stream: " + file.getName(), e);
                  throw new EncryptSysException("0011110", "Failed to receive the file");
              }
          }

          /**
           * Writes the file to the response stream.
           *
           * @param encFileLocalPath path of the file to send
           * @param response servlet response whose output stream is written to
           */
          private void outputEncFileStream(String encFileLocalPath, HttpServletResponse response) {
              File outFile = new File(encFileLocalPath);
              OutputStream os = null;
              InputStream inputStream = null;
              try {
                  response.reset();
                  response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate");
                  // response.setHeader("Content-Length", file.getContentLength()+"");
                  // File.getName() strips the directory part on any platform
                  String outputFileName = outFile.getName();
                  response.setHeader("Content-Disposition", String.format("attachment; filename=%s", URLEncoder.encode(outputFileName, "UTF-8")));
                  response.setContentType("application/octet-stream; charset=utf-8");
                  response.setHeader("Pragma", "no-cache");
                  response.setHeader("Expires", "0");
                  inputStream = new FileInputStream(outFile);
                  // Write the file content
                  os = CommonUtil.readInputStream(inputStream, response.getOutputStream());
              } catch (Exception re) {
                  log.error("Failed to write the file stream,", re);
                  throw new RuntimeException("0011113: failed to output the encrypted file");
              } finally {
                  if (os != null) {
                      try {
                          os.flush();
                          os.close();
                      } catch (IOException e) {
                          log.error("Failed to close the output stream", e);
                      }
                  }
                  if (inputStream != null) {
                      try {
                          inputStream.close();
                      } catch (IOException e) {
                          log.error("Failed to close the encrypted file input stream", e);
                      }
                  }
              }
          }

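          During development we only ever deal with the Request and Response objects, which hold everything; we just ask them for whatever we need. That is convenient, but it also takes away the chance to see what is really going on.

          demo2. The server forwards a file to another server and handles the file stream returned in the response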

          /**
           * Sends a local file to the encryption server and streams the encrypted result to the client.
           *
           * @param localFilePath the file to download
           * @return file stream
           */
          @GetMapping("transLocalFileToEnc")
          public Object transLocalFileToEnc(@ModelAttribute EncSingleDocFileReqModel reqModel,
                                            @RequestParam String localFilePath,
                                            HttpServletResponse response) {
              File localFileIns = new File(localFilePath);
              if (!localFileIns.exists()) {
                  return ResponseInfoBuilderUtil.fail("The specified file was not found");
              }

              try (InputStream sourceFileInputStream = new FileInputStream(localFileIns)) {

                  // URL of the endpoint on the other server; here we simulate an encryption request to the local machine
                  String url = "http://localhost:8082/encrypt/testEnc";
                  // Bare file name without the directory part (the original substring call kept the leading separator)
                  String filename = localFileIns.getName();
                  Object object = null;
                  // Create the HttpClient instance
                  CloseableHttpClient aDefault = HttpClients.createDefault();
                  try {
                      HttpPost httpPost = new HttpPost(url);
                      MultipartEntityBuilder builder = MultipartEntityBuilder.create();
                      // With this part, the other service receives the file under the name "file"
                      builder.addBinaryBody("file", sourceFileInputStream,
                              ContentType.create("multipart/form-data"), URLEncoder.encode(filename, "utf-8"));
                      builder.addTextBody("systemCode", "self");
                      String encOutputFilename = filename;
                      builder.addTextBody("encOutputFileName", encOutputFilename);
                      HttpEntity entity = builder.build();
                      httpPost.setEntity(entity);
                      ResponseHandler<Object> rh = new ResponseHandler<Object>() {
                          @Override
                          public Object handleResponse(HttpResponse re) throws IOException {
                              HttpEntity entity = re.getEntity();
                              if (entity.getContentType().toString().contains("application/json")) {
                                  // Decide whether the response is a file stream by checking its content type; not a rigorous approach
                                  String retMsg = EntityUtils.toString(entity, "UTF-8");
                                  return JSONObject.parseObject(retMsg, ResponseInfo.class);
                              }
                              InputStream input = entity.getContent();
                              // String result = EntityUtils.toString(entity, "UTF-8");
                              // Write the response stream
                              OutputStream os = null;
                              try {
                                  response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate");
                                  // response.setHeader("Content-Length", file.getContentLength()+"");

                                  response.setHeader("Content-Disposition", String.format("attachment; filename=%s", URLEncoder.encode(filename, "UTF-8")));
                                  response.setContentType("application/octet-stream; charset=utf-8");
                                  response.setHeader("Pragma", "no-cache");
                                  response.setHeader("Expires", "0");

                                  // Write the returned stream to a file in the temp directory
                                  File tmpFile = File.createTempFile(filename, "");
                                  FileUtils.copyInputStreamToFile(input, tmpFile);
                                  String encFilePathTmp = tmpFile.getCanonicalPath();
                                  File encFileIns = new File(encFilePathTmp);
                                  if (encFileIns.exists()) {
                                      FileInputStream zipStream = new FileInputStream(encFileIns);
                                      os = CommonUtil.readInputStream(zipStream, response.getOutputStream());
                                  }

                              } finally {
                                  if (os != null) {
                                      os.flush();
                                      os.close();
                                  }
                              }
                              // The file stream has been written to the client
                              return Boolean.TRUE;
                          }
                      };
                      object = aDefault.execute(httpPost, rh);

                      return object == Boolean.TRUE ? "Encryption succeeded, go download the file!" : object;
                  } catch (Exception e) {
                      log.error("", e);
                  } finally {
                      try {
                          aDefault.close();
                      } catch (IOException e) {
                          log.error("Failed to close the HTTP client", e);
                      }
                  }
              } catch (FileNotFoundException e) {
                  log.error("The file to encrypt does not exist", e);
              } catch (IOException e) {
                  log.error("Failed to read the file to encrypt", e);
              }
              return "Processing failed";
          }

          // The socket-writing logic is extracted here so it can be controlled in one place
          /**
           * Copies the input stream to the output stream and closes the input stream.
           *
           * @param inputStream input stream
           * @param os output stream
           * @return the output stream; it is flushed to the network once every 5000 buffers written
           * @throws IOException
           */
          public static OutputStream readInputStream(InputStream inputStream, OutputStream os) throws IOException {
              byte[] bytes = new byte[2048];
              int i = 0;
              int read = 0;
              // Copy in 2048-byte buffers to keep memory usage low
              while ((read = inputStream.read(bytes)) != -1) {
                  os.write(bytes, 0, read);
                  i++;
                  // Flush every 5000 buffers
                  if (i % 5000 == 0) {
                      os.flush();
                  }
              }
              inputStream.close();
              return os;
          }

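          This simply reproduces a front-end form submission in back-end code; there is no trick to it. It does illustrate one point, though: a file stream can be passed freely from server to server, as long as each hop follows the protocol. (Note that the code above may require these pom dependencies: org.apache.httpcomponents:httpclient:4.5.6 and org.apache.httpcomponents:httpmime:4.5.6.)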


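          2. File handling in the HTTP protocol

          The file uploads and downloads we deal with on the Internet are generally based on HTTP. So to understand how they really work, we only need to look at the HTTP protocol.

          Using the demo above, we can see what an upload looks like on the wire. Capturing the traffic with Fiddler gives the following: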

          POST http://localhost:8082/test/fileUpDownTest?systemCode=1111&outputFileName=111 HTTP/1.1
          Host: localhost:8082
          Connection: keep-alive
          Content-Length: 197
          Accept: */*
          X-Requested-With: XMLHttpRequest
          User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.129 Safari/537.36 OPR/68.0.3618.63
          Content-Type: multipart/form-data; boundary=----WebKitFormBoundaryen2ZJyNfx7WhA3yO
          Origin: http://localhost:8082
          Sec-Fetch-Site: same-origin
          Sec-Fetch-Mode: cors
          Sec-Fetch-Dest: empty
          Referer: http://localhost:8082/swagger-ui.html
          Accept-Encoding: gzip, deflate, br
          Accept-Language: zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7
          Cookie: JSESSIONID=40832A6766FB11E105717690AEF826AA

          ------WebKitFormBoundaryen2ZJyNfx7WhA3yO
          Content-Disposition: form-data; name="file"; filename="123.txt"
          Content-Type: text/plain

          123content
          over
          ------WebKitFormBoundaryen2ZJyNfx7WhA3yO
          Content-Disposition: form-data; name="file2"; filename="123-2.txt"
          Content-Type: text/plain

          2222content2
          over
          ------WebKitFormBoundaryen2ZJyNfx7WhA3yO--


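          Because Fiddler decodes the traffic and HTTP/1.1 messages are text-based, everything we see here is readable. For this test I uploaded a file named 123.txt containing a few characters: “123content\nover”.

          As we know, an HTTP message carries one header per line. The first line is the request line, whose three parts (method, URL and protocol version) are fixed, so there is no need to dwell on it.

          The parts relevant to us are:

          Content-Type: multipart/form-data; boundary=----WebKitFormBoundaryen2ZJyNfx7WhA3yO
          Content-Type is an important marker. For a file upload, multipart/form-data indicates that the request body consists of multiple parts, which is what a file upload request is. The boundary parameter that follows is the delimiter between the parts of the body; it should be generated randomly for each request so that it does not collide with the payload data.
          Content-Length: 197
          The browser computes this value, which is the length of the payload in bytes. Once the server has read that many bytes, it considers the transfer complete.
          The HTTP body begins after the first blank line, that is, after the first two consecutive line breaks (CRLF CRLF). (So if a header value contains this sequence, you must encode it yourself before sending the request; otherwise the message breaks the protocol.)
          Each part of the body is delimited by the boundary. A part can hold a file, a stream, or a plain key-value pair.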
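          The format described above can be reproduced by hand. The following is a minimal sketch, not what a browser or HttpClient actually runs: the class MultipartBodySketch and its method names are hypothetical, and it assumes field names and file names are plain ASCII without quotes. It assembles the parts, separates them with the boundary, and derives Content-Length from the finished body.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

/** Hypothetical helper that assembles a multipart/form-data body by hand. */
final class MultipartBodySketch {
    private static final String CRLF = "\r\n";
    private final String boundary;
    private final ByteArrayOutputStream body = new ByteArrayOutputStream();

    MultipartBodySketch(String boundary) {
        this.boundary = boundary;
    }

    /** Generates a random boundary so that it is unlikely to collide with the payload. */
    static String randomBoundary() {
        return "----SketchBoundary" + UUID.randomUUID().toString().replace("-", "");
    }

    /** Adds a plain key-value part. */
    MultipartBodySketch addField(String name, String value) {
        write("--" + boundary + CRLF);
        write("Content-Disposition: form-data; name=\"" + name + "\"" + CRLF + CRLF);
        write(value + CRLF);
        return this;
    }

    /** Adds a file part; the file bytes are copied verbatim. */
    MultipartBodySketch addFile(String name, String filename, String contentType, byte[] content) {
        write("--" + boundary + CRLF);
        write("Content-Disposition: form-data; name=\"" + name + "\"; filename=\"" + filename + "\"" + CRLF);
        write("Content-Type: " + contentType + CRLF + CRLF);
        body.write(content, 0, content.length);
        write(CRLF);
        return this;
    }

    /** Appends the closing delimiter ("--boundary--") and returns the finished body. */
    byte[] build() {
        write("--" + boundary + "--" + CRLF);
        return body.toByteArray();
    }

    private void write(String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        body.write(bytes, 0, bytes.length);
    }

    public static void main(String[] args) {
        String boundary = randomBoundary();
        byte[] body = new MultipartBodySketch(boundary)
                .addField("systemCode", "self")
                .addFile("file", "123.txt", "text/plain",
                        "123content\r\nover".getBytes(StandardCharsets.UTF_8))
                .build();
        // These two headers are what the receiver needs in order to split the body again.
        System.out.println("Content-Type: multipart/form-data; boundary=" + boundary);
        System.out.println("Content-Length: " + body.length);
        System.out.println();
        System.out.print(new String(body, StandardCharsets.UTF_8));
    }
}
```

          Note that the delimiter on the wire is the boundary prefixed with two extra dashes, and the last delimiter also ends with two dashes, which matches the captured request.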

          Given this data format, the server parses it in reverse to recover the content.

          If the server responds with a file download, the response looks like this:

          HTTP/1.1 200
          Cache-Control: no-cache, no-store, must-revalidate
          Content-Disposition: attachment; filename=file5983940017135638617.tmp.txt
          Pragma: no-cache
          Expires: 0
          Content-Type: application/octet-stream;charset=utf-8
          Transfer-Encoding: chunked
          Date: Sun, 17 May 2020 05:30:57 GMT

          10
          123content
          over
          0

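          Key fields:

          Content-Disposition: attachment; filename=file5983940017135638617.tmp.txt
          This field says the response body should be saved locally as an attachment, which almost all browsers support. If you write your own receiving code, you can treat it however you like; it is only a marker. The filename is the default name offered when the user saves the file; if a file with that name already exists locally, a suffix such as (xxx) is usually appended to avoid overwriting it.
          Content-Type: application/octet-stream;charset=utf-8
          This means the content is a binary file, so the browser generally does not try to process it itself. Again, this is only a hint: whatever you output, the client just appends the bytes to a file and the file content is restored.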
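          As a hedged illustration of the Content-Disposition field, the sketch below builds a header value that carries both a plain ASCII filename and a percent-encoded filename* parameter in the RFC 5987 style, which is what browsers generally look for when the name contains non-ASCII characters. The class and method names are hypothetical. Since URLEncoder targets form encoding, the sketch converts "+" back to "%20" and escapes "*".

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

/** Hypothetical helper that builds a Content-Disposition value for a download. */
final class ContentDispositionSketch {

    static String attachment(String filename) {
        try {
            // URLEncoder produces form encoding: a space becomes "+" and "*" is left as-is,
            // so both are adjusted to get plain percent-encoding.
            String encoded = URLEncoder.encode(filename, "UTF-8")
                    .replace("+", "%20")
                    .replace("*", "%2A");
            // ASCII-only fallback for clients that ignore filename*:
            // non-ASCII characters, quotes and backslashes are replaced with "_".
            String asciiFallback = filename
                    .replaceAll("[^\\x20-\\x7E]", "_")
                    .replaceAll("[\"\\\\]", "_");
            return "attachment; filename=\"" + asciiFallback + "\"; filename*=UTF-8''" + encoded;
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Content-Disposition: " + attachment("my report.txt"));
    }
}
```

          In a servlet this value would be passed to response.setHeader("Content-Disposition", ...), in place of the bare URLEncoder.encode call used in the demos.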

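          As before, the actual file content starts after the first blank line.
          The output above has no Content-Length header, so the size of the download cannot be inferred directly. Instead the response uses Transfer-Encoding: chunked: each chunk is preceded by its size in hexadecimal (10 here, that is, 16 bytes), and a chunk of size 0 marks the end. As a result the browser may be unable to work out what share of the data has been downloaded, so it cannot show a progress bar. This does not affect the downloaded data, but it is generally best avoided.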
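          To make the chunk framing concrete, here is a minimal decoder sketch for a chunked body such as the one in the response above. The class ChunkedDecoderSketch is hypothetical; it ignores chunk extensions and discards trailer headers, and it buffers the whole result in memory, which a real client would avoid for large files.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical decoder for a body sent with Transfer-Encoding: chunked. */
final class ChunkedDecoderSketch {

    /** Convenience overload for an in-memory body. */
    static byte[] decode(byte[] raw) {
        try {
            return decode(new ByteArrayInputStream(raw));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static byte[] decode(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while (true) {
            // Each chunk starts with a line holding its size in hexadecimal.
            String sizeLine = readLine(in);
            int semicolon = sizeLine.indexOf(';'); // optional chunk extensions are ignored
            String hex = (semicolon >= 0 ? sizeLine.substring(0, semicolon) : sizeLine).trim();
            if (hex.isEmpty()) {
                throw new IOException("missing chunk size");
            }
            int size = Integer.parseInt(hex, 16);
            if (size == 0) {
                // Last chunk: skip optional trailer lines up to the final blank line.
                while (!readLine(in).isEmpty()) {
                    // trailer headers are discarded in this sketch
                }
                return out.toByteArray();
            }
            byte[] chunk = new byte[size];
            int off = 0;
            while (off < size) {
                int n = in.read(chunk, off, size - off);
                if (n < 0) {
                    throw new IOException("stream ended inside a chunk");
                }
                off += n;
            }
            out.write(chunk, 0, size);
            readLine(in); // consume the CRLF that terminates the chunk data
        }
    }

    /** Reads one line terminated by LF (an optional CR is dropped); returns "" at end of stream. */
    private static String readLine(InputStream in) throws IOException {
        StringBuilder sb = new StringBuilder();
        int c;
        while ((c = in.read()) != -1 && c != '\n') {
            if (c != '\r') {
                sb.append((char) c);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] wire = "10\r\n123content\r\nover\r\n0\r\n\r\n".getBytes(StandardCharsets.US_ASCII);
        System.out.println(decode(wire).length + " bytes decoded");
    }
}
```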

          With Content-Length added, the response looks like this:

          HTTP/1.1 200
          Cache-Control: no-cache, no-store, must-revalidate
          Content-Disposition: attachment; filename=file4383190990004865558.tmp.txt
          Pragma: no-cache
          Expires: 0
          Content-Type: application/octet-stream;charset=utf-8
          Content-Length: 16
          Date: Sun, 17 May 2020 07:26:47 GMT

          123content
          over

          That is how HTTP handles files. As long as your request follows the protocol, the other end can accept your upload; as long as the server's response follows the protocol, the browser can download the file.
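          As a rough sketch of what "following the protocol" means on the server side, the hypothetical helper below renders a complete download response (status line, headers, blank line, body) as raw bytes, with Content-Length taken from the body. It assumes the file name is a plain ASCII token; a servlet container normally produces all of this for you.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that renders a raw HTTP/1.1 download response. */
final class DownloadResponseSketch {

    static byte[] render(String filename, byte[] content) {
        String head = "HTTP/1.1 200 OK\r\n"
                + "Content-Type: application/octet-stream\r\n"
                + "Content-Disposition: attachment; filename=" + filename + "\r\n"
                + "Content-Length: " + content.length + "\r\n"
                + "\r\n"; // the blank line separates the headers from the body
        byte[] headBytes = head.getBytes(StandardCharsets.US_ASCII);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(headBytes, 0, headBytes.length);
        out.write(content, 0, content.length); // body bytes are written unchanged
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] response = render("123.txt", "123content\r\nover".getBytes(StandardCharsets.US_ASCII));
        System.out.print(new String(response, StandardCharsets.US_ASCII));
    }
}
```

          Writing these bytes to the output stream of an accepted socket would be enough for a browser to offer the file for saving.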

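          For more on HTTP headers, see: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers

          3. What lies behind HTTP upload and download?

          As we know, HTTP is an application-layer protocol built on top of TCP. The upload and download handling described in the previous section is also an application-layer view. Put simply, if we treat the network as a black box, we assume that the black box hands us correct data, and from that data we can parse the file information.

          TCP is in fact a reliable transport protocol. As for how it is reliable, well, put it this way: traffic on the network is complex and unordered. When you send data from one endpoint to another, it travels over the network using IP, and that transmission is one-way and split into many packets. It is subject to a complicated external environment: some packets may be lost, some may arrive out of order, and so on. If packet loss, reordering, duplication and similar problems are not handled, the data arriving from the network is unusable (in effect, it is corrupted).

          TCP was designed precisely to solve these problems; the details are quite involved. In short, once you use TCP you no longer need to worry about the messy network environment: you can trust that the data the operating system's TCP layer hands you is complete and in order. For more, read a book or look up material online (books are more reliable, though they take more time and effort). This article is one reference: http://www.ruanyifeng.com/blog/2017/06/tcp-protocol.html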
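          The idea of turning unordered, duplicated packets back into an ordered byte stream can be shown with a toy model. This is only an illustration and not how a real TCP stack is written: the class ReassemblySketch is hypothetical, sequence numbers are plain byte offsets starting at 0, and segments are assumed never to overlap partially.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.TreeMap;

/** Toy receiver that delivers segments in sequence-number order and drops duplicates. */
final class ReassemblySketch {
    private final TreeMap<Integer, byte[]> pending = new TreeMap<>();
    private final ByteArrayOutputStream delivered = new ByteArrayOutputStream();
    private int nextSeq = 0; // offset of the next byte the application is waiting for

    void receive(int seq, byte[] data) {
        if (seq < nextSeq || pending.containsKey(seq)) {
            return; // duplicate: already delivered or already buffered
        }
        pending.put(seq, data);
        // Deliver every buffered segment that is now contiguous with the stream.
        while (!pending.isEmpty() && pending.firstKey() == nextSeq) {
            byte[] segment = pending.pollFirstEntry().getValue();
            delivered.write(segment, 0, segment.length);
            nextSeq += segment.length;
        }
        // A gap (lost segment) simply leaves later segments waiting in "pending"
        // until the missing one is retransmitted.
    }

    byte[] delivered() {
        return delivered.toByteArray();
    }

    public static void main(String[] args) {
        ReassemblySketch receiver = new ReassemblySketch();
        receiver.receive(3, "def".getBytes(StandardCharsets.US_ASCII)); // arrives early
        receiver.receive(0, "abc".getBytes(StandardCharsets.US_ASCII));
        receiver.receive(0, "abc".getBytes(StandardCharsets.US_ASCII)); // duplicate
        System.out.println(new String(receiver.delivered(), StandardCharsets.US_ASCII));
    }
}
```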

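          4. How does Java implement file upload handling?

          We have now seen how HTTP handles file uploads, but how is this actually implemented? If you were given only a socket library as the entry point, how would you process these HTTP requests?

          You can think of it roughly like this:
          1. Read the headers and determine that the request is a file upload.
          2. Extract the boundary and the Content-Length for later use.
          3. Keep reading the data stream from the network. When a part is a key-value pair, keep it in an in-memory buffer; when a part is a file, create a temporary file and write the data into it until that part is complete, then record the temporary file's information.
          4. After all the data specified by the HTTP message has been read, wrap it into a request object and hand it to the application code; once the application has finished, send the response to the client.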
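          The steps above can be sketched in a few dozen lines. This is a simplified illustration and not Tomcat's implementation: MultipartParserSketch is a hypothetical class, it works on a body that has already been read fully into memory (a real server streams it), and it matches header names case-sensitively. Key-value parts go into a map; file parts are written to temporary files.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical, simplified multipart/form-data parser. */
final class MultipartParserSketch {
    final Map<String, String> fields = new LinkedHashMap<>(); // key-value parts, kept in memory
    final Map<String, Path> files = new LinkedHashMap<>();    // file parts, stored as temp files

    private static final Pattern NAME = Pattern.compile("[; ]name=\"([^\"]*)\"");
    private static final Pattern FILENAME = Pattern.compile("filename=\"([^\"]*)\"");

    /** Steps 1 and 2: confirm this is an upload and extract the boundary. */
    static String boundaryOf(String contentType) {
        int i = contentType.indexOf("boundary=");
        if (!contentType.startsWith("multipart/form-data") || i < 0) {
            throw new IllegalArgumentException("not a multipart/form-data request");
        }
        return contentType.substring(i + "boundary=".length()).trim();
    }

    /** Step 3: split the body at each delimiter and store every part. */
    static MultipartParserSketch parse(String contentType, byte[] body) {
        MultipartParserSketch result = new MultipartParserSketch();
        String delimiter = "--" + boundaryOf(contentType);
        // ISO-8859-1 maps every byte to exactly one char, so indexes stay byte-accurate.
        String text = new String(body, StandardCharsets.ISO_8859_1);
        int pos = text.indexOf(delimiter);
        while (pos >= 0) {
            int partStart = pos + delimiter.length();
            if (text.startsWith("--", partStart)) {
                break; // closing delimiter "--boundary--"
            }
            partStart += 2; // skip the CRLF after the delimiter
            int next = text.indexOf("\r\n" + delimiter, partStart);
            int headerEnd = text.indexOf("\r\n\r\n", partStart);
            if (next < 0 || headerEnd < 0 || headerEnd > next) {
                throw new IllegalArgumentException("malformed multipart body");
            }
            String headers = text.substring(partStart, headerEnd);
            byte[] content = text.substring(headerEnd + 4, next).getBytes(StandardCharsets.ISO_8859_1);
            Matcher name = NAME.matcher(headers);
            if (!name.find()) {
                throw new IllegalArgumentException("part without a name");
            }
            if (FILENAME.matcher(headers).find()) {
                result.files.put(name.group(1), writeTempFile(content));
            } else {
                result.fields.put(name.group(1), new String(content, StandardCharsets.UTF_8));
            }
            pos = next + 2; // continue at the next delimiter
        }
        return result; // step 4 would wrap this into a request object for the application
    }

    private static Path writeTempFile(byte[] content) {
        try {
            Path tmp = Files.createTempFile("upload-", ".tmp");
            Files.write(tmp, content);
            return tmp;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

          Calling parse with the Content-Type header and the raw body of the captured request would yield two entries in files, named file and file2.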

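          Take Tomcat as an example: it parses each parameter value in turn.

          If you are interested, first look at how it accepts HTTP requests (based on NIO sockets). The rough flow is Acceptor -> Poller -> SocketProcessor (the original post shows a diagram of its threading model here).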
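          Before reading the Tomcat source, the same three-stage hand-off can be sketched with plain java.nio. This is a loose analogy under simplifying assumptions, not Tomcat's code: MiniEndpointSketch and its members are hypothetical names, there is a single poller, and the worker simply echoes the first read back and closes the connection.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical endpoint: acceptor thread -> poller thread -> worker pool. */
final class MiniEndpointSketch implements AutoCloseable {
    private final ServerSocketChannel server;
    private final Selector selector;
    private final Queue<SocketChannel> accepted = new ConcurrentLinkedQueue<>();
    private final ExecutorService workers =
            Executors.newFixedThreadPool(4, r -> daemon("worker", r));
    private volatile boolean running = true;

    MiniEndpointSketch() throws IOException {
        server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0 = any free port
        selector = Selector.open();
        daemon("acceptor", this::acceptLoop).start();
        daemon("poller", this::pollLoop).start();
    }

    int port() {
        return server.socket().getLocalPort();
    }

    /** Stage 1: block in accept(), then queue the connection for the poller. */
    private void acceptLoop() {
        while (running) {
            try {
                SocketChannel channel = server.accept();
                channel.configureBlocking(false);
                accepted.offer(channel);
                selector.wakeup(); // let the poller pick up the new connection
            } catch (IOException e) {
                if (running) e.printStackTrace();
            }
        }
    }

    /** Stage 2: register queued connections and wait until one becomes readable. */
    private void pollLoop() {
        while (running) {
            try {
                SocketChannel channel;
                while ((channel = accepted.poll()) != null) {
                    channel.register(selector, SelectionKey.OP_READ);
                }
                selector.select(1000);
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isValid() && key.isReadable()) {
                        key.interestOps(0); // stop polling while a worker owns the socket
                        workers.execute(() -> process((SocketChannel) key.channel()));
                    }
                }
            } catch (IOException e) {
                if (running) e.printStackTrace();
            }
        }
        try { selector.close(); } catch (IOException ignored) { }
    }

    /** Stage 3: a worker reads the request and writes a reply (here: a simple echo). */
    private void process(SocketChannel channel) {
        try (SocketChannel c = channel) {
            ByteBuffer in = ByteBuffer.allocate(1024);
            if (c.read(in) > 0) {
                in.flip();
                String message = StandardCharsets.UTF_8.decode(in).toString();
                ByteBuffer out = StandardCharsets.UTF_8.encode("echo: " + message);
                while (out.hasRemaining()) c.write(out); // fine for tiny replies only
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() throws IOException {
        running = false;
        server.close();     // unblocks accept()
        selector.wakeup();  // unblocks select(); the poller then closes the selector
        workers.shutdown();
    }

    private static Thread daemon(String name, Runnable task) {
        Thread t = new Thread(task, name);
        t.setDaemon(true);
        return t;
    }
}
```

          Connecting a plain java.net.Socket to port() and sending a few bytes should return them prefixed with "echo: ". The Tomcat code that follows has the same shape, with object caches, several pollers and TLS handling added.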

          // org.apache.tomcat.util.net.NioEndpoint.Acceptor
          @Override
          public void run() {

              int errorDelay = 0;

              // Loop until we receive a shutdown command
              while (running) {

                  // Loop if endpoint is paused
                  while (paused && running) {
                      state = AcceptorState.PAUSED;
                      try {
                          Thread.sleep(50);
                      } catch (InterruptedException e) {
                          // Ignore
                      }
                  }

                  if (!running) {
                      break;
                  }
                  state = AcceptorState.RUNNING;

                  try {
                      //if we have reached max connections, wait
                      countUpOrAwaitConnection();

                      SocketChannel socket = null;
                      try {
                          // Accept the next incoming connection from the server
                          // socket
                          // NIO's ServerSocketChannelImpl: blocks waiting for a socket accept event
                          socket = serverSock.accept();
                      } catch (IOException ioe) {
                          // We didn't get a socket
                          countDownConnection();
                          if (running) {
                              // Introduce delay if necessary
                              errorDelay = handleExceptionWithDelay(errorDelay);
                              // re-throw
                              throw ioe;
                          } else {
                              break;
                          }
                      }
                      // Successful accept, reset the error delay
                      errorDelay = 0;

                      // Configure the socket
                      if (running && !paused) {
                          // setSocketOptions() will hand the socket off to
                          // an appropriate processor if successful
                          // Handle the socket event
                          if (!setSocketOptions(socket)) {
                              closeSocket(socket);
                          }
                      } else {
                          closeSocket(socket);
                      }
                  } catch (Throwable t) {
                      ExceptionUtils.handleThrowable(t);
                      log.error(sm.getString("endpoint.accept.fail"), t);
                  }
              }
              state = AcceptorState.ENDED;
          }

          /**
           * Process the specified connection.
           * @param socket The socket channel
           * @return <code>true</code> if the socket was correctly configured
           *  and processing may continue, <code>false</code> if the socket needs to be
           *  close immediately
           */
          protected boolean setSocketOptions(SocketChannel socket) {
              // Process the connection
              try {
                  //disable blocking, APR style, we are gonna be polling it
                  // Wrap the socket in a channel and hand it to the Poller
                  socket.configureBlocking(false);
                  Socket sock = socket.socket();
                  socketProperties.setProperties(sock);

                  NioChannel channel = nioChannels.pop();
                  if (channel == null) {
                      SocketBufferHandler bufhandler = new SocketBufferHandler(
                              socketProperties.getAppReadBufSize(),
                              socketProperties.getAppWriteBufSize(),
                              socketProperties.getDirectBuffer());
                      if (isSSLEnabled()) {
                          channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
                      } else {
                          channel = new NioChannel(socket, bufhandler);
                      }
                  } else {
                      channel.setIOChannel(socket);
                      channel.reset();
                  }
                  // Add the channel to a Poller's queue; the Poller is chosen in round-robin fashion
                  getPoller0().register(channel);
              } catch (Throwable t) {
                  ExceptionUtils.handleThrowable(t);
                  try {
                      log.error("",t);
                  } catch (Throwable tt) {
                      ExceptionUtils.handleThrowable(tt);
                  }
                  // Tell to close the socket
                  return false;
              }
              return true;
          }

          /**
           * Return an available poller in true round robin fashion.
           *
           * @return The next poller in sequence
           */
          public Poller getPoller0() {
              // Round robin: the 1st call picks poller 1, the 2nd poller 2, the 3rd poller 1 again, and so on
              int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
              return pollers[idx];
          }

          // org.apache.tomcat.util.net.NioEndpoint.Poller#register
          /**
           * Registers a newly created socket with the poller.
           *
           * @param socket    The newly created socket
           */
          public void register(final NioChannel socket) {
              socket.setPoller(this);
              NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
              socket.setSocketWrapper(ka);
              ka.setPoller(this);
              ka.setReadTimeout(getSocketProperties().getSoTimeout());
              ka.setWriteTimeout(getSocketProperties().getSoTimeout());
              ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
              ka.setSecure(isSSLEnabled());
              ka.setReadTimeout(getConnectionTimeout());
              ka.setWriteTimeout(getConnectionTimeout());
              PollerEvent r = eventCache.pop();
              // Register interest in OP_READ, for use by the selector
              ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
              // Put the socket information into a PollerEvent
              if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
              else r.reset(socket,ka,OP_REGISTER);
              addEvent(r);
          }

          // Add the event and wake up the selector
          // org.apache.tomcat.util.net.NioEndpoint.Poller#addEvent
          private void addEvent(PollerEvent event) {
              events.offer(event);
              // A selector blocked in select() has wakeupCounter == -1, meaning it can be woken up
              if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
          }

          // step2. The Poller uses a selector to handle read-ready events
          /**
           * The background thread that adds sockets to the Poller, checks the
           * poller for triggered events and hands the associated socket off to an
           * appropriate processor as events occur.
           */
          @Override
          public void run() {
              // Loop until destroy() is called
              while (true) {

                  boolean hasEvents = false;

                  try {
                      if (!close) {
                          // events() checks for PollerEvents submitted by the acceptor; if there are any, it initializes them first,
                          // registering read interest with the selector and so on, so that the following select() takes effect
                          hasEvents = events();
                          if (wakeupCounter.getAndSet(-1) > 0) {
                              //if we are here, means we have other stuff to do
                              //do a non blocking select
                              keyCount = selector.selectNow();
                          } else {
                              keyCount = selector.select(selectorTimeout);
                          }
                          wakeupCounter.set(0);
                      }
                      if (close) {
                          events();
                          timeout(0, false);
                          try {
                              selector.close();
                          } catch (IOException ioe) {
                              log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                          }
                          break;
                      }
                  } catch (Throwable x) {
                      ExceptionUtils.handleThrowable(x);
                      log.error("",x);
                      continue;
                  }
                  //either we timed out or we woke up, process events first
                  if ( keyCount == 0 ) hasEvents = (hasEvents | events());

                  Iterator<SelectionKey> iterator =
                      keyCount > 0 ? selector.selectedKeys().iterator() : null;
                  // Walk through the collection of ready keys and dispatch
                  // any active event.
                  while (iterator != null && iterator.hasNext()) {
                      SelectionKey sk = iterator.next();
                      NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
                      // Attachment may be null if another thread has called
                      // cancelledKey()
                      if (attachment == null) {
                          iterator.remove();
                      } else {
                          // Remove the key from the selected set, then process it; the connection has been accepted successfully
                          iterator.remove();
                          processKey(sk, attachment);
                      }
                  }//while

                  //process timeouts
                  timeout(keyCount,hasEvents);
              }//while

              getStopLatch().countDown();
          }

          // org.apache.tomcat.util.net.NioEndpoint.Poller#processKey
          protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
              try {
                  if ( close ) {
                      cancelledKey(sk);
                  } else if ( sk.isValid() && attachment != null ) {
                      if (sk.isReadable() || sk.isWritable() ) {
                          // sendfile
                          if ( attachment.getSendfileData() != null ) {
                              processSendfile(sk,attachment, false);
                          } else {
                              // Event listening is cancelled here, so how is the data read afterwards?
                              // The socket is removed from epoll's interest table and is no longer managed by it, but that does not affect later reads
                              // Subsequent read() calls proceed in a BIO-like (blocking) fashion
                              unreg(sk, attachment, sk.readyOps());
                              boolean closeSocket = false;
                              // Read goes before write
                              // Handle read events first, then write events
                              if (sk.isReadable()) {
                                  if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                                      closeSocket = true;
                                  }
                              }
                              if (!closeSocket && sk.isWritable()) {
                                  if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                                      closeSocket = true;
                                  }
                              }
                              if (closeSocket) {
                                  cancelledKey(sk);
                              }
                          }
                      }
                  } else {
                      //invalid key
                      cancelledKey(sk);
                  }
              } catch ( CancelledKeyException ckx ) {
                  cancelledKey(sk);
              } catch (Throwable t) {
                  ExceptionUtils.handleThrowable(t);
                  log.error("",t);
              }
          }

          // org.apache.tomcat.util.net.AbstractEndpoint#processSocket
          /**
           * Process the given SocketWrapper with the given status. Used to trigger
           * processing as if the Poller (for those endpoints that have one)
           * selected the socket.
           *
           * @param socketWrapper The socket wrapper to process
           * @param event         The socket event to be processed
           * @param dispatch      Should the processing be performed on a new
           *                          container thread
           *
           * @return if processing was triggered successfully
           */
          public boolean processSocket(SocketWrapperBase<S> socketWrapper,
                  SocketEvent event, boolean dispatch) {
              try {
                  if (socketWrapper == null) {
                      return false;
                  }
                  // Use the thread pool to handle a single read event
                  SocketProcessorBase<S> sc = processorCache.pop();
                  if (sc == null) {
                      sc = createSocketProcessor(socketWrapper, event);
                  } else {
                      sc.reset(socketWrapper, event);
                  }
                  // The pool has 10 core threads by default
                  // This pool is not the JDK's own ThreadPoolExecutor but Tomcat's subclass,
                  // org.apache.tomcat.util.threads.ThreadPoolExecutor, which mainly adds some statistics-type work
                  // The actual socket handling is dispatched by SocketProcessor to the concrete Handler
                  Executor executor = getExecutor();
                  if (dispatch && executor != null) {
                      executor.execute(sc);
                  } else {
                      sc.run();
                  }
              } catch (RejectedExecutionException ree) {
                  getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
                  return false;
              } catch (Throwable t) {
                  ExceptionUtils.handleThrowable(t);
                  // This means we got an OOM or similar creating a thread, or that
                  // the pool and its queue are full
                  getLog().error(sm.getString("endpoint.process.fail"), t);
                  return false;
              }
              return true;
          }

          // With the steps above, the request has been handed from the Poller to the SocketProcessor, which handles it from here on
          // org.apache.tomcat.util.net.NioEndpoint.SocketProcessor#doRun
          @Override
          protected void doRun() {
              NioChannel socket = socketWrapper.getSocket();
              SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());

              try {
                  int handshake = -1;

                  try {
                      if (key != null) {
                          if (socket.isHandshakeComplete()) {
                              // No TLS handshaking required. Let the handler
                              // process this socket / event combination.
                              handshake = 0;
                          } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
                                  event == SocketEvent.ERROR) {
                              // Unable to complete the TLS handshake. Treat it as
                              // if the handshake failed.
                              handshake = -1;
                          } else {
                              handshake = socket.handshake(key.isReadable(), key.isWritable());
                              // The handshake process reads/writes from/to the
                              // socket. status may therefore be OPEN_WRITE once
                              // the handshake completes. However, the handshake
                              // happens when the socket is opened so the status
                              // must always be OPEN_READ after it completes. It
                              // is OK to always set this as it is only used if
                              // the handshake completes.
                              event = SocketEvent.OPEN_READ;
                          }
                      }
                  } catch (IOException x) {
                      handshake = -1;
                      if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);
                  } catch (CancelledKeyException ckx) {
                      handshake = -1;
                  }
                  if (handshake == 0) {
                      SocketState state = SocketState.OPEN;
                      // Process the request from this socket
                      if (event == null) {
                          state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
                      } else {
                          // org.apache.coyote.AbstractProtocol$ConnectionHandler
                          // Depending on the protocol, a different processor is created, e.g. Http11Processor
                          // The handler here always delegates to the outer class's parent: org.apache.coyote.AbstractProtocol$ConnectionHandler
                          state = getHandler().process(socketWrapper, event);
                      }
                      // If the protocol processing result is CLOSED, close the socket
                      // In this sense, ordinary requests actually use persistent connections (the client usually calls close() itself, which cannot be helped)
                      if (state == SocketState.CLOSED) {
                          close(socket, key);
                      }
                  } else if (handshake == -1 ) {
                      close(socket, key);
                  } else if (handshake == SelectionKey.OP_READ){
                      socketWrapper.registerReadInterest();
                  } else if (handshake == SelectionKey.OP_WRITE){
                      socketWrapper.registerWriteInterest();
                  }
              } catch (CancelledKeyException cx) {
                  socket.getPoller().cancelledKey(key);
              } catch (VirtualMachineError vme) {
                  ExceptionUtils.handleThrowable(vme);
              } catch (Throwable t) {
                  log.error("", t);
                  socket.getPoller().cancelledKey(key);
              } finally {
                  socketWrapper = null;
                  event = null;
                  //return to cache
                  // Reuse the processor
                  if (running && !paused) {
                      processorCache.push(this);
                  }
              }
          }
      }

          // That is the overall flow of how an HTTP request is handed over to the application.
          // One question remains: after an HTTP request has been processed, how is it decided whether to close the connection or keep it alive?
          // It is decided by the SocketState returned once protocol processing completes; if you are interested, read on:
          // org.apache.coyote.AbstractProtocol.ConnectionHandler#process
          // This ConnectionHandler manages the reusable Processors and returns SocketState.CLOSED directly for invalid requests so that the session is closed right away
          @Override
          public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
              if (getLog().isDebugEnabled()) {
                  getLog().debug(sm.getString("abstractConnectionHandler.process",
                          wrapper.getSocket(), status));
              }
              if (wrapper == null) {
                  // Nothing to do. Socket has been closed.
                  return SocketState.CLOSED;
              }

              S socket = wrapper.getSocket();
              // For socket handling, ConnectionHandler keeps the processors in a reusable container to avoid the cost of creating many of them
              Processor processor = connections.get(socket);
              if (getLog().isDebugEnabled()) {
                  getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet",
                          processor, socket));
              }

              // Async timeouts are calculated on a dedicated thread and then
              // dispatched. Because of delays in the dispatch process, the
              // timeout may no longer be required. Check here and avoid
              // unnecessary processing.
              if (SocketEvent.TIMEOUT == status && (processor == null ||
                      !processor.isAsync() || !processor.checkAsyncTimeoutGeneration())) {
                  // This is effectively a NO-OP
                  return SocketState.OPEN;
              }

              if (processor != null) {
                  // Make sure an async timeout doesn't fire
                  getProtocol().removeWaitingProcessor(processor);
              } else if (status == SocketEvent.DISCONNECT || status == SocketEvent.ERROR) {
                  // Nothing to do. Endpoint requested a close and there is no
                  // longer a processor associated with this socket.
                  return SocketState.CLOSED;
              }

              ContainerThreadMarker.set();

              try {
                  if (processor == null) {
                      String negotiatedProtocol = wrapper.getNegotiatedProtocol();
                      if (negotiatedProtocol != null) {
                          UpgradeProtocol upgradeProtocol =
                                  getProtocol().getNegotiatedProtocol(negotiatedProtocol);
                          if (upgradeProtocol != null) {
                              processor = upgradeProtocol.getProcessor(
                                      wrapper, getProtocol().getAdapter());
                          } else if (negotiatedProtocol.equals("http/1.1")) {
                              // Explicitly negotiated the default protocol.
                              // Obtain a processor below.
                          } else {
                              // TODO:
                              // OpenSSL 1.0.2's ALPN callback doesn't support
                              // failing the handshake with an error if no
                              // protocol can be negotiated. Therefore, we need to
                              // fail the connection here. Once this is fixed,
                              // replace the code below with the commented out
                              // block.
                              if (getLog().isDebugEnabled()) {
                                  getLog().debug(sm.getString(
                                      "abstractConnectionHandler.negotiatedProcessor.fail",
                                      negotiatedProtocol));
                              }
                              return SocketState.CLOSED;
                              /*
                               * To replace the code above once OpenSSL 1.1.0 is
                               * used.
                              // Failed to create processor. This is a bug.
                              throw new IllegalStateException(sm.getString(
                                  "abstractConnectionHandler.negotiatedProcessor.fail",
                                  negotiatedProtocol));
                              */
                          }
                      }
                  }
                  if (processor == null) {
                      processor = recycledProcessors.pop();
                      if (getLog().isDebugEnabled()) {
                          getLog().debug(sm.getString("abstractConnectionHandler.processorPop",
                                  processor));
                      }
                  }
                  if (processor == null) {
                      processor = getProtocol().createProcessor();
                      register(processor);
                  }

                  processor.setSslSupport(
                          wrapper.getSslSupport(getProtocol().getClientCertProvider()));

                  // Associate the processor with the connection
                  connections.put(socket, processor);

                  SocketState state = SocketState.CLOSED;
                  do {
                      // The state is ultimately decided by the concrete processor, e.g. Http11Processor
                      state = processor.process(wrapper, status);

                      if (state == SocketState.UPGRADING) {
                          // Get the HTTP upgrade handler
                          UpgradeToken upgradeToken = processor.getUpgradeToken();
                          // Retrieve leftover input
                          ByteBuffer leftOverInput = processor.getLeftoverInput();
                          if (upgradeToken == null) {
                              // Assume direct HTTP/2 connection
                              UpgradeProtocol upgradeProtocol = getProtocol().getUpgradeProtocol("h2c");
                              if (upgradeProtocol != null) {
                                  processor = upgradeProtocol.getProcessor(
                                          wrapper, getProtocol().getAdapter());
                                  wrapper.unRead(leftOverInput);
                                  // Associate with the processor with the connection
                                  connections.put(socket, processor);
                              } else {
                                  if (getLog().isDebugEnabled()) {
                                      getLog().debug(sm.getString(
                                          "abstractConnectionHandler.negotiatedProcessor.fail",
                                          "h2c"));
                                  }
                                  return SocketState.CLOSED;
                              }
                          } else {
                              HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
                              // Release the Http11 processor to be re-used
                              release(processor);
                              // Create the upgrade processor
                              processor = getProtocol().createUpgradeProcessor(wrapper, upgradeToken);
                              if (getLog().isDebugEnabled()) {
                                  getLog().debug(sm.getString("abstractConnectionHandler.upgradeCreate",
                                          processor, wrapper));
                              }
                              wrapper.unRead(leftOverInput);
                              // Mark the connection as upgraded
                              wrapper.setUpgraded(true);
                              // Associate with the processor with the connection
                              connections.put(socket, processor);
                              // Initialise the upgrade handler (which may trigger
                              // some IO using the new protocol which is why the lines
                              // above are necessary)
                              // This cast should be safe. If it fails the error
                              // handling for the surrounding try/catch will deal with
                              // it.
                              if (upgradeToken.getInstanceManager() == null) {
                                  httpUpgradeHandler.init((WebConnection) processor);
                              } else {
                                  ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
                                  try {
                                      httpUpgradeHandler.init((WebConnection) processor);
                                  } finally {
                                      upgradeToken.getContextBind().unbind(false, oldCL);
                                  }
                              }
                          }
                      }
                  } while ( state == SocketState.UPGRADING);

                  if (state == SocketState.LONG) {
                      // In the middle of processing a request/response. Keep the
                      // socket associated with the processor. Exact requirements
                      // depend on type of long poll
                      longPoll(wrapper, processor);
                      if (processor.isAsync()) {
                          getProtocol().addWaitingProcessor(processor);
                      }
                  } else if (state == SocketState.OPEN) {
                      // In keep-alive but between requests. OK to recycle
                      // processor. Continue to poll for the next request.
                      connections.remove(socket);
                      release(processor);
                      wrapper.registerReadInterest();
                  } else if (state == SocketState.SENDFILE) {
                      // Sendfile in progress. If it fails, the socket will be
                      // closed. If it works, the socket either be added to the
                      // poller (or equivalent) to await more data or processed
                      // if there are any pipe-lined requests remaining.
                  } else if (state == SocketState.UPGRADED) {
                      // Don't add sockets back to the poller if this was a
                      // non-blocking write otherwise the poller may trigger
                      // multiple read events which may lead to thread starvation
                      // in the connector. The write() method will add this socket
                      // to the poller if necessary.
                      if (status != SocketEvent.OPEN_WRITE) {
                          longPoll(wrapper, processor);
                      }
                  } else if (state == SocketState.SUSPENDED) {
                      // Don't add sockets back to the poller.
                      // The resumeProcessing() method will add this socket
                      // to the poller.
                  } else {
                      // Connection closed. OK to recycle the processor. Upgrade
                      // processors are not recycled.
                      connections.remove(socket);
                      if (processor.isUpgrade()) {
                          UpgradeToken upgradeToken = processor.getUpgradeToken();
                          HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
                          InstanceManager instanceManager = upgradeToken.getInstanceManager();
                          if (instanceManager == null) {
                              httpUpgradeHandler.destroy();
                          } else {
                              ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
                              try {
                                  httpUpgradeHandler.destroy();
                              } finally {
                                  try {
                                      instanceManager.destroyInstance(httpUpgradeHandler);
                                  } catch (Throwable e) {
                                      ExceptionUtils.handleThrowable(e);
                                      getLog().error(sm.getString("abstractConnectionHandler.error"), e);
                                  }
                                  upgradeToken.getContextBind().unbind(false, oldCL);
                              }
                          }
                      } else {
                          release(processor);
                      }
                  }
                  // Return the processing state to the controlling Processor so it can decide whether to close the socket
                  return state;
              } catch(java.net.SocketException e) {
                  // SocketExceptions are normal
                  getLog().debug(sm.getString(
                          "abstractConnectionHandler.socketexception.debug"), e);
              } catch (java.io.IOException e) {
                  // IOExceptions are normal
                  getLog().debug(sm.getString(
                          "abstractConnectionHandler.ioexception.debug"), e);
              } catch (ProtocolException e) {
                  // Protocol exceptions normally mean the client sent invalid or
                  // incomplete data.
                  getLog().debug(sm.getString(
                          "abstractConnectionHandler.protocolexception.debug"), e);
              }
              // Future developers: if you discover any other
              // rare-but-nonfatal exceptions, catch them here, and log as
              // above.
              catch (Throwable e) {
                  ExceptionUtils.handleThrowable(e);
                  // any other exception or error is odd. Here we log it
                  // with "ERROR" level, so it will show up even on
                  // less-than-verbose logs.
                  getLog().error(sm.getString("abstractConnectionHandler.error"), e);
              } finally {
                  ContainerThreadMarker.clear();
              }

              // Make sure socket/processor is removed from the list of current
              // connections
              connections.remove(socket);
              release(processor);
              return SocketState.CLOSED;
          }
    // org.apache.coyote.AbstractProcessorLight#process
    // Http11Processor extends AbstractProcessorLight, which uses the template method pattern for the protocol-specific details
    @Override
    public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
            throws IOException {

        SocketState state = SocketState.CLOSED;
        Iterator<DispatchType> dispatches = null;
        do {
            if (dispatches != null) {
                DispatchType nextDispatch = dispatches.next();
                state = dispatch(nextDispatch.getSocketStatus());
            } else if (status == SocketEvent.DISCONNECT) {
                // Do nothing here, just wait for it to get recycled
            } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
                state = dispatch(status);
                if (state == SocketState.OPEN) {
                    // There may be pipe-lined data to read. If the data isn't
                    // processed now, execution will exit this loop and call
                    // release() which will recycle the processor (and input
                    // buffer) deleting any pipe-lined data. To avoid this,
                    // process it now.
                    state = service(socketWrapper);
                }
            } else if (status == SocketEvent.OPEN_WRITE) {
                // Extra write event likely after async, ignore
                state = SocketState.LONG;
            } else if (status == SocketEvent.OPEN_READ){
                // An ordinary HTTP request arrives as OPEN_READ
                state = service(socketWrapper);
            } else {
                // Default to closing the socket if the SocketEvent passed in
                // is not consistent with the current state of the Processor
                state = SocketState.CLOSED;
            }
          if (getLog().isDebugEnabled()) { getLog().debug("Socket: [" + socketWrapper + "], Status in: [" + status + "], State out: [" + state + "]"); }
          if (state != SocketState.CLOSED && isAsync()) { state = asyncPostProcess(); if (getLog().isDebugEnabled()) { getLog().debug("Socket: [" + socketWrapper + "], State after async post processing: [" + state + "]"); } }
            if (dispatches == null || !dispatches.hasNext()) {
                // Only returns non-null iterator if there are
                // dispatches to process.
                dispatches = getIteratorAndClearDispatches();
            }
            // Keep looping while the state is ASYNC_END, or while dispatches remain and the state is not CLOSED
        } while (state == SocketState.ASYNC_END ||
                dispatches != null && state != SocketState.CLOSED);
          return state; }
    // org.apache.coyote.http11.Http11Processor#service
    // The protocol-specific processing method; the state it returns decides whether the socket gets closed
    @Override
    public SocketState service(SocketWrapperBase<?> socketWrapper)
            throws IOException {
        // RequestInfo tracks the lifecycle stage of the whole request:
        // STAGE_PARSE -> STAGE_PREPARE -> STAGE_SERVICE -> STAGE_ENDINPUT -> STAGE_ENDOUTPUT -> STAGE_KEEPALIVE -> STAGE_ENDED
        RequestInfo rp = request.getRequestProcessor();
        rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
          // Setting up the I/O setSocketWrapper(socketWrapper); inputBuffer.init(socketWrapper); outputBuffer.init(socketWrapper);
        // Flags
        keepAlive = true;
        openSocket = false;
        readComplete = true;
        boolean keptAlive = false;
        SendfileState sendfileState = SendfileState.DONE;
        // This loop keeps reading requests from the input stream. If the client sends only one HTTP request,
        // the first pass handles it and the second pass typically ends with an IOException,
        // which triggers the socket close procedure
        while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null &&
                sendfileState == SendfileState.DONE && !endpoint.isPaused()) {
          // Parsing the request header try { if (!inputBuffer.parseRequestLine(keptAlive)) { if (inputBuffer.getParsingRequestLinePhase() == -1) { return SocketState.UPGRADING; } else if (handleIncompleteRequestLineRead()) { break; } }
          if (endpoint.isPaused()) { // 503 - Service unavailable response.setStatus(503); setErrorState(ErrorState.CLOSE_CLEAN, null); } else { keptAlive = true; // Set this every time in case limit has been changed via JMX request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount()); if (!inputBuffer.parseHeaders()) { // We've read part of the request, don't recycle it // instead associate it with the socket openSocket = true; readComplete = false; break; } if (!disableUploadTimeout) { socketWrapper.setReadTimeout(connectionUploadTimeout); } } } catch (IOException e) { if (log.isDebugEnabled()) { log.debug(sm.getString("http11processor.header.parse"), e); } setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e); break; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); UserDataHelper.Mode logMode = userDataHelper.getNextMode(); if (logMode != null) { String message = sm.getString("http11processor.header.parse"); switch (logMode) { case INFO_THEN_DEBUG: message += sm.getString("http11processor.fallToDebug"); //$FALL-THROUGH$ case INFO: log.info(message, t); break; case DEBUG: log.debug(message, t); } } // 400 - Bad Request response.setStatus(400); setErrorState(ErrorState.CLOSE_CLEAN, t); getAdapter().log(request, response, 0); }
          // Has an upgrade been requested? Enumeration<String> connectionValues = request.getMimeHeaders().values("Connection"); boolean foundUpgrade = false; while (connectionValues.hasMoreElements() && !foundUpgrade) { foundUpgrade = connectionValues.nextElement().toLowerCase( Locale.ENGLISH).contains("upgrade"); }
          if (foundUpgrade) { // Check the protocol String requestedProtocol = request.getHeader("Upgrade");
          UpgradeProtocol upgradeProtocol = httpUpgradeProtocols.get(requestedProtocol); if (upgradeProtocol != null) { if (upgradeProtocol.accept(request)) { // TODO Figure out how to handle request bodies at this // point. response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS); response.setHeader("Connection", "Upgrade"); response.setHeader("Upgrade", requestedProtocol); action(ActionCode.CLOSE, null); getAdapter().log(request, response, 0);
          InternalHttpUpgradeHandler upgradeHandler = upgradeProtocol.getInternalUpgradeHandler( getAdapter(), cloneRequest(request)); UpgradeToken upgradeToken = new UpgradeToken(upgradeHandler, null, null); action(ActionCode.UPGRADE, upgradeToken); return SocketState.UPGRADING; } } }
          if (!getErrorState().isError()) { // Setting up filters, and parse some request headers rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE); try { prepareRequest(); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); if (log.isDebugEnabled()) { log.debug(sm.getString("http11processor.request.prepare"), t); } // 500 - Internal Server Error response.setStatus(500); setErrorState(ErrorState.CLOSE_CLEAN, t); getAdapter().log(request, response, 0); } }
          if (maxKeepAliveRequests == 1) { keepAlive = false; } else if (maxKeepAliveRequests > 0 && socketWrapper.decrementKeepAlive() <= 0) { keepAlive = false; }
          // Process the request in the adapter if (!getErrorState().isError()) { try { rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE); // 使用適配器處理 request, 并忽略其返回狀態(tài)值, 此處真正的協(xié)議處理服務(wù),后續(xù)就是許多的 filterChain 處理了 getAdapter().service(request, response); // Handle when the response was committed before a serious // error occurred. Throwing a ServletException should both // set the status to 500 and set the errorException. // If we fail here, then the response is likely already // committed, so we can't try and set headers. if(keepAlive && !getErrorState().isError() && !isAsync() && statusDropsConnection(response.getStatus())) { setErrorState(ErrorState.CLOSE_CLEAN, null); } } catch (InterruptedIOException e) { setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e); } catch (HeadersTooLargeException e) { log.error(sm.getString("http11processor.request.process"), e); // The response should not have been committed but check it // anyway to be safe if (response.isCommitted()) { setErrorState(ErrorState.CLOSE_NOW, e); } else { response.reset(); response.setStatus(500); setErrorState(ErrorState.CLOSE_CLEAN, e); response.setHeader("Connection", "close"); // TODO: Remove } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error(sm.getString("http11processor.request.process"), t); // 500 - Internal Server Error response.setStatus(500); setErrorState(ErrorState.CLOSE_CLEAN, t); getAdapter().log(request, response, 0); } }
            // Finish the handling of the request
            rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);
            if (!isAsync()) {
                // If this is an async request then the request ends when it has
                // been completed. The AsyncContext is responsible for calling
                // endRequest() in that case.
                // For a non-async request, finish the input here
                // and finish the output as well
                endRequest();
            }
            rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);
          // If there was an error, make sure the request is counted as // and error, and update the statistics counter if (getErrorState().isError()) { response.setStatus(500); }
            if (!isAsync() || getErrorState().isError()) {
                request.updateCounters();
                // Reset the request and response buffers so the pooled processor can be reused safely
                if (getErrorState().isIoAllowed()) {
                    inputBuffer.nextRequest();
                    outputBuffer.nextRequest();
                }
            }
          if (!disableUploadTimeout) { int soTimeout = endpoint.getConnectionTimeout(); if(soTimeout > 0) { socketWrapper.setReadTimeout(soTimeout); } else { socketWrapper.setReadTimeout(0); } }
            rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);
            // After one request completes normally, the loop moves on to the next pass, which usually ends with an IOException
            sendfileState = processSendfile(socketWrapper);
        }

        rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
        // So an ordinary HTTP request usually ends up in the CLOSE_CONNECTION_NOW error state, which closes the socket right away
        if (getErrorState().isError() || endpoint.isPaused()) {
            return SocketState.CLOSED;
        } else if (isAsync()) {
            return SocketState.LONG;
        } else if (isUpgrade()) {
            return SocketState.UPGRADING;
        } else {
            if (sendfileState == SendfileState.PENDING) {
                return SocketState.SENDFILE;
            } else {
                if (openSocket) {
                    if (readComplete) {
                        return SocketState.OPEN;
                    } else {
                        return SocketState.LONG;
                    }
                } else {
                    return SocketState.CLOSED;
                }
            }
        }
    }

          Once the network request has been accepted, it first passes through a chain of filters (the FilterChain), and one of those filters ends up parsing the request parameters.

          For file handling, we start from the branch that deals with multipart/form-data:

    // org.apache.catalina.connector.Request#parseParameters
    /**
     * Parse request parameters.
     */
    protected void parseParameters() {
          parametersParsed = true;
        Parameters parameters = coyoteRequest.getParameters();
        boolean success = false;
        try {
            ...
            String contentType = getContentType();
            if (contentType == null) {
                contentType = "";
            }
            int semicolon = contentType.indexOf(';');
            if (semicolon >= 0) {
                contentType = contentType.substring(0, semicolon).trim();
            } else {
                contentType = contentType.trim();
            }
            // For multipart data, parse each part
            if ("multipart/form-data".equals(contentType)) {
                parseParts(false);
                success = true;
                return;
            }
            ...
            success = true;
        } finally {
            if (!success) {
                parameters.setParseFailedReason(FailReason.UNKNOWN);
            }
        }

    }

    // Handling of multipart requests
    // org.apache.catalina.connector.Request#parseParts
    private void parseParts(boolean explicit) {
          // Return immediately if the parts have already been parsed if (parts != null || partsParseException != null) { return; } ... boolean success = false;
        // Create a new file upload handler
        DiskFileItemFactory factory = new DiskFileItemFactory();
        try {
            factory.setRepository(location.getCanonicalFile());
        } catch (IOException ioe) {
            parameters.setParseFailedReason(FailReason.IO_ERROR);
            partsParseException = ioe;
            return;
        }
        factory.setSizeThreshold(mce.getFileSizeThreshold());
        // Use the ServletFileUpload component to parse the uploaded parts
        ServletFileUpload upload = new ServletFileUpload();
        upload.setFileItemFactory(factory);
        upload.setFileSizeMax(mce.getMaxFileSize());
        upload.setSizeMax(mce.getMaxRequestSize());
          parts = new ArrayList<>(); try { // step1. 使用 ServletFileUpload 解析參數(shù)為 FileItem List<FileItem> items = upload.parseRequest(new ServletRequestContext(this)); int maxPostSize = getConnector().getMaxPostSize(); int postSize = 0; Charset charset = getCharset(); // step2. 將解析出的 FileItem 包裝為 ApplicationPart, 添加到當(dāng)前Request的 parts 容器中 for (FileItem item : items) { ApplicationPart part = new ApplicationPart(item, location); parts.add(part); // 文件名處理 if (part.getSubmittedFileName() == null) { String name = part.getName(); String value = null; try { value = part.getString(charset.name()); } catch (UnsupportedEncodingException uee) { // Not possible } if (maxPostSize >= 0) { // Have to calculate equivalent size. Not completely // accurate but close enough. postSize += name.getBytes(charset).length; if (value != null) { // Equals sign postSize++; // Value length postSize += part.getSize(); } // Value separator postSize++; if (postSize > maxPostSize) { parameters.setParseFailedReason(FailReason.POST_TOO_LARGE); throw new IllegalStateException(sm.getString( "coyoteRequest.maxPostSizeExceeded")); } } parameters.addParameter(name, value); } }
          success = true; } ... } finally { // This might look odd but is correct. setParseFailedReason() only // sets the failure reason if none is currently set. This code could // be more efficient but it is written this way to be robust with // respect to changes in the remainder of the method. if (partsParseException != null || !success) { parameters.setParseFailedReason(FailReason.UNKNOWN); } } }

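          That is the rough logic for handling a file upload: 1. ServletFileUpload parses the request into a List<FileItem>; 2. each FileItem is wrapped in an ApplicationPart and stored in the Request's parts field for later use. As we can see, the actual parsing of the files happens in ServletFileUpload.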
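
          The branch that routes a request into parseParts() hinges on the Content-Type header, and the later parsing needs its boundary parameter. The following is a minimal sketch of both steps in plain Java, not Tomcat's actual code: the class name ContentTypeSketch and its two methods are made up for illustration, and unlike the excerpt above it lower-cases the media type before comparing. It splits on ';' naively, so a quoted boundary containing a semicolon would not be handled.

```java
import java.util.Locale;

// Hypothetical helper for illustration only; not part of Tomcat.
final class ContentTypeSketch {

    /** Returns the media type without parameters, e.g. "multipart/form-data". */
    static String baseType(String contentType) {
        if (contentType == null) {
            return "";
        }
        int semicolon = contentType.indexOf(';');
        String base = semicolon >= 0 ? contentType.substring(0, semicolon) : contentType;
        return base.trim().toLowerCase(Locale.ENGLISH);
    }

    /** Returns the value of the boundary parameter, or null if there is none. */
    static String boundary(String contentType) {
        if (contentType == null) {
            return null;
        }
        String[] pieces = contentType.split(";");
        // pieces[0] is the media type itself; parameters start at index 1
        for (int i = 1; i < pieces.length; i++) {
            String piece = pieces[i].trim();
            if (piece.toLowerCase(Locale.ENGLISH).startsWith("boundary=")) {
                String value = piece.substring("boundary=".length()).trim();
                // Strip optional surrounding double quotes
                if (value.length() >= 2 && value.startsWith("\"") && value.endsWith("\"")) {
                    value = value.substring(1, value.length() - 1);
                }
                return value.isEmpty() ? null : value;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String header = "multipart/form-data; boundary=----WebKitFormBoundarydyG19jnnVWC9U1zY";
        System.out.println(baseType(header));
        System.out.println(boundary(header));
    }
}
```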

    // ServletFileUpload: parses the uploaded files
    // org.apache.tomcat.util.http.fileupload.FileUploadBase#parseRequest
    /**
     * Processes an RFC 1867
     * compliant <code>multipart/form-data</code> stream.
     *
     * @param ctx The context for the request to be parsed.
     *
     * @return A list of <code>FileItem</code> instances parsed from the
     *         request, in the order that they were transmitted.
     *
     * @throws FileUploadException if there are problems reading/parsing
     *                             the request or storing files.
     */
    public List<FileItem> parseRequest(RequestContext ctx)
            throws FileUploadException {
        List<FileItem> items = new ArrayList<>();
        boolean successful = false;
        try {
            // Iterate over the parts one by one
            FileItemIterator iter = getItemIterator(ctx);
            FileItemFactory fac = getFileItemFactory();
            if (fac == null) {
                throw new NullPointerException("No FileItemFactory has been set.");
            }
            while (iter.hasNext()) {
                // Each iteration yields a FileItemStreamImpl, and with it a new InputStream
                final FileItemStream item = iter.next();
                // Don't use getName() here to prevent an InvalidFileNameException.
                final String fileName = ((FileItemIteratorImpl.FileItemStreamImpl) item).name;
                // Create one FileItem per field; it receives the data of that part
                FileItem fileItem = fac.createItem(item.getFieldName(), item.getContentType(),
                                                   item.isFormField(), fileName);
                items.add(fileItem);
                try {
                    // Write the data coming from the socket into the temp file owned by fileItem;
                    // this is the framework-level upload.
                    // fileItem.getOutputStream() sets up the temp file.
                    // item.openStream() uses the network I/O stream as its source; a read of -1 marks the end of input.
                    // In the end, all data of this single part is written to the current temp file.
                    Streams.copy(item.openStream(), fileItem.getOutputStream(), true);
                } catch (FileUploadIOException e) {
                    throw (FileUploadException) e.getCause();
                } catch (IOException e) {
                    throw new IOFileUploadException(String.format("Processing of %s request failed. %s",
                                                           MULTIPART_FORM_DATA, e.getMessage()), e);
                }
                final FileItemHeaders fih = item.getHeaders();
                fileItem.setHeaders(fih);
            }
            successful = true;
            return items;
        } catch (FileUploadIOException e) {
            throw (FileUploadException) e.getCause();
        } catch (IOException e) {
            throw new FileUploadException(e.getMessage(), e);
        } finally {
            if (!successful) {
                // If any part failed, delete everything uploaded so far
                for (FileItem fileItem : items) {
                    try {
                        fileItem.delete();
                    } catch (Exception ignored) {
                        // ignored TODO perhaps add to tracker delete failure list somehow?
                    }
                }
            }
        }
    }

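          That is the whole framework-side parsing process for a file upload. The rough steps are:

          1. Based on the boundary, use the iterator pattern to create an InputStream for each part in turn;
          2. For each part, create a DiskFileItem instance to hold the data read from the stream;
          3. Write the network InputStream into a temporary file on disk;
          4. Return all the DiskFileItem instances together to the application as input parameters;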
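
          The steps above can be sketched in a few dozen lines of plain Java. This is a simplified illustration, not Tomcat's MultipartStream: it works on a body that is already fully in memory rather than streaming from the socket, it keeps each part in a byte array instead of a temp file, and the names MultipartSketch and Part are made up for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical in-memory multipart splitter, for illustration only.
final class MultipartSketch {

    /** One decoded part: header names are lower-cased, the body is kept as raw bytes. */
    static final class Part {
        final Map<String, String> headers = new LinkedHashMap<>();
        byte[] body = new byte[0];
    }

    static List<Part> parse(byte[] raw, String boundary) {
        // ISO-8859-1 maps every byte to exactly one char, so indices stay byte-accurate
        String text = new String(raw, StandardCharsets.ISO_8859_1);
        String delimiter = "--" + boundary;
        List<Part> parts = new ArrayList<>();

        int pos = text.indexOf(delimiter); // skips any preamble before the first delimiter
        while (pos >= 0) {
            int afterDelimiter = pos + delimiter.length();
            if (text.startsWith("--", afterDelimiter)) {
                break; // closing delimiter "--boundary--": no more parts
            }
            int lineEnd = text.indexOf("\r\n", afterDelimiter);
            if (lineEnd < 0) {
                break; // malformed: delimiter line is not terminated
            }
            // Headers end at the first empty line; it may directly follow the delimiter line
            int headerEnd = text.indexOf("\r\n\r\n", lineEnd);
            if (headerEnd < 0) {
                break; // malformed: no blank line after the headers
            }

            Part part = new Part();
            if (headerEnd > lineEnd) {
                for (String line : text.substring(lineEnd + 2, headerEnd).split("\r\n")) {
                    int colon = line.indexOf(':');
                    if (colon > 0) {
                        part.headers.put(
                                line.substring(0, colon).trim().toLowerCase(Locale.ENGLISH),
                                line.substring(colon + 1).trim());
                    }
                }
            }

            // The body runs until the CRLF that precedes the next delimiter
            int bodyStart = headerEnd + 4;
            int next = text.indexOf("\r\n" + delimiter, bodyStart);
            if (next < 0) {
                break; // malformed: body is not terminated by a delimiter
            }
            part.body = text.substring(bodyStart, next).getBytes(StandardCharsets.ISO_8859_1);
            parts.add(part);
            pos = next + 2; // position of the next "--boundary"
        }
        return parts;
    }

    public static void main(String[] args) {
        String body = "--XyZ\r\n"
                + "Content-Disposition: form-data; name=\"file2\"; filename=\"123-2.txt\"\r\n"
                + "Content-Type: text/plain\r\n"
                + "\r\n"
                + "abc\r\n"
                + "--XyZ--\r\n";
        for (Part part : parse(body.getBytes(StandardCharsets.ISO_8859_1), "XyZ")) {
            System.out.println(part.headers + " -> " + part.body.length + " bytes");
        }
    }
}
```

          A real implementation has to do the same search incrementally over a buffer, because the body may be far larger than memory; that is why the framework code works with streams and temp files.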

          Next, let's look at how FileItemIteratorImpl iterates over the parts based on the boundary:

                  // 1. 單個(gè)文件流的迭代準(zhǔn)備工作        // FileItemIteratorImpl 會(huì)解析出content-Type, boundary 等信息,為后續(xù)迭代準(zhǔn)備        // org.apache.tomcat.util.http.fileupload.FileUploadBase.FileItemIteratorImpl#FileItemIteratorImpl        /**         * Creates a new instance.         *         * @param ctx The request context.         * @throws FileUploadException An error occurred while         *   parsing the request.         * @throws IOException An I/O error occurred.         */        FileItemIteratorImpl(RequestContext ctx)                throws FileUploadException, IOException {            if (ctx == null) {                throw new NullPointerException("ctx parameter");            }
          String contentType = ctx.getContentType(); if ((null == contentType) || (!contentType.toLowerCase(Locale.ENGLISH).startsWith(MULTIPART))) { throw new InvalidContentTypeException(String.format( "the request doesn't contain a %s or %s stream, content type header is %s", MULTIPART_FORM_DATA, MULTIPART_MIXED, contentType)); }

          final long requestSize = ((UploadContext) ctx).contentLength();
          InputStream input; // N.B. this is eventually closed in MultipartStream processing if (sizeMax >= 0) { if (requestSize != -1 && requestSize > sizeMax) { throw new SizeLimitExceededException(String.format( "the request was rejected because its size (%s) exceeds the configured maximum (%s)", Long.valueOf(requestSize), Long.valueOf(sizeMax)), requestSize, sizeMax); } // N.B. this is eventually closed in MultipartStream processing input = new LimitedInputStream(ctx.getInputStream(), sizeMax) { @Override protected void raiseError(long pSizeMax, long pCount) throws IOException { FileUploadException ex = new SizeLimitExceededException( String.format("the request was rejected because its size (%s) exceeds the configured maximum (%s)", Long.valueOf(pCount), Long.valueOf(pSizeMax)), pCount, pSizeMax); throw new FileUploadIOException(ex); } }; } else { // 從 ServletRequestContext 中獲取inputStream input = ctx.getInputStream(); }
            String charEncoding = headerEncoding;
            if (charEncoding == null) {
                charEncoding = ctx.getCharacterEncoding();
            }
            // Parse the boundary parameter, e.g. ----WebKitFormBoundarydyG19jnnVWC9U1zY
            boundary = getBoundary(contentType);
            if (boundary == null) {
                IOUtils.closeQuietly(input); // avoid possible resource leak
                throw new FileUploadException("the request was rejected because no multipart boundary was found");
            }
          notifier = new MultipartStream.ProgressNotifier(listener, requestSize); try { multi = new MultipartStream(input, boundary, notifier); } catch (IllegalArgumentException iae) { IOUtils.closeQuietly(input); // avoid possible resource leak throw new InvalidContentTypeException( String.format("The boundary specified in the %s header is too long", CONTENT_TYPE), iae); } multi.setHeaderEncoding(charEncoding);
            skipPreamble = true;
            // This also initializes the first item to iterate over; later items are initialized through hasNext()
            findNextItem();
        }
          // 在迭代輸入數(shù)據(jù)時(shí),會(huì)調(diào)用迭代方法 FileItemIteratorImpl next(); 讀取數(shù)據(jù) // org.apache.tomcat.util.http.fileupload.FileUploadBase.FileItemIteratorImpl#hasNext /** * Returns, whether another instance of {@link FileItemStream} * is available. * * @throws FileUploadException Parsing or processing the * file item failed. * @throws IOException Reading the file item failed. * @return True, if one or more additional file items * are available, otherwise false. */ @Override public boolean hasNext() throws FileUploadException, IOException { if (eof) { return false; } if (itemValid) { return true; } try { // 迭代出下一個(gè)part文件 return findNextItem(); } catch (FileUploadIOException e) { // unwrap encapsulated SizeException throw (FileUploadException) e.getCause(); } } // org.apache.tomcat.util.http.fileupload.FileUploadBase.FileItemIteratorImpl#findNextItem /** * Called for finding the next item, if any. * * @return True, if an next item was found, otherwise false. * @throws IOException An I/O error occurred. */ private boolean findNextItem() throws IOException { if (eof) { return false; } if (currentItem != null) { currentItem.close(); currentItem = null; } for (;;) { boolean nextPart; // 首次讀取時(shí),使用新的 boundary 跳過 // 后續(xù)則基于首個(gè)連續(xù)換行符進(jìn)行數(shù)據(jù)獲取, \r\n\r\n if (skipPreamble) { nextPart = multi.skipPreamble(); } else { nextPart = multi.readBoundary(); } // 如果沒有讀取到更多的part, 則返回 if (!nextPart) { // 讀取第一個(gè)沒有 boundary 的數(shù)據(jù),再搜索一次,如果還是沒有 boundary, 則認(rèn)為數(shù)據(jù)已結(jié)束 if (currentFieldName == null) { // Outer multipart terminated -> No more data eof = true; return false; } // Inner multipart terminated -> Return to parsing the outer multi.setBoundary(boundary); // 將當(dāng)前字段信息置空,下次如果再讀取不到 boundary, 則讀取結(jié)束 currentFieldName = null; continue; } // 以下有更多的 part 輸入 FileItemHeaders headers = getParsedHeaders(multi.readHeaders()); if (currentFieldName == null) { // We're parsing the outer multipart // 如: file, file2 ... 
String fieldName = getFieldName(headers); if (fieldName != null) { String subContentType = headers.getHeader(CONTENT_TYPE); if (subContentType != null && subContentType.toLowerCase(Locale.ENGLISH) .startsWith(MULTIPART_MIXED)) { currentFieldName = fieldName; // Multiple files associated with this field name byte[] subBoundary = getBoundary(subContentType); multi.setBoundary(subBoundary); skipPreamble = true; continue; } // 獲取文件名稱從 header 中 // Content-Disposition: form-data; name="file2"; filename="123-2.txt" String fileName = getFileName(headers); // 創(chuàng)建 FileItemStreamImpl, 以備后續(xù)迭代輸出,其構(gòu)造方法將會(huì)創(chuàng)建 stream 實(shí)例 // FileItemStreamImpl 是迭代器的一個(gè)內(nèi)部類,共享 multi 對象 // FileItemStreamImpl 會(huì)將網(wǎng)絡(luò)io流封裝為單個(gè)可讀取的inputStream, 備用 currentItem = new FileItemStreamImpl(fileName, fieldName, headers.getHeader(CONTENT_TYPE), fileName == null, getContentLength(headers)); currentItem.setHeaders(headers); notifier.noteItem(); itemValid = true; return true; } } else { String fileName = getFileName(headers); if (fileName != null) { currentItem = new FileItemStreamImpl(fileName, currentFieldName, headers.getHeader(CONTENT_TYPE), false, getContentLength(headers)); currentItem.setHeaders(headers); notifier.noteItem(); itemValid = true; return true; } } // 其他情況,直接丟棄當(dāng)前 body 數(shù)據(jù) multi.discardBodyData(); } } // org.apache.tomcat.util.http.fileupload.FileUploadBase.FileItemIteratorImpl.FileItemStreamImpl#FileItemStreamImpl /** * Creates a new instance. * * @param pName The items file name, or null. * @param pFieldName The items field name. * @param pContentType The items content type, or null. * @param pFormField Whether the item is a form field. * @param pContentLength The items content length, if known, or -1 * @throws IOException Creating the file item failed. 
*/ FileItemStreamImpl(String pName, String pFieldName, String pContentType, boolean pFormField, long pContentLength) throws IOException { name = pName; fieldName = pFieldName; contentType = pContentType; formField = pFormField; // 創(chuàng)建一個(gè)inputStream 的流,備讀 // 讀到哪里算是結(jié)束呢?當(dāng) read() 返回-1時(shí),認(rèn)為輸入結(jié)束了 final ItemInputStream itemStream = multi.newInputStream(); InputStream istream = itemStream; if (fileSizeMax != -1) { if (pContentLength != -1 && pContentLength > fileSizeMax) { FileSizeLimitExceededException e = new FileSizeLimitExceededException( String.format("The field %s exceeds its maximum permitted size of %s bytes.", fieldName, Long.valueOf(fileSizeMax)), pContentLength, fileSizeMax); e.setFileName(pName); e.setFieldName(pFieldName); throw new FileUploadIOException(e); } // 限制最大大小,該值用 spring.http.multipart.max-file-size 配置, 但往往還會(huì)同時(shí)有一個(gè)參數(shù)需要配置: spring.http.multipart.max-request-size, // 因?yàn)楫?dāng)文件很大時(shí),請求也會(huì)非常大,所以一般會(huì)要求同時(shí)設(shè)置這兩個(gè)參數(shù),否則都會(huì)拋出超出最大允許大小限制異常 // 使用 LimitedInputStream 包裝提供的讀寫大小判定功能,原始istream接收網(wǎng)絡(luò)io輸入 istream = new LimitedInputStream(istream, fileSizeMax) { @Override protected void raiseError(long pSizeMax, long pCount) throws IOException { itemStream.close(true); FileSizeLimitExceededException e = new FileSizeLimitExceededException( String.format("The field %s exceeds its maximum permitted size of %s bytes.", fieldName, Long.valueOf(pSizeMax)), pCount, pSizeMax); e.setFieldName(fieldName); e.setFileName(name); throw new FileUploadIOException(e); } }; } stream = istream; } // 為每個(gè)part創(chuàng)建一個(gè) DiskFileItem, 用于存儲(chǔ)網(wǎng)絡(luò)io的文件流,備用 // org.apache.tomcat.util.http.fileupload.disk.DiskFileItemFactory#createItem /** * Create a new {@link DiskFileItem} * instance from the supplied parameters and the local factory * configuration. * * @param fieldName The name of the form field. * @param contentType The content type of the form field. 
* @param isFormField <code>true</code> if this is a plain form field; * <code>false</code> otherwise. * @param fileName The name of the uploaded file, if any, as supplied * by the browser or other client. * * @return The newly created file item. */ @Override public FileItem createItem(String fieldName, String contentType, boolean isFormField, String fileName) { DiskFileItem result = new DiskFileItem(fieldName, contentType, isFormField, fileName, sizeThreshold, repository); result.setDefaultCharset(defaultCharset); return result; } // next() 進(jìn)行迭代, 使用 itemValid 來控制單次迭代使用 /** * Returns the next available {@link FileItemStream}. * * @throws java.util.NoSuchElementException No more items are * available. Use {@link #hasNext()} to prevent this exception. * @throws FileUploadException Parsing or processing the * file item failed. * @throws IOException Reading the file item failed. * @return FileItemStream instance, which provides * access to the next file item. */ @Override public FileItemStream next() throws FileUploadException, IOException { if (eof || (!itemValid && !hasNext())) { throw new NoSuchElementException(); } itemValid = false; return currentItem; }
              // 2. 從網(wǎng)絡(luò)io流到臨時(shí)文件流的寫入    // 主要分為三步:網(wǎng)絡(luò)io流的獲?。槐镜匚募敵隽鞯墨@??;流的對接;            // 網(wǎng)絡(luò)io流即是在創(chuàng)建 FileItemStreamImpl 時(shí)生成的 stream;             // org.apache.tomcat.util.http.fileupload.FileUploadBase.FileItemIteratorImpl.FileItemStreamImpl#openStream            /**             * Returns an input stream, which may be used to             * read the items contents.             *             * @return Opened input stream.             * @throws IOException An I/O error occurred.             */            @Override            public InputStream openStream() throws IOException {                if (((Closeable) stream).isClosed()) {                    throw new FileItemStream.ItemSkippedException();                }                return stream;            }    // 而本地文件輸出流則是一個(gè)本地臨時(shí)文件,用于對接任意大小的輸入流    // org.apache.tomcat.util.http.fileupload.disk.DiskFileItem#getOutputStream    /**     * Returns an {@link java.io.OutputStream OutputStream} that can     * be used for storing the contents of the file.     *     * @return An {@link java.io.OutputStream OutputStream} that can be used     *         for storing the contents of the file.     *     * @throws IOException if an error occurs.     */    @Override    public OutputStream getOutputStream()        throws IOException {        if (dfos == null) {            // 創(chuàng)建臨時(shí)文件輸出            File outputFile = getTempFile();            // 使用 DeferredFileOutputStream 包裝臨時(shí)文件,            dfos = new DeferredFileOutputStream(sizeThreshold, outputFile);        }        return dfos;    }    // org.apache.tomcat.util.http.fileupload.disk.DiskFileItem#getTempFile    /**     * Creates and returns a {@link java.io.File File} representing a uniquely     * named temporary file in the configured repository path. 
The lifetime of     * the file is tied to the lifetime of the <code>FileItem</code> instance;     * the file will be deleted when the instance is garbage collected.     * <p>     * <b>Note: Subclasses that override this method must ensure that they return the     * same File each time.</b>     *     * @return The {@link java.io.File File} to be used for temporary storage.     */    protected File getTempFile() {        if (tempFile == null) {            File tempDir = repository;            if (tempDir == null) {                tempDir = new File(System.getProperty("java.io.tmpdir"));            }            // uid 同進(jìn)程相同, getUniqueId() 會(huì)返回一個(gè)自增的id, 保證進(jìn)程唯一,加上 uid 后,就可以確定臨時(shí)文件唯一了            String tempFileName =                    String.format("upload_%s_%s.tmp", UID, getUniqueId());
          tempFile = new File(tempDir, tempFileName); } return tempFile; } /** * Returns an identifier that is unique within the class loader used to * load this class, but does not have random-like appearance. * * @return A String with the non-random looking instance identifier. */ private static String getUniqueId() { final int limit = 100000000; int current = COUNTER.getAndIncrement(); String id = Integer.toString(current);
        // If you manage to get more than 100 million of ids, you'll
        // start getting ids longer than 8 characters.
        if (current < limit) {
            id = ("00000000" + id).substring(id.length());
        }
        return id;
    }

    // Finally, connect the network I/O stream to the temp file stream to finish receiving the data
    // org.apache.tomcat.util.http.fileupload.util.Streams#copy
    /**
     * Copies the contents of the given {@link InputStream}
     * to the given {@link OutputStream}. Shortcut for
     * <pre>
     *   copy(pInputStream, pOutputStream, new byte[8192]);
     * </pre>
     *
     * @param inputStream The input stream, which is being read.
     * It is guaranteed, that {@link InputStream#close()} is called
     * on the stream.
     * @param outputStream The output stream, to which data should
     * be written. May be null, in which case the input streams
     * contents are simply discarded.
     * @param closeOutputStream True guarantees, that {@link OutputStream#close()}
     * is called on the stream. False indicates, that only
     * {@link OutputStream#flush()} should be called finally.
     *
     * @return Number of bytes, which have been copied.
     * @throws IOException An I/O error occurred.
     */
    public static long copy(InputStream inputStream, OutputStream outputStream,
                            boolean closeOutputStream)
            throws IOException {
        // DEFAULT_BUFFER_SIZE is 8192
        return copy(inputStream, outputStream, closeOutputStream, new byte[DEFAULT_BUFFER_SIZE]);
    }

    /**
     * Copies the contents of the given {@link InputStream}
     * to the given {@link OutputStream}.
     *
     * @param inputStream The input stream, which is being read.
     * It is guaranteed, that {@link InputStream#close()} is called
     * on the stream.
     * @param outputStream The output stream, to which data should
     * be written. May be null, in which case the input streams
     * contents are simply discarded.
     * @param closeOutputStream True guarantees, that {@link OutputStream#close()}
     * is called on the stream. False indicates, that only
     * {@link OutputStream#flush()} should be called finally.
     * @param buffer Temporary buffer, which is to be used for
     * copying data.
     * @return Number of bytes, which have been copied.
     * @throws IOException An I/O error occurred.
     */
    public static long copy(InputStream inputStream,
            OutputStream outputStream, boolean closeOutputStream,
            byte[] buffer)
    throws IOException {
        OutputStream out = outputStream;
        InputStream in = inputStream;
        try {
            long total = 0;
            for (;;) {
                // Blocking read of the uploaded data, which is then written to the temp file.
                // So when a very large file is uploaded, a large share of the time is spent with only
                // the framework talking to the client; application code sees nothing
                // until the whole upload has finished.
                int res = in.read(buffer);
                if (res == -1) {
                    break;
                }
                if (res > 0) {
                    total += res;
                    if (out != null) {
                        out.write(buffer, 0, res);
                    }
                }
            }
            if (out != null) {
                // Close the output stream, i.e. close the temp file
                if (closeOutputStream) {
                    out.close();
                } else {
                    out.flush();
                }
                out = null;
            }
            in.close();
            in = null;
            return total;
        } finally {
            IOUtils.closeQuietly(in);
            if (closeOutputStream) {
                IOUtils.closeQuietly(out);
            }
        }
    }

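Summary: the key point is parsing the boundary, and the hard part is wiring the I/O streams together. Both receiving the request I/O stream and calling os.flush() when responding to the client involve complex I/O operations.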
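To make the stream wiring concrete, here is a minimal JDK-only sketch of the same idea: read a request body in a blocking loop and write it into a temporary file. It is not Tomcat's code. The class name `StreamCopySketch`, the `upload_` prefix, and the helper names are made up for illustration, and the 8192-byte buffer simply mirrors the size mentioned in the Javadoc above.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch class; not part of Tomcat or Spring.
class StreamCopySketch {

    /** Copies everything from in to out through a fixed buffer and returns the byte count. */
    static long copy(InputStream in, OutputStream out, int bufferSize) throws IOException {
        byte[] buffer = new byte[bufferSize];
        long total = 0;
        int read;
        // read() blocks until data is available and returns -1 at end of stream
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
            total += read;
        }
        out.flush();
        return total;
    }

    /** Receives a stream into a new temporary file; the caller is responsible for deleting it. */
    static Path receiveToTempFile(InputStream in) throws IOException {
        Path tmp = Files.createTempFile("upload_", ".tmp");
        try (InputStream source = in; OutputStream target = Files.newOutputStream(tmp)) {
            copy(source, target, 8192);
        }
        return tmp;
    }

    public static void main(String[] args) throws IOException {
        // An in-memory stream stands in for the network stream of a real request.
        byte[] body = "hello upload".getBytes(StandardCharsets.UTF_8);
        Path tmp = receiveToTempFile(new ByteArrayInputStream(body));
        System.out.println(Files.size(tmp) + " bytes stored in " + tmp);
        Files.delete(tmp);
    }
}
```

With a real socket stream, the loop spends most of its time blocked in `read()`, which is why the application handler is only invoked after the upload has finished.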

5. Cleaning up after use?

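Files uploaded by external requests are usually received into temporary files, as in the implementation above. They are called temporary, but if nothing cleans them up, every upload leaves another file behind, disk usage keeps growing, and the disk eventually fills up. A good framework should not behave that way.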

For example, in the Spring framework, DispatcherServlet proactively performs cleanup once the MultipartFile has been used.

    // org.springframework.web.servlet.DispatcherServlet#doDispatch
    /**
     * Process the actual dispatching to the handler.
     * <p>The handler will be obtained by applying the servlet's HandlerMappings in order.
     * The HandlerAdapter will be obtained by querying the servlet's installed HandlerAdapters
     * to find the first that supports the handler class.
     * <p>All HTTP methods are handled by this method. It's up to HandlerAdapters or handlers
     * themselves to decide which methods are acceptable.
     * @param request current HTTP request
     * @param response current HTTP response
     * @throws Exception in case of any kind of processing failure
     */
    protected void doDispatch(HttpServletRequest request, HttpServletResponse response) throws Exception {
        HttpServletRequest processedRequest = request;
        HandlerExecutionChain mappedHandler = null;
        boolean multipartRequestParsed = false;

        WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);

        try {
            ModelAndView mv = null;
            Exception dispatchException = null;

            try {
                // For a file (multipart) request, the request is wrapped once more
                // in a StandardMultipartHttpServletRequest
                processedRequest = checkMultipart(request);
                multipartRequestParsed = (processedRequest != request);

                // Determine handler for the current request.
                mappedHandler = getHandler(processedRequest);
                if (mappedHandler == null || mappedHandler.getHandler() == null) {
                    noHandlerFound(processedRequest, response);
                    return;
                }

                // Determine handler adapter for the current request.
                HandlerAdapter ha = getHandlerAdapter(mappedHandler.getHandler());

                // Process last-modified header, if supported by the handler.
                String method = request.getMethod();
                boolean isGet = "GET".equals(method);
                if (isGet || "HEAD".equals(method)) {
                    long lastModified = ha.getLastModified(request, mappedHandler.getHandler());
                    if (logger.isDebugEnabled()) {
                        logger.debug("Last-Modified value for [" + getRequestUri(request) + "] is: " + lastModified);
                    }
                    if (new ServletWebRequest(request, response).checkNotModified(lastModified) && isGet) {
                        return;
                    }
                }

                if (!mappedHandler.applyPreHandle(processedRequest, response)) {
                    return;
                }

                // Actually invoke the handler.
                mv = ha.handle(processedRequest, response, mappedHandler.getHandler());

                if (asyncManager.isConcurrentHandlingStarted()) {
                    return;
                }

                applyDefaultViewName(processedRequest, mv);
                mappedHandler.applyPostHandle(processedRequest, response, mv);
            } catch (Exception ex) {
                dispatchException = ex;
            } catch (Throwable err) {
                // As of 4.3, we're processing Errors thrown from handler methods as well,
                // making them available for @ExceptionHandler methods and other scenarios.
                dispatchException = new NestedServletException("Handler dispatch failed", err);
            }
            processDispatchResult(processedRequest, response, mappedHandler, mv, dispatchException);
        } catch (Exception ex) {
            triggerAfterCompletion(processedRequest, response, mappedHandler, ex);
        } catch (Throwable err) {
            triggerAfterCompletion(processedRequest, response, mappedHandler,
                    new NestedServletException("Handler processing failed", err));
        } finally {
            if (asyncManager.isConcurrentHandlingStarted()) {
                // Instead of postHandle and afterCompletion
                if (mappedHandler != null) {
                    mappedHandler.applyAfterConcurrentHandlingStarted(processedRequest, response);
                }
            } else {
                // Clean up any resources used by a multipart request.
                // If a file was parsed for this request, clean it up here
                if (multipartRequestParsed) {
                    cleanupMultipart(processedRequest);
                }
            }
        }
    }
    // org.springframework.web.servlet.DispatcherServlet#cleanupMultipart
    /**
     * Clean up any resources used by the given multipart request (if any).
     * @param request current HTTP request
     * @see MultipartResolver#cleanupMultipart
     */
    protected void cleanupMultipart(HttpServletRequest request) {
        // Get the MultipartHttpServletRequest instance
        MultipartHttpServletRequest multipartRequest =
                WebUtils.getNativeRequest(request, MultipartHttpServletRequest.class);
        if (multipartRequest != null) {
            // Clean up the files
            this.multipartResolver.cleanupMultipart(multipartRequest);
        }
    }

    // org.springframework.web.multipart.support.StandardServletMultipartResolver#cleanupMultipart
    @Override
    public void cleanupMultipart(MultipartHttpServletRequest request) {
        if (!(request instanceof AbstractMultipartHttpServletRequest) ||
                ((AbstractMultipartHttpServletRequest) request).isResolved()) {
            // To be on the safe side: explicitly delete the parts,
            // but only actual file parts (for Resin compatibility)
            try {
                for (Part part : request.getParts()) {
                    if (request.getFile(part.getName()) != null) {
                        // Call delete() on each part
                        part.delete();
                    }
                }
            } catch (Throwable ex) {
                LogFactory.getLog(getClass()).warn("Failed to perform cleanup of multipart items", ex);
            }
        }
    }

    // org.apache.catalina.core.ApplicationPart#delete
    @Override
    public void delete() throws IOException {
        fileItem.delete();
    }

    // org.apache.tomcat.util.http.fileupload.disk.DiskFileItem#delete
    /**
     * Deletes the underlying storage for a file item, including deleting any
     * associated temporary disk file. Although this storage will be deleted
     * automatically when the <code>FileItem</code> instance is garbage
     * collected, this method can be used to ensure that this is done at an
     * earlier time, thus preserving system resources.
     */
    @Override
    public void delete() {
        cachedContent = null;
        // Get the temporary file instance and delete it if it exists
        File outputFile = getStoreLocation();
        if (outputFile != null && !isInMemory() && outputFile.exists()) {
            outputFile.delete();
        }
    }

    // org.apache.tomcat.util.http.fileupload.disk.DiskFileItem#getStoreLocation
    /**
     * Returns the {@link java.io.File} object for the <code>FileItem</code>'s
     * data's temporary location on the disk. Note that for
     * <code>FileItem</code>s that have their data stored in memory,
     * this method will return <code>null</code>. When handling large
     * files, you can use {@link java.io.File#renameTo(java.io.File)} to
     * move the file to new location without copying the data, if the
     * source and destination locations reside within the same logical
     * volume.
     *
     * @return The data file, or <code>null</code> if the data is stored in
     *         memory.
     */
    public File getStoreLocation() {
        if (dfos == null) {
            return null;
        }
        if (isInMemory()) {
            return null;
        }
        return dfos.getFile();
    }

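The rough idea is: 1. check whether the request contained a file upload; 2. take the ApplicationPart objects out of the request; 3. iterate over each FileItem, look up its temporary file, and delete it.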
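The same "always clean up after the handler" pattern can be sketched without any framework. The example below is only an illustration under stated assumptions: `TempFileCleanupSketch` and `UploadHandler` are hypothetical names, and the byte array stands in for a received upload. It is not how Spring or Tomcat are implemented. It only shows why the deletion belongs in a `finally` block.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch class; not part of Tomcat or Spring.
class TempFileCleanupSketch {

    /** Hypothetical stand-in for the application code that consumes the upload. */
    interface UploadHandler {
        void handle(Path uploaded) throws IOException;
    }

    /**
     * Stores the body in a temporary file, runs the handler, and always deletes
     * the file afterwards, even when the handler throws.
     * Returns the (already deleted) path so callers can verify the cleanup.
     */
    static Path process(byte[] body, UploadHandler handler) throws IOException {
        Path tmp = Files.createTempFile("upload_", ".tmp");
        try {
            Files.write(tmp, body);
            handler.handle(tmp);
        } finally {
            // Plays the role of cleanupMultipart(): runs on success and on failure
            Files.deleteIfExists(tmp);
        }
        return tmp;
    }

    public static void main(String[] args) throws IOException {
        Path used = process(new byte[] {1, 2, 3},
                uploaded -> System.out.println("handling " + Files.size(uploaded) + " bytes"));
        System.out.println("still on disk afterwards: " + Files.exists(used));
    }
}
```

One consequence of this design is that application code which needs the uploaded content after the request ends has to copy or move it to a permanent location inside the handler.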

6. Resumable transfers

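HTTP/1.1 added standard headers for partial content. A client sends Range (for example, Range: bytes=0-2000) to request part of a file on the server, and the server describes the slice it returns with Content-Range (for example, Content-Range: bytes 100-2000/5000, where the number after the slash is the total size). The server only needs to read the requested bytes and return them, normally with status 206 Partial Content.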
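On the server side, handling such a request comes down to parsing the header and reading one slice of the file. The following is a minimal sketch, assuming a single range of the form `bytes=start-end` or `bytes=start-`. The class `RangeSketch` and its methods are hypothetical, and multi-range and suffix forms such as `bytes=-500` are deliberately not handled.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch class; a real server would also handle multi-range and suffix ranges.
class RangeSketch {

    /**
     * Parses "bytes=start-end" or "bytes=start-" into inclusive {start, end},
     * clamped to the file length. Returns null when the range is unsupported
     * or cannot be satisfied.
     */
    static long[] parseRange(String header, long fileLength) {
        if (header == null || !header.startsWith("bytes=") || header.contains(",")) {
            return null;
        }
        String spec = header.substring("bytes=".length()).trim();
        int dash = spec.indexOf('-');
        if (dash <= 0) {
            return null; // no start offset given
        }
        try {
            long start = Long.parseLong(spec.substring(0, dash).trim());
            String endText = spec.substring(dash + 1).trim();
            long end = endText.isEmpty() ? fileLength - 1 : Long.parseLong(endText);
            end = Math.min(end, fileLength - 1);
            if (start < 0 || start > end) {
                return null;
            }
            return new long[] {start, end};
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /** Reads the inclusive byte range [start, end] from the file. */
    static byte[] readRange(Path file, long start, long end) throws IOException {
        byte[] chunk = new byte[(int) (end - start + 1)];
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            raf.seek(start);
            raf.readFully(chunk);
        }
        return chunk;
    }

    /** Builds the Content-Range response header value. */
    static String contentRange(long start, long end, long fileLength) {
        return "bytes " + start + "-" + end + "/" + fileLength;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("range_", ".txt");
        try {
            Files.write(file, "0123456789".getBytes(StandardCharsets.UTF_8));
            long length = Files.size(file);
            long[] range = parseRange("bytes=2-5", length);
            String slice = new String(readRange(file, range[0], range[1]), StandardCharsets.UTF_8);
            System.out.println(contentRange(range[0], range[1], length) + " -> " + slice);
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

When `parseRange` returns null, a server would typically either ignore the header and send the whole file, or answer 416 Range Not Satisfiable. Which one applies depends on why the range was rejected, and this sketch does not distinguish the two cases.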

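The benefit is that the client gains more control. It only has to record the offset it has reached. After a network failure or similar interruption, it can request the file again starting from where the transfer broke off, pick up the unfinished download, and thereby resume the transfer.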
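On the client side, the recorded offset can simply be the size of the partially downloaded file. The sketch below simulates one interrupted download and its resumption. Everything in it is hypothetical: `ResumeSketch` is an illustrative class, and `fakeServer` replaces a real HTTP call with an in-memory byte array so the example stays self-contained.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;

// Hypothetical sketch class; the "server" is simulated in memory.
class ResumeSketch {

    /** Builds the Range header value that asks for everything after the bytes already saved. */
    static String resumeRangeHeader(Path partialFile) throws IOException {
        long offset = Files.exists(partialFile) ? Files.size(partialFile) : 0L;
        return "bytes=" + offset + "-";
    }

    /** Appends a received chunk to the partial file, creating the file if needed. */
    static void appendChunk(Path partialFile, byte[] chunk) throws IOException {
        Files.write(partialFile, chunk, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    /** Stand-in for the server: returns the bytes named by an open-ended "bytes=N-" range. */
    static byte[] fakeServer(byte[] fullContent, String rangeHeader) {
        int start = Integer.parseInt(
                rangeHeader.substring("bytes=".length(), rangeHeader.length() - 1));
        return Arrays.copyOfRange(fullContent, start, fullContent.length);
    }

    public static void main(String[] args) throws IOException {
        byte[] full = "abcdefghij".getBytes(StandardCharsets.UTF_8);
        Path partial = Files.createTempFile("download_", ".part");
        try {
            // First attempt: the connection drops after 4 bytes.
            appendChunk(partial, Arrays.copyOfRange(full, 0, 4));
            // After recovery: ask only for the missing tail and append it.
            String range = resumeRangeHeader(partial);
            appendChunk(partial, fakeServer(full, range));
            String result = new String(Files.readAllBytes(partial), StandardCharsets.UTF_8);
            System.out.println(range + " -> " + result);
        } finally {
            Files.deleteIfExists(partial);
        }
    }
}
```

With a real server, the header would be set on the request (for example through `HttpURLConnection.setRequestProperty("Range", ...)`), and the client should check for status 206 before appending. A 200 response means the server ignored the range and is sending the whole file from the beginning.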

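Taking the HTTP protocol as the starting point, we have covered the basic principles of file upload and download, and used Tomcat's processing as a sample of how the protocol is implemented. In practice there are many other protocols: IM tools and download tools, for instance, usually have their own transfer protocols. They are not identical, but the general direction should be the same.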





Author: 等你歸去來

Source: https://www.cnblogs.com/yougewe/p/12916211.html
