以上为Flink的运行模型,Flink的程序主要由三部分构成,分别为Source、Transformation、Sink。DataSource主要负责数据的读取,Transformation主要负责对属于的转换操做,Sink负责最终数据的输出。bash
每一个Flink程序都包含如下的若干流程:服务器
执行环境StreamExecutionEnvironment是全部Flink程序的基础。markdown
建立执行环境有三种方式,分别为:架构
StreamExecutionEnvironment.getExecutionEnvironment
StreamExecutionEnvironment.createLocalEnvironment
StreamExecutionEnvironment.createRemoteEnvironment
复制代码
建立一个执行环境,表示当前执行程序的上下文。 若是程序是独立调用的,则此方法返回本地执行环境;若是从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最经常使用的一种建立执行环境的方式。socket
val env = StreamExecutionEnvironment.getExecutionEnvironment
复制代码
返回本地执行环境,须要在调用时指定默认的并行度。oop
val env = StreamExecutionEnvironment.createLocalEnvironment(1)
复制代码
返回集群执行环境,将Jar提交到远程服务器。须要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。spa
val env = StreamExecutionEnvironment.createRemoteEnvironment(1)
复制代码
一列一列的读取遵循TextInputFormat规范的文本文件,并将结果做为String返回。命令行
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.readTextFile("/opt/modules/test.txt") stream.print() env.execute("FirstJob") 复制代码
按照指定的文件格式读取文件。线程
val env = StreamExecutionEnvironment.getExecutionEnvironment val path = new Path("/opt/modules/test.txt") val stream = env.readFile(new TextInputFormat(path), "/opt/modules/test.txt") stream.print() env.execute("FirstJob") 复制代码
从Socket中读取信息,元素能够用分隔符分开。3d
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.socketTextStream("localhost", 11111) stream.print() env.execute("FirstJob") 复制代码
从集合中建立一个数据流,集合中全部元素的类型是一致的。
val env = StreamExecutionEnvironment.getExecutionEnvironment val list = List(1,2,3,4) val stream = env.fromCollection(list) stream.print() env.execute("FirstJob") 复制代码
从迭代(Iterator)中建立一个数据流,指定元素数据类型的类由iterator返回。
val env = StreamExecutionEnvironment.getExecutionEnvironment val iterator = Iterator(1,2,3,4) val stream = env.fromCollection(iterator) stream.print() env.execute("FirstJob") 复制代码
从一个给定的对象序列中建立一个数据流,全部的对象必须是相同类型的。
val env = StreamExecutionEnvironment.getExecutionEnvironment val list = List(1,2,3,4) val stream = env.fromElements(list) stream.print() env.execute("FirstJob") 复制代码
从给定的间隔中并行地产生一个数字序列。
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.generateSequence(1,10) stream.print() env.execute("FirstJob") 复制代码
Data Sink 消费DataStream中的数据,并将它们转发到文件、套接字、外部系统或者打印出。
Flink有许多封装在DataStream操做里的内置输出格式。
将元素以字符串形式逐行写入(TextOutputFormat),这些字符串经过调用每一个元素的toString()方法来获取。
将元组以逗号分隔写入文件中(CsvOutputFormat),行及字段之间的分隔是可配置的。每一个字段的值来自对象的toString()方法。
打印每一个元素的toString()方法的值到标准输出或者标准错误输出流中。或者也能够在输出流中添加一个前缀,这个能够帮助区分不一样的打印调用,若是并行度大于1,那么输出也会有一个标识由哪一个任务产生的标志。
自定义文件输出的方法和基类(FileOutputFormat),支持自定义对象到字节的转换。
根据SerializationSchema 将元素写入到socket中。
DataStream → DataStream:输入一个参数产生一个参数。
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.generateSequence(1,10) val streamMap = stream.map { x => x * 2 } streamMap.print() env.execute("FirstJob") 复制代码
注意:stream.print():每一行前面的数字表明这一行是哪个并行线程输出的。
DataStream → DataStream:输入一个参数,产生0个、1个或者多个输出。
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.readTextFile("test.txt") val streamFlatMap = stream.flatMap{ x => x.split(" ") } streamFilter.print() env.execute("FirstJob") 复制代码
DataStream → DataStream:结算每一个元素的布尔值,并返回布尔值为true的元素。下面这个例子是过滤出非0的元素:
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.generateSequence(1,10) val streamFilter = stream.filter{ x => x == 1 } streamFilter.print() env.execute("FirstJob") 复制代码
DataStream,DataStream → ConnectedStreams:链接两个保持他们类型的数据流,两个数据流被Connect以后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.readTextFile("test.txt") val streamMap = stream.flatMap(item => item.split(" ")).filter(item => item.equals("hadoop")) val streamCollect = env.fromCollection(List(1,2,3,4)) val streamConnect = streamMap.connect(streamCollect) streamConnect.map(item=>println(item), item=>println(item)) env.execute("FirstJob") 复制代码
ConnectedStreams → DataStream:做用于ConnectedStreams上,功能与map和flatMap同样,对ConnectedStreams中的每个Stream分别进行map和flatMap处理。
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream1 = env.readTextFile("test.txt") val streamFlatMap = stream1.flatMap(x => x.split(" ")) val stream2 = env.fromCollection(List(1,2,3,4)) val streamConnect = streamFlatMap.connect(stream2) val streamCoMap = streamConnect.map( (str) => str + "connect", (in) => in + 100 ) env.execute("FirstJob") 复制代码
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream1 = env.readTextFile("test.txt") val stream2 = env.readTextFile("test1.txt") val streamConnect = stream1.connect(stream2) val streamCoMap = streamConnect.flatMap( (str1) => str1.split(" "), (str2) => str2.split(" ") ) streamConnect.map(item=>println(item), item=>println(item)) env.execute("FirstJob") 复制代码
DataStream → SplitStream:根据某些特征把一个DataStream拆分红两个或者多个DataStream。
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.readTextFile("test.txt") val streamFlatMap = stream.flatMap(x => x.split(" ")) val streamSplit = streamFlatMap.split( num => # 字符串内容为hadoop的组成一个DataStream,其他的组成一个DataStream (num.equals("hadoop")) match{ case true => List("hadoop") case false => List("other") } ) env.execute("FirstJob") 复制代码
SplitStream→DataStream:从一个SplitStream中获取一个或者多个DataStream。
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.readTextFile("test.txt") val streamFlatMap = stream.flatMap(x => x.split(" ")) val streamSplit = streamFlatMap.split( num => (num.equals("hadoop")) match{ case true => List("hadoop") case false => List("other") } ) val hadoop = streamSplit.select("hadoop") val other = streamSplit.select("other") hadoop.print() env.execute("FirstJob") 复制代码
DataStream → DataStream:对两个或者两个以上的DataStream进行union操做,产生一个包含全部DataStream元素的新DataStream。注意:若是你将一个DataStream跟它本身作union操做,在新的DataStream中,你将看到每个元素都出现两次。
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream1 = env.readTextFile("test.txt") val streamFlatMap1 = stream1.flatMap(x => x.split(" ")) val stream2 = env.readTextFile("test1.txt") val streamFlatMap2 = stream2.flatMap(x => x.split(" ")) val streamConnect = streamFlatMap1.union(streamFlatMap2) env.execute("FirstJob") 复制代码
DataStream → KeyedStream:输入必须是Tuple类型,逻辑地将一个流拆分红不相交的分区,每一个分区包含具备相同key的元素,在内部以hash的形式实现的。
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.readTextFile("test.txt") val streamFlatMap = stream.flatMap{ x => x.split(" ") } val streamMap = streamFlatMap.map{ x => (x,1) } val streamKeyBy = streamMap.keyBy(0) env.execute("FirstJob") 复制代码
KeyedStream → DataStream:一个分组数据流的聚合操做,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.readTextFile("test.txt").flatMap(item => item.split(" ")).map(item => (item, 1)).keyBy(0) val streamReduce = stream.reduce( (item1, item2) => (item1._1, item1._2 + item2._2) ) streamReduce.print() env.execute("FirstJob") 复制代码
KeyedStream → DataStream:一个有初始值的分组数据流的滚动折叠操做,合并当前元素和前一次折叠操做的结果,并产生一个新的值,返回的流中包含每一次折叠的结果,而不是只返回最后一次折叠的最终结果。
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.readTextFile("test.txt").flatMap(item => item.split(" ")).map(item => (item, 1)).keyBy(0) val streamReduce = stream.fold(100)( (begin, item) => (begin + item._2) ) streamReduce.print() env.execute("FirstJob") 复制代码
KeyedStream → DataStream:分组数据流上的滚动聚合操做。min和minBy的区别是min返回的是一个最小值,而minBy返回的是其字段中包含最小值的元素(一样原理适用于max和maxBy),返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
keyedStream.sum(0) keyedStream.sum("key") keyedStream.min(0) keyedStream.min("key") keyedStream.max(0) keyedStream.max("key") keyedStream.minBy(0) keyedStream.minBy("key") keyedStream.maxBy(0) keyedStream.maxBy("key") val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.readTextFile("test02.txt").map(item => (item.split(" ")(0), item.split(" ")(1).toLong)).keyBy(0) val streamReduce = stream.sum(1) streamReduce.print() env.execute("FirstJob") 复制代码
在5.10以前的算子都是能够直接做用在Stream上的,由于他们不是聚合类型的操做,可是到5.10后你会发现,咱们虽然能够对一个无边界的流数据直接应用聚合算子,可是它会记录下每一次的聚合结果,这每每不是咱们想要的,其实,reduce、fold、aggregation这些聚合算子都是和Window配合使用的,只有配合Window,才能获得想要的结果。