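Redis (6): Source Code Analysis of the list Commands lpush/lrange/lpop
The previous post covered how the hash data type is implemented. Even if it did not make everything click, you should at least know how Redis goes about things.
This post continues with another Redis data type: the implementation of the list operations.
As before, we start from the user's point of view: first the features list offers, then the data structures that carry them, and finally the concrete implementation.
0. Redis list commands
The usage of each command can be looked up in the official manual.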
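1> BLPOP key1 [key2] timeout
Function: removes and returns the first element of the list. If the list is empty, the client blocks until the timeout expires or an element becomes available. (Blocking version of LPOP.)
Returns: the key the element was popped from and the popped value.
2> BRPOP key1 [key2] timeout
Function: removes and returns the last element of the list. If the list is empty, the client blocks until the timeout expires or an element becomes available. (Blocking version of RPOP.)
Returns: the key the element was popped from and the popped value.
3> BRPOPLPUSH source destination timeout
Function: pops the last element of source, pushes it onto the head of destination and returns it. If source is empty, the client blocks until the timeout expires or an element becomes available. (Blocking version of RPOPLPUSH.)
Returns: the transferred element, or nil.
4> LINDEX key index
Function: gets an element by index.
Returns: the element found, or nil when the index is out of range.
5> LINSERT key BEFORE|AFTER pivot value
Function: inserts an element before or after the pivot element.
Returns: the list length after the insert (-1 if the pivot is not found).
6> LLEN key
Function: gets the list length.
Returns: the list length.
7> LPOP key
Function: removes and returns the first element.
Returns: the first element, or nil.
8> LPUSH key value1 [value2]
Function: inserts one or more values at the head of the list.
Returns: the list length after the insert.
9> LPUSHX key value
Function: inserts a value at the head of an existing list; does nothing if the key does not exist.
Returns: the list length after the insert.
10> LRANGE key start stop
Function: gets the elements in the given range (both bounds inclusive).
Returns: the list of values.
11> LREM key count value
Function: removes elements equal to value. count > 0: remove the first count matches, scanning head to tail; count < 0: remove |count| matches, scanning tail to head; count = 0: remove all matches.
Returns: the number of elements removed.
12> LSET key index value
Function: sets the element at the given index.
Returns: OK, or an error.
13> LTRIM key start stop
Function: trims the list so that only the elements inside the given range are kept; everything outside the range is deleted.
Returns: OK.
14> RPOP key
Function: removes and returns the last element.
Returns: the last element, or nil.
15> RPOPLPUSH source destination
Function: removes the last element of source, pushes it onto the head of destination and returns it.
Returns: the transferred element.
16> RPUSH key value1 [value2]
Function: appends one or more values at the tail of the list.
Returns: the list length after the insert.
17> RPUSHX key value
Function: appends a value at the tail of an existing list.
Returns: the list length after the insert.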
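The count rule of LREM in the list above is the easiest one to get wrong, so here is a minimal sketch of it on a plain int array. `lrem_sketch` is a hypothetical helper written for this post, not a Redis function; it only illustrates the three cases (count > 0, count < 0, count = 0).

```c
#include <stddef.h>

/* Hypothetical helper (not part of Redis): removes occurrences of `value`
 * from arr[0..len) following the LREM count rule and returns the new
 * length. Surviving elements keep their relative order. */
size_t lrem_sketch(int *arr, size_t len, long count, int value) {
    long limit = count < 0 ? -count : count;   /* 0 means "no limit" */
    long removed = 0;
    size_t i, out;

    if (count >= 0) {
        /* Scan head -> tail, compacting survivors towards the front. */
        out = 0;
        for (i = 0; i < len; i++) {
            if (arr[i] == value && (limit == 0 || removed < limit)) {
                removed++;
            } else {
                arr[out++] = arr[i];
            }
        }
        return out;
    }

    /* Scan tail -> head, compacting survivors towards the back. */
    out = len;
    for (i = len; i-- > 0; ) {
        if (arr[i] == value && removed < limit) {
            removed++;
        } else {
            arr[--out] = arr[i];
        }
    }

    /* Survivors now occupy arr[out..len); move them to the front. */
    size_t newlen = len - out;
    for (i = 0; i < newlen; i++) arr[i] = arr[out + i];
    return newlen;
}
```

Calling `lrem_sketch(a, 5, -2, 1)` on `{1,2,1,3,1}` drops the two matches closest to the tail and keeps the first `1`.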
The corresponding handlers are registered in the Redis command table as follows:
{"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
{"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
{"rpushx",rpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
{"lpushx",lpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
{"linsert",linsertCommand,5,"wm",0,NULL,1,1,1,0,0},
{"rpop",rpopCommand,2,"wF",0,NULL,1,1,1,0,0},
{"lpop",lpopCommand,2,"wF",0,NULL,1,1,1,0,0},
{"brpop",brpopCommand,-3,"ws",0,NULL,1,1,1,0,0},
{"brpoplpush",brpoplpushCommand,4,"wms",0,NULL,1,2,1,0,0},
{"blpop",blpopCommand,-3,"ws",0,NULL,1,-2,1,0,0},
{"llen",llenCommand,2,"rF",0,NULL,1,1,1,0,0},
{"lindex",lindexCommand,3,"r",0,NULL,1,1,1,0,0},
{"lset",lsetCommand,4,"wm",0,NULL,1,1,1,0,0},
{"lrange",lrangeCommand,4,"r",0,NULL,1,1,1,0,0},
{"ltrim",ltrimCommand,4,"w",0,NULL,1,1,1,0,0},
{"lrem",lremCommand,4,"w",0,NULL,1,1,1,0,0},
{"rpoplpush",rpoplpushCommand,3,"wm",0,NULL,1,2,1,0,0},
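The third field of each entry is the arity. As far as I understand the command dispatcher, a positive arity means the argument count (command name included) must match exactly, while a negative arity means "at least that many". A minimal sketch of that check follows; `arity_matches` is a hypothetical name used only here.

```c
/* Hypothetical helper (not a Redis function): returns 1 if `argc`
 * (command name included) satisfies the arity field of a table entry. */
int arity_matches(int arity, int argc) {
    if (arity > 0) return argc == arity;   /* exact count, e.g. lpop: 2   */
    return argc >= -arity;                 /* minimum count, e.g. lpush: -3 */
}
```

So `LPUSH key a b c` (argc = 5) passes against -3, while `LPOP key extra` (argc = 3) fails against 2.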
1. list data structures
When we say list, or linked list, which data structures come to mind? Singly linked lists, doubly linked lists, circular lists... all fairly simple. Is there anything else? Let's look at how Redis does it:
// quicklist is the actual data container; it is traversed via head and tail, so it is effectively a doubly linked list
/* quicklist is a 32 byte struct (on 64-bit systems) describing a quicklist.
 * 'count' is the number of total entries.
 * 'len' is the number of quicklist nodes.
 * 'compress' is: -1 if compression disabled, otherwise it's the number
 *                of quicklistNodes to leave uncompressed at ends of quicklist.
 * 'fill' is the user-requested (or default) fill factor. */
typedef struct quicklist {
    // head node
    quicklistNode *head;
    // tail node
    quicklistNode *tail;
    // number of elements currently stored
    unsigned long count;        /* total count of all entries in all ziplists */
    // number of quicklistNodes; one node may hold n elements
    unsigned int len;           /* number of quicklistNodes */
    // fill factor
    int fill : 16;              /* fill factor for individual nodes */
    // how many nodes at each end are left uncompressed
    unsigned int compress : 16; /* depth of end nodes not to compress;0=off */
} quicklist;

// a single element located in the list: the node it lives in plus its position inside that node's ziplist
typedef struct quicklistEntry {
    const quicklist *quicklist;
    quicklistNode *node;
    // pointer to the current element inside the ziplist
    unsigned char *zi;
    // the raw string value, decoded from zi
    unsigned char *value;
    // length of value in bytes
    unsigned int sz;
    long long longval;
    // offset of the element within the current node
    int offset;
} quicklistEntry;

// the linked-list nodes themselves are quicklistNode
/* quicklistNode is a 32 byte struct describing a ziplist for a quicklist.
 * We use bit fields keep the quicklistNode at 32 bytes.
 * count: 16 bits, max 65536 (max zl bytes is 65k, so max count actually < 32k).
 * encoding: 2 bits, RAW=1, LZF=2.
 * container: 2 bits, NONE=1, ZIPLIST=2.
 * recompress: 1 bit, bool, true if node is temporarily decompressed for usage.
 * attempted_compress: 1 bit, boolean, used for verifying during testing.
 * extra: 12 bits, free for future use; pads out the remainder of 32 bits */
typedef struct quicklistNode {
    struct quicklistNode *prev;
    struct quicklistNode *next;
    // zl points to the ziplist, which holds `count` elements
    unsigned char *zl;
    unsigned int sz;             /* ziplist size in bytes */
    unsigned int count : 16;     /* count of items in ziplist */
    unsigned int encoding : 2;   /* RAW==1 or LZF==2 */
    unsigned int container : 2;  /* NONE==1 or ZIPLIST==2 */
    unsigned int recompress : 1; /* was this node previous compressed? */
    unsigned int attempted_compress : 1; /* node can't compress; too small */
    unsigned int extra : 10; /* more bits to steal for future usage */
} quicklistNode;

// list iterator
typedef struct quicklistIter {
    const quicklist *quicklist;
    quicklistNode *current;
    unsigned char *zi;
    long offset; /* offset in current ziplist */
    int direction;
} quicklistIter;

// decoded view of a single ziplist entry
typedef struct zlentry {
    unsigned int prevrawlensize, prevrawlen;
    unsigned int lensize, len;
    unsigned int headersize;
    unsigned char encoding;
    unsigned char *p;
} zlentry;
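To make the relationship between the cached counters concrete, here is a small sketch with stripped-down stand-ins for quicklist and quicklistNode. The `sk_` types and `sk_counters_consistent` are hypothetical and only keep the fields needed for the point: `count` on the list should equal the sum of the per-node counts, and `len` should equal the number of nodes.

```c
#include <stddef.h>

/* Simplified, hypothetical stand-ins for quicklistNode / quicklist. */
typedef struct sk_node {
    struct sk_node *prev, *next;
    unsigned int count;      /* entries stored in this node's ziplist */
} sk_node;

typedef struct sk_list {
    sk_node *head, *tail;
    unsigned long count;     /* total entries across all nodes */
    unsigned int len;        /* number of nodes */
} sk_list;

/* Returns 1 when the cached counters and the links agree with a full walk. */
int sk_counters_consistent(const sk_list *l) {
    unsigned long entries = 0;
    unsigned int nodes = 0;
    const sk_node *prev = NULL;

    for (const sk_node *n = l->head; n != NULL; n = n->next) {
        if (n->prev != prev) return 0;   /* back link must mirror forward link */
        entries += n->count;
        nodes++;
        prev = n;
    }
    return l->tail == prev && entries == l->count && nodes == l->len;
}
```

Keeping these counters up to date on every push and delete is what lets LLEN answer without walking the list.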
2. rpush/lpush: adding elements
rpush appends elements at the tail and lpush inserts them at the head. The two are essentially the same operation, and Redis does in fact share one code path for both.
// t_list.c, lpush
void lpushCommand(client *c) {
    // LIST_HEAD / LIST_TAIL marks the insert position
    pushGenericCommand(c,LIST_HEAD);
}

void rpushCommand(client *c) {
    pushGenericCommand(c,LIST_TAIL);
}

// t_list.c, the actual push operation
void pushGenericCommand(client *c, int where) {
    int j, waiting = 0, pushed = 0;
    // look up the key in the db; it may or may not exist
    robj *lobj = lookupKeyWrite(c->db,c->argv[1]);

    // if it exists, its type must be list
    if (lobj && lobj->type != OBJ_LIST) {
        addReply(c,shared.wrongtypeerr);
        return;
    }

    for (j = 2; j < c->argc; j++) {
        c->argv[j] = tryObjectEncoding(c->argv[j]);
        if (!lobj) {
            // 1. no such key yet: create the list object and add it to the db
            lobj = createQuicklistObject();
            // 2. set the fill and depth options
            // fill default: -2
            // depth default: 0
            quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
                                server.list_compress_depth);
            dbAdd(c->db,c->argv[1],lobj);
        }
        // 3. add the elements one by one
        listTypePush(lobj,c->argv[j],where);
        pushed++;
    }
    // reply with the list length
    addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0));
    if (pushed) {
        // signal the key modification and emit the keyspace event
        char *event = (where == LIST_HEAD) ? "lpush" : "rpush";

        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
    }
    server.dirty += pushed;
}

// 1. create the initial list
// object.c, create the initial list object
robj *createQuicklistObject(void) {
    quicklist *l = quicklistCreate();
    robj *o = createObject(OBJ_LIST,l);
    o->encoding = OBJ_ENCODING_QUICKLIST;
    return o;
}

// quicklist.c, create a new list container and initialise the defaults
/* Create a new quicklist.
 * Free with quicklistRelease(). */
quicklist *quicklistCreate(void) {
    struct quicklist *quicklist;

    quicklist = zmalloc(sizeof(*quicklist));
    quicklist->head = quicklist->tail = NULL;
    quicklist->len = 0;
    quicklist->count = 0;
    quicklist->compress = 0;
    quicklist->fill = -2;
    return quicklist;
}

// 2. set the fill and depth values of the quicklist
// quicklist.c
void quicklistSetOptions(quicklist *quicklist, int fill, int depth) {
    quicklistSetFill(quicklist, fill);
    quicklistSetCompressDepth(quicklist, depth);
}

// quicklist.c, set the fill option
void quicklistSetFill(quicklist *quicklist, int fill) {
    if (fill > FILL_MAX) {
        fill = FILL_MAX;
    } else if (fill < -5) {
        fill = -5;
    }
    quicklist->fill = fill;
}

// quicklist.c, set the depth option
void quicklistSetCompressDepth(quicklist *quicklist, int compress) {
    if (compress > COMPRESS_MAX) {
        compress = COMPRESS_MAX;
    } else if (compress < 0) {
        compress = 0;
    }
    quicklist->compress = compress;
}

// 3. add the element to the list
// t_list.c
/* The function pushes an element to the specified list object 'subject',
 * at head or tail position as specified by 'where'.
 *
 * There is no need for the caller to increment the refcount of 'value' as
 * the function takes care of it if needed. */
void listTypePush(robj *subject, robj *value, int where) {
    if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
        int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
        // decode value
        value = getDecodedObject(value);
        size_t len = sdslen(value->ptr);
        // add value to the list
        quicklistPush(subject->ptr, value->ptr, len, pos);
        // drop the reference; if value was a freshly decoded object, its memory is released here
        decrRefCount(value);
    } else {
        serverPanic("Unknown list encoding");
    }
}

// object.c
/* Get a decoded version of an encoded object (returned as a new object).
 * If the object is already raw-encoded just increment the ref count. */
robj *getDecodedObject(robj *o) {
    robj *dec;

    // OBJ_ENCODING_RAW / OBJ_ENCODING_EMBSTR objects are returned as-is with refcount+1
    // (the caller later calls decrRefCount on the result, which must not free the original)
    if (sdsEncodedObject(o)) {
        incrRefCount(o);
        return o;
    }
    if (o->type == OBJ_STRING && o->encoding == OBJ_ENCODING_INT) {
        char buf[32];

        // convert the integer to its string form and return a string robj
        ll2string(buf,32,(long)o->ptr);
        dec = createStringObject(buf,strlen(buf));
        return dec;
    } else {
        serverPanic("Unknown encoding type");
    }
}

// quicklist.c, add value to the list
/* Wrapper to allow argument-based switching between HEAD/TAIL pop */
void quicklistPush(quicklist *quicklist, void *value, const size_t sz,
                   int where) {
    // where decides whether to push at the head or at the tail
    if (where == QUICKLIST_HEAD) {
        quicklistPushHead(quicklist, value, sz);
    } else if (where == QUICKLIST_TAIL) {
        quicklistPushTail(quicklist, value, sz);
    }
}

// quicklist.c, push at the head
/* Add new entry to head node of quicklist.
 *
 * Returns 0 if used existing head.
 * Returns 1 if new head created. */
int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
    quicklistNode *orig_head = quicklist->head;
    // likely() maps to __builtin_expect(!!(x), 1) where the compiler supports it
    // check whether insertion is allowed, i.e. whether the head's ziplist still has room;
    // if it does, insert straight into it
    // fill default: -2
    // depth default: 0
    if (likely(
            _quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
        // 3.1. push into the ziplist (zl) of the head node
        quicklist->head->zl =
            ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);
        // 3.2. update the size of the head node
        quicklistNodeUpdateSz(quicklist->head);
    } else {
        // the head is full: create a new quicklistNode and insert into that
        quicklistNode *node = quicklistCreateNode();
        node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);

        quicklistNodeUpdateSz(node);
        // 3.3. link the new node in before head
        _quicklistInsertNodeBefore(quicklist, quicklist->head, node);
    }
    // bump the counters so the total never has to be computed by iterating
    quicklist->count++;
    quicklist->head->count++;
    return (orig_head != quicklist->head);
}

// quicklist.c, check whether insertion is allowed
REDIS_STATIC int _quicklistNodeAllowInsert(const quicklistNode *node,
                                           const int fill, const size_t sz) {
    if (unlikely(!node))
        return 0;

    int ziplist_overhead;
    /* size of previous offset */
    if (sz < 254)
        ziplist_overhead = 1;
    else
        ziplist_overhead = 5;

    /* size of forward offset */
    if (sz < 64)
        ziplist_overhead += 1;
    else if (likely(sz < 16384))
        ziplist_overhead += 2;
    else
        ziplist_overhead += 5;

    /* new_sz overestimates if 'sz' encodes to an integer type */
    // add the length of the new element, then test against the threshold:
    // return 1 if within the threshold, otherwise 0
    unsigned int new_sz = node->sz + sz + ziplist_overhead;
    // decide based on the fill option
    if (likely(_quicklistNodeSizeMeetsOptimizationRequirement(new_sz, fill)))
        return 1;
    else if (!sizeMeetsSafetyLimit(new_sz))
        return 0;
    else if ((int)node->count < fill)
        return 1;
    else
        return 0;
}

// quicklist.c
REDIS_STATIC int
_quicklistNodeSizeMeetsOptimizationRequirement(const size_t sz,
                                               const int fill) {
    if (fill >= 0)
        return 0;

    size_t offset = (-fill) - 1;
    // /* Optimization levels for size-based filling */
    // static const size_t optimization_level[] = {4096, 8192, 16384, 32768, 65536};
    // offset < 5; with the default fill of -2, offset is 1, so the test is sz <= 8192
    if (offset < (sizeof(optimization_level) / sizeof(*optimization_level))) {
        if (sz <= optimization_level[offset]) {
            return 1;
        } else {
            return 0;
        }
    } else {
        return 0;
    }
}

// SIZE_SAFETY_LIMIT 8192
#define sizeMeetsSafetyLimit(sz) ((sz) <= SIZE_SAFETY_LIMIT)

// 3.1. adding a value to a list node really means pushing data into its ziplist
// ziplist.c, push *s into zl
unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where) {
    unsigned char *p;
    p = (where == ZIPLIST_HEAD) ? ZIPLIST_ENTRY_HEAD(zl) : ZIPLIST_ENTRY_END(zl);
    // the actual insert is somewhat involved; in short: check capacity, grow, write the entry in ziplist format
    return __ziplistInsert(zl,p,s,slen);
}

// ziplist.c, covered in detail in the post on hash
/* Insert item at "p". */
static unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen;
    unsigned int prevlensize, prevlen = 0;
    size_t offset;
    int nextdiff = 0;
    unsigned char encoding = 0;
    long long value = 123456789; /* initialized to avoid warning. Using a value
                                    that is easy to see if for some reason
                                    we use it uninitialized. */
    zlentry tail;

    /* Find out prevlen for the entry that is inserted. */
    if (p[0] != ZIP_END) {
        ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
    } else {
        unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
        if (ptail[0] != ZIP_END) {
            prevlen = zipRawEntryLength(ptail);
        }
    }

    /* See if the entry can be encoded */
    if (zipTryEncoding(s,slen,&value,&encoding)) {
        /* 'encoding' is set to the appropriate integer encoding */
        reqlen = zipIntSize(encoding);
    } else {
        /* 'encoding' is untouched, however zipEncodeLength will use the
         * string length to figure out how to encode it. */
        reqlen = slen;
    }
    /* We need space for both the length of the previous entry and
     * the length of the payload. */
    reqlen += zipPrevEncodeLength(NULL,prevlen);
    reqlen += zipEncodeLength(NULL,encoding,slen);

    /* When the insert position is not equal to the tail, we need to
     * make sure that the next entry can hold this entry's length in
     * its prevlen field. */
    nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;

    /* Store offset because a realloc may change the address of zl. */
    offset = p-zl;
    zl = ziplistResize(zl,curlen+reqlen+nextdiff);
    p = zl+offset;

    /* Apply memory move when necessary and update tail offset. */
    if (p[0] != ZIP_END) {
        /* Subtract one because of the ZIP_END bytes */
        memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);

        /* Encode this entry's raw length in the next entry. */
        zipPrevEncodeLength(p+reqlen,reqlen);

        /* Update offset for tail */
        ZIPLIST_TAIL_OFFSET(zl) =
            intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);

        /* When the tail contains more than one entry, we need to take
         * "nextdiff" in account as well. Otherwise, a change in the
         * size of prevlen doesn't have an effect on the *tail* offset. */
        zipEntry(p+reqlen, &tail);
        if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
            ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
        }
    } else {
        /* This element will be the new tail. */
        ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
    }

    /* When nextdiff != 0, the raw length of the next entry has changed, so
     * we need to cascade the update throughout the ziplist */
    if (nextdiff != 0) {
        offset = p-zl;
        zl = __ziplistCascadeUpdate(zl,p+reqlen);
        p = zl+offset;
    }

    /* Write the entry */
    p += zipPrevEncodeLength(p,prevlen);
    p += zipEncodeLength(p,encoding,slen);
    if (ZIP_IS_STR(encoding)) {
        memcpy(p,s,slen);
    } else {
        zipSaveInteger(p,value,encoding);
    }
    ZIPLIST_INCR_LENGTH(zl,1);
    return zl;
}

// 3.2. update the node's size (the memory it actually occupies)
// quicklist.c, update the node's size, i.e. re-read the byte length of its ziplist
#define quicklistNodeUpdateSz(node)                                            \
    do {                                                                       \
        (node)->sz = ziplistBlobLen((node)->zl);                               \
    } while (0)

// 3.3. link a new list node in before head
// quicklist.c
/* Wrappers for node inserting around existing node. */
REDIS_STATIC void _quicklistInsertNodeBefore(quicklist *quicklist,
                                             quicklistNode *old_node,
                                             quicklistNode *new_node) {
    __quicklistInsertNode(quicklist, old_node, new_node, 0);
}

/* Insert 'new_node' after 'old_node' if 'after' is 1.
 * Insert 'new_node' before 'old_node' if 'after' is 0.
 * Note: 'new_node' is *always* uncompressed, so if we assign it to
 *       head or tail, we do not need to uncompress it. */
REDIS_STATIC void __quicklistInsertNode(quicklist *quicklist,
                                        quicklistNode *old_node,
                                        quicklistNode *new_node, int after) {
    if (after) {
        new_node->prev = old_node;
        if (old_node) {
            new_node->next = old_node->next;
            if (old_node->next)
                old_node->next->prev = new_node;
            old_node->next = new_node;
        }
        if (quicklist->tail == old_node)
            quicklist->tail = new_node;
    } else {
        // insert new_node before old_node
        new_node->next = old_node;
        if (old_node) {
            new_node->prev = old_node->prev;
            if (old_node->prev)
                old_node->prev->next = new_node;
            old_node->prev = new_node;
        }
        // new_node takes over as head
        if (quicklist->head == old_node)
            quicklist->head = new_node;
    }
    /* If this insert creates the only element so far, initialize head/tail. */
    // first node in the list
    if (quicklist->len == 0) {
        quicklist->head = quicklist->tail = new_node;
    }

    // compress the list
    if (old_node)
        quicklistCompress(quicklist, old_node);

    quicklist->len++;
}

// quicklist.c, compress the list
// if the node was only decompressed temporarily (recompress is set), just recompress it;
// otherwise enforce the compress depth on the whole list
#define quicklistCompress(_ql, _node)                                          \
    do {                                                                       \
        if ((_node)->recompress)                                               \
            quicklistCompressNode((_node));                                    \
        else                                                                   \
            __quicklistCompress((_ql), (_node));                               \
    } while (0)

// recompress
/* Compress only uncompressed nodes. */
#define quicklistCompressNode(_node)                                           \
    do {                                                                       \
        if ((_node) && (_node)->encoding == QUICKLIST_NODE_ENCODING_RAW) {     \
            __quicklistCompressNode((_node));                                  \
        }                                                                      \
    } while (0)

/* Compress the ziplist in 'node' and update encoding details.
 * Returns 1 if ziplist compressed successfully.
 * Returns 0 if compression failed or if ziplist too small to compress. */
REDIS_STATIC int __quicklistCompressNode(quicklistNode *node) {
#ifdef REDIS_TEST
    node->attempted_compress = 1;
#endif

    /* Don't bother compressing small values */
    if (node->sz < MIN_COMPRESS_BYTES)
        return 0;

    quicklistLZF *lzf = zmalloc(sizeof(*lzf) + node->sz);

    /* Cancel if compression fails or doesn't compress small enough */
    // the LZF compression algorithm itself is fairly involved and not covered here
    if (((lzf->sz = lzf_compress(node->zl, node->sz, lzf->compressed,
                                 node->sz)) == 0) ||
        lzf->sz + MIN_COMPRESS_IMPROVE >= node->sz) {
        /* lzf_compress aborts/rejects compression if value not compressable. */
        zfree(lzf);
        return 0;
    }
    lzf = zrealloc(lzf, sizeof(*lzf) + lzf->sz);
    zfree(node->zl);
    node->zl = (unsigned char *)lzf;
    node->encoding = QUICKLIST_NODE_ENCODING_LZF;
    node->recompress = 0;
    return 1;
}

/* Force 'quicklist' to meet compression guidelines set by compress depth.
 * The only way to guarantee interior nodes get compressed is to iterate
 * to our "interior" compress depth then compress the next node we find.
 * If compress depth is larger than the entire list, we return immediately. */
REDIS_STATIC void __quicklistCompress(const quicklist *quicklist,
                                      quicklistNode *node) {
    /* If length is less than our compress depth (from both sides),
     * we can't compress anything. */
    if (!quicklistAllowsCompression(quicklist) ||
        quicklist->len < (unsigned int)(quicklist->compress * 2))
        return;

#if 0
    /* Optimized cases for small depth counts */
    if (quicklist->compress == 1) {
        quicklistNode *h = quicklist->head, *t = quicklist->tail;
        quicklistDecompressNode(h);
        quicklistDecompressNode(t);
        if (h != node && t != node)
            quicklistCompressNode(node);
        return;
    } else if (quicklist->compress == 2) {
        quicklistNode *h = quicklist->head, *hn = h->next, *hnn = hn->next;
        quicklistNode *t = quicklist->tail, *tp = t->prev, *tpp = tp->prev;
        quicklistDecompressNode(h);
        quicklistDecompressNode(hn);
        quicklistDecompressNode(t);
        quicklistDecompressNode(tp);
        if (h != node && hn != node && t != node && tp != node) {
            quicklistCompressNode(node);
        }
        if (hnn != t) {
            quicklistCompressNode(hnn);
        }
        if (tpp != h) {
            quicklistCompressNode(tpp);
        }
        return;
    }
#endif

    /* Iterate until we reach compress depth for both sides of the list.
     * Note: because we do length checks at the *top* of this function,
     *       we can skip explicit null checks below. Everything exists. */
    quicklistNode *forward = quicklist->head;
    quicklistNode *reverse = quicklist->tail;
    int depth = 0;
    int in_depth = 0;
    while (depth++ < quicklist->compress) {
        // nodes within the compress depth at either end must stay uncompressed, so decompress them
        quicklistDecompressNode(forward);
        quicklistDecompressNode(reverse);

        if (forward == node || reverse == node)
            in_depth = 1;

        if (forward == reverse)
            return;

        forward = forward->next;
        reverse = reverse->prev;
    }

    if (!in_depth)
        quicklistCompressNode(node);

    if (depth > 2) {
        /* At this point, forward and reverse are one node beyond depth */
        // compress
        quicklistCompressNode(forward);
        quicklistCompressNode(reverse);
    }
}

/* Decompress only compressed nodes. */
#define quicklistDecompressNode(_node)                                         \
    do {                                                                       \
        if ((_node) && (_node)->encoding == QUICKLIST_NODE_ENCODING_LZF) {     \
            __quicklistDecompressNode((_node));                                \
        }                                                                      \
    } while (0)

/* Uncompress the ziplist in 'node' and update encoding details.
 * Returns 1 on successful decode, 0 on failure to decode. */
REDIS_STATIC int __quicklistDecompressNode(quicklistNode *node) {
#ifdef REDIS_TEST
    node->attempted_compress = 0;
#endif

    void *decompressed = zmalloc(node->sz);
    quicklistLZF *lzf = (quicklistLZF *)node->zl;
    if (lzf_decompress(lzf->compressed, lzf->sz, decompressed, node->sz) == 0) {
        /* Someone requested decompress, but we can't decompress.  Not good. */
        zfree(decompressed);
        return 0;
    }
    zfree(lzf);
    node->zl = decompressed;
    node->encoding = QUICKLIST_NODE_ENCODING_RAW;
    return 1;
}
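The decision in _quicklistNodeAllowInsert is easier to follow with the ziplist overhead estimate factored out. The sketch below is a simplified restatement under that assumption: `sk_allow_insert` and `sk_levels` are hypothetical names, and `add_bytes` is taken to already include the per-entry overhead.

```c
#include <stddef.h>

/* Same values as optimization_level[] and SIZE_SAFETY_LIMIT in the listing. */
static const size_t sk_levels[] = {4096, 8192, 16384, 32768, 65536};
#define SK_SAFETY_LIMIT 8192

/* Hypothetical restatement of the allow-insert decision.
 * node_bytes:   current ziplist size of the node
 * node_entries: current entry count of the node
 * add_bytes:    bytes the new entry needs, overhead included (assumption)
 * fill:         negative = size class, non-negative = max entries per node */
int sk_allow_insert(size_t node_bytes, unsigned node_entries,
                    size_t add_bytes, int fill) {
    size_t new_sz = node_bytes + add_bytes;

    if (fill < 0) {
        /* Size-based mode: -1 -> 4096 bytes, -2 -> 8192 bytes, ... */
        size_t level = (size_t)(-fill) - 1;
        size_t nlevels = sizeof(sk_levels) / sizeof(sk_levels[0]);
        return level < nlevels && new_sz <= sk_levels[level];
    }

    /* Count-based mode: still capped by the byte safety limit. */
    if (new_sz > SK_SAFETY_LIMIT) return 0;
    return (int)node_entries < fill;
}
```

With the default fill of -2, a node accepts entries until its ziplist would grow past 8192 bytes, which is when quicklistPushHead creates a new node.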
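Overall, the Redis list is not a plain singly or doubly linked list. It is a doubly linked list whose nodes each hold a ziplist. This saves memory (no per-element pointers), and a lookup by index can skip whole nodes using the per-node counts instead of visiting every element. A sequence diagram restates the flow:
[Figure: sequence diagram of the lpush/rpush call flow]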
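The "linked list of small arrays" idea can be sketched without any of the ziplist encoding. In the hypothetical code below, each node holds a fixed-size int array (`SK_CAP` stands in for the fill limit), and pushing at the head either reuses the head node or links a new one in front, in the same spirit as quicklistPushHead. None of these names exist in Redis.

```c
#include <stdlib.h>

#define SK_CAP 4   /* hypothetical per-node capacity, stands in for the fill limit */

typedef struct pnode {
    struct pnode *prev, *next;
    int items[SK_CAP];
    unsigned count;          /* items used in this node */
} pnode;

typedef struct plist {
    pnode *head, *tail;
    unsigned long count;     /* total items, cached */
    unsigned len;            /* number of nodes, cached */
} plist;

/* Push v at the head. Returns 1 if a new head node was created,
 * 0 if the existing head was reused, -1 on allocation failure. */
int plist_push_head(plist *l, int v) {
    pnode *h = l->head;
    int created = 0;

    if (h == NULL || h->count == SK_CAP) {
        pnode *n = calloc(1, sizeof(*n));
        if (n == NULL) return -1;
        n->next = h;
        if (h) h->prev = n; else l->tail = n;
        l->head = h = n;
        l->len++;
        created = 1;
    }
    /* Shift existing items right to open slot 0 (a ziplist uses memmove). */
    for (unsigned i = h->count; i > 0; i--) h->items[i] = h->items[i - 1];
    h->items[0] = v;
    h->count++;
    l->count++;
    return created;
}

/* Release all nodes. */
void plist_free(plist *l) {
    pnode *n = l->head;
    while (n) {
        pnode *next = n->next;
        free(n);
        n = next;
    }
    l->head = l->tail = NULL;
    l->count = 0;
    l->len = 0;
}
```

Pushing five values into an empty list yields two nodes: the fifth push finds the head full and creates a new one, so only one small array is ever shifted per push.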

3. lindex/lrange: lookup operations
Reading data is the other core feature of a database. Typically there are single lookups, batch lookups and range lookups, so let's take them one at a time.
// 1. single lookup: lindex key index
// t_list.c, look up an element by index
void lindexCommand(client *c) {
    robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
    // if the key does not exist, return; the null reply has already been sent
    if (o == NULL || checkType(c,o,OBJ_LIST)) return;
    long index;
    robj *value = NULL;

    // parse the index argument into the index variable
    if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK))
        return;

    if (o->encoding == OBJ_ENCODING_QUICKLIST) {
        quicklistEntry entry;
        // look the element up by index
        if (quicklistIndex(o->ptr, index, &entry)) {
            // the value lives in one of two fields: value (string) or longval (integer)
            if (entry.value) {
                value = createStringObject((char*)entry.value,entry.sz);
            } else {
                value = createStringObjectFromLongLong(entry.longval);
            }
            addReplyBulk(c,value);
            decrRefCount(value);
        } else {
            addReply(c,shared.nullbulk);
        }
    } else {
        serverPanic("Unknown list encoding");
    }
}

// quicklist.c, find an element by index
/* Populate 'entry' with the element at the specified zero-based index
 * where 0 is the head, 1 is the element next to head
 * and so on. Negative integers are used in order to count
 * from the tail, -1 is the last element, -2 the penultimate
 * and so on. If the index is out of range 0 is returned.
 *
 * Returns 1 if element found
 * Returns 0 if element not found */
int quicklistIndex(const quicklist *quicklist, const long long idx,
                   quicklistEntry *entry) {
    quicklistNode *n;
    unsigned long long accum = 0;
    unsigned long long index;
    int forward = idx < 0 ? 0 : 1; /* < 0 -> reverse, 0+ -> forward */

    // initialise the quicklistEntry with default values
    initEntry(entry);
    entry->quicklist = quicklist;

    // a negative index means searching from the tail
    if (!forward) {
        index = (-idx) - 1;
        n = quicklist->tail;
    } else {
        index = idx;
        n = quicklist->head;
    }

    if (index >= quicklist->count)
        return 0;

    while (likely(n)) {
        // n->count is the number of elements held by this list node (its ziplist may hold n elements)
        // so the walk stops at the node that contains index
        if ((accum + n->count) > index) {
            break;
        } else {
            D("Skipping over (%p) %u at accum %lld", (void *)n, n->count,
              accum);
            // move on to the next node
            accum += n->count;
            n = forward ? n->next : n->prev;
        }
    }

    // ran off the end of the list: the index was not found
    if (!n)
        return 0;

    D("Found node: %p at accum %llu, idx %llu, sub+ %llu, sub- %llu", (void *)n,
      accum, index, index - accum, (-index) - 1 + accum);

    entry->node = n;
    if (forward) {
        /* forward = normal head-to-tail offset. */
        // index - accum is the offset of the element inside node n
        entry->offset = index - accum;
    } else {
        /* reverse = need negative offset for tail-to-head, so undo
         * the result of the original if (index < 0) above. */
        // reverse positioning, e.g. idx=-1 -> index=0 -> offset=-0-1+0=-1; idx=-2 -> index=1 -> offset=-1-1+0=-2
        entry->offset = (-index) - 1 + accum;
    }

    // decompress the node's data
    quicklistDecompressNodeForUse(entry->node);
    // locate the entry inside the ziplist by offset
    entry->zi = ziplistIndex(entry->node->zl, entry->offset);
    // read value, sz and longval out of zi (ziplist format)
    ziplistGet(entry->zi, &entry->value, &entry->sz, &entry->longval);
    /* The caller will use our result, so we don't re-compress here.
     * The caller can recompress or delete the node as needed. */
    return 1;
}

// quicklist.c
/* Simple way to give quicklistEntry structs default values with one call. */
#define initEntry(e)                                                           \
    do {                                                                       \
        (e)->zi = (e)->value = NULL;                                           \
        (e)->longval = -123456789;                                             \
        (e)->quicklist = NULL;                                                 \
        (e)->node = NULL;                                                      \
        (e)->offset = 123456789;                                               \
        (e)->sz = 0;                                                           \
    } while (0)

// decompress the node's data
/* Force node to not be immediately re-compresable */
#define quicklistDecompressNodeForUse(_node)                                   \
    do {                                                                       \
        if ((_node) && (_node)->encoding == QUICKLIST_NODE_ENCODING_LZF) {     \
            __quicklistDecompressNode((_node));                                \
            (_node)->recompress = 1;                                           \
        }                                                                      \
    } while (0)

// __quicklistDecompressNode is the same function already listed at the end of section 2

// ziplist.c
/* Returns an offset to use for iterating with ziplistNext. When the given
 * index is negative, the list is traversed back to front. When the list
 * doesn't contain an element at the provided index, NULL is returned. */
unsigned char *ziplistIndex(unsigned char *zl, int index) {
    unsigned char *p;
    unsigned int prevlensize, prevlen = 0;
    if (index < 0) {
        index = (-index)-1;
        p = ZIPLIST_ENTRY_TAIL(zl);
        if (p[0] != ZIP_END) {
            ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
            while (prevlen > 0 && index--) {
                p -= prevlen;
                ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
            }
        }
    } else {
        p = ZIPLIST_ENTRY_HEAD(zl);
        while (p[0] != ZIP_END && index--) {
            p += zipRawEntryLength(p);
        }
    }
    return (p[0] == ZIP_END || index > 0) ? NULL : p;
}
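The node-skipping part of quicklistIndex can be isolated from the ziplist handling. The sketch below uses hypothetical stripped-down types (`inode`, `ilist`) and returns the node plus the in-node offset; a negative offset counts from the end of that node, as in the listing.

```c
#include <stddef.h>

/* Hypothetical, stripped-down node and list: only what the walk needs. */
typedef struct inode {
    struct inode *prev, *next;
    unsigned count;              /* elements held by this node */
} inode;

typedef struct ilist {
    inode *head, *tail;
    unsigned long count;         /* total elements */
} ilist;

/* Locates element `idx` (negative = from the tail). On success stores the
 * node and the offset inside it and returns 1; returns 0 if out of range. */
int ilist_locate(const ilist *l, long long idx,
                 const inode **node, long long *offset) {
    int forward = idx >= 0;
    /* -(idx + 1) equals (-idx) - 1 but cannot overflow for LLONG_MIN. */
    unsigned long long index =
        forward ? (unsigned long long)idx : (unsigned long long)(-(idx + 1));
    unsigned long long accum = 0;
    const inode *n = forward ? l->head : l->tail;

    if (index >= l->count) return 0;

    /* Skip whole nodes until the one containing `index` is reached. */
    while (n && accum + n->count <= index) {
        accum += n->count;
        n = forward ? n->next : n->prev;
    }
    if (!n) return 0;

    *node = n;
    *offset = forward ? (long long)(index - accum)
                      : -(long long)(index - accum) - 1;
    return 1;
}
```

For nodes holding 3, 2 and 4 elements, index 4 resolves to the second node at offset 1, and index -5 resolves to the same element as offset -1 of that node.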
For range lookups, going by the pattern seen in Redis so far, one might expect a simple loop over the single lookup. Is that what happens? Let's see:
// 2. lrange: range lookup
// t_list.c
void lrangeCommand(client *c) {
    robj *o;
    long start, end, llen, rangelen;

    // parse the start and end arguments
    if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
        (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return;

    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
         || checkType(c,o,OBJ_LIST)) return;
    // list length, read from the cached counter
    llen = listTypeLength(o);

    /* convert negative indexes */
    if (start < 0) start = llen+start;
    if (end < 0) end = llen+end;
    // negative indexes have been converted to positive ones; if start is still negative, clamp it to 0
    if (start < 0) start = 0;

    /* Invariant: start >= 0, so this test will be true when end < 0.
     * The range is empty when start > end or start >= length. */
    if (start > end || start >= llen) {
        addReply(c,shared.emptymultibulk);
        return;
    }
    // clamp end if it is too large
    // end cannot be negative here, because the start > end check above has already ruled that out
    if (end >= llen) end = llen-1;
    rangelen = (end-start)+1;

    /* Return the result in form of a multi-bulk reply */
    addReplyMultiBulkLen(c,rangelen);
    if (o->encoding == OBJ_ENCODING_QUICKLIST) {
        // get a list iterator positioned at start; LIST_TAIL means iterating towards the tail (forward)
        listTypeIterator *iter = listTypeInitIterator(o, start, LIST_TAIL);
        // iterate until rangelen reaches 0, writing each element to the output buffer
        while(rangelen--) {
            listTypeEntry entry;
            // fetch the next element
            listTypeNext(iter, &entry);
            quicklistEntry *qe = &entry.entry;
            if (qe->value) {
                addReplyBulkCBuffer(c,qe->value,qe->sz);
            } else {
                addReplyBulkLongLong(c,qe->longval);
            }
        }
        listTypeReleaseIterator(iter);
    } else {
        serverPanic("List encoding is not QUICKLIST!");
    }
}

// t_list.c, list length
unsigned long listTypeLength(robj *subject) {
    if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
        return quicklistCount(subject->ptr);
    } else {
        serverPanic("Unknown list encoding");
    }
}

/* Return cached quicklist count */
unsigned int quicklistCount(quicklist *ql) { return ql->count; }

// initialise the list iterator
/* Initialize an iterator at the specified index. */
listTypeIterator *listTypeInitIterator(robj *subject, long index,
                                       unsigned char direction) {
    listTypeIterator *li = zmalloc(sizeof(listTypeIterator));
    li->subject = subject;
    li->encoding = subject->encoding;
    li->direction = direction;
    li->iter = NULL;
    /* LIST_HEAD means start at TAIL and move *towards* head.
     * LIST_TAIL means start at HEAD and move *towards tail. */
    int iter_direction =
        direction == LIST_HEAD ? AL_START_TAIL : AL_START_HEAD;
    if (li->encoding == OBJ_ENCODING_QUICKLIST) {
        li->iter = quicklistGetIteratorAtIdx(li->subject->ptr,
                                             iter_direction, index);
    } else {
        serverPanic("Unknown list encoding");
    }
    return li;
}

/* Initialize an iterator at a specific offset 'idx' and make the iterator
 * return nodes in 'direction' direction. */
quicklistIter *quicklistGetIteratorAtIdx(const quicklist *quicklist,
                                         const int direction,
                                         const long long idx) {
    quicklistEntry entry;

    // first locate element idx (quicklistIndex, shown above, walks the quicklist nodes and then the ziplist)
    if (quicklistIndex(quicklist, idx, &entry)) {
        // get an iterator over the whole list, then position it via current and offset
        quicklistIter *base = quicklistGetIterator(quicklist, direction);
        base->zi = NULL;
        base->current = entry.node;
        base->offset = entry.offset;
        return base;
    } else {
        return NULL;
    }
}

// quicklist.c, initialise a list iterator
/* Returns a quicklist iterator 'iter'. After the initialization every
 * call to quicklistNext() will return the next element of the quicklist. */
quicklistIter *quicklistGetIterator(const quicklist *quicklist, int direction) {
    quicklistIter *iter;

    // the iterator only records the current position
    iter = zmalloc(sizeof(*iter));

    if (direction == AL_START_HEAD) {
        iter->current = quicklist->head;
        iter->offset = 0;
    } else if (direction == AL_START_TAIL) {
        iter->current = quicklist->tail;
        iter->offset = -1;
    }

    iter->direction = direction;
    iter->quicklist = quicklist;

    iter->zi = NULL;

    return iter;
}

// the iterator carries a reference to the whole list plus the current node; how it advances is the key part
// t_list.c, advance over the list elements and store the current one in entry
/* Stores pointer to current the entry in the provided entry structure
 * and advances the position of the iterator. Returns 1 when the current
 * entry is in fact an entry, 0 otherwise. */
int listTypeNext(listTypeIterator *li, listTypeEntry *entry) {
    /* Protect from converting when iterating */
    serverAssert(li->subject->encoding == li->encoding);

    entry->li = li;
    if (li->encoding == OBJ_ENCODING_QUICKLIST) {
        // advance iter (changing where it points) and fill entry->entry
        return quicklistNext(li->iter, &entry->entry);
    } else {
        serverPanic("Unknown list encoding");
    }
    return 0;
}

// quicklist.c
/* Get next element in iterator.
 *
 * Note: You must NOT insert into the list while iterating over it.
 * You *may* delete from the list while iterating using the
 * quicklistDelEntry() function.
 * If you insert into the quicklist while iterating, you should
 * re-create the iterator after your addition.
 *
 * iter = quicklistGetIterator(quicklist,<direction>);
 * quicklistEntry entry;
 * while (quicklistNext(iter, &entry)) {
 *     if (entry.value)
 *          [[ use entry.value with entry.sz ]]
 *     else
 *          [[ use entry.longval ]]
 * }
 *
 * Populates 'entry' with values for this iteration.
 * Returns 0 when iteration is complete or if iteration not possible.
 * If return value is 0, the contents of 'entry' are not valid.
 */
int quicklistNext(quicklistIter *iter, quicklistEntry *entry) {
    initEntry(entry);

    if (!iter) {
        D("Returning because no iter!");
        return 0;
    }

    // record the current node and the quicklist reference
    entry->quicklist = iter->quicklist;
    entry->node = iter->current;

    if (!iter->current) {
        D("Returning because current node is NULL")
        return 0;
    }

    unsigned char *(*nextFn)(unsigned char *, unsigned char *) = NULL;
    int offset_update = 0;

    if (!iter->zi) {
        /* If !zi, use current index. */
        // zi is not set right after initialisation, so use the current node and locate the entry by offset
        quicklistDecompressNodeForUse(iter->current);
        iter->zi = ziplistIndex(iter->current->zl, iter->offset);
    } else {
        /* else, use existing iterator offset and get prev/next as necessary. */
        if (iter->direction == AL_START_HEAD) {
            nextFn = ziplistNext;
            offset_update = 1;
        } else if (iter->direction == AL_START_TAIL) {
            nextFn = ziplistPrev;
            offset_update = -1;
        }
        // step forwards or backwards by one element
        iter->zi = nextFn(iter->current->zl, iter->zi);
        iter->offset += offset_update;
    }

    entry->zi = iter->zi;
    entry->offset = iter->offset;

    if (iter->zi) {
        /* Populate value from existing ziplist position */
        // read the value out of zi (ziplist format) and return
        ziplistGet(entry->zi, &entry->value, &entry->sz, &entry->longval);
        return 1;
    } else {
        /* We ran out of ziplist entries.
         * Pick next node, update offset, then re-run retrieval. */
        // the current ziplist has no further element: move to the next node and recurse
        quicklistCompress(iter->quicklist, iter->current);
        if (iter->direction == AL_START_HEAD) {
            /* Forward traversal */
            D("Jumping to start of next node");
            iter->current = iter->current->next;
            iter->offset = 0;
        } else if (iter->direction == AL_START_TAIL) {
            /* Reverse traversal */
            D("Jumping to end of previous node");
            iter->current = iter->current->prev;
            iter->offset = -1;
        }
        iter->zi = NULL;
        return quicklistNext(iter, entry);
    }
}

// ziplist.c
/* Get entry pointed to by 'p' and store in either '*sstr' or 'sval' depending
 * on the encoding of the entry. '*sstr' is always set to NULL to be able
 * to find out whether the string pointer or the integer value was set.
 * Return 0 if 'p' points to the end of the ziplist, 1 otherwise. */
unsigned int ziplistGet(unsigned char *p, unsigned char **sstr, unsigned int *slen, long long *sval) {
    zlentry entry;
    if (p == NULL || p[0] == ZIP_END) return 0;
    if (sstr) *sstr = NULL;

    zipEntry(p, &entry);
    if (ZIP_IS_STR(entry.encoding)) {
        if (sstr) {
            *slen = entry.len;
            *sstr = p+entry.headersize;
        }
    } else {
        if (sval) {
            *sval = zipLoadInteger(p+entry.headersize,entry.encoding);
        }
    }
    return 1;
}
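As you can see, lrange does not reuse any single-element lookup code; it walks the list with an iterator instead. It is not difficult, just a little roundabout, so here is the flow again as a sequence diagram:
[Figure: sequence diagram of the lrange iteration flow]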
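The two-level stepping in quicklistNext (step inside the current ziplist, hop to the neighbouring node when it runs out) can be sketched in isolation. This is a minimal illustration, not Redis code: `Chunk`, `ChunkIter` and `chunkNext` are hypothetical names, a small int array stands in for the ziplist, and a loop replaces the recursion.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a quicklist node: a small array instead of a ziplist. */
typedef struct Chunk {
    struct Chunk *prev, *next;
    int count;      /* number of used slots in vals */
    int vals[4];
} Chunk;

typedef struct ChunkIter {
    Chunk *current; /* node being visited, NULL once iteration is over */
    int offset;     /* position inside current; -1 means "last slot" */
    int started;    /* 0 until the first lookup in current (plays the role of zi == NULL) */
    int forward;    /* 1: head to tail, 0: tail to head */
} ChunkIter;

/* Returns 1 and stores the element in *out, or 0 when iteration is complete. */
int chunkNext(ChunkIter *it, int *out) {
    while (it->current) {
        Chunk *c = it->current;
        if (!it->started) {
            /* First lookup in this node: resolve the offset, like ziplistIndex(). */
            if (it->offset < 0) it->offset += c->count;
            it->started = 1;
        } else {
            /* Later lookups: step one slot in the iteration direction. */
            it->offset += it->forward ? 1 : -1;
        }
        if (it->offset >= 0 && it->offset < c->count) {
            *out = c->vals[it->offset];
            return 1;
        }
        /* This node is exhausted: hop to the neighbour and start over there. */
        it->current = it->forward ? c->next : c->prev;
        it->offset = it->forward ? 0 : -1;
        it->started = 0;
    }
    return 0;
}
```

Starting a forward iterator at the head node with offset 0, or a reverse iterator at the tail node with offset -1, visits every element exactly once across node boundaries.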

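IV. lrem: the delete operation
Create, read, update, delete: we should cover all four. Deletion naturally has to work with the underlying data structure. For example: how is the element to delete located, and does the list need to be rearranged afterwards?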
// LREM key count value: removes up to |count| elements equal to value;
// a single element can be deleted with count = 1
// t_list.c
void lremCommand(client *c) {
    robj *subject, *obj;
    obj = c->argv[3];
    long toremove;
    long removed = 0;

    if ((getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != C_OK))
        return;

    subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
    if (subject == NULL || checkType(c,subject,OBJ_LIST)) return;

    // Matches can be anywhere in the list, so iterate and delete along the way
    listTypeIterator *li;
    if (toremove < 0) {
        // Negative count: start at the last element and walk toward the head
        toremove = -toremove;
        li = listTypeInitIterator(subject,-1,LIST_HEAD);
    } else {
        // Zero or positive count: start at the first element and walk toward the tail
        li = listTypeInitIterator(subject,0,LIST_TAIL);
    }

    listTypeEntry entry;
    // Iteration was covered in detail in the lookup section
    while (listTypeNext(li,&entry)) {
        // 1. Compare: only an exact match is deleted
        if (listTypeEqual(&entry,obj)) {
            // 2. The actual deletion
            listTypeDelete(li, &entry);
            server.dirty++;
            removed++;
            if (toremove && removed == toremove) break;
        }
    }
    listTypeReleaseIterator(li);

    // If the list is now empty, delete the key from the db
    if (listTypeLength(subject) == 0) {
        dbDelete(c->db,c->argv[1]);
    }

    addReplyLongLong(c,removed);
    if (removed) signalModifiedKey(c->db,c->argv[1]);
}

// 1. Is the entry equal to the given robj?
// t_list.c
/* Compare the given object with the entry at the current position. */
int listTypeEqual(listTypeEntry *entry, robj *o) {
    if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
        serverAssertWithInfo(NULL,o,sdsEncodedObject(o));
        return quicklistCompare(entry->entry.zi,o->ptr,sdslen(o->ptr));
    } else {
        serverPanic("Unknown list encoding");
    }
}

// quicklist.c
int quicklistCompare(unsigned char *p1, unsigned char *p2, int p2_len) {
    // The entry lives inside a ziplist, so the comparison is delegated to ziplist
    return ziplistCompare(p1, p2, p2_len);
}

// ziplist.c
/* Compare entry pointer to by 'p' with 'sstr' of length 'slen'. */
/* Return 1 if equal. */
unsigned int ziplistCompare(unsigned char *p, unsigned char *sstr, unsigned int slen) {
    zlentry entry;
    unsigned char sencoding;
    long long zval, sval;
    if (p[0] == ZIP_END) return 0;

    zipEntry(p, &entry);
    if (ZIP_IS_STR(entry.encoding)) {
        /* Raw compare */
        if (entry.len == slen) {
            return memcmp(p+entry.headersize,sstr,slen) == 0;
        } else {
            return 0;
        }
    } else {
        /* Try to compare encoded values. Don't compare encoding because
         * different implementations may encoded integers differently. */
        if (zipTryEncoding(sstr,slen,&sval,&sencoding)) {
            zval = zipLoadInteger(p+entry.headersize,entry.encoding);
            return zval == sval;
        }
    }
    return 0;
}

// 2. Perform the deletion
// t_list.c
/* Delete the element pointed to. */
void listTypeDelete(listTypeIterator *iter, listTypeEntry *entry) {
    if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
        quicklistDelEntry(iter->iter, &entry->entry);
    } else {
        serverPanic("Unknown list encoding");
    }
}

// quicklist.c
/* Delete one element represented by 'entry'
 *
 * 'entry' stores enough metadata to delete the proper position in
 * the correct ziplist in the correct quicklist node. */
void quicklistDelEntry(quicklistIter *iter, quicklistEntry *entry) {
    quicklistNode *prev = entry->node->prev;
    quicklistNode *next = entry->node->next;
    int deleted_node = quicklistDelIndex((quicklist *)entry->quicklist,
                                         entry->node, &entry->zi);

    /* after delete, the zi is now invalid for any future usage. */
    iter->zi = NULL;

    /* If current node is deleted, we must update iterator node and offset. */
    if (deleted_node) {
        // The node itself was removed, so move the iterator to the adjacent node
        if (iter->direction == AL_START_HEAD) {
            iter->current = next;
            iter->offset = 0;
        } else if (iter->direction == AL_START_TAIL) {
            iter->current = prev;
            iter->offset = -1;
        }
    }
    /* else if (!deleted_node), no changes needed.
     * we already reset iter->zi above, and the existing iter->offset
     * doesn't move again because:
     *   - [1, 2, 3] => delete offset 1 => [1, 3]: next element still offset 1
     *   - [1, 2, 3] => delete offset 0 => [2, 3]: next element still offset 0
     *  if we deleted the last element at offset N and now
     *  length of this ziplist is N-1, the next call into
     *  quicklistNext() will jump to the next node. */
}

// quicklist.c
/* Delete one entry from list given the node for the entry and a pointer
 * to the entry in the node.
 *
 * Note: quicklistDelIndex() *requires* uncompressed nodes because you
 *       already had to get *p from an uncompressed node somewhere.
 *
 * Returns 1 if the entire node was deleted, 0 if node still exists.
 * Also updates in/out param 'p' with the next offset in the ziplist. */
REDIS_STATIC int quicklistDelIndex(quicklist *quicklist, quicklistNode *node,
                                   unsigned char **p) {
    int gone = 0;

    // At the lowest level the deletion is again delegated to ziplist
    node->zl = ziplistDelete(node->zl, p);
    node->count--;
    // If the node has no entries left, remove the node; otherwise update its sz
    if (node->count == 0) {
        gone = 1;
        __quicklistDelNode(quicklist, node);
    } else {
        quicklistNodeUpdateSz(node);
    }
    quicklist->count--;
    /* If we deleted the node, the original node is no longer valid */
    return gone ? 1 : 0;
}
Overall, deletion is a loop of iterate, compare, delete. There are quite a few details, but they all rely on techniques we have already covered, so nothing here is new.
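The count semantics of the loop above can be reproduced on a plain array. The following is a simplified sketch under assumptions, not the Redis implementation: `removeMatches` and `deleteAt` are hypothetical helpers, and an array of C strings stands in for the quicklist. A positive count scans from the head, a negative count scans from the tail, and 0 removes every match.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical helper: delete slot i and close the gap. */
static void deleteAt(const char **items, long *len, long i) {
    memmove(&items[i], &items[i + 1], (size_t)(*len - i - 1) * sizeof items[0]);
    (*len)--;
}

/* Removes elements equal to target, following the LREM count convention.
 * Returns the number of elements removed. */
long removeMatches(const char **items, long *len, const char *target, long count) {
    long limit = count < 0 ? -count : count; /* 0 means "no limit" */
    long removed = 0;

    if (count >= 0) {
        long i = 0;
        while (i < *len) {
            if (strcmp(items[i], target) != 0) { i++; continue; }
            deleteAt(items, len, i); /* the next element slides into slot i, so i stays */
            removed++;
            if (limit && removed == limit) break;
        }
    } else {
        for (long i = *len - 1; i >= 0; i--) {
            if (strcmp(items[i], target) != 0) continue;
            deleteAt(items, len, i); /* slots below i are untouched, so i-- is safe */
            removed++;
            if (removed == limit) break;
        }
    }
    return removed;
}
```

Note how the forward scan does not advance after a deletion: this mirrors the comment in quicklistDelEntry that the next element is "still offset N" once an entry has been removed.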
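V. lpop: popping from the list
This is much like deletion: lpop simply removes the first element of the list and returns it (rpop does the same for the last element). In practice this is one of the most popular ways to use Redis lists. Let's take a quick look.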
// Usage: LPOP key
// t_list.c
void lpopCommand(client *c) {
    popGenericCommand(c,LIST_HEAD);
}

void popGenericCommand(client *c, int where) {
    robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);
    if (o == NULL || checkType(c,o,OBJ_LIST)) return;

    // Pop the element; this is the function to focus on
    robj *value = listTypePop(o,where);
    if (value == NULL) {
        addReply(c,shared.nullbulk);
    } else {
        char *event = (where == LIST_HEAD) ? "lpop" : "rpop";

        addReplyBulk(c,value);
        decrRefCount(value);
        notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
        if (listTypeLength(o) == 0) {
            notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
                                c->argv[1],c->db->id);
            dbDelete(c->db,c->argv[1]);
        }
        signalModifiedKey(c->db,c->argv[1]);
        server.dirty++;
    }
}

// t_list.c
robj *listTypePop(robj *subject, int where) {
    long long vlong;
    robj *value = NULL;

    int ql_where = where == LIST_HEAD ? QUICKLIST_HEAD : QUICKLIST_TAIL;
    if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
        if (quicklistPopCustom(subject->ptr, ql_where, (unsigned char **)&value,
                               NULL, &vlong, listPopSaver)) {
            if (!value)
                value = createStringObjectFromLongLong(vlong);
        }
    } else {
        serverPanic("Unknown list encoding");
    }
    return value;
}

// quicklist.c
/* pop from quicklist and return result in 'data' ptr.  Value of 'data'
 * is the return value of 'saver' function pointer if the data is NOT a number.
 *
 * If the quicklist element is a long long, then the return value is returned in
 * 'sval'.
 *
 * Return value of 0 means no elements available.
 * Return value of 1 means check 'data' and 'sval' for values.
 * If 'data' is set, use 'data' and 'sz'.  Otherwise, use 'sval'. */
int quicklistPopCustom(quicklist *quicklist, int where, unsigned char **data,
                       unsigned int *sz, long long *sval,
                       void *(*saver)(unsigned char *data, unsigned int sz)) {
    unsigned char *p;
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    int pos = (where == QUICKLIST_HEAD) ? 0 : -1;

    if (quicklist->count == 0)
        return 0;

    if (data)
        *data = NULL;
    if (sz)
        *sz = 0;
    if (sval)
        *sval = -123456789;

    quicklistNode *node;
    // Pick the head or tail node of the quicklist
    if (where == QUICKLIST_HEAD && quicklist->head) {
        node = quicklist->head;
    } else if (where == QUICKLIST_TAIL && quicklist->tail) {
        node = quicklist->tail;
    } else {
        return 0;
    }

    // Get the first or last entry of that node's ziplist
    p = ziplistIndex(node->zl, pos);
    if (ziplistGet(p, &vstr, &vlen, &vlong)) {
        if (vstr) {
            if (data)
                // Let the saver build the string object to return
                *data = saver(vstr, vlen);
            if (sz)
                *sz = vlen;
        } else {
            if (data)
                *data = NULL;
            if (sval)
                *sval = vlong;
        }
        // Delete the entry now that its value has been copied out
        quicklistDelIndex(quicklist, node, &p);
        return 1;
    }
    return 0;
}
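Popping an element takes roughly three steps:
1. Pick the head node or the tail node of the quicklist;
2. Read the first or the last entry from that node's ziplist;
3. Delete that entry from the ziplist (the node itself is removed only if it becomes empty);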
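The three steps can be sketched on a toy structure. This is an illustration under assumptions rather than the Redis code: `Node`, `List` and `listPop` are hypothetical names, each node holds a small int array in place of a ziplist, and nodes are unlinked but not freed.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

typedef struct Node {
    struct Node *prev, *next;
    int count;   /* used slots in vals; nodes in the list are never empty */
    int vals[4];
} Node;

typedef struct List {
    Node *head, *tail;
    long count;  /* total elements across all nodes */
} List;

enum { POP_HEAD, POP_TAIL };

/* Returns 1 and stores the popped value in *out, or 0 if the list is empty. */
int listPop(List *l, int where, int *out) {
    if (l->count == 0) return 0;

    /* Step 1: pick the head or tail node. */
    Node *n = (where == POP_HEAD) ? l->head : l->tail;
    if (n == NULL) return 0;

    /* Step 2: read the first or last element of that node. */
    int pos = (where == POP_HEAD) ? 0 : n->count - 1;
    *out = n->vals[pos];

    /* Step 3: delete the element; unlink the node only if it is now empty. */
    memmove(&n->vals[pos], &n->vals[pos + 1],
            (size_t)(n->count - pos - 1) * sizeof n->vals[0]);
    n->count--;
    l->count--;
    if (n->count == 0) {
        if (n->prev) n->prev->next = n->next; else l->head = n->next;
        if (n->next) n->next->prev = n->prev; else l->tail = n->prev;
        n->prev = n->next = NULL;
    }
    return 1;
}
```

Copying the value out before deleting matters: in the real code the saver callback runs before quicklistDelIndex for the same reason, since the entry's bytes are gone once it has been deleted.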
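VI. blpop: blocking pop
This is essentially a blocking queue. There is one thing we want to find out here: in an ordinary programming language we know blocking is built from a lock plus a wait/notify mechanism. So how does Redis block?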
// Usage: BLPOP key1 [key2] timeout
// t_list.c; the l/r variants share the same code
void blpopCommand(client *c) {
    blockingPopGenericCommand(c,LIST_HEAD);
}

/* Blocking RPOP/LPOP */
void blockingPopGenericCommand(client *c, int where) {
    robj *o;
    mstime_t timeout;
    int j;

    if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
        != C_OK) return;

    // Check each of the given keys in order
    for (j = 1; j < c->argc-1; j++) {
        o = lookupKeyWrite(c->db,c->argv[j]);
        if (o != NULL) {
            if (o->type != OBJ_LIST) {
                addReply(c,shared.wrongtypeerr);
                return;
            } else {
                // If the list has data, behave like the non-blocking version and reply at once
                if (listTypeLength(o) != 0) {
                    /* Non empty list, this is like a non normal [LR]POP. */
                    char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
                    robj *value = listTypePop(o,where);
                    serverAssert(value != NULL);

                    addReplyMultiBulkLen(c,2);
                    addReplyBulk(c,c->argv[j]);
                    addReplyBulk(c,value);
                    decrRefCount(value);
                    notifyKeyspaceEvent(NOTIFY_LIST,event,
                                        c->argv[j],c->db->id);
                    if (listTypeLength(o) == 0) {
                        dbDelete(c->db,c->argv[j]);
                        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
                                            c->argv[j],c->db->id);
                    }
                    signalModifiedKey(c->db,c->argv[j]);
                    server.dirty++;

                    /* Replicate it as an [LR]POP instead of B[LR]POP. */
                    rewriteClientCommandVector(c,2,
                        (where == LIST_HEAD) ? shared.lpop : shared.rpop,
                        c->argv[j]);
                    // A value was found, so the command ends here
                    return;
                }
            }
        }
    }

    /* If we are inside a MULTI/EXEC and the list is empty the only thing
     * we can do is treating it as a timeout (even with timeout 0). */
    if (c->flags & CLIENT_MULTI) {
        addReply(c,shared.nullmultibulk);
        return;
    }

    /* If the list is empty or the key does not exists we must block */
    // This is where the blocking happens
    blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
}

/* This is how the current blocking POP works, we use BLPOP as example:
 * - If the user calls BLPOP and the key exists and contains a non empty list
 *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
 *   if blocking is not required.
 * - If instead BLPOP is called and the key does not exists or the list is
 *   empty we need to block. In order to do so we remove the notification for
 *   new data to read in the client socket (so that we'll not serve new
 *   requests if the blocking request is not served). Also we put the client
 *   in a dictionary (db->blocking_keys) mapping keys to a list of clients
 *   blocking for this keys.
 * - If a PUSH operation against a key with blocked clients waiting is
 *   performed, we mark this key as "ready", and after the current command,
 *   MULTI/EXEC block, or script, is executed, we serve all the clients waiting
 *   for this list, from the one that blocked first, to the last, accordingly
 *   to the number of elements we have in the ready list.
 */

/* Set a client in blocking mode for the specified key, with the specified
 * timeout */
void blockForKeys(client *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
    dictEntry *de;
    list *l;
    int j;

    c->bpop.timeout = timeout;
    c->bpop.target = target;

    if (target != NULL) incrRefCount(target);

    // Register the client as a waiter on every key
    for (j = 0; j < numkeys; j++) {
        /* If the key already exists in the dict ignore it. */
        if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
        incrRefCount(keys[j]);

        /* And in the other "side", to map keys -> clients */
        de = dictFind(c->db->blocking_keys,keys[j]);
        if (de == NULL) {
            int retval;

            /* For every key we take a list of clients blocked for it */
            l = listCreate();
            // First waiter on this key: add its client list to db->blocking_keys
            retval = dictAdd(c->db->blocking_keys,keys[j],l);
            incrRefCount(keys[j]);
            serverAssertWithInfo(c,keys[j],retval == DICT_OK);
        } else {
            l = dictGetVal(de);
        }
        // Append this client to the key's waiting list, in arrival order
        listAddNodeTail(l,c);
    }
    // "Blocking" the client only sets a flag; the client is served later,
    // when one of its keys becomes ready or the timeout expires
    blockClient(c,BLOCKED_LIST);
}

// block.c: set the blocked flag
/* Block a client for the specific operation type. Once the CLIENT_BLOCKED
 * flag is set client query buffer is no longer processed, but accumulated,
 * and will be processed when the client is unblocked. */
void blockClient(client *c, int btype) {
    c->flags |= CLIENT_BLOCKED;
    c->btype = btype;
    server.bpop_blocked_clients++;
}
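How Redis implements blocking: db->blocking_keys is a dict that maps each key to the list of clients blocked on it. The client is only flagged as blocked; nothing actually sleeps. When a later push makes a key "ready", the waiting clients are served once the current command finishes, in the order in which they blocked; until then they remain blocked.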
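The "park the client, wake it on push" idea can be shown with a toy model. This sketch makes simplifying assumptions that differ from Redis: `Client`, `BlockedKey`, `blockOn` and `serveOnPush` are hypothetical names, a single key with a fixed-size FIFO stands in for the blocking_keys dict, and the pushed value is handed straight to the first waiter instead of being stored and served after the command completes.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MAX_WAITERS 8

typedef struct Client {
    int id;
    int blocked;   /* plays the role of the CLIENT_BLOCKED flag */
    int has_reply;
    int reply;
} Client;

typedef struct BlockedKey {
    const char *key;
    Client *waiters[MAX_WAITERS]; /* FIFO: index 0 blocked first */
    int nwaiters;
} BlockedKey;

/* Parks the client on the key. Nothing sleeps: we only set a flag and
 * remember the client. Returns 0 if the waiter table is full. */
int blockOn(BlockedKey *bk, Client *c) {
    if (bk->nwaiters == MAX_WAITERS) return 0;
    bk->waiters[bk->nwaiters++] = c;
    c->blocked = 1;
    return 1;
}

/* Called when a value is pushed to the key. Returns 1 if the value was
 * handed to the longest-waiting client, 0 if nobody is waiting (the caller
 * would then store the value in the list as usual). */
int serveOnPush(BlockedKey *bk, int value) {
    if (bk->nwaiters == 0) return 0;
    Client *c = bk->waiters[0];
    memmove(&bk->waiters[0], &bk->waiters[1],
            (size_t)(bk->nwaiters - 1) * sizeof bk->waiters[0]);
    bk->nwaiters--;
    c->blocked = 0;
    c->has_reply = 1;
    c->reply = value;
    return 1;
}
```

Because the event loop is what drives both the push and the wake-up, no lock or condition variable is needed in this model; the blocked flag alone tells the server not to process further input from that client.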
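Food for thought: the implementation above leaves one question open. How does Redis guarantee that a client waits for at most timeout, or for at most some number of loop iterations? How do you think it should be handled?
OK, that completes our analysis of the list data structure.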
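One possible answer, sketched under assumptions: store an absolute deadline on each blocked client (blockForKeys already records a timeout in c->bpop.timeout) and let a periodic task compare it with the current time. `Waiter`, `deadlineFromTimeout` and `sweepTimeouts` below are hypothetical names for illustration; the actual timeout handling in Redis lives outside the code quoted in this article.

```c
#include <assert.h>
#include <stddef.h>

typedef long long ms_t; /* milliseconds */

typedef struct Waiter {
    int blocked;
    int timed_out;
    ms_t deadline; /* absolute time in ms; 0 means "wait forever" */
} Waiter;

/* Converts a relative timeout in seconds into an absolute deadline.
 * A timeout of 0 keeps its special meaning of "no deadline". */
ms_t deadlineFromTimeout(ms_t now_ms, long timeout_sec) {
    return timeout_sec == 0 ? 0 : now_ms + (ms_t)timeout_sec * 1000;
}

/* Periodic sweep: unblocks every waiter whose deadline has passed.
 * Returns the number of waiters that timed out in this pass. */
int sweepTimeouts(Waiter *ws, size_t n, ms_t now_ms) {
    int unblocked = 0;
    for (size_t i = 0; i < n; i++) {
        Waiter *w = &ws[i];
        if (!w->blocked || w->deadline == 0) continue;
        if (w->deadline < now_ms) {
            w->blocked = 0;
            w->timed_out = 1; /* the caller would send the nil reply here */
            unblocked++;
        }
    }
    return unblocked;
}
```

With an absolute deadline there is no need to count loop iterations: however often the sweep runs, a client is released on the first pass after its deadline, so the wait is bounded by the timeout plus one sweep interval.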

Author: 等你歸去來
Source: https://www.cnblogs.com/yougewe/p/12240140.html
