Flink 实战 : 统计网站PV,UV

Flink 实战:统计网站PV,UV

PV,UV

  • PV(Page View) : 页面点击次数
  • UV(User View): 独立用户访问次数

假定需求以下,每间隔1分钟,统计过去5分钟的UV,PV。很容易想到,经过数据库的count,以及count distinct能够得出正确结果。在大数据量下,传统数据库或者HADOOP(hbase...)的count效率都不高。若是数据是增量的,那么流式计算每每能提供更高的吞吐和更低的延时。html

接下来经过使用Flink实现这个功能,并借这个案例描述一些Flink的基本概念。若是对其余流式计算框架有所了解,能够发现许多东西是互通的。java

Window

很容易理解,在这个案例中,咱们须要在内存中缓存5分钟的数据,时间往前推移到一分钟的时候,统计一次,而且清理数据。数据库

Flink提供了多种窗口,能够按需选择。apache

Event Time

考虑到网络的延迟和数据的乱序,不能简单的使用Flink的系统时间作统计。例如14:25分的数据可能在14:27分才到系统中,若是直接按Flink系统时间,即会影响14:20~14:25这段时间的计算结果,同时也会影响14:25~14:30的计算结果。windows

在Flink中,有如下三种时间特征,查看详细说明缓存

  • Processing time:Operator处理数据的时间。
  • Event time : 事件发生时间。
  • Ingestion time:被Flink摄入的时间。

在统计PV UV时,咱们须要根据用户访问的时间,因此使用Event Time。网络

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

接着,咱们须要告知Flink记录的真实时间timestamp,以及触发window计算的watermark。在Flink中经过实现接口AssignerWithPeriodicWatermarks来完成。app

考虑到数据可能乱序,选择BoundedOutOfOrdernessTimestampExtractor:框架

long MAX_EVENT_DELAY = 3500;
      BoundedOutOfOrdernessTimestampExtractor<String> assigner = new BoundedOutOfOrdernessTimestampExtractor<String>(Time.milliseconds(MAX_EVENT_DELAY)) {
            @Override
            public long extractTimestamp(String element) {
                VisitEvent visitEvent = null;
                try {
                    visitEvent = objectMapper.readValue(element, VisitEvent.class);
                    return visitEvent.getVisitTime();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return Instant.now().toEpochMilli();
            }
        };

上面的一些代码主要用来作时间的处理,真实的计算经过window来完成,代码以下。ide

int[] arr = {0,2};
        FlinkKafkaConsumerBase<String> consumerWithEventTime = myConsumer.assignTimestampsAndWatermarks(assigner);
        TypeInformation<Tuple3<String, VisitEvent, String>> typeInformation = TypeInformation.of(new TypeHint<Tuple3<String, VisitEvent, String>>() {});
        DataStreamSource<String> dataStreamByEventTime = env.addSource(consumerWithEventTime);
        SingleOutputStreamOperator<UrlVisitBy> uvCounter = dataStreamByEventTime
                .map(str->objectMapper.readValue(str,VisitEvent.class))
                .map(visitEvent-> new Tuple3<>(visitEvent.getVisitUrl(), visitEvent,visitEvent.getVisitUserId()))
                .returns(typeInformation)
                .keyBy(arr)
                .window(SlidingProcessingTimeWindows.of(Time.minutes(5), Time.minutes(1),Time.hours(-8)))
                .allowedLateness(Time.minutes(1))
                .process(new ProcessWindowFunction<Tuple3<String, VisitEvent, String>, UrlVisitBy, Tuple, TimeWindow>() {
                    @Override
                    public void process(Tuple tuple, Context context, Iterable<Tuple3<String, VisitEvent, String>> elements, Collector<UrlVisitBy> out) throws Exception {
                        long count = 0;
                        Tuple2<String,String> tuple2 = null;
                        if (tuple instanceof Tuple2){
                            tuple2 = (Tuple2) tuple;
                        }
                        for (Tuple3<String, VisitEvent, String> element : elements) {
                            count++;
                        };
                        TimeWindow window = context.window();
                        out.collect(new UrlVisitBy(window.getStart(),window.getEnd(),tuple2.f0,count,tuple2.f1));
                    }
                });
        uvCounter.print();

建议使用returns(typeInformation)

因为JDK默认的编译器在编译过程当中会擦除泛型信息,这样Flink在执行的时候没法获取足够的信息来推断真实类型,那么可能会碰到这样的错误“The generic type parameters of 'XXX' are missing”。

如今只有Eclipse JDT compiler在编译后能够保留足够的信息,可是它限制了开发者只能使用Eclipse编译以及调试。另外因为兼容性问题,Eclipse对Flink的支持并不友好。官方推荐使用Intelij idea。

为了摆脱编译器的限制,Flink采用了TypeInfomation告知Flink真实类型。

相关文章
相关标签/搜索