「Flink」Flink中的时间类型

Flink中的时间类型和窗口是很是重要概念,是学习Flink必需要掌握的两个知识点。html

Flink中的时间类型

时间类型介绍

Flink流式处理中支持不一样类型的时间。分为如下几种:apache

  1. 处理时间
    • Flink程序执行对应操做的系统时间。全部基于时间的操做(例如:时间窗口)都将使用运行相应operator的系统时间。例如:每一个小时的处理时间窗口包括在系统时间范围内全部operator接收到的记录。例如:若是应用程序在09:15开始运行,则第一个滚动时间窗口将包括:09:15 – 10:00 之间的处理事件,下一个窗口包括上午10:00 – 11:00之间的处理事件
    • 这种处理时间方式实时性是最好的,但数据未必准确
  2. 事件时间
    • 每一个事件发生的时间。这个时间通常是在进入到Flink以前就包含在事件中
    • 针对Eventtime,事件被处理的时间以来与事件自己
    • Eventtime必需要指定如何生成Eventtime Watermark(水印)
    • 理想状况,无论事件什么时候到达或者顺序如何,事件时间处理可以获得完整一致地结果。
    • 事件处理在等待乱序事件时,会产生一些延迟。这样会对Eventtime的应用性能有必定的影响
  3. 摄入时间
    • 摄入时间是事件进入Flink的时间
    • 在source operator中,每一个记录以时间戳的形式获取源的当前时间
    • 它在概念是处于事件时间和处理时间中间
    • 摄入时间不能处理乱序问题或者延迟数据,摄入时间能够由流式系统自动生成水印

Flink支持的这几种时间恰好和咱们上一篇播客中的内容相对应。app

http://www.javashuo.com/article/p-hcvvzljd-gh.htmlide

应用一张Flink官网的图。oop

image

Flink代码中设置时间类型

一般,咱们在Flink初始化流式运行环境时,就会设置流处理时间特性。这个设置很重要,它决定了数据流的行为方式。(例如:是否须要给事件分配时间戳),以及窗口操做应该使用什么样的时间类型。例如:KeyedStream.timeWindow(Time.seconds(30))。性能


咱们接下来经过实现一个每5秒中进行一次单词计数的案例,来讲明Flink中如何指定时间类型。学习

public class WordCountWindow {
    public static void main(String[] args) throws Exception {
        // 1. 初始化流式运行环境
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        // 2. 设置时间处理类型,这里设置的方式处理时间
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // 3. 定义数据源,每秒发送一个hadoop单词
        DataStreamSource<String> wordDS = env.addSource(new RichSourceFunction<String>() {

            private boolean isCanaled = false;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (!isCanaled) {
                    ctx.collect("hadooop");
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isCanaled = true;
            }
        });

        // 4. 每5秒进行一次,分组统计
        // 4.1 转换为元组
        wordDS.map(word -> Tuple2.of(word, 1))
                // 指定返回类型
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                // 按照单词进行分组
                .keyBy(t -> t.f0)
                // 滚动窗口,3秒计算一次
                .timeWindow(Time.seconds(3))
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                }, new RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
                    @Override
                    public void apply(String word, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {

                        // 打印窗口开始、结束时间
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        System.out.println("窗口开始时间:" + sdf.format(window.getStart())
                                + " 窗口结束时间:" + sdf.format(window.getEnd())
                                + " 窗口计算时间:" + sdf.format(System.currentTimeMillis()));

                        int sum = 0;
                        Iterator<Tuple2<String, Integer>> iterator = input.iterator();
                        while(iterator.hasNext()) {
                            Integer count = iterator.next().f1;
                            sum += count;
                        }
                        out.collect(Tuple2.of(word, sum));
                    }
                }).print();

        env.execute("app");
    }
}

窗口开始时间:2020-02-05 00:22:21 窗口结束时间:2020-02-05 00:22:24 窗口计算时间:2020-02-05 00:22:24
4> (hadooop,2)
窗口开始时间:2020-02-05 00:22:24 窗口结束时间:2020-02-05 00:22:27 窗口计算时间:2020-02-05 00:22:27
4> (hadooop,3)
窗口开始时间:2020-02-05 00:22:27 窗口结束时间:2020-02-05 00:22:30 窗口计算时间:2020-02-05 00:22:30
4> (hadooop,3)
窗口开始时间:2020-02-05 00:22:30 窗口结束时间:2020-02-05 00:22:33 窗口计算时间:2020-02-05 00:22:33
4> (hadooop,3)
窗口开始时间:2020-02-05 00:22:33 窗口结束时间:2020-02-05 00:22:36 窗口计算时间:2020-02-05 00:22:36
4> (hadooop,3)
窗口开始时间:2020-02-05 00:22:36 窗口结束时间:2020-02-05 00:22:39 窗口计算时间:2020-02-05 00:22:39spa

咱们能够看到,这个滚动窗口,每3秒计算一次,是按照系统时间来计算的。code

咱们再把时间窗口设置为1分钟,再试试。orm

窗口开始时间:2020-02-05 00:27:00 窗口结束时间:2020-02-05 00:28:00 窗口计算时间:2020-02-05 00:28:00
4> (hadooop,32)

窗口开始时间:2020-02-05 00:28:00 窗口结束时间:2020-02-05 00:29:00 窗口计算时间:2020-02-05 00:29:00
4> (hadooop,60)

恰好在 00:27:00 – 00:28:00之间。


参考文件:

https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_time.html

相关文章
相关标签/搜索