工具篇-Flink里边的一些坑

1. 自定义Sink写入hbase?java

使用的是原生的hbase客户端,能够本身控制每多少条记录刷新一次。遇到了几个坑致使数据写不到hbase里边去:node

  • 集群hbase版本和客户端版本不一致(版本1和版本2相互之间会有冲突)
  • Jar包冲突

例如protobuf-java版本冲突,常见的是两个关键错误,java.io.IOException: java.lang.reflect.InvocationTargetException 和 Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.hbase.protobuf.ProtobufUtilapache

2. Flink 消费Kafka偏移量

Flink读写Kafka,若是使用Consumer08的话,偏移量会提交Zk,下边这个配置能够写在Conf文件中,提交偏移量的Zk能够直接指定。Consumer09之后版本就不向Zk提交了,Kafka本身会单独搞一个Topic存储消费状态。bootstrap

1 xxxx08 { 2     bootstrap.servers = "ip:9092"
3     zookeeper.connect = "ip1:2181,ip2/vio"
4     group.id = "group1"
5     auto.commit.enable = true
6     auto.commit.interval.ms = 30000
7     zookeeper.session.timeout.ms = 60000
8     zookeeper.connection.timeout.ms = 30000
9 }
1 final Properties consumerProps = ConfigUtil 2         .getProperties(config, “xxxx08");// 使用本身编写的Util函数读取配置
3 
4     final FlinkKafkaConsumer08<String> source =
5         new FlinkKafkaConsumer08<String>(topic, new SimpleStringSchema(), consumerProps); 

3. Flink 的日志打印

Flink打印日志的时候,日志打印到哪,日志文件是否是切块,并非在工程resource下配置文件里指定的!!!而是在flink/conf中指定的,好比我安装的Flink On Yarn模式,只须要在安装的机器上flink/conf文件夹下修改对应的配置文件便可,以下:session

具体能够参考:Flink日志配置ide

4. Flink 的akka时间超时

这个问题比较常见,遇见过两次,总结下:函数

首先是集群机器负载比较高,有的机器负载百分之几万都有,在这时候taskmanager、jobmanager就会报akka超时的异常,能够适当增大akka超时时间扛过这段时间;oop

而后最多见的是程序里调用外部接口,延迟较高,有的是5秒甚至10秒,这种时候akka就会超时测试

5. Flink 的读HDFS写Kafka

flink读hdfs的时候用了DataSet,本身在中间map里边已经写到kafka里边了,因此不想要sink,但flink要求必须有sink,因此只能加个.output(new DiscardingOutputFormat<>()),这样对程序不会形成影响。 fetch

6. 本地测试Flink

本地测试Flink偶尔会报错,记录下:

(1)本地Apache flink集群没有运行,会报下面链接被拒绝的错误,你只须要启动它:./bin/start-cluster.sh

org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel $ AnnotatedConnectException:拒绝链接:localhost / 127.0.0.1:8081

7. Flink on YARN

参见Flink on YARN(下):常见问题与排查思路

8. Flink并行度设置

java.io.IOException: Insufficient number of network buffers: required 4, but only 0 available. The total number of network buffers is currently set to 6472 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.network.memory.fraction', 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'. at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:272) at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:257) at org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:278) at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608) at java.lang.Thread.run(Thread.java:748)
View Code

并行度设置不合理,按报的错设置便可:

source.parallelism = 40 map.parallelism = 40 sink.parallelism = 40

9. Flink常见报错

  • java.lang.Exception: Container released on a lost node

异常缘由是 Container 运行所在节点在 YARN 集群中被标记为 LOST,该节点上的全部 Container 都将被 YARN RM 主动释放并通知 AM,JobManager 收到此异常后会 Failover 自行恢复(从新申请资源并启动新的 TaskManager),遗留的 TaskManager 进程可在超时后自行退出

  •  Could not build the program from JAR file.

这个问题的迷惑性较大,不少时候并不是指定运行的 JAR 文件问题,而是提交过程当中发生了异常,须要根据日志信息进一步排查。

10. Flink消费Kafka单条数据过大起Lag

能够在kafka consumer中设置下列参数:

pro.put("fetch.message.max.bytes", "8388608"); pro.put("max.partition.fetch.bytes", "8388608");
相关文章
相关标签/搜索