ANN的前身技术是NN(Neighbor Search),简单地说,最近邻检索就是根据数据的类似性,从数据集中寻找与目标数据最类似的项目,而这种类似性一般会被量化到空间上数据之间的距离,例如欧几里得距离(Euclidean distance),NN认为数据在空间中的距离越近,则数据之间的类似性越高。html
当须要查找离目标数据最近的前k个数据项时,就是k最近邻检索(K-NN)。python
近些年的研究中涌现出大量以最近邻检索为基本思想的方法,主要可分为两类:git
尽管出现了不少针对NN算法的改进措施,可是在实际工业场景中,NN算法遇到最大阻碍是:github
数据通过向量化(即特征工程)以后,由于特征空间特别高维(上百/上千/甚至上万),致使在空间距离上特别稀疏,维度越高这个现象越明显,这直接致使了NN的近邻搜索效果很差。笔者本身也一样在项目开发中尝试使用过NN算法,当发现NN搜索效果不佳时,反过来调整特征工程,而后再继续NN搜索,如此反复迭代,最终效果难以保证,由于你没法保证每一次的特征工程都能精确地表征出业务场景的类似性。web
举个例子来讲,咱们有一批恶意文件如今要对其进行聚类分析,首先咱们对其进行文本方面的特征工程,获得一个向量集合。由于基于专家经验获得的特征维度之间是彼此“正交”的,所以每一个特征向量之间的余弦类似性都不强,基于“空间距离度量”的聚类算法效果天然也不会很好。算法
并且另外一方面,由于特征空间的维度过高了(几百维、几千维),一些原本颇有用的”强贡献特征“可能会被淹没在大量的“弱贡献特征”中,这很好理解,看一下欧几里得空间的距离度量公式:网页爬虫
从公式中能够看到,全部维度都被“公平看待”,平方和开根起到了一个均值的做用,弱特征越多,强特征被“稀释”的影响就越大。特征不是越多越好,有时候太多无用的特征可能还会引发反效果。windows
换句话说,即便原本可能很“类似的文件”(例如同一个病毒家族的变种),可是在咱们设计的特征上却不能很好地体现。数组
面对这些问题,如何解决呢?安全
一个很天然的想法是,若是能有一种算法,能将类似的字符串,从高维空间降维到一个相对低维的空间中。同时,在这个低维空间中,语法/语义相近的字符串的夹角余弦相对较小,也即语法/语义相近的字符串在降维后彼此较为接近。
若是能实现上述两个目标,咱们不只能够有效实现对高维向量的降维,同时由于低维空间的向量间具有类似汇集性,咱们能够在接近线性的时间内,进行向量间距离评估,以及找到类似的文本。
这个章节咱们按照历史时间线来讨论学术界在面对语言模型中文本类似性这个课题分支时,一路走来遇到了哪些问题,整个时间线学术成果很是丰富,咱们这里只能摘取其主要节点进行推导式的讨论。
假如咱们有两段输入文本:
1. how are u? 2. how are you?
如今计算这两段文本的类似度,也即须要计算这两段文本的区别度,一个最简单直观的想法是直接基于原始的ascii序列逐位计算最小编辑距离:
1. u -> u 2. ? -> o 3. N/A -> u 3. N/A -> ?
即第一段文本经过4次修改便可获得第二段文本,因此这两个文本的类似度为:
(1 - 4 / (len(第一段文本) + len(第二段文本))) * 100% = (1 - 4 / (10 + 12)) * 100% = 81.81%
类似度为81%,这个结论怎么样?准吗?勉强好像可用,可是效果显然不太好,怎么办呢?
咱们开始思考,原始ascii字符空间对变换的感知很是敏感,有两个主要缘由:
从向量空间的角度来看,原始的ascii字符空间能够抽象为一个 N * 2的列向量组(ascii bytes vec,position vec),这里N表明着输入文本的length长度。
沿着线性空间的这个思考路线,咱们应该去找一个新的向量空间,该新向量空间与原始ascii字符空间相比,对变化的敏感度更低(包括对ascii修改、ascii位置变化)。
那对ascii字符变化的敏感度更低,怎么用数学思惟来理解这个概念呢?
这里须要引入线性映射的概念:
设 S 和 S' 是两个集合,若是存在一个法则f,使得集合S中每个元素a,都有集合 S' 中惟一肯定的元素b与它对应,则称 f 是S到 S' 的一个映射,记做:
咱们须要找到一种线性映射,将原始ascii序列中的 N * 2(ascii byte,position)向量组,降维映射成一个 M * 1(ascii sequence windows)向量组,这里 M 是新向量空间中的维度。
从投影降维理论视角咱们知道,降维后,原始空间中的position维度被彻底忽略了,而ascii byte这个维度被转换为ascii sequence windows这个新维度,这显然不是一个正交投影,即不是单射,由于原始输入文本中的一个ascii修改,可能会引发新空间里多个ascii sequence window的变化。
好,接下来的问题是,如何找到这个线性空间映射呢?这就是接下来要讨论的ngram分词算法。
假设咱们如今有3段输入文本:
[ 'This is the first document.', 'This is the second document.', 'Is this the first document?', ]
以单个word为一个slice window进行切词,即1-gram(unigram),获得:
[ { u'This', u'is', u'the', u'first', u'document' }, { u'This', u'is', u'the', u'second', u'document' }, { u'This', u'is', u'the', u'first', u'document' } ]
从1-gram slice结果中,咱们能够看到几点信息:
显然,1-gram的分词方案形成了信息的过分失真,致使了原始输入文本的语法结构被丢失了,这个问题怎么解决呢?显然,咱们须要引入相对位置(relative position)这个特征维度。
使用2-gram算法进行切词,获得:
[ { u'This', u'This is', u'is the', u'the first', u'first document', u'document' }, { u'This', u'This is', u'is the', u'the second', u'second document', u'document' }, { u'is', u'is This', , u'This the', u'the first', u'first document', u'document' } ]
从1-gram slice结果中,咱们能够看到几点信息:
从线性映射的角度看,在2-gram算法下,原始ascii序列中的 N * 2(ascii byte,position)向量组,降维映射成一个 M * N(ascii sequence windows,relative position)向量组,这里 M 表明了2-gram后的 gram token数量,N 表明了每一个gram分组内的word组合,2-gram token内的组合维度数为2,若是是3-gram,则组合维度数位6。
问题到这里就结束了吗?显然不是的,ngram算法虽然比纯粹的ascii逐字符比对各方面效果要好,可是还存在几个问题:
那解决问题的思路是什么呢?答案仍是降维,咱们须要继续寻找一个新的映射函数,将原始ascii字符空间映射到一个低维向量空间中。可是要注意,这个新的低维向量空间有几个技术指标须要知足:
基于上个章节讨论的3个技术指标,学术界开始了学术的研究和创新,演化出了两个不一样的方向:
LSH的核心是哈希散列、其次是降维、其次是语法/语义一致性、再其次是算法过程简单高效适合在大规模高并发场景中使用。
能够这么说,LSH经过牺牲了一部分的信息熵,即牺牲了一部分的语法/语义一致性,换取了超级高效的时间/计算复杂度,是一种很是优秀的算法思想,值得咱们不断深刻思考和学习。
可是换一个角度,若是对时间/空间复杂度没有那么高的需求,而是对语法/语义一致性有很高的要求,LSH算法家族可能就不必定很是适合了。
这个时候,另外一条思考脉轮就呈如今咱们的面前,即词向量/句子向量/文档向量,具体来讲,就是从2007年开始逐渐被提出的各类词向量降维表征方法,包括:
词向量普遍地被运用于NLP相关的任务中,关于这部分的详细讨论,笔者在另外一篇文章有所涉及。
从笔者本身经验来看,在大数据时代,LSH的使用场景相比词向量要相对少一些,笔者我的以为问题核心在于现代NLP任务中,对语义的精确表征能力要求愈来愈高,工程师和数据科学家经过不断地引入更庞大的数据集,引入更复杂的词向量算法,也是但愿尽量提升信息的利用率,尽可能少的丢失信息。
而相对的,对时间/空间复杂度有极高要求的场景可能只存在于一些极端的场景中,例如搜索引擎等。
Relevant Link:
https://www2007.cpsc.ucalgary.ca/papers/paper215.pdf google Detecting Near-Duplicates for Web Crawling. WWW2007 http://www2007.org/ https://www.iw3c2.org/blog/category/www2007/
在不少应用领域中,咱们面对和须要处理的数据每每是海量而且具备很高的维度(high dimensional spaces),同时数据中又广泛存在着近似相同的状况(例如类似的对话、类似的网页、类似的URl等),怎样快速地从海量的高维数据集合中找到与某个数据近似类似(approximate or exact Near Neighbor)的一个数据或多个数据,成为了一个难点和问题。
若是是低维的小数据集,咱们经过线性查找(Linear Search)就能够容易解决,但若是是对一个海量的高维数据集采用线性查找匹配的话,会很是耗时,这成为ANN被研究和发展的原动力。
在笔者所在的网络安全学科中,也经常会遇到不少局部不一样(locality change)的近似文本的识别与检测问题,例如:
面对海量高维数据背景下,还要进行高效的数据类似性搜索的需求,该从哪些方面进行思考解决方案呢?
要实现上述目标,咱们须要能找到一整套综合技术,能综合实现如下几个技术指标:
符合以上四点技术指标的算法被统称为ANN(Approximate Nearest Neighbor)算法。
Relevant Link:
https://www2007.cpsc.ucalgary.ca/papers/paper215.pdf
值得注意的是,对于第二章提到的ANN技术指标中的前两个,词向量和LSH均可以实现一样的效果,可是咱们本文的讨论对象LSH局部敏感哈希。
局部敏感哈希(LSH)核心思想是:在高维空间相邻的数据通过局部敏感哈希函数的映射投影转化到低维空间后,他们落入同一个吊桶(空间区间)的几率很大而不相邻的数据映射到同一个吊桶的几率则很小。
这种方法的主要难点在于如何寻找适合的局部敏感哈希函数,在原论文中,做者提出了局部敏感Hash函数的通常性定义:
咱们设定x和y的距离测定函数为d(x,y),这个d()函数能够是Jaccard函数/Hamming度量函数,也能够其余具有一样性能的函数。
在这个距离测定标准下,设定两个距离阈值d1,d2,且 d1 < d2。
若是一个函数族F的每个函数 f 知足:
那么称F为(d1,d2,p1,p2)-敏感的函数族,实际上,simhash就是一种(d1,d2,p1,p2)-敏感的函数。
左图是传统Hash算法,右图是LSH。红色点和绿色点距离相近,橙色点和蓝色点距离相近。
按照LSH的发展顺序,LSH家族的演变史以下:
咱们接下来逐个讨论其算法流程及其背后的思惟方式。
Relevant Link:
https://www.cnblogs.com/wt869054461/p/9234184.html http://people.csail.mit.edu/gregory/annbook/introduction.pdf https://www.cnblogs.com/wt869054461/p/9234184.html http://infolab.stanford.edu/~ullman/mmds/ch3.pdf https://www.cnblogs.com/fengfenggirl/p/lsh.html http://sawyersun.top/2016/Locality-Sensitive-Hashing.html
2008年IEEE Signal Process上有一篇文章Locality-Sensitive Hashing for Finding Nearest Neighbors是一篇较为容易理解的基于Stable Dsitrubution的投影方法的Tutorial。
其思想在于高维空间中相近的物体,投影(降维)后也相近。
三维空间中的四个点,红色圆形在三围空间中相近,绿色方块在三围空间中相距较远,那么投影后仍是红色圆形相距较近,绿色方块相距较远。
基于Stable Distribution的投影LSH,就是产生知足Stable Distribution的分布进行投影,最后将量化后的投影值做为value输出。
具体数学表示形式以下:给定特征向量v,Hash的每一bit的生成公式为:
其中:
须要注意的是,若是 x 抽样于高斯分布,那么ϕ(u,v)衡量的是L2 norm;若是 x 抽样于柯西分布,那么ϕ(u,v)衡量的是L1 norm。
更详细的介绍在Alexandr Andoni维护的LSH主页中,这就是LSH方法的鼻祖。
关于随机抽样涉及到随机过程方面的知识,能够参阅这篇帖子。关于高斯随机投影和柯西分布随机投影的讨论,能够参阅另外一篇blog。
Relevant Link:
http://www.slaney.org/malcolm/yahoo/Slaney2008-LSHTutorial.pdf https://www.cnblogs.com/LittleHann/p/6558575.html#_label2 https://www.zhihu.com/question/26694486/answer/242650962
Stable Distribution Projection从原理上没有什么大问题,其实后来改进的随机超平面和球面哈希算法,其底层思想上和Stable Distribution Projection没有太大的区别。
可是在实际操做中,Stable Distribution Projection存在几个比较明显的问题:
面对上述问题,Charikar改进了这种状况,提出了一种随机超平面投影LSH。能够参考论文《Multi-probe LSH: efficient indexing for high-dimensional similarity search》。
假设有一个M维高维数据向量x,咱们在M维空间中随机选择一个超平面,经过这个超平面来对数据进行切分。
这个动做总共进行N次,即经过N个随机超平面单位向量来对原始数据集进行切分,这里N就是降维后的向量维度。
超平面的选择是随机过程,不须要提早参数设定。
以下图所示,随机在空间里划几个超平面,就能够把数据分到不一样空间里,好比中间这个小三角的区域就能够赋值为110.
Hash的每一bit的数学定义式为:
x 是随机超平面单位向量,sgn是符号函数:
接下来咱们来讨论在随机超平面投影算法下,LSH哈希的产生原理是什么。
这时ϕ(u,v),也就是上述公式中的内积点乘计算,衡量的就是u和v的cosine距离,θ(u,v)表示向量u和v的夹角。
hyperplane projection的核心假设就是,两个向量越类似,则他们的cosine距离越小:
下图说明了该公式原理
能够看到,给定两个向量(图中的黑色箭头),只有在其法线的交叠区域(深蓝色区域)投影后的方向(sgn函数的值)才不相等,因此有:
,即蓝色区域面积占比整个圆,的比率等于u与v的夹角。
经过sgn符号函数的归一化,只要两个向量是同方向,无论距离远近,都统一归一化为1。
这样计算后的hash value值是比特形式的1和0,虽然带来了必定的信息丢失,可是免去了使用时须要再次归一化。
Relevant Link:
https://www.jiqizhixin.com/articles/2018-06-26-15 http://delivery.acm.org/10.1145/1330000/1325958/p950-lv.pdf?ip=42.120.75.135&id=1325958&acc=ACTIVE%20SERVICE&key=C8BAF422464E9FCC%2EC8BAF422464E9FCC%2E4D4702B0C3E38B35%2E4D4702B0C3E38B35&__acm__=1560846249_128df98f27ef856192df883b1ce48987 http://yangyi-bupt.github.io/ml/2015/08/28/lsh.html
spherical hash是在前人hyperplane hash的基础之上改进而来的,因此这里咱们首先来一块儿思考下hyperplane-base哈希算法都存在哪些问题。
sphericalplane hash超球体哈希算法就在这个背景下,在2012 CVPR上提出的。
面对上述几个问题,sphericalplane hash进行了算法理论和公式层面上的创新,咱们接下来详细讨论具体细节。
咱们知道,利用kernel space核空间技术,咱们能够将线性超平面映射为一个非线性超平面,这是创建在核函数的理论基础上的。可是研究发现,使用sphericalplane,由于球平面天生的封闭性,能够直接对高维空间进行partition分类,并得到比non-linear hyperplane更好的效果。
理论上说,若是须要切割出一个d维封闭空间,至少须要d+1个超平面,可是若是使用超球体,则最少值须要1个超球体便可,例以下图
值得注意的是,c个超球体划分出的有界封闭区域数是能够计算的,即:
同时,球哈希划分的区域是封闭且更紧凑的,每一个区域内样本的最大距离的平均值(bounding power)会更小,说明各个区域的样本是更紧凑的,以下图所示:
Average of maximum distances within a partition: ‐ Hyper‐spheres gives tighter bound!
经过渐进逼近的方法,迭代优化算法超参数,获得符合算法约束条件的近似最优解。
这里的约束条件指的是:
1. 咱们但愿每一个超球体把样本都是均分的,就是球内球外各占一半 2. 但愿每一个超球体的交叉部分不要太多,最多1/4,也就是每一个哈希函数相对独立
优化过程最重要的一个前提就是设定约束条件(constraint condition)。
这里首先先定义一些数学标记:
设为数据向量在单个超球体(单个hash function)内部(+1)仍是外部(-1)的几率。
设为单个超球体的半径。
Spherical Hashing是由c个不一样位置,不一样的大小的超球体组成的,对于c个超球体的总约束条件以下:
注意,约束2个1/4是一个理论极限值了,经过空间几何的相关知识能够证实,当两个球都近似将空间一分为二时,这两个球的交集的最小值就是1/4。
优化过程的伪码以下,咱们接下来逐步讨论:
采用随机采样的方式从样本集中采样m个样本,用于进行后续的优化过程。固然,若是你的算力足够,也能够将全部样本都做为训练集进行优化训练。
从样本集S中随机选择c个数据点做为初始的超球体中心。
值得注意的是,做者在使用kmeans获得c个聚类中心做为初始的超球体中心后,并无很明显提高实验结果,这反映了求哈希算法对初始值不是很是敏感。
接下来的迭代会不断会动态调整半径,以及动态移动球心的位置。为了方便计算,咱们定义下面两个辅助变量:
,1 ≤ i, j ≤ c。
对于来讲,咱们只要使其知足
便可。
对于来讲,咱们的目标是使其靠近m/4,经过计算当前值和目标值之间的残差累积和,获得一个回归值,原论文中使用了力的概念形象地说明了这个过程。
对于交叉样本太多的两个球心,赋予一个repulsive的力,对离得太远的两个球赋予一个attractive 的力。而后计算这些力的累加做用,更新球心,再根据目标一更新半径。对照上面算法伪码很容易理解该思想。
重复这个过程,直到知足收敛条件。
理论上说,优化的最终结果应该是的均值为m/4,方差为0,即彻底收敛,可是这很容易致使过拟合。
和不少渐进逼近的优化算法同样(例如gradient descent),球哈希算法设置了一个收敛近似精度,来提早中止优化,避免过拟合的发生。
算法对均值和方差设置了一个容忍精度阈值 和
,只要优化在一段步骤区间中,达到了这个容忍精度,即代表优化结果,中止优化。
在原论文中,做者对和
的值实验最佳值分别是10%和15%。
Relevant Link:
https://engineering.stanford.edu/people/moses-charikar http://xueshu.baidu.com/s?wd=charikar+%E2%80%9CRandom+Hyperplane%E2%80%9D&tn=SE_baiduxueshu_c1gjeupa&cl=3&ie=utf-8&bs=charikar+Random+Hyperplane&f=8&rsv_bp=1&rsv_sug2=0&sc_f_para=sc_tasktype%3D%7BfirstSimpleSearch%7D http://sglab.kaist.ac.kr/Spherical_Hashing/ https://blog.csdn.net/u014624632/article/details/79972100 http://sglab.kaist.ac.kr/Spherical_Hashing/Spherical_Hashing.pdf https://blog.csdn.net/u014624632/article/details/79972100 https://blog.csdn.net/zwwkity/article/details/8565485 https://www.bbsmax.com/A/LPdojpBG53/
Simhash是一种降维投影方法,它将一段文本映射为一段固定位数的二进制指纹(fixed length fingerprint),同时,这种fingerprint具备较好的语法/语义一致性。
它由google的Moses Charikar提出。整个算法很是简单精巧,咱们这章来阐述一下其算法过程。
使用ngram对文档进行token化分词,值得注意的是,n值的选取具备必定的技巧:
在分词后,计算每一个token的权重,能够经过ngram token词频统计获得w,也能够经过TF-IDF计算。无论用什么方式,核心是将ngram token的权重表征出来。
例如对”How are you?“这段话进行4-gram的切词能够获得:
ngram tokens frequency list:
{u'owar': 1, u'reyo': 1, u'howa': 1, u'eyou': 1, u'ware': 1, u'arey': 1}
注意,这里权重w为1,只是咱们举例比较简单,恰好的巧合。
将每一个ngram token都被转换为了一个散列hash,这个散列hash是随机均匀分布的,例如MD五、SHA-1算法。
(u'owar', 1): 333172464361321106773216808497407930520 (u'reyo', 1): 310879434437019318776469684649603935114 (u'howa', 1): 98593099505511350710740956016689849066 (u'eyou', 1): 32675000308058660898513414756955031020 (u'ware', 1): 325869966946114134008620588371145019154 (u'arey', 1): 110781832133915061990833609831166700777
这个哈希化过程主要是完成字符串的数字化。由于对token进行哈希处理的散列函数是像MD五、SHA1这种随机散列函数,散列后的空间是随机均匀的。所以不一样的token获得的散列值自己不包含任何信息熵。
那什么东西包含信息熵呢?笔者认为这里的传递下一步的信息熵有两项:
对每一个token hash进行逐位扫描,对某一个token hash来讲,若是某一位为1,则赋值一个该token的正权重;若是某位是0,则赋值为该token的负权重。
获得一个N x M矩阵,N为token数量,M为fingerprint向量V的长度,原论文中默认为64bit,咱们在实际开发中大多数也使用64bit,这是一个效率与效果比较折中的配置。
(u'owar', 1): [-1, -1, -1, 1, 1, -1, -1, 1, -1, -1, -1, -1, 1, 1, -1, -1, 1, -1, 1, 1, 1, 1, 1, -1, -1, -1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, -1, -1, 1, 1, -1, -1, 1, 1, 1, 1, 1, 1, -1, 1, -1, 1, -1, -1, 1, -1, 1, -1, 1, 1, 1, 1, -1] (u'reyo', 1): [-1, 1, -1, 1, -1, -1, -1, 1, 1, 1, -1, 1, -1, 1, -1, 1, -1, -1, -1, 1, 1, 1, 1, 1, -1, -1, 1, -1, 1, 1, -1, 1, -1, 1, -1, -1, -1, -1, -1, 1, 1, -1, 1, 1, 1, 1, 1, 1, 1, -1, 1, 1, -1, 1, -1, -1, 1, 1, 1, -1, 1, 1, -1, -1] (u'howa', 1): [-1, 1, -1, 1, -1, 1, 1, 1, -1, 1, -1, -1, -1, 1, -1, 1, -1, -1, 1, -1, 1, -1, -1, -1, -1, 1, 1, 1, -1, 1, -1, 1, 1, -1, 1, -1, 1, 1, -1, 1, -1, -1, -1, -1, -1, -1, -1, 1, 1, -1, 1, -1, -1, 1, 1, -1, 1, 1, 1, 1, -1, 1, -1, -1] (u'eyou', 1): [-1, -1, 1, 1, -1, 1, 1, 1, 1, -1, 1, -1, -1, -1, -1, 1, 1, 1, 1, -1, -1, -1, -1, -1, -1, 1, 1, 1, -1, -1, -1, -1, 1, 1, 1, 1, -1, -1, 1, 1, -1, -1, 1, 1, -1, -1, -1, 1, -1, -1, -1, -1, 1, -1, 1, -1, -1, -1, 1, -1, -1, -1, -1, -1] (u'ware', 1): [-1, 1, -1, -1, 1, -1, -1, -1, 1, 1, 1, 1, -1, -1, 1, -1, -1, 1, 1, -1, 1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, -1, -1, -1, -1, 1, 1, -1, -1, -1, -1, 1, 1, 1, -1, -1, 1, -1, 1, -1, -1, 1, 1, -1, 1, -1, -1, 1, 1, -1, 1, 1, -1, 1] (u'arey', 1): [1, -1, -1, 1, -1, 1, 1, 1, -1, -1, 1, 1, -1, 1, 1, 1, -1, 1, 1, 1, -1, -1, 1, 1, 1, 1, 1, -1, -1, 1, 1, 1, 1, -1, 1, 1, -1, -1, 1, -1, 1, 1, -1, 1, -1, 1, 1, 1, 1, -1, -1, -1, -1, -1, -1, 1, 1, 1, 1, 1, 1, -1, -1, -1]
这步有一个细节须要注意,即无论上一步token hash的位数多长,这一步都只进行fingerprint V长度的逐位扫描与翻译,这实际上使用裁剪cutoff的方式实现了降维,这种压缩映射会损失一部分准确性,引入必定的信息损失和误报,不过这和咱们选择的fingerprint V长度有关,咱们选的V越长,例如128bit,这种信息损失就越小。
从信息熵的角度来讲,这一步实际就是在将上一步传入的token权重这一信息进行翻译。
上一步获得的V是一个由token w组成的N x M矩阵,咱们逐位进行纵向的列维度sum压缩:
v: [-4, 0, -4, 4, -2, 0, 0, 4, 0, 0, 0, 0, -4, 2, -2, 2, -2, 0, 4, 0, 2, 0, 0, 0, -4, 2, 2, 2, -4, 4, -4, 2, 0, 0, 0, 2, -2, -4, -2, 2, 0, -2, 0, 4, -2, 0, 2, 4, 4, -6, 0, -2, 0, -2, 0, -2, 0, 4, 4, 0, 2, 2, -4, -4]
这一步经过将每一bit上的全部信息都压缩综合起来,获得最终的信息表达。
上一步获得的V是一个1 x M,这里M已经就是fingerprint长度的向量,默认为64bit,最后一步进行归一化。
逐位bit扫描当前fingerprint向量 V,若是其值>0,则归一化为1;若是其小于零,则归一化为0
v_: [0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 1, 0, 1, 0, 0, 0, 0, 1, 1, 1, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 1, 1, 0, 0]
用一张图来总结梳理一下上述的几个步骤:
这里笔者抛出一个问题来一块儿思考下,看起来在第四步已经获得了已经降维后的定长向量了,并且向量的每一个元素也都是由全部token综合起来获得的,应该可以表明原始的输入文本了,那为啥第五步还要画蛇添足进行一次0/1归一化呢?背后的原理是啥呢?
# -*- coding: utf-8 -*- from __future__ import division, unicode_literals import re import sys import hashlib import logging import numbers import collections from itertools import groupby if sys.version_info[0] >= 3: basestring = str unicode = str long = int else: range = xrange def _hashfunc(x): return int(hashlib.md5(x).hexdigest(), 16) class Simhash(object): def __init__( self, value, f=64, reg=r'[\w\u4e00-\u9fcc]+', hashfunc=None, log=None ): """ `f` is the dimensions of fingerprints `reg` is meaningful only when `value` is basestring and describes what is considered to be a letter inside parsed string. Regexp object can also be specified (some attempt to handle any letters is to specify reg=re.compile(r'\w', re.UNICODE)) `hashfunc` accepts a utf-8 encoded string and returns a unsigned integer in at least `f` bits. """ self.f = f self.reg = reg self.value = None if hashfunc is None: self.hashfunc = _hashfunc else: self.hashfunc = hashfunc if log is None: self.log = logging.getLogger("simhash") else: self.log = log if isinstance(value, Simhash): self.value = value.value elif isinstance(value, basestring): self.build_by_text(unicode(value)) elif isinstance(value, collections.Iterable): self.build_by_features(value) elif isinstance(value, numbers.Integral): self.value = value else: raise Exception('Bad parameter with type {}'.format(type(value))) def __eq__(self, other): """ Compare two simhashes by their value. :param Simhash other: The Simhash object to compare to """ return self.value == other.value def _slide(self, content, width=4): return [content[i:i + width] for i in range(max(len(content) - width + 1, 1))] def _tokenize(self, content): content = content.lower() content = ''.join(re.findall(self.reg, content)) ans = self._slide(content) # ngram slide into tokens list return ans def build_by_text(self, content): # 1. ngram分词 features = self._tokenize(content) # 2. ngram token词频统计,统计获得的词频将做为权重 features = {k:sum(1 for _ in g) for k, g in groupby(sorted(features))} print "ngram tokens frequency list: ", features return self.build_by_features(features) def build_by_features(self, features): """ `features` might be a list of unweighted tokens (a weight of 1 will be assumed), a list of (token, weight) tuples or a token -> weight dict. """ v = [0] * self.f # 初始化simhash fingerprint V,默认为64bit,每一个元素初始化为0 # 逐位为1的掩码,即[1], [10], [100]....[100000(64个)],这个掩码数组的做用是后面进行逐位提取 masks = [1 << i for i in range(self.f)] print "masks: ", masks if isinstance(features, dict): features = features.items() for f in features: v_ = [0] * self.f # 若是传入的是一个token string list,则默认每一个token string的权重都为1 if isinstance(f, basestring): # 经过散列哈希算法将每一个ngram token转换为一个hash序列 h = self.hashfunc(f.encode('utf-8')) w = 1 # 若是传入的是一个(token, wight)的list,则按照预约的weight进行计算,咱们本文默认采用ngram词频统计方式获得weight else: assert isinstance(f, collections.Iterable) h = self.hashfunc(f[0].encode('utf-8')) w = f[1] # 每一个ngram token都被转换为了一个散列hash,这个散列hash是随机均匀分布的 #print "{0}: ".format(f), h # 循环f次(本文是64bit),逐位进行扫描,若是某一位是1,则赋值为该token的正权重;若是某位是0,则赋值为该token的负权重 for i in range(self.f): #print "h & masks[i]: ", h & masks[i] v[i] += w if h & masks[i] else -w v_[i] += w if h & masks[i] else -w print "{0}: ".format(f), v_ # 在完成对全部ngram token的扫描后,fingerprint向量 V 的每一位bit都是全部token hash在该bit上的权重加和结果。 print "v: ", v ans = 0 # 逐位bit扫描当前fingerprint向量 V,若是其值>0,则归一化为1;若是其小于零,则归一化为0 v_ = [0] * self.f for i in range(self.f): if v[i] > 0: ans |= masks[i] v_[i] = 1 else: v_[i] = 0 print "v_: ", v_ self.value = ans def distance(self, another): assert self.f == another.f x = (self.value ^ another.value) & ((1 << self.f) - 1) ans = 0 while x: ans += 1 x &= x - 1 return ans class SimhashIndex(object): def __init__(self, objs, f=64, k=2, log=None): """ `objs` is a list of (obj_id, simhash) obj_id is a string, simhash is an instance of Simhash `f` is the same with the one for Simhash `k` is the tolerance """ self.k = k self.f = f count = len(objs) if log is None: self.log = logging.getLogger("simhash") else: self.log = log self.log.info('Initializing %s data.', count) self.bucket = collections.defaultdict(set) for i, q in enumerate(objs): if i % 10000 == 0 or i == count - 1: self.log.info('%s/%s', i + 1, count) self.add(*q) def get_near_dups(self, simhash): """ `simhash` is an instance of Simhash return a list of obj_id, which is in type of str """ assert simhash.f == self.f ans = set() for key in self.get_keys(simhash): dups = self.bucket[key] self.log.debug('key:%s', key) if len(dups) > 200: self.log.warning('Big bucket found. key:%s, len:%s', key, len(dups)) for dup in dups: sim2, obj_id = dup.split(',', 1) sim2 = Simhash(long(sim2, 16), self.f) d = simhash.distance(sim2) if d <= self.k: ans.add(obj_id) return list(ans) def add(self, obj_id, simhash): """ `obj_id` is a string `simhash` is an instance of Simhash """ assert simhash.f == self.f for key in self.get_keys(simhash): v = '%x,%s' % (simhash.value, obj_id) self.bucket[key].add(v) def delete(self, obj_id, simhash): """ `obj_id` is a string `simhash` is an instance of Simhash """ assert simhash.f == self.f for key in self.get_keys(simhash): v = '%x,%s' % (simhash.value, obj_id) if v in self.bucket[key]: self.bucket[key].remove(v) @property def offsets(self): """ You may optimize this method according to <http://www.wwwconference.org/www2007/papers/paper215.pdf> """ return [self.f // (self.k + 1) * i for i in range(self.k + 1)] def get_keys(self, simhash): for i, offset in enumerate(self.offsets): if i == (len(self.offsets) - 1): m = 2 ** (self.f - offset) - 1 else: m = 2 ** (self.offsets[i + 1] - offset) - 1 c = simhash.value >> offset & m yield '%x:%x' % (c, i) def bucket_size(self): return len(self.bucket)
使用时,import引入便可:
# -*- coding: utf-8 -*- from simhash import Simhash, SimhashIndex if __name__ == '__main__': sh = Simhash('How are you? I Am fine. ablar ablar xyz blar blar blar blar blar blar blar Thanks.') sh2 = Simhash('How are you i am fine.ablar ablar xyz blar blar blar blar blar blar blar than') dis = sh.distance(sh2) print "sh: ", sh.value print "sh2: ", sh2.value print "dis: ", dis
Relevant Link:
https://www.mit.edu/~andoni/LSH/ http://www.cs.princeton.edu/courses/archive/spr04/cos598B/bib/CharikarEstim.pdf http://people.csail.mit.edu/indyk/ https://blog.csdn.net/laobai1015/article/details/78011870 https://github.com/LittleHann/simhash https://www.cnblogs.com/hxsyl/p/4518506.html https://zhuanlan.zhihu.com/p/32078737 https://www.kancloud.cn/kancloud/the-art-of-programming/41614 https://wizardforcel.gitbooks.io/the-art-of-programming-by-july/content/06.03.html http://yanyiwu.com/work/2014/01/30/simhash-shi-xian-xiang-jie.html https://www.cnblogs.com/maybe2030/p/5203186.html
笔者认为Simhash之因此能够实现局部敏感,主要原有有两个:
simhash的hash不是直接经过原始输入文本计算获得的,而是经过ngram分片,将原始输入文本经过滑动窗口分片获得slice token列表,对每个slice token分别经过某种合理的方式计算一段hash,而后经过某种合理的方式将全部hash综合起来,获得最终的hash。
咱们经过一个例子来讲明,假设有两段文本:
1. how are u? 2. how are you?
分别使用4-gram进行切片,获得:
1. [u'howa', u'owar', u'ware', u'areu'] 2. [u'howa', u'owar', u'ware', u'arey', u'reyo', u'eyou']
能够看到,由于ngram切片的缘由,输入文本中的修改只影响到最终ngram list中的最后3个slice token,从而输入文本对最终Hash的影响也从整个散列空间缩小到了最后3个slice token中,这就是所谓的局部敏感算法。
其实基于ngram的切片式特征工程自己就是一个有损信息抽取的特征提取方式,这种信息损失,一方面损失了精度,可是另外一方面也带来了对输入局部修改的容忍度。
可是simhash仅仅是感知局部slice token的变化吗?不是,光一个rooling slice checksum是没法提供足够的局部修改容忍度的。
除了rooling piece wise分片思想以外,Simhash还引入了”Sice Token权重思想“,即每一个slice Token具体对最终的Hash能产生多大的影响,取决于这些slice Token的权重。
咱们仍是用一个例子来讲明这句话的意思,假设有三段文本:
1. how are u? 2. how are you? 3. how are u? and u? and u? and u? and u?
能够看到,这3段文本都不同,可是若是咱们以第一段文本为基准,能够发现另外2段文本的修改程度是不同的。
仍是使用4-gram进行切片,获得slice token list:
1. [u'howa', u'owar', u'ware', u'areu'] 2. [u'howa', u'owar', u'ware', u'arey', u'reyo', u'eyou'] 3. [u'howa', u'owar', u'ware', u'areu', u'reua', u'euan', u'uand', u'andu', u'ndua', u'duan', u'uand', u'andu', u'ndua', u'duan', u'uand', u'andu', u'ndua', u'duan', u'uand', u'andu']
能够看到,后两个输入文本都形成了不少slice token的改变。那最终的simhash受了多少影响呢?影响slice token数量多就是影响多吗?
simhash在slice token之上,还引入了slice token weight一维度信息,simhash不只统计受影响的slice token,还会统计每一个slice token的权重(例如是词频统计,也能够是TF-iDF)。
例如对上面的slice token list进行词频统计得:
1. {u'ware': 1, u'owar': 1, u'howa': 1, u'areu': 1} 2. {u'ware': 1, u'owar': 1, u'howa': 1, u'arey': 1, u'reyo': 1, , u'eyou': 1} 3. {u'ware': 1, u'owar': 1, u'howa': 1, u'areu': 1, u'reua': 1, , u'euan': 1, u'ndua': 3, , u'duan': 3, u'andu': 4, u'uand': 4}
能够看到,第二个文本虽然变更了2个slice token,可是权重不高,对最终的hash的影响有限。可是第三个文本中,不只出现了较多token变更,并且每一个token的权重比较高,它们对最终hash的影响就相对很大了。
为了说明上述的观点,咱们来运行一段示例代码:
# -*- coding: utf-8 -*- from simhash import Simhash, SimhashIndex if __name__ == '__main__': sh1 = Simhash('how are u?') sh2 = Simhash('how are you?') sh3 = Simhash('how are u? and u? and u? and u? and u?') dis_1_2 = sh1.distance(sh2) dis_1_3 = sh1.distance(sh3) print "sh1: ", sh1.value print "sh2: ", sh2.value print "sh3: ", sh3.value print "dis_1_2: ", dis_1_2 print "dis_1_3: ", dis_1_3
能够看到,文本2和文本1的距离,小于文本3和文本1的距离。
笔者认为,Simhash比Ssdeep效果好的主要缘由之一就在于这第二点,即Slice Token权重思想,借助权重均值化这种hash化方法,使得Simhash对多处少许的局部能够具有更大的容忍度。
这里提醒读者朋友注意一个细节,simhash的降维过程分红了2个环节。第一个环节中,原始ascii特征空间被降维到了ngram token特征空间;第二个环节中,ngram token特征空间被降维到了一个定长的fingerprint hashbit空间中,第二步降维的本质上也是一个线性变换的过程,从矩阵列向量的角度能够看的很是明显。
Simhash算法与上文提到随机超平面哈希之间是什么关系呢?一言以蔽之:Simhash是随机超平面投影的一种特殊实现,本质上属于随机超平面投影的一种。
怎么理解这句话呢?笔者带领你们从列向量的视角来从新审视一下simhash的计算过程。simhash的具体原理这里再也不赘述,文章前面已经详细讨论过了,这里直接进入正题。
假设输入文本通过ngram以后获得5个词token,并经过词频统计获得这5个词token的权重向量d,d = (w1=1,w2=2,w3=0,w4=3,w5=0)
simhash中是经过散列哈希的方法获得每一个词token的一个向量化表示,这里咱们抓住其本质,即散列哈希每个词token的本质目的就是为了定义一个低维的向量空间。
假设这5个词token对应的3维向量分别为:
h(w1) = (1, -1, 1) h(w2) = (-1, 1, 1) h(w3) = (1, -1, -1) h(w4) = (-1, -1, 1) h(w5) = (1, 1, -1)
按照simhash的算法,是将每一个词token向量乘上对应的权重w,而后再按照列相加起来,即
m = w1 * h(w1) + w2 * h(w2) + w3 * h(w3) + w4 * h(w4) + w5 * h(w5) = 1 * h(w1) + 2 * h(w2) + 0 * h(w3) + 3 * h(w4) + 0 * h(w5) = (-4, -2, 6)
实际上,上述过程可使用列向量矩阵的方式来一步完成:
接下来simhash的0/1归一化,其实就是sgn符号函数。
能够看到,simhash算法产生的结果与随机超平面投影的结果是一致的。
更进一步地说,在simhash中,随机超平面,被词token的权重向量代替了,词token权重向量做为超平面和原始向量进行内积计算,计算其夹角。
simhash算法获得的两个签名的汉明距离,能够用来衡量原始向量的夹角。
Relevant Link:
http://www.cs.princeton.edu/courses/archive/spr04/cos598B/bib/CharikarEstim.pdf
这个章节,咱们继续深刻讨论simhash的算法的底层思想。无论simhash流程如何复杂,其本质是对原始数据应用一个矩阵变换,经过线性变换的方式转换向量基,将原始数据转换到另外一个向量空间中。
假设输入文本中包含 n 个字符,原始字符向量空间维度为m,原始的数据向量矩阵为:
,M * N的矩阵。
左乘上一个 K*M 矩阵:
,K * N的矩阵。
左乘上一个 K*N 对角矩阵:
左乘上一个 P*K 矩阵:
能够看到,simhash中不一样步骤,分别对应了不一样的矩阵运算:
Relevant Link:
https://www.cnblogs.com/LittleHann/p/10859016.html#_label7
前面讨论的几种LSH算法,基本能够解决通常状况下的问题,不过对于某些特定状况仍是不行,好比:
其实若是咱们从计算公式的角度来看前面讨论的几种LSH,发现其形式均可以表示成内积的形式,提到内积天然会想到kernel方法,LSH也一样可使用kernel核方法,关于Kernel LSH的工做可参看下面这三篇文章。
Relevant Link:
http://www.robots.ox.ac.uk/~vgg/rg/papers/klsh.pdf 2009年ICCV上的 Kernelized Locality-Sensitive Hashing for Scalable Image Search http://machinelearning.wustl.edu/mlpapers/paper_files/NIPS2009_0146.pdf 2009年NIPS上的Locality-Sensitive Binary Codes From Shift-Invariant Kernels http://pages.cs.wisc.edu/~brecht/papers/07.rah.rec.nips.pdf 2007年NIPS上的Random Features for Large-Scale Kernel Machines
模糊哈希算法,又叫基于内容分割的分片分片哈希算法(context triggered piecewise hashing, CTPH)。
笔者认为,ssdeep算法的主要思想有如下几点:
Dynamic piecewise hashing动态分片哈希思想:基于输入文本的长度,进行动态分片,将局部的改变限制在一个有限长度的窗口内。
ssdeep的分片不是ngram那种固定size的滑动窗口机制,而是根据输入文本的长度动态算出的一个n值。
咱们知道,即使对弱哈希,都具有随机均匀散列的性质,即产生的结果在其映射空间上是接近于均匀分布的。
在ssdeep中,n的值始终取2的整数次方,这样Alder-32哈希值(每一个byte的滚动hash)除以n的余数也接近于均匀分布。仅当余数等于n-1时分片,就至关于只有差很少1/n的状况下会分片。也就是说,对一个文件,没往前读取一个byte,就有1/n的可能要分片。
在ssdeep中,每次都是将n除以或者乘以2,来调整,使最终的片数尽量在32到64之间。
bs = 3 while bs * MAX_LENGTH < length: bs *= 2
同时在ssdeep中,n的值会做为一个最终结果的一部分出现,在比较的时候,n会做为一个考量因素被计入考量,具体细节后面会讨论。
上述策略下,一个新问题出现了。这是一种比较极端的状况。假设一个文件使用的分片值n。在该文件中改动一个字节(修改、插入、删除等),且这个改动影响了分片的数量,使得分片数增长或减小,例如把n乘以或者除以2。所以,即使对文件的一个字节改动,也可能致使分片条件n的变化,从而致使分片数相差近一倍,而获得的结果可能会发生巨大的变化,如何解决这个问题?
ssdeep解决这种问题的思考是加入冗余因子,将边界状况也归入进来。
对每个文件,它同时使用n和n/2做为分片值,算得两个不一样的模糊哈希值,而这两个值都使用。所以,最后获得的一个文件的模糊哈希值是:
n : h(n) : h(n/2)
而在比较时,若是两个文件的分片值分别为n和m,则判断是否有n==m, n==2m, 2n==m三种状况,若是有之一,则将二者相应的模糊哈希值进行比较。例如,若是n==2m,则比较h(n/2)与h(m)是否类似。这样,在必定程序上解决了分片值变化的问题。
ssdeep逐字节读取输入文本内容,并采用滚动哈希算法(rolling hashing)不断叠加式计算最新的hash,在ssdeep中,使用Alder-32 [4] 算法做为弱哈希。它实际是一种用于校验和的弱哈希,相似于CRC32,不能用于密码学算法,但计算快速,生成4字节哈希值,而且是滚动哈希。
获得了当前byte对应的滚动hash值后,ssdeep基于动态分片阈值(上一节讨论过)以及滚动Hash的当前State状态值动态决定每一步(byte)是否分片。
哈希值除以n的余数刚好等于n-1时,就在当前位置分片
和simhash同样,对每一个token进行随机散列哈希化,可使用传统的哈希算法,例如MD5。在ssdeep中,使用一个名为Fowler-Noll-Vo hash的哈希算法。
这一步没有什么特别意义,纯粹是一个信息传递过程。
对每个文件分片,计算获得一个哈希值之后,能够选择将结果压缩短。例如,在ssdeep中,只取FNV(Fowler-Noll-Vo hash的哈希算法)哈希结果的最低6位,并用一个ASCII字符表示出来,做为这个分片的最终哈希结果。
这一步的压缩映射损失了一部分的信息,可是带来了必定的冗余度的提高。
将每片压缩后的哈希值链接到一块儿,就获得这个文件的模糊哈希值了(hash)。若是分片条件参数n对不一样文件可能不一样,还应该将n归入模糊哈希值中。
':'.join([str(bs), hash1, hash2])
注意,上文提到的h(n)和h(n/2)都要拼接进来
在ssdeep中,采用的以下思路。因为ssdeep对每一片获得的哈希值是一个ASCII字符,最终获得的文件模糊哈希值就是一个字符串了。假设是s一、s2,将s1到s2的“加权编辑距离”(weighted edit distance)做为评价其类似性的依据。
接下来,ssdeep将这个距离除以s1和s2的长度和,以将绝对结果变为相对结果,再映射到0-100的一个整数值上,其中,100表示两个字符串彻底一致,而0表示彻底不类似。
咱们来模拟分析一下模糊哈希是如何面对不一样程度的文本修改,以及又是如何在各类修改状况下进行类似性分析的,经过这个例子咱们能够更清晰地理解ssdeep的工做原理。
咱们以概括推理的方法来展开分析,无论对原始输入文本进行如何程度的修改,均可以从单个字符的修改这里推演获得,复杂的增删改查是简单原子的修改的组合与叠加,这是部分与总体的关系。
若是在一个输入文本中修改一个字节,对ssdeep hash来讲,有几种状况:
# -*- coding: utf-8 -*- import numpy as np import collections import doctest import pprint def INSERTION(A, cost=1): return cost def DELETION(A, cost=1): return cost def SUBSTITUTION(A, B, cost=1): return cost Trace = collections.namedtuple("Trace", ["cost", "ops"]) class WagnerFischer(object): # Initializes pretty printer (shared across all class instances). pprinter = pprint.PrettyPrinter(width=75) def __init__(self, A, B, insertion=INSERTION, deletion=DELETION, substitution=SUBSTITUTION): # Stores cost functions in a dictionary for programmatic access. self.costs = {"I": insertion, "D": deletion, "S": substitution} # Initializes table. self.asz = len(A) self.bsz = len(B) self._table = [[None for _ in range(self.bsz + 1)] for _ in range(self.asz + 1)] # From now on, all indexing done using self.__getitem__. ## Fills in edges. self[0][0] = Trace(0, {"O"}) # Start cell. for i in range(1, self.asz + 1): self[i][0] = Trace(self[i - 1][0].cost + self.costs["D"](A[i - 1]), {"D"}) for j in range(1, self.bsz + 1): self[0][j] = Trace(self[0][j - 1].cost + self.costs["I"](B[j - 1]), {"I"}) ## Fills in rest. for i in range(len(A)): for j in range(len(B)): # Cleans it up in case there are more than one check for match # first, as it is always the cheapest option. if A[i] == B[j]: self[i + 1][j + 1] = Trace(self[i][j].cost, {"M"}) # Checks for other types. else: costD = self[i][j + 1].cost + self.costs["D"](A[i]) costI = self[i + 1][j].cost + self.costs["I"](B[j]) costS = self[i][j].cost + self.costs["S"](A[i], B[j]) min_val = min(costI, costD, costS) trace = Trace(min_val, set()) # Adds _all_ operations matching minimum value. if costD == min_val: trace.ops.add("D") if costI == min_val: trace.ops.add("I") if costS == min_val: trace.ops.add("S") self[i + 1][j + 1] = trace # Stores optimum cost as a property. self.cost = self[-1][-1].cost def __repr__(self): return self.pprinter.pformat(self._table) def __iter__(self): for row in self._table: yield row def __getitem__(self, i): """ Returns the i-th row of the table, which is a list and so can be indexed. Therefore, e.g., self[2][3] == self._table[2][3] """ return self._table[i] # Stuff for generating alignments. def _stepback(self, i, j, trace, path_back): """ Given a cell location (i, j) and a Trace object trace, generate all traces they point back to in the table """ for op in trace.ops: if op == "M": yield i - 1, j - 1, self[i - 1][j - 1], path_back + ["M"] elif op == "I": yield i, j - 1, self[i][j - 1], path_back + ["I"] elif op == "D": yield i - 1, j, self[i - 1][j], path_back + ["D"] elif op == "S": yield i - 1, j - 1, self[i - 1][j - 1], path_back + ["S"] elif op == "O": return # Origin cell, so we"re done. else: raise ValueError("Unknown op {!r}".format(op)) def alignments(self): """ Generate all alignments with optimal-cost via breadth-first traversal of the graph of all optimal-cost (reverse) paths implicit in the dynamic programming table """ # Each cell of the queue is a tuple of (i, j, trace, path_back) # where i, j is the current index, trace is the trace object at # this cell, and path_back is a reversed list of edit operations # which is initialized as an empty list. queue = collections.deque( self._stepback(self.asz, self.bsz, self[-1][-1], [])) while queue: (i, j, trace, path_back) = queue.popleft() if trace.ops == {"O"}: # We have reached the origin, the end of a reverse path, so # yield the list of edit operations in reverse. yield path_back[::-1] continue queue.extend(self._stepback(i, j, trace, path_back)) def IDS(self): """ Estimates insertions, deletions, and substitution _count_ (not costs). Non-integer values arise when there are multiple possible alignments with the same cost. """ npaths = 0 opcounts = collections.Counter() for alignment in self.alignments(): # Counts edit types for this path, ignoring "M" (which is free). opcounts += collections.Counter(op for op in alignment if op != "M") npaths += 1 # Averages over all paths. return collections.Counter({o: c / npaths for (o, c) in opcounts.items()}) FNV_PRIME = 0x01000193 FNV_INIT = 0x28021967 MAX_LENGTH = 64 B64 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/" class Last7chars(object): def __init__(self): self._reset_rollhash() def _reset_rollhash(self): self.roll_h1 = 0 self.roll_h2 = 0 self.roll_h3 = 0 self.ringbuffer = [0] * 7 self.writeindex = 0 def _roll_hash(self, char): char7bf = self.readwrite(char) self.roll_h2 += 7 * char - self.roll_h1 self.roll_h1 += char - char7bf self.roll_h3 <<= 5 self.roll_h3 &= 0xffffffff self.roll_h3 ^= char return self.roll_h1 + self.roll_h2 + self.roll_h3 def readwrite(self, num): retval = self.ringbuffer[self.writeindex] self.ringbuffer[self.writeindex] = num self.writeindex = (self.writeindex + 1) % 7 return retval def __repr__(self): arr = self.ringbuffer[ self.writeindex:] + self.ringbuffer[:self.writeindex] return " ".join(map(str, arr)) def _update_fnv(fnvhasharray, newchar): fnvhasharray *= FNV_PRIME fnvhasharray &= 0xffffffff fnvhasharray ^= newchar return fnvhasharray def _calc_initbs(length): bs = 3 while bs * MAX_LENGTH < length: bs *= 2 if bs > 3: #proably checking for integer overflow here? return bs return 3 def ssdeep_hash(content): bs = _calc_initbs(len(content)) #print "bs: ", bs hash1 = '' hash2 = '' last7chars = Last7chars() while True: last7chars._reset_rollhash() fnv1 = FNV_INIT fnv2 = FNV_INIT hash1 = '' hash2 = '' fnvarray = np.array([fnv1, fnv2]) for i in range(len(content)): # 逐bytes扫描 c = ord(content[i]) # 使用Alder-32 [4] 算法做为弱哈希。它实际是一种用于校验和的弱哈希,相似于CRC32,不能用于密码学算法,但计算快速,生成4字节哈希值,而且是滚动哈希。 h = last7chars._roll_hash(c) #print "h_roll_hash: ", h fnvarray = _update_fnv(fnvarray, c) # 当Alder-32哈希值除以n的余数刚好等于n-1时,就在当前位置分片;不然,不分片,窗口日后滚动一个字节,而后再次计算Alder-32哈希值并判断,如此继续 # 1. 使用bs做为分片值 if h % bs == (bs - 1) and len(hash1) < (MAX_LENGTH - 1): # 对每片分别计算哈希了。可使用传统的哈希算法,例如MD5。在ssdeep中,使用一个名为Fowler-Noll-Vo hash的哈希算法 b64char = B64[fnvarray[0] & 63] hash1 += b64char fnvarray[0] = FNV_INIT # 2. 使用2*bs做为分片值 if h % (2 * bs) == (2 * bs - 1) and len(hash2) < ( MAX_LENGTH / 2 - 1): b64char = B64[fnvarray[1] & 63] hash2 += b64char fnvarray[1] = FNV_INIT # 将每片压缩后的哈希值链接到一块儿,就获得这个文件的模糊哈希值了 hash1 += B64[fnvarray[0] & 63] # 对每个文件分片,计算获得一个哈希值之后,能够选择将结果压缩短。例如,在ssdeep中,只取FNV哈希结果的最低6位,并用一个ASCII字符表示出来,做为这个分片的最终哈希结果 hash2 += B64[fnvarray[1] & 63] # 这里 &63,等价于取最低6bit if bs <= 3 or len(hash1) > (MAX_LENGTH / 2): break bs = int(bs / 2) if bs < 3: bs = 3 # 对每个文件,它同时使用n和n/2做为分片值,算得两个不一样的模糊哈希值,而这两个值都使用。所以,最后获得的一个文件的模糊哈希值是: n:h(n):h(n/2) return ':'.join([str(bs), hash1, hash2]) #from https://en.wikibooks.org/wiki/Algorithm_Implementation/Strings/Longest_common_substring#Python_2 def longest_common_substring(s1, s2): m = [[0] * (1 + len(s2)) for i in xrange(1 + len(s1))] longest, x_longest = 0, 0 for x in xrange(1, 1 + len(s1)): for y in xrange(1, 1 + len(s2)): if s1[x - 1] == s2[y - 1]: m[x][y] = m[x - 1][y - 1] + 1 if m[x][y] > longest: longest = m[x][y] x_longest = x else: m[x][y] = 0 return s1[x_longest - longest:x_longest] def _likeliness(min_lcs, a, b): # 若是最长公共子串长度不知足要求,则直接退出 if longest_common_substring(a, b) < min_lcs: return 0 # Wagner Fischer算法(字符串编辑距离,Edit Distance) dist = WagnerFischer(a, b).cost # ssdeep将这个距离除以s1和s2的长度和,以将绝对结果变为相对结果,再映射到0-100的一个整数值上,其中,100表示两个字符串彻底一致,而0表示彻底不类似 dist = int(dist * MAX_LENGTH / (len(a) + len(b))) dist = int(100 * dist / 64) if dist > 100: dist = 100 return 100 - dist def ssdeep_compare(hashA, hashB, min_lcs=7): bsA, hs1A, hs2A = hashA.split(':') #blocksize, hash1, hash2 bsB, hs1B, hs2B = hashB.split(':') bsA = int(bsA) bsB = int(bsB) like = 0 # 在比较时,若是两个文件的分片值分别为n和m,则判断是否有n==m, n==2m, 2n==m三种状况,若是有之一,则将二者相应的模糊哈希值进行比较。例如,若是n==2m,则比较h(n/2)与h(m)是否类似 #block size comparison if bsA == bsB: #compare both hashes like1 = _likeliness(min_lcs, hs1A, hs1B) like2 = _likeliness(min_lcs, hs2A, hs2B) like = max(like1, like2) elif bsA == 2 * bsB: # Compare hash_bsA with hash_2*bsB like = _likeliness(min_lcs, hs1A, hs2B) elif 2 * bsA == bsB: # Compare hash_2*bsA with hash_bsB like = _likeliness(min_lcs, hs2A, hs1B) else: #nothing suitable to compare like = 0 return like if __name__ == '__main__': import sys content1 = "this is a test!" content2 = "this is a test." hash1 = ssdeep_hash(content1) print hash1 hash2 = ssdeep_hash(content2) print hash2 similarity = ssdeep_compare(hash1, hash2) print similarity
Relevant Link:
https://github.com/LittleHann/ssdeeppy https://ssdeep-project.github.io/ssdeep/ https://www.claudxiao.net/2012/02/fuzzy_hashing/#comment-457473