顺风车运营研发团队 闫昌
一. 乐观锁与悲观锁
悲观锁: 数据被外界修改保守态度(悲观), 所以, 在整个数据处理过程当中, 将数据处理锁定状态. 实现方式: 在对任意记录修改前, 先尝试为该记录加上排他锁, 若是加锁失败, 说明该记录正在被修改, 当前查询可能要等待或抛出异常, 若是成功加锁, 那么就能够对记录作修改
乐观锁: 乐观锁假设认为数据通常状况下不会形成冲突, 因此在数据进行提交更新的时候, 才会正式对数据的冲突进行检测, 若是发现冲突了, 则返回错误信息
二. incr命令是否使用了乐观锁或悲观锁
假设redis数据库里如今有一个key a的值为10, 同一时刻有两个redis客户端(客户端1, 客户端2)对a进行了incr操做, 那么a的值应该为多少呢?
假设使用了乐观锁, 客户端1和客户端2同时获取到了a, 且a此时先将值修改成了11, 那么客户端2此时设置值为11时, 发现值已经成了11, 会报错
假设使用了悲观锁, 客户端1抢到a时会锁定住, 客户端2此时会抢不到锁, 这样能够保证原子性
redis代码中的incr实现:redis
void incrCommand(client *c) { incrDecrCommand(c,1); } void incrDecrCommand(client *c, long long incr) { long long value, oldvalue; robj *o, *new; o = lookupKeyWrite(c->db,c->argv[1]); //从数据库中寻找须要修改的key if (o != NULL && checkType(c,o,OBJ_STRING)) return; //若是key的类型为lis, set, zset, hash等, 则直接返回, 只有当key的类型为string时才能够继续 if (getLongLongFromObjectOrReply(c,o,&value,NULL) != C_OK) return; oldvalue = value; if ((incr < 0 && oldvalue < 0 && incr < (LLONG_MIN-oldvalue)) || (incr > 0 && oldvalue > 0 && incr > (LLONG_MAX-oldvalue))) {//防止数据越界, incr > (LLONG_MAX-oldvalue) ===== incr+oldvalue>LLONG_MAX addReplyError(c,"increment or decrement would overflow"); return; } value += incr; //直接将值相加 if (o && o->refcount == 1 && o->encoding == OBJ_ENCODING_INT && (value < 0 || value >= OBJ_SHARED_INTEGERS) && value >= LONG_MIN && value <= LONG_MAX) { new = o; o->ptr = (void*)((long)value); } else { new = createStringObjectFromLongLong(value); if (o) {//直接写入数据库 dbOverwrite(c->db,c->argv[1],new); } else { dbAdd(c->db,c->argv[1],new); } } signalModifiedKey(c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_STRING,"incrby",c->argv[1],c->db->id); server.dirty++; addReply(c,shared.colon); addReply(c,new); addReply(c,shared.crlf); }
结论: 从代码中能够看到, incr不用锁来实现, 不保证原子性, 命令操做时, 直接对key加1操做数据库
三. redis 4.0的线程异步
redis4.0启动会默认起四个线程: 其中一个主线程, 其它三个子进程为后台线程, 须要主线程发信号给子进程, 子进程再处理相应的逻辑函数
root 43529 1 43529 0 4 16:10 ? 00:00:11 ./redis-server *:7777 root 43529 1 43530 0 4 16:10 ? 00:00:00 ./redis-server *:7777 root 43529 1 43531 0 4 16:10 ? 00:00:00 ./redis-server *:7777 root 43529 1 43532 0 4 16:10 ? 00:00:00 ./redis-server *:7777
四. unlink命令ui
unlink命令为redis4.0新加的命令, 对一个key进行unlink操做, redis主进程会将此key交给子线程去异步删除atom
//del和unlink命令底层调用的都为delGenericCommand函数, 只是第二个参数不一样 void delCommand(client *c) { delGenericCommand(c,0); } void unlinkCommand(client *c) { delGenericCommand(c,1); }
//若是是unlink, 则调用dbAsyncDelete异步删除. 若是是del, 则调用dbAsyncDelete同步删除 void delGenericCommand(client *c, int lazy) { int numdel = 0, j; for (j = 1; j < c->argc; j++) { expireIfNeeded(c->db,c->argv[j]); int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) : dbSyncDelete(c->db,c->argv[j]); if (deleted) { signalModifiedKey(c->db,c->argv[j]); notifyKeyspaceEvent(NOTIFY_GENERIC, "del",c->argv[j],c->db->id); server.dirty++; numdel++; } } addReplyLongLong(c,numdel); }
int dbAsyncDelete(redisDb *db, robj *key) { if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);//若是key已通过期, 则直接从字典中将此key删除 dictEntry *de = dictUnlink(db->dict,key->ptr);//获取要删除的字典key=>val if (de) { robj *val = dictGetVal(de); size_t free_effort = lazyfreeGetFreeEffort(val); if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) { atomicIncr(lazyfree_objects,1); bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);//会将要删除的key发送给后台子线程去处理 dictSetVal(db->dict,de,NULL);//这里将val设置为了null } } if (de) { dictFreeUnlinkedEntry(db->dict,de);//删除key和val, 由于val已经被前一步设置为了null, 因此这一步至关于只删除key if (server.cluster_enabled) slotToKeyDel(key);//若是是集群模式, 须要将slot上的key删除 return 1; } else { return 0; } }
dictEntry *dictUnlink(dict *ht, const void *key) { return dictGenericDelete(ht,key,1); } static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) { uint64_t h, idx; dictEntry *he, *prevHe; int table; if (d->ht[0].used == 0 && d->ht[1].used == 0) return NULL; if (dictIsRehashing(d)) _dictRehashStep(d); h = dictHashKey(d, key); for (table = 0; table <= 1; table++) {//防止正在rehash, 因此要遍历两个hash idx = h & d->ht[table].sizemask; he = d->ht[table].table[idx]; prevHe = NULL; while(he) {//获取字典中的key, val if (key==he->key || dictCompareKeys(d, key, he->key)) { /* Unlink the element from the list */ if (prevHe) prevHe->next = he->next; else d->ht[table].table[idx] = he->next; if (!nofree) { dictFreeKey(d, he); dictFreeVal(d, he); zfree(he); } d->ht[table].used--; return he; } prevHe = he; he = he->next;//hash里key可能会冲突, 因此要日后遍历 } if (!dictIsRehashing(d)) break; } return NULL; /* not found */ }
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) { struct bio_job *job = zmalloc(sizeof(*job)); job->time = time(NULL); job->arg1 = arg1; job->arg2 = arg2; job->arg3 = arg3; pthread_mutex_lock(&bio_mutex[type]);//开启线程锁 listAddNodeTail(bio_jobs[type],job);//在要删除的队列中添加元素 bio_pending[type]++; pthread_cond_signal(&bio_newjob_cond[type]);//向删除元素的线程发送信号, 使其开始处理 pthread_mutex_unlock(&bio_mutex[type]);//释放线程锁 }
整体流程图:spa
五. 关于slotToKeyDel函数线程
在集群模式中, 一共有16384个slot, 每一个redis的key必定在一个slot里, 当对key进行操做时, 经过CRC(key)&16383来肯定这个key属于哪一个slot
在clusterState.slots_to_keys跳跃表每一个节点的分值都是一个槽号, 而每一个节点成员都是一个数据库键
当往数据库添加一个键时, 节点会将这个键以及键对应的槽号关联到slots_to_keys中, 当删除时, slots_to_keys会删除健值的对应关系
因此在删除一个key时, 若是是集群模式, 会调用slotToKeyDel来删除跳跃表中的对应关系code