HLL算法用来进行基数统计。golang
什么是基数统计:例如给你一个数组[1,2,2,3,3,5,5] ,这个数组的基数是4(一共有4个不重复的元素)。 好了如今知道什么是基数统计了。
对于这个问题,最容易想到的办法固然是使用bitmap来实现,每一个bit位表示一个数字是否出现过,好比要表示上面这串数字使用下面的bitmap来表示:redis
011101算法
优势:相对省空间,且合并操做简单,好比上面的应用场景1, 若是想统计某2天有多少个ip地址访问,只须要把两天的bitmap结构拿出来作“或”操做便可。
缺点: 空间复杂度仍是太大,1个byte只有8个bit,也就是1个byte只能惟一表示8个IP地址(8个不一样的客户)那么:
1k才能表示 1024 * 8 = 8192
1M才能表示 1024 * 1024 * 8 = 8388608 (约800多万)
若是商品连接不少,还须要统计天天的数据等等,每一个商品天天的连接须要1M以上的内存,太大,内存扛不住。数组
相反: 使用HLL ,对于精度要求不是特别高的时候,只须要12k的内存,很神奇!!!缓存
举一个咱们最熟悉的抛硬币例子,出现正反面的几率都是1/2,一直抛硬币直到出现正面,记录下投掷次数k,将这种抛硬币屡次直到出现正面的过程记为一次伯努利过程,对于n次伯努利过程,咱们会获得n个出现正面的投掷次数值 $ k_1, k_2 ... k_n $
, 其中这里的最大值是k_max
。
看到这里可能有点懵逼,咱们把问题缕一缕,联系咱们的问题,在作基数统计的时候,实际上是这么个问题:bash
如今的目标是来了一组投掷次数的数据(
$ k_1, k_2 ... k_n $
),想要预测出,一共作了多少次伯努利过程(假设为n),才能获得这样的数据的啊?app
根据一顿数学推导---直接给结论: 用 $2^{k_ max}$
来做为n的估计值。ide
例如:作了一组伯努利过程,发现连续出现5次反面后,出现了1次正面,请问出现这种状况,大概须要作多少次伯努利过程呢? 答案是:
$ 2^6 = 64 $
次实验。函数
这个其实就是咱们进行HLL基数统计的基础。oop
回到基数统计的问题,咱们须要统计一组数据中不重复元素的个数,集合中每一个元素的通过hash函数后能够表示成0和1构成的二进制数串,一个二进制串能够类比为一次抛硬币实验,1是抛到正面,0是反面。二进制串中从低位开始第一个1出现的位置能够理解为抛硬币试验中第一次出现正面的抛掷次数kk,那么基于上面的结论,咱们能够经过屡次抛硬币实验的最大抛到正面的次数来预估总共进行了多少次实验,一样能够能够经过第一个1出现位置的最大值$ k_{max}k max
来预估总共有多少个不一样的数字(总体基数)。
因此HLL的基本思想是利用集合中数字的比特串第一个1出现位置的最大值来预估总体基数,可是这种预估方法存在较大偏差,为了改善偏差状况,HLL中引入分桶平均的概念。
例如 一样举抛硬币的例子,若是只有一组抛硬币实验,运气较好,第一次实验过程就抛了10次才第一次抛到正面,显然根据公式推导获得的实验次数的估计偏差较大;若是100个组同时进行抛硬币实验,同时运气这么好的几率就很低了,每组分别进行屡次抛硬币实验,并上报各自实验过程当中抛到正面的抛掷次数的最大值,就能根据100组的平均值预估总体的实验次数了。
redis里面就是使用了分桶的原理,具体的实现原理以下: 首先来了一个redis object(字符串), 通过hash后,生成了一个8字节的hash值。
graph LR
A[redis object]-->|hash function| B(64 bit)
复制代码
而后将 64个bit位的前14位做为桶的下标,这样桶大小就是$ 2^{14} = 16348 $
后面50个bit位,至关因而随机的那个伯努利过程,咱们找到1第一次出现的位置count,若是当前count比桶里面的oldcount大, 则更新oldcount=count。
还有不少应用,大体就是统计类的需求其实都很明确, 不须要很准确的值,只须要一个相似的估计值便可,同时不用set来存储,由于set其实很消耗内存,但愿这个统计的结构越节约内存越好。 其实均可以用这HLL算法,节约内存。
HLL的头结构体定义:
struct hllhdr {
char magic[4]; /* "HYLL" 魔数,前面4个字节表示这是一个hll对象*/
uint8_t encoding; /* 存储方式,后面会讲到,分为HLL_DENSE or HLL_SPARSE两种存储方式 */
uint8_t notused[3]; /*保留字段,由于redis是天然字节对齐的,因此空着也是空着,不如定义一下 Reserved for future use, must be zero. */
uint8_t card[8]; /*缓存的当前hll对象的基数值 Cached cardinality, little endian. */
uint8_t registers[]; /* Data bytes. 对于dense存储方式,这里就是一个12k的连续数组,对于sparse存储方式,这里长度是不定的,后面会讲到*/
};
复制代码
建立一个hll对象:
/* Create an HLL object. We always create the HLL using sparse encoding.
* This will be upgraded to the dense representation as needed.
这里英文注释其实已经写的很清楚了,默认hll对象使用sparse的编码方式,这样比较节约内存,可是sparse方式存储其实比较难以理解,代码实现也比较复杂,可是对于理解来讲,其实就是对于里面hll桶的存储方式的不一样,HLL算法自己逻辑上没有区别
*/
robj *createHLLObject(void) {
robj *o;
struct hllhdr *hdr;
sds s;
uint8_t *p;
int sparselen = HLL_HDR_SIZE +
(((HLL_REGISTERS+(HLL_SPARSE_XZERO_MAX_LEN-1)) /
HLL_SPARSE_XZERO_MAX_LEN)*2);
//头长度+(16384 + (16384-1) / 16384 * 2),也就是2个字节,默认由于基数统计里面全部的桶都是0,用spase方式存储,只须要2个字节
int aux;
/* Populate the sparse representation with as many XZERO opcodes as
* needed to represent all the registers. */
aux = HLL_REGISTERS;
s = sdsnewlen(NULL,sparselen);
p = (uint8_t*)s + HLL_HDR_SIZE;
while(aux) {
int xzero = HLL_SPARSE_XZERO_MAX_LEN;
if (xzero > aux) xzero = aux;
HLL_SPARSE_XZERO_SET(p,xzero);
p += 2;
aux -= xzero;
}
serverAssert((p-(uint8_t*)s) == sparselen);
/* Create the actual object. */
o = createObject(OBJ_STRING,s);
hdr = o->ptr;
memcpy(hdr->magic,"HYLL",4);
hdr->encoding = HLL_SPARSE;
return o;
}
复制代码
来一个byte流,传入 是一个void * 指针和一个长度len, 经过MurmurHash64A
函数 计算一个64位的hash值。64位的前14位(这个值是能够修改的)做为index,后面做为50位做为bit流。 2 ^ 14 == 16384 也就是一共有16384个桶。每一个桶使用6个bit存储。
后面的50位bit流,以下样子:
00001000....11000
其中第一次出现1的位置咱们记为count, 因此count最大值是50, 用6个bit位就够表示了。 2 ^ 6 = 64
故一个HLL对象实际用来存储的空间是16384(个桶) * ( 每一个桶6个bit) / 8 = 12288 byte。 也就是使用了约12k的内存。这个其实redis比较牛逼的地方,其实用一个字节来存的话,其实也就是16k的内存,可是为了能省4k的内存,搞出一堆。这个只是dense方式存储,相对是浪费空间的,下面讲的sparse方式存储更加节约空间。
计算出index(桶的下标), count(后面50个bit中第一次出现1的位置)后,下一步就是更新桶的操做。 根据index找到桶,而后看当前的count 是否大于oldcount,大于则更新下oldcount = count。此时为了性能考虑,是不会去统计当前的基数的,而是将HLL的头里面的一个标志位置为1,表示下次进行pfcount操做的时候,当前的缓存值已经失效了,须要从新统计缓存值。在后面pfcount流程的时候,发现这个标记为失效,就会去从新统计新的基数,放入基数缓存。
/* Call hllDenseAdd() or hllSparseAdd() according to the HLL encoding. */
int hllAdd(robj *o, unsigned char *ele, size_t elesize) {
struct hllhdr *hdr = o->ptr;
switch(hdr->encoding) {
case HLL_DENSE: return hllDenseAdd(hdr->registers,ele,elesize);
case HLL_SPARSE: return hllSparseAdd(o,ele,elesize);//sparse
default: return -1; /* Invalid representation. */
}
}
/* "Add" the element in the dense hyperloglog data structure.
* Actually nothing is added, but the max 0 pattern counter of the subset
* the element belongs to is incremented if needed.
*
* This is just a wrapper to hllDenseSet(), performing the hashing of the
* element in order to retrieve the index and zero-run count. */
int hllDenseAdd(uint8_t *registers, unsigned char *ele, size_t elesize) {
long index;
uint8_t count = hllPatLen(ele,elesize,&index);//index就是桶的下标, count则是后面50个bit位中1第一次出现的位置
/* Update the register if this element produced a longer run of zeroes. */
return hllDenseSet(registers,index,count);
}
/* ================== Dense representation implementation ================== */
/* Low level function to set the dense HLL register at 'index' to the
* specified value if the current value is smaller than 'count'.
*
* 'registers' is expected to have room for HLL_REGISTERS plus an
* additional byte on the right. This requirement is met by sds strings
* automatically since they are implicitly null terminated.
*
* The function always succeed, however if as a result of the operation
* the approximated cardinality changed, 1 is returned. Otherwise 0
* is returned. */
int hllDenseSet(uint8_t *registers, long index, uint8_t count) {
uint8_t oldcount;
//找到对应的index获取其中的值
HLL_DENSE_GET_REGISTER(oldcount,registers,index);
if (count > oldcount) { //若是新的值比老的大,就更新来的
HLL_DENSE_SET_REGISTER(registers,index,count);
return 1;
} else {
return 0;
}
}
/* Given a string element to add to the HyperLogLog, returns the length
* of the pattern 000..1 of the element hash. As a side effect 'regp' is
* set to the register index this element hashes to. */
int hllPatLen(unsigned char *ele, size_t elesize, long *regp) {
uint64_t hash, bit, index;
int count;
/* Count the number of zeroes starting from bit HLL_REGISTERS
* (that is a power of two corresponding to the first bit we don't use * as index). The max run can be 64-P+1 = Q+1 bits. * * Note that the final "1" ending the sequence of zeroes must be * included in the count, so if we find "001" the count is 3, and * the smallest count possible is no zeroes at all, just a 1 bit * at the first position, that is a count of 1. * * This may sound like inefficient, but actually in the average case * there are high probabilities to find a 1 after a few iterations. */ hash = MurmurHash64A(ele,elesize,0xadc83b19ULL); index = hash & HLL_P_MASK; /* Register index. */ hash >>= HLL_P; /* Remove bits used to address the register. */ hash |= ((uint64_t)1<<HLL_Q); /* Make sure the loop terminates and count will be <= Q+1. */ bit = 1; count = 1; /* Initialized to 1 since we count the "00000...1" pattern. */ while((hash & bit) == 0) { count++; bit <<= 1; } *regp = (int) index; serverLog(LL_NOTICE,"pf hash idx=%d, count=%d", index, count); return count; } 复制代码
统计基数流程,就若是cache标志位是有效的,直接返回缓存值,不然从新计算HLL的全部16384个桶,而后进行统计修正,具体的修正的原理,涉及不少的数学知识和论文,这里就不说起了。
/* Return the approximated cardinality of the set based on the harmonic
* mean of the registers values. 'hdr' points to the start of the SDS
* representing the String object holding the HLL representation.
*
* If the sparse representation of the HLL object is not valid, the integer
* pointed by 'invalid' is set to non-zero, otherwise it is left untouched.
*
* hllCount() supports a special internal-only encoding of HLL_RAW, that
* is, hdr->registers will point to an uint8_t array of HLL_REGISTERS element.
* This is useful in order to speedup PFCOUNT when called against multiple
* keys (no need to work with 6-bit integers encoding). */
uint64_t hllCount(struct hllhdr *hdr, int *invalid) {
double m = HLL_REGISTERS;
double E;
int j;
int reghisto[HLL_Q+2] = {0};
/* Compute register histogram */
if (hdr->encoding == HLL_DENSE) {
hllDenseRegHisto(hdr->registers,reghisto);
} else if (hdr->encoding == HLL_SPARSE) {
hllSparseRegHisto(hdr->registers,
sdslen((sds)hdr)-HLL_HDR_SIZE,invalid,reghisto);
} else if (hdr->encoding == HLL_RAW) {
hllRawRegHisto(hdr->registers,reghisto);
} else {
serverPanic("Unknown HyperLogLog encoding in hllCount()");
}
/* Estimate cardinality form register histogram. See:
* "New cardinality estimation algorithms for HyperLogLog sketches"
* Otmar Ertl, arXiv:1702.01284 */
//这里具体的修正流程,要去看论文,就照着抄过来实现就能够了。
double z = m * hllTau((m-reghisto[HLL_Q+1])/(double)m);
for (j = HLL_Q; j >= 1; --j) {
z += reghisto[j];
z *= 0.5;
}
z += m * hllSigma(reghisto[0]/(double)m);
E = llroundl(HLL_ALPHA_INF*m*m/z);
return (uint64_t) E;
}
复制代码
其实原理是很简单的,并且里面涉及到不少的数学知识,也是不能所有看懂,不得不感慨,redis对内存的节约是真的很变态的。对于sparse模式,节约的内存更加恐怖,由于这个其实对于hll算法的原理理解其实影响不大,本文就不作详细介绍了。
最后贴上我用golang模仿写的一个hyperloglog代码:
package goRedis
import (
"bytes"
"encoding/binary"
"fmt"
"math"
)
const seed = 0xadc83b19
const hll_dense = 1
const hll_sparse = 2
const hll_p = 14
const hll_q = 64 - hll_p
const hll_registers = 1 << hll_p
const hll_p_mask = hll_registers - 1
const hll_bits = 6
const hll_sparse_val_max_value = 32
const hll_alpha_inf = 0.721347520444481703680
type hllhdr struct {
magic string
encoding uint8
notused [3]uint8
card [8]uint64
registers []byte //实际存储的,由于后面若是encoding方式采用sparse的话,长度会变化,因此使用slice比较好
vaildCache bool
}
func initHLL(encoding uint8) *hllhdr {
hdr := new(hllhdr)
hdr.magic = "HYLL"
hdr.encoding = encoding
if encoding == hll_dense {
hdr.registers = make([]byte, hll_registers*1) // 先简单实现下 用一个字节存6个bit
} else {
panic("HLL SPARSE encoding format doesn't support.")
}
return hdr
}
func hllDenseSet(hllObj *hllhdr, index uint64, count int) bool {
if count > int(hllObj.registers[index]) {
hllObj.registers[index] = byte(count)
return true
}
return false
}
func PfAddCommand(hllObj *hllhdr, val []byte) {
index, count := hllPartLen(val)
if hllObj.encoding == hll_dense {
hllDenseSet(hllObj, index, count)
hllObj.vaildCache = false
} else {
panic("HLL SPARSE encoding format doesn't support.")
}
}
func hllTau(x float64) float64 {
if x == 0. || x == 1. {
return 0.
}
var zPrime float64
y := 1.0
z := 1 - x
for {
x = math.Sqrt(x)
zPrime = z
y *= 0.5
z -= math.Pow(1-x, 2) * y
if zPrime == z {
break
}
}
return z / 3
}
func hllDenseRegHisto(hllObj *hllhdr, reghisto *[hll_q + 2]int) {
for i := 0; i < hll_registers; i++ {
reg := hllObj.registers[i]
reghisto[reg]++
}
}
func hllSigma(x float64) float64 {
if x == 1. {
return math.MaxInt64
}
var zPrime float64
y := float64(1)
z := x
for {
x *= x
zPrime = z
z += x * y
y += y
if zPrime == z {
break
}
}
return z
}
func hllCount(hllObj *hllhdr) int {
m := float64(hll_registers)
var reghisto [hll_q + 2]int
if hllObj.encoding == hll_dense {
hllDenseRegHisto(hllObj, ®histo)
} else {
panic("impliment me..")
}
z := m * hllTau((m - (float64(reghisto[hll_q+1]))/m))
for j := hll_q; j >= 1; j-- {
z += float64(reghisto[j])
z *= 0.5
}
z += m * hllSigma(float64(reghisto[0])/m)
E := math.Round(hll_alpha_inf * m * m / z)
return int(E)
}
func PfCountCommand(hllObj *hllhdr) int {
var ret int
if hllObj.vaildCache {
return 0
} else {
ret = hllCount(hllObj)
}
return ret
}
func CreateHLLObject() *hllhdr {
hdr := initHLL(hll_dense)
return hdr
}
func Murmurhash(buff []byte, seed uint32) uint64 {
buffLen := uint64(len(buff))
m := uint64(0xc6a4a7935bd1e995)
r := uint32(47)
h := uint64(seed) ^ (buffLen * m)
for i := uint64(0); i < buffLen-(buffLen&7); {
var k uint64
bBuffer := bytes.NewBuffer(buff[i : i+8])
binary.Read(bBuffer, binary.LittleEndian, &k)
k *= m
k ^= k >> r
k *= m
h ^= k
h *= m
binary.Write(bBuffer, binary.LittleEndian, &k)
i += 8
}
switch buffLen & 7 {
case 7:
h ^= uint64(buff[6]) << 48
fallthrough
case 6:
h ^= uint64(buff[5]) << 40
fallthrough
case 5:
h ^= uint64(buff[4]) << 32
fallthrough
case 4:
h ^= uint64(buff[3]) << 24
fallthrough
case 3:
h ^= uint64(buff[2]) << 16
fallthrough
case 2:
h ^= uint64(buff[1]) << 8
fallthrough
case 1:
h ^= uint64(buff[0])
fallthrough
default:
h *= m
}
h ^= h >> r
h *= m
h ^= h >> r
return h
}
func hllPartLen(buff []byte) (index uint64, count int) {
hash := Murmurhash(buff, seed)
index = hash & uint64(hll_p_mask) //这里就是取出后14个bit,做为index
hash >>= hll_p //右移把后面14个bit清理掉,注意这里的bit流实际上是倒序的
hash |= uint64(1) << hll_q //当前的最高位设置1,实际上是一个哨兵,避免count为0
bit := uint64(1)
count = 1
for (hash & bit) == 0 {
count++
bit <<= 1
}
fmt.Printf("pf hash idx=%d, count=%d\n", index, count)
return index, count
}
//func hllSparseSet(o, index int64, count int64) {
// if count > hll_sparse_val_max_value {
// goto promote
// }
//
//promote:
//}
复制代码
测试代码:
func TestAll(t *testing.T) {
hllObj := CreateHLLObject()
test1 := []string{"apple", "apple", "orange", "ttt", "aaa"}
for _, str := range test1 {
PfAddCommand(hllObj, []byte(str))
}
println(PfCountCommand(hllObj))
}
复制代码