利用机器学习模型对PySpark流数据进行预测

做者|LAKSHAY ARORA
编译|VK
来源|Analytics Vidhyapython

概述

  • 流数据是机器学习领域的一个新兴概念git

  • 学习如何使用机器学习模型(如logistic回归)使用PySpark对流数据进行预测程序员

  • 咱们将介绍流数据和Spark流的基础知识,而后深刻到实现部分github

介绍

想象一下,每秒有超过8500条微博被发送,900多张照片被上传到Instagram上,超过4200个Skype电话被打,超过78000个谷歌搜索发生,超过200万封电子邮件被发送(根据互联网实时统计)。算法

咱们正在之前所未有的速度和规模生成数据。在数据科学领域工做真是太好了!可是,随着大量数据的出现,一样面临着复杂的挑战。sql

主要是,咱们如何收集这种规模的数据?咱们如何确保咱们的机器学习管道在数据生成和收集后继续产生结果?这些都是业界面临的重大挑战,也是为何流式数据的概念在各组织中愈来愈受到重视的缘由。缓存

增长处理流式数据的能力将大大提升你当前的数据科学能力。这是业界急需的技能,若是你能掌握它,它将帮助你得到下一个数据科学的角色。bash

所以,在本文中,咱们将了解什么是流数据,了解Spark流的基本原理,而后研究一个与行业相关的数据集,以使用Spark实现流数据。服务器

目录

  1. 什么是流数据?session

  2. Spark流基础

    1. 离散流

    2. 缓存

    3. 检查点

  3. 流数据中的共享变量

    1. 累加器变量

    2. 广播变量

  4. 利用PySpark对流数据进行情感分析

什么是流数据?

咱们看到了上面的社交媒体数据——咱们正在处理的数据使人难以置信。你能想象存储全部这些数据须要什么吗?这是一个复杂的过程!所以,在咱们深刻讨论本文的Spark方面以前,让咱们花点时间了解流式数据究竟是什么。

流数据没有离散的开始或结束。这些数据是每秒从数千个数据源生成的,须要尽快进行处理和分析。至关多的流数据须要实时处理,好比Google搜索结果。

咱们知道,一些结论在事件发生后更具价值,它们每每会随着时间而失去价值。举个体育赛事的例子——咱们但愿看到即时分析、即时统计得出的结论,以便在那一刻真正享受比赛,对吧?

Spark流基础

Spark流是Spark API的扩展,它支持对实时数据流进行可伸缩和容错的流处理。

在跳到实现部分以前,让咱们先了解Spark流的不一样组件。

离散流

离散流或数据流表明一个连续的数据流。这里,数据流要么直接从任何源接收,要么在咱们对原始数据作了一些处理以后接收。

构建流应用程序的第一步是定义咱们从数据源收集数据的批处理时间。若是批处理时间为2秒,则数据将每2秒收集一次并存储在RDD中。而这些RDD的连续序列链是一个不可变的离散流,Spark能够将其做为一个分布式数据集使用。

想一想一个典型的数据科学项目。在数据预处理阶段,咱们须要对变量进行转换,包括将分类变量转换为数值变量、删除异常值等。Spark维护咱们在任何数据上定义的全部转换的历史。所以,不管什么时候发生任何错误,它均可以追溯转换的路径并从新生成计算结果。

咱们但愿Spark应用程序运行24小时 x 7,而且不管什么时候出现任何故障,咱们都但愿它尽快恢复。可是,Spark在处理大规模数据时,出现任何错误时须要从新计算全部转换。你能够想象,这很是昂贵。

缓存

如下是应对这一挑战的一种方法。咱们能够临时存储计算(缓存)的结果,以维护在数据上定义的转换的结果。这样,当出现任何错误时,咱们没必要一次又一次地从新计算这些转换。

数据流容许咱们将流数据保存在内存中。当咱们要计算同一数据上的多个操做时,这颇有帮助。

检查点(Checkpointing)

当咱们正确使用缓存时,它很是有用,但它须要大量内存。并非每一个人都有数百台拥有128GB内存的机器来缓存全部东西。

这就引入了检查点的概念。

检查点是保存转换数据帧结果的另外一种技术。它将运行中的应用程序的状态不时地保存在任何可靠的存储器(如HDFS)上。可是,它比缓存速度慢,灵活性低。

当咱们有流数据时,咱们可使用检查点。转换结果取决于之前的转换结果,须要保留才能使用它。咱们还检查元数据信息,好比用于建立流数据的配置和一组DStream(离散流)操做的结果等等。

