在消息处理过程当中,除了Flink程序自己的逻辑(operator),咱们还须要和外部系统进行交互,例如本地磁盘文件,HDFS,Kafka,Mysql等。虽然Flink自己支持Exactly-Once语义,可是对于完整的数据处理系统来讲,最终呈现出来的语义和外部系统是相关的。html
咱们先总览一下Flink不一样connector的消息传递语义 。java
在Guarantees这一列,咱们能够发现如下3种语义:git
咱们结合Kafka connector来介绍这3中不一样的语义,以及分析它是如何产生的。github
Producer的at-most-once和at-least-once语义主要由“retries”控制(在callback中实现异常重发也至关于retry)。sql
若是配置值是一个大于0的整数,Producer在收到error的callback后,Producer将从新发送消息。考虑这一种状况,收到消息后,Broker正确的保存了消息,只是在返回ack时出现broker故障或者网络异常。这时候,producer收到error的callback,它不能确认异常缘由,只能从新发送消息,这样就致使了消息重复。apache
若是配置值等于0,Producer在收到error的callback后,不从新发送消息。若是异常时因为broker没有正确保存消息致使,那么将致使消息丢失。缓存
Producer的Exactly-Once语义,主要由“enable.idempotence”控制,若是该参数为true,将会确保消息最终只会被broker保存一次。一样的Producer在接收到error的callback后,它须要重发数据,只是在0.11以及更新的版本中,Producer会为每一批消息生成一个序列号,经过这个序列号Broker能够过滤重复消息。而且因为序列号是保存在topic上的,即便主分片失败了,新的broker也能知道消息是否须要过滤。这里还有一个细节须要注意,“acks”不能被设置为0或者1,由于万一主分片(leader replication)异常下线,将致使数据丢失,这样语义被破坏了。服务器
NOTE : Kafka有两个概念很容易被混淆。一个是Durable,另外一个是Message Delivery Semantics。这两个地方都存在消息丢失的可能性,可是机制彻底不一样。网络
Durable主要描述软件或者服务器故障后,数据是否仍能保留。Durable丢失消息主要是没有持久化:主分片收到数据后没有及时刷新到磁盘,副本没有及时复制以及持久化到磁盘。app
Durable主要经过“acks”控制,最强的级别是“all”,在broker返回ack以前,它会确认每个副本都已经保存了该消息。这样它能在n-1个副本宕机后,仍保留完整数据。最弱的级别是“0”,broker收到消息不确认持久化就返回,若是后续持久化失败,消息会丢失。当“acks”设置为“1”的时候,broker会确认主分片(leader replication)已经保存了消息,同时副本会主动向主分片同步,消息丢失风险较小。可是存在这种状况,消息到达主分片而且返回了success的ack,这时主分片fail而且副本将来得及同步这条消息,消息会丢失。
Message Delivery Semantics 主要是描述在消息系统中,消息实际被处理的次数。 要区别这两点,能够简单的认为,Durable关注消息的持久化,Message Delivery Semantics关注消息的发送。
Consumer的at-most-once和at-least-once语义主要经过“offset”控制。offset的可配置为自动提交和手动提交。若配置“enable.auto.commit”为true,在Consumer fetch数据后,后台会自动提交offset。若配置“enable.auto.commit”为false,须要主动调用commitSync()或者commitAsync()来提交offset。
在自动提交的情形下,Consumer表现为at-most-once语义。在主动提交的情形下,根据用户对异常处理的不一样,可表现为at-most-once或者at-least-once。
假设Consumer在fetch完数据后,后续的处理步骤出现了异常。
若是offset是自动提交的,那么Consumer将不能再次消费这些数据(除非重启Consumer,并经过seek(TopicPartition, long)重置offset)。它表现出at-most-once语义。
在捕获异常后,若是手动提交offset,表现出at-most-once语义。若是不提交offset,Consumer可重复消费该消息,表现出at-least-once语义。
在Consumer中,没有配置能够保证Exactly-Once语义。若要达到这个目标,须要在at-least-once的基础上实现幂等。这点和Producer是相似的,区别是Consumer的幂等性须要用户本身来完成。
前面的篇幅主要介绍了Kafka的3种语义(Message Delivery Semantics),经过上述内容,咱们能够得出,想要Flink和Kafka达成端到端 Exactly-Once语义,首先咱们须要0.11版本或者更新的Kafka 、Producer和Consumer,其次使用幂等的Producer发送数据以及实现幂等的Consumer消费。
数据以及部分代码来自http://training.data-artisans.com/ 。
Flink提供了经过mvn生成的精简Flink工程的方式,使用起来很是方便。在pom文件中,也包含了shade打包的方式,由于提交到集群上运行,须要jar-with-dependencies。
mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-quickstart-java
-DarchetypeVersion=1.4.2
-DgroupId=org.apache.flink.quickstart
-DartifactId=flink-java-project
-Dversion=0.1
-Dpackage=org.apache.flink.quickstart
-DinteractiveMode=false
按实际须要修改DgroupId,DartifactId,Dversion。
git clone https://github.com/dataArtisans/flink-training-exercises.git
cd flink-training-exercises
mvn clean install
<dependency> <groupId>com.data-artisans</groupId> <artifactId>flink-training-exercises</artifactId> <version>0.15.2</version> </dependency>
wget http://training.data-artisans.com/trainingData/nycTaxiRides.gz
wget http://training.data-artisans.com/trainingData/nycTaxiFares.gz
public static void main(String[] args) throws Exception { final int maxEventDelay = 60; // events are out of order by max 60 seconds final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second // set up streaming execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); String rideInput = "./taxi-data/nycTaxiRides.gz"; String taxiRideTopicId = "taxi-ride"; // start the data generator DataStream<TaxiRide> rides = env.addSource( new CheckpointedTaxiRideSource(rideInput, servingSpeedFactor)); Properties properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); SerializationSchema<TaxiRide> taxiRideSerializationSchema = new TaxiRideSchema(); rides.addSink(new FlinkKafkaProducer011<TaxiRide>(taxiRideTopicId, new KeyedSerializationSchemaWrapper(taxiRideSerializationSchema), properties, FlinkKafkaProducer011.Semantic.EXACTLY_ONCE // 开启Kafka EOS )); env.execute("send taxi ride to kafka "); }
上述代码的主体逻辑是读取nycTaxiRides.gz,并将数据发往Kafka。 主要使用了CheckpointedTaxiRideSource以及FlinkKafkaProducer011。 接下来,说明为何它们能达成End-To-End的Exactly-Once语义。
CheckpointedTaxiRideSource:这是一个拥有状态的文件流,它在Checkpoint的时候记录数据读取位置(至关于Kafka的offset),Flink错误恢复后会从新定位到checkpoint记录的位置,它在整个系统上表现出来的是at-least-once。考虑这样一个场景,checkpoint成功,可是某一个commit失败,原则上本次全部的提交都要回滚。若是后续的Sink处理不当或者不支持回滚,这些数据会被提交到Sink中。在Flink 恢复后,这部分数据被从新计算,致使Sink中出现了重复的数据。
FlinkKafkaProducer011 : 提供了幂等以及事务提交。Producer的幂等性参照文章开头的语义说明,这里再也不介绍。 Sink中的幂等性主要是经过两阶段提交协议来支持的(注意区分Kafka Producer自己的幂等性和依靠事务实现的幂等性)。Kafka 0.11及更新的版本提供了事务支持,能够结合Flink的两阶段提交协议使用。为了保证Sink中的数据的惟一性,将两次checkpoint之间的数据放在一个事务中,一块儿预提交,若是commit成功,则进入下一个checkpoint;若失败,终止事务并回滚数据。
FlinkKafkaProducer011 两阶段提交代码
protected void preCommit(FlinkKafkaProducer011.KafkaTransactionState transaction) throws FlinkKafka011Exception { switch(null.$SwitchMap$org$apache$flink$streaming$connectors$kafka$FlinkKafkaProducer011$Semantic[this.semantic.ordinal()]) { case 1: case 2: this.flush(transaction); case 3: this.checkErroneous(); return; default: throw new UnsupportedOperationException("Not implemented semantic"); } }
protected void commit(FlinkKafkaProducer011.KafkaTransactionState transaction) { switch(null.$SwitchMap$org$apache$flink$streaming$connectors$kafka$FlinkKafkaProducer011$Semantic[this.semantic.ordinal()]) { case 1: transaction.producer.commitTransaction(); this.recycleTransactionalProducer(transaction.producer); case 2: case 3: return; default: throw new UnsupportedOperationException("Not implemented semantic"); } }
protected void abort(FlinkKafkaProducer011.KafkaTransactionState transaction) { switch(null.$SwitchMap$org$apache$flink$streaming$connectors$kafka$FlinkKafkaProducer011$Semantic[this.semantic.ordinal()]) { case 1: transaction.producer.abortTransaction(); this.recycleTransactionalProducer(transaction.producer); case 2: case 3: return; default: throw new UnsupportedOperationException("Not implemented semantic"); } }
上面是两阶段提交的主要代码。
preCommit:将本次checkpoint中未发往Broker的数据flush到Kafka Broker。这时数据已经在Kafka Broker中,可是因为事务的隔离性,Consumer暂时不会读取到这些数据(除非配置了“read_uncommitted”)。
TIPS :为何须要调用flush?
在Flink processElement的时候,调用KafkaProducer的send来发送数据,可是Kafka为了更高的性能,send并不当即发送数据,而是缓存在buffer中,到必定的消息量才发往Kafka Broker。这里经过flush能够强制将数据发往Kafka Broker。
commit:提交事务,这时Consumer能够读到这些数据。
abort: 若是事务失败,终止事务。
FlinkKafkaConsumerEOSDemo的分析流程能够参照FlinkKafkaProducerEOSDemo,这里不作细致分析。
`public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.enableCheckpointing(100);//
String taxiRideTopicId = "taxi-ride"; Properties properties = new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); DataStreamSource<TaxiRide> taxiRideDataStreamSource = env.addSource(new FlinkKafkaConsumer011<TaxiRide>(taxiRideTopicId, new TaxiRideSchema(), properties), "kafka-source-ride"); String filePath = "./taxi-data/taxi-ride.txt"; WriteFormat format = new WriteFormatAsText(); long period = 200; taxiRideDataStreamSource.filter(new RideCleansing.NYCFilter()).addSink( new WriteSinkFunctionByMillis<TaxiRide>(filePath,format,period) ); env.execute("print taxride "); }`
须要注意的是FlinkKafkaConsumer011的Exactly-Once语义经过用户配置自动设置,若是不肯定Flink的语义,能够在FlinkKafkaConsumer09中打断点,断点位置:
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) { properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); }
自动配置相关代码:
public static OffsetCommitMode fromConfiguration( boolean enableAutoCommit, boolean enableCommitOnCheckpoint, boolean enableCheckpointing) { if (enableCheckpointing) { // if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED; } else { // else, the mode depends only on whether auto committing is enabled in the provided Kafka properties return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED; }
Flink经过checkpoint和两阶段提交协议,为端到端的Exactly-Once的实现提供了可能,若是在项目中确实须要这种语义,不妨一试。