源碼分析 Node 的 Cluster 模塊

作者 |?uerwtoy
來源 |?yq.aliyun.com/articles/717323
前段時間,公司的洋彬哥老哥遇到一個問題,大概就是本機有個node的http服務(wù)器,但是每次請求這個服務(wù)器的端口返回的數(shù)據(jù)都報錯,一看返回的數(shù)據(jù)根本不是http的報文格式,然后經(jīng)過一番排查發(fā)現(xiàn)是另外一個服務(wù)器同時監(jiān)聽了http服務(wù)器的這個端口。
這個時候洋彬老哥就很奇怪,為啥我這個端口明明使用了,卻還是可以啟動呢?這個時候我根據(jù)以前看libuv源碼的經(jīng)驗解釋了這個問題,因為uv__tcp_bind中,對socket會設(shè)置SO_REUSEADDR選項,使得端口可以復(fù)用,但是tcp中地址不能復(fù)用,因為那兩個監(jiān)聽雖然是同一個端口,但是地址不同,所以可以同時存在。
這個問題讓我不禁想到了之前看一篇文章里有人留言說這個選項是cluster內(nèi)部復(fù)用端口的原因,當(dāng)時沒有細(xì)細(xì)研究以為說的是SO_REUSEPORT也就沒有細(xì)想,但是這次因為這個問題仔細(xì)看了下結(jié)果是設(shè)置的SO_REUSEADDR選項,這個選項雖然能復(fù)用端口,但是前提是每個ip地址不同,比如可以同時監(jiān)聽'0.0.0.0'和'192.168.0.12'的端口,但不能兩個都是'0.0.0.0'的同一個 端口,如果cluster是用這個來實現(xiàn)的,那要是多起幾個子進(jìn)程很明顯ip地址不夠用啊,于是就用node文檔中的例子試了下:
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
console.log(`Master ${process.pid} is running`);
// Fork workers.
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
cluster.on('exit', (worker, code, signal) => {
console.log(`worker ${worker.process.pid} died`);
});
} else {
// Workers can share any TCP connection
// In this case it is an HTTP server
http.createServer((req, res) => {
res.writeHead(200);
res.end('hello world\n');
}).listen(8000);
console.log(`Worker ${process.pid} started`);
}
在使用cluster的在幾個子進(jìn)程同時監(jiān)聽了8000端口后,查看了一下只有主進(jìn)程監(jiān)聽了這個端口,其他都沒有。這個時候,我猜測node還是使用在父進(jìn)程中創(chuàng)建sever的io但是這個父進(jìn)程應(yīng)該就是通過Unix域套接字的cmsg_data將父進(jìn)程中收到客戶端套接字描述符傳遞給子進(jìn)程然后讓子進(jìn)程來處理具體的數(shù)據(jù)與邏輯,但是node到底是如何通過在子進(jìn)程中createServer并且listen但是只在父進(jìn)程中真的監(jiān)聽了該端口來實現(xiàn)這個邏輯的呢?這個問題引起了我的好奇,讓我不得不到源碼中一探究竟。
從net模塊出發(fā)
按理說,這個問題我們應(yīng)該直接通過cluster模塊來分析,但是很明顯,在加載http模塊的時候并不會像cluster模塊啟動時一樣通過去判斷NODE_ENV來加載不同的模塊,但是從上面的分析,我可以得出子進(jìn)程中的createServer執(zhí)行了跟父進(jìn)程不同的操作,所以只能說明http模塊中通過isMaster這樣的判斷來進(jìn)行了不同的操作,不過http.js和_http_server.js中都沒有這個判斷,但是通過對createServer向上的查找我在net.js的listenInCluster中找到了isMaster的判斷,listenInCluster會在createServer后的server.listen(8000)中調(diào)用,所以我們可以看下他的關(guān)鍵邏輯。
if (cluster === null) cluster = require('cluster');
if (cluster.isMaster || exclusive) {
//父進(jìn)程中,通過_listen2方法就能開始正常的監(jiān)聽了
server._listen2(address, port, addressType, backlog, fd);
return;
}
const serverQuery = {
address: address,
port: port,
addressType: addressType,
fd: fd,
flags: 0
};
// 子進(jìn)程通過獲取父進(jìn)程的server句柄
// 并通過listenOnMasterHandle監(jiān)聽它
cluster._getServer(server, serverQuery, listenOnMasterHandle);
從這段代碼中我們可以看出,如果是在父進(jìn)程中,直接通過_listen2的邏輯就能開始正常的監(jiān)聽了,但是在子進(jìn)程中,會通過cluster._getServer的方式獲取父進(jìn)程的句柄,并通過回調(diào)函數(shù)listenOnMasterHandle監(jiān)聽它。看到這里我其實比較疑惑,因為在我對于網(wǎng)絡(luò)編程的學(xué)習(xí)中,只聽說過傳遞描述符的,這個傳遞server的句柄實在是太新鮮了,于是趕緊繼續(xù)深入研究了起來。
深入cluster的代碼
首先,來看一下_gerServer的方法的代碼。
const message = util._extend({
act: 'queryServer',
index: indexes[indexesKey],
data: null
}, options);
send(message, (reply, handle) => {
if (typeof obj._setServerData === 'function')
obj._setServerData(reply.data);
if (handle)
shared(reply, handle, indexesKey, cb); // Shared listen socket.
else
rr(reply, indexesKey, cb); // Round-robin.
});
這個方法通過send像主進(jìn)程發(fā)送一個包,因為在send函數(shù)中有這樣一句代碼:
message = util._extend({ cmd: 'NODE_CLUSTER' }, message);
通過Node的文檔,我們可以知道這種cmd帶了Node字符串的包,父進(jìn)程會通過internalMessage事件來響應(yīng),所以我們可以從internal/cluster/master.js中看到找到,對應(yīng)于act: 'queryServer'的處理函數(shù)queryServer的代碼。
...
var constructor = RoundRobinHandle;
...
handle = new constructor(key, message.address,message.port,message.addressType,message.fd,message.flags);
...
//queryServer實際是通過RoundRobinHandle的add方法
//
handle.add(worker, (errno, reply, handle) => {
//根據(jù)子進(jìn)程傳來的act組裝返回的對象
reply = util._extend({
errno: errno,
key: key,
ack: message.seq,//子進(jìn)程用來確認(rèn)是哪個命令的返回結(jié)果
data: handles[key].data
}, reply);
if (errno)
delete handles[key]; // Gives other workers a chance to retry.
send(worker, reply, handle);
});
這里創(chuàng)建了一個RoundRobinHandle實例,在該實例的構(gòu)造函數(shù)中通過代碼:
this.server = net.createServer(assert.fail);
if (fd >= 0)
this.server.listen({ fd });
else if (port >= 0)
this.server.listen(port, address);
else
this.server.listen(address); // UNIX socket path.
this.server.once('listening', () => {
this.handle = this.server._handle;
//新連接到達(dá)時分發(fā)這個handle
//distribute函數(shù)是給子進(jìn)程分派任務(wù)的重要函數(shù)
this.handle.onconnection = (err, handle) => this.distribute(err, handle);
this.server._handle = null;
this.server = null;
});
在父進(jìn)程中生成了一個server,并且通過注冊listen的方法將有新的客戶端連接到達(dá)時執(zhí)行的onconnection改成了使用自身的this.distribute函數(shù),這個函數(shù)我們先記下因為他是后來父進(jìn)程給子進(jìn)程派發(fā)任務(wù)的重要函數(shù)。說回getServer的代碼,這里通過RoundRobinHandle實例的add方法:
//在server開始監(jiān)聽端口后,通過done函數(shù)中handle.add
//傳入的匿名函數(shù)給子進(jìn)程的getServer命令以返回
const done = () => {
if (this.handle.getsockname) {
const out = {};
this.handle.getsockname(out);
// TODO(bnoordhuis) Check err.
send(null, { sockname: out }, null);
} else {
send(null, null, null); // UNIX socket.
}
this.handoff(worker); // In case there are connections pending.
};
// Still busy binding.
this.server.once('listening', done);
會給子進(jìn)程的getServer以回復(fù)。從這里我們可以看到在給子進(jìn)程的回復(fù)中handle一直都是null。那這個所謂的去取得父進(jìn)程的server是怎么取得的呢?這個地方讓我困惑了一下,不過后來看子進(jìn)程的代碼我就明白了,實際上根本不存在什么取得父進(jìn)程server的句柄,這個地方的注釋迷惑了閱讀者,從之前子進(jìn)程的回調(diào)中我們可以看到,返回的handle決定子進(jìn)程是用shared方式(udp)還是Round-robin的方式(tcp)來處理父進(jìn)程派下來的任務(wù)。從這個回調(diào)函數(shù)我們就可以看出,子進(jìn)程是沒有任何獲取句柄的操作的,那它是如何處理的呢?我們通過該例子中的rr方法可以看到:
const handle = { close, listen, ref: noop, unref: noop };
if (message.sockname) {
handle.getsockname = getsockname; // TCP handles only.
}
handles[key] = handle;
cb(0, handle);
這個函數(shù)中生成了一個自帶listen和close方法的對象,并傳遞給了函數(shù)listenOnMasterHandle,雖然這個名字寫的是在父進(jìn)程的server句柄上監(jiān)聽,實際上我們這個例子中是子進(jìn)程自建了一個handle,但是如果是udp的情況下這個函數(shù)名字還確實就是這么回事,原因在于SO_REUSEADDR選項,里面有這樣一個解釋:
SO_REUSEADDR允許完全相同的地址和端口的重復(fù)綁定。但這只用于UDP的多播,不用于TCP。
所以,在udp情況同一個地址和端口是可以重復(fù)監(jiān)聽的(之前網(wǎng)上看到那個哥們兒說的也沒問題,只是一葉障目了),所以可以共享父進(jìn)程的handle,跟TCP的情況不同。我們繼續(xù)來看當(dāng)前這個TCP的情況,在這個情況下listenOnMasterHandle會將我們在子進(jìn)程中自己生成的handle對象傳入子進(jìn)程中通過createServer創(chuàng)建的server的_handle屬性中并通過
server._listen2(address, port, addressType, backlog, fd);
做了一個假的監(jiān)聽操作,實際上因為_handle的存在這里只會為之前_handle賦值一個onconnection函數(shù),這個函數(shù)的觸發(fā)則跟父進(jìn)程中通過真實的客戶端連接觸發(fā)的時機不同,而是通過
process.on('internalMessage', (message, handle) {
if (message.act === 'newconn')
onconnection(message, handle);
else if (message.act === 'disconnect')
_disconnect.call(worker, true);
}
中注冊的internalMessage事件中的對父進(jìn)程傳入的act為newconn的包觸發(fā)。而父進(jìn)程中就通過我們剛剛說到的改寫了server對象的onconnection函數(shù)的distribute函數(shù),這個函數(shù)中會調(diào)用一個叫handoff的函數(shù),通過代碼:
const message = { act: 'newconn', key: this.key };
sendHelper(worker.process, message, handle, (reply) => {
if (reply.accepted)
handle.close();
else
this.distribute(0, handle); // Worker is shutting down. Send to another.
this.handoff(worker);
});
其中send到子進(jìn)程的handle就是新連接客戶端的句柄,Node中父子進(jìn)程之間的通信最后是通過src/stream_base.cc中的StreamBase::WriteString函數(shù)實現(xiàn)的,從這段代碼我們可以看出:
...
//當(dāng)進(jìn)程間通信時
uv_handle_t* send_handle = nullptr;
if (!send_handle_obj.IsEmpty()) {
HandleWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
send_handle = wrap->GetHandle();
// Reference LibuvStreamWrap instance to prevent it from being garbage
// collected before `AfterWrite` is called.
CHECK_EQ(false, req_wrap->persistent().IsEmpty());
req_wrap_obj->Set(env->handle_string(), send_handle_obj);
}
err = DoWrite(
req_wrap,
&buf,
1,
//將父進(jìn)程獲取的客戶端句柄傳遞子進(jìn)程
reinterpret_cast(send_handle));
可以看到,在調(diào)用此方式時,如果傳入了一個客戶端的句柄則通過Dowrite方法最后通過輔助數(shù)據(jù)cmsg_data將客戶端句柄的套接字fd傳送到子進(jìn)程中進(jìn)行處理。看到這里我不禁恍然大悟,原來還是走的是我熟悉的那套網(wǎng)絡(luò)編程的邏輯啊。
總結(jié)
通過上面的一輪分析,我們可以總結(jié)出以下兩個結(jié)論:
創(chuàng)建TCP服務(wù)器時會在父進(jìn)程中創(chuàng)建一個server并監(jiān)聽目標(biāo)端口,新連接到達(dá)Accept這個client后,再通過ipc的高級方法將新連接的句柄(也就是這個socket的文件描述符)通過輪詢的方式分配到一個子進(jìn)程中,然后在這個子進(jìn)程中通過read和write處理新連接的數(shù)據(jù)和請求,所以只有主進(jìn)程會監(jiān)聽目標(biāo)ip和端口。
創(chuàng)建UDP服務(wù)器,會共享在父進(jìn)程中創(chuàng)建的server的句柄對象,并且在子進(jìn)程中都會監(jiān)聽到跟對象相同的ip地址和端口上,所以創(chuàng)建n個子進(jìn)程則會有n+1個進(jìn)程同時監(jiān)聽到目標(biāo)ip和端口上。
作者 |?uerwtoy
來源 |?yq.aliyun.com/articles/717323
??愛心三連擊
1.看到這里了就點個在看支持下吧,你的「在看」是我創(chuàng)作的動力。
2.關(guān)注公眾號程序員成長指北,「帶你一起學(xué)Node」!
3.特殊階段,帶好口罩,做好個人防護(hù)。
4.可以添加我微信【ikoala520】,拉你進(jìn)技術(shù)交流群一起學(xué)習(xí)。
“在看轉(zhuǎn)發(fā)”是最大的支持
