[ML] Online learning

Review


1. Spark stream processing

Working with Spark Streaming is very close to the way we operate on RDDs, which makes processing data streams straightforward. By combining Spark's streaming primitives with MLlib's SGD-based online learning capability, we can build a real-time machine learning model that is updated as each new batch of data arrives in the stream.

[Spark] 04 - What is Spark Streaming

[Spark] 05 - Apache Kafka

[Spark] 06 - Structured Streaming

[Link] http://shartoo.github.io/spark-python-example/ [several useful examples]

 

2. The Spark.ml library

 

3. "Online learning" in local mode

[Scikit-learn] 1.5 Generalized Linear Models - SGD for Regression

[Scikit-learn] 1.5 Generalized Linear Models - SGD for Classification

[Scikit-learn] 1.1 Generalized Linear Models - Comparing online solvers

 

  

 

Learning


1. Batch computation

For example: LinearRegressionWithSGD

SGD-related algorithms:

Algorithms are all implemented in Scala; these are APIs that exist only in mllib.
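A minimal sketch of the batch API in PySpark (the input path and its "label,f1 f2 f3" line format are assumptions, modeled on the shape of the official lpsa regression example):

from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD

# Each line is assumed to look like "label,f1 f2 f3"
def parse_point(line):
    label, features = line.split(',')
    return LabeledPoint(float(label), [float(x) for x in features.split(' ')])

data = sc.textFile("/test/lpsa.data").map(parse_point)  # hypothetical path

# One-shot batch training: the entire RDD is consumed, then the model is fixed
model = LinearRegressionWithSGD.train(data, iterations=100, step=0.01)
print(model.weights, model.intercept)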

 

 

2. Streaming computation

StreamingLinearRegressionWithSGD(...)

Train or predict a linear regression model on streaming data. Training uses Stochastic Gradient Descent to update the model based on each new batch of incoming data from a DStream (see LinearRegressionWithSGD for model equation).

Each batch of data is assumed to be an RDD of LabeledPoints. The number of data points per batch can vary, but the number of features must be constant. An initial weight vector must be provided.
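A minimal sketch of this streaming API (here trainingStream and testingStream are assumed to be DStreams of LabeledPoints, e.g. built with ssc.queueStream or ssc.textFileStream):

from pyspark.mllib.regression import StreamingLinearRegressionWithSGD

numFeatures = 3  # must stay constant across batches

model = StreamingLinearRegressionWithSGD(stepSize=0.01, numIterations=10)
model.setInitialWeights([0.0] * numFeatures)  # the required initial weight vector

model.trainOn(trainingStream)  # weights are updated on every incoming batch
model.predictOnValues(testingStream.map(lambda lp: (lp.label, lp.features))).pprint()

ssc.start()
ssc.awaitTermination()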

 

Ref: How does Spark's StreamingLinearRegressionWithSGD work?

As for whether spark.ml has a corresponding API, the only way to find out is to read the source code.

 

 

3. Streaming clustering model

Ref: Spark Machine Learning 9 - Real-Time Machine Learning (Scala with sbt)

E-book download: https://www.iteblog.com/download/2150.html

Code download: https://github.com/PacktPublishing/Machine-Learning-with-Spark-Second-Edition

Chapter 1: Getting Up and Running with Spark
Chapter 2: Math for Machine Learning
Chapter 3: Designing a Machine Learning System
Chapter 4: Obtaining, Processing, and Preparing Data with Spark
Chapter 5: Building a Recommendation Engine with Spark
Chapter 6: Building a Classification Model with Spark
Chapter 7: Building a Regression Model with Spark
Chapter 8: Building a Clustering Model with Spark
Chapter 9: Dimensionality Reduction with Spark
Chapter 10: Advanced Text Processing with Spark
Chapter 11: Real-Time Machine Learning with Spark Streaming
Chapter 12: Pipeline APIs for Spark ML

 

 

Ref: Getting Started with Spark: MLlib Basic Data Types (1)

Vectors.dense

The sparse format is written as (4,[0,2,3],[1.0,1.0,3.0]): the leading 4 is the vector's length (number of elements), [0,2,3] is the indices array, and [1.0,1.0,3.0] is the values array. In other words, the value at position 0 is 1.0, at position 2 is 1.0, and at position 3 is 3.0; every other position is 0.
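For instance, the following PySpark snippet builds the same vector both ways (a small illustration; the values match the format above):

from pyspark.mllib.linalg import Vectors

dense = Vectors.dense([1.0, 0.0, 1.0, 3.0])             # every element stored explicitly
sparse = Vectors.sparse(4, [0, 2, 3], [1.0, 1.0, 3.0])  # (size, indices, values)

print(sparse)            # (4,[0,2,3],[1.0,1.0,3.0])
print(sparse.toArray())  # [1. 0. 1. 3.] -- same contents as the dense vector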

 

LabeledPoint

//Create a labeled point with label 1.0 (treated as a positive example in classification) and a dense vector
scala> val pos = LabeledPoint(1.0, Vectors.dense(2.0, 0.0, 8.0))
pos: org.apache.spark.mllib.regression.LabeledPoint = (1.0,[2.0,0.0,8.0])
//Create a labeled point with label 0.0 (treated as a negative example in classification) and a sparse vector
scala> val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(2.0, 8.0)))
neg: org.apache.spark.mllib.regression.LabeledPoint = (0.0,(3,[0,2],[2.0,8.0]))
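The same two labeled points in PySpark, matching the Python notebook below:

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

pos = LabeledPoint(1.0, Vectors.dense(2.0, 0.0, 8.0))           # dense, positive example
neg = LabeledPoint(0.0, Vectors.sparse(3, [0, 2], [2.0, 8.0]))  # sparse, negative example
print(pos, neg)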

 

 

Ref: https://github.com/apache/spark/blob/master/examples/src/main/python/mllib/streaming_k_means_example.py

Spark Streaming can create a DStream backed by a queue of RDDs with the streamingContext.queueStream(queueOfRDDs) method; each RDD in the queue is treated as one batch of data in the DStream and processed accordingly.

trainOn: update the model by training on batches of data from a DStream. This operation registers a DStream for training the model, and updates the model based on every subsequent batch of data from the stream.
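Before the full example, here is a minimal, self-contained sketch of queueStream semantics (assuming an existing SparkContext sc and StreamingContext ssc): each RDD pushed into the queue becomes one batch of the DStream.

rddQueue = [sc.parallelize(range(i * 10, (i + 1) * 10)) for i in range(3)]  # three 10-element RDDs
batches = ssc.queueStream(rddQueue)   # each RDD becomes one batch
batches.count().pprint()              # prints 10 once per batch after ssc.start()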

 

Init

In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

import datetime
In [2]:
def fnGetAppName():

    # read the clock once so all timestamp components come from the same instant
    now = datetime.datetime.now()
    return "{}-{}-{}_{}-{}-{}".format(now.year, now.month, now.day,
                                      now.hour, now.minute, now.second)
In [3]:
def fn_timer(a_func):

    def wrapTheFunction():
        import time
        time_start=time.time()
        
        a_func()
        
        time_end=time.time()
        print('totally cost {} sec'.format(time_end-time_start))
 
    return wrapTheFunction
In [4]:
appName = fnGetAppName()
print("appName: {}".format(appName))

conf = SparkConf().setMaster("spark://node-master:7077").setAppName(appName)
# conf = SparkConf().setMaster("local").setAppName(appName)
 
appName: 2019-11-10_20-58-19
 

Spark Context

In [5]:
sc = SparkContext(conf = conf)
 

Spark Session

In [6]:
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
 

Spark Stream

In [7]:
ssc = StreamingContext(sc, 1)
 

Let's Go!

In [8]:
from __future__ import print_function
# $example on$
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import StreamingKMeans
# $example off$


# $example on$
# we make an input stream of vectors for training,
# as well as a stream of vectors for testing
def parse(lp):
    label = float(lp[lp.find('(') + 1: lp.find(')')])
    vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))

    return LabeledPoint(label, vec)

# (1)
trainingData = sc.textFile("/test/kmeans_data.txt")\
    .map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
testingData = sc.textFile("/test/streaming_kmeans_data_test.txt").map(parse)
In [9]:
# (2)
trainingQueue = [trainingData]
testingQueue = [testingData]
In [10]:
# RDD queue streams, fed to the model as its "input"
# More details: [Spark] 04 - What is Spark Streaming

# (3)
trainingStream = ssc.queueStream(trainingQueue)
testingStream = ssc.queueStream(testingQueue)
In [11]:
# We create a model with random clusters and specify the number of clusters to find
model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)  # dim=3, weight=1.0, seed=0
In [12]:
# Now register the streams for training and testing and start the job,
# printing the predicted cluster assignments on new data points as they arrive.

# (4)
model.trainOn(trainingStream)
In [13]:
result = model.predictOnValues(testingStream.map(lambda lp: (lp.label, lp.features)))
result.pprint()
In [14]:
ssc.start()
ssc.stop(stopSparkContext=True, stopGraceFully=True)
# $example off$

print("Final centers: " + str(model.latestModel().centers))
 
-------------------------------------------
Time: 2019-11-10 20:58:33
-------------------------------------------
(1.0, 0)
(2.0, 1)

-------------------------------------------
Time: 2019-11-10 20:58:34
-------------------------------------------

Final centers: [[ 4.19486462  4.00002246  4.08267685]
 [ 2.2408932   1.86755799 -0.97727788]] 

 

 

4. Streaming regression model

Example code:

Ref: https://github.com/apache/spark/blob/master/examples/src/main/python/mllib/streaming_linear_regression_example.py

 

/* implement */
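A condensed sketch along the lines of the referenced example (the two input directories are placeholders; lines are assumed to look like "(label,[f1,f2,f3])", as in the upstream data files):

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint, StreamingLinearRegressionWithSGD

def parse(lp):
    label = float(lp[lp.find('(') + 1: lp.find(',')])
    vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
    return LabeledPoint(label, vec)

trainingData = ssc.textFileStream("/test/train_dir").map(parse).cache()  # placeholder dir
testData = ssc.textFileStream("/test/test_dir").map(parse)               # placeholder dir

model = StreamingLinearRegressionWithSGD()
model.setInitialWeights([0.0, 0.0, 0.0])  # three features, all weights start at zero

model.trainOn(trainingData)
model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features))).pprint()

ssc.start()
ssc.awaitTermination()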
