I've recently been learning Flink and plan to use it to build a real-time recommender system. I found a good website (effectively the official training site from the creators of Flink) with hands-on Flink tutorials, which I'm working through for practice. What follows is just my personal notes.
The New York City Taxi &amp; Limousine Commission provides a public dataset about taxi rides in New York City from 2009 to 2015.
For how to download the data, see Taxi Data Streams. After downloading, do not decompress the files.
Our first dataset contains information about taxi rides in New York City. Each ride is represented by two events, START and END, which mark the beginning and the end of the trip. Each event has eleven attributes:
```
rideId       : Long      // a unique id for each ride
taxiId       : Long      // a unique id for each taxi
driverId     : Long      // a unique id for each driver
isStart      : Boolean   // TRUE for ride start events, FALSE for ride end events
startTime    : DateTime  // the start time of a ride
endTime      : DateTime  // the end time of a ride,
                         // "1970-01-01 00:00:00" for start events
startLon     : Float     // the longitude of the ride start location
startLat     : Float     // the latitude of the ride start location
endLon       : Float     // the longitude of the ride end location
endLat       : Float     // the latitude of the ride end location
passengerCnt : Short     // number of passengers on the ride
```
The other dataset contains fare information, one record per ride:
```
rideId      : Long      // a unique id for each ride
taxiId      : Long      // a unique id for each taxi
driverId    : Long      // a unique id for each driver
startTime   : DateTime  // the start time of a ride
paymentType : String    // CSH or CRD
tip         : Float     // tip for this ride
tolls       : Float     // tolls for this ride
totalFare   : Float     // total fare collected
```
First we define the TaxiRide event, one per record in the dataset.
We read the TaxiRide stream with a Flink source function (TaxiRideSource) that operates on event time; likewise, the TaxiFare fare events are served by TaxiFareSource. To make the generated streams more realistic, events are emitted in proportion to their timestamps: two events that actually happened ten minutes apart are also served ten minutes apart. On top of that, a speed-up factor can be set; with a factor of 60, one minute of real time plays back in one second, a 60x compression. We can also specify a maximum serving delay, which makes each event appear at a random point within that bound, bringing the stream closer to the nondeterminism of real-world delivery.

For this application, we set the speed-up factor to 600 (ten minutes of events per second) and the maximum delay to 60 seconds.
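To make the speed-up factor concrete, the mapping from event time to serving (wall-clock) time is plain arithmetic. This is a minimal sketch mirroring the `toServingTime` helper the source uses internally; the class name and literal values here are illustrative, not from the exercise:

```java
public class ServingTimeSketch {
    // maps a (possibly delayed) event timestamp onto the accelerated serving clock
    static long toServingTime(long servingStartTime, long dataStartTime,
                              long eventTime, long servingSpeed) {
        long dataDiff = eventTime - dataStartTime;           // elapsed event time in ms
        return servingStartTime + (dataDiff / servingSpeed); // compressed by the speed-up factor
    }

    public static void main(String[] args) {
        long servingStart = 0L;          // serving clock starts at 0 for this sketch
        long dataStart = 0L;             // timestamp of the first event
        long tenMinutesLater = 600_000L; // an event 10 minutes after the first one
        // with speed-up factor 600, 10 minutes of event time become 1 second of serving time
        System.out.println(toServingTime(servingStart, dataStart, tenMinutesLater, 600)); // prints 1000
    }
}
```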
All exercises should be implemented using event time (as opposed to processing time).
Event-time decouples the program semantics from serving speed and guarantees consistent results even in case of historic data or data which is delivered out-of-order.

In short: the processing logic depends on when an event actually happened, not on when it shows up in the stream.
```java
private void generateUnorderedStream(SourceContext<TaxiRide> sourceContext) throws Exception {

    // serving start time
    long servingStartTime = Calendar.getInstance().getTimeInMillis();
    // data start time, i.e. the timestamp of the first ride
    long dataStartTime;

    Random rand = new Random(7452);

    // priority queue of events to emit, ordered by their delayed emission time
    PriorityQueue<Tuple2<Long, Object>> emitSchedule = new PriorityQueue<>(
            32,
            new Comparator<Tuple2<Long, Object>>() {
                @Override
                public int compare(Tuple2<Long, Object> o1, Tuple2<Long, Object> o2) {
                    return o1.f0.compareTo(o2.f0);
                }
            });

    // read the first ride and insert it into the schedule
    String line;
    TaxiRide ride;
    if (reader.ready() && (line = reader.readLine()) != null) {
        // read first ride
        ride = TaxiRide.fromString(line);
        // extract starting timestamp
        dataStartTime = getEventTime(ride);
        // delayed time = dataStartTime plus a bounded random delay, to mimic real-world conditions
        long delayedEventTime = dataStartTime + getNormalDelayMsecs(rand);
        // schedule the ride
        emitSchedule.add(new Tuple2<Long, Object>(delayedEventTime, ride));
        // set the watermark time
        long watermarkTime = dataStartTime + watermarkDelayMSecs;
        // the next watermark carries timestamp watermarkTime - maxDelayMsecs - 1,
        // which is guaranteed to be smaller than dataStartTime
        Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1);
        // schedule the watermark; the priority queue orders it before the ride
        emitSchedule.add(new Tuple2<Long, Object>(watermarkTime, nextWatermark));
    } else {
        return;
    }

    // peek at the next ride from the file
    if (reader.ready() && (line = reader.readLine()) != null) {
        ride = TaxiRide.fromString(line);
    }

    // read rides one-by-one and emit a random ride from the buffer each time
    while (emitSchedule.size() > 0 || reader.ready()) {

        // insert all events into schedule that might be emitted next
        // delayed timestamp of the next event in the schedule
        long curNextDelayedEventTime = !emitSchedule.isEmpty() ? emitSchedule.peek().f0 : -1;
        // event time of the ride just read from the file
        long rideEventTime = ride != null ? getEventTime(ride) : -1;
        // this loop refills the schedule while it does not hold enough rides
        while (
                ride != null && (                                   // while there is a ride AND
                emitSchedule.isEmpty() ||                           // no ride in schedule OR
                rideEventTime < curNextDelayedEventTime + maxDelayMsecs) // not enough rides in schedule
                ) {
            // insert event into emit schedule
            long delayedEventTime = rideEventTime + getNormalDelayMsecs(rand);
            emitSchedule.add(new Tuple2<Long, Object>(delayedEventTime, ride));

            // read next ride
            if (reader.ready() && (line = reader.readLine()) != null) {
                ride = TaxiRide.fromString(line);
                rideEventTime = getEventTime(ride);
            } else {
                ride = null;
                rideEventTime = -1;
            }
        }

        // take the first element of the schedule, called head
        Tuple2<Long, Object> head = emitSchedule.poll();
        // the time at which head should be emitted
        long delayedEventTime = head.f0;

        long now = Calendar.getInstance().getTimeInMillis();
        // servingTime = servingStartTime + (delayedEventTime - dataStartTime) / this.servingSpeed
        long servingTime = toServingTime(servingStartTime, dataStartTime, delayedEventTime);
        // how much longer do we have to wait before this ride may happen? (ha, I love this description)
        long waitTime = servingTime - now;

        // since we have to wait anyway, we may as well sleep
        Thread.sleep((waitTime > 0) ? waitTime : 0);

        // if head is a TaxiRide
        if (head.f1 instanceof TaxiRide) {
            TaxiRide emitRide = (TaxiRide) head.f1;
            // emit ride
            sourceContext.collectWithTimestamp(emitRide, getEventTime(emitRide));
        }
        // if head is a watermark
        else if (head.f1 instanceof Watermark) {
            Watermark emitWatermark = (Watermark) head.f1;
            // emit watermark
            sourceContext.emitWatermark(emitWatermark);
            // and schedule the next watermark
            long watermarkTime = delayedEventTime + watermarkDelayMSecs;
            // again, keep the watermark's timestamp behind the next ride's timestamp
            Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1);
            emitSchedule.add(new Tuple2<Long, Object>(watermarkTime, nextWatermark));
        }
    }
}
```
So how do we run these sources in Java? Here is an example:
```java
// get an ExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// configure event-time processing
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// get the taxi ride data stream
DataStream<TaxiRide> rides = env.addSource(
        new TaxiRideSource("/path/to/nycTaxiRides.gz", maxDelay, servingSpeed));
```
In addition, some applications need checkpointing. A checkpoint is a mechanism for recovering from failures; to use it with this stream, the CheckpointedTaxiRideSource must be used instead.
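Checkpointing itself is switched on via the execution environment. A minimal sketch; the 10-second interval is my own choice for illustration, not something the exercise prescribes:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// draw a consistent snapshot of all operator state every 10 seconds (assumed interval)
env.enableCheckpointing(10_000L);
// the checkpointed source serves rides in order so its read position can be snapshotted
DataStream<TaxiRide> rides = env.addSource(
        new CheckpointedTaxiRideSource("/path/to/nycTaxiRides.gz", servingSpeed));
```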
Since our application studies taxi traffic within New York City, we want to exclude locations outside the city, using this filter:
```java
private static class NYCFilter implements FilterFunction<TaxiRide> {
    @Override
    public boolean filter(TaxiRide taxiRide) throws Exception {
        return GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat) &&
               GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat);
    }
}
```
Applying the filter:
```java
// start the data generator
DataStream<TaxiRide> rides = env.addSource(
        rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));

DataStream<TaxiRide> filteredRides = rides
        // filter out rides that do not start or stop in NYC
        .filter(new NYCFilter());
```
Now we need to join the TaxiRide and TaxiFare records, which means processing the streams from both sources at the same time. A few transformation functions are involved here.
Since we cannot control the order in which a ride and its fare arrive, we store whichever record arrives first until its match shows up. This requires stateful computation.
```java
public class RidesAndFaresExercise extends ExerciseBase {
    public static void main(String[] args) throws Exception {

        ParameterTool params = ParameterTool.fromArgs(args);
        final String ridesFile = params.get("rides", pathToRideData);
        final String faresFile = params.get("fares", pathToFareData);

        final int delay = 60;                // at most 60 seconds of delay
        final int servingSpeedFactor = 1800; // 30 minutes worth of events are served every second

        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(ExerciseBase.parallelism);

        DataStream<TaxiRide> rides = env
                .addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))
                .filter((TaxiRide ride) -> ride.isStart)
                .keyBy("rideId");

        DataStream<TaxiFare> fares = env
                .addSource(fareSourceOrTest(new TaxiFareSource(faresFile, delay, servingSpeedFactor)))
                .keyBy("rideId");

        DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides
                .connect(fares)
                .flatMap(new EnrichmentFunction());

        printOrTest(enrichedRides);

        env.execute("Join Rides with Fares (java RichCoFlatMap)");
    }

    public static class EnrichmentFunction extends
            RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>> {
        // keyed, managed state
        private ValueState<TaxiRide> rideState;
        private ValueState<TaxiFare> fareState;

        @Override
        public void open(Configuration config) throws Exception {
            rideState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved ride", TaxiRide.class));
            fareState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved fare", TaxiFare.class));
        }

        @Override
        public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
            TaxiFare fare = fareState.value();
            if (fare != null) {
                fareState.clear();
                out.collect(new Tuple2(ride, fare));
            } else {
                rideState.update(ride);
            }
        }

        @Override
        public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
            TaxiRide ride = rideState.value();
            if (ride != null) {
                rideState.clear();
                out.collect(new Tuple2(ride, fare));
            } else {
                fareState.update(fare);
            }
        }
    }
}
```
Running this, we can see that each generated record combines a ride with its fare:
```
3> (196965,START,2013-01-01 11:54:08,1970-01-01 00:00:00,-73.99048,40.75611,-73.98388,40.767143,2,2013007021,2013014447,196965,2013007021,2013014447,2013-01-01 11:54:08,CSH,0.0,0.0,6.5)
1> (197311,START,2013-01-01 11:55:44,1970-01-01 00:00:00,-73.98894,40.72127,-73.95267,40.771126,1,2013008802,2013012009,197311,2013008802,2013012009,2013-01-01 11:55:44,CRD,2.7,0.0,16.2)
2> (196608,START,2013-01-01 11:53:00,1970-01-01 00:00:00,-73.97817,40.761055,-73.98574,40.75613,2,2013004060,2013014162,196608,2013004060,2013014162,2013-01-01 11:53:00,CSH,0.0,0.0,5.5)
```
Now we want to make the join above more robust. With real-world data, records are sometimes lost, which means we may receive only one of TaxiRide and TaxiFare while the other never arrives, so the record that arrived first would occupy memory forever. To solve this, we use a CoProcessFunction to clear out unmatched state. This is implemented in the class ExpiringStateExercise:
First, the input with missing data: we drop all of the rides' END events, plus one out of every 1000 START events. 😯
```java
DataStream<TaxiRide> rides = env
        .addSource(rideSourceOrTest(new CheckpointedTaxiRideSource(ridesFile, servingSpeedFactor)))
        .filter((TaxiRide ride) -> (ride.isStart && (ride.rideId % 1000 != 0)))
        .keyBy(ride -> ride.rideId);

SingleOutputStreamOperator processed = rides
        .connect(fares)
        // Applies the given ProcessFunction on the input stream, thereby creating a transformed output stream.
        // The function will be called for every element in the input streams and can produce zero or more output elements.
        .process(new EnrichmentFunction());
```
We use a CoProcessFunction to carry out the operation described above. For a stream with two inputs, the following description vividly illustrates the three methods we need to override:
For example, you might be joining customer data to financial trades, while keeping state for the customer data. If you care about having complete and deterministic joins in the face of out-of-order events, you can use a timer to evaluate and emit the join for a trade when the watermark for the customer data stream has passed the time of that trade.
processElement1(...) and processElement2(...) are called for elements of the two input streams; onTimer() defines the action for discarding records that never found a match.
```java
@Override
// Called when a timer set using TimerService fires.
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
    if (fareState.value() != null) {
        ctx.output(unmatchedFares, fareState.value());
        fareState.clear();
    }
    if (rideState.value() != null) {
        ctx.output(unmatchedRides, rideState.value());
        rideState.clear();
    }
}

@Override
// A Context that allows querying the timestamp of the element,
// querying the TimeDomain of the firing timer and getting a TimerService for registering timers and querying the time.
// The context is only valid during the invocation of this method, do not store it.
public void processElement1(TaxiRide ride, Context context, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
    // the current event is a ride; if a fare is already in state, emit the pair
    // (both streams were keyed by rideId earlier, so only events with the same rideId meet here)
    TaxiFare fare = fareState.value();
    if (fare != null) {
        fareState.clear();
        out.collect(new Tuple2(ride, fare));
    } else {
        // otherwise, save the ride in state
        rideState.update(ride);
        // as soon as the watermark arrives, we stop waiting for the corresponding fare
        // Registers a timer to be fired when the event time watermark passes the given time.
        context.timerService().registerEventTimeTimer(ride.getEventTime());
    }
}
```
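The snippet above omits processElement2; in the exercise it mirrors processElement1 for the fare stream. A sketch under that assumption, reusing the same state fields and timer registration as above:

```java
@Override
public void processElement2(TaxiFare fare, Context context,
        Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
    // the current event is a fare; if the matching ride is already in state, emit the pair
    TaxiRide ride = rideState.value();
    if (ride != null) {
        rideState.clear();
        out.collect(new Tuple2(ride, fare));
    } else {
        // otherwise, save the fare and give up on it once the watermark passes its event time
        fareState.update(fare);
        context.timerService().registerEventTimeTimer(fare.getEventTime());
    }
}
```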
The output is shown below; the rideIds of the unmatched records differ by exactly 1000, consistent with the drop rate we defined.
```
1> 1000,2013000992,2013000989,2013-01-01 00:05:38,CSH,0.0,4.8,18.3
3> 2000,2013001967,2013001964,2013-01-01 00:08:25,CSH,0.0,0.0,17.5
4> 3000,2013002904,2013002901,2013-01-01 00:11:00,CRD,4.38,0.0,22.38
```
Now we want to determine the driver who earns the most tips each hour (every fare record has a tip field). The simplest approach takes two steps: first use one-hour time windows to compute each driver's total tips per hour, then find the driver with the highest total in each hour from that windowed stream.
The code below uses the following two function interfaces:
AggregateFunction: this interface has a method that adds an input element to an accumulator. It also has methods for initializing an accumulator, merging two accumulators into one, and extracting the output from an accumulator.
ProcessWindowFunction: this function receives an Iterable containing all elements of the window, together with a Context object that gives access to time and state, which enables much richer functionality.
```java
public class HourlyTipsExercise extends ExerciseBase {

    public static void main(String[] args) throws Exception {

        // read parameters
        ParameterTool params = ParameterTool.fromArgs(args);
        final String input = params.get("input", ExerciseBase.pathToFareData);

        final int maxEventDelay = 60;       // events are out of order by max 60 seconds
        final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second

        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(ExerciseBase.parallelism);

        // start the data generator
        DataStream<TaxiFare> fares = env.addSource(
                fareSourceOrTest(new TaxiFareSource(input, maxEventDelay, servingSpeedFactor)));

        // compute tips per hour for each driver
        DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
                // key by driverId
                .keyBy((TaxiFare fare) -> fare.driverId)
                // one-hour time windows
                .timeWindow(Time.hours(1))
                // AddTips() is the aggregate function, WrapWithWindowInfo() the window function
                .aggregate(new AddTips(), new WrapWithWindowInfo());

        // find the highest total tips in each hour
        DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips
                .timeWindowAll(Time.hours(1))
                .maxBy(2);

        printOrTest(hourlyMax);

        // execute the transformation pipeline
        env.execute("Hourly Tips (java)");
    }

    /* Adds up the tips. */
    public static class AddTips implements AggregateFunction<
            TaxiFare, // input type
            Float,    // accumulator type
            Float     // output type
            > {
        @Override
        public Float createAccumulator() {
            return 0F;
        }

        @Override
        public Float add(TaxiFare fare, Float aFloat) {
            return fare.tip + aFloat;
        }

        @Override
        public Float getResult(Float aFloat) {
            return aFloat;
        }

        @Override
        public Float merge(Float aFloat, Float accumulator) {
            return aFloat + accumulator;
        }
    }

    /*
     * Wraps the pre-aggregated result into a tuple along with the window's timestamp and key.
     */
    public static class WrapWithWindowInfo extends ProcessWindowFunction<
            Float, Tuple3<Long, Long, Float>, Long, TimeWindow> {
        @Override
        public void process(Long key, Context context, Iterable<Float> elements,
                Collector<Tuple3<Long, Long, Float>> out) throws Exception {
            Float sumOfTips = elements.iterator().next();
            out.collect(new Tuple3<>(context.window().getEnd(), key, sumOfTips));
        }
    }
}
```
The output:
```
1> (1357002000000,2013000493,54.45)
2> (1357005600000,2013010467,64.53)
3> (1357009200000,2013010589,104.75)
```
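The first field of each tuple is the window's end timestamp in epoch milliseconds (what `context.window().getEnd()` returned); a quick sanity check with plain Java that the hourly windows line up with the start of 2013:

```java
import java.time.Instant;

public class WindowEndCheck {
    public static void main(String[] args) {
        // 1357002000000 ms is the end of the first hourly window of 2013 (UTC)
        System.out.println(Instant.ofEpochMilli(1357002000000L)); // prints 2013-01-01T01:00:00Z
        // successive window ends are exactly one hour (3,600,000 ms) apart
        System.out.println(1357005600000L - 1357002000000L);      // prints 3600000
    }
}
```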
Broadcast state: a mechanism that supports broadcasting data from an upstream task to every parallel instance of a downstream task.
This article explains broadcast state in detail: A Practical Guide to Broadcast State in Apache Flink.
In this mechanism, the system is split into an action stream and a pattern stream. The action stream is the normal data stream, the rides in our example. The pattern stream is the one we broadcast: we want to monitor the rides, so we send a pattern into broadcast state, and the operator prints the records of the action stream that match that pattern.
Here, the pattern is an integer n, a number of minutes. At the moment we send the pattern, we want to print all rides that started at least n minutes ago and have not yet ended.
Here is the application code.
First, in this simple example we need a broadcast state descriptor, even though we never actually store anything in it:
```java
final MapStateDescriptor<Long, Long> dummyBroadcastState = new MapStateDescriptor<>(
        "dummy",
        BasicTypeInfo.LONG_TYPE_INFO,
        BasicTypeInfo.LONG_TYPE_INFO
);
```
Then we set up a socket source to receive the pattern:
```java
BroadcastStream<String> queryStream = env.socketTextStream("localhost", 9999)
        .assignTimestampsAndWatermarks(new QueryStreamAssigner())
        .broadcast(dummyBroadcastState);
```
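QueryStreamAssigner is referenced here but not shown. Its job is to keep this low-volume query stream from holding back the application's event-time watermark. A sketch, assuming the (pre-Flink-1.11) periodic-watermark interface, where the query stream always reports the maximum watermark:

```java
public static class QueryStreamAssigner implements AssignerWithPeriodicWatermarks<String> {
    @Override
    public Watermark getCurrentWatermark() {
        // never hold back the overall watermark on account of the query stream
        return Watermark.MAX_WATERMARK;
    }

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        // query messages carry no meaningful event time
        return 0;
    }
}
```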
Once we have the rides stream keyed by taxiId and the broadcast stream carrying the number of minutes n received from the socket, we connect the two streams and hand them to QueryFunction for processing. QueryFunction matches the pattern (the minutes n from the socket) against the stored rides and emits the ones that match.
```java
DataStream<TaxiRide> reports = rides
        .keyBy((TaxiRide ride) -> ride.taxiId)
        .connect(queryStream)
        .process(new QueryFunction());

public static class QueryFunction extends KeyedBroadcastProcessFunction<Long, TaxiRide, String, TaxiRide> {

    private ValueStateDescriptor<TaxiRide> taxiDescriptor =
            new ValueStateDescriptor<>("saved ride", TaxiRide.class);
    private ValueState<TaxiRide> taxiState;

    @Override
    public void open(Configuration config) {
        // state holding the latest event seen for each taxi
        taxiState = getRuntimeContext().getState(taxiDescriptor);
    }

    @Override
    public void processElement(TaxiRide ride, ReadOnlyContext ctx, Collector<TaxiRide> out) throws Exception {
        // For every taxi, let's store the most up-to-date information.
        // TaxiRide implements Comparable to make this easy.
        TaxiRide savedRide = taxiState.value();
        if (ride.compareTo(savedRide) > 0) {
            taxiState.update(ride);
        }
    }

    @Override
    public void processBroadcastElement(String msg, Context ctx, Collector<TaxiRide> out) throws Exception {
        DateTimeFormatter timeFormatter =
                DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withLocale(Locale.US).withZoneUTC();

        Long thresholdInMinutes = Long.valueOf(msg);
        Long wm = ctx.currentWatermark();
        System.out.println("QUERY: " + thresholdInMinutes + " minutes at " + timeFormatter.print(wm));

        // Collect to the output all ongoing rides that started at least thresholdInMinutes ago.
        ctx.applyToKeyedState(taxiDescriptor, new KeyedStateFunction<Long, ValueState<TaxiRide>>() {
            @Override
            public void process(Long taxiId, ValueState<TaxiRide> taxiState) throws Exception {
                TaxiRide ride = taxiState.value();
                if (ride.isStart) {
                    long minutes = (wm - ride.getEventTime()) / 60000;
                    if (minutes >= thresholdInMinutes) {
                        out.collect(ride);
                    }
                }
            }
        });
    }
}
```