做者:余根茂,阿里巴巴计算平台事业部EMR团队的技术专家,参与了Hadoop,Spark,Kafka等开源项目的研发工做。目前主要专一于EMR流式计算产品的研发工做。前端
今天来和你们聊一下如何使用Spark SQL进行流式数据的机器学习处理。本文主要分为如下几个章节:mysql
什么是流式机器学习git
机器学习模型获取途径github
系统演示算法
1. 什么是流式机器学习
一般,当咱们听到有人提到实时数据机器学习时,其实他们是讨论:sql
他们但愿有一个模型,这个模型利用最近历史信息来进行预测分析。举一个天气的例子,若是最近几天都是晴天,那么将来几天极小几率会出现雨雪和低温天气apache
这个模型还须要是可更新的。当数据流经系统时,模型是能够随之进化升级。举个例子,随着业务规模的扩大,咱们但愿零售销售模型仍然保持准确。微信
第一个例子咱们能够将它归为时序预测。第二个例子中,模型须要更新或者从新训练,这是一个non-stationarity问题。时序预测和non-stationarity数据分布是两类不一样的问题。本文主要关注第二类问题,对于这类问题,通常的解决方案主要有:网络
增量式算法:有一些算法支持经过数据逐步学习。也就是说,每次进来一些新的数据时,模型会被更新。SVM,神经网络等算法都有增量式版本,此外贝叶斯网络也能够用做增量学习。架构
周期从新学习:一个更加直接的方法就是用一批最新数据从新训练咱们的模型。这种方法能够用到的绝大多数的算法上。
2. 机器学习模型获取途径
实时机器学习应用分红两块,一部分是模型实时训练,另外一部分是数据实时预测分析。现实中,咱们可能无法实现模型的实时训练,只能退而求其次地使用已经训练好模型。这些模型可能会周期性地使用历史数据训练更新一次。因此,咱们能够根据实际的算法和模型时效性要求,来选择实时训练模型仍是使用预训练好的模型。
模型算法支持增量训练:能够选择用流式数据实时训练更新
模型算法不自持增量训练:能够选择用离线数据预先训练好模式
回到主题上,咱们要实现使用Spark SQL进行流式机器学习。前面几篇文章已经简单介绍了EMR如何使用Spark SQL进行流式ETL处理。既然要进行机器学习,咱们很天然地想到Spark MLlib。DataBricks有篇文档介绍了在Spark Structured Streaming进行机器学习,你们有兴趣的能够看下。若是想将Spark MLlib应用到Spark SQL上,咱们能够简单地将MLlib算法包装成UDF使用。另一个模型获取途径是利用阿里云上的一些在线机器学习服务,咱们能够将在线机器学习服务使用UDF封装后使用。
使用UDF封装现有的Spark MLlib算法
使用UDF封装阿里云在线机器学习服务
限于篇幅,我会分两篇文章分别介绍这两个方式,本文将简单介绍如何利用Spark MLlib进行流式机器学习。
3. 系统演示
本节,咱们将演示一下如何利用逻辑回归算法进行演示。
3.1 系统架构
下面这张图展现了整个实时监测系统的架构,前端接LogService数据,实时监测分析结果写入到RDS,最后经过DataV展现出来。
3.2 测试数据集
测试数据集使用Spark自带的sample_libsvm_data.txt,咱们要作的是写一个数据生成器,将数据集的数据不断地向SLS中发送,模拟流式数据。
算法模型准备
Spark MLlib提供了大量的机器学习算法实现,能够方便的再RDD或者DataFrame API上使用,可是没法直接用在SQL API上,因此咱们须要使用UDF来封装一下。这里,咱们选用逻辑回归算法,具体的实现就不细说了,能够参考这里的代码:LogisticRegressionUDF.scala,地址:https://github.com/aliyun/aliyun-emapreduce-sdk/blob/master-2.x/emr-sql/src/main/scala/org/apache/spark/sql/aliyun/udfs/ml/LogisticRegressionUDF.scala
3.4 部署测试
CLI
## emr datasources包尚未发布,须要手动编译出来git clone git@github.com:aliyun/aliyun-emapreduce-sdk.gitcd aliyun-emapreduce-sdkgit checkout -b master-2.x origin/master-2.xmvn clean package -DskipTests
## 编译完后, assembly/target目录下会生成emr-datasources_shaded_2.11-1.7.0-SNAPSHOT.jar
--master yarn-client --num-executors 2 --executor-memory 2g --executor-cores 2 --jars emr-datasources_shaded_2.11-1.7.0-SNAPSHOT.jar --driver-class-path emr-datasources_shaded_2.11-1.7.0-SNAPSHOT.jar
建表
spark-sql> CREATE DATABASE IF NOT EXISTS default;spark-sql> USE default;
-- 测试数据源spark-sql> CREATE TABLE IF NOT EXISTS sls_datasetUSING loghubOPTIONS (sls.project = "${logProjectName}",sls.store = "${logStoreName}",access.key.id = "${accessKeyId}",access.key.secret = "${accessKeySecret}",endpoint = "${endpoint}");
spark-sql> DESC sls_dataset__logProject__ string NULL__logStore__ string NULL__shard__ int NULL__time__ timestamp NULL__topic__ string NULL__source__ string NULLlabel string NULLfeatures string NULL__tag__hostname__ string NULL__tag__path__ string NULL__tag__receive_time__ string NULLTime taken: 0.058 seconds, Fetched 11 row(s)
-- 结果数据源spark-sql> CREATE TABLE IF NOT EXISTS rds_resultUSING jdbc2OPTIONS (url="${rdsUrl}",driver="com.mysql.jdbc.Driver",dbtable="${rdsTableName}",user="${user}",password="${password}",batchsize="100",isolationLevel="NONE");
spark-sql> DESC rds_result;acc double NULLlabel double NULLtime string NULLTime taken: 0.457 seconds, Fetched 3 row(s)
注册UDF
-- udf_jar_path: 编译完后, emr-sql/target目录下会生成emr-sql_2.11-1.7.0-SNAPSHOT.jar,使用之。
CREATE FUNCTION Logistic_Regression AS 'org.apache.spark.sql.aliyun.udfs.ml.LogisticRegressionUDF' USING JAR '${udf_jar_path}';
提交执行
SET streaming.query.name=lr_prediction;SET spark.sql.streaming.checkpointLocation.lr_prediction=hdfs:///tmp/spark/lr_prediction;SET spark.sql.streaming.query.outputMode.lr_prediction=update;-- 因为DataSource是基于JDBC实现的,因此咱们须要设置向RDS表插入数据的SQL-- 这里个人RDS表名是`result`SET streaming.query.lr_prediction.sql=insert into `result`(`time`, `label`, `acc`) values(?, ?, ?);
INSERT INTO rds_result SELECT window.start, label, sum(if(tb.predict = tb.label, 1, 0)) / count(tb.label) as acc FROM(SELECT default.Logistic_Regression("${LR_model_path}", concat_ws(" ", label, features)) as predict, label, __time__ as time FROM sls_dataset) tb GROUP BY TUMBLING(tb.time, interval 10 second), tb.label;
3.5 效果展现
在DataV中配置上面的RDS结果表,使用折线图查看label=1的预测准确率,以下:
4. 小结
本文简要介绍了流式机器学习面临的几个问题,以及相应的解决方法。并使用Spark SQL结合Spark MLlib演示了一个流式机器学习的案例。下一篇,我会简要介绍Spark SQL如何结合阿里云的在线机器学习服务来进行流式机器学习应用开发。
本文分享自微信公众号 - Apache Spark技术交流社区(E-MapReduce_Spark)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。