Redis 基本特性
1. 非关系型的键值对数据库,可以根据键以O(1) 的时间复杂度取出或插入关联值 2. Redis 的数据是存在内存中的 3. 键值对中键的类型可以是字符串,整型,浮点型等,且键是唯一的 4. 键值对中的值类型可以是string,hash,list,set,sorted set 等 5. Redis 内置了复制,磁盘持久化,LUA脚本,事务,SSL, ACLs,客户端缓存,客户端代理等功能 6. 通过Redis哨兵和Redis Cluster 模式提供高可用性
Redis高性能的原因
1.图示(换算时间:1s =1000 ms ,1ms=1000 us ,1us =1000 ns):

2.对于内存数据库来说,本身数据就存在于内存里,避免了磁盘 I/O 的限制,无疑访问速度会远大于磁盘数据库。
3.其次Redis,默认是采用一个线程执行指令任务的,既减少了线程上下文切换带来的开销,也避免并发问题。
4.而且Redis中有多种数据类型,每种数据类型的底层都由一种或多种数据结构来支持。正是因为有了这些数据结构,Redis 在存储与读取上的速度才不受阻碍。
深入底层C源码分析Redis
1.Redis是基于键值对存储数据的,像我们平时会使用的时候很容易觉得Redis的键值是多种数据类型的,其实不然,Redis的键值是String类型的,数据变成字节流(byte)基于网络传输的过程,传到Redis服务转成SDS(Simple Dynamic String【简单动态字符串】) String(Redis自定义的数据类型)。既然Redis是基于C语言写的,那么为什么不用原生的?
说明:
Redis自定义sdshdr数据结构具备三大特性:
【1】二进制安全的数据结构
【2】提供了内存预分配机制,避免了频繁的内存分配
【3】兼容C语言的函数库
2.String类型的数据结构
1)代码展示
2)发现说明
【1】为什么要对原本的数据结构进行修改?(改版后的优化在哪里)
因为int占据4个字节(8bit),也就是能存42亿左右的,但是在我们实际上,存储的数据大概率都是小数据,所以它存在浪费资源的嫌疑。
所以进行优化的思维就是根据不同的数据范围,设置不同容量,如,uint8_t 表示占据1字节(8bit,在二进制中最大可以表示255),uint16_t 表示占据2字节(16bit,在二进制中最大可以表示65535)
【2】官网上说String类型限制大小512M,是怎么限制的?
3)分析是怎么创建的
4)怎么防止操作时缓冲区溢出
5)分析是怎么扩容的
代码展示
代码说明
【1】sds内部buf的扩容机制:新buf长度 = (原buf长度 + 添加buf长度)*2,如果buf长度大于1M后,每次扩容也只会增大1M
【2】对于类型改变的需要变换存储空间。
3.RedisDb 数据结构
1)代码展示
2)视图展示

