For choosing among the approaches, see: [Scikit-learn] 4.3 Preprocessing data
For code examples, see: [ML] Pyspark ML tutorial for beginners
This post covers: Feature Transformers
Consider the Binarizer for features where the numeric value itself carries no meaning, only whether it exceeds a threshold.
from pyspark.ml.feature import Binarizer

continuousDataFrame = spark.createDataFrame([
    (0, 0.1),
    (1, 0.8),
    (2, 0.2)
], ["id", "feature"])

continuousDataFrame.show()

+---+-------+
| id|feature|
+---+-------+
|  0|    0.1|
|  1|    0.8|
|  2|    0.2|
+---+-------+
# define the model; a Transformer needs no fit, just transform
binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")
binarizedDataFrame = binarizer.transform(continuousDataFrame)

print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()
Binarizer output with Threshold = 0.500000
+---+-------+-----------------+
| id|feature|binarized_feature|
+---+-------+-----------------+
|  0|    0.1|              0.0|
|  1|    0.8|              1.0|
|  2|    0.2|              0.0|
+---+-------+-----------------+
Sometimes categorical levels are encoded with letters rather than numbers; StringIndexer maps them to numeric indices, and IndexToString maps the indices back.
from pyspark.ml.feature import IndexToString, StringIndexer

df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])
# StringIndexer
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = indexer.fit(df)
indexed = model.transform(df)

print("Transformed string column '%s' to indexed column '%s'"
      % (indexer.getInputCol(), indexer.getOutputCol()))
indexed.show()

print("StringIndexer will store labels in output column metadata\n")
Transformed string column 'category' to indexed column 'categoryIndex'
+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
|  0|       a|          0.0|
|  1|       b|          2.0|
|  2|       c|          1.0|
|  3|       a|          0.0|
|  4|       a|          0.0|
|  5|       c|          1.0|
+---+--------+-------------+
# StringIndexer will store labels in output column metadata
converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
converted = converter.transform(indexed)

print("Transformed indexed column '%s' back to original string column '%s' using labels in metadata"
      % (converter.getInputCol(), converter.getOutputCol()))
converted.select("id", "categoryIndex", "originalCategory").show()

Transformed indexed column 'categoryIndex' back to original string column 'originalCategory' using labels in metadata
+---+-------------+----------------+
| id|categoryIndex|originalCategory|
+---+-------------+----------------+
|  0|          0.0|               a|
|  1|          2.0|               b|
|  2|          1.0|               c|
|  3|          0.0|               a|
|  4|          0.0|               a|
|  5|          1.0|               c|
+---+-------------+----------------+
OneHotEncoderEstimator is probably a transitional API: in Spark 3.0 it was renamed back to OneHotEncoder, keeping the same estimator-style fit/transform usage.
from pyspark.ml.feature import OneHotEncoderEstimator

df = spark.createDataFrame([
    (0, 3),
    (2, 0)
], ["categoryIndex1", "categoryIndex2"])

encoder = OneHotEncoderEstimator(inputCols=["categoryIndex1", "categoryIndex2"],
                                 outputCols=["categoryVec1", "categoryVec2"])
model = encoder.fit(df)
encoded = model.transform(df)
encoded.show()

+--------------+--------------+-------------+-------------+
|categoryIndex1|categoryIndex2| categoryVec1| categoryVec2|
+--------------+--------------+-------------+-------------+
|             0|             3|(2,[0],[1.0])|    (3,[],[])|
|             2|             0|    (2,[],[])|(3,[0],[1.0])|
+--------------+--------------+-------------+-------------+
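On Spark 3.0 and later the same example would use OneHotEncoder, which took over the estimator-style API. A minimal sketch under that assumption, reusing the df and column names above:

from pyspark.ml.feature import OneHotEncoder  # Spark 3.0+: replaces OneHotEncoderEstimator

encoder = OneHotEncoder(inputCols=["categoryIndex1", "categoryIndex2"],
                        outputCols=["categoryVec1", "categoryVec2"])
model = encoder.fit(df)      # fit learns how many categories each column has
model.transform(df).show()   # same sparse one-hot vectors as above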
VectorAssembler
VectorAssembler concatenates several numeric (or vector) columns, in order, into a single vector column.
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

dataset = spark.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
    ["id", "hour", "mobile", "userFeatures", "clicked"])

assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="features")

output = assembler.transform(dataset)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)
Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'
+-----------------------+-------+
|features               |clicked|
+-----------------------+-------+
|[18.0,1.0,0.0,10.0,0.5]|1.0    |
+-----------------------+-------+
This chapter draws on: [Feature] Compare the effect of different scalers
These are the common scaling ("de-dimensionalization") strategies provided by sklearn:
distributions = [
    ('Unscaled data', X),
    ('Data after standard scaling',
        StandardScaler().fit_transform(X)),
    ('Data after min-max scaling',
        MinMaxScaler().fit_transform(X)),
    ('Data after max-abs scaling',
        MaxAbsScaler().fit_transform(X)),
    ('Data after robust scaling',
        RobustScaler(quantile_range=(25, 75)).fit_transform(X)),
    # Spark ML currently has no equivalent for the transformers below
    ('Data after power transformation (Yeo-Johnson)',
        PowerTransformer(method='yeo-johnson').fit_transform(X)),
    ('Data after power transformation (Box-Cox)',
        PowerTransformer(method='box-cox').fit_transform(X)),
    ('Data after quantile transformation (gaussian pdf)',
        QuantileTransformer(output_distribution='normal').fit_transform(X)),
    ('Data after quantile transformation (uniform pdf)',
        QuantileTransformer(output_distribution='uniform').fit_transform(X)),
    ('Data after sample-wise L2 normalizing',
        Normalizer().fit_transform(X)),
]
Why scale at all: if one feature's variance is far larger than the others', it will dominate the learning process and keep the learner from paying attention to the remaining features as we would expect, which can make the model converge slowly or not at all. Such features therefore need to be standardized/normalized.
from pyspark.ml.feature import StandardScaler
# sparse representation (libsvm format)
dataFrame = spark.read.format("libsvm").load("file:///usr/local/spark/data/mllib/sample_libsvm_data.txt")
dataFrame.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|
|  0.0|(692,[152,153,154...|
|  1.0|(692,[97,98,99,12...|
|  1.0|(692,[124,125,126...|
+-----+--------------------+
only showing top 20 rows
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)
scalerModel = scaler.fit(dataFrame)
scaledData = scalerModel.transform(dataFrame)
scaledData.show()

+-----+--------------------+--------------------+
|label|            features|      scaledFeatures|
+-----+--------------------+--------------------+
|  0.0|(692,[127,128,129...|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|(692,[124,125,126...|
|  0.0|(692,[152,153,154...|(692,[152,153,154...|
|  1.0|(692,[97,98,99,12...|(692,[97,98,99,12...|
|  1.0|(692,[124,125,126...|(692,[124,125,126...|
+-----+--------------------+--------------------+
only showing top 20 rows
MinMaxScaler rescales each feature to a given range (usually [0, 1]). The motivation: (1) it adds stability for features with very small variance; (2) it preserves zero entries in sparse data.
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -1.0]),),
    (1, Vectors.dense([2.0, 1.1, 1.0]),),
    (2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])

dataFrame.show()

+---+--------------+
| id|      features|
+---+--------------+
|  0|[1.0,0.1,-1.0]|
|  1| [2.0,1.1,1.0]|
|  2|[3.0,10.1,3.0]|
+---+--------------+
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel = scaler.fit(dataFrame)
scaledData = scalerModel.transform(dataFrame)

print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))
scaledData.select("features", "scaledFeatures").show()
Features scaled to range: [0.000000, 1.000000]
+--------------+--------------+
|      features|scaledFeatures|
+--------------+--------------+
|[1.0,0.1,-1.0]| [0.0,0.0,0.0]|
| [2.0,1.1,1.0]| [0.5,0.1,0.5]|
|[3.0,10.1,3.0]| [1.0,1.0,1.0]|
+--------------+--------------+
MaxAbsScaler rescales each feature to the range [-1, 1] by dividing by the feature's maximum absolute value.
from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -8.0]),),
    (1, Vectors.dense([2.0, 1.0, -4.0]),),
    (2, Vectors.dense([4.0, 10.0, 8.0]),)
], ["id", "features"])
scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel = scaler.fit(dataFrame)
scaledData = scalerModel.transform(dataFrame)

scaledData.select("features", "scaledFeatures").show()

+--------------+----------------+
|      features|  scaledFeatures|
+--------------+----------------+
|[1.0,0.1,-8.0]|[0.25,0.01,-1.0]|
|[2.0,1.0,-4.0]|  [0.5,0.1,-0.5]|
|[4.0,10.0,8.0]|   [1.0,1.0,1.0]|
+--------------+----------------+
Ref: the difference between standardization and normalization
[Normalization] suits linear models: it makes features of different dimensions numerically comparable, which can greatly improve a classifier's accuracy. Its drawback is that newly arriving data may change the max and min, so the scaling has to be recomputed.
For example, when two feature dimensions differ in magnitude, the trained model ends up with a large weight w for the tiger-count feature and a small weight w for the sparrow-count feature, and the influence of the feature with the small weight on the objective function is easily drowned out.
So each feature should be normalized, to keep features of larger magnitude from masking the other features' influence on the objective function.
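As a minimal PySpark sketch of this point (the tiger_count / sparrow_count columns and their values are made-up illustration data), MinMaxScaler puts both features on the same [0, 1] scale before they reach the learner:

from pyspark.ml.feature import MinMaxScaler, VectorAssembler

# toy data: tiger counts are tiny, sparrow counts are huge
animals = spark.createDataFrame(
    [(0, 2.0, 10000.0), (1, 5.0, 30000.0), (2, 9.0, 80000.0)],
    ["id", "tiger_count", "sparrow_count"])

assembled = VectorAssembler(inputCols=["tiger_count", "sparrow_count"],
                            outputCol="rawFeatures").transform(animals)

# rescale each column to [0, 1] so neither magnitude dominates the weights
scaler = MinMaxScaler(inputCol="rawFeatures", outputCol="scaledFeatures")
scaler.fit(assembled).transform(assembled).select("rawFeatures", "scaledFeatures").show(truncate=False)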
[Standardization] removes the measurement bias introduced by different score distributions. For example: in a class exam, math scores fall between 90 and 100 while Chinese scores fall between 60 and 100. Xiao Ming scores 90 in math and 100 in Chinese, Xiao Hua scores 95 in math and 95 in Chinese; standardization is the mathematical treatment for judging whose overall result is better.
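A minimal sketch of that exam example with StandardScaler (the two extra students and all the scores are hypothetical, only there to give each subject a mean and a standard deviation): after centering and scaling, each subject is expressed as z-scores, so a 90 in math and a 100 in Chinese become directly comparable.

from pyspark.ml.feature import StandardScaler, VectorAssembler

# hypothetical class scores: (student, math, chinese)
scores = spark.createDataFrame(
    [("ming", 90.0, 100.0), ("hua", 95.0, 95.0),
     ("mei", 92.0, 70.0), ("qiang", 98.0, 65.0)],
    ["student", "math", "chinese"])

assembled = VectorAssembler(inputCols=["math", "chinese"],
                            outputCol="rawScores").transform(scores)

# withMean/withStd turn each subject into z-scores: (x - mean) / stddev
scaler = StandardScaler(inputCol="rawScores", outputCol="zScores",
                        withMean=True, withStd=True)
scaler.fit(assembled).transform(assembled).select("student", "zScores").show(truncate=False)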
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.5, -1.0]),),
    (1, Vectors.dense([2.0, 1.0, 1.0]),),
    (2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])

normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
# Normalize each Vector using $L^1$ norm.
l1NormData = normalizer.transform(dataFrame)
print("Normalized using L^1 norm")
l1NormData.show()
Normalized using L^1 norm
+---+--------------+------------------+
| id|      features|      normFeatures|
+---+--------------+------------------+
|  0|[1.0,0.5,-1.0]|    [0.4,0.2,-0.4]|
|  1| [2.0,1.0,1.0]|   [0.5,0.25,0.25]|
|  2|[4.0,10.0,2.0]|[0.25,0.625,0.125]|
+---+--------------+------------------+
# Normalize each Vector using L∞ norm.
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
print("Normalized using L^inf norm")
lInfNormData.show()
Normalized using L^inf norm
+---+--------------+--------------+
| id|      features|  normFeatures|
+---+--------------+--------------+
|  0|[1.0,0.5,-1.0]|[1.0,0.5,-1.0]|
|  1| [2.0,1.0,1.0]| [1.0,0.5,0.5]|
|  2|[4.0,10.0,2.0]| [0.4,1.0,0.2]|
+---+--------------+--------------+
Ref: Map data to a normal distribution
A skewed data distribution has many negative effects.
We can use feature-engineering techniques, i.e. statistical or mathematical transformations, to mitigate the skew: values in densely populated intervals get spread out as much as possible, and values in sparsely populated intervals get pulled together.
A log transform tends to stretch the range of the independent variable at the lower magnitudes and to compress it at the higher magnitudes, pulling a skewed distribution closer to a normal distribution.
First, check whether a feature is skewed.
# Here's how you check skewness (we will do it for the 'balance' feature only).
fraud_pd.agg({'balance': 'skewness'}).show()
+------------------+
| skewness(balance)|
+------------------+
|1.1818315552993002|
+------------------+
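Following the log-transform idea above, here is a minimal sketch of reducing that skew and measuring it again. It assumes the balance column of fraud_pd is non-negative; log1p is used so zero balances remain valid.

from pyspark.sql import functions as F

# compress the long right tail of balance, then re-check skewness
fraud_log = fraud_pd.withColumn("balance_log", F.log1p("balance"))
fraud_log.agg(F.skewness("balance_log").alias("skewness(balance_log)")).show()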
Whitening is a necessary preprocessing step before operations such as PCA and ICA.
For example, suppose the training data are images. Because adjacent pixels in an image are strongly correlated, the raw input used for training is redundant.
The purpose of whitening is to reduce this redundancy; more formally, we want the whitening step to give the learning algorithm an input with the following properties (a sketch follows the PCA example below):
(i) the features are only weakly correlated with each other;
(ii) all features have the same variance.
Code: Unsupervised Feature Learning and Deep Learning [MATLAB code]
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])

df.show()

+--------------------+
|            features|
+--------------------+
| (5,[1,3],[1.0,7.0])|
|[2.0,0.0,3.0,4.0,...|
|[4.0,0.0,0.0,6.0,...|
+--------------------+
# define the model, fit, then transform
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)
result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)

+-----------------------------------------------------------+
|pcaFeatures                                                 |
+-----------------------------------------------------------+
|[1.6485728230883807,-4.013282700516296,-5.524543751369388] |
|[-4.645104331781534,-1.1167972663619026,-5.524543751369387]|
|[-6.428880535676489,-5.337951427775355,-5.524543751369389] |
+-----------------------------------------------------------+
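Spark ML has no dedicated whitening transformer, but the two properties listed earlier can be approximated by standardizing the PCA scores: PCA decorrelates the features, and rescaling each component to unit variance equalizes them. A minimal sketch reusing df from the PCA example (pcaFeatures2 and whitenedFeatures are just names chosen here):

from pyspark.ml.feature import PCA, StandardScaler

# keep the informative components, then give each one unit variance
pca2 = PCA(k=2, inputCol="features", outputCol="pcaFeatures2")
pcaData = pca2.fit(df).transform(df)

whitener = StandardScaler(inputCol="pcaFeatures2", outputCol="whitenedFeatures",
                          withMean=True, withStd=True)
whitener.fit(pcaData).transform(pcaData).select("whitenedFeatures").show(truncate=False)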
End.