流数据中的共享变量

有时咱们须要为Spark应用程序定义map、reduce或filter等函数,这些函数必须在多个集群上执行。此函数中使用的变量将复制到每一个计算机(集群)。

在这里,每一个集群有一个不一样的执行器,咱们须要一些东西,能够给咱们这些变量之间的关系。

例如,假设咱们的Spark应用程序运行在100个不一样的集群上,捕获来自不一样国家的人发布的Instagram图片。咱们须要一个在他们的帖子中提到的特定标签的计数。

如今,每一个集群的执行器将计算该集群上存在的数据的结果。可是咱们须要一些东西来帮助这些集群进行通讯,这样咱们就能够获得聚合的结果。在Spark中,咱们有一些共享变量能够帮助咱们克服这个问题

累加器变量

用例,好比错误发生的次数、空白日志的次数、咱们从某个特定国家收到请求的次数,全部这些均可以使用累加器来解决。

每一个集群上的执行器将数据发送回驱动程序进程,以更新累加器变量的值。累加器仅适用于关联和交换的操做。例如,sum和maximum有效,而mean无效。

广播变量

当咱们处理位置数据时,好比城市名称和邮政编码的映射,这些都是固定变量。如今,若是任何集群上的特定转换每次都须要此类数据,咱们不须要向驱动程序发送请求,由于这太昂贵了。

相反,咱们能够在每一个集群上存储此数据的副本。这些类型的变量称为广播变量。

广播变量容许程序员在每台机器上缓存一个只读变量。一般,Spark会使用有效的广播算法自动分配广播变量,但若是咱们有多个阶段须要相同数据的任务,咱们也能够定义它们。

利用PySpark对流数据进行情感分析

是时候启动你最喜欢的IDE了!让咱们在本节中进行写代码,并以实际的方式理解流数据。

在本节中,咱们将使用真实的数据集。咱们的目标是在推特上发现仇恨言论。为了简单起见,若是推特带有种族主义或性别歧视情绪,咱们说它包含仇恨言论。

所以,任务是将种族主义或性别歧视的推文与其余推文进行分类。咱们将使用Tweets和label的训练样本,其中label'1'表示Tweet是种族主义/性别歧视,label'0'表示其余。

为何这个项目与流处理相关?由于社交媒体平台以评论和状态更新的形式接收海量流媒体数据。这个项目将帮助咱们限制公开发布的内容。

