Redis(一):服務(wù)啟動(dòng)及基礎(chǔ)請(qǐng)求處理流程源碼解析
redis是用c語(yǔ)言的寫(xiě)的緩存服務(wù)器,有高性能和多種數(shù)據(jù)類(lèi)型支持的特性,廣受互聯(lián)網(wǎng)公司喜愛(ài)。
我們要分析其啟動(dòng)過(guò)程,首先就要先找到其入口。
當(dāng)然我們應(yīng)該是要先分析 Makefile 文件,然后找到最終編譯成的文件,然后再順勢(shì)找到C語(yǔ)言入口 main(); 這里咱們就不費(fèi)那事了,一是這事很枯燥,二是我也不知道找不找到得到。所以,就直接找到入口吧: 在 src/server.c 中,main() 函數(shù)就是了。
引用網(wǎng)上大牛的話歸納一下,main 函數(shù)執(zhí)行的過(guò)程分以下幾步:
1. Redis 會(huì)設(shè)置一些回調(diào)函數(shù),當(dāng)前時(shí)間,隨機(jī)數(shù)的種子?;卣{(diào)函數(shù)實(shí)際上什么?舉個(gè)例子,比如 Q/3 要給 Redis 發(fā)送一個(gè)關(guān)閉的命令,讓它去做一些優(yōu)雅的關(guān)閉,做一些掃尾清楚的工作,這個(gè)工作如果不設(shè)計(jì)回調(diào)函數(shù),它其實(shí)什么都不會(huì)干。其實(shí) C 語(yǔ)言的程序跑在操作系統(tǒng)之上,Linux 操作系統(tǒng)本身就是提供給我們事件機(jī)制的回調(diào)注冊(cè)功能,所以它會(huì)設(shè)計(jì)這個(gè)回調(diào)函數(shù),讓你注冊(cè)上,關(guān)閉的時(shí)候優(yōu)雅的關(guān)閉,然后它在后面可以做一些業(yè)務(wù)邏輯。
2. 不管任何軟件,肯定有一份配置文件需要配置。首先在服務(wù)器端會(huì)把它默認(rèn)的一份配置做一個(gè)初始化。
3. Redis 在 3.0 版本正式發(fā)布之前其實(shí)已經(jīng)有篩選這個(gè)模式了,但是這個(gè)模式,我很少在生產(chǎn)環(huán)境在用。Redis 可以初始化這個(gè)模式,比較復(fù)雜。
4. 解析啟動(dòng)的參數(shù)。其實(shí)不管什么軟件,它在初始化的過(guò)程當(dāng)中,配置都是由兩部分組成的。第一部分,靜態(tài)的配置文件;第二部分,動(dòng)態(tài)啟動(dòng)的時(shí)候,main,就是參數(shù)給它的時(shí)候進(jìn)去配置。
5. 把服務(wù)端的東西拿過(guò)來(lái),裝載 Config 配置文件,loadServerConfig。
6. 初始化服務(wù)器,initServer。
7. 從磁盤(pán)裝載數(shù)據(jù)。
8. 有一個(gè)主循環(huán)程序開(kāi)始干活,用來(lái)處理客戶(hù)端的請(qǐng)求,并且把這個(gè)請(qǐng)求轉(zhuǎn)到后端的業(yè)務(wù)邏輯,幫你完成命令執(zhí)行,然后吐數(shù)據(jù),這么一個(gè)過(guò)程。
我們以源碼瀏覽形式,來(lái)看看具體實(shí)現(xiàn)。
main 函數(shù)入口:
注意 server 是一個(gè)全局變量,各函數(shù)進(jìn)行操作時(shí),都直接對(duì)其操作。
// struct redisServer server;// src/server.cint main(int argc, char **argv) {struct timeval tv;int j;// 測(cè)試環(huán)境變量設(shè)置#ifdef REDIS_TESTif (argc == 3 && !strcasecmp(argv[1], "test")) {if (!strcasecmp(argv[2], "ziplist")) {return (argc, argv);} else if (!strcasecmp(argv[2], "quicklist")) {quicklistTest(argc, argv);} else if (!strcasecmp(argv[2], "intset")) {return intsetTest(argc, argv);} else if (!strcasecmp(argv[2], "zipmap")) {return zipmapTest(argc, argv);} else if (!strcasecmp(argv[2], "sha1test")) {return sha1Test(argc, argv);} else if (!strcasecmp(argv[2], "util")) {return utilTest(argc, argv);} else if (!strcasecmp(argv[2], "sds")) {return sdsTest(argc, argv);} else if (!strcasecmp(argv[2], "endianconv")) {return endianconvTest(argc, argv);} else if (!strcasecmp(argv[2], "crc64")) {return crc64Test(argc, argv);}return -1; /* test not found */}#endif/* We need to initialize our libraries, and the server configuration. */#ifdef INIT_SETPROCTITLE_REPLACEMENTspt_init(argc, argv)ziplistTest;#endif// 設(shè)置些默認(rèn)值, 隨機(jī)數(shù)等等setlocale(LC_COLLATE,"");zmalloc_enable_thread_safeness();// oom 回調(diào)處理zmalloc_set_oom_handler(redisOutOfMemoryHandler);srand(time(NULL)^getpid());gettimeofday(&tv,NULL);dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid());server.sentinel_mode = checkForSentinelMode(argc,argv);// 初始化服務(wù)器默認(rèn)配置, 將變化體現(xiàn)到 server 變量上initServerConfig();/* Store the executable path and arguments in a safe place in order* to be able to restart the server later. */server.executable = getAbsolutePath(argv[0]);server.exec_argv = zmalloc(sizeof(char*)*(argc+1));server.exec_argv[argc] = NULL;for (j = 0; j < argc; j++) server.exec_argv[j] = zstrdup(argv[j]);/* We need to init sentinel right now as parsing the configuration file* in sentinel mode will have the effect of populating the sentinel* data structures with master nodes to monitor. */if (server.sentinel_mode) {initSentinelConfig();initSentinel();}// 加載配置文件及其他命令/* Check if we need to start in redis-check-rdb mode. We just execute* the program main. However the program is part of the Redis executable* so that we can easily execute an RDB check on loading errors. */if (strstr(argv[0],"redis-check-rdb") != NULL)exit(redis_check_rdb_main(argv,argc));if (argc >= 2) {j = 1; /* First option to parse in argv[] */sds options = sdsempty();char *configfile = NULL;/* Handle special options --help and --version */if (strcmp(argv[1], "-v") == 0 ||strcmp(argv[1], "--version") == 0) version();if (strcmp(argv[1], "--help") == 0 ||strcmp(argv[1], "-h") == 0) usage();if (strcmp(argv[1], "--test-memory") == 0) {if (argc == 3) {memtest(atoi(argv[2]),50);exit(0);} else {fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n");exit(1);}}/* First argument is the config file name? */if (argv[j][0] != '-' || argv[j][1] != '-') {configfile = argv[j];server.configfile = getAbsolutePath(configfile);/* Replace the config file in server.exec_argv with* its absoulte path. */zfree(server.exec_argv[j]);server.exec_argv[j] = zstrdup(server.configfile);j++;}/* All the other options are parsed and conceptually appended to the* configuration file. For instance --port 6380 will generate the* string "port 6380\n" to be parsed after the actual file name* is parsed, if any. */while(j != argc) {if (argv[j][0] == '-' && argv[j][1] == '-') {/* Option name */if (!strcmp(argv[j], "--check-rdb")) {/* Argument has no options, need to skip for parsing. */j++;continue;}if (sdslen(options)) options = sdscat(options,"\n");options = sdscat(options,argv[j]+2);options = sdscat(options," ");} else {/* Option argument */options = sdscatrepr(options,argv[j],strlen(argv[j]));options = sdscat(options," ");}j++;}if (server.sentinel_mode && configfile && *configfile == '-') {serverLog(LL_WARNING,"Sentinel config from STDIN not allowed.");serverLog(LL_WARNING,"Sentinel needs config file on disk to save state. Exiting...");exit(1);}resetServerSaveParams();loadServerConfig(configfile,options);sdsfree(options);} else {serverLog(LL_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis");}server.supervised = redisIsSupervised(server.supervised_mode);int background = server.daemonize && !server.supervised;if (background) daemonize();// 初始化服務(wù)器// 重點(diǎn)如: 綁定監(jiān)聽(tīng)端口號(hào),設(shè)置 acceptTcpHandler 回調(diào)函數(shù)initServer();if (background || server.pidfile) createPidFile();redisSetProcTitle(argv[0]);redisAsciiArt();checkTcpBacklogSettings();if (!server.sentinel_mode) {/* Things not needed when running in Sentinel mode. */serverLog(LL_WARNING,"Server started, Redis version " REDIS_VERSION);#ifdef __linux__linuxMemoryWarnings();#endif// 從磁盤(pán)裝載數(shù)據(jù)進(jìn)行恢復(fù)或者初始化loadDataFromDisk();if (server.cluster_enabled) {if (verifyClusterConfigWithData() == C_ERR) {serverLog(LL_WARNING,"You can't have keys in a DB different than DB 0 when in ""Cluster mode. Exiting.");exit(1);}}if (server.ipfd_count > 0)serverLog(LL_NOTICE,"The server is now ready to accept connections on port %d", server.port);if (server.sofd > 0)serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);} else {sentinelIsRunning();}/* Warning the user about suspicious maxmemory setting. */if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);}// 主循環(huán)服務(wù), 只有收到 stop 命令后,才會(huì)退出aeSetBeforeSleepProc(server.el,beforeSleep);aeMain(server.el);// 關(guān)閉服務(wù)aeDeleteEventLoop(server.el);return 0;}
如上,即是redis的整個(gè)main方法了,整個(gè)啟動(dòng)流程也算是一目了然了。大概流程也不出預(yù)料,環(huán)境設(shè)置、默認(rèn)參數(shù)、配置文件加載、初始化服務(wù)、恢復(fù)數(shù)據(jù)、死循環(huán)。
配置參數(shù)什么的都不用瞅了,但是對(duì)于哨兵、集群什么的,又太深入了。咱們還是先蜻蜓點(diǎn)水下,主要看年初始化服務(wù)器的時(shí)候做了些啥事!
初始化服務(wù)器:
// src/server.c, 在main中調(diào)用void initServer(void) {int j;// 注冊(cè)幾個(gè)事件響應(yīng)處理器,比如前臺(tái)模式運(yùn)行或者調(diào)試模式的處理signal(SIGHUP, SIG_IGN);signal(SIGPIPE, SIG_IGN);setupSignalHandlers();if (server.syslog_enabled) {openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,server.syslog_facility);}// 初始化客戶(hù)端相關(guān)的參數(shù),設(shè)置到 server 中server.pid = getpid();server.current_client = NULL;server.clients = listCreate();server.clients_to_close = listCreate();server.slaves = listCreate();server.monitors = listCreate();server.clients_pending_write = listCreate();server.slaveseldb = -1; /* Force to emit the first SELECT command. */server.unblocked_clients = listCreate();server.ready_keys = listCreate();server.clients_waiting_acks = listCreate();server.get_ack_from_slaves = 0;server.clients_paused = 0;server.system_memory_size = zmalloc_get_memory_size();// 全局共享對(duì)象, 比如 OK, 1-10000, ...// 性能優(yōu)化, 避免對(duì)相同的對(duì)象反復(fù)創(chuàng)建createSharedObjects();adjustOpenFilesLimit();// 創(chuàng)建事件循環(huán)對(duì)象 (aeEventLoop), 在 ae.c 中實(shí)現(xiàn)server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);// 創(chuàng)建db對(duì)象,所有數(shù)據(jù)存儲(chǔ)其中server.db = zmalloc(sizeof(redisDb)*server.dbnum);/* Open the TCP listening socket for the user commands. */// 打開(kāi)服務(wù)端口監(jiān)聽(tīng)if (server.port != 0 &&listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)exit(1);/* Open the listening Unix domain socket. */if (server.unixsocket != NULL) {unlink(server.unixsocket); /* don't care if this fails */server.sofd = anetUnixServer(server.neterr,server.unixsocket,server.unixsocketperm, server.tcp_backlog);if (server.sofd == ANET_ERR) {serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);exit(1);}anetNonBlock(NULL,server.sofd);}/* Abort if there are no listening sockets at all. */if (server.ipfd_count == 0 && server.sofd < 0) {serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");exit(1);}/* Create the Redis databases, and initialize other internal state. */// 初始化各db,實(shí)際就是由這么幾個(gè)數(shù)組來(lái)動(dòng)作db的for (j = 0; j < server.dbnum; j++) {server.db[j].dict = dictCreate(&dbDictType,NULL);server.db[j].expires = dictCreate(&keyptrDictType,NULL);server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);server.db[j].eviction_pool = evictionPoolAlloc();server.db[j].id = j;server.db[j].avg_ttl = 0;}// pub/sub 參數(shù)初始化server.pubsub_channels = dictCreate(&keylistDictType,NULL);server.pubsub_patterns = listCreate();listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);server.cronloops = 0;// rdb,aof 參數(shù)初始化server.rdb_child_pid = -1;server.aof_child_pid = -1;server.rdb_child_type = RDB_CHILD_TYPE_NONE;aofRewriteBufferReset();server.aof_buf = sdsempty();server.lastsave = time(NULL); /* At startup we consider the DB saved. */server.lastbgsave_try = 0; /* At startup we never tried to BGSAVE. */server.rdb_save_time_last = -1;server.rdb_save_time_start = -1;server.dirty = 0;resetServerStats();/* A few stats we don't want to reset: server startup time, and peak mem. */server.stat_starttime = time(NULL);server.stat_peak_memory = 0;server.resident_set_size = 0;server.lastbgsave_status = C_OK;server.aof_last_write_status = C_OK;server.aof_last_write_errno = 0;server.repl_good_slaves_count = 0;updateCachedTime();/* Create out timers, that's our main way to process background* operations. */// 創(chuàng)建定時(shí)器,用于運(yùn)行后臺(tái)事務(wù),每隔1s運(yùn)行一次// 由 serverCron 承載任務(wù),執(zhí)行任務(wù)如 指標(biāo)統(tǒng)計(jì),操作日志持久化,db擴(kuò)容,客戶(hù)端管理...if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {serverPanic("Can't create event loop timers.");exit(1);}/* Create an event handler for accepting new connections in TCP and Unix* domain sockets. */// 創(chuàng)建socket文件監(jiān)控, 由 acceptTcpHandler 承載處理for (j = 0; j < server.ipfd_count; j++) {if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,acceptTcpHandler,NULL) == AE_ERR){serverPanic("Unrecoverable error creating server.ipfd file event.");}}if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");// 如果開(kāi)啟了AOF功能,就打開(kāi)AOF文件/* Open the AOF file if needed. */if (server.aof_state == AOF_ON) {server.aof_fd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT,0644);if (server.aof_fd == -1) {serverLog(LL_WARNING, "Can't open the append-only file: %s",strerror(errno));exit(1);}}/* 32 bit instances are limited to 4GB of address space, so if there is* no explicit limit in the user provided configuration we set a limit* at 3 GB using maxmemory with 'noeviction' policy'. This avoids* useless crashes of the Redis instance for out of memory. */if (server.arch_bits == 32 && server.maxmemory == 0) {serverLog(LL_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now.");server.maxmemory = 3072LL*(1024*1024); /* 3 GB */server.maxmemory_policy = MAXMEMORY_NO_EVICTION;}if (server.cluster_enabled) clusterInit();replicationScriptCacheInit();// lua 腳本初始化scriptingInit(1);// 初始化慢查詢(xún)?nèi)罩咀兞?/span>slowlogInit();// 延遲監(jiān)控初始化,僅創(chuàng)建變量latencyMonitorInit();// 初始化幾個(gè)系統(tǒng)必須的線程(線程池),執(zhí)行任務(wù),while死循環(huán)bioInit();}
通過(guò)以上,我們可以清楚明白,在初始化服務(wù)器時(shí),高大上的C都干了啥??傮w來(lái)說(shuō)就是: 設(shè)置系統(tǒng)回調(diào)、開(kāi)啟端口監(jiān)聽(tīng)、開(kāi)啟socket監(jiān)聽(tīng)、開(kāi)啟后臺(tái)任務(wù)、開(kāi)啟AOF、腳本初始化、線程池初始化。。。(做這些事是容易的,難的是設(shè)計(jì)之初如何架構(gòu)其功能)
下面我們來(lái)看幾個(gè)初始服務(wù)器時(shí)的關(guān)鍵函數(shù)方法。
1. aeEventLoop 的創(chuàng)建
aeEventLoop 是后續(xù)進(jìn)行任務(wù)處理的重要數(shù)據(jù)結(jié)構(gòu)。
// ae.c, 創(chuàng)建 aeEventLoop 對(duì)象,封裝底層的 事件模式,統(tǒng)一對(duì)外服務(wù)aeEventLoop *aeCreateEventLoop(int setsize) {aeEventLoop *eventLoop;int i;if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;eventLoop->setsize = setsize;eventLoop->lastTime = time(NULL);eventLoop->timeEventHead = NULL;eventLoop->timeEventNextId = 0;eventLoop->stop = 0;eventLoop->maxfd = -1;eventLoop->beforesleep = NULL;// 根據(jù)系統(tǒng)不同,選擇不同的實(shí)現(xiàn),C里面的多態(tài)自然是用 #ifdef 來(lái)實(shí)現(xiàn)了if (aeApiCreate(eventLoop) == -1) goto err;/* Events with mask == AE_NONE are not set. So let's initialize the* vector with it. */for (i = 0; i < setsize; i++)eventLoop->events[i].mask = AE_NONE;return eventLoop;err:if (eventLoop) {zfree(eventLoop->events);zfree(eventLoop->fired);zfree(eventLoop);}return NULL;}// 選擇不同的io模型, 優(yōu)先級(jí): evport > epoll > kqueue > select#ifdef HAVE_EVPORT#include "ae_evport.c"#else#ifdef HAVE_EPOLL#include "ae_epoll.c"#else#ifdef HAVE_KQUEUE#include "ae_kqueue.c"#else#include "ae_select.c"#endif#endif#endif// epoll 實(shí)現(xiàn)static int aeApiCreate(aeEventLoop *eventLoop) {aeApiState *state = zmalloc(sizeof(aeApiState));if (!state) return -1;state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);if (!state->events) {zfree(state);return -1;}state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */if (state->epfd == -1) {zfree(state->events);zfree(state);return -1;}eventLoop->apidata = state;return 0;}// ae_epoll.c, linux 創(chuàng)建epoll句柄static int aeApiCreate(aeEventLoop *eventLoop) {aeApiState *state = zmalloc(sizeof(aeApiState));if (!state) return -1;state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);if (!state->events) {zfree(state);return -1;}state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */if (state->epfd == -1) {zfree(state->events);zfree(state);return -1;}eventLoop->apidata = state;return 0;}
2. acceptTcpHandler, 對(duì)于網(wǎng)絡(luò)請(qǐng)求的接入處理
// networking.c, acceptTcpHandlervoid acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {int cport, cfd, max = MAX_ACCEPTS_PER_CALL;char cip[NET_IP_STR_LEN];UNUSED(el);UNUSED(mask);UNUSED(privdata);while(max--) {// 獲取fd, ip, portcfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);if (cfd == ANET_ERR) {if (errno != EWOULDBLOCK)serverLog(LL_WARNING,"Accepting client connection: %s", server.neterr);return;}serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);// 創(chuàng)建客戶(hù)端對(duì)象,加入到 server.clients 中acceptCommonHandler(cfd,0,cip);}}// anet.c, 解析 ip, port, fdint anetTcpAccept(char *err, int s, char *ip, size_t ip_len, int *port) {int fd;struct sockaddr_storage sa;socklen_t salen = sizeof(sa);if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == -1)return ANET_ERR;if (sa.ss_family == AF_INET) {struct sockaddr_in *s = (struct sockaddr_in *)&sa;if (ip) inet_ntop(AF_INET,(void*)&(s->sin_addr),ip,ip_len);if (port) *port = ntohs(s->sin_port);} else {struct sockaddr_in6 *s = (struct sockaddr_in6 *)&sa;if (ip) inet_ntop(AF_INET6,(void*)&(s->sin6_addr),ip,ip_len);if (port) *port = ntohs(s->sin6_port);}return fd;}// anet.c, 調(diào)用系統(tǒng)函數(shù)獲取 socket 數(shù)據(jù)static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {int fd;while(1) {fd = accept(s,sa,len);if (fd == -1) {if (errno == EINTR)continue;else {anetSetError(err, "accept: %s", strerror(errno));return ANET_ERR;}}break;}return fd;}
3. bioInit 線程創(chuàng)建
// bio.c/* Initialize the background system, spawning the thread. */void bioInit(void) {pthread_attr_t attr;pthread_t thread;size_t stacksize;int j;/* Initialization of state vars and objects */for (j = 0; j < BIO_NUM_OPS; j++) {pthread_mutex_init(&bio_mutex[j],NULL);pthread_cond_init(&bio_newjob_cond[j],NULL);pthread_cond_init(&bio_step_cond[j],NULL);bio_jobs[j] = listCreate();bio_pending[j] = 0;}/* Set the stack size as by default it may be small in some system */pthread_attr_init(&attr);pthread_attr_getstacksize(&attr,&stacksize);if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;pthread_attr_setstacksize(&attr, stacksize);/* Ready to spawn our threads. We use the single argument the thread* function accepts in order to pass the job ID the thread is* responsible of. */for (j = 0; j < BIO_NUM_OPS; j++) {void *arg = (void*)(unsigned long) j;// bioProcessBackgroundJobs 用于執(zhí)行線程任務(wù)if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");exit(1);}bio_threads[j] = thread;}}
二、主循環(huán)服務(wù)
接下來(lái)我們看看另一個(gè)重要的流程,主循環(huán)服務(wù)。redis作為一個(gè)存儲(chǔ)服務(wù),必定需要一直運(yùn)行等待,這就是while死循環(huán)的應(yīng)用了。在前面各種環(huán)境初始化完成后,進(jìn)入while循環(huán)服務(wù)。
// src/ae.c 主循環(huán)服務(wù)void aeMain(aeEventLoop *eventLoop) {eventLoop->stop = 0;// eventLoop 會(huì)被 acceptTcpHandler 進(jìn)行數(shù)據(jù)填充// 此處 beforesleep 為外部初始化的// aeSetBeforeSleepProc(), 設(shè)置 beforeSleepwhile (!eventLoop->stop) {if (eventLoop->beforesleep != NULL)eventLoop->beforesleep(eventLoop);// 由 aeProcessEvents 處理事件aeProcessEvents(eventLoop, AE_ALL_EVENTS);}}
很簡(jiǎn)單,就做兩件事: beforesleep, aeProcessEvents, 看起來(lái) aeProcessEvents() 是個(gè)核對(duì)服務(wù)。我們可以先觀察其行為。
1. aeProcessEvents, 處理各種事件(數(shù)據(jù)準(zhǔn)備)
// ae.c/* Process every pending time event, then every pending file event* (that may be registered by time event callbacks just processed).* Without special flags the function sleeps until some file event* fires, or when the next time event occurs (if any).** If flags is 0, the function does nothing and returns.* if flags has AE_ALL_EVENTS set, all the kind of events are processed.* if flags has AE_FILE_EVENTS set, file events are processed.* if flags has AE_TIME_EVENTS set, time events are processed.* if flags has AE_DONT_WAIT set the function returns ASAP until all* the events that's possible to process without to wait are processed.** The function returns the number of events processed. */int aeProcessEvents(aeEventLoop *eventLoop, int flags){int processed = 0, numevents;/* Nothing to do? return ASAP */if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;/* Note that we want call select() even if there are no* file events to process as long as we want to process time* events, in order to sleep until the next time event is ready* to fire. */if (eventLoop->maxfd != -1 ||((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {int j;aeTimeEvent *shortest = NULL;struct timeval tv, *tvp;// 獲取最近 timer事件, 用于判定是否有需要執(zhí)行至少一個(gè)時(shí)間事件if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))shortest = aeSearchNearestTimer(eventLoop);if (shortest) {long now_sec, now_ms;/* Calculate the time missing for the nearest* timer to fire. */aeGetTime(&now_sec, &now_ms);tvp = &tv;tvp->tv_sec = shortest->when_sec - now_sec;if (shortest->when_ms < now_ms) {tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;tvp->tv_sec --;} else {tvp->tv_usec = (shortest->when_ms - now_ms)*1000;}if (tvp->tv_sec < 0) tvp->tv_sec = 0;if (tvp->tv_usec < 0) tvp->tv_usec = 0;} else {/* If we have to check for events but need to return* ASAP because of AE_DONT_WAIT we need to set the timeout* to zero */if (flags & AE_DONT_WAIT) {tv.tv_sec = tv.tv_usec = 0;tvp = &tv;} else {/* Otherwise we can block */tvp = NULL; /* wait forever */}}// 獲取等待事件numevents = aeApiPoll(eventLoop, tvp);for (j = 0; j < numevents; j++) {aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];int mask = eventLoop->fired[j].mask;int fd = eventLoop->fired[j].fd;int rfired = 0;/* note the fe->mask & mask & ... code: maybe an already processed* event removed an element that fired and we still didn't* processed, so we check if the event is still valid. */// 此處將會(huì)調(diào)用前面設(shè)置好的 acceptTcpHandler 服務(wù)if (fe->mask & mask & AE_READABLE) {rfired = 1;fe->rfileProc(eventLoop,fd,fe->clientData,mask);}if (fe->mask & mask & AE_WRITABLE) {if (!rfired || fe->wfileProc != fe->rfileProc)fe->wfileProc(eventLoop,fd,fe->clientData,mask);}processed++;}}// 時(shí)間事件處理, serverCron 調(diào)用/* Check time events */if (flags & AE_TIME_EVENTS)processed += processTimeEvents(eventLoop);return processed; /* return the number of processed file/time events */}// ae_epoll.c, 調(diào)用系統(tǒng)底層, 獲取網(wǎng)絡(luò)就緒事件, 放入 eventLoop->fired 中static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {aeApiState *state = eventLoop->apidata;int retval, numevents = 0;retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);if (retval > 0) {int j;numevents = retval;for (j = 0; j < numevents; j++) {int mask = 0;struct epoll_event *e = state->events+j;// 將系統(tǒng)事件類(lèi)型轉(zhuǎn)換為 redis 的事件類(lèi)型if (e->events & EPOLLIN) mask |= AE_READABLE;if (e->events & EPOLLOUT) mask |= AE_WRITABLE;if (e->events & EPOLLERR) mask |= AE_WRITABLE;if (e->events & EPOLLHUP) mask |= AE_WRITABLE;eventLoop->fired[j].fd = e->data.fd;eventLoop->fired[j].mask = mask;}}return numevents;}
2. 主循環(huán)服務(wù)之 beforeSleep
beforeSleep是在進(jìn)入 aeMain之前,直接綁定在 el 上的。是在主循環(huán)中進(jìn)行檢測(cè)的條件,但其承擔(dān)了重要的作用,比如客戶(hù)請(qǐng)求的命令解析和處理!
// server.c, beforeSleep/* This function gets called every time Redis is entering the* main loop of the event driven library, that is, before to sleep* for ready file descriptors. */void beforeSleep(struct aeEventLoop *eventLoop) {UNUSED(eventLoop);/* Call the Redis Cluster before sleep function. Note that this function* may change the state of Redis Cluster (from ok to fail or vice versa),* so it's a good idea to call it before serving the unblocked clients* later in this function. */if (server.cluster_enabled) clusterBeforeSleep();/* Run a fast expire cycle (the called function will return* ASAP if a fast cycle is not needed). */if (server.active_expire_enabled && server.masterhost == NULL)activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);/* Send all the slaves an ACK request if at least one client blocked* during the previous event loop iteration. */if (server.get_ack_from_slaves) {robj *argv[3];argv[0] = createStringObject("REPLCONF",8);argv[1] = createStringObject("GETACK",6);argv[2] = createStringObject("*",1); /* Not used argument. */replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);decrRefCount(argv[0]);decrRefCount(argv[1]);decrRefCount(argv[2]);server.get_ack_from_slaves = 0;}/* Unblock all the clients blocked for synchronous replication* in WAIT. */if (listLength(server.clients_waiting_acks))processClientsWaitingReplicas();/* Try to process pending commands for clients that were just unblocked. */// 處理可用的客戶(hù)端請(qǐng)求if (listLength(server.unblocked_clients))processUnblockedClients();// AOF刷盤(pán)服務(wù)/* Write the AOF buffer on disk */flushAppendOnlyFile(0);// 將一些被掛起的數(shù)據(jù)寫(xiě)入客戶(hù)端socket中/* Handle writes with pending output buffers. */handleClientsWithPendingWrites();}// blocking.c, 處理被解阻塞的客戶(hù)端連接, 順便處理客戶(hù)端請(qǐng)求/* This function is called in the beforeSleep() function of the event loop* in order to process the pending input buffer of clients that were* unblocked after a blocking operation. */void processUnblockedClients(void) {listNode *ln;client *c;while (listLength(server.unblocked_clients)) {ln = listFirst(server.unblocked_clients);serverAssert(ln != NULL);c = ln->value;listDelNode(server.unblocked_clients,ln);c->flags &= ~CLIENT_UNBLOCKED;/* Process remaining data in the input buffer, unless the client* is blocked again. Actually processInputBuffer() checks that the* client is not blocked before to proceed, but things may change and* the code is conceptually more correct this way. */if (!(c->flags & CLIENT_BLOCKED)) {if (c->querybuf && sdslen(c->querybuf) > 0) {processInputBuffer(c);}}}}// networking.c, 處理接收到的數(shù)據(jù), 調(diào)起下游處理服務(wù)void processInputBuffer(client *c) {server.current_client = c;/* Keep processing while there is something in the input buffer */while(sdslen(c->querybuf)) {/* Return if clients are paused. */if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;/* Immediately abort if the client is in the middle of something. */if (c->flags & CLIENT_BLOCKED) break;/* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is* written to the client. Make sure to not let the reply grow after* this flag has been set (i.e. don't process more commands). */if (c->flags & CLIENT_CLOSE_AFTER_REPLY) break;/* Determine request type when unknown. */// 根據(jù)第一個(gè)字符是否是 *, 分為兩種類(lèi)型協(xié)議, 處理方式不同if (!c->reqtype) {if (c->querybuf[0] == '*') {c->reqtype = PROTO_REQ_MULTIBULK;} else {c->reqtype = PROTO_REQ_INLINE;}}if (c->reqtype == PROTO_REQ_INLINE) {if (processInlineBuffer(c) != C_OK) break;} else if (c->reqtype == PROTO_REQ_MULTIBULK) {if (processMultibulkBuffer(c) != C_OK) break;} else {serverPanic("Unknown request type");}/* Multibulk processing could see a <= 0 length. */if (c->argc == 0) {resetClient(c);} else {/* Only reset the client when the command was executed. */// 經(jīng)過(guò)前面請(qǐng)求解析后,進(jìn)入請(qǐng)求處理核心流程if (processCommand(c) == C_OK)resetClient(c);}}server.current_client = NULL;}// server.c, 根據(jù)網(wǎng)絡(luò)模塊解析好的客戶(hù)端命令,進(jìn)行相應(yīng)的業(yè)務(wù)處理/* If this function gets called we already read a whole* command, arguments are in the client argv/argc fields.* processCommand() execute the command or prepare the* server for a bulk read from the client.** If C_OK is returned the client is still alive and valid and* other operations can be performed by the caller. Otherwise* if C_ERR is returned the client was destroyed (i.e. after QUIT). */int processCommand(client *c) {/* The QUIT command is handled separately. Normal command procs will* go through checking for replication and QUIT will cause trouble* when FORCE_REPLICATION is enabled and would be implemented in* a regular command proc. */if (!strcasecmp(c->argv[0]->ptr,"quit")) {addReply(c,shared.ok);c->flags |= CLIENT_CLOSE_AFTER_REPLY;return C_ERR;}/* Now lookup the command and check ASAP about trivial error conditions* such as wrong arity, bad command name and so forth. */// 根據(jù)第一個(gè)參數(shù) 查找處理命令,在 server.c 的頂部有定義: redisCommandTablec->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);if (!c->cmd) {flagTransaction(c);addReplyErrorFormat(c,"unknown command '%s'",(char*)c->argv[0]->ptr);return C_OK;} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||(c->argc < -c->cmd->arity)) {flagTransaction(c);addReplyErrorFormat(c,"wrong number of arguments for '%s' command",c->cmd->name);return C_OK;}// 以下是一系列判斷,是否符合命令執(zhí)行前提/* Check if the user is authenticated */if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand){flagTransaction(c);addReply(c,shared.noautherr);return C_OK;}/* If cluster is enabled perform the cluster redirection here.* However we don't perform the redirection if:* 1) The sender of this command is our master.* 2) The command has no key arguments. */if (server.cluster_enabled &&!(c->flags & CLIENT_MASTER) &&!(c->flags & CLIENT_LUA &&server.lua_caller->flags & CLIENT_MASTER) &&!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0)){int hashslot;if (server.cluster->state != CLUSTER_OK) {flagTransaction(c);clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE);return C_OK;} else {int error_code;clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);if (n == NULL || n != server.cluster->myself) {flagTransaction(c);clusterRedirectClient(c,n,hashslot,error_code);return C_OK;}}}/* Handle the maxmemory directive.** First we try to free some memory if possible (if there are volatile* keys in the dataset). If there are not the only thing we can do* is returning an error. */if (server.maxmemory) {int retval = freeMemoryIfNeeded();/* freeMemoryIfNeeded may flush slave output buffers. This may result* into a slave, that may be the active client, to be freed. */if (server.current_client == NULL) return C_ERR;/* It was impossible to free enough memory, and the command the client* is trying to execute is denied during OOM conditions? Error. */if ((c->cmd->flags & CMD_DENYOOM) && retval == C_ERR) {flagTransaction(c);addReply(c, shared.oomerr);return C_OK;}}/* Don't accept write commands if there are problems persisting on disk* and if this is a master instance. */if (((server.stop_writes_on_bgsave_err &&server.saveparamslen > 0 &&server.lastbgsave_status == C_ERR) ||server.aof_last_write_status == C_ERR) &&server.masterhost == NULL &&(c->cmd->flags & CMD_WRITE ||c->cmd->proc == pingCommand)){flagTransaction(c);if (server.aof_last_write_status == C_OK)addReply(c, shared.bgsaveerr);elseaddReplySds(c,sdscatprintf(sdsempty(),"-MISCONF Errors writing to the AOF file: %s\r\n",strerror(server.aof_last_write_errno)));return C_OK;}/* Don't accept write commands if there are not enough good slaves and* user configured the min-slaves-to-write option. */if (server.masterhost == NULL &&server.repl_min_slaves_to_write &&server.repl_min_slaves_max_lag &&c->cmd->flags & CMD_WRITE &&server.repl_good_slaves_count < server.repl_min_slaves_to_write){flagTransaction(c);addReply(c, shared.noreplicaserr);return C_OK;}/* Don't accept write commands if this is a read only slave. But* accept write commands if this is our master. */if (server.masterhost && server.repl_slave_ro &&!(c->flags & CLIENT_MASTER) &&c->cmd->flags & CMD_WRITE){addReply(c, shared.roslaveerr);return C_OK;}/* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */if (c->flags & CLIENT_PUBSUB &&c->cmd->proc != pingCommand &&c->cmd->proc != subscribeCommand &&c->cmd->proc != unsubscribeCommand &&c->cmd->proc != psubscribeCommand &&c->cmd->proc != punsubscribeCommand) {addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");return C_OK;}/* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and* we are a slave with a broken link with master. */if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&server.repl_serve_stale_data == 0 &&!(c->cmd->flags & CMD_STALE)){flagTransaction(c);addReply(c, shared.masterdownerr);return C_OK;}/* Loading DB? Return an error if the command has not the* CMD_LOADING flag. */if (server.loading && !(c->cmd->flags & CMD_LOADING)) {addReply(c, shared.loadingerr);return C_OK;}/* Lua script too slow? Only allow a limited number of commands. */if (server.lua_timedout &&c->cmd->proc != authCommand &&c->cmd->proc != replconfCommand &&!(c->cmd->proc == shutdownCommand &&c->argc == 2 &&tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&!(c->cmd->proc == scriptCommand &&c->argc == 2 &&tolower(((char*)c->argv[1]->ptr)[0]) == 'k')){flagTransaction(c);addReply(c, shared.slowscripterr);return C_OK;}/* Exec the command */if (c->flags & CLIENT_MULTI &&c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&c->cmd->proc != multiCommand && c->cmd->proc != watchCommand){queueMultiCommand(c);addReply(c,shared.queued);} else {// 由 call 函數(shù)執(zhí)行各自的 commandcall(c,CMD_CALL_FULL);c->woff = server.master_repl_offset;if (listLength(server.ready_keys))handleClientsBlockedOnLists();}return C_OK;}
到此,整個(gè)redis的啟動(dòng)及簡(jiǎn)要的請(qǐng)求處理流程就完成了。?
下面以?xún)蓚€(gè)UML來(lái)重新審視整個(gè)流程。
1.?redisServer?初始化時(shí)序圖

2.?主循環(huán)服務(wù)時(shí)序圖

總體來(lái)說(shuō),就單個(gè)命令的執(zhí)行流程來(lái)說(shuō),簡(jiǎn)單到?就是一個(gè)?命令表的查找,到數(shù)據(jù)處理響應(yīng)。

騰訊、阿里、滴滴后臺(tái)面試題匯總總結(jié) — (含答案)
面試:史上最全多線程面試題 !
最新阿里內(nèi)推Java后端面試題
JVM難學(xué)?那是因?yàn)槟銢](méi)認(rèn)真看完這篇文章

關(guān)注作者微信公眾號(hào) —《JAVA爛豬皮》
了解更多java后端架構(gòu)知識(shí)以及最新面試寶典


看完本文記得給作者點(diǎn)贊+在看哦~~~大家的支持,是作者源源不斷出文的動(dòng)力
作者:等你歸去來(lái)
出處:https://www.cnblogs.com/yougewe/p/12187858.html
