美文网首页工作生活
「Redis源码解读」—数据结构(七)快速列表

「Redis源码解读」—数据结构(七)快速列表

作者: wh4763 | 来源:发表于2019-06-30 19:22 被阅读0次

quicklist结构是在redis 3.2版本中新加的数据结构,用在列表的底层实现。
考虑到链表的附加空间相对太高,prev 和 next 指针就要占去 16 个字节 (64bit 系统的指针是 8 个字节),另外每个节点的内存都是单独分配,会加剧内存的碎片化,影响内存管理效率。
后续版本对列表数据结构进行了改造,使用 quicklist 代替了 ziplist 和 linkedlist.

知识点

  • quicklist宏观上是一个双向链表,因此,它具有一个双向链表的有点,进行插入或删除操作时非常方便,虽然复杂度为O(n),但是不需要内存的复制,提高了效率,而且访问两端元素复杂度为O(1)。
  • quicklist微观上是一片片entry节点,每一片entry节点内存连续且顺序存储,可以通过二分查找以 log2(n)log2(n) 的复杂度进行定位。

基本结构

结构体

// 快速列表
struct quicklist {
    quicklistNode* head;
    quicklistNode* tail;
    long count; // 元素总数
    int nodes; // ziplist 节点的个数
    int compressDepth; // LZF 算法压缩深度
    ...
}
// 快速列表节点
struct quicklistNode {
    quicklistNode* prev;
    quicklistNode* next;
    ziplist* zl; // 指向压缩列表
    int32 size; // ziplist 的字节总数
    int16 count; // ziplist 中的元素数量
    int2 encoding; // 存储形式 2bit,原生字节数组还是 LZF 压缩存储
    ...
}

上述代码简单地表示了 quicklist 的大致结构。为了进一步节约空间,Redis 还会对 ziplist 进行压缩存储,使用 LZF 算法压缩,可以选择压缩深度。

压缩深度

  • quicklist 默认的压缩深度是 0,也就是不压缩。压缩的实际深度由配置参数list-compress-depth决定。
  • 为了支持快速的 push/pop 操作,quicklist 的首尾两个 ziplist 不压缩,此时深度就是 1。
  • 如果深度为 2,就表示 quicklist 的首尾第一个 ziplist 以及首尾第二个 ziplist 都不压缩。

zipList 长度

  • quicklist 内部默认单个 ziplist 长度为 8k 字节,超出了这个字节数,就会新起一个 ziplist。
  • ziplist 的长度由配置参数 list-max-ziplist-size决定。
typedef struct quicklistLZF {
//表示被LZF算法压缩后的ziplist的大小
unsigned int sz; /* LZF size in bytes*/

//保存压缩后的ziplist的数组,柔性数组
char compressed[];
} quicklistLZF;

//quicklist的迭代器结构
typedef struct quicklistIter {
const quicklist *quicklist; //指向所属的quicklist的指针
quicklistNode *current; //指向当前迭代的quicklist节点的指针
unsigned char *zi; //指向当前quicklist节点中迭代的ziplist
long offset; //当前ziplist结构中的偏移量 /* offset in current ziplist */
int direction; //迭代方向
} quicklistIter;

//管理quicklist中quicklistNode节点中ziplist信息的结构
typedef struct quicklistEntry {
const quicklist *quicklist; //指向所属的quicklist的指针
quicklistNode *node; //指向所属的quicklistNode节点的指针
unsigned char *zi; //指向当前ziplist结构的指针
unsigned char *value; //指向当前ziplist结构的字符串vlaue成员
long long longval; //指向当前ziplist结构的整数value成员
unsigned int sz; //保存当前ziplist结构的字节数大小
int offset; //保存相对ziplist的偏移量
} quicklistEntry;

