在Kafka的2.3版本中,对Kafka链接器作了很大的改进。首先就是在添加和删除链接器时,修改了Kafka链接器处理任务的方式。以前这个动做形成了整个系统的停顿,这是一直被开发和运维人员诟病的地方,除此以外,社区中频繁提到的其余一些问题,也获得了解决。html
Kafka链接器集群由一个或多个工做节点进程组成,集群以任务的形式分发链接器的负载。在添加或删除链接器或工做节点时,Kafka链接器会尝试再平衡这些任务。在Kafka的2.3版本以前,集群会中止全部任务,从新计算全部任务的执行位置,而后重启全部任务。每次再平衡都会暂停全部数据进出的工做,一般时间很短,但有时也会持续一段时间。git
如今经过KIP-415,Kafka 2.3用增量协做再平衡作了替代,之后将仅对须要启动、中止或移动的任务进行再平衡。具体的详细信息请参见这里。docker
下面用一些链接器作了一个简单的测试,这里只使用了一个分布式Kafka链接器工做节点,而源端使用了kafka-connect-datagen
,它以固定的时间间隔根据给定的模式生成随机数据。以固定的时间间隔就能够粗略地计算因为再平衡而中止任务的时间,由于生成的消息做为Kafka消息的一部分,包含了时间戳。这些消息以后会被流式注入Elasticsearch,之因此用它,不只由于它是一个易于使用的接收端,也由于能够经过观察源端消息的时间戳来查看生产中的任何停顿。apache
经过以下的方式,能够建立源端:json
curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/source-datagen-01/config \ -d '{ "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector", "kafka.topic": "orders", "quickstart":"orders", "max.interval":200, "iterations":10000000, "tasks.max": "1" }'
经过以下方式建立接收端:api
curl -s -X PUT -H "Content-Type:application/json" \ http://localhost:8083/connectors/sink-elastic-orders-00/config \ -d '{ "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "topics": "orders", "connection.url": "http://elasticsearch:9200", "type.name": "type.name=kafkaconnect", "key.ignore": "true", "schema.ignore": "false", "transforms": "addTS", "transforms.addTS.type": "org.apache.kafka.connect.transforms.InsertField$Value", "transforms.addTS.timestamp.field": "op_ts" }'
这里使用了单消息转换,将Kafka消息的时间戳提高到消息自己的字段中,以即可以在Elasticsearch中进行公开。以后会使用Kibana进行绘制,这样产生的消息数量降低就能够显示出来,与再平衡发生的位置一致:安全
在Kafka链接器的工做节点日志中,能够查看活动和时间,并对Kafka的2.2版本和2.3版本的行为进行比较:bash
**注意:**为了清楚地说明问题,日志作了精简处理。app
在再平衡问题(如前述)已大大改善以后,Kafka链接器的第二大困扰多是难以在Kafka链接器工做节点日志中肯定某个消息属于哪一个链接器。运维
以前能够直接从链接器的任务中获取日志中的消息,例如:
INFO Using multi thread/connection supporting pooling connection manager (io.searchbox.client.JestClientFactory) INFO Using default GSON instance (io.searchbox.client.JestClientFactory) INFO Node Discovery disabled... (io.searchbox.client.JestClientFactory) INFO Idle connection reaping disabled... (io.searchbox.client.JestClientFactory)
他们属于哪一个任务?不清楚。也许会认为JestClient
与Elasticsearch
有关,也许它们来自Elasticsearch
链接器,可是如今有5个不一样的Elasticsearch
链接器在运行,那么它们来自哪一个实例?更不用说链接器能够有多个任务了。
在Apache Kafka 2.3中,可使用映射诊断上下文(MDC)日志,在日志中提供了更多的上下文信息:
INFO [sink-elastic-orders-00|task-0] Using multi thread/connection supporting pooling connection manager (io.searchbox.client.JestClientFactory:223) INFO [sink-elastic-orders-00|task-0] Using default GSON instance (io.searchbox.client.JestClientFactory:69) INFO [sink-elastic-orders-00|task-0] Node Discovery disabled... (io.searchbox.client.JestClientFactory:86) INFO [sink-elastic-orders-00|task-0] Idle connection reaping disabled... (io.searchbox.client.JestClientFactory:98)
这个日志格式的更改默认是禁用的,以保持后向兼容性。要启用此改进,须要编辑etc/kafka/connect-log4j.properties
文件,按照以下方式修改log4j.appender.stdout.layout.ConversionPattern
:
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n
经过环境变量CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN
,Kafka链接器的Docker镜像也支持了这个特性。
具体细节请参见KIP-449。
REST改进
KIP-465为/connectors
REST端点添加了一些方便的功能。经过传递其余参数,能够获取有关每一个链接器的更多信息,而没必要迭代结果并进行其余REST调用。
例如,在Kafka 2.3以前要查询全部任务的状态,必须执行如下操做,使用xargs
迭代输出并重复调用status
端点:
$ curl -s "http://localhost:8083/connectors"| \ jq '.[]'| \ xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| \ jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| \ column -s : -t| sed 's/\"//g'| sort sink-elastic-orders-00 | RUNNING | RUNNING source-datagen-01 | RUNNING | RUNNING
如今使用Kafka 2.3,可使用/connectors?expand=status
加上一些jq
技巧进行单个REST调用,就能够达到和以前同样的效果:
$ curl -s "http://localhost:8083/connectors?expand=status" | \ jq 'to_entries[] | [.key, .value.status.connector.state,.value.status.tasks[].state]|join(":|:")' | \ column -s : -t| sed 's/\"//g'| sort sink-elastic-orders-00 | RUNNING | RUNNING source-datagen-01 | RUNNING | RUNNING
还有/connectors?expand=status
,它将返回每一个链接器信息,如配置、链接器类型等,也能够把它们结合起来:
$ curl -s "http://localhost:8083/connectors?expand=info&expand=status"|jq 'to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \ column -s : -t| sed 's/\"//g'| sort sink | sink-elastic-orders-00 | RUNNING | RUNNING | io.confluent.connect.elasticsearch.ElasticsearchSinkConnector source | source-datagen-01 | RUNNING | RUNNING | io.confluent.kafka.connect.datagen.DatagenConnector
Kafka链接器现已支持client.id
由于KIP-411,Kafka链接器如今能够以更有用的方式为每项任务配置client.id
。以前,只能看到consumer-25
做为链接器的消费者组的一部分从给定的分区进行消费,如今则能够将其直接绑定回特定的任务,从而使故障排除和诊断更加容易。
链接器级生产者/消费者配置覆写
长期以来的一个常见需求是可以覆写分别由Kafka链接器接收端和源端使用的消费者设置或生产者设置。到目前为止,它们都采用了工做节点配置中指定的值,除非生成了更多的工做节点,不然没法对诸如安全主体之类的内容进行细粒度的控制。
Kafka 2.3中的KIP-458使工做节点可以容许对配置进行覆写。connector.client.config.override.policy
是一个新的参数,在工做节点级能够有3个可选项:
值 | 描述 |
---|---|
None | 默认策略,不容许任何配置的覆写 |
Principal | 容许覆盖生产者、消费者和admin 前缀的security.protocol 、sasl.jaas.config 和sasl.mechanism |
All | 容许覆盖生产者、消费者和admin 前缀的全部配置 |
经过在工做节点配置中设置上述参数,如今能够针对每一个链接器对配置进行覆写。只需提供带有consumer.override
(接收端)或producer.override
(源端)前缀的必需参数便可,还能够针对死信队列使用admin.override
。
在下面的示例中,建立链接器时,它将从主题的当前点开始消费数据,而不是读取主题中的全部可用数据,这是经过将consumer.override.auto.offset.reset
配置为latest
覆盖auto.offset.reset configuration
来实现的。
curl -i -X PUT -H "Content-Type:application/json" \ http://localhost:8083/connectors/sink-elastic-orders-01-latest/config \ -d '{ "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "topics": "orders", "consumer.override.auto.offset.reset": "latest", "tasks.max": 1, "connection.url": "http://elasticsearch:9200", "type.name": "type.name=kafkaconnect", "key.ignore": "true", "schema.ignore": "false", "transforms": "renameTopic", "transforms.renameTopic.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.renameTopic.regex": "orders", "transforms.renameTopic.replacement": "orders-latest" }'
经过检查工做节点日志,能够看到覆写已经生效:
[2019-07-17 13:57:27,532] INFO [sink-elastic-orders-01-latest|task-0] ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = latest […]
能够看到这个ConsumerConfig
日志条目与建立的链接器直接关联,证实了上述MDC日志记录的有用性。
第二个链接器运行于同一主题但没有consumer.override
,所以继承了默认值earliest
:
[2019-07-17 13:57:27,487] INFO [sink-elastic-orders-01-earliest|task-0] ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = earliest […]
经过将数据从主题流式传输到Elasticsearch能够检查配置的差别形成的影响。
$ curl -s "localhost:9200/_cat/indices?h=idx,docsCount" orders-latest 2369 orders-earliest 144932
有两个索引:一个从同一主题注入了较少的消息,由于orders-latest
索引只注入了链接器建立后才到达主题的消息;而另外一个orders-earliest
索引,由一个单独的链接器注入,它会使用Kafka链接器的默认配置,即会注入全部的新消息,再加上主题中原有的全部消息。