顺风车运营研发团队 熊浩含
1、命令简介
BITCOUNT key [start] [end]html
redis计算给定字符串中,被设置为 1 的比特位的数量。redis
redis> BITCOUNT bits (integer) 0 redis> SETBIT bits 0 1 # 0001 (integer) 0 redis> BITCOUNT bits (integer) 1 redis> SETBIT bits 3 1 # 1001 (integer) 0 redis> BITCOUNT bits (integer) 2
2、算法思路
redis执行这一命令的过程,核心是求二进制数中“1”的个数。但不一样于处理通常数据,redis中支持计算最多512M数据中被设置为 1 的比特位的数。因此问题不妨转化为:算法
如何计算0.5个G数据中,被设置为 1 的比特位的数量?数组
相关的算法有不少,redis在处理过程当中,综合了二种不一样的方法,先单独介绍:函数
查表法
此处入参的大小是4字节(unsigned int)ui
int BitCount(unsigned int n) { unsigned int table[256] = { 0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, 4, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 4, 5, 5, 6, 5, 6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8, }; return table[n &0xff] + table[(n >>8) &0xff] + table[(n >>16) &0xff] + table[(n >>24) &0xff] ; }
思路:this
一、建立一大小为256的数组,相应位置上存放对应2进制数的“1”的个数;spa
二、将入参按8bit分开,查4次表,并将4次结果结果相加。.net
以2882400018(二进制:10101011110011011110111100010010)为例,四次查表过程以下:红色表示当前8bit,绿色表示右移后高位补零。code
相加可得2+7+5+5=19。
variable-precision SWAR算法
统计一个位数组中非0位的数量,数学上称做:”Hanmming Weight“(汉明重量)。目前效率最高的是variable-precision SWAR算法,能够在常数时间内计算出多个字节的非0数目。
先观察如下几个数,以后这几个数将做为掩码参与计算。
int swar(uint32_t i) { //计算每两位二进制数中1的个数 i = ( i & 0x55555555) + ((i >> 1) & 0x55555555); //计算每四位二进制数中1的个数 i = (i & 0x33333333) + ((i >> 2) & 0x33333333); //计算每八位二进制数中1的个数 i = (i & 0x0F0F0F0F) + ((i >> 4) & 0x0F0F0F0F); //将每八位二进制数中1的个数和相加,并移至最低位八位 i = (i * 0x01010101) >> 24); return i; }
下面以(0010 1011 0100 1010 0001 1111 1000 0111)为例逐步说明:
1)首先计算每两位二进制数中1的个数,( i & 0x55555555)筛出了每两位二进制数中奇数位的“1”,并把“1”置于低位;((i >> 1) & 0x55555555)筛出了每两位二进制数中偶数位,一样把“1”置于低位;相加后的值,只多是0,1,2,表明了这两位上“1”的个数;
2)对上一步的结果做“归并”处理,计算每四位上“1”的个数,此时i的一个4bit,存放着两个2bit的“1”的个数和。(i & 0x33333333)筛出了奇数序列上的4bit,((i >> 2) & 0x33333333)筛出了偶数序列上的2bit;相加后的值,表明了这4bit上“1”的个数;
3)继续对上一步结果做“归并处理”,计算每八位上“1”的个数,此时i的一个8bit,存放着两个4bit的“1”的个数和。(i &0x0F0F0F0F)筛出了奇数序列上的4bit,((i >> 2) & 0x0F0F0F0F)筛出了偶数序列上的4bit;相加后的值,表明了这8bit上“1”的个数;
4)此时对于32bit的二进制数据,咱们已经按8bit*4分好了组,每8bit存放着的是该组“1”的个数,如今把这四组数加起来便可,即实现
00000100+00000101+00000011+00000100。
体如今乘法上,便是(i * 0x01010101)>>24,等于0000....000000010000=16。
3、redis实现
void bitcountCommand(client *c) { robj *o; long start, end, strlen; unsigned char *p; char llbuf[LONG_STR_SIZE]; /* Lookup, check for type, and return 0 for non existing keys. */ /*检查key是否存在,若是不存在,则返回0*/ if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,OBJ_STRING)) return; p = getObjectReadOnlyString(o,&strlen,llbuf); /* 检查参数是否有误 */ if (c->argc == 4) { if (getLongFromObjectOrReply(c,c->argv[2],&start,NULL) != C_OK) return; if (getLongFromObjectOrReply(c,c->argv[3],&end,NULL) != C_OK) return; /* Convert negative indexes */ if (start < 0 && end < 0 && start > end) { addReply(c,shared.czero); return; } if (start < 0) start = strlen+start; if (end < 0) end = strlen+end; if (start < 0) start = 0; if (end < 0) end = 0; if (end >= strlen) end = strlen-1; } else if (c->argc == 2) { /* The whole string. */ start = 0; end = strlen-1; } else { /* Syntax error. */ addReply(c,shared.syntaxerr); return; } /* Precondition: end >= 0 && end < strlen, so the only condition where * zero can be returned is: start > end. */ if (start > end) { addReply(c,shared.czero); } else { long bytes = end-start+1; addReplyLongLong(c,redisPopcount(p+start,bytes)); } }
* Count number of bits set in the binary array pointed by 's' and long * 'count' bytes. The implementation of this function is required to * work with a input string length up to 512 MB. */ size_t redisPopcount(void *s, long count) { size_t bits = 0; unsigned char *p = s; uint32_t *p4; /*为查表法预先准备好的表*/ static const unsigned char bitsinbyte[256] = {0,1,1,2,1,2,2,3,1,2,2,3,2,3,3,4,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,4,5,5,6,5,6,6,7,5,6,6,7,6,7,7,8}; /* Count initial bytes not aligned to 32 bit. */ /*四字节对齐,不是32整数倍的用查表处理。方便接下来按每次28字节处理*/ while((unsigned long)p & 3 && count) { bits += bitsinbyte[*p++];//一次仍是处理一字节 count--; } /* Count bits 28 bytes at a time */ p4 = (uint32_t*)p;//32bit 4字节 /*开始用variable-precision SWAR算法计算“1”的个数,每次算28字节*/ while(count>=28) { uint32_t aux1, aux2, aux3, aux4, aux5, aux6, aux7; aux1 = *p4++; aux2 = *p4++; aux3 = *p4++; aux4 = *p4++; aux5 = *p4++; aux6 = *p4++; aux7 = *p4++; count -= 28; aux1 = aux1 - ((aux1 >> 1) & 0x55555555);//步骤一 aux1 = (aux1 & 0x33333333) + ((aux1 >> 2) & 0x33333333);/步骤二 aux2 = aux2 - ((aux2 >> 1) & 0x55555555); aux2 = (aux2 & 0x33333333) + ((aux2 >> 2) & 0x33333333); aux3 = aux3 - ((aux3 >> 1) & 0x55555555); aux3 = (aux3 & 0x33333333) + ((aux3 >> 2) & 0x33333333); aux4 = aux4 - ((aux4 >> 1) & 0x55555555); aux4 = (aux4 & 0x33333333) + ((aux4 >> 2) & 0x33333333); aux5 = aux5 - ((aux5 >> 1) & 0x55555555); aux5 = (aux5 & 0x33333333) + ((aux5 >> 2) & 0x33333333); aux6 = aux6 - ((aux6 >> 1) & 0x55555555); aux6 = (aux6 & 0x33333333) + ((aux6 >> 2) & 0x33333333); aux7 = aux7 - ((aux7 >> 1) & 0x55555555); aux7 = (aux7 & 0x33333333) + ((aux7 >> 2) & 0x33333333); bits += ((((aux1 + (aux1 >> 4)) & 0x0F0F0F0F) + ((aux2 + (aux2 >> 4)) & 0x0F0F0F0F) + ((aux3 + (aux3 >> 4)) & 0x0F0F0F0F) + ((aux4 + (aux4 >> 4)) & 0x0F0F0F0F) + ((aux5 + (aux5 >> 4)) & 0x0F0F0F0F) + ((aux6 + (aux6 >> 4)) & 0x0F0F0F0F) + ((aux7 + (aux7 >> 4)) & 0x0F0F0F0F))* 0x01010101) >> 24;//步骤三及步骤四 } /* Count the remaining bytes. */ /*用查表法收尾剩余几个字节中“1”的个数*/ p = (unsigned char*)p4; while(count--) bits += bitsinbyte[*p++]; return bits; }
自问自答
Q1:为何要4字节对齐?
A1:由于接下来处理时,p4是按4字节处理的,一次处理4*7=28字节的内容。若是这里不是4字节,而是8字节,则前面也须要改为8字节对齐,保持一致。
Q2:为何一次批量处理28字节,处理16字节行不行,处理48字节行不行?
A2:其实能够,在redis3.0中,一次就只处理了16字节,只须要保证每次处理的大小是32bit(一字节)的倍数就能够。
Q3:函数限制了二进制串的大小是512M,是在哪限制的?
A3:这跟bitcount无关,是在setbit时限制的。
/* This helper function used by GETBIT / SETBIT parses the bit offset argument * making sure an error is returned if it is negative or if it overflows * Redis 512 MB limit for the string value. * * If the 'hash' argument is true, and 'bits is positive, then the command * will also parse bit offsets prefixed by "#". In such a case the offset * is multiplied by 'bits'. This is useful for the BITFIELD command. */ int getBitOffsetFromArgument(client *c, robj *o, size_t *offset, int hash, int bits) { long long loffset; char *err = "bit offset is not an integer or out of range"; char *p = o->ptr; size_t plen = sdslen(p); int usehash = 0; /* Handle #<offset> form. */ if (p[0] == '#' && hash && bits > 0) usehash = 1; if (string2ll(p+usehash,plen-usehash,&loffset) == 0) { addReplyError(c,err); return C_ERR; } /* Adjust the offset by 'bits' for #<offset> form. */ if (usehash) loffset *= bits; /* Limit offset to 512MB in bytes */ if ((loffset < 0) || ((unsigned long long)loffset >> 3) >= (512*1024*1024)) { addReplyError(c,err); return C_ERR; } *offset = (size_t)loffset; return C_OK; }
4、参考资料
1.https://blog.csdn.net/u010320...