官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_time.html
翻译:https://www.jianshu.com/p/68ab40c7f347html
Window:Window是处理无界流的关键,Windows将流拆分为一个个有限大小的buckets
,能够能够在每个buckets
中进行计算java
start_time,end_time:当Window时时间窗口的时候,每一个window都会有一个开始时间和结束时间(前开后闭),这个时间是系统时间apache
event-time: 事件发生时间,是事件发生所在设备的当地时间,好比一个点击事件的时间发生时间,是用户点击操做所在的手机或电脑的时间json
Watermarks:能够把他理解为一个水位线,这个Watermarks在不断的变化,一旦Watermarks大于了某个window的end_time,就会触发此window的计算,Watermarks就是用来触发window计算的。api
什么是乱序呢?能够理解为数据到达的顺序和他的event-time排序不一致。致使这的缘由有不少,好比延迟,消息积压,重试等等ide
由于Watermarks是用来触发window窗口计算的,咱们能够根据事件的event-time,计算出Watermarks,而且设置一些延迟,给迟到的数据一些机会。测试
假如咱们设置10s的时间窗口(window),那么0~10s,10~20s都是一个窗口,以0~10s为例,0位start-time,10为end-time。假若有4个数据的event-time分别是8(A),12.5(B),9(C),13.5(D),咱们设置Watermarks为当前全部到达数据event-time的最大值减去延迟值3.5秒this
当A到达的时候,Watermarks为max{8}-3.5=8-3.5 = 4.5 < 10
,不会触发计算
当B到达的时候,Watermarks为max(12.8,5)-3.5=12.5-3.5 = 9 < 10
,不会触发计算
当C到达的时候,Watermarks为max(12.5,8,9)-3.5=12.5-3.5 = 9 < 10
,不会触发计算
当D到达的时候,Watermarks为max(13.5,12.5,8,9)-3.5=13.5-3.5 = 10 = 10
,触发计算
触发计算的时候,会将AC(由于他们都小于10)都计算进去spa
经过上面这种方式,咱们就将迟到的C计算进去了翻译
这里的延迟3.5s是咱们假设一个数据到达的时候,比他早3.5s的数据确定也都到达了,这个是须要根据经验推算的,加入D到达之后有到达了一个E,event-time=6,可是因为0~10的时间窗口已经开始计算了,因此E就丢了。
下面代码中的BoundedOutOfOrdernessGenerator就是一个典型的Watermarks实例
package xuwei.tech; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.meituan.flink.common.conf.FlinkConf; import com.meituan.flink.common.kafka.MTKafkaConsumer08; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.SimpleDateFormat; import java.util.Date; /** * Created by smile on 14/11/2017. 统计每 10 秒内每种操做有多少个 */ public class EventTimeWindowCount { private static final Logger logger = LoggerFactory.getLogger(EventTimeWindowCount.class); public static void main(String[] args) throws Exception { // 获取做业名 String jobName = FlinkConf.getJobName(args); // 获取执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置使用 EventTime // 做为时间戳(默认是 // ProcessingTime) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 开启 Checkpoint(每 10 秒保存一次检查点,模式为 Exactly Once) env.enableCheckpointing(10000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 设置从 Kafka 的 topic // "log.orderlog" 中读取数据 MYKafkaConsumer08 consumer = new MYKafkaConsumer08(jobName); DataStream<String> stream = env.addSource(consumer.getInstance("log.orderlog", new SimpleStringSchema())); // 默认接上次开始消费,如下的写法(setStartFromLatest)能够从最新开始消费,相应的还有(setStartFromEarliest // 从最旧开始消费) // DataStream<String> stream = // env.addSource(consumer.getInstance("log.orderlog", new // SimpleStringSchema()).setStartFromLatest()); DataStream<String> orderAmount = // 将读入的字符串转化为 OrderRecord 对象 stream.map(new ParseOrderRecord()) // 设置从 OrderRecord 对象中提取时间戳的方式,下文 BoundedOutOfOrdernessGenerator // 类中具体实现该方法 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator()) // 用 OrderRecord 对象的 action // 字段进行分流(相同 action // 的进入相同流,不一样 action // 的进入不一样流) .keyBy("action") // 触发 10s 的滚动窗口,即每十秒的数据进入同一个窗口 .window(TumblingEventTimeWindows.of(Time.seconds(10))) // 将同一窗口的每一个 OrderRecord 对象的 count // 字段加起来(其他字段只保留第一个进入该窗口的,后进去的丢弃) .sum("count") // 将结果从 OrderRecord 对象转换为 String,每十万条输出一条 .flatMap(new ParseResult()); // 若是想每条都输出来,那就输得慢一点,每 10 秒输出一条数据(请将上一行的 flatMap 换成下一行的 map) // .map(new ParseResultSleep()); // 输出结果(而后就能够去 Task Manage 的 Stdout 里面看) // 小数据量测试的时候能够这么写,正式上线的时候不要这么写!数据量大建议仍是写到 Kafka Topic 或者其余的下游里面去 orderAmount.print(); env.execute(jobName); } public static class ParseOrderRecord implements MapFunction<String, OrderRecord> { @Override public OrderRecord map(String s) throws Exception { JSONObject jsonObject = JSON.parseObject(s); long id = jsonObject.getLong("id"); int dealId = jsonObject.getInteger("dealid"); String action = jsonObject.getString("_mt_action"); double amount = jsonObject.getDouble("amount"); String timestampString = jsonObject.getString("_mt_datetime"); // 将字符串格式的时间戳解析为 long 类型,单位毫秒 SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date timestampDate = simpleDateFormat.parse(timestampString); long timestamp = timestampDate.getTime(); return new OrderRecord(id, dealId, action, amount, timestamp); } } public static class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<OrderRecord> { private final long maxOutOfOrderness = 3500; // 3.5 seconds private long currentMaxTimestamp; @Override public long extractTimestamp(OrderRecord record, long previousElementTimestamp) { // 将数据中的时间戳字段(long 类型,精确到毫秒)赋给 // timestamp 变量,此处是 // OrderRecord 的 timestamp // 字段 long timestamp = record.timestamp; currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp); return timestamp; } @Override public Watermark getCurrentWatermark() { // return the watermark as current highest timestamp minus the // out-of-orderness bound return new Watermark(currentMaxTimestamp - maxOutOfOrderness); } } public static class ParseResult implements FlatMapFunction<OrderRecord, String> { private static long msgCount = 0; @Override public void flatMap(OrderRecord record, Collector<String> out) throws Exception { // 每十万条输出一条,防止输出太多在 Task // Manage 的 Stdout 里面刷新不出来 if (msgCount == 0) { out.collect("Start from: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(record.timestamp) + " action: " + record.action + " count = " + record.count); msgCount = 0; } msgCount++; msgCount %= 100000; } } public static class ParseResultSleep implements MapFunction<OrderRecord, String> { @Override public String map(OrderRecord record) throws Exception { // 每 10 秒输出一条数据,防止输出太多在 Task Manage 的 Stdout 里面刷新不出来 // 正式上线的时候不要这么写! Thread.sleep(10000); return "Start from: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(record.timestamp) + " action: " + record.action + " count = " + record.count; } } public static class OrderRecord { public long id; public int dealId; public String action; public double amount; public long timestamp; public long count; public OrderRecord() { } public OrderRecord(long id, int dealId, String action, double amount, long timestamp) { this.id = id; this.dealId = dealId; this.action = action; this.amount = amount; this.timestamp = timestamp; this.count = 1; } } }