3)代码说明
【1】由上可知redisDb,主要都是将数据存储在字典(dict)中,而且还是多个,固定存储,过期维护等多个字典。
【2】dict字典结构,每个字典有两个哈希表结构的原因是为了用于渐进式扩容,当某个哈希表结构过于庞大的时候(按照hashMap的思维,必定是需要对数组进行扩容,增大数组长度,将链表长度缩小,加快遍历),其实它也需要进行扩容,但是再进行扩容操作的同时,容易出现阻塞线程的情况(如果时间太久),为此,dict中采用rehashidx标明是否正在处于扩容状态,且ht[1]会生成一个新的哈希表结构,容量是之前的两倍,然后把ht[0]中的数据按槽位一点一点的搬运过来【断断续续的操作,这样就不会一直阻塞住线程】,新的数据也会落到ht[1]中,直到搬完。然后将ht[1]指针指向ht[0],然后自己再指向null,rehashidx变为0,就完成了扩容操作。
【3】dictEntry相当于hashMap中的节点(包含了key,value,和指向下个节点的指针),其中val会被进一步封装成redisObject。
【4】redisObject中的type用于约束客户端命令,如set操作,会判断操作的值与操作的类型匹不匹配。encoding记录了值在redis底层是怎么样的编码形式。ptr指向内存的真实地址。
4)分析String类型的编码
【1】会存在:int,raw,embstr三种。
【2】为什么会有int,因为整型值最大固定是64bit,其实与指针*ptr占据的大小一致,其实把数值存于这里可以减少了对空间的开辟。代码展示:
【3】为什么会有embstr,代码展示
【4】而raw便是表示:字符串将以简单动态字符串(SDS)的形式存储,需要两次 malloc 分配内存,redisObject 对象头和 SDS 对象在内存地址上一般是不连续的。
5)发现说明
【1】会有人疑问为什么DB默认是16?
因为Redis的配置文件redis/redis.conf中的databases属性默认是16。所以Redis启动的时候默认会创建16个数据库且拿数据库索引为0的数据库作为默认数据库。这些都是可以通过配置调整的。
4.List 数据结构(Redis采用quicklist(双端链表) 和 ziplist 作为List的底层实现)
1)介绍
【1】List是一个有序(按加入的时序排序)的数据结构,Redis采用quicklist(双端链表) 和 ziplist 作为List的底层实现。以通过设置每个ziplist的最大容量,quicklist的数据压缩范围,提升数据存取效率。
2)ziplist 分析详解
【1】介绍
1.ziplist是一个经过特殊编码的双向链表,它的设计目标就是为了提高存储效率;
2.ziplist可以用于存储字符串或整数,其中整数是按真正的二进制表示进行编码的,而不是编码成字符串序列。它能以O(1)的时间复杂度在表的两端提供push和pop操作;
3.因为ziplist是一个内存连续的集合,所以ziplist遍历只要通过当前节点的指针 加上 当前节点的长度 或 减去 上一节点的长度 ,即可得到下一个节点的数据或上一个节点的数据,这样就省去的指针从而节省了存储空间,又因为内存连续所以在数据读取上的效率也远高于普通的链表。
【2】代码展示
【3】图示:

【4】图示参数说明
【5】说明
1.Ziplist的设计结构,保障了空间的节省与查询的高效,但是当出现zlentry增加或删除时,Ziplist是不能直接在原有空间上进行修改,每一次变动都需要重新开辟空间去拷贝、修改。这样的场景下Ziplist一旦内部元素过多,将会导致性能的急剧下滑。因此Redis 在实现上做了一层优化,当Ziplist过大时,会将其分割成多个Ziplist,然后再通过一个双向链表将其串联起来。
3)quicklist 分析详解
【1】介绍
1.Redis quicklist是Redis 3.2版本以后针对链表和压缩列表进行改造的一种数据结构,是 zipList 和 linkedList 的混合体,相对于链表它压缩了内存,进一步的提高了效率。
【2】代码展示
【3】图示:

