getExecutionEnvironment decides which execution environment to return based on how the program is being run; it is the most commonly used way to create an execution environment.
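A minimal sketch of the usual call (just the standard entry point, no extra configuration assumed):

import org.apache.flink.streaming.api.scala._

// returns a local environment when run from the IDE,
// and the cluster environment when the job is submitted to a cluster
val env = StreamExecutionEnvironment.getExecutionEnvironment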
createLocalEnvironment returns a local execution environment; the default parallelism has to be specified in the call.
val env = StreamExecutionEnvironment.createLocalEnvironment(1) //parallelism
createRemoteEnvironment returns a cluster execution environment and submits the Jar to a remote server. The JobManager's IP and port must be specified in the call, along with the Jar package to run on the cluster.
// hostname, port, jarFiles
val env = ExecutionEnvironment.createRemoteEnvironment(host, port, "/flink/wc.jar")
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.7.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.7.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
        <version>1.7.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <!-- This plugin compiles the Scala code into class files -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.4.6</version>
            <executions>
                <execution>
                    <!-- Bound to Maven's compile phase -->
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
// read from a file
val fileDs = env.readTextFile("in/tbStock.txt")
// read from a socket
val socketDs = env.socketTextStream("localhost", 777)
// read from a collection
val collectDs = env.fromCollection(List("aaa", "bbb", "ccc", "aaa"))
// Kafka configuration
val properties = new Properties()
properties.setProperty("bootstrap.servers", "hadoop102:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")

// consume the data sent to the Kafka topic "topic-demo"
val kafkaDataStream: DataStream[String] =
  env.addSource(new FlinkKafkaConsumer011[String]("topic-demo", new SimpleStringSchema(), properties))
See also: https://www.aboutyun.com/forum.php?mod=viewthread&tid=27395
Flink uses checkpoints to record whether data has been completely processed.
The JobManager coordinates the TaskManagers to perform checkpointing. Checkpoints are stored in a StateBackend; the default StateBackend is in-memory, but it can be switched to a file-based backend for persistent storage.
The process is effectively a two-phase commit: when each operator finishes, it performs a "pre-commit", and once the sink operation completes, a "commit confirmation" is issued; if execution fails, the pre-commit is discarded.
If a crash occurs, recovery goes through the StateBackend, and only operations whose commits were confirmed can be restored.
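As a rough sketch of how checkpointing and a file-based state backend can be enabled (the interval, HDFS path, and the choice of FsStateBackend below are illustrative assumptions, not taken from the text above):

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// take a checkpoint every 10 seconds with exactly-once semantics (values assumed)
env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE)
// persist checkpoints to a file system path instead of the default in-memory backend
env.setStateBackend(new FsStateBackend("hdfs://hadoop102:9000/flink/checkpoints"))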
env.addSource(new MySource)

// custom source
class MySource extends SourceFunction[(String, Double)] {
  // flag: indicates whether the source is still running
  var running: Boolean = true

  override def cancel(): Unit = {
    running = false
  }

  override def run(ctx: SourceFunction.SourceContext[(String, Double)]): Unit = {
    // initialize a random number generator
    val rand = new Random()
    var curTemp = 1.to(10).map(
      i => ("item_" + i, 65 + rand.nextGaussian() * 20)
    )
    while (running) {
      curTemp.foreach(
        t => ctx.collect(t)
      )
      Thread.sleep(5000) // emit a batch of records every 5 seconds
    }
  }
}
// map
val streamMap = stream.map { x => x * 2 }
// flatMap
val streamFlatMap = stream.flatMap { x => x.split(" ") }
// filter
val streamFilter = stream.filter { x => x == 1 }
keyBy (DataStream → KeyedStream): the input must be of Tuple type. It logically splits a stream into disjoint partitions, each containing the elements with the same key; internally this is implemented with hashing.
reduce (KeyedStream → DataStream): an aggregation over a keyed stream that merges the current element with the previous aggregated value to produce a new value. The returned stream contains the result of every aggregation step, not just the final result of the last aggregation.
val keyedStream: KeyedStream[(String, Int), Tuple] =
  startUplogDstream.map(startuplog => (startuplog.ch, 1)).keyBy(0)

// reduce (sum)
keyedStream.reduce { (ch1, ch2) =>
  (ch1._1, ch1._2 + ch2._2)
}.print()
split (DataStream → SplitStream): splits one DataStream into two or more DataStreams according to some criteria.
select (SplitStream → DataStream): retrieves one or more DataStreams from a SplitStream.
// split by the item's id
val splitStream: SplitStream[Item] = dStream.split { item =>
  List(item.id)
}
// select the sub-stream tagged "item_1"
splitStream.select("item_1").print()
connect (DataStream, DataStream → ConnectedStreams): connects two data streams while keeping their types. After two streams are connected they are merely placed in the same stream; internally each keeps its own data and form unchanged, and the two streams remain independent of each other.
CoMap, CoFlatMap (ConnectedStreams → DataStream): operate on ConnectedStreams and behave like map and flatMap, applying a map or flatMap function to each of the two streams in the ConnectedStreams separately.
val connStream: ConnectedStreams[StartUpLog, StartUpLog] = appStoreStream.connect(otherStream)
val allStream: DataStream[String] = connStream.map(
  (log1: StartUpLog) => log1.ch,
  (log2: StartUpLog) => log2.ch
)
union (DataStream → DataStream): unions two or more DataStreams, producing a new DataStream that contains all elements of the input streams. Note: if you union a DataStream with itself, every element will appear twice in the resulting DataStream.
val unionStream: DataStream[StartUpLog] = appStoreStream.union(otherStream)
unionStream.print("union:::")
1) With union, the two streams must have the same type; with connect they can differ, and the types are reconciled afterwards in the coMap (see the sketch after this list).
2) connect can only operate on two streams, while union can operate on many.
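A minimal sketch contrasting the two (the stream names and element types below are assumptions made for illustration):

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// connect: the two streams may have different element types;
// the coMap then maps each side to a common type
val clicks: DataStream[(String, Long)] = env.fromElements(("pageA", 1L), ("pageB", 2L))
val alerts: DataStream[String] = env.fromElements("disk-full", "cpu-high")
val merged: DataStream[String] = clicks
  .connect(alerts)
  .map(click => s"click:${click._1}", alert => s"alert:$alert")

// union: any number of streams, but all must have the same element type
val all: DataStream[String] = merged.union(alerts, alerts.map(_.toUpperCase))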
Flink exposes interfaces for all UDF functions (implemented as interfaces or abstract classes), e.g. MapFunction, FilterFunction, ProcessFunction, and so on.
val flinkTweets = tweets.filter(new FlinkFilter)
// custom filter class
class FlinkFilter extends FilterFunction[String] {
  override def filter(value: String): Boolean = {
    value.contains("flink")
  }
}
val flinkTweets = tweets.filter(_.contains("flink"))
Rich functions are a function-class interface provided by the DataStream API; every Flink function class has a Rich version. They differ from regular functions in that they can access the runtime context and have lifecycle methods, so they can implement more complex functionality.
open() is the initialization method of a rich function; it is called before an operator such as map or filter is invoked.
close() is the last method called in the lifecycle and is where cleanup work is done.
getRuntimeContext() provides information from the function's RuntimeContext, such as the parallelism the function runs with, the task name, and state.
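A minimal sketch of a rich function using these lifecycle methods (the class name and what open/close do here are illustrative assumptions):

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// hypothetical rich map function that tags each record with its subtask index
class MyRichMapper extends RichMapFunction[String, String] {
  var subtask: Int = _

  override def open(parameters: Configuration): Unit = {
    // called once before the first map(); a good place to open connections etc.
    subtask = getRuntimeContext.getIndexOfThisSubtask
  }

  override def map(value: String): String = s"subtask-$subtask: $value"

  override def close(): Unit = {
    // called once at the end of the lifecycle; release resources here
  }
}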
Flink has nothing like Spark's foreach method that lets users iterate over the data freely. All output to external systems has to go through a Sink, and the final output of a job is completed in a way similar to the following.
dstream.addSink(new FlinkKafkaProducer011[String]("linux01:9092","test", new SimpleStringSchema()))
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
val config = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(6379).build()
resultDStream.addSink(new RedisSink[Item](config, new MyRedisMapper))

// define the RedisMapper
class MyRedisMapper extends RedisMapper[Item] {
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "item_test") // hash key
  }
  override def getKeyFromData(data: Item): String = data.id
  override def getValueFromData(data: Item): String = data.toString
}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
// list of Elasticsearch hosts
val list = new util.ArrayList[HttpHost]()
list.add(new HttpHost("linux01", 9200))

// build the Elasticsearch sink
val esBuilder = new ElasticsearchSink.Builder[Item](list, new ElasticsearchSinkFunction[Item] {
  override def process(element: Item, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
    // how the element is stored in Elasticsearch
    val json = new util.HashMap[String, String]()
    json.put("data", element.toString)
    // index, type and source of the request
    val indexRequest = Requests.indexRequest().index("indexName").`type`("_doc").source(json)
    indexer.add(indexRequest)
  }
})
resultDStream.addSink(esBuilder.build())
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>
resultDStream.addSink(new MyJDBCSink)

// custom JDBC sink
class MyJDBCSink extends RichSinkFunction[Sensor] {
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  // open: establish the connection and prepare the statements
  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
    insertStmt = conn.prepareStatement("INSERT INTO item_test (id, num) VALUES (?, ?)")
    updateStmt = conn.prepareStatement("UPDATE item_test SET num = ? WHERE id = ?")
  }

  // invoke: called for every element; try an update first, insert if nothing was updated
  override def invoke(value: Sensor, context: SinkFunction.Context[_]): Unit = {
    updateStmt.setDouble(1, value.temp)
    updateStmt.setString(2, value.id)
    updateStmt.execute()
    if (updateStmt.getUpdateCount == 0) {
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temp)
      insertStmt.execute()
    }
  }

  // close: release resources
  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}