* Insert a new entry before or after existing entry 'entry'.
*
* If after==1, the new value is inserted after 'entry', otherwise
* the new value is inserted before 'entry'. */
//如果after为1,在已存在的entry后插入一个entry,否则在前面插入
REDIS_STATIC void _quicklistInsert(quicklist *quicklist, quicklistEntry *entry,
void *value, const size_t sz, int after) {
int full = 0, at_tail = 0, at_head = 0, full_next = 0, full_prev = 0;
int fill = quicklist->fill;
quicklistNode *node = entry->node;
quicklistNode *new_node = NULL;

if (!node) { //如果entry为没有所属的quicklistNode节点,需要新创建
/* we have no reference node, so let's create only node in the list */
D("No node given!");
new_node = quicklistCreateNode(); //创建一个节点
//将entry值push到new_node新节点的ziplist中
new_node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
//将新的quicklistNode节点插入到quicklist中
__quicklistInsertNode(quicklist, NULL, new_node, after);
//更新entry计数器
new_node->count++;
quicklist->count++;
return;
}

/* Populate accounting flags for easier boolean checks later */
//如果node不能插入entry
if (!_quicklistNodeAllowInsert(node, fill, sz)) {
D("Current node is full with count %d with requested fill %lu",
node->count, fill);
full = 1; //设置full的标志
}

//如果是后插入且当前entry为尾部的entry
if (after && (entry->offset == node->count)) {
D("At Tail of current ziplist");
at_tail = 1; //设置在尾部at_tail标示
//如果node的后继节点不能插入
if (!_quicklistNodeAllowInsert(node->next, fill, sz)) {
D("Next node is full too.");
full_next = 1; //设置标示
}
}

//如果是前插入且当前entry为头部的entry
if (!after && (entry->offset == 0)) {
D("At Head");
at_head = 1; //设置at_head表示
if (!_quicklistNodeAllowInsert(node->prev, fill, sz)) { //如果node的前驱节点不能插入
D("Prev node is full too.");
full_prev = 1; //设置标示
}
}

/* Now determine where and how to insert the new element */
//如果node不满,且是后插入
if (!full && after) {
D("Not full, inserting after current position.");
quicklistDecompressNodeForUse(node); //将node临时解压
unsigned char *next = ziplistNext(node->zl, entry->zi); //返回下一个entry的地址
if (next == NULL) { //如果next为空,则直接在尾部push一个entry
node->zl = ziplistPush(node->zl, value, sz, ZIPLIST_TAIL);
} else { //否则,后插入一个entry
node->zl = ziplistInsert(node->zl, next, value, sz);
}
node->count++; //更新entry计数器
quicklistNodeUpdateSz(node); //更新ziplist的大小sz
quicklistRecompressOnly(quicklist, node); //将临时解压的重压缩

//如果node不满且是前插
} else if (!full && !after) {
D("Not full, inserting before current position.");
quicklistDecompressNodeForUse(node); //将node临时解压
node->zl = ziplistInsert(node->zl, entry->zi, value, sz); //前插入
node->count++; //更新entry计数器
quicklistNodeUpdateSz(node); //更新ziplist的大小sz
quicklistRecompressOnly(quicklist, node); //将临时解压的重压缩

//当前node满了,且当前已存在的entry是尾节点,node的后继节点指针不为空,且node的后驱节点能插入
//本来要插入当前node中,但是当前的node满了,所以插在next节点的头部
} else if (full && at_tail && node->next && !full_next && after) {
/* If we are: at tail, next has free space, and inserting after:
* - insert entry at head of next node. */
D("Full and tail, but next isn't full; inserting next node head");
new_node = node->next; //new_node指向node的后继节点
quicklistDecompressNodeForUse(new_node); //将node临时解压
new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_HEAD); //在new_node头部push一个entry
new_node->count++; //更新entry计数器
quicklistNodeUpdateSz(new_node); //更新ziplist的大小sz
quicklistRecompressOnly(quicklist, new_node); //将临时解压的重压缩

//当前node满了,且当前已存在的entry是头节点,node的前驱节点指针不为空,且前驱节点可以插入
//因此插在前驱节点的尾部
} else if (full && at_head && node->prev && !full_prev && !after) {
/* If we are: at head, previous has free space, and inserting before:
* - insert entry at tail of previous node. */
D("Full and head, but prev isn't full, inserting prev node tail");
new_node = node->prev; //new_node指向node的后继节点
quicklistDecompressNodeForUse(new_node); //将node临时解压
new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_TAIL);//在new_node尾部push一个entry
new_node->count++; //更新entry计数器
quicklistNodeUpdateSz(new_node); //更新ziplist的大小sz
quicklistRecompressOnly(quicklist, new_node); //将临时解压的重压缩

//当前node满了
//要么已存在的entry是尾节点,且后继节点指针不为空,且后继节点不可以插入,且要后插
//要么已存在的entry为头节点,且前驱节点指针不为空,且前驱节点不可以插入,且要前插
} else if (full && ((at_tail && node->next && full_next && after) ||
(at_head && node->prev && full_prev && !after))) {
/* If we are: full, and our prev/next is full, then:
* - create new node and attach to quicklist */
D("\tprovisioning new node...");
new_node = quicklistCreateNode(); //创建一个节点
new_node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD); //将entrypush到new_node的头部
new_node->count++; //更新entry计数器
quicklistNodeUpdateSz(new_node); //更新ziplist的大小sz
__quicklistInsertNode(quicklist, node, new_node, after); //将new_node插入在当前node的后面

//当前node满了,且要将entry插入在中间的任意地方,需要将node分割
} else if (full) {
/* else, node is full we need to split it. */
/* covers both after and !after cases */
D("\tsplitting node...");
quicklistDecompressNodeForUse(node); //将node临时解压
new_node = _quicklistSplitNode(node, entry->offset, after);//分割node成两块
new_node->zl = ziplistPush(new_node->zl, value, sz,
after ? ZIPLIST_HEAD : ZIPLIST_TAIL);//将entry push到new_node中
new_node->count++; //更新entry计数器
quicklistNodeUpdateSz(new_node); //更新ziplist的大小sz
__quicklistInsertNode(quicklist, node, new_node, after); //将new_node插入进去
_quicklistMergeNodes(quicklist, node); //左右能合并的合并
}

quicklist->count++; //更新总的entry计数器
}
/* Add new entry to head node of quicklist.
*
* Returns 0 if used existing head.
* Returns 1 if new head created. */
//push一个entry节点到quicklist的头部
//返回0表示不改变头节点指针,返回1表示节点插入在头部,改变了头结点指针
int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
quicklistNode *orig_head = quicklist->head; //备份头结点地址

