[TOC]
TIPS
1. When passing a function into operators such as `map`, you can use an anonymous rich-function implementation; through `open`, `close`, `getRuntimeContext`, and similar methods you can cover more complex needs (for example custom counters, accumulators, and broadcast variables) — see the sketch below.
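A minimal sketch of that idea (not from the original project; the class name `CountingMapper` and the accumulator name `"error-lines"` are made up): a `RichMapFunction` registers an accumulator in `open()` via `getRuntimeContext`.

```scala
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Sketch: a rich map function with a custom accumulator.
class CountingMapper extends RichMapFunction[String, (String, Int)] {

  private val errorLines = new IntCounter()

  override def open(parameters: Configuration): Unit = {
    // register the accumulator once per parallel instance
    getRuntimeContext.addAccumulator("error-lines", errorLines)
  }

  override def map(value: String): (String, Int) = {
    if (value.contains("error")) errorLines.add(1) // count error lines as a side statistic
    (value, 1)
  }

  override def close(): Unit = {
    // release any resources acquired in open()
  }
}
```

Used as `stream.map(new CountingMapper())`; the accumulator value can afterwards be read from the `JobExecutionResult` returned by `env.execute(...)` through `getAccumulatorResult("error-lines")`.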
```scala
package cn.lang.flink.demo

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

object FlinkWordCount {
  def main(args: Array[String]): Unit = {
    // env
    val env = ExecutionEnvironment.getExecutionEnvironment
    val input = "/Users/lang/Ideaproject/Github/LangPersonalProject" +
      "/flink/src/main/resources/cn/lang/flink/demo/wordcount.txt"

    /** source */
    // read from a local file; each item is one line of text
    val raw_data: DataSet[String] = env.readTextFile(input)

    /** Exception: ------------------------------------------------------
      * "flink could not find implicit value for evidence parameter of type ..."
      * map and friends need an implicit [R: TypeInformation];
      * the recommended fix is the import below
      */
    import org.apache.flink.api.scala._

    /** transformation */
    val result: AggregateDataSet[(String, Int)] = raw_data
      .flatMap(_.split(" ")) // split each line on spaces
      .map((_, 1))           // emit (word, 1) for every word
      .groupBy(0)            // group by the word at index 0
      .sum(1)                // sum the counts at index 1

    /** sink */
    result.print()
  }
}
```
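If you prefer not to pull in the whole package object, a narrower sketch of the same fix (the object and value names below are illustrative only) is to import just the `createTypeInformation` macro and declare the evidence explicitly:

```scala
// Sketch: satisfying the evidence parameter without the wildcard import.
object TypeInfoExample {
  import org.apache.flink.api.common.typeinfo.TypeInformation
  import org.apache.flink.api.scala.createTypeInformation

  // an explicit TypeInformation instance for the (word, count) tuples used above
  implicit val wordCountInfo: TypeInformation[(String, Int)] =
    createTypeInformation[(String, Int)]
}
```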
```scala
package cn.lang.flink.demo

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FlinkStreamWordCount {
  def main(args: Array[String]): Unit = {
    // host and port to listen on
    val host = "localhost"
    val port = 9999

    // env
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    /** source */
    // listen on the socket; run {nc -lk 9999} in a terminal to type input
    val raw_stream: DataStream[String] = env.socketTextStream(host, port)

    // avoid "could not find implicit ..."
    import org.apache.flink.api.scala._

    /** transformation */
    val result: DataStream[(String, Int)] = raw_stream
      .flatMap(_.split(" ")) // split each line on spaces
      .map((_, 1))           // emit (word, 1) for every word
      .keyBy(0)              // key by the word at index 0; note the difference from the DataSet API (groupBy)
      .sum(1)                // sum the counts at index 1

    /** sink */
    result.print()
    // output after typing the word "flink" four times:
    // 13> (flink,1)
    // 13> (flink,2)
    // 13> (flink,3)
    // 13> (flink,4)   the result keeps accumulating; compare with the window functions later

    // execute the streaming job
    env.execute("FlinkStreamWordCount")
  }
}
```
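In newer Flink releases the positional `keyBy(0)` is deprecated in favour of a key selector. A sketch of the same transformation written that way, meant to sit inside the `main` above (it reuses `raw_stream` from that block):

```scala
// Sketch: keying by a selector function instead of a tuple index.
val resultBySelector: DataStream[(String, Int)] = raw_stream
  .flatMap(_.split(" "))
  .map((_, 1))
  .keyBy(_._1) // the key is the word itself
  .sum(1)
```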
```scala
package cn.lang.flink.demo

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object KafkaFlinkStreamWordCount {
  def main(args: Array[String]): Unit = {
    // environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // properties; in production these can come from args or a config file
    val ZOOKEEPER_HOST = "localhost:2181"
    val KAFKA_BROKER = "localhost:9092"
    val TRANSACTION_GROUP = "transaction"
    val IN_KAFKA_TOPIC = "first"

    // set kafka properties
    val kafkaProps = new Properties()
    kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
    kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKER)
    kafkaProps.setProperty("group.id", TRANSACTION_GROUP)
    kafkaProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaProps.setProperty("auto.offset.reset", "latest")

    // source
    // for a local run, start zookeeper and kafka first, plus a console producer:
    // /Users/langjiang/Software/zookeeper-3.4.10/bin/zkServer.sh start
    // /Users/langjiang/Software/kafka_2.11-2.1.0/bin/kafka-server-start.sh -daemon /Users/langjiang/Software/kafka_2.11-2.1.0/config/server.properties
    // /Users/langjiang/Software/kafka_2.11-2.1.0/bin/kafka-server-stop.sh
    val transaction: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer011[String](IN_KAFKA_TOPIC, new SimpleStringSchema(), kafkaProps))

    // transform
    val result = transaction
      .flatMap(_.split(" ")) // split each line on spaces
      .map((_, 1))           // emit (word, 1) for every word
      .keyBy(0)              // key by the word at index 0; note the difference from the DataSet API
      .sum(1)                // sum the counts at index 1

    // sink1
    // write the result back to Kafka
    // val OUT_KAFKA_TOPIC = "second"
    // result.addSink(new FlinkKafkaProducer011[(String)](KAFKA_BROKER, OUT_KAFKA_TOPIC, new SimpleStringSchema()))

    // sink2
    result.print()
    // 13> (flink,1)   the leading number is the subtask (core) that did the computation
    // 13> (flink,2)
    // 13> (flink,3)
    // 13> (flink,4)
    // 13> (flink,5)
    // 1> (spark,1)

    env.execute("KafkaFlinkStreamWordCount")
  }
}
```
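A sketch of enabling the commented-out Kafka sink above: since `SimpleStringSchema` serializes `String`s, the `(word, count)` tuples are formatted to text first. The topic name `second` comes from the commented code; the snippet belongs inside the same `main` and additionally needs `import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011`.

```scala
// Sketch: writing the word counts back to Kafka as plain strings.
val OUT_KAFKA_TOPIC = "second"
result
  .map(iterm => s"${iterm._1},${iterm._2}") // serialize the tuple as "word,count"
  .addSink(new FlinkKafkaProducer011[String](KAFKA_BROKER, OUT_KAFKA_TOPIC, new SimpleStringSchema()))
```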
```scala
package cn.lang.flink.demo

import java.util.Properties

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object KafkaFlinkStreamTimeWindow {
  def main(args: Array[String]): Unit = {
    // environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // property
    val ZOOKEEPER_HOST = "localhost:2181"
    val KAFKA_BROKER = "localhost:9092"
    val TRANSACTION_GROUP = "transaction"
    val IN_KAFKA_TOPIC = "first"

    // set
    val kafkaProps = new Properties()
    kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
    kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKER)
    kafkaProps.setProperty("group.id", TRANSACTION_GROUP)
    kafkaProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaProps.setProperty("auto.offset.reset", "latest")

    // source
    val transaction: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer011[String](IN_KAFKA_TOPIC, new SimpleStringSchema(), kafkaProps))

    /** transform1 */
    // count normal and abnormal log lines within every 10 seconds
    // val result = transaction
    //   .map(iterm => (iterm.contains("error"), 1)) // a line containing "error" counts as abnormal
    //   .keyBy(0)                     // key by index 0 (error or not) and count each group separately
    //   .timeWindow(Time.seconds(10)) // a 10s window; with a single argument this is a tumbling window,
    //                                 // i.e. window(TumblingProcessingTimeWindows.of(size))
    //   .reduce((history, add) => (history._1, history._2 + add._2)) // think of it as an accumulator update
    // printed every 10s:
    // 1> (false,2) : 1588391750005
    // 10> (true,1) : 1588391760003
    // 10> (true,4) : 1588391770002
    // 10> (true,1) : 1588391780000

    /** transform2 */
    // number of error log lines reported within the last 15 seconds
    val result: DataStream[(String, Int)] = transaction
      .map(iterm => (iterm, 1))                      // count every reported line
      .filter(_._1.contains("error"))                // drop the normal lines
      .keyBy(0)                                      // key by the log line itself at index 0
      .timeWindow(Time.seconds(15), Time.seconds(5)) // 15s window sliding every 5s
      .reduce((history, add) => (history._1, history._2 + add._2)) // the key point is what reduce is given
    // printed every 5s:
    // 14> (error,1) : 1588392335002
    // 14> (error,3) : 1588392340001
    // 14> (error,3) : 1588392345004
    // 14> (error,2) : 1588392350003

    // sink
    result.map(iterm => iterm.toString() + " : " + System.currentTimeMillis()).print()

    env.execute("KafkaFlinkStreamTimeWindow")
  }
}
```
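To make explicit what the windowed `reduce` is handed, here is a sketch with a named `ReduceFunction` (the class name `CountReducer` is made up): reduce receives the value accumulated so far for the current key and window plus the next element, and must return a value of the same type.

```scala
import org.apache.flink.api.common.functions.ReduceFunction

// Sketch: the lambda above written as an explicit ReduceFunction.
class CountReducer extends ReduceFunction[(String, Int)] {
  override def reduce(history: (String, Int), add: (String, Int)): (String, Int) =
    (history._1, history._2 + add._2) // keep the key, add up the counts
}

// usage inside the pipeline above (illustrative):
// .timeWindow(Time.seconds(15), Time.seconds(5))
// .reduce(new CountReducer)
```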
```scala
package cn.lang.flink.demo

import java.text.SimpleDateFormat
import java.util.Properties

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

import scala.collection.mutable

/**
  * watermark
  * watermark = maxEventTime - delayTimeInterval (so it changes as new records arrive)
  * the window computation is invoked once watermark > timeWindowDeadline
  */
object KafkaFlinkStreamEventTime {

  // returns a 13-digit timestamp, i.e. millisecond precision
  def dataToTimestamp(date: String): Long = {
    val sdf = new SimpleDateFormat("yyyy年MM月dd日HH:mm:ss")
    sdf.parse(date).getTime
  }

  def main(args: Array[String]): Unit = {
    // environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // mind which package TimeCharacteristic is imported from; this enables event-time semantics
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // property
    val ZOOKEEPER_HOST = "localhost:2181"
    val KAFKA_BROKER = "localhost:9092"
    val TRANSACTION_GROUP = "transaction"
    val IN_KAFKA_TOPIC = "first"

    // set
    val kafkaProps = new Properties()
    kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
    kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKER)
    kafkaProps.setProperty("group.id", TRANSACTION_GROUP)
    kafkaProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaProps.setProperty("auto.offset.reset", "latest")

    // source; every incoming record looks like: uid,2020年05月02日17:26:16
    val transaction = env.addSource(
      new FlinkKafkaConsumer011[String](IN_KAFKA_TOPIC, new SimpleStringSchema(), kafkaProps))
      .setParallelism(1)

    /** transform */
    /* the requirement: collect, for each uid, all the data reported within the window */
    val result: DataStream[mutable.HashSet[Long]] = transaction
      .map(iterm => Event(iterm.split(",")(0), dataToTimestamp(iterm.split(",")(1)))) // every record carries a 13-digit timestamp
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[Event](Time.milliseconds(2000)) {
          override def extractTimestamp(element: Event): Long = element.timestamp
        })
      .keyBy(_.uid)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .fold(new mutable.HashSet[Long]()) { case (set, iterm) => set += iterm.timestamp }

    // sink
    result.map(iterm => iterm.mkString(";") + " : " + System.currentTimeMillis()).print()

    env.execute("KafkaFlinkStreamEventTime")
  }
}

// sample case class that wraps a business record in the expected format, including its event time
case class Event(uid: String, timestamp: Long) // the reported timestamp here defaults to 10 digits

// watermark assigner; an anonymous BoundedOutOfOrdernessTimestampExtractor works as well
class MyBoundedOutOfOrdernessTimestampExtractor(delayInterval: Long)
  extends AssignerWithPeriodicWatermarks[Event] {

  // the last emitted watermark (i.e. the watermark when a window was last triggered)
  var lastEmittedWatermark: Long = 0L
  // allowed gap between the max event time seen so far and the emitted watermark
  var maxOutOfOrderness: Long = delayInterval
  // the maximum event time among all records seen so far
  var currentMaxTimestamp: Long = lastEmittedWatermark + this.maxOutOfOrderness

  // current watermark
  override def getCurrentWatermark: Watermark = {
    val tmp = this.currentMaxTimestamp - this.maxOutOfOrderness
    if (tmp >= lastEmittedWatermark) {
      lastEmittedWatermark = tmp
    }
    new Watermark(lastEmittedWatermark)
  }

  // extract the timestamp from the record case class
  override def extractTimestamp(element: Event, previousElementTimestamp: Long): Long = {
    val tmp = element.timestamp
    if (tmp > currentMaxTimestamp) {
      currentMaxTimestamp = tmp
    }
    tmp
  }
}
```
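A sketch of wiring the custom assigner in, as an alternative to the anonymous `BoundedOutOfOrdernessTimestampExtractor` used above; it belongs inside the same `main` (reusing `transaction` and `dataToTimestamp`), and the 2000 ms delay mirrors the value already used there.

```scala
// Sketch: using the hand-written periodic watermark assigner.
val withCustomWatermarks: DataStream[Event] = transaction
  .map(iterm => Event(iterm.split(",")(0), dataToTimestamp(iterm.split(",")(1))))
  .assignTimestampsAndWatermarks(new MyBoundedOutOfOrdernessTimestampExtractor(2000L))
```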