Skip to content

Instantly share code, notes, and snippets.

@weedge
Last active July 31, 2023 12:00
Show Gist options
  • Save weedge/8d1d37963a2dfd7ecd47fefc7d8016b1 to your computer and use it in GitHub Desktop.
Save weedge/8d1d37963a2dfd7ecd47fefc7d8016b1 to your computer and use it in GitHub Desktop.
codis-server redis 3.2.11 slots_async.cc
#include "server.h"
/* ============================ Worker Thread for Lazy Release ============================= */
typedef struct {
pthread_t thread;/* lazy工作线程 */
pthread_mutex_t mutex;/* 互斥信号量 */
pthread_cond_t cond;/* 条件变量 */
list *objs; /* 要被lazy释放的对象链表 */
} lazyReleaseWorker;
/* lazy释放主线程 */
static void *
lazyReleaseWorkerMain(void *args) {
lazyReleaseWorker *p = args;
while (1) {
/* 等待在条件变量上,条件为待释放对象链表长度为0 */
pthread_mutex_lock(&p->mutex);
while (listLength(p->objs) == 0) {
pthread_cond_wait(&p->cond, &p->mutex);
}
/* 取出链表的第一个节点 */
listNode *head = listFirst(p->objs);
/* 节点值为要释放的对象 */
robj *o = listNodeValue(head);
/* 从链表中删除这个节点 */
listDelNode(p->objs, head);
pthread_mutex_unlock(&p->mutex);
/* 释放对象 */
decrRefCount(o);
}
return NULL;
}
/* lazy释放一个对象 */
static void
lazyReleaseObject(robj *o) {
/* 对象当前的refcount必须已经为1,即已经没有任何人引用这个对象 */
serverAssert(o->refcount == 1);
/* 获取lazyReleaseWorker */
lazyReleaseWorker *p = server.slotsmgrt_lazy_release;
/* 上锁 */
pthread_mutex_lock(&p->mutex);
if (listLength(p->objs) == 0) {
/* 如果待释放队列长度为0,则唤醒释放线程 */
pthread_cond_broadcast(&p->cond);
}
/* 将待释放对象加入释放链表 */
listAddNodeTail(p->objs, o);
/* 解锁 */
pthread_mutex_unlock(&p->mutex);
}
/* 创建lazy释放工作线程 */
static lazyReleaseWorker *
createLazyReleaseWorkerThread() {
lazyReleaseWorker *p = zmalloc(sizeof(lazyReleaseWorker));
pthread_mutex_init(&p->mutex, NULL);
pthread_cond_init(&p->cond, NULL);
p->objs = listCreate();
/* 创建线程 */
if (pthread_create(&p->thread, NULL, lazyReleaseWorkerMain, p) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize Worker Thread for Lazy Release Jobs.");
exit(1);
}
return p;
}
/* 初始化Lazy释放工作线程 */
void
slotsmgrtInitLazyReleaseWorkerThread() {
server.slotsmgrt_lazy_release = createLazyReleaseWorkerThread();
}
/* ============================ Iterator for Data Migration ================================ */
#define STAGE_PREPARE 0
#define STAGE_PAYLOAD 1
#define STAGE_CHUNKED 2
#define STAGE_FILLTTL 3
#define STAGE_DONE 4
/* 单对象迭代器 */
typedef struct {
int stage;
robj *key;/* 单对象对应的的key */
robj *val;/* 单对象对应的的值 */
long long expire;/* 该对象对应的过期设置 */
unsigned long cursor;/* 游标,用于dictScan */
unsigned long lindex;/* 索引,listTypeInitIterator时用到 */
unsigned long zindex;/* 索引,遍历zset时用到 */
unsigned long chunked_msgs;/* 该对象chunked消息个数 */
} singleObjectIterator;
/* 创建单对象迭代 */
static singleObjectIterator *
createSingleObjectIterator(robj *key) {
/* 分配空间 */
singleObjectIterator *it = zmalloc(sizeof(singleObjectIterator));
/* 初始化阶段 */
it->stage = STAGE_PREPARE;
/* 设置key */
it->key = key;
/* 引用计数 */
incrRefCount(it->key);
it->val = NULL;
it->expire = 0;
it->cursor = 0;
it->lindex = 0;
it->zindex = 0;
it->chunked_msgs = 0;
return it;
}
/* 释放SingleObjectIterator */
static void
freeSingleObjectIterator(singleObjectIterator *it) {
if (it->val != NULL) {
/* 对val解引用 */
decrRefCount(it->val);
}
/* 对key解引用 */
decrRefCount(it->key);
/* 释放结构 */
zfree(it);
}
static void
freeSingleObjectIteratorVoid(void *it) {
freeSingleObjectIterator(it);
}
/* 判断单个对象是否还有下一个阶段需要处理 */
static int
singleObjectIteratorHasNext(singleObjectIterator *it) {
/* 只要状态不是STAGE_DONE就还需要继续处理 */
return it->stage != STAGE_DONE;
}
/* 如果是sds编码的字符串对象就返回sds底层字符换的长度,否则返回默认长度len */
static size_t
sdslenOrElse(robj *o, size_t len) {
return sdsEncodedObject(o) ? sdslen(o->ptr) : len;
}
/* 如果val类型为dict时执行dictScan操作的回调 */
static void
singleObjectIteratorScanCallback(void *data, const dictEntry *de) {
/* 提取privdata {ll, val, &len}*/
void **pd = (void **)data;
list *l = pd[0];/* 链表,用于存放scan出来的元素 */
robj *o = pd[1];/* 被迭代的对象值val */
long long *n = pd[2];/* 返回字节数的指针 */
robj *objs[2] = {NULL, NULL};
switch (o->type) {
case OBJ_HASH:
/* 如果原对象是hash,则分别将hash的key和value按顺序方式链表 */
objs[0] = dictGetKey(de);
objs[1] = dictGetVal(de);
break;
case OBJ_SET:
/* 如果原对象是set,则只将hash的key放入链表 */
objs[0] = dictGetKey(de);
break;
}
/* 将扫出来的对象添加到链表 */
for (int i = 0; i < 2; i ++) {
if (objs[i] != NULL) {
/* 引用计数 */
incrRefCount(objs[i]);
/* 这个对象的大小,对于string对象就是string长度,其他对象就按8字节算 */
*n += sdslenOrElse(objs[i], 8);
listAddNodeTail(l, objs[i]);
}
}
}
/* 将double转为内存二进制表示 */
static uint64_t
convertDoubleToRawBits(double value) {
union {
double d;
uint64_t u;
} fp;
fp.d = value;
return fp.u;
}
/* 将内存二进制表示转为double值 */
static double
convertRawBitsToDouble(uint64_t value) {
union {
double d;
uint64_t u;
} fp;
fp.u = value;
return fp.d;
}
/* 从Uint64创建RawString对象 */
static robj *
createRawStringObjectFromUint64(uint64_t v) {
uint64_t p = intrev64ifbe(v);
return createRawStringObject((char *)&p, sizeof(p));
}
/* 从RawString获取Uint64 */
static int
getUint64FromRawStringObject(robj *o, uint64_t *p) {
if (sdsEncodedObject(o) && sdslen(o->ptr) == sizeof(uint64_t)) {
*p = intrev64ifbe(*(uint64_t *)(o->ptr));
return C_OK;
}
return C_ERR;
}
/* 计算一个对象需要的restore命令的个数,单个restore上只能携带maxbulks个Bulk
Bulk:$6\r\nfoobar\r\n
Multi-bulk :"*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n"
*/
static long
numberOfRestoreCommandsFromObject(robj *val, long long maxbulks) {
long long numbulks = 0;
switch (val->type) {
case OBJ_LIST:
if (val->encoding == OBJ_ENCODING_QUICKLIST) {
/* list的长度就是需要的Bulk的数目 */
numbulks = listTypeLength(val);
}
break;
case OBJ_HASH:
if (val->encoding == OBJ_ENCODING_HT) {
/* hash表中每个元素需要2个Bulk */
numbulks = hashTypeLength(val) * 2;
}
break;
case OBJ_SET:
if (val->encoding == OBJ_ENCODING_HT) {
/* set中每个元素需要1个Bulk */
numbulks = setTypeSize(val);
}
break;
case OBJ_ZSET:
if (val->encoding == OBJ_ENCODING_SKIPLIST) {
/* zset中每个元素需要2个Bulk */
numbulks = zsetLength(val) * 2;
}
break;
}
/* 如果实际的numbulks比要求的maxbulks小,则使用一条restore命令 */
if (numbulks <= maxbulks) {
return 1;
}
/* 计算需要的restore命令个数 */
return (numbulks + maxbulks - 1) / maxbulks;
}
/* 估计Restore命令的个数 */
static long
estimateNumberOfRestoreCommands(redisDb *db, robj *key, long long maxbulks) {
/* 查找key对应的val */
robj *val = lookupKeyWrite(db, key);
if (val != NULL) {
return numberOfRestoreCommandsFromObject(val, maxbulks);
}
return 0;
}
extern void createDumpPayload(rio *payload, robj *o);
extern zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank);
static slotsmgrtAsyncClient *getSlotsmgrtAsyncClient(int db);
/* 单对象迭代,返回值为命令个数(Bulks) */
static int
singleObjectIteratorNext(client *c, singleObjectIterator *it,
long long timeout, unsigned int maxbulks, unsigned int maxbytes) {
/* *
* STAGE_PREPARE ---> STAGE_PAYLOAD ---> STAGE_DONE
* | A
* V |
* +------------> STAGE_CHUNKED ---> STAGE_FILLTTL
* A |
* | V
* +-------+
* */
/* 本次迭代的key */
robj *key = it->key;
/* 但对象迁移的准备阶段 */
if (it->stage == STAGE_PREPARE) {
/* 以写的方式查找key,与lookupKeyRead区别是没有命中率更新 */
robj *val = lookupKeyWrite(c->db, key);
if (val == NULL) {
/* 如果key没有找到,则结束 */
it->stage = STAGE_DONE;
return 0;
}
/* 设置值 */
it->val = val;
/* 增加引用 */
incrRefCount(it->val);
/* 设置过期时间 */
it->expire = getExpire(c->db, key);
/* 前导消息 */
int leading_msgs = 0;
/* 获取db对应的slotsmgrtAsyncClient */
slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(c->db->id);
if (ac->c == c) {
/* 只有slotsmgrtAsyncClient未被使用的时候 */
if (ac->used == 0) {
/* 表示已经被使用 */
ac->used = 1;
/* 如果需要验证 */
if (server.requirepass != NULL) {
/* SLOTSRESTORE-ASYNC-AUTH $password */
addReplyMultiBulkLen(c, 2);
addReplyBulkCString(c, "SLOTSRESTORE-ASYNC-AUTH");
addReplyBulkCString(c, server.requirepass);
leading_msgs += 1;
}
/* SELECT DB操作 */
do {
/* SLOTSRESTORE-ASYNC-SELECT $db */
addReplyMultiBulkLen(c, 2);
addReplyBulkCString(c, "SLOTSRESTORE-ASYNC-SELECT");
addReplyBulkLongLong(c, c->db->id);
leading_msgs += 1;
} while (0);
}
}
/* SLOTSRESTORE-ASYNC delete $key */
addReplyMultiBulkLen(c, 3);
addReplyBulkCString(c, "SLOTSRESTORE-ASYNC");
addReplyBulkCString(c, "delete");
addReplyBulk(c, key);
/* 计算需要的restore命令个数,maxbulks表示一个restore命令可承载的bulk最大数目 */
long n = numberOfRestoreCommandsFromObject(val, maxbulks);
if (n >= 2) {
/* 如果需要2个及以上,则进入CHUNKED阶段,即启用分块传输 */
it->stage = STAGE_CHUNKED;
/* chunked消息个数 */
it->chunked_msgs = n;
} else {
/* 否则一个restore可以承载,则直接进入PAYLOAD阶段 */
it->stage = STAGE_PAYLOAD;
it->chunked_msgs = 0;
}
/* 这里的1为delete命令,再加上其他的前导命令(如果有),作为命令个数返回 */
return 1 + leading_msgs;
}
/* 取出key对应的值 */
robj *val = it->val;
long long ttl = 0;
if (it->stage == STAGE_CHUNKED) {
/* 如果是CHUNKED阶段,则设置一个临时ttl */
ttl = timeout * 3;
} else if (it->expire != -1) {
/* 否则如果val上有过期时间,则重新计算ttl */
ttl = it->expire - mstime();
if (ttl < 1) {
ttl = 1;
}
}
/* 当一个CHUNKED对象全部序列化完成之后会到这个阶段 */
if (it->stage == STAGE_FILLTTL) {
/* SLOTSRESTORE-ASYNC expire $key $ttl */
addReplyMultiBulkLen(c, 4);
addReplyBulkCString(c, "SLOTSRESTORE-ASYNC");
addReplyBulkCString(c, "expire");
addReplyBulk(c, key);
/* 设置真实的ttl */
addReplyBulkLongLong(c, ttl);
/* 迭代结束 */
it->stage = STAGE_DONE;
/* 该阶段只有一个命令 */
return 1;
}
/* 如果是PAYLOAD阶段切val类型不是OBJ_STRING */
if (it->stage == STAGE_PAYLOAD && val->type != OBJ_STRING) {
/* 负载缓冲区 */
rio payload;
/* 将val序列化为RDB格式 */
createDumpPayload(&payload, val);
/* SLOTSRESTORE-ASYNC object $key $ttl $payload */
addReplyMultiBulkLen(c, 5);
addReplyBulkCString(c, "SLOTSRESTORE-ASYNC");
/* 对象类型 */
addReplyBulkCString(c, "object");
addReplyBulk(c, key);
addReplyBulkLongLong(c, ttl);
/* 添加payload */
addReplyBulkSds(c, payload.io.buffer.ptr);
/* 迭代结束 */
it->stage = STAGE_DONE;
/* 该阶段只有一个命令 */
return 1;
}
/* 如果是PAYLOAD阶段切val类型为OBJ_STRING */
if (it->stage == STAGE_PAYLOAD && val->type == OBJ_STRING) {
/* SLOTSRESTORE-ASYNC string $key $ttl $payload */
addReplyMultiBulkLen(c, 5);
addReplyBulkCString(c, "SLOTSRESTORE-ASYNC");
addReplyBulkCString(c, "string");
addReplyBulk(c, key);
addReplyBulkLongLong(c, ttl);
addReplyBulk(c, val);
/* 迭代结束 */
it->stage = STAGE_DONE;
/* 该阶段只有一个命令 */
return 1;
}
/* 如果是CHUNKED类型 */
if (it->stage == STAGE_CHUNKED) {
const char *cmd = NULL;
/* 根据val的类型使用不同的子命令 */
switch (val->type) {
case OBJ_LIST:
cmd = "list";
break;
case OBJ_HASH:
cmd = "hash";
break;
case OBJ_SET:
cmd = "dict";
break;
case OBJ_ZSET:
cmd = "zset";
break;
default:
serverPanic("unknown object type");
}
/* 是否还有更多需要序列化 */
int more = 1;
/* ll链表用于存放本次SLOTSRESTORE-ASYNC命令携带的args */
list *ll = listCreate();
/* 设置是否函数,本质就是调用decrRefCount */
listSetFreeMethod(ll, decrRefCountVoid);
long long hint = 0, len = 0;
if (val->type == OBJ_LIST) {
/* 如果val类型为OBJ_LIST,则创建list迭代 */
listTypeIterator *li = listTypeInitIterator(val, it->lindex, LIST_TAIL);
do {
/* 表示list每一项 */
listTypeEntry entry;
/* 遍历 */
if (listTypeNext(li, &entry)) {
quicklistEntry *e = &(entry.entry);
robj *obj;
if (e->value) {
/* */
obj = createStringObject((const char *)e->value, e->sz);
} else {
/* */
obj = createStringObjectFromLongLong(e->longval);
}
/* 累计字节数 */
len += sdslenOrElse(obj, 8);
/* 添加到ll */
listAddNodeTail(ll, obj);
/* 索引加1 */
it->lindex ++;
} else {
/* 没有更多了 */
more = 0;
}
/* 当还有更多要发送且ll现有元素个数小于maxbulks且字节数小于 maxbytes */
} while (more && listLength(ll) < maxbulks && len < maxbytes);
/* 释放迭代器 */
listTypeReleaseIterator(li);
/* 原list的总长度 */
hint = listTypeLength(val);
}
if (val->type == OBJ_HASH || val->type == OBJ_SET) {
/* 控制循环次数 */
int loop = maxbulks * 10;
/* 默认最大循环次数 */
if (loop < 100) {
loop = 100;
}
dict *ht = val->ptr;
void *pd[] = {ll, val, &len};
do {
it->cursor = dictScan(ht, it->cursor, singleObjectIteratorScanCallback, pd);
if (it->cursor == 0) {
/* 没有更多了 */
more = 0;
}
/* 如果还有更多且ll现有元素个数小于maxbulks且本次发送字节数小于maxbytes且loop不为0 */
} while (more && listLength(ll) < maxbulks && len < maxbytes && (-- loop) >= 0);
/* 原hash的总大小 */
hint = dictSize(ht);
}
if (val->type == OBJ_ZSET) {
/* 如果是ZSET类型 */
zset *zs = val->ptr;
dict *ht = zs->dict;
long long rank = (long long)zsetLength(val) - it->zindex;
zskiplistNode *node = (rank >= 1) ? zslGetElementByRank(zs->zsl, rank) : NULL;
do {
if (node != NULL) {
robj *field = node->obj;
incrRefCount(field);
len += sdslenOrElse(field, 8);
listAddNodeTail(ll, field);
uint64_t bits = convertDoubleToRawBits(node->score);
robj *score = createRawStringObjectFromUint64(bits);
len += sdslenOrElse(score, 8);
listAddNodeTail(ll, score);
node = node->backward;
it->zindex ++;
} else {
/* 没有更多了 */
more = 0;
}
/* 如果还有更多元素且bulks没有超过maxbulks且产生的字节数没有超过maxbytes */
} while (more && listLength(ll) < maxbulks && len < maxbytes);
/* 原hash总大小 */
hint = dictSize(ht);
}
/* SLOTSRESTORE-ASYNC list/hash/zset/dict $key $ttl $hint [$arg1 ...] */
addReplyMultiBulkLen(c, 5 + listLength(ll));/* MultiBulk总长度 */
addReplyBulkCString(c, "SLOTSRESTORE-ASYNC");
addReplyBulkCString(c, cmd);/* list?hash? */
addReplyBulk(c, key);
addReplyBulkLongLong(c, ttl);/* ttl */
addReplyBulkLongLong(c, hint);/* 总大小 */
/* 遍历ll,ll里面存放了本地要发送的args */
while (listLength(ll) != 0) {
/* 取出头结点 */
listNode *head = listFirst(ll);
/* 取出值对象 */
robj *obj = listNodeValue(head);
/* 添加回复 */
addReplyBulk(c, obj);
/* 删除该节点 */
listDelNode(ll, head);
}
/* 释放ll */
listRelease(ll);
if (!more) {
/* 如果对象所有元素都被序列换完毕,则进入FILLTTL阶段 */
it->stage = STAGE_FILLTTL;
}
/* 该阶段只有一个命令 */
return 1;
}
if (it->stage != STAGE_DONE) {
serverPanic("invalid iterator stage");
}
serverPanic("use of empty iterator");
}
/* ============================ Iterator for Data Migration (batched) ====================== */
typedef struct {
struct zskiplist *tags;/* 标识一个hashtag有没有被添加过 */
dict *keys;/* 批处理的Keys */
list *list; /* 每个节点的值都是singleObjectIterator */
dict *hash_slot;/* hash数组,数组的下标为slot_num,每个数组元素的字典为key、crc对 */
struct zskiplist *hash_tags;/* 用于保存具有hashtag的key,score为key的crc,值为key */
long long timeout;/* 进程chunked restore时会指定临时ttl,值为timeout*3 */
unsigned int maxbulks;/* 单次restore最多发送多少个bulks */
unsigned int maxbytes;/* 单次发送最多发送多少字节数 */
list *removed_keys;/* 一个key被发送完成之后会加入这个链表 */
list *chunked_vals;/* 用于存放使用chunked方式发生的val */
long estimate_msgs;/* 估算的restore命令的个数 */
} batchedObjectIterator;
/* 创建batchedObjectIterator */
static batchedObjectIterator *
createBatchedObjectIterator(dict *hash_slot, struct zskiplist *hash_tags,
long long timeout, unsigned int maxbulks, unsigned int maxbytes) {
batchedObjectIterator *it = zmalloc(sizeof(batchedObjectIterator));
it->tags = zslCreate();
it->keys = dictCreate(&setDictType, NULL);
it->list = listCreate();
listSetFreeMethod(it->list, freeSingleObjectIteratorVoid);
it->hash_slot = hash_slot;
it->hash_tags = hash_tags;
it->timeout = timeout;
it->maxbulks = maxbulks;
it->maxbytes = maxbytes;
it->removed_keys = listCreate();
listSetFreeMethod(it->removed_keys, decrRefCountVoid);
it->chunked_vals = listCreate();
listSetFreeMethod(it->chunked_vals, decrRefCountVoid);
it->estimate_msgs = 0;
return it;
}
/* 释放BatchedObjectIterator */
static void
freeBatchedObjectIterator(batchedObjectIterator *it) {
zslFree(it->tags);
dictRelease(it->keys);
listRelease(it->list);
listRelease(it->removed_keys);
listRelease(it->chunked_vals);
zfree(it);
}
/* 批处理迭代(即一次处理多个key) */
static int
batchedObjectIteratorHasNext(batchedObjectIterator *it) {
/* list链表不为空,每个节点的值都是singleObjectIterator */
while (listLength(it->list) != 0) {
/* 每个节点的值都是singleObjectIterator */
listNode *head = listFirst(it->list);
/* 每个节点的值都是singleObjectIterator */
singleObjectIterator *sp = listNodeValue(head);
/* 判断单个对象是否已经处于STAGE_DONE */
if (singleObjectIteratorHasNext(sp)) {
/* 不处于STAGE_DONE,即单对象迭代还没结束,则直接返回1,下次还会迭代这个对象 */
return 1;
}
/* 否则当前单对象已经迭代结束 */
if (sp->val != NULL) {
/* 如果当前单对象的value不为空,就把单对象的key添加到removed_keys链表 */
incrRefCount(sp->key);
listAddNodeTail(it->removed_keys, sp->key);
if (sp->chunked_msgs != 0) {
/* 如果chunked的消息个数不为0 */
incrRefCount(sp->val);
/* 就把val加入到chunked_vals链表 */
listAddNodeTail(it->chunked_vals, sp->val);
}
}
/* 删除这个节点 */
listDelNode(it->list, head);
}
return 0;
}
/* 批处理对象迭代,返回值为本地迭代产生的SLOTSRESTORE系列命令的个数 */
static int
batchedObjectIteratorNext(client *c, batchedObjectIterator *it) {
/* 遍历链表 */
if (listLength(it->list) != 0) {
/* 取出头结点 */
listNode *head = listFirst(it->list);
/* 节点值为singleObjectIterator */
singleObjectIterator *sp = listNodeValue(head);
/* maxbytes减去客户端输出缓冲区当前已有的大小就是本次能发送的最大字节数 */
long long maxbytes = (long long)it->maxbytes - getClientOutputBufferMemoryUsage(c);
/* 单对象迭代,迭代超时timeout,迭代单词最大maxbulks,单次最大maxbytes */
return singleObjectIteratorNext(c, sp, it->timeout, it->maxbulks, maxbytes > 0 ? maxbytes : 0);
}
serverPanic("use of empty iterator");
}
/* 批处理里面是否包含key,返回1表示存在,返回0表示不存在 */
static int
batchedObjectIteratorContains(batchedObjectIterator *it, robj *key, int usetag) {
/* 如果在keys中找到,则存在 */
if (dictFind(it->keys, key) != NULL) {
return 1;
}
/* 如果没有使用hashtag则结束查找 */
if (!usetag) {
return 0;
}
uint32_t crc;
int hastag;
/* 计算key的crc和hashtag */
slots_num(key->ptr, &crc, &hastag);
if (!hastag) {
/* 如果key没有hashtag则结束查找 */
return 0;
}
/* 否则填充range */
zrangespec range;
range.min = (double)crc;
range.minex = 0;
range.max = (double)crc;
range.maxex = 0;
/* 以crc为范围在跳表tags中查找,每一个hashtag被添加都会在tags跳表中添加一个节点 */
return zslFirstInRange(it->tags, &range) != NULL;
}
/* 向批处理添加一个key,返回值为本次新添加的key的个数 */
static int
batchedObjectIteratorAddKey(redisDb *db, batchedObjectIterator *it, robj *key) {
/* 添加到keys字典 */
if (dictAdd(it->keys, key, NULL) != C_OK) {
return 0;
}
/* 引用计数 */
incrRefCount(key);
/* 创建createSingleObjectIterator */
listAddNodeTail(it->list, createSingleObjectIterator(key));
/* 对该对象需要的restore命令个数进行预估 */
it->estimate_msgs += estimateNumberOfRestoreCommands(db, key, it->maxbulks);
/* 当前批处理的key个数 */
int size = dictSize(it->keys);
uint32_t crc;
int hastag;
/* 该key对应的slot num */
slots_num(key->ptr, &crc, &hastag);
if (!hastag) {
/* 如果key不含有hashtag则跳出 */
goto out;
}
/* 知道score为crc */
zrangespec range;
range.min = (double)crc;
range.minex = 0;
range.max = (double)crc;
range.maxex = 0;
/* 寻找第一个score满足 range范围的节点*/
if (zslFirstInRange(it->tags, &range) != NULL) {
/* 找到则跳出,因此是该hashtag的key已经被添加过,无需重复添加 */
goto out;
}
/* 引用计数 */
incrRefCount(key);
/* 没找到则插入,score为crc,节点的值为key */
zslInsert(it->tags, (double)crc, key);
/* 如果hash_tags跳表指针为NULL */
if (it->hash_tags == NULL) {
goto out;
}
/* 在hash_tags中寻找score满足range范围的第一个节点 */
zskiplistNode *node = zslFirstInRange(it->hash_tags, &range);
/* 如果score不同就跳出 */
while (node != NULL && node->score == (double)crc) {
/* 结点值就是key */
robj *key = node->obj;
/* score相同的节点都是连续排列的,因此直接从level[0]向后遍历就好 */
node = node->level[0].forward;
/* 添加到批处理keys */
if (dictAdd(it->keys, key, NULL) != C_OK) {
continue;
}
/* 引用计数 */
incrRefCount(key);
/* 为该key添加但对象迭代器SingleObjectIterator */
listAddNodeTail(it->list, createSingleObjectIterator(key));
/* 对该对象需要的restore命令个数进行预估 */
it->estimate_msgs += estimateNumberOfRestoreCommands(db, key, it->maxbulks);
}
out:
/* 本次新加如的key的个数,注意最开始的1个key也要加上 */
return 1 + dictSize(it->keys) - size;
}
/* ============================ Clients ==================================================== */
/* 获取异步迁移客户端,每个db一个 */
static slotsmgrtAsyncClient *
getSlotsmgrtAsyncClient(int db) {
return &server.slotsmgrt_cached_clients[db];
}
/* 通知被阻塞的 SlotsmgrtAsyncClient */
static void
notifySlotsmgrtAsyncClient(slotsmgrtAsyncClient *ac, const char *errmsg) {
/* 获取当前迭代器 */
batchedObjectIterator *it = ac->batched_iter;
/* 获取阻塞链表 */
list *ll = ac->blocked_list;
/* 遍历 */
while (listLength(ll) != 0) {
/* 取出头节点 */
listNode *head = listFirst(ll);
/* 取出节点值,就是client */
client *c = listNodeValue(head);
if (errmsg != NULL) {
/* 错误信息不为空,则将错误信息返回给client */
addReplyError(c, errmsg);
} else if (it == NULL) {
/* 迭代器非法 */
addReplyError(c, "invalid iterator (NULL)");
} else if (it->hash_slot == NULL) {
addReplyLongLong(c, listLength(it->removed_keys));
} else {
/* 返回两个值,一个是本次moved一个是hash_slot现在的大小 */
addReplyMultiBulkLen(c, 2);
addReplyLongLong(c, listLength(it->removed_keys));
addReplyLongLong(c, dictSize(it->hash_slot));
}
/* 清除CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT标志,表示这个客户端不是一个正在被使用、正常服务的客户端 */
c->slotsmgrt_flags &= ~CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT;
/* 清空客户端阻塞链表 */
c->slotsmgrt_fenceq = NULL;
/* 删除当前节点 */
listDelNode(ll, head);
}
}
/* 释放slotsmgrtAsyncClient里面的结构 */
static void
unlinkSlotsmgrtAsyncCachedClient(client *c, const char *errmsg) {
slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(c->db->id);
/* 必须有CLIENT_SLOTSMGRT_ASYNC_CACHED_CLIENT标志,表示这是一个已经被cached的客户端 */
serverAssert(c->slotsmgrt_flags & CLIENT_SLOTSMGRT_ASYNC_CACHED_CLIENT);
serverAssert(ac->c == c);
/* 通知被阻塞的客户端,消息为errmsg */
notifySlotsmgrtAsyncClient(ac, errmsg);
batchedObjectIterator *it = ac->batched_iter;
/* 空闲时间 */
long long elapsed = mstime() - ac->lastuse;
serverLog(LL_WARNING, "slotsmgrt_async: unlink client %s:%d (DB=%d): "
"sending_msgs = %ld, batched_iter = %ld, blocked_list = %ld, "
"timeout = %lld(ms), elapsed = %lld(ms) (%s)",
ac->host, ac->port, c->db->id, ac->sending_msgs,
it != NULL ? (long)listLength(it->list) : -1, (long)listLength(ac->blocked_list),
ac->timeout, elapsed, errmsg);
sdsfree(ac->host);
if (it != NULL) {
/* 释放批处理迭代器 */
freeBatchedObjectIterator(it);
}
/* 释放阻塞链表 */
listRelease(ac->blocked_list);
/* 取消CLIENT_SLOTSMGRT_ASYNC_CACHED_CLIENT,表示不是被缓存的slotsmgrtAsyncClient */
c->slotsmgrt_flags &= ~CLIENT_SLOTSMGRT_ASYNC_CACHED_CLIENT;
/* 情况结构,以备下一次使用(注意不需要free ac,因为这是每个db私有的) */
memset(ac, 0, sizeof(*ac));
}
/* 释放一个db相关的SlotsmgrtAsyncClient */
static int
releaseSlotsmgrtAsyncClient(int db, const char *errmsg) {
slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(db);
if (ac->c == NULL) {
/* 为NULL无需释放 */
return 0;
}
client *c = ac->c;
/* 释放slotsmgrtAsyncClient里面的结构 */
unlinkSlotsmgrtAsyncCachedClient(c, errmsg);
/* 释放client结构 */
freeClient(c);
return 1;
}
/* 新建一个slotsmgrtAsyncClient */
static int
createSlotsmgrtAsyncClient(int db, char *host, int port, long timeout) {
/* 新建连接 */
int fd = anetTcpNonBlockConnect(server.neterr, host, port);
if (fd == -1) {
serverLog(LL_WARNING, "slotsmgrt_async: create socket %s:%d (DB=%d) failed, %s",
host, port, db, server.neterr);
return C_ERR;
}
/* 禁用nagel算法 */
anetEnableTcpNoDelay(server.neterr, fd);
int wait = 100;
if (wait > timeout) {
wait = timeout;
}
/* 等待可写状态 */
if ((aeWait(fd, AE_WRITABLE, wait) & AE_WRITABLE) == 0) {
serverLog(LL_WARNING, "slotsmgrt_async: create socket %s:%d (DB=%d) failed, io error or timeout (%d)",
host, port, db, wait);
close(fd);
return C_ERR;
}
/* 创建redis客户端,内部会将fd读事件添加到主线程eventloop */
client *c = createClient(fd);
if (c == NULL) {
serverLog(LL_WARNING, "slotsmgrt_async: create client %s:%d (DB=%d) failed, %s",
host, port, db, server.neterr);
return C_ERR;
}
/* 选择客户端绑定的db */
if (selectDb(c, db) != C_OK) {
serverLog(LL_WARNING, "slotsmgrt_async: invalid DB index (DB=%d)", db);
freeClient(c);
return C_ERR;
}
/* 添加设置标志CLIENT_SLOTSMGRT_ASYNC_CACHED_CLIENT,表示这是一个已经被CACHED的客户端 */
c->slotsmgrt_flags |= CLIENT_SLOTSMGRT_ASYNC_CACHED_CLIENT;
/* 已认证 */
c->authenticated = 1;
/* 释放一个db相关的SlotsmgrtAsyncClient(清空里面的成员结构) */
releaseSlotsmgrtAsyncClient(db, "interrupted: build new connection");
serverLog(LL_WARNING, "slotsmgrt_async: create client %s:%d (DB=%d) OK", host, port, db);
/* 根据db获取slotsmgrtAsyncClient */
slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(db);
/* 设置绑定的client */
ac->c = c;
/* 没有被使用 */
ac->used = 0;
/* ip */
ac->host = sdsnew(host);
/* port */
ac->port = port;
/* 空闲时间 */
ac->timeout = timeout;
/* 更新最后一次使用时间 */
ac->lastuse = mstime();
/* 飞行中的消息计数 */
ac->sending_msgs = 0;
/* 批处理迭代器 */
ac->batched_iter = NULL;
/* 创建阻塞链表 */
ac->blocked_list = listCreate();
return C_OK;
}
/* 获取或创建一个slotsmgrtAsyncClient */
static slotsmgrtAsyncClient *
getOrCreateSlotsmgrtAsyncClient(int db, char *host, int port, long timeout) {
/* 根据要操作的db获取缓存的slotsmgrtAsyncClient */
slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(db);
if (ac->c != NULL) {
/* 不为NULL,在比较下host和port,只有完全线条才返回 */
if (ac->port == port && !strcmp(ac->host, host)) {
return ac;
}
}
/* 否则新建一个slotsmgrtAsyncClient */
return createSlotsmgrtAsyncClient(db, host, port, timeout) != C_OK ? NULL : ac;
}
static void
unlinkSlotsmgrtAsyncNormalClient(client *c) {
/* 释放一个正在被使用的、正常的client */
serverAssert(c->slotsmgrt_flags & CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT);
/* 阻塞链表不能为NULL */
serverAssert(c->slotsmgrt_fenceq != NULL);
/* 该客户端阻塞的链表 */
list *ll = c->slotsmgrt_fenceq;
/* 在阻塞链表中搜索该客户端 */
listNode *node = listSearchKey(ll, c);
/* 必须能搜索到 */
serverAssert(node != NULL);
/* 不再是一个正在被使用的、正常的client */
c->slotsmgrt_flags &= ~CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT;
/* 不再阻塞也就没有阻塞链表 */
c->slotsmgrt_fenceq = NULL;
/* 从阻塞链表中删除该客户端 */
listDelNode(ll, node);
}
void
slotsmgrtAsyncUnlinkClient(client *c) {
/* 针对CACHED类型客户端 */
if (c->slotsmgrt_flags & CLIENT_SLOTSMGRT_ASYNC_CACHED_CLIENT) {
unlinkSlotsmgrtAsyncCachedClient(c, "interrupted: connection closed");
}
/* 针对NORMAL类型客户端 */
if (c->slotsmgrt_flags & CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT) {
unlinkSlotsmgrtAsyncNormalClient(c);
}
}
/* 会被定期执行 */
void
slotsmgrtAsyncCleanup() {
/* 遍历所有db */
for (int i = 0; i < server.dbnum; i ++) {
/* 获取每个db对应的 slotsmgrtAsyncClient */
slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(i);
if (ac->c == NULL) {
continue;
}
/* 计算空闲时间 */
long long elapsed = mstime() - ac->lastuse;
/* 提取客户端timeout */
long long timeout = ac->batched_iter != NULL ? ac->timeout : 1000LL * 60;
if (elapsed <= timeout) {
/* 如果空闲时间小于timeout则继续遍历 */
continue;
}
/* 否则就释放这个客户端 */
releaseSlotsmgrtAsyncClient(i, ac->batched_iter != NULL ?
"interrupted: migration timeout" : "interrupted: idle timeout");
}
}
/* 获取异步迁移状态或者阻塞一个client */
static int
getSlotsmgrtAsyncClientMigrationStatusOrBlock(client *c, robj *key, int block) {
/* 获取当前db上的slotsmgrtAsyncClient */
slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(c->db->id);
if (ac->c == NULL || ac->batched_iter == NULL) {
/* 没有迁移或迁移完成 */
return 0;
}
/* 获取当前的batched_iter */
batchedObjectIterator *it = ac->batched_iter;
if (key != NULL && !batchedObjectIteratorContains(it, key, 1)) {
/* 如果key不为NULL且key不在batched中则直接返回0,表示该key没有迁移或者迁移完成 */
return 0;
}
if (!block) {
/* 如果不允许阻塞则直接返回 */
return 1;
}
if (c->slotsmgrt_flags & CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT) {
/* 如果这个客户端是一个CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT,即是一个
正在服务的slotsmgrtAsyncClient */
return -1;
}
/* 获取阻塞链表 */
list *ll = ac->blocked_list;
/* 设置CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT标志,表示这是一个正常的被阻塞的客户端 */
c->slotsmgrt_flags |= CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT;
/* 设置客户端阻塞在哪个链表上 */
c->slotsmgrt_fenceq = ll;
/* 添加到阻塞队列 */
listAddNodeTail(ll, c);
return 1;
}
/* ============================ Slotsmgrt{One,TagOne}AsyncDumpCommand ====================== */
/* SLOTSMGRTONE-ASYNC-DUMP $timeout $maxbulks $key1 [$key2 ...] */
/* SLOTSMGRTTAGONE-ASYNC-DUMP $timeout $maxbulks $key1 [$key2 ...] */
static void
slotsmgrtAsyncDumpGenericCommand(client *c, int usetag) {
long long timeout;
/* 获取timeout */
if (getLongLongFromObject(c->argv[1], &timeout) != C_OK ||
!(timeout >= 0 && timeout <= INT_MAX)) {
addReplyErrorFormat(c, "invalid value of timeout (%s)",
(char *)c->argv[1]->ptr);
return;
}
/* 如果timeout为0就修正为30s */
if (timeout == 0) {
timeout = 1000 * 30;
}
/* 获取maxbulks */
long long maxbulks;
if (getLongLongFromObject(c->argv[2], &maxbulks) != C_OK ||
!(maxbulks >= 0 && maxbulks <= INT_MAX)) {
addReplyErrorFormat(c, "invalid value of maxbulks (%s)",
(char *)c->argv[2]->ptr);
return;
}
/* 如果maxbulks就修正为默认值3000 */
if (maxbulks == 0) {
maxbulks = 1000;
}
/* 创建批处理迭代器,如果使用hashtag则提供 tagged_keys */
batchedObjectIterator *it = createBatchedObjectIterator(NULL,
usetag ? c->db->tagged_keys : NULL, timeout, maxbulks, INT_MAX);
/* 向批处理添加keys */
for (int i = 3; i < c->argc; i ++) {
batchedObjectIteratorAddKey(c->db, it, c->argv[i]);
}
/* 添加一个空对象节点到复链表reply中,用于存放MultiBulk的长度 */
void *ptr = addDeferredMultiBulkLength(c);
int total = 0;
/* batched迭代 */
while (batchedObjectIteratorHasNext(it)) {
/* batchedObjectIteratorNext返回本次迭代产生的SLOTSRESTORE系列命令的个数 */
total += batchedObjectIteratorNext(c, it);
}
/* 把真实的长度写进去 */
setDeferredMultiBulkLength(c, ptr, total);
/* 释放批处理迭代器 */
freeBatchedObjectIterator(it);
}
/* *
* SLOTSMGRTONE-ASYNC-DUMP $timeout $maxbulks $key1 [$key2 ...]
* */
void slotsmgrtOneAsyncDumpCommand(client *c) {
if (c->argc <= 3) {
addReplyError(c, "wrong number of arguments for SLOTSMGRTONE-ASYNC-DUMP");
return;
}
slotsmgrtAsyncDumpGenericCommand(c, 0);
}
/* *
* SLOTSMGRTTAGONE-ASYNC-DUMP $timeout $maxbulks $key1 [$key2 ...]
* */
void
slotsmgrtTagOneAsyncDumpCommand(client *c) {
if (c->argc <= 3) {
addReplyError(c, "wrong number of arguments for SLOTSMGRTTAGONE-ASYNC-DUMP");
return;
}
slotsmgrtAsyncDumpGenericCommand(c, 1);
}
/* ============================ Slotsmgrt{One,TagOne,Slot,TagSlot}AsyncCommand ============= */
/* 根据配置的client_obuf参数来修正maxbytes */
static unsigned int
slotsmgrtAsyncMaxBufferLimit(unsigned int maxbytes) {
clientBufferLimitsConfig *config = &server.client_obuf_limits[CLIENT_TYPE_NORMAL];
if (config->soft_limit_bytes != 0 && config->soft_limit_bytes < maxbytes) {
/* 如果配置的大小比soft_limit_bytes大则使用soft_limit_bytes */
maxbytes = config->soft_limit_bytes;
}
if (config->hard_limit_bytes != 0 && config->hard_limit_bytes < maxbytes) {
/* 如果配置的大小比hard_limit_bytes大则使用hard_limit_bytes */
maxbytes = config->hard_limit_bytes;
}
return maxbytes;
}
/* 在给定长时间usecs内至少产生atleast条消息(一条消息代表一条SLOTSRESTORE命令) */
static long
slotsmgrtAsyncNextMessagesMicroseconds(slotsmgrtAsyncClient *ac, long atleast, long long usecs) {
/* 批处理迭代 */
batchedObjectIterator *it = ac->batched_iter;
/* 阶段截止时间 */
long long deadline = ustime() + usecs;
long msgs = 0;
/* 如果批处理还有对象需要迭代切客户端输出缓冲区使用字节数小于maxbytes */
while (batchedObjectIteratorHasNext(it) && getClientOutputBufferMemoryUsage(ac->c) < it->maxbytes) {
/* 批处理对象迭代,返回值为本地迭代产生的SLOTSRESTORE系列命令的个数 */
if ((msgs += batchedObjectIteratorNext(ac->c, it)) < atleast) {
continue;
}
/* 如果已经超时就返回 */
if (ustime() >= deadline) {
return msgs;
}
}
/* 返回消息的个数 */
return msgs;
}
/* hash_slot的扫描函数 */
static void
slotsScanSdsKeyCallback(void *l, const dictEntry *de) {
sds skey = dictGetKey(de);
robj *key = createStringObject(skey, sdslen(skey));
/* 将key添加都链表 */
listAddNodeTail((list *)l, key);
}
/* SLOTSMGRTONE-ASYNC $host $port $timeout $maxbulks $maxbytes $key1 [$key2 ...] */
/* SLOTSMGRTTAGONE-ASYNC $host $port $timeout $maxbulks $maxbytes $key1 [$key2 ...] */
/* SLOTSMGRTSLOT-ASYNC $host $port $timeout $maxbulks $maxbytes $slot $numkeys */
/* SLOTSMGRTTAGSLOT-ASYNC $host $port $timeout $maxbulks $maxbytes $slot $numkeys */
static void
slotsmgrtAsyncGenericCommand(client *c, int usetag, int usekey) {
/* 提取host和port */
char *host = c->argv[1]->ptr;
long long port;
if (getLongLongFromObject(c->argv[2], &port) != C_OK ||
!(port >= 1 && port < 65536)) {
addReplyErrorFormat(c, "invalid value of port (%s)",
(char *)c->argv[2]->ptr);
return;
}
/* 提取timeout,用于chunk迁移时的临时ttl */
long long timeout;
if (getLongLongFromObject(c->argv[3], &timeout) != C_OK ||
!(timeout >= 0 && timeout <= INT_MAX)) {
addReplyErrorFormat(c, "invalid value of timeout (%s)",
(char *)c->argv[3]->ptr);
return;
}
/* 默认30S */
if (timeout == 0) {
timeout = 1000 * 30;
}
/* 提取maxbulks,用于觉得每个chunk能鞋底的bulk数目 */
long long maxbulks;
if (getLongLongFromObject(c->argv[4], &maxbulks) != C_OK ||
!(maxbulks >= 0 && maxbulks <= INT_MAX)) {
addReplyErrorFormat(c, "invalid value of maxbulks (%s)",
(char *)c->argv[4]->ptr);
return;
}
if (maxbulks == 0) {
maxbulks = 200;
}
/* 最大512K */
if (maxbulks > 512 * 1024) {
maxbulks = 512 * 1024;
}
/* 提取 maxbytes,用于决定单词迁移发送的最大字节数 */
long long maxbytes;
if (getLongLongFromObject(c->argv[5], &maxbytes) != C_OK ||
!(maxbytes >= 0 && maxbytes <= INT_MAX)) {
addReplyErrorFormat(c, "invalid value of maxbytes (%s)",
(char *)c->argv[5]->ptr);
return;
}
if (maxbytes == 0) {
maxbytes = 512 * 1024;
}
if (maxbytes > INT_MAX / 2) {
maxbytes = INT_MAX / 2;
}
/* 根据客户端配置的outbuf大小修正maxbytes */
maxbytes = slotsmgrtAsyncMaxBufferLimit(maxbytes);
dict *hash_slot = NULL;
long long numkeys = 0;
if (!usekey) {
/* 不是SLOTSMGRTTAGONE-ASYNC和SLOTSMGRTONE-ASYNC,即不指定key迁移
则提取slotnum
*/
long long slotnum;
if (getLongLongFromObject(c->argv[6], &slotnum) != C_OK ||
!(slotnum >= 0 && slotnum < HASH_SLOTS_SIZE)) {
addReplyErrorFormat(c, "invalid value of slot (%s)",
(char *)c->argv[6]->ptr);
return;
}
/* 获取hash_slot字典 */
hash_slot = c->db->hash_slots[slotnum];
/* 提取numkeys */
if (getLongLongFromObject(c->argv[7], &numkeys) != C_OK ||
!(numkeys >= 0 && numkeys <= INT_MAX)) {
addReplyErrorFormat(c, "invalid value of numkeys (%s)",
(char *)c->argv[7]->ptr);
return;
}
/* 如果numkeys为0就默认为每次迁移100 */
if (numkeys == 0) {
numkeys = 100;
}
}
/* DB是否正处于迁移状态 */
if (getSlotsmgrtAsyncClientMigrationStatusOrBlock(c, NULL, 0) != 0) {
addReplyError(c, "the specified DB is being migrated");
return;
}
/* 带有CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT标志的客户端是一个被阻塞正在等待操作结束的客户端 */
if (c->slotsmgrt_flags & CLIENT_SLOTSMGRT_ASYNC_NORMAL_CLIENT) {
addReplyError(c, "previous operation has not finished");
return;
}
/* 获取或创建一个slotsmgrtAsyncClient */
slotsmgrtAsyncClient *ac = getOrCreateSlotsmgrtAsyncClient(c->db->id, host, port, timeout);
if (ac == NULL) {
addReplyErrorFormat(c, "create client to %s:%d failed", host, (int)port);
return;
}
/* 创建批处理迭代器 */
batchedObjectIterator *it = createBatchedObjectIterator(hash_slot,
usetag ? c->db->tagged_keys : NULL, timeout, maxbulks, maxbytes);
if (!usekey) {
/* 创建一个链表ll,用于存放从hash_slot扫描出来的数据 */
list *ll = listCreate();
listSetFreeMethod(ll, decrRefCountVoid);
for (int i = 2; i >= 0 && it->estimate_msgs < numkeys; i --) {
unsigned long cursor = 0;
if (i != 0) {
cursor = random();
} else {
if (htNeedsResize(hash_slot)) {
dictResize(hash_slot);
}
}
if (dictIsRehashing(hash_slot)) {
dictRehash(hash_slot, 50);
}
int loop = numkeys * 10;
if (loop < 100) {
loop = 100;
}
do {
/* slotsScanSdsKeyCallback里面会把扫描出来的key添加都ll中 */
cursor = dictScan(hash_slot, cursor, slotsScanSdsKeyCallback, ll);
while (listLength(ll) != 0 && it->estimate_msgs < numkeys) {
listNode *head = listFirst(ll);
robj *key = listNodeValue(head);
long msgs = estimateNumberOfRestoreCommands(c->db, key, it->maxbulks);
if (it->estimate_msgs == 0 || it->estimate_msgs + msgs <= numkeys * 2) {
batchedObjectIteratorAddKey(c->db, it, key);
}
listDelNode(ll, head);
}
/* */
} while (cursor != 0 && it->estimate_msgs < numkeys &&
dictSize(it->keys) < (unsigned long)numkeys && (-- loop) >= 0);
}
listRelease(ll);
} else {
/* 否则就是指定key的迁移 */
for (int i = 6; i < c->argc; i ++) {
batchedObjectIteratorAddKey(c->db, it, c->argv[i]);
}
}
/* 当前没有正在发送的消息 */
serverAssert(ac->sending_msgs == 0);
/* 客户端阻塞链表也为空 */
serverAssert(ac->batched_iter == NULL && listLength(ac->blocked_list) == 0);
ac->timeout = timeout;
/* 更新最后使用时间 */
ac->lastuse = mstime();
ac->batched_iter = it;
/* 在500ms内至少产生3条命令 */
ac->sending_msgs = slotsmgrtAsyncNextMessagesMicroseconds(ac, 3, 500);
/* 判断db是否在迁移状态,如果是则阻塞 */
getSlotsmgrtAsyncClientMigrationStatusOrBlock(c, NULL, 1);
if (ac->sending_msgs != 0) {
return;
}
notifySlotsmgrtAsyncClient(ac, NULL);
ac->batched_iter = NULL;
freeBatchedObjectIterator(it);
}
/* *
* SLOTSMGRTONE-ASYNC $host $port $timeout $maxbulks $maxbytes $key1 [$key2 ...]
* */
void slotsmgrtOneAsyncCommand(client *c) {
if (c->argc <= 6) {
addReplyError(c, "wrong number of arguments for SLOTSMGRTONE-ASYNC");
return;
}
slotsmgrtAsyncGenericCommand(c, 0, 1);
}
/* *
* SLOTSMGRTTAGONE-ASYNC $host $port $timeout $maxbulks $maxbytes $key1 [$key2 ...]
* */
void slotsmgrtTagOneAsyncCommand(client *c) {
if (c->argc <= 6) {
addReplyError(c, "wrong number of arguments for SLOTSMGRTTAGONE-ASYNC");
return;
}
slotsmgrtAsyncGenericCommand(c, 1, 1);
}
/* *
* SLOTSMGRTSLOT-ASYNC $host $port $timeout $maxbulks $maxbytes $slot $numkeys
* */
void slotsmgrtSlotAsyncCommand(client *c) {
if (c->argc != 8) {
addReplyError(c, "wrong number of arguments for SLOTSMGRTSLOT-ASYNC");
return;
}
slotsmgrtAsyncGenericCommand(c, 0, 0);
}
/* *
* SLOTSMGRTTAGSLOT-ASYNC $host $port $timeout $maxbulks $maxbytes $slot $numkeys
* */
void slotsmgrtTagSlotAsyncCommand(client *c) {
if (c->argc != 8) {
addReplyError(c, "wrong number of arguments for SLOTSMGRTSLOT-ASYNC");
return;
}
slotsmgrtAsyncGenericCommand(c, 1, 0);
}
/* *
* SLOTSMGRT-ASYNC-FENCE
* */
void
slotsmgrtAsyncFenceCommand(client *c) {
/* 获取异步迁移状态或者阻塞一个client */
int ret = getSlotsmgrtAsyncClientMigrationStatusOrBlock(c, NULL, 1);
if (ret == 0) {
/* 没有阻塞,说明当前没有迁移任务 */
addReply(c, shared.ok);
} else if (ret != 1) {
/* 正常情况下如果客户端成功阻塞,会返回1 */
addReplyError(c, "previous operation has not finished (call fence again)");
}
/* 返回1的情况下,客户端暂时不会受到任何返回,后续迁移完成后会收到最终通知 */
}
/* *
* SLOTSMGRT-ASYNC-CANCEL
* */
void
slotsmgrtAsyncCancelCommand(client *c) {
addReplyLongLong(c, releaseSlotsmgrtAsyncClient(c->db->id, "interrupted: canceled"));
}
/* ============================ SlotsmgrtAsyncStatus ======================================= */
static void
singleObjectIteratorStatus(client *c, singleObjectIterator *it) {
if (it == NULL) {
addReply(c, shared.nullmultibulk);
return;
}
void *ptr = addDeferredMultiBulkLength(c);
int fields = 0;
fields ++; addReplyBulkCString(c, "key");
addReplyBulk(c, it->key);
fields ++; addReplyBulkCString(c, "val.type");
addReplyBulkLongLong(c, it->val == NULL ? -1 : it->val->type);
fields ++; addReplyBulkCString(c, "stage");
addReplyBulkLongLong(c, it->stage);
fields ++; addReplyBulkCString(c, "expire");
addReplyBulkLongLong(c, it->expire);
fields ++; addReplyBulkCString(c, "cursor");
addReplyBulkLongLong(c, it->cursor);
fields ++; addReplyBulkCString(c, "lindex");
addReplyBulkLongLong(c, it->lindex);
fields ++; addReplyBulkCString(c, "zindex");
addReplyBulkLongLong(c, it->zindex);
fields ++; addReplyBulkCString(c, "chunked_msgs");
addReplyBulkLongLong(c, it->chunked_msgs);
setDeferredMultiBulkLength(c, ptr, fields * 2);
}
/* batchedObjectIterator的状态 */
static void
batchedObjectIteratorStatus(client *c, batchedObjectIterator *it) {
if (it == NULL) {
addReply(c, shared.nullmultibulk);
return;
}
void *ptr = addDeferredMultiBulkLength(c);
int fields = 0;
fields ++;
addReplyBulkCString(c, "keys");
addReplyMultiBulkLen(c, 2);
addReplyBulkLongLong(c, dictSize(it->keys));
addReplyMultiBulkLen(c, dictSize(it->keys));
dictIterator *di = dictGetIterator(it->keys);
dictEntry *de;
while((de = dictNext(di)) != NULL) {
addReplyBulk(c, dictGetKey(de));
}
dictReleaseIterator(di);
fields ++; addReplyBulkCString(c, "timeout");
addReplyBulkLongLong(c, it->timeout);
fields ++; addReplyBulkCString(c, "maxbulks");
addReplyBulkLongLong(c, it->maxbulks);
fields ++; addReplyBulkCString(c, "maxbytes");
addReplyBulkLongLong(c, it->maxbytes);
fields ++; addReplyBulkCString(c, "estimate_msgs");
addReplyBulkLongLong(c, it->estimate_msgs);
fields ++; addReplyBulkCString(c, "removed_keys");
addReplyBulkLongLong(c, listLength(it->removed_keys));
fields ++; addReplyBulkCString(c, "chunked_vals");
addReplyBulkLongLong(c, listLength(it->chunked_vals));
fields ++; addReplyBulkCString(c, "iterators");
addReplyMultiBulkLen(c, 2);
addReplyBulkLongLong(c, listLength(it->list));
singleObjectIterator *sp = NULL;
if (listLength(it->list) != 0) {
sp = listNodeValue(listFirst(it->list));
}
singleObjectIteratorStatus(c, sp);
setDeferredMultiBulkLength(c, ptr, fields * 2);
}
/* *
* SLOTSMGRT-ASYNC-STATUS
* */
void
slotsmgrtAsyncStatusCommand(client *c) {
/* */
slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(c->db->id);
if (ac->c == NULL) {
addReply(c, shared.nullmultibulk);
return;
}
/* 预留MultiBulk长度 */
void *ptr = addDeferredMultiBulkLength(c);
int fields = 0;
fields ++; addReplyBulkCString(c, "host");
addReplyBulkCString(c, ac->host);
fields ++; addReplyBulkCString(c, "port");
addReplyBulkLongLong(c, ac->port);
fields ++; addReplyBulkCString(c, "used");
addReplyBulkLongLong(c, ac->used);
fields ++; addReplyBulkCString(c, "timeout");
addReplyBulkLongLong(c, ac->timeout);
fields ++; addReplyBulkCString(c, "lastuse");
addReplyBulkLongLong(c, ac->lastuse);
fields ++; addReplyBulkCString(c, "since_lastuse");
addReplyBulkLongLong(c, mstime() - ac->lastuse);
fields ++; addReplyBulkCString(c, "sending_msgs");
addReplyBulkLongLong(c, ac->sending_msgs);
/* 被阻塞的客户端的个数 */
fields ++; addReplyBulkCString(c, "blocked_clients");
addReplyBulkLongLong(c, listLength(ac->blocked_list));
fields ++; addReplyBulkCString(c, "batched_iterator");
batchedObjectIteratorStatus(c, ac->batched_iter);
/* 设置MultiBulk长度 */
setDeferredMultiBulkLength(c, ptr, fields * 2);
}
/* ============================ SlotsmgrtExecWrapper ======================================= */
/* *
* SLOTSMGRT-EXEC-WRAPPER $hashkey $command [$arg1 ...]
* */
void
slotsmgrtExecWrapperCommand(client *c) {
/* MultiBulk长度2 */
addReplyMultiBulkLen(c, 2);
if (c->argc < 3) {
addReplyLongLong(c, -1);
addReplyError(c, "wrong number of arguments for SLOTSMGRT-EXEC-WRAPPER");
return;
}
/* 查找命令 */
struct redisCommand *cmd = lookupCommand(c->argv[2]->ptr);
if (cmd == NULL) {
addReplyLongLong(c, -1);
addReplyErrorFormat(c,"invalid command specified (%s)",
(char *)c->argv[2]->ptr);
return;
}
if ((cmd->arity > 0 && cmd->arity != c->argc - 2) || (c->argc - 2 < -cmd->arity)) {
addReplyLongLong(c, -1);
addReplyErrorFormat(c, "wrong number of arguments for command (%s)",
(char *)c->argv[2]->ptr);
return;
}
/* 写的方式查找key */
if (lookupKeyWrite(c->db, c->argv[1]) == NULL) {
addReplyLongLong(c, 0);
addReplyError(c, "the specified key doesn't exist");
return;
}
/* 如果是写命令且 c->argv[1]正处于迁移状态,不会阻塞客户端 */
if (!(cmd->flags & CMD_READONLY) && getSlotsmgrtAsyncClientMigrationStatusOrBlock(c, c->argv[1], 0) != 0) {
/* 返回1 */
addReplyLongLong(c, 1);
addReplyError(c, "the specified key is being migrated");
return;
} else {
/* 返回2表示正常 */
addReplyLongLong(c, 2);
robj **argv = zmalloc(sizeof(robj *) * (c->argc - 2));
for (int i = 2; i < c->argc; i ++) {
argv[i - 2] = c->argv[i];
incrRefCount(c->argv[i]);
}
/* 被重复引用计数的要减去 */
for (int i = 0; i < c->argc; i ++) {
decrRefCount(c->argv[i]);
}
zfree(c->argv);
c->argc = c->argc - 2;
c->argv = argv;
c->cmd = cmd;
/* 调用被包装的命令 */
call(c, CMD_CALL_FULL & ~CMD_CALL_PROPAGATE);
}
}
/* ============================ SlotsrestoreAsync Commands ================================= */
/* SLOTSRESTORE-ASYNC的回复 */
static void
slotsrestoreReplyAck(client *c, int err_code, const char *fmt, ...) {
va_list ap;
va_start(ap, fmt);
sds s = sdscatvprintf(sdsempty(), fmt, ap);
va_end(ap);
addReplyMultiBulkLen(c, 3);
addReplyBulkCString(c, "SLOTSRESTORE-ASYNC-ACK");
addReplyBulkLongLong(c, err_code);
addReplyBulkSds(c, s);
if (err_code != 0) {
/* 如果有错误则回复之后关闭客户端 */
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
}
}
extern int verifyDumpPayload(unsigned char *p, size_t len);
/* slotsrestore-async命令具体处理 */
static int
slotsrestoreAsyncHandle(client *c) {
/* 获取本节点上异步迁移状态,即使在迁移也不会阻塞这个client */
if (getSlotsmgrtAsyncClientMigrationStatusOrBlock(c, NULL, 0) != 0) {
/* 本节点当前db上正在执行迁移,不能响应slotsrestore-async命令 */
slotsrestoreReplyAck(c, -1, "the specified DB is being migrated");
return C_ERR;
}
const char *cmd = "";
/* 参数校验 */
if (c->argc < 2) {
goto bad_arguments_number;
}
cmd = c->argv[1]->ptr;
/* ==================================================== */
/* SLOTSRESTORE-ASYNC $cmd $key [$ttl $arg1, $arg2 ...] */
/* ==================================================== */
if (c->argc < 3) {
goto bad_arguments_number;
}
robj *key = c->argv[2];
/* SLOTSRESTORE-ASYNC delete $key */
if (!strcasecmp(cmd, "delete")) {
if (c->argc != 3) {
goto bad_arguments_number;
}
/* 同步删除 */
int deleted = dbDelete(c->db, key);
if (deleted) {
/* 删除成功,通知所有watch该key的client */
signalModifiedKey(c->db, key);
/* 脏计数 */
server.dirty ++;
}
/* 回复,成删除回复1,没有删除则返回0 */
slotsrestoreReplyAck(c, 0, deleted ? "1" : "0");
return C_OK;
}
/* ==================================================== */
/* SLOTSRESTORE-ASYNC $cmd $key $ttl [$arg1, $arg2 ...] */
/* ==================================================== */
if (c->argc < 4) {
goto bad_arguments_number;
}
/* 提取ttl */
long long ttl;
if (getLongLongFromObject(c->argv[3], &ttl) != C_OK || ttl < 0) {
slotsrestoreReplyAck(c, -1, "invalid TTL value (TTL=%s)", c->argv[3]->ptr);
return C_ERR;
}
/* SLOTSRESTORE-ASYNC expire $key $ttl */
if (!strcasecmp(cmd, "expire")) {
/* 参数校验 */
if (c->argc != 4) {
goto bad_arguments_number;
}
/* 查看key是否存在 */
if (lookupKeyWrite(c->db, key) == NULL) {
slotsrestoreReplyAck(c, -1, "the specified key doesn't exist (%s)", key->ptr);
return C_ERR;
}
/* 响应 */
slotsrestoreReplyAck(c, 0, "1");
/* 会执过期设置 */
goto success_common;
}
/* SLOTSRESTORE-ASYNC string $key $ttl $payload */
if (!strcasecmp(cmd, "string")) {
/* 参数校验 */
if (c->argc != 5) {
goto bad_arguments_number;
}
/* 查看key是否存在 */
if (lookupKeyWrite(c->db, key) != NULL) {
slotsrestoreReplyAck(c, -1, "the specified key already exists (%s)", key->ptr);
return C_ERR;
}
/* 对val编码 */
robj *val = c->argv[4] = tryObjectEncoding(c->argv[4]);
/* 添加到db */
dbAdd(c->db, key, val);
/* 引用计数 */
incrRefCount(val);
/* 响应 */
slotsrestoreReplyAck(c, 0, "1");
/* 会执过期设置 */
goto success_common;
}
/* SLOTSRESTORE-ASYNC object $key $ttl $payload */
if (!strcasecmp(cmd, "object")) {
/* 参数校验 */
if (c->argc != 5) {
goto bad_arguments_number;
}
/* 查看key是否存在 */
if (lookupKeyWrite(c->db, key) != NULL) {
slotsrestoreReplyAck(c, -1, "the specified key already exists (%s)", key->ptr);
return C_ERR;
}
void *bytes = c->argv[4]->ptr;
rio payload;
/* 校验RDB序列化格式 */
if (verifyDumpPayload(bytes, sdslen(bytes)) != C_OK) {
slotsrestoreReplyAck(c, -1, "invalid payload checksum");
return C_ERR;
}
/* 初始化payload */
rioInitWithBuffer(&payload, bytes);
/* 获取对象类型 */
int type = rdbLoadObjectType(&payload);
if (type == -1) {
slotsrestoreReplyAck(c, -1, "invalid payload type");
return C_ERR;
}
/* 获取值对象 */
robj *val = rdbLoadObject(type, &payload);
if (val == NULL) {
slotsrestoreReplyAck(c, -1, "invalid payload body");
return C_ERR;
}
/* 添加到db */
dbAdd(c->db, key, val);
/* 响应 */
slotsrestoreReplyAck(c, 0, "1");
/* 会执过期设置 */
goto success_common;
}
/* ========================================================== */
/* SLOTSRESTORE-ASYNC $cmd $key $ttl $hint [$arg1, $arg2 ...] */
/* ========================================================== */
/* 参数校验 */
if (c->argc < 5) {
goto bad_arguments_number;
}
/* 提取总长度hint */
long long hint;
if (getLongLongFromObject(c->argv[4], &hint) != C_OK || hint < 0) {
slotsrestoreReplyAck(c, -1, "invalid Hint value (Hint=%s)", c->argv[4]->ptr);
return C_ERR;
}
int xargc = c->argc - 5;
robj **xargv = &c->argv[5];
/* SLOTSRESTORE-ASYNC list $key $ttl $hint [$elem1 ...] */
if (!strcasecmp(cmd, "list")) {
/* 查看key是否存在 */
robj *val = lookupKeyWrite(c->db, key);
if (val != NULL) {
/* 如果key已经存在,则val类型必须为OBJ_LIST切编码类型必须为OBJ_ENCODING_QUICKLIST */
if (val->type != OBJ_LIST || val->encoding != OBJ_ENCODING_QUICKLIST) {
slotsrestoreReplyAck(c, -1, "wrong type (expect=%d/%d,got=%d/%d)",
OBJ_LIST, OBJ_ENCODING_QUICKLIST, val->type, val->encoding);
return C_ERR;
}
} else {
/* 否则key不存在 */
if (xargc == 0) {
slotsrestoreReplyAck(c, -1, "the specified key doesn't exist (%s)", key->ptr);
return C_ERR;
}
/* 常见Quicklist对象 */
val = createQuicklistObject();
/* 设置选项 */
quicklistSetOptions(val->ptr, server.list_max_ziplist_size,
server.list_compress_depth);
/* 添加到db */
dbAdd(c->db, key, val);
}
/* 将所有的args添加到val Quicklist中 */
for (int i = 0; i < xargc; i ++) {
xargv[i] = tryObjectEncoding(xargv[i]);
listTypePush(val, xargv[i], LIST_TAIL);
}
/* 返回值为val当前总长度 */
slotsrestoreReplyAck(c, 0, "%d", listTypeLength(val));
goto success_common;
}
/* SLOTSRESTORE-ASYNC hash $key $ttl $hint [$hkey1 $hval1 ...] */
if (!strcasecmp(cmd, "hash")) {
/* 对于hash类型args必须是偶数 */
if (xargc % 2 != 0) {
goto bad_arguments_number;
}
/* 先查找key */
robj *val = lookupKeyWrite(c->db, key);
if (val != NULL) {
/* key已存在,则类型必须为OBJ_HASH,编码类型必须为OBJ_ENCODING_HT */
if (val->type != OBJ_HASH || val->encoding != OBJ_ENCODING_HT) {
slotsrestoreReplyAck(c, -1, "wrong type (expect=%d/%d,got=%d/%d)",
OBJ_HASH, OBJ_ENCODING_HT, val->type, val->encoding);
return C_ERR;
}
} else {
if (xargc == 0) {
slotsrestoreReplyAck(c, -1, "the specified key doesn't exist (%s)", key->ptr);
return C_ERR;
}
/* 不存在就创建hash对象 */
val = createHashObject();
if (val->encoding != OBJ_ENCODING_HT) {
hashTypeConvert(val, OBJ_ENCODING_HT);
}
/* 添加到db */
dbAdd(c->db, key, val);
}
/* 如果总长度不为0 */
if (hint != 0) {
dict *ht = val->ptr;
/* 使用hint创建或者扩展ht */
dictExpand(ht, hint);
}
/* 顺序添加 */
for (int i = 0; i < xargc; i += 2) {
/* field */
hashTypeTryObjectEncoding(val, &xargv[i], &xargv[i + 1]);
/* value */
hashTypeSet(val, xargv[i], xargv[i + 1]);
}
/* 返回值为val当前总长度 */
slotsrestoreReplyAck(c, 0, "%d", hashTypeLength(val));
goto success_common;
}
/* SLOTSRESTORE-ASYNC dict $key $ttl $hint [$elem1 ...] */
if (!strcasecmp(cmd, "dict")) {
/* 先查找key */
robj *val = lookupKeyWrite(c->db, key);
if (val != NULL) {
/* key已存在,则类型必须为OBJ_SET,编码类型必须为OBJ_ENCODING_HT */
if (val->type != OBJ_SET || val->encoding != OBJ_ENCODING_HT) {
slotsrestoreReplyAck(c, -1, "wrong type (expect=%d/%d,got=%d/%d)",
OBJ_SET, OBJ_ENCODING_HT, val->type, val->encoding);
return C_ERR;
}
} else {
if (xargc == 0) {
slotsrestoreReplyAck(c, -1, "the specified key doesn't exist (%s)", key->ptr);
return C_ERR;
}
/* 不存在就创建set对象 */
val = createSetObject();
if (val->encoding != OBJ_ENCODING_HT) {
setTypeConvert(val, OBJ_ENCODING_HT);
}
/* 添加到db */
dbAdd(c->db, key, val);
}
/* 如果总长度不为0 */
if (hint != 0) {
dict *ht = val->ptr;
/* 使用hint创建或者扩展ht */
dictExpand(ht, hint);
}
/* 顺序添加 */
for (int i = 0; i < xargc; i ++) {
/* feild */
xargv[i] = tryObjectEncoding(xargv[i]);
/* val */
setTypeAdd(val, xargv[i]);
}
/* 返回值为val当前总长度 */
slotsrestoreReplyAck(c, 0, "%d", setTypeSize(val));
goto success_common;
}
/* SLOTSRESTORE-ASYNC zset $key $ttl $hint [$elem1 $score1 ...] */
if (!strcasecmp(cmd, "zset")) {
/* zset参数也必须是偶数,elem1和score配对 */
if (xargc % 2 != 0) {
goto bad_arguments_number;
}
/* 提取score */
double *scores = zmalloc(sizeof(double) * xargc / 2);
for (int i = 1, j = 0; i < xargc; i += 2, j ++) {
uint64_t bits;
if (getUint64FromRawStringObject(xargv[i], &bits) != C_OK) {
zfree(scores);
slotsrestoreReplyAck(c, -1, "invalid zset score ([%d]), bad raw bits", j);
return C_ERR;
}
scores[j] = convertRawBitsToDouble(bits);
}
/* */
robj *val = lookupKeyWrite(c->db, key);
if (val != NULL) {
/* val已经存在,校验类型 */
if (val->type != OBJ_ZSET || val->encoding != OBJ_ENCODING_SKIPLIST) {
zfree(scores);
slotsrestoreReplyAck(c, -1, "wrong type (expect=%d/%d,got=%d/%d)",
OBJ_ZSET, OBJ_ENCODING_SKIPLIST, val->type, val->encoding);
return C_ERR;
}
} else {
/* 不存在 */
if (xargc == 0) {
zfree(scores);
slotsrestoreReplyAck(c, -1, "the specified key doesn't exist (%s)", key->ptr);
return C_ERR;
}
/* 否则就创建zset对象 */
val = createZsetObject();
if (val->encoding != OBJ_ENCODING_SKIPLIST) {
zsetConvert(val, OBJ_ENCODING_SKIPLIST);
}
/* 添加到db */
dbAdd(c->db, key, val);
}
zset *zset = val->ptr;
/* 如果总长度不为0 */
if (hint != 0) {
dict *ht = zset->dict;
/* 就创建或修正hash大小为hint */
dictExpand(ht, hint);
}
/* 顺序添加 */
for (int i = 0, j = 0; i < xargc; i += 2, j ++) {
robj *elem = xargv[i] = tryObjectEncoding(xargv[i]);
dictEntry *de = dictFind(zset->dict, elem);
if (de != NULL) {
/* score */
double score = *(double *)dictGetVal(de);
zslDelete(zset->zsl, score, elem);
/* memeber */
dictDelete(zset->dict, elem);
}
/* 添加elem */
zskiplistNode *znode = zslInsert(zset->zsl, scores[j], elem);
/* 引用计数 */
incrRefCount(elem);
/* 添加score */
dictAdd(zset->dict, elem, &(znode->score));
/* 引用计数 */
incrRefCount(elem);
}
zfree(scores);
/* 返回值为val当前总长度 */
slotsrestoreReplyAck(c, 0, "%d", zsetLength(val));
goto success_common;
}
slotsrestoreReplyAck(c, -1, "unknown command (argc=%d,cmd=%s)", c->argc, cmd);
return C_ERR;
bad_arguments_number:
slotsrestoreReplyAck(c, -1, "wrong number of arguments (argc=%d,cmd=%s)", c->argc, cmd);
return C_ERR;
success_common:
/* ttl不为0就设置过期,否则就删除过期设置 */
if (ttl != 0) {
setExpire(c->db, key, mstime() + ttl);
} else {
removeExpire(c->db, key);
}
/* 通知watched */
signalModifiedKey(c->db, key);
/* 脏计数 */
server.dirty ++;
return C_OK;
}
/* *
* SLOTSRESTORE-ASYNC delete $key
* expire $key $ttl
* object $key $ttl $payload
* string $key $ttl $payload
* list $key $ttl $hint [$elem1 ...]
* hash $key $ttl $hint [$hkey1 $hval1 ...]
* dict $key $ttl $hint [$elem1 ...]
* zset $key $ttl $hint [$elem1 $score1 ...]
* */
void
slotsrestoreAsyncCommand(client *c) {
/* slotsrestore-async命令处理 */
if (slotsrestoreAsyncHandle(c) != C_OK) {
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
}
}
/* 目的实例发送SLOTSRESTORE-ASYNC-ACK的处理 */
static int
slotsrestoreAsyncAckHandle(client *c) {
/* 获取该db上对应的slotsmgrtAsyncClient */
slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(c->db->id);
if (ac->c != c) {
/* 必须是同一个客户端发送的SLOTSRESTORE-ASYNC-ACK才合法 */
addReplyErrorFormat(c, "invalid client, permission denied");
return C_ERR;
}
/* 参数校验,格式:SLOTSRESTORE-ASYNC-ACK $errno $message */
if (c->argc != 3) {
addReplyError(c, "wrong number of arguments for SLOTSRESTORE-ASYNC-ACK");
return C_ERR;
}
long long errcode;
if (getLongLongFromObject(c->argv[1], &errcode) != C_OK) {
addReplyErrorFormat(c, "invalid errcode (%s)",
(char *)c->argv[1]->ptr);
return C_ERR;
}
/* 如果有错误这个就是错误的描述信息 */
const char *errmsg = c->argv[2]->ptr;
if (errcode != 0) {
/* 错误码不为0则打印错误 */
serverLog(LL_WARNING, "slotsmgrt_async: ack[%d] %s",
(int)errcode, errmsg != NULL ? errmsg : "(null)");
return C_ERR;
}
/* batched_iter校验,理论上在有迁移状态下不能为NULL */
if (ac->batched_iter == NULL) {
serverLog(LL_WARNING, "slotsmgrt_async: null batched iterator");
addReplyError(c, "invalid iterator (NULL)");
return C_ERR;
}
/* 正在发送的消息个数(飞行中的消息) */
if (ac->sending_msgs == 0) {
serverLog(LL_WARNING, "slotsmgrt_async: invalid message counter");
addReplyError(c, "invalid pending messages");
return C_ERR;
}
/* 更新slotsmgrtAsyncClient最后一次被使用的时间 */
ac->lastuse = mstime();
/* 飞行中的消息个数减一(即一条restore命令收到了一个ack) */
ac->sending_msgs -= 1;
/* 继续产生新的restore命令(在给定10ms内至少产生2条消息) */
ac->sending_msgs += slotsmgrtAsyncNextMessagesMicroseconds(ac, 2, 10);
/* 如果还有正在发送的消息(即发出去还没收到ACK) */
if (ac->sending_msgs != 0) {
return C_OK;
}
/* 通知客户端 */
notifySlotsmgrtAsyncClient(ac, NULL);
/* 获取批处理迭代器 */
batchedObjectIterator *it = ac->batched_iter;
if (listLength(it->removed_keys) != 0) {
/* 如果被移走的key个数不为0 */
list *ll = it->removed_keys;
for (int i = 0; i < c->argc; i ++) {
/* 遍历removed_keys链表,对其引用计数减一 */
decrRefCount(c->argv[i]);
}
/* 释放客户端当前的参数结构 */
zfree(c->argv);
/* DEL key1 key2 key2 ... */
c->argc = 1 + listLength(ll);
/* 分配argv结构 */
c->argv = zmalloc(sizeof(robj *) * c->argc);
for (int i = 1; i < c->argc; i ++) {
/* 遍历、填充argv */
listNode *head = listFirst(ll);
/* 获取被移走的key */
robj *key = listNodeValue(head);
/* 将其从db中删除 */
if (dbDelete(c->db, key)) {
/* 通知key空间 */
signalModifiedKey(c->db, key);
/* 脏计数 */
server.dirty ++;
}
/* 填充argv */
c->argv[i] = key;
/* 引用计数 */
incrRefCount(key);
/* 删除当前节点 */
listDelNode(ll, head);
}
/* 填充 argv[0] */
c->argv[0] = createStringObject("DEL", 3);
/* 注意,虽然客户端发来的是SLOTSRESTORE-ASYNC-ACK命令,
但是此时我们已经将其改写为一条DEL命令,该函数退出后,会被
propagate写到AOF文件和所有slaves
*/
}
/* 用于存放使用chunked方式发生的val */
if (listLength(it->chunked_vals) != 0) {
list *ll = it->chunked_vals;
/* 遍历 chunked_vals链表 */
while (listLength(ll) != 0) {
/* 头结点 */
listNode *head = listFirst(ll);
/* 提取节点值 */
robj *o = listNodeValue(head);
/* 引用计数 */
incrRefCount(o);
/* 删除当前节点 */
listDelNode(ll, head);
/* 如果当前对象refcount不为1就先decrRefCount */
if (o->refcount != 1) {
decrRefCount(o);
} else {
/* 否则refcount为1就lazy释放 */
lazyReleaseObject(o);
}
}
}
ac->batched_iter = NULL;
freeBatchedObjectIterator(it);
return C_OK;
}
/* *
* SLOTSRESTORE-ASYNC-ACK $errno $message
* */
void
slotsrestoreAsyncAckCommand(client *c) {
/* 调用slotsrestoreAsyncAckHandle进一步处理 */
if (slotsrestoreAsyncAckHandle(c) != C_OK) {
/* Close after writing entire reply. */
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
}
}
extern int time_independent_strcmp(const char *a, const char *b);
/* *
* SLOTSRESTORE-ASYNC-AUTH $passwd
* */
void
slotsrestoreAsyncAuthCommand(client *c) {
if (!server.requirepass) {
/* 如果服务端没有设置密码则返回错误 */
slotsrestoreReplyAck(c, -1, "Client sent AUTH, but no password is set");
return;
}
if (!time_independent_strcmp(c->argv[1]->ptr, server.requirepass)) {
/* 密码匹配成功则设置客户端的authenticated标志,并响应ok */
c->authenticated = 1;
slotsrestoreReplyAck(c, 0, "OK");
} else {
/* 密码匹配失败 */
c->authenticated = 0;
slotsrestoreReplyAck(c, -1, "invalid password");
}
}
/* *
* SLOTSRESTORE-ASYNC-SELECT $db
* */
void
slotsrestoreAsyncSelectCommand(client *c) {
long long db;
if (getLongLongFromObject(c->argv[1], &db) != C_OK ||
!(db >= 0 && db <= INT_MAX) || selectDb(c, db) != C_OK) {
slotsrestoreReplyAck(c, -1, "invalid DB index (%s)", c->argv[1]->ptr);
} else {
slotsrestoreReplyAck(c, 0, "OK");
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment