Kafka链接器之在2.3版本中的改进

在Kafka的2.3版本中,对Kafka链接器作了很大的改进。首先就是在添加和删除链接器时,修改了Kafka链接器处理任务的方式。以前这个动做形成了整个系统的停顿,这是一直被开发和运维人员诟病的地方,除此以外,社区中频繁提到的其余一些问题,也获得了解决。html

Kafka链接器中的增量协做再平衡

Kafka链接器集群由一个或多个工做节点进程组成,集群以任务的形式分发链接器的负载。在添加或删除链接器或工做节点时,Kafka链接器会尝试再平衡这些任务。在Kafka的2.3版本以前,集群会中止全部任务,从新计算全部任务的执行位置,而后重启全部任务。每次再平衡都会暂停全部数据进出的工做,一般时间很短,但有时也会持续一段时间。git

如今经过KIP-415,Kafka 2.3用增量协做再平衡作了替代,之后将仅对须要启动、中止或移动的任务进行再平衡。具体的详细信息请参见这里docker

下面用一些链接器作了一个简单的测试,这里只使用了一个分布式Kafka链接器工做节点,而源端使用了kafka-connect-datagen,它以固定的时间间隔根据给定的模式生成随机数据。以固定的时间间隔就能够粗略地计算因为再平衡而中止任务的时间,由于生成的消息做为Kafka消息的一部分,包含了时间戳。这些消息以后会被流式注入Elasticsearch,之因此用它,不只由于它是一个易于使用的接收端,也由于能够经过观察源端消息的时间戳来查看生产中的任何停顿。apache

经过以下的方式,能够建立源端:json

curl -s -X PUT -H  "Content-Type:application/json" http://localhost:8083/connectors/source-datagen-01/config \
    -d '{
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "orders",
    "quickstart":"orders",
    "max.interval":200,
    "iterations":10000000,
    "tasks.max": "1"
  }'

经过以下方式建立接收端:api

curl -s -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/sink-elastic-orders-00/config \
    -d '{
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "topics": "orders",
        "connection.url": "http://elasticsearch:9200",
        "type.name": "type.name=kafkaconnect",
        "key.ignore": "true",
        "schema.ignore": "false",
        "transforms": "addTS",
        "transforms.addTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.addTS.timestamp.field": "op_ts"
        }'

这里使用了单消息转换,将Kafka消息的时间戳提高到消息自己的字段中,以即可以在Elasticsearch中进行公开。以后会使用Kibana进行绘制,这样产生的消息数量降低就能够显示出来,与再平衡发生的位置一致:安全

在Kafka链接器的工做节点日志中,能够查看活动和时间,并对Kafka的2.2版本和2.3版本的行为进行比较:bash

**注意:**为了清楚地说明问题,日志作了精简处理。app

对日志的改进

在再平衡问题(如前述)已大大改善以后,Kafka链接器的第二大困扰多是难以在Kafka链接器工做节点日志中肯定某个消息属于哪一个链接器。运维

以前能够直接从链接器的任务中获取日志中的消息,例如:

INFO Using multi thread/connection supporting pooling connection manager (io.searchbox.client.JestClientFactory)
INFO Using default GSON instance (io.searchbox.client.JestClientFactory)
INFO Node Discovery disabled... (io.searchbox.client.JestClientFactory)
INFO Idle connection reaping disabled... (io.searchbox.client.JestClientFactory)

他们属于哪一个任务?不清楚。也许会认为JestClientElasticsearch有关,也许它们来自Elasticsearch链接器,可是如今有5个不一样的Elasticsearch链接器在运行,那么它们来自哪一个实例?更不用说链接器能够有多个任务了。

在Apache Kafka 2.3中,可使用映射诊断上下文(MDC)日志,在日志中提供了更多的上下文信息:

INFO [sink-elastic-orders-00|task-0] Using multi thread/connection supporting pooling connection manager (io.searchbox.client.JestClientFactory:223)
INFO [sink-elastic-orders-00|task-0] Using default GSON instance (io.searchbox.client.JestClientFactory:69)
INFO [sink-elastic-orders-00|task-0] Node Discovery disabled... (io.searchbox.client.JestClientFactory:86)
INFO [sink-elastic-orders-00|task-0] Idle connection reaping disabled... (io.searchbox.client.JestClientFactory:98)

这个日志格式的更改默认是禁用的,以保持后向兼容性。要启用此改进,须要编辑etc/kafka/connect-log4j.properties文件,按照以下方式修改log4j.appender.stdout.layout.ConversionPattern

log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n

经过环境变量CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERNKafka链接器的Docker镜像也支持了这个特性。

具体细节请参见KIP-449

REST改进

KIP-465/connectorsREST端点添加了一些方便的功能。经过传递其余参数,能够获取有关每一个链接器的更多信息,而没必要迭代结果并进行其余REST调用。

例如,在Kafka 2.3以前要查询全部任务的状态,必须执行如下操做,使用xargs迭代输出并重复调用status端点:

$ curl -s "http://localhost:8083/connectors"| \
    jq '.[]'| \
    xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| \
    jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| \
    column -s : -t| sed 's/\"//g'| sort
sink-elastic-orders-00  |  RUNNING  |  RUNNING
source-datagen-01       |  RUNNING  |  RUNNING

如今使用Kafka 2.3,可使用/connectors?expand=status加上一些jq技巧进行单个REST调用,就能够达到和以前同样的效果:

$ curl -s "http://localhost:8083/connectors?expand=status" | \
     jq 'to_entries[] | [.key, .value.status.connector.state,.value.status.tasks[].state]|join(":|:")' | \
     column -s : -t| sed 's/\"//g'| sort
sink-elastic-orders-00  |  RUNNING  |  RUNNING
source-datagen-01       |  RUNNING  |  RUNNING

还有/connectors?expand=status,它将返回每一个链接器信息,如配置、链接器类型等,也能够把它们结合起来:

$ curl -s "http://localhost:8083/connectors?expand=info&expand=status"|jq 'to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
       column -s : -t| sed 's/\"//g'| sort
sink    |  sink-elastic-orders-00  |  RUNNING  |  RUNNING  |  io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
source  |  source-datagen-01       |  RUNNING  |  RUNNING  |  io.confluent.kafka.connect.datagen.DatagenConnector

Kafka链接器现已支持client.id

由于KIP-411,Kafka链接器如今能够以更有用的方式为每项任务配置client.id。以前,只能看到consumer-25做为链接器的消费者组的一部分从给定的分区进行消费,如今则能够将其直接绑定回特定的任务,从而使故障排除和诊断更加容易。

链接器级生产者/消费者配置覆写

长期以来的一个常见需求是可以覆写分别由Kafka链接器接收端和源端使用的消费者设置生产者设置。到目前为止,它们都采用了工做节点配置中指定的值,除非生成了更多的工做节点,不然没法对诸如安全主体之类的内容进行细粒度的控制。

Kafka 2.3中的KIP-458使工做节点可以容许对配置进行覆写。connector.client.config.override.policy是一个新的参数,在工做节点级能够有3个可选项:

描述
None 默认策略,不容许任何配置的覆写
Principal 容许覆盖生产者、消费者和admin前缀的security.protocolsasl.jaas.configsasl.mechanism
All 容许覆盖生产者、消费者和admin前缀的全部配置

经过在工做节点配置中设置上述参数,如今能够针对每一个链接器对配置进行覆写。只需提供带有consumer.override(接收端)或producer.override(源端)前缀的必需参数便可,还能够针对死信队列使用admin.override

在下面的示例中,建立链接器时,它将从主题的当前点开始消费数据,而不是读取主题中的全部可用数据,这是经过将consumer.override.auto.offset.reset配置为latest覆盖auto.offset.reset configuration来实现的。

curl -i -X PUT -H  "Content-Type:application/json" \
      http://localhost:8083/connectors/sink-elastic-orders-01-latest/config \
      -d '{
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "topics": "orders",
  "consumer.override.auto.offset.reset": "latest",
  "tasks.max": 1,
  "connection.url": "http://elasticsearch:9200",  "type.name": "type.name=kafkaconnect",
  "key.ignore": "true",   "schema.ignore": "false",
  "transforms": "renameTopic",
  "transforms.renameTopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.renameTopic.regex": "orders",
  "transforms.renameTopic.replacement": "orders-latest"
}'

经过检查工做节点日志,能够看到覆写已经生效:

[2019-07-17 13:57:27,532] INFO [sink-elastic-orders-01-latest|task-0] ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
[…]

能够看到这个ConsumerConfig日志条目与建立的链接器直接关联,证实了上述MDC日志记录的有用性。

第二个链接器运行于同一主题但没有consumer.override,所以继承了默认值earliest

[2019-07-17 13:57:27,487] INFO [sink-elastic-orders-01-earliest|task-0] ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
[…]

经过将数据从主题流式传输到Elasticsearch能够检查配置的差别形成的影响。

$ curl -s "localhost:9200/_cat/indices?h=idx,docsCount"
orders-latest     2369
orders-earliest 144932

有两个索引:一个从同一主题注入了较少的消息,由于orders-latest索引只注入了链接器建立后才到达主题的消息;而另外一个orders-earliest索引,由一个单独的链接器注入,它会使用Kafka链接器的默认配置,即会注入全部的新消息,再加上主题中原有的全部消息。

相关文章
相关标签/搜索