你能够在这里更详细地查看问题陈述-练习问题:Twitter情感分析(https://datahack.analyticsvidhya.com/contest/practice-problem-twitter-sentiment-analysis/?utm_source=blog&utm_medium=streaming-data-pyspark-machine-learning-model)。咱们开始吧!

设置项目工做流

  1. 模型构建:咱们将创建一个逻辑回归模型管道来分类tweet是否包含仇恨言论。在这里,咱们的重点不是创建一个很是精确的分类模型,而是查看如何使用任何模型并返回流数据的结果

  2. 初始化Spark流上下文:一旦构建了模型,咱们就须要定义从中获取流数据的主机名和端口号

  3. 流数据:接下来,咱们将从定义的端口添加netcat服务器的tweets,Spark API将在指定的持续时间后接收数据

  4. 预测并返回结果:一旦咱们收到tweet文本,咱们将数据传递到咱们建立的机器学习管道中,并从模型返回预测的情绪

下面是咱们工做流程的一个简洁说明:

创建Logistic回归模型的数据训练

咱们在映射到标签的CSV文件中有关于Tweets的数据。咱们将使用logistic回归模型来预测tweet是否包含仇恨言论。若是是,那么咱们的模型将预测标签为1(不然为0)。

你能够在这里下载数据集和代码(https://github.com/lakshay-arora/PySpark/tree/master/spark_streaming)。

首先,咱们须要定义CSV文件的模式,不然,Spark将把每列的数据类型视为字符串。咱们读取数据并检查:

# 导入所需库
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.streaming import StreamingContext
import pyspark.sql.types as tp
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row

# 初始化spark session
sc = SparkContext(appName="PySparkShell")
spark = SparkSession(sc)
    
# 定义方案
my_schema = tp.StructType([
  tp.StructField(name= 'id',          dataType= tp.IntegerType(),  nullable= True),
  tp.StructField(name= 'label',       dataType= tp.IntegerType(),  nullable= True),
  tp.StructField(name= 'tweet',       dataType= tp.StringType(),   nullable= True)
])
    
  
# 读取数据集
my_data = spark.read.csv('twitter_sentiments.csv',
                         schema=my_schema,
                         header=True)

# 查看数据
my_data.show(5)

# 输出方案
my_data.printSchema()

定义机器学习管道

如今咱们已经在Spark数据帧中有了数据,咱们须要定义转换数据的不一样阶段,而后使用它从咱们的模型中获取预测的标签。

在第一阶段中,咱们将使用RegexTokenizer 将Tweet文本转换为单词列表。而后,咱们将从单词列表中删除停用词并建立单词向量。在最后阶段,咱们将使用这些词向量创建一个逻辑回归模型,并获得预测情绪。

请记住,咱们的重点不是创建一个很是精确的分类模型,而是看看如何在预测模型中得到流数据的结果。

# 定义阶段1:标记tweet文本 
stage_1 = RegexTokenizer(inputCol= 'tweet' , outputCol= 'tokens', pattern= '\\W')
# 定义阶段2:删除停用字
stage_2 = StopWordsRemover(inputCol= 'tokens', outputCol= 'filtered_words')
# 定义阶段3:建立大小为100的词向量
stage_3 = Word2Vec(inputCol= 'filtered_words', outputCol= 'vector', vectorSize= 100)
# 定义阶段4:逻辑回归模型
model = LogisticRegression(featuresCol= 'vector', labelCol= 'label')

设置咱们的机器学习管道

让咱们在Pipeline对象中添加stages变量,而后按顺序执行这些转换。将管道与训练数据集匹配,如今,每当咱们有新的Tweet时,咱们只须要将其传递到管道对象并转换数据以得到预测:

# 设置管道
pipeline = Pipeline(stages= [stage_1, stage_2, stage_3, model])

#拟合模型
pipelineFit = pipeline.fit(my_data)

流数据和返回的结果

假设咱们每秒收到数百条评论,咱们但愿经过阻止发布包含仇恨言论的评论的用户来保持平台的干净。因此,每当咱们收到新的文本,咱们就会把它传递到管道中,获得预测的情绪。

咱们将定义一个函数 get_prediction,它将删除空白语句并建立一个数据框,其中每行包含一条推特。

所以,初始化Spark流上下文并定义3秒的批处理持续时间。这意味着咱们将对每3秒收到的数据进行预测:

#定义一个函数来计算情感
def get_prediction(tweet_text):
	try:
    # 过滤获得长度大于0的tweets
		tweet_text = tweet_text.filter(lambda x: len(x) > 0)
    # 建立一个列名为“tweet”的数据框,每行将包含一条tweet
		rowRdd = tweet_text.map(lambda w: Row(tweet=w))
    # 建立spark数据框
		wordsDataFrame = spark.createDataFrame(rowRdd)
    # 利用管道对数据进行转换,获得预测的情绪
		pipelineFit.transform(wordsDataFrame).select('tweet','prediction').show()
	except : 
		print('No data')
    
# 初始化流上下文
ssc = StreamingContext(sc, batchDuration= 3)

# 建立一个将链接到hostname:port的数据流,如localhost:9991
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))

# 用一个关键字“tweet_APP”分割tweet文本,这样咱们就能够从一条tweet中识别出一组单词
words = lines.flatMap(lambda line : line.split('TWEET_APP'))

# 获取收到的推文的预期情绪
words.foreachRDD(get_prediction)

#开始计算
ssc.start()             

# 等待结束
ssc.awaitTermination()

在一个终端上运行程序并使用Netcat(一个实用工具,可用于将数据发送到定义的主机名和端口号)。可使用如下命令启动TCP链接:

nc -lk port_number

最后,在第二个终端中键入文本,你将在另外一个终端中实时得到预测:

视频演示地址:https://cdn.analyticsvidhya.com/wp-content/uploads/2019/12/final_twitter_sentiment.mp4?_=1

结尾

流数据在将来几年会增长的愈来愈多,因此你应该开始熟悉这个话题。记住,数据科学不只仅是创建模型,还有一个完整的管道须要处理。

本文介绍了Spark流的基本原理以及如何在真实数据集上实现它。我鼓励你使用另外一个数据集或收集实时数据并实现咱们刚刚介绍的内容(你也能够尝试其余模型)。

原文连接:https://www.analyticsvidhya.com/blog/2019/12/streaming-data-pyspark-machine-learning-model/

欢迎关注磐创AI博客站:
http://panchuang.net/

sklearn机器学习中文官方文档:
http://sklearn123.com/

欢迎关注磐创博客资源汇总站:
http://docs.panchuang.net/

相关文章
相关标签/搜索