//如果ziplist可以插入entry节点
if (likely(
_quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
quicklist->head->zl =
ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD); //将节点push到头部
quicklistNodeUpdateSz(quicklist->head); //更新quicklistNode记录ziplist大小的sz
} else { //如果不能插入entry节点到ziplist
quicklistNode *node = quicklistCreateNode(); //新创建一个quicklistNode节点

//将entry节点push到新创建的quicklistNode节点中
node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);

quicklistNodeUpdateSz(node); //更新ziplist的大小sz
_quicklistInsertNodeBefore(quicklist, quicklist->head, node); //将新创建的节点插入到头节点前
}
quicklist->count++; //更新quicklistNode计数器
quicklist->head->count++; //更新entry计数器
return (orig_head != quicklist->head); //如果改变头节点指针则返回1,否则返回0
}

/* Add new entry to tail node of quicklist.
*
* Returns 0 if used existing tail.
* Returns 1 if new tail created. */
//push一个entry节点到quicklist的尾节点中,如果不能push则新创建一个quicklistNode节点
//返回0表示不改变尾节点指针,返回1表示节点插入在尾部,改变了尾结点指针
int quicklistPushTail(quicklist *quicklist, void *value, size_t sz) {
quicklistNode *orig_tail = quicklist->tail;

//如果ziplist可以插入entry节点
if (likely(
_quicklistNodeAllowInsert(quicklist->tail, quicklist->fill, sz))) {
quicklist->tail->zl =
ziplistPush(quicklist->tail->zl, value, sz, ZIPLIST_TAIL); //将节点push到尾部
quicklistNodeUpdateSz(quicklist->tail); //更新quicklistNode记录ziplist大小的sz
} else {
quicklistNode *node = quicklistCreateNode(); //新创建一个quicklistNode节点

//将entry节点push到新创建的quicklistNode节点中
node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_TAIL);

quicklistNodeUpdateSz(node); //更新ziplist的大小sz
_quicklistInsertNodeAfter(quicklist, quicklist->tail, node);//将新创建的节点插入到尾节点后
}
quicklist->count++; //更新quicklistNode计数器
quicklist->tail->count++; //更新entry计数器
return (orig_tail != quicklist->tail); //如果改变尾节点指针则返回1,否则返回0
}
/*
* pop from quicklist and return result in 'data' ptr. Value of 'data'
* is the return value of 'saver' function pointer if the data is NOT a number.
*
* If the quicklist element is a long long, then the return value is returned in
* 'sval'.
*
* Return value of 0 means no elements available.
* Return value of 1 means check 'data' and 'sval' for values.
* If 'data' is set, use 'data' and 'sz'. Otherwise, use 'sval'. */
//从quicklist的头节点或尾节点pop弹出出一个entry,并将value保存在传入传出参数
//返回0表示没有可pop出的entry
//返回1表示pop出了entry,存在data或sval中
int quicklistPopCustom(quicklist *quicklist, int where, unsigned char **data,
unsigned int *sz, long long *sval,
void *(*saver)(unsigned char *data, unsigned int sz)) {
unsigned char *p;
unsigned char *vstr;
unsigned int vlen;
long long vlong;
int pos = (where == QUICKLIST_HEAD) ? 0 : -1; //位置下标

if (quicklist->count == 0) //entry数量为0,弹出失败
return 0;

//初始化
if (data)
*data = NULL;
if (sz)
*sz = 0;
if (sval)
*sval = -123456789;

quicklistNode *node;
//记录quicklist的头quicklistNode节点或尾quicklistNode节点
if (where == QUICKLIST_HEAD && quicklist->head) {
node = quicklist->head;
} else if (where == QUICKLIST_TAIL && quicklist->tail) {
node = quicklist->tail;
} else {
return 0; //只能从头或尾弹出
}

p = ziplistIndex(node->zl, pos); //获得当前pos的entry地址
if (ziplistGet(p, &vstr, &vlen, &vlong)) { //将entry信息读入到参数中
if (vstr) { //entry中是字符串值
if (data)
*data = saver(vstr, vlen); //调用特定的函数将字符串值保存到*data
if (sz)
*sz = vlen; //保存字符串长度
} else { //整数值
if (data)
*data = NULL;
if (sval)
*sval = vlong; //将整数值保存在*sval中
}
quicklistDelIndex(quicklist, node, &p); //将该entry从ziplist中删除
return 1;
}
return 0;
}

/* Return a malloc'd copy of data passed in */
//将data内容拷贝一份并返回地址
REDIS_STATIC void *_quicklistSaver(unsigned char *data, unsigned int sz) {
unsigned char *vstr;
if (data) {
vstr = zmalloc(sz); //分配空间
memcpy(vstr, data, sz); //拷贝
return vstr;
}
return NULL;
}

相关文章

网友评论

    本文标题:「Redis源码解读」—数据结构(七)快速列表

    本文链接:https://www.haomeiwen.com/subject/fcobcctx.html