Preface
A quick review of Part 1: we covered the basic concepts of streams (bounded and unbounded) and windows (tumbling, sliding, session, global).
We also learned the basic usage of the DataStream API:
the basic pipeline of source -> transformation -> sink
where
sources can use Flink's built-in connectors (Kafka, RabbitMQ, ...) or extend RichSourceFunction,
common transformations include window, keyBy, map, reduce, aggregate functions, ...
sinks can use Flink's built-in connectors (Kafka, RabbitMQ, ...) or extend RichSinkFunction.
For Part 1, see: https://segmentfault.com/a/11...
However, as we know, Flink has three major API families: the DataStream API, the DataSet API, and the Table API (SQL).
The difference between the DataSet API and the DataStream API is that one is for batch processing and the other for stream processing. Today we will look at how to use the Table API.
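As a minimal, hedged sketch of the two entry points (the class name and the sample elements here are my own illustration, not code from the original demos):

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BatchVsStreamingSketch {
    public static void main(String[] args) throws Exception {
        // DataSet API: bounded (batch) input, processed as a finite data set
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> batchWords = batchEnv.fromElements("flink", "batch", "flink");
        batchWords.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value.toUpperCase();
            }
        }).print(); // for DataSet programs, print() triggers execution

        // DataStream API: (potentially unbounded) stream input
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> streamWords = streamEnv.fromElements("flink", "stream", "flink");
        streamWords.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value.toUpperCase();
            }
        }).print();
        streamEnv.execute("streaming sketch"); // streaming jobs need an explicit execute()
    }
}
```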
In addition, we know that Flink is a stateful stream processing engine. So how is its state managed? What kinds of state are there? How do we use them? In this part I will take the Kafka consumer offset as the entry point and analyze, step by step, why Flink can guarantee exactly-once semantics with Kafka. Once we understand how Flink handles Kafka offsets, how can we use Flink state to store the data we care about ourselves? Beyond that, this part also touches on Flink's watermarks, triggers, evictors, allowed lateness and the window lifecycle, and briefly introduces Flink's ProcessFunction family; for example, to merge two data sources we would use a CoProcessFunction.
That is, this part covers:
1. Flink relational APIs (Table API & SQL)
All Table API and SQL programs, whether batch or streaming, follow the same pattern. The following code example shows the common structure of a Table API / SQL program.
```java
// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// create a TableEnvironment
// for batch programs use BatchTableEnvironment instead of StreamTableEnvironment
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// register a Table
tableEnv.registerTable("table1", ...)            // or
tableEnv.registerTableSource("table2", ...);     // or
tableEnv.registerExternalCatalog("extCat", ...);

// register an output Table
tableEnv.registerTableSink("outputTable", ...);

// create a Table from a Table API query
Table tapiResult = tableEnv.scan("table1").select(...);
// create a Table from a SQL query
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");

// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.insertInto("outputTable");

// execute
env.execute();
```
Note: Table API and SQL queries can easily be integrated with and embedded into DataStream or DataSet programs.
Here is an example to illustrate this:
```java
package cn.crawler.mft_seconed.demo2;

import cn.crawler.mft_seconed.KafkaEntity;
import cn.crawler.mft_seconed.KafkaEntityTableSource;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.WindowGroupedTable;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.lang.reflect.Type;
import java.util.Properties;

/**
 * Created by liuliang
 * on 2019/7/13
 */
public class TableAPIDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1. Simple pipeline: kafka -> flink sql -> mysql
        // configure the exactly-once strategy
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // run a checkpoint every second
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "172.19.141.XX:31090");
        prop.setProperty("group.id", "flink-group");

        FlinkKafkaConsumer011 myConsumer = new FlinkKafkaConsumer011("fan_or_jia", new SimpleStringSchema(), prop);
        myConsumer.setStartFromGroupOffsets(); // default consumption strategy

        DataStreamSource<String> kafkaSource = env.addSource(myConsumer);
        // map the stream read from Kafka into KafkaEntity objects
        SingleOutputStreamOperator<KafkaEntity> map = kafkaSource.map(new MapFunction<String, KafkaEntity>() {
            private static final long serialVersionUID = 1471936326697828381L;

            @Override
            public KafkaEntity map(String value) {
                KafkaEntity kafkaEntity = JSON.parseObject(value, KafkaEntity.class);
                return kafkaEntity;
            }
        });
        map.print(); // print the operator output

        // register the stream as table mft_flink_kafka with fields id, message, name, create_time;
        // the DataStream converts into a Table seamlessly
        tableEnv.registerDataStream("mft_flink_kafka", map, "id,message,name,create_time");

//        Table table = tableEnv.scan("mft_flink_kafka");
//        Table table1 = table.filter("id%2!=0")
//                .window(Tumble.over("1000.millis").on("rowtime").as("total"))
//                .groupBy("total")
//                .select("id,name,total.end as second");
//        table1.printSchema();

        Table sqlQuery = tableEnv.sqlQuery("select id,message,name,create_time from mft_flink_kafka");

        // sink to mysql
        DataStream<Tuple4<Integer, String, String, Long>> appendStream =
                tableEnv.toAppendStream(sqlQuery, Types.TUPLE(Types.INT, Types.STRING, Types.STRING, Types.LONG));
        appendStream.print();
        appendStream.map(new MapFunction<Tuple4<Integer, String, String, Long>, KafkaEntity>() {
            @Override
            public KafkaEntity map(Tuple4<Integer, String, String, Long> stringStringTuple4) throws Exception {
                return new KafkaEntity(stringStringTuple4.f0, stringStringTuple4.f1, stringStringTuple4.f2, stringStringTuple4.f3);
            }
        }).addSink(new SinkKafkaEntity2Mysql());

        // 2. Table API with a window function: the processing-time attribute is defined by
        //    KafkaEntityTableSource implementing the DefinedProctimeAttribute interface.
        //    The logical time attribute is appended to the physical schema defined by the
        //    TableSource's return type.
//        tableEnv.registerDataStream("kafka_demo", new KafkaEntityTableSource());

        env.execute("kafkaEntity from Kafka");
    }
}
```
```java
package cn.crawler.mft_seconed.demo2;

import cn.crawler.mft_seconed.KafkaEntity;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.UUID;

/**
 * Created by liuliang
 * on 2019/7/13
 */
public class SendDataToKafkaSql {

    public static void main(String[] args) {
        SendDataToKafkaSql sendDataToKafka = new SendDataToKafkaSql();
        for (int i = 100; i < 200; i++) {
            String name = "";
            if (i % 2 == 0) {
                name = "范冰冰";
            } else {
                name = "贾玲";
            }
            KafkaEntity build = KafkaEntity.builder()
                    .message("meaasge" + i)
                    .id(i)
                    .name(name + i)
                    .create_time(System.currentTimeMillis())
                    .build();
            System.out.println(build.toString());
            sendDataToKafka.send("fan_or_jia", "123", JSON.toJSONString(build));
        }
    }

    public void send(String topic, String key, String data) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.19.141.60:31090");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 1; i < 2; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            producer.send(new ProducerRecord<String, String>(topic, key + i, data));
        }
        producer.close();
    }
}
```
```java
package cn.crawler.mft_seconed.demo2;

import cn.crawler.mft_seconed.KafkaEntity;
import cn.crawler.util.MysqlUtil;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Created by liuliang
 * on 2019/7/15
 */
public class SinkKafkaEntity2Mysql extends RichSinkFunction<KafkaEntity> {

    /**
     * invoke() is called once for every record written to the sink
     * @param kafkaEntity
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(KafkaEntity kafkaEntity, Context context) throws Exception {
        MysqlUtil mysqlUtil = new MysqlUtil();
        mysqlUtil.insertKafkaEntity(kafkaEntity);
    }
}
```
Note: the Table API can also attach logical time attributes to stream fields... This is straightforward as well: write a source class that implements StreamTableSource<Row>. I may cover it in detail another time; see the official docs: https://ci.apache.org/project...
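For reference, here is a minimal, hedged sketch of what such a time-attribute source could look like (assuming the Flink 1.8-era Table API used in this post; the class name, fields, and dummy elements are illustrative and not the actual KafkaEntityTableSource from the demo):

```java
import java.util.Arrays;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.DefinedProctimeAttribute;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

public class DemoProctimeTableSource implements StreamTableSource<Row>, DefinedProctimeAttribute {

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
        // Physical rows only carry id and message; swap this for a real source (e.g. Kafka).
        return env.fromCollection(
                Arrays.asList(Row.of(1, "hello"), Row.of(2, "world")),
                getReturnType());
    }

    @Override
    public TypeInformation<Row> getReturnType() {
        // Physical schema produced by the stream.
        return Types.ROW_NAMED(new String[]{"id", "message"}, Types.INT, Types.STRING);
    }

    @Override
    public TableSchema getTableSchema() {
        // Logical schema: the proctime column is appended on top of the physical fields.
        return new TableSchema(
                new String[]{"id", "message", "proctime"},
                new TypeInformation[]{Types.INT, Types.STRING, Types.SQL_TIMESTAMP});
    }

    @Override
    public String getProctimeAttribute() {
        // Name of the appended processing-time attribute.
        return "proctime";
    }

    @Override
    public String explainSource() {
        return "DemoProctimeTableSource";
    }
}
```

Such a source would then presumably be registered with something like `tableEnv.registerTableSource("kafka_demo", new DemoProctimeTableSource())`, after which the `proctime` column can be used in Tumble windows.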
2. Flink state management (State)
What is State?
As a warm-up, recall: is HTTP a stateful protocol?
We can think of it this way: state is a snapshot of an intermediate result of some task/operator in Flink at some point in time. Inside Flink, state can be understood as a data structure.
Where does state live? (A diagram of Flink using state would be a good way to lead into this question.)
The official docs recommend three ways to store state (https://ci.apache.org/project...):
(Note: since I have nothing like HDFS available for storage, this post uses the first option, i.e. the Flink default, MemoryStateBackend.)
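As a small, hedged sketch of how one would pick a backend explicitly (the paths are placeholders, and RocksDB additionally requires the flink-statebackend-rocksdb dependency):

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. Default: MemoryStateBackend -- state snapshots live on the JobManager heap.
        env.setStateBackend(new MemoryStateBackend());

        // 2. FsStateBackend -- snapshots go to a filesystem path (local, HDFS, S3, ...).
        env.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));

        // 3. RocksDBStateBackend -- for state larger than memory; needs the
        //    flink-statebackend-rocksdb dependency, so it is only hinted at here:
        // env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink/rocksdb-checkpoints"));
    }
}
```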
Broadly speaking, state comes in two kinds: operator state and keyed state. Keyed state is used only on a KeyedStream and offers more state types; it can be thought of as the state of the operators that come after dataStream.keyBy(). Operator state records state per operator instance, whereas keyed state, after keyBy(), records state per key of the keyed stream. Keyed state provides these types: ValueState<T>, ListState<T>, ReducingState<T>, MapState<T>.
Operator state has only one type, ListState<T>. The official Flink documentation uses the Kafka consumer as its example: the Kafka consumer's partition id and offset are essentially Flink operator state.
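To make keyed state a little more concrete before the full demos, here is a minimal sketch of a per-key counter based on ValueState (the class name and state name are illustrative; it assumes the function runs on a stream keyed by the first tuple field):

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/**
 * Counts how many events have been seen per key, using keyed ValueState.
 * Usage (sketch): stream.keyBy(0).flatMap(new KeyedCountFunction()).print();
 */
public class KeyedCountFunction extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Long>> {

    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
        // obtain the state handle; one value is kept per key
        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("count", Types.LONG);
        countState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = countState.value();          // null on the first event for this key
        long updated = (current == null ? 0L : current) + 1;
        countState.update(updated);                 // write the new count back to state
        out.collect(Tuple2.of(value.f0, updated));
    }
}
```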
That is a lot of new concepts at once, and they can be bewildering the first time you meet them. As before, for things that are hard to grasp I will first run a concrete example and look at its effect, briefly analyze how that effect is achieved, and only once we know what state can do will we dig into the fundamentals in detail.
Demo programs:
```java
package cn.crawler.mft_seconed.demo1;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Created by liuliang
 * on 2019/7/12
 */
public class SendDataToKafkaDemo {

    public static void main(String[] args) {
        SendDataToKafkaDemo sendDataToKafka = new SendDataToKafkaDemo();
        for (int i = 100; i < 200; i++) {
            sendDataToKafka.send("demo", "123", "这是测试的数据" + i);
        }
    }

    public void send(String topic, String key, String data) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.19.141.60:31090");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 1; i < 2; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            producer.send(new ProducerRecord<String, String>(topic, key + i, data));
        }
        producer.close();
    }
}
```
```java
package cn.crawler.mft_seconed.demo1;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

/**
 * Created by liuliang
 * on 2019/6/19
 */
public class KafkaDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // configure the exactly-once strategy
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "172.19.141.60:31090");
        prop.setProperty("group.id", "flink-group");

        FlinkKafkaConsumer011 myConsumer = new FlinkKafkaConsumer011("mysqltest", new SimpleStringSchema(), prop);
        myConsumer.setStartFromGroupOffsets(); // default consumption strategy

        DataStreamSource<String> source = env.addSource(myConsumer);
        source.addSink(new PrintSinkFunction());

        env.execute("StreamingFromCollection");
    }
}
```
Start KafkaDemo (the consumer) first, then start SendDataToKafkaDemo (the producer) so that the producer sends data to the consumer; under normal conditions the consumer receives everything the producer sends. Now, while the producer is still sending, stop the consumer job (to simulate a crash) and note from the output how much data it has consumed; call that number 1. Then start the job again and check whether the first number it prints, number 2, follows directly after number 1. We can see that number 2 does continue from number 1: the consumer kept its state across the crash!
Now for the explanation.
Let's take Flink's handling of the Kafka consumer offset as the example. We know that Flink combined with checkpoints can achieve exactly-once processing with Kafka.
So how does Flink manage and implement this? What follows assumes some knowledge of how Kafka works (if you are unfamiliar with it, please look it up yourself; I will only give a brief summary here, contrasted with RabbitMQ, which deletes a message once it has been ACKed).
In short:
Kafka: the consumer records the offset it has consumed up to, and next time it resumes consuming from that offset.
RabbitMQ: after the consumer confirms consumption it sends an ACK to the queue, and once the queue receives the ACK it deletes the message.
So Flink must be storing the offset somewhere and continuing to consume from the saved offset! Where does Flink keep it? In state, of course.
The Kafka consumer implemented in Flink is a stateful operator that integrates with Flink's checkpointing mechanism; its state is the read offset of every Kafka partition. When a checkpoint is triggered, the offset of each partition is stored into that checkpoint. Flink's checkpoint mechanism guarantees that the stored state of all operator tasks is consistent.
Step 1:
Consider a Kafka topic with two partitions, each containing the five messages "A", "B", "C", "D", "E". We set the offset of both partitions to 0.
Step 2:
The Kafka consumer starts reading messages from partition 0. Message "A" is in flight, and the first consumer's offset becomes 1.
Step 3:
Message "A" arrives at the Flink Map Task. Both consumers read their next message (partition 0 reads "B", partition 1 reads "A") and update their offsets to 2 and 1 respectively. At the same time, Flink's JobManager triggers a checkpoint at the sources.
Step 4:
Because the sources triggered a checkpoint, the Kafka consumers create the first snapshot of their state ("offset = 2, 1") and store it in Flink's JobManager. After messages "B" and "A" have been emitted from partitions 0 and 1, the sources emit a checkpoint barrier. Checkpoint barriers are used to align the checkpoint across operator tasks and guarantee the consistency of the checkpoint as a whole. Message "A" arrives at the Flink Map Task, while the first consumer continues reading its next message (message "C").
Step 5:
Once the Flink Map Task has received all checkpoint barriers of the same checkpoint version, it stores its own state in the JobManager as well. Meanwhile, the consumers keep reading messages from Kafka.
Step 6:
After the Flink Map Task finishes snapshotting its state, it reports to the Flink JobManager that it has completed this checkpoint. When every task has reported that its state checkpoint is complete, the JobManager marks the checkpoint as successful. From that moment on, the checkpoint can be used to recover from failures. Notably, Flink does not rely on Kafka's committed offsets to recover from failures.
Failure recovery
When a failure occurs (say, a worker dies), all operator tasks are restarted and their state is reset to the most recent successful checkpoint. The Kafka sources then start reading again from offsets 2 and 1 respectively (because those are the offsets stored in the completed checkpoint). Once the job has restarted, we can expect normal operation to resume as if the failure had never happened.
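Before moving on to custom state, here is a hedged sketch of the offset-related knobs on the consumer side (Flink 1.8-era FlinkKafkaConsumer011; the broker address and topic are placeholders). The start-position setters only matter when the job has no restored checkpoint state to fall back on:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class KafkaOffsetConfigSketch {

    public static FlinkKafkaConsumer011<String> buildConsumer() {
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        prop.setProperty("group.id", "flink-group");

        FlinkKafkaConsumer011<String> consumer =
                new FlinkKafkaConsumer011<>("demo-topic", new SimpleStringSchema(), prop); // placeholder topic

        // Where to start when the job has NO restored checkpoint state:
        consumer.setStartFromGroupOffsets();  // default: offsets committed in Kafka for group.id
        // consumer.setStartFromEarliest();   // or: ignore committed offsets, read from the beginning
        // consumer.setStartFromLatest();     // or: read only records arriving from now on

        // Whether to also commit offsets back to Kafka when a checkpoint completes.
        // This is only for external monitoring; on recovery Flink always uses the
        // offsets stored in its own checkpoint, as described above.
        consumer.setCommitOffsetsOnCheckpoints(true);

        return consumer;
    }
}
```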
Given that this is how the Flink Kafka connector works, how do we use state in our own code? The programs below illustrate it.
Checkpointing
(1) Introduction and implementation approaches
A checkpoint saves the execution state of windows and operators, so that after an exception the job can be restarted without re-executing work that has already been done. Checkpoints come in two flavors, managed and custom. Managed checkpoints are stored automatically in the configured location: memory, disk, or distributed storage. With custom checkpoints you implement the saving yourself. There are two ways to take part in checkpointing:
Use managed state variables.
Use custom state by implementing the CheckpointedFunction interface or the ListCheckpointed<T extends Serializable> interface.
Code for both approaches is given below.
(2) Using managed state: Flink saves and restores state automatically
The code using managed state variables comes first, followed by the log output produced when it runs.
Code analysis:
Console log analysis:
Summary: the source does not use managed state, so its state is lost; the window function uses managed state, so its state survives the exception.
```java
package cn.crawler.mft_seconed.demo3;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Created by liuliang
 * on 2019/7/15
 */
public class StateCheckPoint {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable and configure checkpointing:
        // 1. set the checkpoint directory; for a local path remember the file:// prefix
        // 2. set the checkpoint mode, AT_LEAST_ONCE or EXACTLY_ONCE
        // 3. set the interval, which also turns checkpointing on
//        env.setStateBackend(new FsStateBackend("file:///Users/liuliang/Documents/others/flinkdata/state_checkpoint"));
//        env.setStateBackend(new FsStateBackend("file://D:\\softs\\flink\\state"));
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointInterval(1000);

        // The source emits 10 records every 2 seconds with key=1 and throws an exception once 100 records are reached.
        // After the source throws, its own counter is lost and restarts from 0.
        // The window function is reopened after the restart (open() is called again), reads its state back from the
        // checkpoint, and therefore resumes counting from 100.
        // Summary: the source does not use managed state, so its state is lost; the window uses managed state, so it survives.
        // Questions: 1. Why does state.value() throw a null exception when called in open(), but not when used in apply()?
        //            2. Why is there no open() method in the source? How should a source use state?
        env.addSource(new SourceFunction<Tuple3<Integer, String, Integer>>() {
            private Boolean isRunning = true;
            private int count = 0;

            @Override
            public void run(SourceContext<Tuple3<Integer, String, Integer>> sourceContext) throws Exception {
                while (isRunning) {
                    for (int i = 0; i < 10; i++) {
                        sourceContext.collect(Tuple3.of(1, "ahah", count));
                        count++;
                    }
                    if (count > 100) {
                        System.out.println("err_________________");
                        throw new Exception("123");
                    }
                    System.out.println("source:" + count);
                    Thread.sleep(2000);
                }
            }

            @Override
            public void cancel() {
            }
        }).keyBy(0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
                // the window function must be a RichWindowFunction, otherwise managed state cannot be used
                .apply(new RichWindowFunction<Tuple3<Integer, String, Integer>, Integer, Tuple, TimeWindow>() {

                    private transient ValueState<Integer> state;
                    private int count = 0;

                    @Override
                    public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple3<Integer, String, Integer>> iterable, Collector<Integer> collector) throws Exception {
                        // read the current value from state
                        count = state.value();
                        for (Tuple3<Integer, String, Integer> item : iterable) {
                            count++;
                        }
                        // write the new value back to state
                        state.update(count);
                        System.out.println("windows:" + tuple.toString() + " " + count + " state count:" + state.value());
                        collector.collect(count);
                    }

                    // obtain the state handle
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        System.out.println("##open");
                        ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<Integer>(
                                "average",                                      // the state name
                                TypeInformation.of(new TypeHint<Integer>() {}), // type information
                                0);                                             // default value
                        state = getRuntimeContext().getState(descriptor);
                    }
                }).print();

        env.execute();
    }
}
```
Here is the log output from my run:

```
source:10
##open
##open
##open
##open
##open
##open
##open
##open
##open
##open
##open
##open
windows:(1) 10 state count:10
9> 10
source:20
windows:(1) 20 state count:20
9> 20
source:30
windows:(1) 30 state count:30
9> 30
source:40
windows:(1) 40 state count:40
9> 40
source:50
windows:(1) 50 state count:50
9> 50
source:60
windows:(1) 60 state count:60
9> 60
source:70
windows:(1) 70 state count:70
9> 70
source:80
windows:(1) 80 state count:80
9> 80
source:90
windows:(1) 90 state count:90
9> 90
source:100
windows:(1) 100 state count:100
9> 100
err_________________              // exception thrown
source:10                         // the source is not managed, so it starts over from scratch (stateless)
##open
##open
##open
##open
##open
##open
##open
##open
##open
##open
##open
##open
windows:(1) 110 state count:110   // managed by Flink: it resumes exactly where it fell over (stateful)
9> 110
source:20
windows:(1) 120 state count:120
9> 120
source:30
windows:(1) 130 state count:130
9> 130
source:40
windows:(1) 140 state count:140
9> 140
source:50
windows:(1) 150 state count:150
9> 150
source:60
Disconnected from the target VM, address: '127.0.0.1:53266', transport: 'socket'
```
(3) Custom state: implement the checkpoint interfaces yourself
Implement the CheckpointedFunction interface or the ListCheckpointed<T extends Serializable> interface.
Analysis:
Because the ListCheckpointed interface needs to be implemented, the source and window logic are written as separate Java classes. The logic and the way to verify it are the same as in the managed-state example, but in the code below both the Source and the Window implement ListCheckpointed. That means when the source throws an exception, both the Source and the Window can recover their previous state from the checkpoint, so no state is lost.
Code listing:
AutoSourceWithCp.java - the source
WindowStatisticWithChk.java - the window apply function
CheckPointMain.java - the main program that wires everything together
```java
package cn.crawler.mft_seconed.demo3;

import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Based on the questions above: custom state that implements the checkpoint interface.
 * Created by liuliang
 * on 2019/7/15
 */
public class CheckPointMain {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(new FsStateBackend("file:///Users/liuliang/Documents/others/flinkdata/state_checkpoint"));
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointInterval(1000);

        /**
         * Note:
         * Because the ListCheckpointed interface needs to be implemented, the source and window logic
         * are written as separate Java classes. The logic and verification method are the same as in
         * the managed-state example, but in the code below both the Source and the Window implement
         * ListCheckpointed. That means when the source throws an exception, both the Source and the
         * Window can recover their previous state from the checkpoint, so no state is lost.
         */
        DataStream<Tuple4<Integer, String, String, Integer>> data = env.setParallelism(1).addSource(new AutoSourceWithCp());
        env.setParallelism(1);

        data.keyBy(0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
                .apply(new WindowStatisticWithChk())
                .print();

        env.execute();
    }
}
```
```java
package cn.crawler.mft_seconed.demo3;

import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.util.ArrayList;
import java.util.List;

/**
 * Created by liuliang
 * on 2019/7/15
 */
public class AutoSourceWithCp extends RichSourceFunction<Tuple4<Integer, String, String, Integer>> implements ListCheckpointed<UserState> {

    private int count = 0;
    private boolean is_running = true;

    @Override
    public void run(SourceContext sourceContext) throws Exception {
        while (is_running) {
            for (int i = 0; i < 10; i++) {
                sourceContext.collect(Tuple4.of(1, "hello-" + count, "alphabet", count));
                count++;
            }
            System.out.println("source:" + count);
            Thread.sleep(2000);
            if (count > 100) {
                System.out.println("准备异常啦.....");
                throw new Exception("exception made by ourself!");
            }
        }
    }

    @Override
    public void cancel() {
        is_running = false;
    }

    @Override
    public List<UserState> snapshotState(long l, long l1) throws Exception {
        List<UserState> listState = new ArrayList<>();
        UserState state = new UserState(count);
        listState.add(state);
        System.out.println(System.currentTimeMillis() + " ############# check point :" + listState.get(0).getCount());
        return listState;
    }

    @Override
    public void restoreState(List<UserState> list) throws Exception {
        count = list.get(0).getCount();
        System.out.println("AutoSourceWithCp restoreState:" + count);
    }
}
```
```java
package cn.crawler.mft_seconed.demo3;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

/**
 * Created by liuliang
 * on 2019/7/15
 */
public class WindowStatisticWithChk implements WindowFunction<Tuple4<Integer, String, String, Integer>, Integer, Tuple, TimeWindow>, ListCheckpointed<UserState> {

    private int total = 0;

    @Override
    public List<UserState> snapshotState(long l, long l1) throws Exception {
        List<UserState> listState = new ArrayList<>();
        UserState state = new UserState(total);
        listState.add(state);
        return listState;
    }

    @Override
    public void restoreState(List<UserState> list) throws Exception {
        total = list.get(0).getCount();
    }

    @Override
    public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple4<Integer, String, String, Integer>> iterable, Collector<Integer> collector) throws Exception {
        int count = 0;
        for (Tuple4<Integer, String, String, Integer> data : iterable) {
            count++;
            System.out.println("apply key" + tuple.toString() + " count:" + data.f3 + " " + data.f0);
        }
        total = total + count;
        System.out.println("windows total:" + total + " count:" + count + " ");
        collector.collect(count);
    }
}
```
```java
package cn.crawler.mft_seconed.demo3;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/**
 * Created by liuliang
 * on 2019/7/15
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class UserState implements Serializable {
    private Integer count;
}
```
Final notes:
1. Writing all of this up was not easy.
2. All of the code is on GitHub, stars are welcome: https://github.com/iamcrawler...
3. This post references the official Flink documentation and the following articles:
https://www.jianshu.com/p/efa...
https://blog.csdn.net/u013560...