PySpark MinHash LSH for Similarity Search

First, let's look at the official documentation:

MinHash for Jaccard Distance

MinHash is an LSH family for Jaccard distance where input features are sets of natural numbers. Jaccard distance of two sets is defined by the cardinality of their intersection and union:

d(A, B) = 1 − |A ∩ B| / |A ∪ B|
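For example, if A = {1, 2, 3} and B = {2, 3, 4}, then |A ∩ B| = 2 and |A ∪ B| = 4, so d(A, B) = 1 − 2/4 = 0.5.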

MinHash applies a random hash function g to each element in the set and takes the minimum of all hashed values:

h(A) = min_{a ∈ A} g(a)

 

The input sets for MinHash are represented as binary vectors, where the vector indices represent the elements themselves and the non-zero values in the vector represent the presence of that element in the set. While both dense and sparse vectors are supported, typically sparse vectors are recommended for efficiency. For example, Vectors.sparse(10, Array[(2, 1.0), (3, 1.0), (5, 1.0)]) means there are 10 elements in the space. This set contains elem 2, elem 3 and elem 5. All non-zero values are treated as binary “1” values.
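The Array[...] notation above is Scala; a quick PySpark equivalent of the same sparse set looks like this (a minimal sketch using pyspark.ml.linalg):

from pyspark.ml.linalg import Vectors

# A space of 10 elements containing elem 2, elem 3 and elem 5:
# size first, then the index list and the value list as two parallel lists.
v = Vectors.sparse(10, [2, 3, 5], [1.0, 1.0, 1.0])

# An index -> value mapping works as well.
v2 = Vectors.sparse(10, {2: 1.0, 3: 1.0, 5: 1.0})

print(v)   # (10,[2,3,5],[1.0,1.0,1.0])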

Note: Empty sets cannot be transformed by MinHash, which means any input vector must have at least 1 non-zero entry.

Refer to the MinHashLSH Python docs for more details on the API.

from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

dataA = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
         (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
         (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])

dataB = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
         (4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
         (5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])

key = Vectors.sparse(6, [1, 3], [1.0, 1.0])

mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
model = mh.fit(dfA)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
print("Approximately joining dfA and dfB on distance smaller than 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, distCol="JaccardDistance")\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("JaccardDistance")).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
# It may return less than 2 rows when not enough approximate near-neighbor candidates are
# found.
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
Find full example code at "examples/src/main/python/ml/min_hash_lsh_example.py" in the Spark repo.

[PySpark] LSH Similarity Computation

1. Problem Scenario

Suppose we want to find which users, out of a massive user base, behave similarly:

User A:

id: 1001
name: User A
data: "07:00 breakfast, 09:00 work, 12:00 lunch, 13:00 play Honor of Kings, 18:00 dinner, 22:00 sleep"
mat: "1010001000010001100001101010000"

User B:

id: 1002
name: User B
data: "07:00 morning exercise, 08:00 breakfast, 12:30 lunch, 13:00 study, 19:00 dinner, 21:00 study, 23:00 sleep"
mat: "1110001000010000001011101010000"

User C: ...

mat is a featurized description of the user's data: for example, we might define bit 1 as "gets up early", bit 2 as "morning exercise", and bit 3 as "eats breakfast". Given such a matrix for every user, how do we find the people whose behavior patterns are closest to a given user's?

Looking at the one-hot vectors, we can see that A and B are actually quite similar, with only a few differences; for instance, A plays Honor of Kings while B prefers studying (a quick exact Jaccard check follows the two strings below):

User A: "1010001000010000001001101010000"
User B: "1110001000010000000011101010000"
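Before bringing in LSH, it helps to compute the exact Jaccard distance between these two one-hot strings directly, to see what the approximation is aiming at (a small plain-Python sketch using the bit strings above):

a = "1010001000010000001001101010000"
b = "1110001000010000000011101010000"

# Treat each string as the set of indices whose bit is 1.
set_a = {i for i, bit in enumerate(a) if bit == '1'}
set_b = {i for i, bit in enumerate(b) if bit == '1'}

# 1 - |A ∩ B| / |A ∪ B| = 1 - 8/11 ≈ 0.27, matching the MinHashLSH result reported below.
print(1 - len(set_a & set_b) / len(set_a | set_b))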

This is where LSH comes in.

2. Approach

Q: What is LSH similarity for?
A: LSH stands for Locality Sensitive Hashing. It finds samples that are as close as possible to each other when their feature vectors are similar but not exactly identical. Of course, you still need to define the features first, and then apply the LSH method.

Reference: Extracting, transforming and selecting features

The real problem at work is getting this to run on massive data. In the PySpark implementation there are two options: MinHashLSH and BucketedRandomProjectionLSH.

MinHashLSH

MinHash is an LSH family for Jaccard distance whose input features are sets of natural numbers. The Jaccard distance of two sets is defined by the cardinality of their intersection and union: d(A, B) = 1 − |A ∩ B| / |A ∪ B|.

MinHash applies a random hash function g to each element in the set and takes the minimum of all hashed values: h(A) = min_{a ∈ A} g(a).
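As a toy illustration of that definition (not Spark's internal hash family), g can be taken as a random affine function modulo a large prime, with one (a, b) pair per hash table:

import random

# Toy MinHash: g(x) = (a*x + b) mod p, one (a, b) pair per hash function.
p = 2147483647          # a large prime (2**31 - 1)
random.seed(42)
coeffs = [(random.randint(1, p - 1), random.randint(0, p - 1)) for _ in range(3)]

def minhash_signature(s):
    # h(A) = min over a in A of g(a), computed once per hash function.
    return [min((a * x + b) % p for x in s) for a, b in coeffs]

A = {0, 2, 6, 11}
B = {0, 1, 2, 6, 11}
print(minhash_signature(A))
print(minhash_signature(B))
# The fraction of positions where the two signatures agree estimates the Jaccard
# similarity, so more agreement means a smaller estimated Jaccard distance.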

BucketedRandomProjectionLSH (Random Projection for Euclidean Distance)

Bucketed random projection is an LSH family for Euclidean distance. The Euclidean distance is defined as: d(x, y) = √( Σ_i (x_i − y_i)² ).

Its LSH family projects each feature vector x onto a random unit vector v and portions the projected result into hash buckets: h(x) = ⌊ x · v / r ⌋

where r is a user-defined bucket length. The bucket length can be used to control the average size of the hash buckets (and therefore the number of buckets). A larger bucket length (i.e., fewer buckets) increases the probability of features being hashed into the same bucket, which increases both the true and the false positives (the small sketch at the end of this subsection illustrates the effect).

Bucketed random projection accepts arbitrary vectors as input features, and supports both sparse and dense vectors.
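To see how the bucket length r behaves, here is a small plain-Python sketch of h(x) = ⌊ x · v / r ⌋ for two nearby points and two very different values of r (the numbers are illustrative only):

import math
import random

# Project onto a random unit vector v, then bucket the projection with length r.
random.seed(1)
v = [random.gauss(0, 1) for _ in range(3)]
norm = math.sqrt(sum(c * c for c in v))
v = [c / norm for c in v]                     # random unit vector

def bucket(x, r):
    return math.floor(sum(xi * vi for xi, vi in zip(x, v)) / r)

x1 = [1.0, 0.0, 3.0]
x2 = [1.2, 0.1, 2.9]                          # a nearby point
for r in (0.1, 10.0):
    # With the larger bucket length the two nearby points are far more likely
    # to fall into the same bucket (more collisions, more candidate pairs).
    print(r, bucket(x1, r), bucket(x2, r))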

3. Implementation

Without further ado, here is the code.

import os
import re
import hashlib

from pyspark import SparkContext, SparkConf
from pyspark.sql import Row, SQLContext, SparkSession
from pyspark.sql.functions import udf, col, collect_list, collect_set
from pyspark.ml.feature import MinHashLSH, BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors, VectorUDT

# Restart the Spark session: stop any existing one, then create a fresh one
spark = SparkSession.builder.appName('app_name').getOrCreate()
spark.stop()
spark = SparkSession.builder.appName('app_name').getOrCreate()


class PySpark(object):

    @staticmethod
    def execute(df_input):
        """
        Entry point; meant to be overridden by the user.
        :return: must return a DataFrame object
        """
        # step 1: read the input DataFrame
        df_mid = df_input.select('id', 'name', 'data', 'mat')

        # step 2: preprocess the feature vectors
        def mat2vec(mat):
            """
            UDF that vectorizes the feature-matrix string.
            :return: a VectorUDT value, as required by the similarity computation
            """
            arr = [0.0] * len(mat)
            for i in range(len(mat)):
                if mat[i] != '0':
                    arr[i] = 1.0
            return Vectors.dense(arr)

        udf_mat2vec = udf(mat2vec, VectorUDT())
        df_mid = df_mid.withColumn('vec', udf_mat2vec('mat')).select(
            'id', 'name', 'data', 'mat', 'vec')

        # step 3: build the similarity models
        ## MinHashLSH (Jaccard distance)
        minlsh = MinHashLSH(inputCol="vec", outputCol="hashes", seed=123, numHashTables=3)
        model_minlsh = minlsh.fit(df_mid)
        ## BucketedRandomProjectionLSH (Euclidean distance)
        brplsh = BucketedRandomProjectionLSH(inputCol="vec", outputCol="hashes", seed=123,
                                             bucketLength=10.0, numHashTables=10)
        model_brplsh = brplsh.fit(df_mid)

        # step 4: self-join (skip self-matches, cap the distance at 0.8)
        ## model_brplsh works the same way, with EuclideanDistance as the distance column
        df_ret = model_minlsh.approxSimilarityJoin(df_mid, df_mid, 0.8,
                                                   distCol='JaccardDistance').select(
            col("datasetA.id").alias("id"),
            col("datasetA.name").alias("name"),
            col("datasetA.data").alias("data"),
            col("datasetA.mat").alias("mat"),
            col("JaccardDistance").alias("distance"),
            col("datasetB.id").alias("ref_id"),
            col("datasetB.name").alias("ref_name"),
            col("datasetB.data").alias("ref_data"),
            col("datasetB.mat").alias("ref_mat")
        ).filter("id != ref_id")
        return df_ret


df_in = spark.createDataFrame([
    (1001, "A", "xxx", "1010001000010000001001101010000"),
    (1002, "B", "yyy", "1110001000010000000011101010000"),
    (1003, "C", "zzz", "1101100101010111011101110111101")],
    ['id', 'name', 'data', 'mat'])
df_out = PySpark.execute(df_in)
df_out.show()
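The code above fits model_brplsh but only runs the MinHash join. If the Euclidean variant is also wanted, the join inside execute() is analogous (a sketch only; the 4.0 threshold is an absolute Euclidean distance chosen purely for illustration, not a tuned value):

        ## Same join API for the Euclidean variant, placed next to step 4 where
        ## model_brplsh and df_mid are in scope.
        df_euclid = model_brplsh.approxSimilarityJoin(df_mid, df_mid, 4.0,
                                                      distCol='EuclideanDistance').select(
            col("datasetA.id").alias("id"),
            col("datasetB.id").alias("ref_id"),
            col("EuclideanDistance").alias("distance")
        ).filter("id != ref_id")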

The result: with MinHashLSH, the distance between A and B is 0.27, which is fairly close, while the distances from A and B to C are both around 0.75, in line with expectations.

Alright, time to go lift some weights...

MLlib supports two kinds of vectors: dense and sparse. A dense vector is backed by a single array of values, while a sparse vector is backed by two parallel arrays: an index array and a value array. For example, the vector (1.0, 0.0, 3.0) can be represented in dense form as [1.0, 0.0, 3.0], or in sparse form as (3, [0, 2], [1.0, 3.0]), where 3 is the size of the vector.
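In PySpark the two representations of that example look like this (a quick sketch):

from pyspark.ml.linalg import Vectors

dense = Vectors.dense([1.0, 0.0, 3.0])
sparse = Vectors.sparse(3, [0, 2], [1.0, 3.0])   # size, index array, value array

print(dense)             # [1.0,0.0,3.0]
print(sparse)            # (3,[0,2],[1.0,3.0])
print(sparse.toArray())  # [1. 0. 3.]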
