<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          源碼分析 Node 的 Cluster 模塊

          共 7334字,需瀏覽 15分鐘

           ·

          2020-07-16 11:15

          c4d02352ade7d97e8aa195708bead1c8.webp

          作者 |?uerwtoy

          來源 |?yq.aliyun.com/articles/717323

          前段時間,公司的洋彬哥老哥遇到一個問題,大概就是本機有個node的http服務(wù)器,但是每次請求這個服務(wù)器的端口返回的數(shù)據(jù)都報錯,一看返回的數(shù)據(jù)根本不是http的報文格式,然后經(jīng)過一番排查發(fā)現(xiàn)是另外一個服務(wù)器同時監(jiān)聽了http服務(wù)器的這個端口。

          這個時候洋彬老哥就很奇怪,為啥我這個端口明明使用了,卻還是可以啟動呢?這個時候我根據(jù)以前看libuv源碼的經(jīng)驗解釋了這個問題,因為uv__tcp_bind中,對socket會設(shè)置SO_REUSEADDR選項,使得端口可以復(fù)用,但是tcp中地址不能復(fù)用,因為那兩個監(jiān)聽雖然是同一個端口,但是地址不同,所以可以同時存在。

          這個問題讓我不禁想到了之前看一篇文章里有人留言說這個選項是cluster內(nèi)部復(fù)用端口的原因,當(dāng)時沒有細(xì)細(xì)研究以為說的是SO_REUSEPORT也就沒有細(xì)想,但是這次因為這個問題仔細(xì)看了下結(jié)果是設(shè)置的SO_REUSEADDR選項,這個選項雖然能復(fù)用端口,但是前提是每個ip地址不同,比如可以同時監(jiān)聽'0.0.0.0'和'192.168.0.12'的端口,但不能兩個都是'0.0.0.0'的同一個 端口,如果cluster是用這個來實現(xiàn)的,那要是多起幾個子進(jìn)程很明顯ip地址不夠用啊,于是就用node文檔中的例子試了下:

          const cluster = require('cluster');
          const http = require('http');
          const numCPUs = require('os').cpus().length;

          if (cluster.isMaster) {
          console.log(`Master ${process.pid} is running`);

          // Fork workers.
          for (let i = 0; i < numCPUs; i++) {
          cluster.fork();
          }

          cluster.on('exit', (worker, code, signal) => {
          console.log(`worker ${worker.process.pid} died`);
          });
          } else {
          // Workers can share any TCP connection
          // In this case it is an HTTP server
          http.createServer((req, res) => {
          res.writeHead(200);
          res.end('hello world\n');
          }).listen(8000);

          console.log(`Worker ${process.pid} started`);
          }

          在使用cluster的在幾個子進(jìn)程同時監(jiān)聽了8000端口后,查看了一下只有主進(jìn)程監(jiān)聽了這個端口,其他都沒有。這個時候,我猜測node還是使用在父進(jìn)程中創(chuàng)建sever的io但是這個父進(jìn)程應(yīng)該就是通過Unix域套接字的cmsg_data將父進(jìn)程中收到客戶端套接字描述符傳遞給子進(jìn)程然后讓子進(jìn)程來處理具體的數(shù)據(jù)與邏輯,但是node到底是如何通過在子進(jìn)程中createServer并且listen但是只在父進(jìn)程中真的監(jiān)聽了該端口來實現(xiàn)這個邏輯的呢?這個問題引起了我的好奇,讓我不得不到源碼中一探究竟。

          從net模塊出發(fā)

          按理說,這個問題我們應(yīng)該直接通過cluster模塊來分析,但是很明顯,在加載http模塊的時候并不會像cluster模塊啟動時一樣通過去判斷NODE_ENV來加載不同的模塊,但是從上面的分析,我可以得出子進(jìn)程中的createServer執(zhí)行了跟父進(jìn)程不同的操作,所以只能說明http模塊中通過isMaster這樣的判斷來進(jìn)行了不同的操作,不過http.js和_http_server.js中都沒有這個判斷,但是通過對createServer向上的查找我在net.js的listenInCluster中找到了isMaster的判斷,listenInCluster會在createServer后的server.listen(8000)中調(diào)用,所以我們可以看下他的關(guān)鍵邏輯。

          if (cluster === null) cluster = require('cluster');

          if (cluster.isMaster || exclusive) {
          //父進(jìn)程中,通過_listen2方法就能開始正常的監(jiān)聽了
          server._listen2(address, port, addressType, backlog, fd);
          return;
          }

          const serverQuery = {
          address: address,
          port: port,
          addressType: addressType,
          fd: fd,
          flags: 0
          };

          // 子進(jìn)程通過獲取父進(jìn)程的server句柄
          // 并通過listenOnMasterHandle監(jiān)聽它
          cluster._getServer(server, serverQuery, listenOnMasterHandle);

          從這段代碼中我們可以看出,如果是在父進(jìn)程中,直接通過_listen2的邏輯就能開始正常的監(jiān)聽了,但是在子進(jìn)程中,會通過cluster._getServer的方式獲取父進(jìn)程的句柄,并通過回調(diào)函數(shù)listenOnMasterHandle監(jiān)聽它。看到這里我其實比較疑惑,因為在我對于網(wǎng)絡(luò)編程的學(xué)習(xí)中,只聽說過傳遞描述符的,這個傳遞server的句柄實在是太新鮮了,于是趕緊繼續(xù)深入研究了起來。

          深入cluster的代碼

          首先,來看一下_gerServer的方法的代碼。

          const message = util._extend({
          act: 'queryServer',
          index: indexes[indexesKey],
          data: null
          }, options);
          send(message, (reply, handle) => {
          if (typeof obj._setServerData === 'function')
          obj._setServerData(reply.data);

          if (handle)
          shared(reply, handle, indexesKey, cb); // Shared listen socket.
          else
          rr(reply, indexesKey, cb); // Round-robin.
          });

          這個方法通過send像主進(jìn)程發(fā)送一個包,因為在send函數(shù)中有這樣一句代碼:

          message = util._extend({ cmd: 'NODE_CLUSTER' }, message);

          通過Node的文檔,我們可以知道這種cmd帶了Node字符串的包,父進(jìn)程會通過internalMessage事件來響應(yīng),所以我們可以從internal/cluster/master.js中看到找到,對應(yīng)于act: 'queryServer'的處理函數(shù)queryServer的代碼。

            ...
          var constructor = RoundRobinHandle;
          ...
          handle = new constructor(key, message.address,message.port,message.addressType,message.fd,message.flags);
          ...
          //queryServer實際是通過RoundRobinHandle的add方法
          //
          handle.add(worker, (errno, reply, handle) => {
          //根據(jù)子進(jìn)程傳來的act組裝返回的對象
          reply = util._extend({
          errno: errno,
          key: key,
          ack: message.seq,//子進(jìn)程用來確認(rèn)是哪個命令的返回結(jié)果
          data: handles[key].data
          }, reply);

          if (errno)
          delete handles[key]; // Gives other workers a chance to retry.

          send(worker, reply, handle);
          });

          這里創(chuàng)建了一個RoundRobinHandle實例,在該實例的構(gòu)造函數(shù)中通過代碼:

            this.server = net.createServer(assert.fail);

          if (fd >= 0)
          this.server.listen({ fd });
          else if (port >= 0)
          this.server.listen(port, address);
          else
          this.server.listen(address); // UNIX socket path.

          this.server.once('listening', () => {
          this.handle = this.server._handle;
          //新連接到達(dá)時分發(fā)這個handle
          //distribute函數(shù)是給子進(jìn)程分派任務(wù)的重要函數(shù)
          this.handle.onconnection = (err, handle) => this.distribute(err, handle);
          this.server._handle = null;
          this.server = null;
          });

          在父進(jìn)程中生成了一個server,并且通過注冊listen的方法將有新的客戶端連接到達(dá)時執(zhí)行的onconnection改成了使用自身的this.distribute函數(shù),這個函數(shù)我們先記下因為他是后來父進(jìn)程給子進(jìn)程派發(fā)任務(wù)的重要函數(shù)。說回getServer的代碼,這里通過RoundRobinHandle實例的add方法:

            //在server開始監(jiān)聽端口后,通過done函數(shù)中handle.add
          //傳入的匿名函數(shù)給子進(jìn)程的getServer命令以返回
          const done = () => {
          if (this.handle.getsockname) {
          const out = {};
          this.handle.getsockname(out);
          // TODO(bnoordhuis) Check err.
          send(null, { sockname: out }, null);
          } else {
          send(null, null, null); // UNIX socket.
          }

          this.handoff(worker); // In case there are connections pending.
          };

          // Still busy binding.
          this.server.once('listening', done);

          會給子進(jìn)程的getServer以回復(fù)。從這里我們可以看到在給子進(jìn)程的回復(fù)中handle一直都是null。那這個所謂的去取得父進(jìn)程的server是怎么取得的呢?這個地方讓我困惑了一下,不過后來看子進(jìn)程的代碼我就明白了,實際上根本不存在什么取得父進(jìn)程server的句柄,這個地方的注釋迷惑了閱讀者,從之前子進(jìn)程的回調(diào)中我們可以看到,返回的handle決定子進(jìn)程是用shared方式(udp)還是Round-robin的方式(tcp)來處理父進(jìn)程派下來的任務(wù)。從這個回調(diào)函數(shù)我們就可以看出,子進(jìn)程是沒有任何獲取句柄的操作的,那它是如何處理的呢?我們通過該例子中的rr方法可以看到:

            const handle = { close, listen, ref: noop, unref: noop };

          if (message.sockname) {
          handle.getsockname = getsockname; // TCP handles only.
          }

          handles[key] = handle;
          cb(0, handle);

          這個函數(shù)中生成了一個自帶listen和close方法的對象,并傳遞給了函數(shù)listenOnMasterHandle,雖然這個名字寫的是在父進(jìn)程的server句柄上監(jiān)聽,實際上我們這個例子中是子進(jìn)程自建了一個handle,但是如果是udp的情況下這個函數(shù)名字還確實就是這么回事,原因在于SO_REUSEADDR選項,里面有這樣一個解釋:

          SO_REUSEADDR允許完全相同的地址和端口的重復(fù)綁定。但這只用于UDP的多播,不用于TCP。

          所以,在udp情況同一個地址和端口是可以重復(fù)監(jiān)聽的(之前網(wǎng)上看到那個哥們兒說的也沒問題,只是一葉障目了),所以可以共享父進(jìn)程的handle,跟TCP的情況不同。我們繼續(xù)來看當(dāng)前這個TCP的情況,在這個情況下listenOnMasterHandle會將我們在子進(jìn)程中自己生成的handle對象傳入子進(jìn)程中通過createServer創(chuàng)建的server的_handle屬性中并通過

          server._listen2(address, port, addressType, backlog, fd);

          做了一個假的監(jiān)聽操作,實際上因為_handle的存在這里只會為之前_handle賦值一個onconnection函數(shù),這個函數(shù)的觸發(fā)則跟父進(jìn)程中通過真實的客戶端連接觸發(fā)的時機不同,而是通過

          process.on('internalMessage', (message, handle) {
          if (message.act === 'newconn')
          onconnection(message, handle);
          else if (message.act === 'disconnect')
          _disconnect.call(worker, true);
          }

          中注冊的internalMessage事件中的對父進(jìn)程傳入的act為newconn的包觸發(fā)。而父進(jìn)程中就通過我們剛剛說到的改寫了server對象的onconnection函數(shù)的distribute函數(shù),這個函數(shù)中會調(diào)用一個叫handoff的函數(shù),通過代碼:

            const message = { act: 'newconn', key: this.key };
          sendHelper(worker.process, message, handle, (reply) => {
          if (reply.accepted)
          handle.close();
          else
          this.distribute(0, handle); // Worker is shutting down. Send to another.

          this.handoff(worker);
          });

          其中send到子進(jìn)程的handle就是新連接客戶端的句柄,Node中父子進(jìn)程之間的通信最后是通過src/stream_base.cc中的StreamBase::WriteString函數(shù)實現(xiàn)的,從這段代碼我們可以看出:

          ...
          //當(dāng)進(jìn)程間通信時
          uv_handle_t* send_handle = nullptr;

          if (!send_handle_obj.IsEmpty()) {
          HandleWrap* wrap;
          ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
          send_handle = wrap->GetHandle();
          // Reference LibuvStreamWrap instance to prevent it from being garbage
          // collected before `AfterWrite` is called.
          CHECK_EQ(false, req_wrap->persistent().IsEmpty());
          req_wrap_obj->Set(env->handle_string(), send_handle_obj);
          }

          err = DoWrite(
          req_wrap,
          &buf,
          1,
          //將父進(jìn)程獲取的客戶端句柄傳遞子進(jìn)程
          reinterpret_cast(send_handle));

          可以看到,在調(diào)用此方式時,如果傳入了一個客戶端的句柄則通過Dowrite方法最后通過輔助數(shù)據(jù)cmsg_data將客戶端句柄的套接字fd傳送到子進(jìn)程中進(jìn)行處理。看到這里我不禁恍然大悟,原來還是走的是我熟悉的那套網(wǎng)絡(luò)編程的邏輯啊。

          總結(jié)

          通過上面的一輪分析,我們可以總結(jié)出以下兩個結(jié)論:

          1. 創(chuàng)建TCP服務(wù)器時會在父進(jìn)程中創(chuàng)建一個server并監(jiān)聽目標(biāo)端口,新連接到達(dá)Accept這個client后,再通過ipc的高級方法將新連接的句柄(也就是這個socket的文件描述符)通過輪詢的方式分配到一個子進(jìn)程中,然后在這個子進(jìn)程中通過read和write處理新連接的數(shù)據(jù)和請求,所以只有主進(jìn)程會監(jiān)聽目標(biāo)ip和端口。

          2. 創(chuàng)建UDP服務(wù)器,會共享在父進(jìn)程中創(chuàng)建的server的句柄對象,并且在子進(jìn)程中都會監(jiān)聽到跟對象相同的ip地址和端口上,所以創(chuàng)建n個子進(jìn)程則會有n+1個進(jìn)程同時監(jiān)聽到目標(biāo)ip和端口上。

          作者 |?uerwtoy

          來源 |?yq.aliyun.com/articles/717323

          ??愛心三連擊

          1.看到這里了就點個在看支持下吧,你的在看是我創(chuàng)作的動力。

          2.關(guān)注公眾號程序員成長指北,「帶你一起學(xué)Node」!

          3.特殊階段,帶好口罩,做好個人防護(hù)。

          4.可以添加我微信【ikoala520】,拉你進(jìn)技術(shù)交流群一起學(xué)習(xí)。

          “在看轉(zhuǎn)發(fā)”是最大的支持



          瀏覽 23
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  男人的天堂网av 男人天堂无码视频 | 色婷婷在线视频观看 | 亚洲高清视频不卡 | 澳门成人无码视频免费播放 | 91成人久久久久久久 |