版权声明:本套技术专栏是做者(秦凯新)平时工做的总结和升华,经过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。版权声明:禁止转载,欢迎学习。QQ邮箱地址:1120746959@qq.com,若有任何问题,可随时联系。java
// 设置检查点目录
ssc.checkpoint("./streaming_checkpoint")
// 获取Kafka配置(经过配置文件读取,ConfigurationManager自定义方法)
val broker_list = ConfigurationManager.config.getString("kafka.broker.list")
val topics = ConfigurationManager.config.getString("kafka.topics")
// kafka消费者配置
val kafkaParam = Map(
"bootstrap.servers" -> broker_list,//用于初始化连接到集群的地址
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
//用于标识这个消费者属于哪一个消费团体
"group.id" -> "commerce-consumer-group",
//若是没有初始化偏移量或者当前的偏移量不存在任何服务器上,可使用这个配置属性
//可使用这个配置,latest自动重置偏移量为最新的偏移量
"auto.offset.reset" -> "latest",
//若是是true,则这个消费者的偏移量会在后台自动提交
"enable.auto.commit" -> (false: java.lang.Boolean)
)
// 建立DStream,返回接收到的输入数据
// LocationStrategies:根据给定的主题和集群地址建立consumer
// LocationStrategies.PreferConsistent:持续的在全部Executor之间分配分区
// ConsumerStrategies:选择如何在Driver和Executor上建立和配置Kafka Consumer
// ConsumerStrategies.Subscribe:订阅一系列主题
val adRealTimeLogDStream=KafkaUtils.createDirectStream[String,String](ssc,
LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](Array(topics),kafkaParam))
复制代码
setStartFromGroupOffsets()【默认消费策略】apache
默认读取上次保存的offset信息 若是是应用第一次启动,读取不到上次的offset信息,则会根据这个参数auto.offset.reset的值来进行消费数据bootstrap
setStartFromEarliest() 从最先的数据开始进行消费,忽略存储的offset信息缓存
setStartFromLatest() 从最新的数据进行消费,忽略存储的offset信息服务器
setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long>)架构
当checkpoint机制开启的时候,KafkaConsumer会按期把kafka的offset信息还有其余operator的状态信息一块保存起来。当job失败重启的时候,Flink会从最近一次的checkpoint中进行恢复数据,从新消费kafka中的数据。app
为了可以使用支持容错的kafka Consumer,须要开启checkpoint env.enableCheckpointing(5000); // 每5s checkpoint一次运维
Kafka Consumers Offset 自动提交有如下两种方法来设置,能够根据job是否开启checkpoint来区分:异步
(1) Checkpoint关闭时: 能够经过下面两个参数配置socket
enable.auto.commit
(2) Checkpoint开启时:当执行checkpoint的时候才会保存offset,这样保证了kafka的offset和checkpoint的状态偏移量保持一致。 能够经过这个参数设置
setCommitOffsetsOnCheckpoints(boolean)
这个参数默认就是true。表示在checkpoint的时候提交offset, 此时,kafka中的自动提交机制就会被忽略
依赖引入:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.3</version>
</dependency>
案例实战:
public class StreamingKafkaSource {
public static void main(String[] args) throws Exception {
//获取Flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//checkpoint配置
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//设置statebackend
//env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));
String topic = "kafkaConsumer";
Properties prop = new Properties();
prop.setProperty("bootstrap.servers","SparkMaster:9092");
prop.setProperty("group.id","kafkaConsumerGroup");
FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), prop);
myConsumer.setStartFromGroupOffsets();//默认消费策略
DataStreamSource<String> text = env.addSource(myConsumer);
text.print().setParallelism(1);
env.execute("StreamingFromCollection");
}
}
复制代码
Kafka Producer的容错-Kafka 0.9 and 0.10
若是Flink开启了checkpoint,针对FlinkKafkaProducer09和FlinkKafkaProducer010 能够提供 at-least-once的语义,还须要配置下面两个参数:
setLogFailuresOnly(false)
setFlushOnCheckpoint(true)
注意:建议修改kafka 生产者的重试次数retries【这个参数的值默认是0】
Kafka Producer的容错-Kafka 0.11,若是Flink开启了checkpoint,针对FlinkKafkaProducer011 就能够提供 exactly-once的语义,可是须要选择具体的语义
Semantic.NONE
Semantic.AT_LEAST_ONCE【默认】
Semantic.EXACTLY_ONCE
public class StreamingKafkaSink {
public static void main(String[] args) throws Exception {
//获取Flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//checkpoint配置
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//设置statebackend
//env.setStateBackend(new RocksDBStateBackend("hdfs://SparkMaster:9000/flink/checkpoints",true));
DataStreamSource<String> text = env.socketTextStream("SparkMaster", 9001, "\n");
String brokerList = "SparkMaster:9092";
String topic = "kafkaProducer";
Properties prop = new Properties();
prop.setProperty("bootstrap.servers",brokerList);
//第一种解决方案,设置FlinkKafkaProducer011里面的事务超时时间
//设置事务超时时间
//prop.setProperty("transaction.timeout.ms",60000*15+"");
//第二种解决方案,设置kafka的最大事务超时时间,主要是kafka的配置文件设置。
//FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(brokerList, topic, new SimpleStringSchema());
//使用仅一次语义的kafkaProducer
FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(topic, new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()), prop, FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
text.addSink(myProducer);
env.execute("StreamingFromCollection");
}
}
复制代码
kafka必不可少,关于kafka还有不少要说的内容,详情请参考个人kafka商业环境实战系列吧。
版权声明:本套技术专栏是做者(秦凯新)平时工做的总结和升华,经过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。版权声明:禁止转载,欢迎学习。QQ邮箱地址:1120746959@qq.com,若有任何问题,可随时联系。
秦凯新 于深圳 20181127023