This technical column is the author (Qin Kaixin)'s summary and distillation of his day-to-day work, drawing its case studies from real commercial environments and offering tuning advice and cluster capacity-planning guidance for production use. Please stay tuned to this blog series. Copyright notice: reproduction is prohibited; learning from it is welcome. QQ email: 1120746959@qq.com; feel free to reach out for any business exchange.
Spark's feature engineering covers four areas: feature extraction, feature transformation, feature selection, and locality-sensitive hashing (LSH). The overall Spark ML feature-engineering toolkit is organized as follows:
1: Feature Extractors
TF-IDF
Word2Vec
CountVectorizer
FeatureHasher
2: Feature Transformers
Tokenizer
StopWordsRemover
n-gram
Binarizer
PCA
PolynomialExpansion
Discrete Cosine Transform (DCT)
StringIndexer
IndexToString
OneHotEncoder (Deprecated since 2.3.0)
OneHotEncoderEstimator
VectorIndexer
Interaction
Normalizer
StandardScaler
MinMaxScaler
MaxAbsScaler
Bucketizer
ElementwiseProduct
SQLTransformer
VectorAssembler
VectorSizeHint
QuantileDiscretizer
Imputer
3: Feature Selectors
VectorSlicer
RFormula
ChiSqSelector
4: Locality Sensitive Hashing
LSH Operations:
Feature Transformation
Approximate Similarity Join
Approximate Nearest Neighbor Search
LSH Algorithms:
Bucketed Random Projection for Euclidean Distance
MinHash for Jaccard Distance
Term frequency-inverse document frequency (TF-IDF) is a feature-vectorization method widely used in text mining; it reflects how important a term in a document is with respect to the corpus as a whole.
Let t denote a term, d a document, and D the corpus. The term frequency TF(t,d) is the number of times term t appears in document d, and the document frequency DF(t,D) is the number of documents that contain t. If we measured importance by term frequency alone, we would over-weight words that occur often in a document yet carry little real information, such as "a", "the" and "of"; a term that appears throughout the corpus does little to distinguish one document from another. TF-IDF numerically measures how much information a term provides for distinguishing documents. It is defined as:

IDF(t,D) = log((|D| + 1) / (DF(t,D) + 1))

where |D| is the total number of documents in the corpus. Because of the log, a term that appears in every document has an IDF of 0, and the +1 terms keep the denominator from being zero. The TF-IDF measure is then:

TFIDF(t,d,D) = TF(t,d) ⋅ IDF(t,D)

In Spark ML, TF-IDF is split into two parts: TF (with hashing) and IDF.
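As a quick sanity check against the three example sentences used below (so |D| = 3; "spark" appears in one document, "i" in all three):

IDF("spark") = log((3 + 1) / (1 + 1)) = log 2 ≈ 0.6931
IDF("i") = log((3 + 1) / (3 + 1)) = log 1 = 0

In the first sentence "spark" occurs twice, so its TF-IDF is 2 × 0.6931 ≈ 1.3863, while "i" scores 0 no matter how often it occurs; these are exactly the values that appear in the rescaledData output further down.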
TF: HashingTF is a Transformer that, in text processing, takes a set of terms and converts them into a fixed-length feature vector, counting the frequency of each term while hashing it.
IDF: IDF is an Estimator; calling its fit() method on a dataset produces an IDFModel. The IDFModel takes the feature vectors produced by HashingTF and rescales them, reducing the weight of terms that occur frequently across the corpus.
spark.ml implements term-frequency counting with feature hashing: each raw term is mapped to an index by a hash function, and we only need to count the frequency of each index to know the frequency of the corresponding term. This avoids building a global one-to-one term-to-index dictionary, which can take considerable time over a large corpus. The caveat is that different raw terms may hash to the same index (a collision); the only way to lower the probability of collisions is to raise the dimensionality of the feature vector, i.e. increase the number of hash buckets. The default feature dimension is 2^20 = 1,048,576.
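A minimal sketch of the hashing trick itself (an illustrative simplification, not Spark's exact implementation: spark.ml's HashingTF hashes each term with MurmurHash3 and takes a non-negative modulo of the table size, so the indices below will not line up with HashingTF's output):

import scala.util.hashing.MurmurHash3

// Map a term to a column index in a fixed-size table; distinct terms can
// collide, and the only remedy is a larger table (more buckets).
def termIndex(term: String, numFeatures: Int = 1 << 20): Int = {
  val h = MurmurHash3.stringHash(term)
  ((h % numFeatures) + numFeatures) % numFeatures // keep the index non-negative
}

// Sparse term-frequency "vector" as a map from index to count.
def termFrequencies(terms: Seq[String], numFeatures: Int = 1 << 20): Map[Int, Double] =
  terms.groupBy(termIndex(_, numFeatures)).map { case (idx, ts) => idx -> ts.size.toDouble }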
The example below starts from a small set of sentences. First the Tokenizer splits each sentence into individual words; then, for each sentence (bag of words), HashingTF converts it into a feature vector; finally IDF rescales the feature vectors. This kind of transformation usually improves the performance of models that use text features.
import org.apache.spark.ml.feature.{Tokenizer, HashingTF, IDF}
import org.apache.spark.sql.SparkSession
import spark.implicits._
val sentenceData = spark.createDataFrame(Seq(
  (0, "I I I Spark and I I Spark"),
  (0, "I wish wish wish wish wish classes"),
  (1, "Logistic regression regression regression regression regression I love it ")
)).toDF("label", "sentence")
val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val wordsData = tokenizer.transform(sentenceData)
wordsData.show(false)
+-----+--------------------------------------------------------------------------+-----------------------------------------------------------------------------------+
|label|sentence |words |
+-----+--------------------------------------------------------------------------+-----------------------------------------------------------------------------------+
|0 |I I I Spark and I I Spark |[i, i, i, spark, and, i, i, spark] |
|0 |I wish wish wish wish wish classes |[i, wish, wish, wish, wish, wish, classes] |
|1 |Logistic regression regression regression regression regression I love it |[logistic, regression, regression, regression, regression, regression, i, love, it]|
+-----+--------------------------------------------------------------------------+-----------------------------------------------------------------------------------+
Once the documents are tokenized, HashingTF's transform() method hashes each sentence into a feature vector; here the number of hash buckets is set to 2000.
val hashingTF = new HashingTF().
  setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(2000)
val featurizedData = hashingTF.transform(wordsData)
As the output shows, each token sequence has been turned into a sparse feature vector: every word is hashed to an index, and the value of the vector at that index is the word's count in the document.
featurizedData.select("rawFeatures").show(false)
+----------------------------------------------------+
|rawFeatures |
+----------------------------------------------------+
|(2000,[333,1105,1329],[1.0,2.0,5.0]) |
|(2000,[495,1329,1809],[5.0,1.0,1.0]) |
|(2000,[240,495,695,1329,1604],[1.0,1.0,5.0,1.0,1.0])|
+----------------------------------------------------+
featurizedData.rdd.foreach(println)
[0,I I I Spark and I I Spark,WrappedArray(i, i, i, spark, and, i, i, spark),(2000,[333,1105,1329],[1.0,2.0,5.0])]
[0,I wish wish wish wish wish classes,WrappedArray(i, wish, wish, wish, wish, wish, classes),(2000,[495,1329,1809],[5.0,1.0,1.0])]
[1,Logistic regression regression regression regression regression I love it ,WrappedArray(logistic, regression, regression, regression, regression, regression, i, love, it),(2000,[240,495,695,1329,1604],[1.0,1.0,5.0,1.0,1.0])]
This is easy to misread, which is why I changed the example: the hash index of "i" turns out to be 1329, and "i" appears 5 times in "I I I Spark and I I Spark", which is exactly the 5.0 at index 1329 in (2000,[333,1105,1329],[1.0,2.0,5.0]). With 2000 buckets and this tiny vocabulary, index 1329 effectively belongs to "i" alone across the whole corpus.
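One way to confirm which bucket a given word lands in (a small ad-hoc check, not part of the original walkthrough) is to run the same hashingTF over single-word documents and look at the single non-zero index of each result:

// Hash each word in isolation with the same hashingTF (numFeatures = 2000).
val probe = Seq(Tuple1(Array("i")), Tuple1(Array("spark")), Tuple1(Array("wish"))).toDF("words")
hashingTF.transform(probe).select("words", "rawFeatures").show(false)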
scala> val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
idf: org.apache.spark.ml.feature.IDF = idf_18dec771e2e0
We now use IDF to adjust the raw term-frequency vectors so that they better reflect each word's power to distinguish documents. IDF is an Estimator: calling fit() on the term-frequency vectors produces an IDFModel.
scala> val idfModel = idf.fit(featurizedData)
idfModel: org.apache.spark.ml.feature.IDFModel = idf_18dec771e2e0
IDFModel is a Transformer: calling its transform() method yields the TF-IDF value of each term.
scala> val rescaledData = idfModel.transform(featurizedData)
rescaledData: org.apache.spark.sql.DataFrame = [label: int, sentence: string ... 3 more fields]
Each feature vector has now been rescaled according to how often its terms appear across the corpus; the resulting TF-IDF feature vectors can be fed into downstream machine-learning methods.
scala> rescaledData.select("features", "label").take(3).foreach(println)
[(2000,[333,1105,1329],[0.6931471805599453,1.3862943611198906,0.0]),0]
[(2000,[495,1329,1809],[1.4384103622589042,0.0,0.6931471805599453]),0]
[(2000,[240,495,695,1329,1604],[0.6931471805599453,0.28768207245178085,3.4657359027997265,0.0,0.6931471805599453]),1]
scala> rescaledData.rdd.foreach(println)
Printing the full rows gives a more detailed side-by-side comparison:
[0,I I I Spark and I I Spark,WrappedArray(i, i, i, spark, and, i, i, spark),(2000,[333,1105,1329],[1.0,2.0,5.0]),(2000,[333,1105,1329],[0.6931471805599453,1.3862943611198906,0.0])]
[0,I wish wish wish wish wish classes,WrappedArray(i, wish, wish, wish, wish, wish, classes),(2000,[495,1329,1809],[5.0,1.0,1.0]),(2000,[495,1329,1809],[1.4384103622589042,0.0,0.6931471805599453])]
[1,Logistic regression regression regression regression regression I love it ,WrappedArray(logistic, regression, regression, regression, regression, regression, i, love, it),(2000,[240,495,695,1329,1604],[1.0,1.0,5.0,1.0,1.0]),(2000,[240,495,695,1329,1604],[0.6931471805599453,0.28768207245178085,3.4657359027997265,0.0,0.6931471805599453])]
Note that the word "i" in "I I I Spark and I I Spark" appears in every document, so its TF-IDF value is 0, while "spark" appears twice in that sentence (and in no other document), so its TF-IDF value is 1.3862943611198906.
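For completeness, the three stages above (Tokenizer, HashingTF, IDF) can also be chained into a single spark.ml Pipeline; a minimal sketch reusing the objects defined earlier (the variable names tfidfPipeline and tfidfModel are illustrative only):

import org.apache.spark.ml.Pipeline

// Tokenize, hash and rescale in one reusable pipeline.
val tfidfPipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, idf))
val tfidfModel = tfidfPipeline.fit(sentenceData)
tfidfModel.transform(sentenceData).select("features", "label").show(false)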
Word2Vec is an Estimator that takes sequences of words representing documents and trains a Word2VecModel. The model maps each word to a fixed-size vector and converts each document into a vector by averaging the vectors of all the words it contains; that document vector can then be used as a feature for prediction, document-similarity computation, and so on.
The Word2Vec implementation in spark.ml uses the skip-gram model. Skip-gram's training objective is to learn word vector representations that are good at predicting a word's context: given a sequence of training words w1, ..., wT, it maximizes the average log-likelihood (1/T) Σ_{t=1..T} Σ_{-k ≤ j ≤ k, j ≠ 0} log p(w_{t+j} | w_t), where k is the training window size.
import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
val documentDF = spark.createDataFrame(Seq(
"Hi I I I Spark Spark".split(" "),
"I wish wish wish wish wish wish".split(" "),
"Logistic regression".split(" ")
).map(Tuple1.apply)).toDF("text")
val word2Vec = new Word2Vec().setInputCol("text").setOutputCol("result").setVectorSize(3).setMinCount(0)
val model = word2Vec.fit(documentDF)
val result = model.transform(documentDF)
Each document is transformed into a 3-dimensional feature vector, which can then be used in downstream machine-learning methods:
scala> result.collect().foreach { case Row(text: Seq[_], features: Vector) =>
| println(s"Text: [${text.mkString(", ")}] => \nVector: $features\n") }
Text: [Hi, I, I, I, Spark, , Spark] =>
Vector: [-0.07306859535830361,-0.02478547128183501,-0.010775725756372723]
Text: [I, wish, wish, wish, wish, wish, wish] =>
Vector: [-0.033820114231535366,-0.13763525443417685,0.14657753705978394]
Text: [Logistic, regression] =>
Vector: [-0.10231713205575943,0.0494652334600687,0.014658251544460654]
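Besides document vectors, the trained Word2VecModel can also be queried for the words closest to a given word in the learned vector space; a small sketch (on this toy corpus the neighbours are not meaningful):

// Top 2 words whose vectors are most similar to "Spark".
model.findSynonyms("Spark", 2).show(false)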
CountVectorizer converts a collection of text documents into vectors of token counts. When no a-priori dictionary exists, CountVectorizer acts as an Estimator: it extracts the vocabulary from the training data and produces a CountVectorizerModel, which stores the learned vocabulary as the vector space for the documents.
import spark.implicits._
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}
val df = spark.createDataFrame(Seq(
(0, Array("a", "b", "c")),
(1, Array( "b", "b", "c", "a"))
)).toDF("id", "words")
val cvModel: CountVectorizerModel = new CountVectorizer().setInputCol("words").setOutputCol("features").setVocabSize(3).setMinDF(2).fit(df)
Here setVocabSize(3) caps the vocabulary at the 3 most frequent terms and setMinDF(2) keeps only terms that appear in at least 2 documents. After fitting, the learned vocabulary can be read from the CountVectorizerModel's vocabulary member:
scala> cvModel.vocabulary
res46: Array[String] = Array(b, a, c)
cvModel.transform(df).show(false)
+---+------------+-------------------------+
|id |words |features |
+---+------------+-------------------------+
|0 |[a, b, c] |(3,[0,1,2],[1.0,1.0,1.0])|
|1 |[b, b, c, a]|(3,[0,1,2],[2.0,1.0,1.0])|
+---+------------+-------------------------+
Alternatively, when an a-priori vocabulary is available, a CountVectorizerModel can be constructed from it directly (here a maps to index 0, b to 1 and c to 2):
val cvm = new CountVectorizerModel(Array("a", "b", "c")).setInputCol("words").setOutputCol("features")
cvm.transform(df).show(false)
+---+------------+-------------------------+
|id |words       |features                 |
+---+------------+-------------------------+
|0  |[a, b, c]   |(3,[0,1,2],[1.0,1.0,1.0])|
|1  |[b, b, c, a]|(3,[0,1,2],[1.0,2.0,1.0])|
+---+------------+-------------------------+
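A variant worth knowing about (not covered in the original text) is binary counting, where each feature only records the presence or absence of a term rather than its count; a short sketch:

// setBinary(true): every non-zero count becomes 1.0, which suits models such
// as Bernoulli naive Bayes that expect binary term features.
val binaryModel = new CountVectorizer()
  .setInputCol("words").setOutputCol("features")
  .setBinary(true)
  .fit(df)
binaryModel.transform(df).show(false)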
Feature hashing projects a set of categorical or numerical features into a feature vector of specified dimension (typically substantially smaller than that of the original feature space). This is done using the hashing trick to map features to indices in the feature vector.
import org.apache.spark.ml.feature.FeatureHasher

val df = Seq(
(2.0, true, "1", "foo"),
(3.0, false, "2", "bar")
).toDF("real", "bool", "stringNum", "string")
val hasher = new FeatureHasher().setInputCols("real", "bool", "stringNum", "string").setOutputCol("features")
hasher.transform(df).show(false)
+----+-----+---------+------+--------------------------------------------------------+
|real|bool |stringNum|string|features |
+----+-----+---------+------+--------------------------------------------------------+
|2.0 |true |1 |foo |(262144,[174475,247670,257907,262126],[2.0,1.0,1.0,1.0])|
|3.0 |false|2 |bar |(262144,[70644,89673,173866,174475],[1.0,1.0,1.0,3.0]) |
+----+-----+---------+------+--------------------------------------------------------+
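By default FeatureHasher hashes into 2^18 = 262,144 dimensions, which is why the vectors above have size 262144. The table size can be reduced (at a higher risk of collisions), and numeric columns can be declared categorical; a small sketch of both options (the parameter values here are illustrative):

// Hash into a smaller table and treat the numeric "real" column as categorical,
// so each distinct value of "real" gets its own hashed index instead of being
// stored as a numeric value at one fixed index.
val smallHasher = new FeatureHasher()
  .setInputCols("real", "bool", "stringNum", "string")
  .setCategoricalCols(Array("real"))
  .setNumFeatures(256)
  .setOutputCol("features")
smallHasher.transform(df).show(false)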
This brings the feature-extraction topic to a hurried close; feature transformation is the real main event, and the follow-up posts will be even better.
Qin Kaixin, Shenzhen