【4】说明
1.通过控制ziplist 的大小,则很好的解决了超大ziplist 的拷贝情况下对性能的影响。每次改动只需要针对具体的小段ziplist 进行操作。
4)发现说明
【1】为什么不采用两个指针指向前后数据的方式,而是要采用复合的数据结构完成?
1.采用双指针的方式,那就必须赋予两个指针pre和next,一个指针占据了8byte,故两个指针就需要消耗16byte。如果list存在大量数据,所以就需要消耗相当多的内存在指针方面(胖指针问题)。
2.采用双链表的话数据可能会分的很散,因为指针就是采用不连续的存储空间来存储数据,容易造成大量的内存碎片。
3.采用quicklist 和 ziplist 混合,达到减少指针消耗的空间,其次连续的存储空间读取起来效率高于不连续的存储空间,节省IO。
4.通过控制ziplist 的大小,则很好的解决了超大ziplist 的拷贝情况下对性能的影响。每次改动只需要针对具体的小段ziplist 进行操作。
5.Hash 数据结构
1)介绍
【1】Hash 数据结构底层实现为一个字典( dict ),也是RedisBb用来存储K-V的数据结构,当数据量比较小,或者单个元素比较小时,底层用ziplist存储,数据大小和元素数量阈值可以通过如下参数设置。
2)发现说明
【1】为什么数据量小的时候采用ziplist存储?
1.ziplist使用紧凑的连续内存块顺序存储数据,在list或者hash结构中,未使用listNode(24字节)和dictEntry(24字节)结构体来存储元素项,因此会节省内存。
2.ziplist结构元素访问采用的是后向遍历(从后往前),因此在hash中可将热点的key或者在list中将热点的元素项放在最后,可以提升性能。
3.因为ziplist的内存结构中,仅仅只使用了额外的11个字节来存储ziplist的属性,另外很重要的是ziplist使用后向遍历,当list或者hash中的元素较多时,可以根据元素的冷热性调整元素存储顺序。
4.而在dictht结构体中,存储属性需要32个字节,其中元素dictEntry也是每个占用24个字节。
6.Set 数据结构
1)介绍
【1】Set 为无序的,自动去重的集合数据类型,Set 数据结构底层实现为一个value 为 null 的 字典( dict ),当数据可以用整形表示时,Set集合将被编码为intset数据结构。
【2】两个条件任意满足时Set将用hashtable存储数据。1, 元素个数大于 set-max-intset-entries , 2 , 元素无法用整形表示。
2)intset数据结构
3)set存储过程
7.ZSet 数据结构
1)介绍
【1】ZSet 为有序的,自动去重的集合数据类型,ZSet 数据结构底层实现为 字典(dict) + 跳表(skiplist) ,当数据比较少时,用ziplist编码结构存储。
【2】数据比较少时,用ziplist编码结构存储的图示:

