Getting Started
PySpark MLlib
1. Basic Introduction
This section covers MLlib. The DataFrame-based ml library is now the recommended API; the older library is used here mainly to get familiar with it.
Apache Spark provides a machine learning API called MLlib, and PySpark exposes this API in Python. It supports different kinds of algorithms, as described below.
- mllib.classification - the spark.mllib package supports various methods for binary classification, multiclass classification, and regression analysis. Some of the most popular classification algorithms are random forests, naive Bayes, and decision trees (a short sketch follows this list).
- mllib.clustering - clustering is an unsupervised learning problem in which you group subsets of entities with one another based on some notion of similarity.
- mllib.fpm - frequent pattern mining means mining frequent items, itemsets, subsequences, or other substructures, which is usually the first step in analyzing a large-scale dataset. It has been an active research topic in data mining for years.
- mllib.linalg - MLlib utilities for linear algebra.
- mllib.recommendation - collaborative filtering is commonly used for recommender systems. These techniques aim to fill in the missing entries of a user-item association matrix.
- spark.mllib - it currently supports model-based collaborative filtering, in which users and products are described by a small set of latent factors that can be used to predict the missing entries. spark.mllib uses the alternating least squares (ALS) algorithm to learn these latent factors.
- mllib.regression - linear regression belongs to the family of regression algorithms. The goal of regression is to find relationships and dependencies between variables. The interface for working with linear regression models and model summaries is similar to the logistic regression case.
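As a first taste of the RDD-based API, here is a minimal classification sketch. It is not from the original article: the toy data points are made up, and it only relies on the spark.mllib classes named in the list above.
from pyspark import SparkContext
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.regression import LabeledPoint

sc = SparkContext(appName="MllibClassificationSketch")

# Tiny made-up dataset: each LabeledPoint is a label plus a two-element feature vector.
points = sc.parallelize([
    LabeledPoint(0.0, [0.0, 1.0]),
    LabeledPoint(1.0, [1.0, 0.0]),
    LabeledPoint(0.0, [0.1, 0.9]),
    LabeledPoint(1.0, [0.9, 0.2]),
])

lrModel = LogisticRegressionWithLBFGS.train(points)
print(lrModel.predict([1.0, 0.0]))  # should print 1 for this separable toy data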
2. Code Example
A simple dataset is provided.
Dataset used - test.data
1,1,5.0
1,2,1.0
1,3,5.0
1,4,1.0
2,1,5.0
2,2,1.0
2,3,5.0
2,4,1.0
3,1,1.0
3,2,5.0
3,3,1.0
3,4,5.0
4,1,1.0
4,2,5.0
4,3,1.0
4,4,5.0
from __future__ import print_function
from pyspark import SparkContext
# mllib is used here -- the older, RDD-based API; the newer one is ml.
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

if __name__ == "__main__":
    sc = SparkContext(appName="PySpark mllib Example")
    data = sc.textFile("test.data")

    # Train the model
    ratings = data.map(lambda s: s.split(',')).map(lambda x: Rating(int(x[0]), int(x[1]), float(x[2])))
    rank = 10
    numIterations = 10
    model = ALS.train(ratings, rank, numIterations)

    # Test the model
    testdata = ratings.map(lambda p: (p[0], p[1]))
    predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
    ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
    MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
    print("Mean Squared Error = " + str(MSE))

    # Save and load model
    model.save(sc, "target/tmp/myCollaborativeFilter")
    sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
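Beyond the overall MSE, the trained model can also be queried directly. A minimal follow-up sketch, assuming the script above has just run (user 1 and product 3 are arbitrary picks from the toy dataset):
print(sameModel.predict(1, 3))       # predicted rating for user 1 on product 3

# Top 2 product recommendations for user 1.
for rec in sameModel.recommendProducts(1, 2):
    print(rec)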
3. The spark.ml Library
Ref: http://spark.apache.org/docs/latest/ml-guide.html
As of Spark 2.0, the RDD-based APIs in the spark.mllib package have entered maintenance mode. The primary Machine Learning API for Spark is now the DataFrame-based API in the spark.ml package.
- spark.mllib: the data abstraction is the RDD.
- spark.ml: the data abstraction is the DataFrame (see the sketch below).
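For contrast, here is a minimal sketch of an ALS recommender built on the DataFrame-based API. This is not from the original article: the ratings DataFrame is hypothetical (same shape as test.data above), and it assumes an existing SparkSession named spark.
from pyspark.ml.recommendation import ALS

# Hypothetical ratings DataFrame with user / item / rating columns.
ratings_df = spark.createDataFrame(
    [(1, 1, 5.0), (1, 2, 1.0), (2, 1, 5.0), (2, 2, 1.0)],
    ["user", "item", "rating"])

als = ALS(rank=10, maxIter=10, userCol="user", itemCol="item", ratingCol="rating")
als_model = als.fit(ratings_df)
als_model.transform(ratings_df).show()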

NLP Basics
1. TF-IDF: Word Importance
Step 1: split the sentences into words to form the word collections
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
sentenceData = spark.createDataFrame([
(0, "..."),
(1, "..."),
...
]).toDF("label", "sentence")
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
wordsData.show()
As you can see, a new words column has been added at the end:

Step 2: hash the words into feature vectors
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=2000)
featurizedData = hashingTF.transform(wordsData)
featurizedData.select("words", "rawFeatures").show(truncate=False)
Step 3: build the IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
Step 4: rescale the weights
rescaledData = idfModel.transform(featurizedData)
rescaledData.select("features", "label").show(truncate=False)
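Putting the four steps together, here is a self-contained sketch. The two sentences and the SparkSession setup are made-up stand-ins for the elided data above, not the original corpus.
from pyspark.sql import SparkSession
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

spark = SparkSession.builder.master("local").appName("TfIdfExample").getOrCreate()

# Hypothetical corpus: each row is (label, sentence).
sentenceData = spark.createDataFrame([
    (0, "i heard about spark and i love spark"),
    (1, "logistic regression models are neat")
], ["label", "sentence"])

wordsData = Tokenizer(inputCol="sentence", outputCol="words").transform(sentenceData)
featurizedData = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=2000).transform(wordsData)
rescaledData = IDF(inputCol="rawFeatures", outputCol="features").fit(featurizedData).transform(featurizedData)
rescaledData.select("features", "label").show(truncate=False)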
2. Word2Vec: Word Vectorization
The org.apache.spark.ml.feature package provides:
- StringIndexer
- IndexToString
- OneHotEncoder
- VectorIndexer
Method 1: StringIndexer
The most frequent value is assigned index 0. IndexToString() is the inverse operation.
from pyspark.ml.feature import StringIndexer

# Build a DataFrame and set the StringIndexer's input and output column names
df = spark.createDataFrame([(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")], ["id", "category"])
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
# Fit and transform
model = indexer.fit(df)
indexed = model.transform(df)
indexed.show()
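For this data, StringIndexer assigns indices by frequency: "a" (3 rows) becomes 0.0, "c" (2 rows) becomes 1.0, and "b" (1 row) becomes 2.0. To go back from indices to the original strings, IndexToString can be applied to the indexed DataFrame above; a minimal sketch:
from pyspark.ml.feature import IndexToString

# Map the numeric categoryIndex column back to the original string values.
converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
converter.transform(indexed).select("id", "categoryIndex", "originalCategory").show()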

Method 2: VectorIndexer
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.linalg import Vector, Vectors
df = spark.createDataFrame([ \
(Vectors.dense(-1.0, 1.0, 1.0), ),
(Vectors.dense(-1.0, 3.0, 1.0), ),
(Vectors.dense( 0.0, 5.0, 1.0), )], ["features"])
indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=2)
indexerModel = indexer.fit(df)
categoricalFeatures = indexerModel.categoryMaps.keys()
Inspect the result after the transformation.
indexed = indexerModel.transform(df)
indexed.show()
The first feature column has only two distinct values, so it is treated as categorical and re-encoded;
the second has three distinct values, so it is not treated as categorical and is left unchanged;
the third has only one distinct value, so it is treated as categorical and re-encoded (the sketch below confirms which columns were recognized).
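A small sketch for checking that, with maxCategories=2, features 0 and 2 were flagged as categorical (the exact value-to-index mapping that gets printed may differ):
# Keys of categoryMaps are the indices of the features recognized as categorical.
print(sorted(indexerModel.categoryMaps.keys()))  # expected: [0, 2]
print(indexerModel.categoryMaps)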

Pipeline Workflow
Logistic Regression Classifier
(1) Get a SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("WordCount").getOrCreate()
(2) Prepare the training data
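The original training table is not reproduced here; a hypothetical stand-in with the same id / text / label shape would be:
# Hypothetical training data: label 1.0 marks texts that mention "spark".
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])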

(3) Build the pipeline
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(training)
(4) Test and predict
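Again, the original test rows are not reproduced; a hypothetical unlabeled set with the same id / text shape:
# Hypothetical test data: (id, text) rows without labels.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])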

Now run the prediction on the data above to get the results.
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))
Decision Tree Classifier
(1) Import the packages
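The import list is not reproduced in the original; these are the imports the code below needs (a reasonable reconstruction, not the author's exact list):
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row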

(2) Construct the data
def f(x):
    rel = {}
    rel['features'] = Vectors.dense(float(x[0]), float(x[1]), float(x[2]), float(x[3]))
    rel['label'] = str(x[4])
    return rel

data = spark.sparkContext.textFile("file:///usr/local/spark/iris.txt") \
    .map(lambda line: line.split(',')) \
    .map(lambda p: Row(**f(p))) \
    .toDF()  # becomes a DataFrame (a two-dimensional table)
(3) Transformers
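The transformer definitions are not reproduced in the original. A typical setup that matches the names used by the pipeline below (labelIndexer, featureIndexer, labelConverter, trainingData, testData) would be:
# Index the string labels and the feature vector, and prepare a converter that
# maps predicted indices back to the original label strings.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)

# Split the data into training and test sets.
trainingData, testData = data.randomSplit([0.7, 0.3])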

(4) Classification model
dtClassifier = DecisionTreeClassifier(). \
    setLabelCol("indexedLabel"). \
    setFeaturesCol("indexedFeatures")
(5) Build the pipeline
dtPipeline = Pipeline().setStages([labelIndexer, featureIndexer, dtClassifier, labelConverter])
dtPipelineModel = dtPipeline.fit(trainingData)
dtPredictions = dtPipelineModel.transform(testData)
dtPredictions.select("predictedLabel", "label", "features").show(20)

(6) Evaluate the model
evaluator = MulticlassClassificationEvaluator(). \
setLabelCol("indexedLabel"). \
setPredictionCol("prediction")
dtAccuracy = evaluator.evaluate(dtPredictions)
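A short follow-up sketch, assuming the pipeline above: print the accuracy and pull the fitted tree out of the pipeline (stage index 2 corresponds to dtClassifier in the stages list) to inspect its structure.
print("Decision tree test accuracy = %g" % dtAccuracy)

# Inspect the learned DecisionTreeClassificationModel.
treeModel = dtPipelineModel.stages[2]
print(treeModel.toDebugString)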
AWS ETL Pipeline
1. Learning Resources
Ref: Serverless autonomous machine learning with AWS Glue and Amazon Athena (AWS blog)
Ref: AWS Glue FAQs
Extract is the process of reading data from a database. In this stage, the data is collected, often from multiple and different types of sources.
Transform is the process of converting the extracted data from its previous form into the form it needs to be in so that it can be placed into another database. Transformation occurs by using rules or lookup tables or by combining the data with other data.
Load is the process of writing the data into the target database.
2. Code Example
ETL embodies the pipeline idea; a Pipeline is not used here, but one could be added.
import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.transforms import SelectFields
from awsglue.transforms import RenameField
from awsglue.dynamicframe import DynamicFrame, DynamicFrameReader, DynamicFrameWriter, DynamicFrameCollection
from pyspark.context import SparkContext
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
#JOB INPUT DATA
destination = "s3://luiscarosnaprds/gluescripts/results/ClusterResults3.parquet"
namespace = "nyc-transportation-version2"
tablename = "green"
# Standard Glue job boilerplate
sc = SparkContext()
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
#Load table and select fields
datasource0 = glueContext.create_dynamic_frame.from_catalog(name_space = namespace, table_name = tablename)
SelectFields0 = SelectFields.apply(frame = datasource0, paths=["trip_distance","fare_amount","pickup_longitude","pickup_latitude" ])
DataFrame0 = DynamicFrame.toDF(SelectFields0)  # convert the DynamicFrame to a regular Spark DataFrame
#------------------------------------------------------------
#Filter some unwanted values
DataFrameFiltered = DataFrame0.filter("pickup_latitude > 40.472278 AND pickup_latitude < 41.160886 AND pickup_longitude > -74.300074 AND pickup_longitude < -71.844077")
#Select features and convert to SparkML required format
features = ["pickup_longitude","pickup_latitude"]
assembler = VectorAssembler(inputCols=features,outputCol='features')
assembled_df = assembler.transform(DataFrameFiltered)
#Fit and Run Kmeans
kmeans = KMeans(k=100, seed=1)
model = kmeans.fit(assembled_df)
transformed = model.transform(assembled_df)
#Save data to destination
transformed.write.mode('overwrite').parquet(destination)
job.commit()
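To sanity-check the output afterwards, a small sketch run in a separate Spark session (it assumes a SparkSession named spark and reuses the destination path defined above; "prediction" is the cluster-assignment column that KMeans adds):
# Read the job output back and count how many points landed in each cluster.
results = spark.read.parquet("s3://luiscarosnaprds/gluescripts/results/ClusterResults3.parquet")
results.groupBy("prediction").count().orderBy("count", ascending=False).show()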
End.