假定需求以下,每间隔1分钟,统计过去5分钟的UV,PV。很容易想到,经过数据库的count,以及count distinct能够得出正确结果。在大数据量下,传统数据库或者HADOOP(hbase...)的count效率都不高。若是数据是增量的,那么流式计算每每能提供更高的吞吐和更低的延时。html
接下来经过使用Flink实现这个功能,并借这个案例描述一些Flink的基本概念。若是对其余流式计算框架有所了解,能够发现许多东西是互通的。java
很容易理解,在这个案例中,咱们须要在内存中缓存5分钟的数据,时间往前推移到一分钟的时候,统计一次,而且清理数据。数据库
Flink提供了多种窗口,能够按需选择。apache
考虑到网络的延迟和数据的乱序,不能简单的使用Flink的系统时间作统计。例如14:25分的数据可能在14:27分才到系统中,若是直接按Flink系统时间,即会影响14:20~14:25这段时间的计算结果,同时也会影响14:25~14:30的计算结果。windows
在Flink中,有如下三种时间特征,查看详细说明:缓存
在统计PV UV时,咱们须要根据用户访问的时间,因此使用Event Time。网络
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
接着,咱们须要告知Flink记录的真实时间timestamp,以及触发window计算的watermark。在Flink中经过实现接口AssignerWithPeriodicWatermarks来完成。app
考虑到数据可能乱序,选择BoundedOutOfOrdernessTimestampExtractor:框架
long MAX_EVENT_DELAY = 3500; BoundedOutOfOrdernessTimestampExtractor<String> assigner = new BoundedOutOfOrdernessTimestampExtractor<String>(Time.milliseconds(MAX_EVENT_DELAY)) { @Override public long extractTimestamp(String element) { VisitEvent visitEvent = null; try { visitEvent = objectMapper.readValue(element, VisitEvent.class); return visitEvent.getVisitTime(); } catch (IOException e) { e.printStackTrace(); } return Instant.now().toEpochMilli(); } };
上面的一些代码主要用来作时间的处理,真实的计算经过window来完成,代码以下。ide
int[] arr = {0,2}; FlinkKafkaConsumerBase<String> consumerWithEventTime = myConsumer.assignTimestampsAndWatermarks(assigner); TypeInformation<Tuple3<String, VisitEvent, String>> typeInformation = TypeInformation.of(new TypeHint<Tuple3<String, VisitEvent, String>>() {}); DataStreamSource<String> dataStreamByEventTime = env.addSource(consumerWithEventTime); SingleOutputStreamOperator<UrlVisitBy> uvCounter = dataStreamByEventTime .map(str->objectMapper.readValue(str,VisitEvent.class)) .map(visitEvent-> new Tuple3<>(visitEvent.getVisitUrl(), visitEvent,visitEvent.getVisitUserId())) .returns(typeInformation) .keyBy(arr) .window(SlidingProcessingTimeWindows.of(Time.minutes(5), Time.minutes(1),Time.hours(-8))) .allowedLateness(Time.minutes(1)) .process(new ProcessWindowFunction<Tuple3<String, VisitEvent, String>, UrlVisitBy, Tuple, TimeWindow>() { @Override public void process(Tuple tuple, Context context, Iterable<Tuple3<String, VisitEvent, String>> elements, Collector<UrlVisitBy> out) throws Exception { long count = 0; Tuple2<String,String> tuple2 = null; if (tuple instanceof Tuple2){ tuple2 = (Tuple2) tuple; } for (Tuple3<String, VisitEvent, String> element : elements) { count++; }; TimeWindow window = context.window(); out.collect(new UrlVisitBy(window.getStart(),window.getEnd(),tuple2.f0,count,tuple2.f1)); } }); uvCounter.print();
因为JDK默认的编译器在编译过程当中会擦除泛型信息,这样Flink在执行的时候没法获取足够的信息来推断真实类型,那么可能会碰到这样的错误“The generic type parameters of 'XXX' are missing”。
如今只有Eclipse JDT compiler在编译后能够保留足够的信息,可是它限制了开发者只能使用Eclipse编译以及调试。另外因为兼容性问题,Eclipse对Flink的支持并不友好。官方推荐使用Intelij idea。
为了摆脱编译器的限制,Flink采用了TypeInfomation告知Flink真实类型。