2)skiplist 分析解析
【1】数据结构代码
【2】追踪添加函数
//在server.c发现跳表的添加函数为zaddCommand
//去t_zset.c文件查看流程
void zaddCommand(client *c) {
zaddGenericCommand(c,ZADD_NONE);
}
void zaddGenericCommand(client *c, int flags) {
static char *nanerr = "resulting score is not a number (NaN)";
robj *key = c->argv[1];
robj *zobj;
sds ele;
double score = 0, *scores = NULL;
int j, elements;
int scoreidx = 0;
/* The following vars are used in order to track what the command actually
* did during the execution, to reply to the client and to trigger the
* notification of keyspace change. */
int added = 0; //新添加元素的数量
int updated = 0; //更新分数的元素数量
int processed = 0; //被处理的元素数量
/* Parse options. At the end 'scoreidx' is set to the argument position
* of the score of the first score-element pair. */
scoreidx = 2;
// 输入参数解析
while(scoreidx < c->argc) {
char *opt = c->argv[scoreidx]->ptr;
if (!strcasecmp(opt,"nx")) flags |= ZADD_NX;
else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX;
else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH;
else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR;
else break;
scoreidx++;
}
/* Turn options into simple to check vars. */
int incr = (flags & ZADD_INCR) != 0;
int nx = (flags & ZADD_NX) != 0;
int xx = (flags & ZADD_XX) != 0;
int ch = (flags & ZADD_CH) != 0;
/* After the options, we expect to have an even number of args, since
* we expect any number of score-element pairs. */
elements = c->argc-scoreidx;
if (elements % 2 || !elements) {
addReply(c,shared.syntaxerr);
return;
}
elements /= 2; /* Now this holds the number of score-element pairs. */
/* Check for incompatible options. */
if (nx && xx) {
addReplyError(c,
"XX and NX options at the same time are not compatible");
return;
}
if (incr && elements > 1) {
addReplyError(c,
"INCR option supports a single increment-element pair");
return;
}
/* Start parsing all the scores, we need to emit any syntax error
* before executing additions to the sorted set, as the command should
* either execute fully or nothing at all. */
scores = zmalloc(sizeof(double)*elements);
for (j = 0; j < elements; j++) {
if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
!= C_OK) goto cleanup;
}
/* Lookup the key and create the sorted set if does not exist.
查询对应的 key 在对应的 db 即 hash table 中,是否存在
*/
zobj = lookupKeyWrite(c->db,key);
if (zobj == NULL) {
if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
// 如果 zset_max_ziplist_entries ==0
// // 或者 zadd 元素的长度 > zset_max_ziplist_value
// // 则直接创建 skiplist 数据结构
// // 否则创建ziplist 压缩列表数据结构
if (server.zset_max_ziplist_entries == 0 ||
server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
{
zobj = createZsetObject();
} else {
zobj = createZsetZiplistObject();
}
// 关联对象到db
dbAdd(c->db,key,zobj);
} else {
if (zobj->type != OBJ_ZSET) {
addReply(c,shared.wrongtypeerr);
goto cleanup;
}
}
// 处理所有元素
for (j = 0; j < elements; j++) {
double newscore;
// 分值
score = scores[j];
int retflags = flags;
// 元素
ele = c->argv[scoreidx+1+j*2]->ptr;
// 往 zobj 添加元素
int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);
if (retval == 0) {
addReplyError(c,nanerr);
goto cleanup;
}
if (retflags & ZADD_ADDED) added++;
if (retflags & ZADD_UPDATED) updated++;
if (!(retflags & ZADD_NOP)) processed++;
score = newscore;
}
server.dirty += (added+updated);
reply_to_client:
if (incr) { /* ZINCRBY or INCR option. */
if (processed)
addReplyDouble(c,score);
else
addReplyNull(c);
} else { /* ZADD. */
addReplyLongLong(c,ch ? added+updated : added);
}
cleanup:
zfree(scores);
if (added || updated) {
signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_ZSET,
incr ? "zincr" : "zadd", key, c->db->id);
}
}
// 创建zset 数据结构: 字典 + 跳表
robj *createZsetObject(void) {
zset *zs = zmalloc(sizeof(*zs));
robj *o;
// dict用来查询数据到分数的对应关系, 如 zscore 就可以直接根据 元素拿到分值
zs->dict = dictCreate(&zsetDictType,NULL);
// skiplist用来根据分数查询数据(可能是范围查找)
zs->zsl = zslCreate();
// 设置对象类型
o = createObject(OBJ_ZSET,zs);
// 设置编码类型
o->encoding = OBJ_ENCODING_SKIPLIST;
return o;
}
// 创建zset 数据结构: ZipList
robj *createZsetZiplistObject(void) {
unsigned char *zl = ziplistNew();
robj *o = createObject(OBJ_ZSET,zl);
o->encoding = OBJ_ENCODING_ZIPLIST;
return o;
}
int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
/* Turn options into simple to check vars.
可选参数解析
*/
int incr = (*flags & ZADD_INCR) != 0;
int nx = (*flags & ZADD_NX) != 0;
int xx = (*flags & ZADD_XX) != 0;
*flags = 0; /* We'll return our response flags. */
double curscore;
/* NaN as input is an error regardless of all the other parameters.
数值判断
*/
if (isnan(score)) {
*flags = ZADD_NAN;
return 0;
}
/* Update the sorted set according to its encoding.
数据类型为ziplist 的情况
*/
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *eptr;
if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
/* NX? Return, same element already exists. */
if (nx) {
*flags |= ZADD_NOP;
return 1;
}
/* Prepare the score for the increment if needed. */
if (incr) {
score += curscore;
if (isnan(score)) {
*flags |= ZADD_NAN;
return 0;
}
if (newscore) *newscore = score;
}
/* Remove and re-insert when score changed.
元素 score 有变化,则删除老节点,重新插入
*/
if (score != curscore) {
zobj->ptr = zzlDelete(zobj->ptr,eptr);
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
*flags |= ZADD_UPDATED;
}
return 1;
} else if (!xx) {
/* Optimize: check if the element is too large or the list
* becomes too long *before* executing zzlInsert. */
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries ||
sdslen(ele) > server.zset_max_ziplist_value)
// 元素个数 或者 单个元素大小超过阈值 任意条件满足就转化为skiplist
zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
if (newscore) *newscore = score;
*flags |= ZADD_ADDED;
return 1;
} else {
*flags |= ZADD_NOP;
return 1;
}
// 数据类型为 跳表的情况
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
// 获取值指针
zset *zs = zobj->ptr;
zskiplistNode *znode;
dictEntry *de;
// O(1) 的时间复杂度,获取到元素
de = dictFind(zs->dict,ele);
if (de != NULL) {
/* NX? Return, same element already exists.
NX 互斥
*/
if (nx) {
*flags |= ZADD_NOP;
return 1;
}
// 当前分值
curscore = *(double*)dictGetVal(de);
/* Prepare the score for the increment if needed. */
// 递增
if (incr) {
score += curscore;
if (isnan(score)) {
*flags |= ZADD_NAN;
return 0;
}
if (newscore) *newscore = score;
}
/* Remove and re-insert when score changes.
分值不同的场景
*/
if (score != curscore) {
znode = zslUpdateScore(zs->zsl,curscore,ele,score);
/* Note that we did not removed the original element from
* the hash table representing the sorted set, so we just
* update the score.
* hash 表中不需要移除元素, 修改分值就可以了
*
* */
dictGetVal(de) = &znode->score; /* Update score ptr. */
*flags |= ZADD_UPDATED;
}
return 1;
// 元素不存在
} else if (!xx) {
ele = sdsdup(ele);
// 插入新元素
znode = zslInsert(zs->zsl,score,ele);
serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
*flags |= ZADD_ADDED;
if (newscore) *newscore = score;
return 1;
} else {
*flags |= ZADD_NOP;
return 1;
}
} else {
serverPanic("Unknown sorted set encoding");
}
return 0; /* Never reached. */
}
// 往跳表中 新增元素
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
// 数值判断
serverAssert(!isnan(score));
x = zsl->header;
// 遍历所有层高 ,寻找插入点: 高位 -> 低位
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position
存储排位, 便于更新
*/
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
(x->level[i].forward->score < score || // 找到第一个比新分值大的节点,前面一个位置即是插入点
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0))) //相同分值则按字典序排序
{
rank[i] += x->level[i].span; // 累加排位分值
x = x->level[i].forward;
}
update[i] = x; // 每一层的拐点
}
/* we assume the element is not already inside, since we allow duplicated
* scores, reinserting the same element should never happen since the
* caller of zslInsert() should test in the hash table if the element is
* already inside or not.
*
* */
level = zslRandomLevel(); // 幂次定律, 随机生成层高 ,越高的层出现概率越低
if (level > zsl->level) { // 随机层高大于当前的最大层高,则初始化新的层高
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length; //header 最层都是跳表的长度
}
zsl->level = level;
}
x = zslCreateNode(level,score,ele); // 创建新的节点
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward; // 插入新节点
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is inserted here
更新 span 信息
*/
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
/* increment span for untouched levels
新加入节点, 更新顶层 span
*/
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
// 更新后退指针 和尾指针
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}
//返回一个随机的层数,不是level的索引是层数
int zslRandomLevel(void) {
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF)) //有1/4的概率加入到上一层中
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
【3】示图展示

【4】示图说明
1.默认会构造一个不存数据的拥有32层高度的头结点,而每加一个结点,会自身去概率生成层数(概率为1/4),这样就可以通过头结点快速查找数据了。
标签:
留言评论