Kafka链接器是Kafka的一部分,是在Kafka和其它技术之间构建流式管道的一个强有力的框架。它可用于将数据从多个地方(包括数据库、消息队列和文本文件)流式注入到Kafka,以及从Kafka将数据流式传输到目标端(如文档存储、NoSQL、数据库、对象存储等)中。javascript
现实世界并不完美,出错是不免的,所以在出错时Kafka的管道能尽量优雅地处理是最好的。一个常见的场景是获取与特定序列化格式不匹配的主题的消息(好比预期为Avro时实际为JSON,反之亦然)。自从Kafka 2.0版本发布以来,Kafka链接器包含了错误处理选项,即将消息路由到php
在本文中将介绍几种处理问题的常见模式,并说明如何实现。css
有时可能但愿在发生错误时当即中止处理,可能遇到质量差的数据是因为上游的缘由致使的,必须由上游来解决,继续尝试处理其它的消息已经没有意义。java
这是Kafka链接器的默认行为,也可使用下面的配置项显式地指定:sql
errors.tolerance = none
复制代码
在本示例中,该链接器配置为从主题中读取JSON格式数据,而后将其写入纯文本文件。注意这里为了演示使用的是FileStreamSinkConnector
链接器,不建议在生产中使用。数据库
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "file_sink_01",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"topics":"test_topic_json",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"file":"/data/file_sink_01.txt"
}
}'
复制代码
主题中的某些JSON格式消息是无效的,链接器会当即终止,进入如下的FAILED
状态:apache
$ curl -s "http://localhost:8083/connectors/file_sink_01/status"| \
jq -c -M '[.name,.tasks[].state]'
["file_sink_01","FAILED"]
复制代码
查看Kafka链接器工做节点的日志,能够看到错误已经记录而且任务已经终止:json
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
…
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)
…
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('b' (code 98)): was expecting double-quote to start field name
at [Source: (byte[])"{brokenjson-:"bar 1"}"; line: 1, column: 3]
复制代码
要修复管道,须要解决源主题上的消息问题。除非事先指定,Kafka链接器是不会简单地“跳过”无效消息的。若是是配置错误(例如指定了错误的序列化转换器),那最好了,改正以后从新启动链接器便可。不过若是确实是该主题的无效消息,那么须要找到一种方式,即不要阻止全部其它有效消息的处理。ruby
若是只是但愿处理一直持续下去:bash
errors.tolerance = all
复制代码
在实际中大概以下:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "file_sink_05",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"topics":"test_topic_json",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"file":"/data/file_sink_05.txt",
"errors.tolerance": "all"
}
}'
复制代码
启动链接器以后(仍是原来的源主题,其中既有有效的,也有无效的消息),就能够持续地运行:
$ curl -s "http://localhost:8083/connectors/file_sink_05/status"| \
jq -c -M '[.name,.tasks[].state]'
["file_sink_05","RUNNING"]
复制代码
这时即便链接器读取的源主题上有无效的消息,也不会有错误写入Kafka链接器工做节点的输出,而有效的消息会按照预期写入输出文件:
$ head data/file_sink_05.txt
{foo=bar 1}
{foo=bar 2}
{foo=bar 3}
…
复制代码
配置了errors.tolerance = all
以后,Kafka链接器就会忽略掉无效的消息,而且默认也不会记录被丢弃的消息。若是确认配置errors.tolerance = all
,那么就须要仔细考虑是否以及如何知道实际上发生的消息丢失。在实践中这意味着基于可用指标的监控/报警,和/或失败消息的记录。
肯定是否有消息被丢弃的最简单方法,是将源主题上的消息数与写入目标端的数量进行对比:
$ kafkacat -b localhost:9092 -t test_topic_json -o beginning -C -e -q -X enable.partition.eof=true | wc -l 150 $ wc -l data/file_sink_05.txt 100 data/file_sink_05.txt 复制代码
这个作法虽然不是很优雅,可是确实能看出发生了消息的丢失,而且由于日志中没有记录,因此用户仍然对此一无所知。
一个更加可靠的办法是,使用JMX指标来主动监控和报警错误消息率:
这时能够看到发生了错误,可是并不知道那些消息发生了错误,不过这是用户想要的。其实即便以后这些被丢弃的消息被写入了/dev/null
,实际上也是能够知道的,这也正是死信队列概念出现的点。
Kafka链接器能够配置为将没法处理的消息(例如上面提到的反序列化错误)发送到一个单独的Kafka主题,即死信队列。有效消息会正常处理,管道也会继续运行。而后能够从死信队列中检查无效消息,并根据须要忽略或修复并从新处理。
[图片上传失败...(image-9dbb1e-1554814992353)]
进行以下的配置能够启用死信队列:
errors.tolerance = all
errors.deadletterqueue.topic.name =
复制代码
若是运行于单节点Kafka集群,还须要配置errors.deadletterqueue.topic.replication.factor = 1
,其默认值为3。
具备此配置的链接器配置示例大体以下:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "file_sink_02",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"topics":"test_topic_json",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"file": "/data/file_sink_02.txt",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name":"dlq_file_sink_02",
"errors.deadletterqueue.topic.replication.factor": 1
}
}'
复制代码
使用和以前相同的源主题,而后处理混合有有效和无效的JSON数据,会看到新的链接器能够稳定运行:
$ curl -s "http://localhost:8083/connectors/file_sink_02/status"| \
jq -c -M '[.name,.tasks[].state]'
["file_sink_02","RUNNING"]
复制代码
源主题中的有效记录将写入目标文件:
$ head data/file_sink_02.txt
{foo=bar 1}
{foo=bar 2}
{foo=bar 3}
[…]
复制代码
这样管道能够继续正常运行,而且还有了死信队列主题中的数据,这能够从指标数据中看出:
检查主题自己也能够看出来:
ksql> LIST TOPICS;
Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
---------------------------------------------------------------------------------------------------
dlq_file_sink_02 | false | 1 | 1 | 0 | 0
test_topic_json | false | 1 | 1 | 1 | 1
---------------------------------------------------------------------------------------------------
ksql> PRINT 'dlq_file_sink_02' FROM BEGINNING;
Format:STRING
1/24/19 5:16:03 PM UTC , NULL , {foo:"bar 1"}
1/24/19 5:16:03 PM UTC , NULL , {foo:"bar 2"}
1/24/19 5:16:03 PM UTC , NULL , {foo:"bar 3"}
…
复制代码
从输出中能够看出,消息的时间戳为(1/24/19 5:16:03 PM UTC
),键为(NULL
),而后为值。这时能够看到值是无效的JSON格式{foo:"bar 1"}
(foo
也应加上引号),所以JsonConverter在处理时会抛出异常,所以最终会输出到死信主题。
可是只有看到消息才能知道它是无效的JSON,即使如此,也只能假设消息被拒绝的缘由,要肯定Kafka链接器将消息视为无效的实际缘由,有两个方法:
下面会分别介绍。
消息头是使用Kafka消息的键、值和时间戳存储的附加元数据,是在Kafka 0.11版本中引入的。Kafka链接器能够将有关消息拒绝缘由的信息写入消息自己的消息头中。这个作法比写入日志文件更好,由于它将缘由直接与消息联系起来。
配置以下的参数,能够在死信队列的消息头中包含拒绝缘由:
errors.deadletterqueue.context.headers.enable = true
复制代码
配置示例大体以下:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "file_sink_03",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"topics":"test_topic_json",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"file": "/data/file_sink_03.txt",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name":"dlq_file_sink_03",
"errors.deadletterqueue.topic.replication.factor": 1,
"errors.deadletterqueue.context.headers.enable":true
}
}'
复制代码
和以前一致,链接器能够正常运行(由于配置了errors.tolerance=all
)。
$ curl -s "http://localhost:8083/connectors/file_sink_03/status"| \
jq -c -M '[.name,.tasks[].state]'
["file_sink_03","RUNNING"]
复制代码
源主题中的有效消息会正常写入目标文件:
$ head data/file_sink_03.txt
{foo=bar 1}
{foo=bar 2}
{foo=bar 3}
[…]
复制代码
可使用任何消费者工具来检查死信队列上的消息(以前使用了KSQL),不过这里会使用kafkacat,而后立刻就会看到缘由,最简单的操做大体以下:
kafkacat -b localhost:9092 -t dlq_file_sink_03
% Auto-selecting Consumer mode (use -P or -C to override)
{foo:"bar 1"}
{foo:"bar 2"}
…
复制代码
不过kafkacat有更强大的功能,能够看到比消息自己更多的信息:
kafkacat -b localhost:9092 -t dlq_file_sink_03 -C -o-1 -c1 \
-f '\nKey (%K bytes): %k Value (%S bytes): %s Timestamp: %T Partition: %p Offset: %o Headers: %h\n'
复制代码
这个命令将获取最后一条消息(-o-1
,针对偏移量,使用最后一条消息),只读取一条消息(-c1
),而且经过-f
参数对其进行格式化,以更易于理解:
Key (-1 bytes):
Value (13 bytes): {foo:"bar 5"}
Timestamp: 1548350164096
Partition: 0
Offset: 34
Headers: __connect.errors.topic=test_topic_json,__connect.errors.partition=0,__connect.errors.offset=94,__connect.errors.connector.name=file_sink_03,__connect.errors.task.id=0,__connect.errors.stage=VALU
E_CONVERTER,__connect.errors.class.name=org.apache.kafka.connect.json.JsonConverter,__connect.errors.exception.class.name=org.apache.kafka.connect.errors.DataException,__connect.errors.exception.message=Co
nverting byte[] to Kafka Connect data failed due to serialization error: ,__connect.errors.exception.stacktrace=org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed
due to serialization error:
[…]
复制代码
也能够只显示消息头,并使用一些简单的技巧将其拆分,这样能够更清楚地看到该问题的更多信息:
$ kafkacat -b localhost:9092 -t dlq_file_sink_03 -C -o-1 -c1 -f '%h'|tr ',' '\n'
__connect.errors.topic=test_topic_json
__connect.errors.partition=0
__connect.errors.offset=94
__connect.errors.connector.name=file_sink_03
__connect.errors.task.id=0
__connect.errors.stage=VALUE_CONVERTER
__connect.errors.class.name=org.apache.kafka.connect.json.JsonConverter
__connect.errors.exception.class.name=org.apache.kafka.connect.errors.DataException
__connect.errors.exception.message=Converting byte[] to Kafka Connect data failed due to serialization error:
复制代码
Kafka链接器处理的每条消息都来自源主题和该主题中的特定点(偏移量),消息头已经准确地说明了这一点。所以可使用它来回到原始主题并在须要时检查原始消息,因为死信队列已经有一个消息的副本,这个检查更像是一个保险的作法。
根据从上面的消息头中获取的详细信息,能够再检查一下源消息:
__connect.errors.topic=test_topic_json
__connect.errors.offset=94
复制代码
将这些值分别插入到kafkacat的表明主题和偏移的-t
和-o
参数中,能够获得:
$ kafkacat -b localhost:9092 -C \
-t test_topic_json -o94 \
-f '\nKey (%K bytes): %k Value (%S bytes): %s Timestamp: %T Partition: %p Offset: %o Topic: %t\n'
复制代码
Key (-1 bytes):
Value (13 bytes): {foo:"bar 5"}
Timestamp: 1548350164096
Partition: 0
Offset: 94
Topic: test_topic_json
复制代码
与死信队列中的上述消息相比,能够看到彻底相同,甚至包括时间戳,惟一的区别是主题、偏移量和消息头。
记录消息的拒绝缘由的第二个选项是将其写入日志。根据安装方式不一样,Kafka链接器会将其写入标准输出或日志文件。不管哪一种方式都会为每一个失败的消息生成一堆详细输出。进行以下配置可启用此功能:
errors.log.enable = true
复制代码
经过配置errors.log.include.messages = true
,还能够在输出中包含有关消息自己的元数据。此元数据中包括一些和上面提到的消息头中同样的项目,包括源消息的主题和偏移量。注意它不包括消息键或值自己。
这时的链接器配置以下:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "file_sink_04",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"topics":"test_topic_json",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"file": "/data/file_sink_04.txt",
"errors.tolerance": "all",
"errors.log.enable":true,
"errors.log.include.messages":true
}
}'
复制代码
链接器是能够成功运行的:
$ curl -s "http://localhost:8083/connectors/file_sink_04/status"| \
jq -c -M '[.name,.tasks[].state]'
["file_sink_04","RUNNING"]
Valid records from the source topic get written to the target file:
$ head data/file_sink_04.txt
{foo=bar 1}
{foo=bar 2}
{foo=bar 3}
[…]
复制代码
这时去看Kafka链接器的工做节点日志,会发现每一个失败的消息都有错误记录:
ERROR Error encountered in task file_sink_04-0. Executing stage 'VALUE_CONVERTER' with class 'org.apache.kafka.connect.json.JsonConverter', where consumed record is {topic='test_topic_json', partition=0, offset=94, timestamp=1548350164096, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)
[…]
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('f' (code 102)): was expecting double-quote to start field name
at [Source: (byte[])"{foo:"bar 5"}"; line: 1, column: 3]
复制代码
能够看到错误自己,还有就是和错误有关的信息:
{topic='test_topic_json', partition=0, offset=94, timestamp=1548350164096, timestampType=CreateTime}
复制代码
如上所示,能够在kafkacat等工具中使用该主题和偏移量来检查源主题上的消息。根据抛出的异常也可能会看到记录的源消息:
Caused by: org.apache.kafka.common.errors.SerializationException:
…
at [Source: (byte[])"{foo:"bar 5"}"; line: 1, column: 3]
复制代码
虽然设置了一个死信队列,可是如何处理那些“死信”呢?由于它只是一个Kafka主题,因此能够像使用任何其它主题同样使用标准的Kafka工具。上面已经看到了,好比可使用kafkacat来检查消息头,而且对于消息的内容及其元数据的通常检查kafkacat也能够作。固然根据被拒绝的缘由,也能够选择对消息进行重播。
一个场景是链接器正在使用Avro转换器,可是主题上的倒是JSON格式消息(所以被写入死信队列)。可能因为遗留缘由JSON和Avro格式的生产者都在写入源主题,这个问题得解决,可是目前只须要将管道流中的数据写入接收器便可。
首先,从初始的接收器读取源主题开始,使用Avro反序列化并路由到死信队列:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "file_sink_06__01-avro",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"topics":"test_topic_avro",
"file":"/data/file_sink_06.txt",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"errors.tolerance":"all",
"errors.deadletterqueue.topic.name":"dlq_file_sink_06__01",
"errors.deadletterqueue.topic.replication.factor":1,
"errors.deadletterqueue.context.headers.enable":true,
"errors.retry.delay.max.ms": 60000,
"errors.retry.timeout": 300000
}
}'
复制代码
另外再建立第二个接收器,将第一个接收器的死信队列做为源主题,并尝试将记录反序列化为JSON,在这里要更改的是value.converter
、key.converter
、源主题名和死信队列名(若是此链接器须要将任何消息路由到死信队列,要避免递归)。
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "file_sink_06__02-json",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"topics":"dlq_file_sink_06__01",
"file":"/data/file_sink_06.txt",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"errors.tolerance":"all",
"errors.deadletterqueue.topic.name":"dlq_file_sink_06__02",
"errors.deadletterqueue.topic.replication.factor":1,
"errors.deadletterqueue.context.headers.enable":true,
"errors.retry.delay.max.ms": 60000,
"errors.retry.timeout": 300000
}
}'
复制代码
如今能够验证一下。
首先,源主题收到20条Avro消息,以后能够看到20条消息被读取并被原始Avro接收器接收:
而后发送8条JSON消息,这时8条消息被发送到死信队列,而后被JSON接收器接收:
如今再发送5条格式错误的JSON消息,以后能够看到二者都有失败的消息,有2点能够确认:
除了使用JMX监控死信队列以外,还能够利用KSQL的聚合能力编写一个简单的流应用来监控消息写入队列的速率:
-- 为每一个死信队列主题注册流。
CREATE STREAM dlq_file_sink_06__01(MSG VARCHAR)WITH(KAFKA_TOPIC ='dlq_file_sink_06__01',VALUE_FORMAT ='DELIMITED');
CREATE STREAM dlq_file_sink_06__02(MSG VARCHAR)WITH(KAFKA_TOPIC ='dlq_file_sink_06__02',VALUE_FORMAT ='DELIMITED');
-- 从主题的开头消费数据
SET 'auto.offset.reset' = 'earliest';
-- 使用其它列建立监控流,可用于后续聚合查询
CREATE STREAM DLQ_MONITOR WITH (VALUE_FORMAT='AVRO') AS \
SELECT 'dlq_file_sink_06__01' AS SINK_NAME, \
'Records: ' AS GROUP_COL, \
MSG \
FROM dlq_file_sink_06__01;
-- 使用来自第二个死信队列的消息注入相同的监控流
INSERT INTO DLQ_MONITOR \
SELECT 'dlq_file_sink_06__02' AS SINK_NAME, \
'Records: ' AS GROUP_COL, \
MSG \
FROM dlq_file_sink_06__02;
-- 在每一个死信队列每分钟的时间窗口内,建立消息的聚合视图
CREATE TABLE DLQ_MESSAGE_COUNT_PER_MIN AS \
SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss') AS START_TS, \
SINK_NAME, \
GROUP_COL, \
COUNT(*) AS DLQ_MESSAGE_COUNT \
FROM DLQ_MONITOR \
WINDOW TUMBLING (SIZE 1 MINUTE) \
GROUP BY SINK_NAME, \
GROUP_COL;
复制代码
这个聚合表能够以交互式的方式进行查询,下面显示了一分钟内每一个死信队列中的消息数量:
ksql> SELECT START_TS, SINK_NAME, DLQ_MESSAGE_COUNT FROM DLQ_MESSAGE_COUNT_PER_MIN;
2019-02-01 02:56:00 | dlq_file_sink_06__01 | 9
2019-02-01 03:10:00 | dlq_file_sink_06__01 | 8
2019-02-01 03:12:00 | dlq_file_sink_06__01 | 5
2019-02-01 02:56:00 | dlq_file_sink_06__02 | 5
2019-02-01 03:12:00 | dlq_file_sink_06__02 | 5
复制代码
由于这个表的下面是Kafka主题,因此能够将其路由到指望的任何监控仪表盘,还能够用于驱动告警。假定有几条错误消息是能够接受的,可是一分钟内超过5条消息就是个大问题须要关注:
CREATE TABLE DLQ_BREACH AS \
SELECT START_TS, SINK_NAME, DLQ_MESSAGE_COUNT \
FROM DLQ_MESSAGE_COUNT_PER_MIN \
WHERE DLQ_MESSAGE_COUNT>5;
复制代码
如今又有了一个报警服务能够订阅的DLQ_BREACH
主题,当收到任何消息时,能够触发适当的操做(例如通知)。
ksql> SELECT START_TS, SINK_NAME, DLQ_MESSAGE_COUNT FROM DLQ_BREACH;
2019-02-01 02:56:00 | dlq_file_sink_06__01 | 9
2019-02-01 03:10:00 | dlq_file_sink_06__01 | 8
复制代码
Kafka链接器的错误处理方式,以下表所示:
链接器生命周期阶段 | 描述 | 是否处理错误? |
---|---|---|
开始 | 首次启动链接器时,其将执行必要的初始化,例如链接到数据存储 | 无 |
拉取(针对源链接器) | 从源数据存储读取消息 | 无 |
格式转换 | 从Kafka主题读写数据并对JSON/Avro格式进行序列化/反序列化 | 有 |
单消息转换 | 应用任何已配置的单消息转换 | 有 |
接收(针对接收链接器) | 将消息写入目标数据存储 | 无 |
注意源链接器没有死信队列。
关于链接器错误处理的配置,能够按照以下的流程一步步进阶:
处理错误是任何稳定可靠的数据管道的重要组成部分,根据数据的使用方式,能够有两个选项。若是管道任何错误的消息都不能接受,代表上游存在严重的问题,那么就应该当即中止处理(这是Kafka链接器的默认行为)。
另外一方面,若是只是想将数据流式传输到存储以进行分析或非关键性处理,那么只要不传播错误,保持管道稳定运行则更为重要。这时就能够定义错误的处理方式,推荐的方式是使用死信队列并密切监视来自Kafka链接器的可用JMX指标。