Spark Streaming链接TCP Socket

1.Spark Streaming是什么

Spark Streaming是在Spark上创建的可扩展的高吞吐量实时处理流数据的框架,数据能够是来自多种不一样的源,例如kafka,Flume,Twitter,ZeroMQ或者TCP Socket等。在这个框架下,支持对流数据的各类运算,好比map,reduce,join等。处理事后的数据能够存储到文件系统或数据库。算法

利用Spark Streaming,你可使用与批量加载数据相同的API来建立数据管道,并经过数据管道处理流式数据。此外,Spark Steaming的“micro-batching”方式提供至关好的弹性来应对某些缘由形成的任务失败。数据库

2. Spark Streaming的基本原理

Spark Streaming对数据的处理方式主要采用的方法是对Stream数据进行时间切片,分红小的数据片断,经过相似批处理的方式处理数据片断。框架

Spark Streaming把实时输入数据流以时间片Δt (如1秒)为单位切分红块。Spark Streaming会把每块数据做为一个RDD,并使用RDD操做处理每一小块数据。socket

Spark Streaming将流式计算分解成一系列短小的批处理做业。Spark Streaming的输入数据分红一段一段的数据(DStreaming),每一段数据都转换成Spark中的RDD,而后将Spark Streaming中对DStream的操做变为针对Spark中对RDD的操做,将RDD通过操做变成中间结果保存在内存中。分布式

3. DStream

上面提到了DStreaming,那么DStreaming究竟是什么呢:函数

DStreaming至关于在Streaming的框架下对RDD进行封装,表示的是咱们处理的一个实时数据流。相似于RDD,DStream提供了转换操做,窗口转换操做和输出操做三种操做方法。spa

4.Spark Streaming的优点

Spark Streaming是一种构建在Spark上的实时计算框架,它扩展了Spark处理大规模流式数据的能力。线程

实时性:能运行在100+的结点上,并达到秒级延迟。Spark Streaming将流式计算分解成多个Spark Job,对于每一段数据的处理都会通过Spark的任务集的调度过程。其最小的Batch Size的选取在0.5~2秒钟之间(Storm目前最小的延迟是100ms左右),因此Spark Streaming可以知足除对实时性要求很是高的全部流式准实时计算场景。code

高效和容错的特性:对于流式计算来讲,容错性相当重要。在spark中每个RDD都是一个不可变的分布式可重算的数据集,其记录着肯定性的操做,只要输入数据是可容错的,那么任意一个RDD的分区出错或不可用,都是能够利用原始输入数据经过转换操做而从新算出的。而spark Streaming使用基于内存的Spark做为执行引擎, 其容错性天然很好。orm

吞吐量:Spark Streaming能集成Spark的批处理和交互查询,其吞吐量比Storm至少高2~5倍。而且它为实现复杂的算法提供了和批处理相似的简单接口。

 

接下来用Spark  Streaming链接TCP Socket来讲明如何使用Spark  Streaming:

1 建立StreamingContext对象

首先使用StreamingContext模块,这个模块的做用是提供全部的流数据处理的功能:

1 from pyspark import SparkContext
2 from pyspark.streaming import StreamingContext
3 
4 sc = SparkContext("local[2]", "streamwordcount")
5 # 建立本地的SparkContext对象,包含2个执行线程
6 
7 ssc = StreamingContext(sc, 2)
8 # 建立本地的StreamingContext对象,处理的时间片间隔时间,设置为2s

2 建立DStream对象

咱们须要链接一个打开的 TCP 服务端口,从而获取流数据,这里使用的源是TCP Socket,因此使用socketTextStream()函数:

lines = ssc.socketTextStream("localhost", 8888)
# 建立DStream,指明数据源为socket:来自localhost本机的8888端口

3 对DStream进行操做

咱们开始对lines进行处理,首先对当前2秒内获取的数据进行分割并执行标准的MapReduce流程计算。

words = lines.flatMap(lambda line: line.split(" "))
# 使用flatMap和Split对2秒内收到的字符串进行分割

获得的words是一系列的单词,再执行下面的操做:

pairs = words.map(lambda word: (word, 1))
# map操做将独立的单词映射到(word,1)元组

wordCounts = pairs.reduceByKey(lambda x, y: x + y)
# reduceByKey操做对pairs执行reduce操做得到(单词,词频)元组

5 输出数据

将处理后的数据输出到一个文件中:

outputFile = "/home/feige/streaming/ss"
# 输出文件夹的前缀,Spark Streaming会自动使用当前时间戳来生成不一样的文件夹名称

wordCounts.saveAsTextFiles(outputFile)
# 将结果输出

6 启动应用

要使程序在Spark Streaming上运行起来,须要执行Spark Streaming启动的流程,调用start()函数启动,awaitTermination()函数等待处理结束的信号。

ssc.start() 
# 启动Spark Streaming应用
ssc.awaitTermination()

打开终端执行:

nc -lk 8888

nc的-l参数表示建立一个监听端口,等待新的链接。-k参数表示当前链接结束后仍然保持监听,必须与-l参数同时使用。

执行完上面的命令后不关闭终端,咱们将在这个终端中输入一些处理的数据:

打开一个新的终端来执行咱们的Spark Streaming应用:

 这里是spark streaming执行的过程

如今咱们来看看程序执行的效果,程序每隔2秒扫描一次监控窗口输入的内容,咱们查看一下:

结束语:

最近压力比较大,杂事诸多,相信这段时间事后一切都会好起来的,加油!!!

相关文章
相关标签/搜索