Kafka链接器深度解读之转换器和序列化

Kafka链接器是Apache Kafka®的一部分,提供数据存储与Kafka之间的流式集成。对于数据工程师来讲,只须要使用JSON格式配置文件便可。目前已经有不少数据存储的链接器,仅举几例来讲,包括JDBCElasticsearchIBM MQS3BigQueryhtml

对于开发者,Kafka链接器有丰富的API,若有必要,能够开发本身的链接器。此外它还具备用于配置和管理链接器的REST APIjava

Kafka链接器自己是模块化的,提供了很是强大的知足集成需求的方法,部分关键组件包括:git

  • 链接器:定义了一组如何与数据存储集成的JAR文件;
  • 转换器:处理数据的序列化和反序列化;
  • 变换:传输过程当中的消息处理(可选)。

围绕Kafka链接器,常见的错误或者误解之一是数据的序列化,这是Kafka链接器经过转换器进行处理的,下面会介绍它们的工做机制,并说明一些常见问题如何处理。github

Kafka消息只是字节

Kafka消息是按照主题进行组织的。每条消息都是一个键/值对,不过Kafka就须要这些。当数据在Kafka中存储时都只是字节,这使得Kafka能够适用于各类场景,但这也意味着开发者有责任决定如何对数据进行序列化。sql

在配置Kafka链接器时,标准步骤的关键之一是序列化格式,要确保主题的读取方和写入方使用相同的序列化格式,不然会出现混乱和错误!docker

常见的格式有不少,包括:数据库

  • JSON;
  • Avro;
  • Protobuf;
  • 字符串分割(如CSV)。

每种格式都有优势和缺点。apache

序列化格式的选择

选择序列化格式的一些原则包括:json

  • 模式:不少时候数据都会有一个模式。可能不喜欢这个事实,但做为开发人员有责任保留和传播此模式,由于模式提供了服务之间的契约。某些消息格式(例如Avro和Protobuf)具备强大的模式支持,而其它消息格式支持较少(JSON)或根本没有(分隔字符串);
  • 生态系统兼容性:Avro是Confluent平台的一等公民,获得了Confluent模式注册表、Kafka链接器、KSQL等的原生支持。而Protobuf则依赖于部分功能支持的社区贡献;
  • 消息大小:JSON是纯文本格式,消息大小依赖于Kafka自己的压缩配置,而Avro和Protobuf都是二进制格式,所以消息较小;
  • 语言支持:Java体系对Avro有强大的支持,但若是应用不是基于Java的,那么可能会发现它不太容易处理。

若是使用JSON格式写入目标端,须要在主题中使用JSON格式么?

不须要,无论是从源端读取数据的格式,仍是将数据写入外部存储,都不会影响Kafka中消息序列化的格式。bootstrap

Kafka中的链接器负责从源端(例如数据库)读取数据,并将其做为数据的内部表示传递给转换器,而后,Kafka中的转换器会将此源数据对象序列化到主题上。

当使用Kafka链接器做为接收端时,正好相反,即转换器未来自主题的数据反序列化为内部表示,其会传递給链接器,而后使用指定方法写入目标端。

这意味着能够在主题中好比以Avro格式保存数据,而后好比将其写入HDFS时,再指定接收端链接器使用的格式

配置转换器

Kafka链接器在工做节点级别使用默认的转换器配置,也能够在每一个链接器上覆盖它。因为在整个流水线中使用相同的序列化格式一般是一个好的作法,因此一般只需在工做节点上配置转换器,而无需在链接器中指定。可是若是从其它主题中提取数据而它们使用不一样的序列化格式时,就要在链接器配置中指定它,即便在链接器的配置中覆盖它,执行任务的仍是那个转换器。

正确的链接器永远不会序列化/反序列化存储在Kafka中的消息,而是让配置的转换器完成这项工做。

注意Kafka消息只是键/值字节对,所以须要使用key.convertervalue.converter配置项为键和值指定转换器,某些状况下,能够为键和值指定不一样的转换器。

下面是使用String转换器的示例,因为它只是一个字符串,数据没有模式,所以用于value并非那么有用:

"key.converter": "org.apache.kafka.connect.storage.StringConverter",

某些转换器有其它配置项,对于Avro,须要指定模式注册表,对于JSON,须要指定是否但愿Kafka链接器将模式嵌入JSON自己。为转换器指定配置项时,要使用key.converter.value.converter.前缀。例如要将Avro用于消息的内容,须要指定如下配置项:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",

常见的转换器包括:

io.confluent.connect.avro.AvroConverter
  • String:Apache Kafka的一部分
org.apache.kafka.connect.storage.StringConverter
  • JSON:Apache Kafka的一部分
org.apache.kafka.connect.json.JsonConverter
  • ByteArray:Apache Kafka的一部分
org.apache.kafka.connect.converters.ByteArrayConverter
com.blueapron.connect.protobuf.ProtobufConverter

JSON和模式

虽然JSON默认不支持携带模式,但Kafka链接器确实支持嵌入模式的特定JSON格式。因为模式也包含在每一个消息中,所以生成的数据大小可能会变大。

若是正在配置Kafka源链接器而且但愿Kafka链接器在写入Kafka的消息中包含模式,须要作以下的配置:

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

最终向Kafka写入的消息大体以下,schema以及payload为JSON中的顶级元素:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int64",
        "optional": false,
        "field": "registertime"
      },
      {
        "type": "string",
        "optional": false,
        "field": "userid"
      },
      {
        "type": "string",
        "optional": false,
        "field": "regionid"
      },
      {
        "type": "string",
        "optional": false,
        "field": "gender"
      }
    ],
    "optional": false,
    "name": "ksql.users"
  },
  "payload": {
    "registertime": 1493819497170,
    "userid": "User_1",
    "regionid": "Region_5",
    "gender": "MALE"
  }
}

请注意消息的大小,以及由内容与模式组成的消息的大小。考虑到在每条消息中都重复这一点,就会看到为何像Avro这样的格式颇有意义,由于模式是单独存储的,而消息只包含有效内容(并进行过压缩)。

若是从一个Kafka主题中使用Kafka接收链接器消费JSON格式的数据,则须要了解数据中是否包含模式,若是包含,则要与上面的格式相同,而不能是一些任意的格式,那么配置以下:

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

不过,若是使用的JSON数据没有schema/payload结构,像下面这样:

{
  "registertime": 1489869013625,
  "userid": "User_1",
  "regionid": "Region_2",
  "gender": "OTHER"
}

则必须经过配置通知Kafka链接器不要寻找模式,以下:

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

和之前同样,要记住转换器配置项(此处schemas.enable)须要合适的前缀key.convertervalue.converter

常见错误

若是Kafka链接器中转换器配置不正确,可能遇到如下一些常见错误。这些消息会出如今Kafka链接器配置的接收端中,由于这里是对存储在Kafka中的消息进行反序列化的点。转换器问题一般不会在源端发生,由于源端已经配置了序列化。其中每一个都会致使链接器失败,开始的错误为:

ERROR WorkerSinkTask{id=sink-file-users-json-noschema-01-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator. execAndHandleError(RetryWithToleranceOperator.java:178)
   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute (RetryWithToleranceOperator.java:104)

这个错误以后,会看到一个异常堆栈,其中描述了错误的缘由。注意对于链接器中的任何严重错误,都会抛出上述错误,所以可能会看到与序列化无关的错误。要快速定位错误是由哪一个错误配置致使的,可参考下表:

问题:使用JsonConverter读取非JSON格式数据

若是源端主题上有非JSON格式的数据,可是使用JsonConverter进行读取,就会看到:

org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
…
org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x1cfa7e2 (above 0x0010ffff) at char #1, byte #7)

这多是由于源端主题以Avro或其它格式序列化引发的。

解决方案:若是数据其实是Avro格式,则须要按照以下方式修改Kafka链接器的接收端:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",

或者,若是主题由Kafka链接器注入,也能够调整上游源端,让其输出JSON格式数据:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",

问题:使用AvroConverter读取非Avro格式数据

这是最多见的错误,当尝试使用AvroConverter从非Avro格式的主题读取数据时,会发生这种状况,还包括使用非Confluent模式注册表的Avro序列化器写入的数据:

org.apache.kafka.connect.errors.DataException: my-topic-name
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
…
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

解决方案:检查源端主题的序列化格式,调整Kafka链接器接收端使用正确的转换器,或将上游格式修改成Avro(这样最好)。若是上游主题由Kafka链接器注入,也能够按以下方式配置源端链接器的转换器:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",

问题:读取没有指望的schema/payload结构的JSON数据

如前所述,Kafka链接器支持包含有效内容和模式的特殊JSON格式消息结构,若是读取的JSON数据不是这样的结构,会有下面的错误:

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

要知道,对于schemas.enable=true惟一有效的JSON结构是,schemapayload做为顶级元素(如上所示)。

正如错误消息自己所述,若是只是简单的JSON数据,则应将链接器的配置更改成:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",

若是要在数据中包含模式,要么切换到使用Avro(推荐),要么配置上游的Kafka链接器以在消息中包含模式:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",

解决问题的提示

查看链接器工做节点的日志

要查看Kafka链接器的错误日志,须要定位到Kafka链接器工做节点的输出。这个位置取决于Kafka链接器是如何启动的,安装Kafka链接器有好几种方法,包括Docker、Confluent CLI、systemd和手动下载的压缩包,而后工做节点的日志分别位于:

  • Docker:docker logs container_name
  • Confluent CLI:confluent log connect
  • systemd:日志文件写入/var/log/confluent/kafka-connect
  • 其它:默认状况下,Kafka链接器会将其输出发送到stdout,所以能够在启动Kafka链接器的终端会话中看到。

查看Kafka链接器的配置文件

要更改Kafka链接器工做节点的配置项(适用于全部运行的链接器),须要相应地作出以下的修改:

  • Docker:配置环境变量,好比在Docker Compose中:
CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
  • Confluent CLI:使用配置文件/etc/schema-registry/connect-avro-distributed.properties
  • systemd(deb/rpm):使用配置文件/etc/kafka/connect-distributed.properties
  • 其它:启动Kafka链接器时,能够指定工做节点的属性文件,例如:
$ cd confluent-5.0.0
$ ./bin/connect-distributed ./etc/kafka/connect-distributed.properties

检查Kafka主题

假设遇到了上面提到过的错误,而且想要解决为何Kafka链接器的接收端没法从主题中读取数据。

这时须要检查正在读取的主题的数据,并确认它采用了指望的序列化格式。另外,要记住全部消息都必须采用这种格式,因此不要只是假设由于如今以正确的格式向主题发送消息,因此不会出现问题。Kafka链接器和其它消费者也会读取该主题的已有消息。

下面将使用命令行来描述如何进行故障排除,但还有一些其它工具也能够作:

  • Confluent控制中心有经过可视化的方式查看主题内容的功能,包括自动肯定序列化格式;
  • KSQL的PRINT命令会将主题的内容打印到控制台,包括自动肯定序列化格式;
  • Confluent CLI工具备consume命令,其可被用于读取字符串和Avro格式的数据。

若是认为是字符串/JSON格式数据

可使用控制台工具,包括kafkacatkafka-console-consumer,以kafkacat为例:

$ kafkacat -b localhost:9092 -t users-json-noschema -C -c1
{
  "registertime":1493356576434,"userid":"User_8","regionid":"Region_2","gender":"MALE"}

使用jq命令,还能够验证和格式化JSON:

$ kafkacat -b localhost:9092 -t users-json-noschema -C -c1|jq '.'
{
  "registertime": 1493356576434,
  "userid": "User_8",
  "regionid": "Region_2",
  "gender": "MALE"
}

若是你看到了下面这样的乱码字符,其极可能是二进制数据,好比经过Avro或Protobuf格式写入就是这样的:

$ kafkacat -b localhost:9092 -t users-avro -C -c1
ڝ���VUser_9Region_MALE

若是认为是Avro格式数据

须要使用专为读取和反序列化Avro数据而设计的控制台工具,这里会使用kafka-avro-console-consumer。先要确认指定了正确的模式注册表URL:

$ kafka-avro-console-consumer --bootstrap-server localhost:9092 \
                              --property schema.registry.url=http://localhost:8081 \
                              --topic users-avro \
                              --from-beginning --max-messages 1
{"registertime":1505213905022,"userid":"User_5","regionid":"Region_4","gender":"FEMALE"}

和以前同样,若是要对其格式化,能够经过管道输出结果给jq

$ kafka-avro-console-consumer --bootstrap-server localhost:9092 \
                              --property schema.registry.url=http://localhost:8081 \
                              --topic users-avro \
                              --from-beginning --max-messages 1 | \
                              jq '.'
{
  "registertime": 1505213905022,
  "userid": "User_5",
  "regionid": "Region_4",
  "gender": "FEMALE"
}

内部转换器

当运行在分布式模式时,Kafka链接器使用Kafka自己来存储有关其操做的元数据,包括链接器配置,偏移量等。

经过internal.key.converter/internal.value.converter配置项,这些Kafka主题自己能够配置使用不一样的转换器。可是这些配置项只供内部使用,实际上从Kafka 2.0版本开始就已被弃用。再也不须要修改这些,若是还要修改这些配置项,从Kafka的2.0版本开始,将会收到警告。

将模式应用于没有模式的消息

不少时候Kafka链接器会从已经存在模式的地方引入数据,这时只要保留该模式而后使用合适的序列化格式(例如Avro),加上好比模式注册表等提供的兼容性保证,该数据的全部下游用户就均可以从可用的模式中受益。可是若是没有明确的模式呢?

可能正使用FileSourceConnector从纯文本文件中读取数据(不建议用于生产,但一般用于PoC),或者可能正在使用REST链接器从REST端点提取数据。因为这二者以及其它的都没有固有的模式,所以须要进行声明。

有时可能只是想从源端读取字节而后将它们写入一个主题上,但大多数状况下须要作正确的事情并应用模式以便数据能够正确地处理。做为数据提取的一部分处理一次,而不是将问题推送到每一个消费者(多是多个),这是一个更好的作法。

能够编写本身的Kafka流式应用以将模式应用于Kafka主题中的数据,但也可使用KSQL。这篇文章展现了如何对从REST端点提取的JSON数据执行此操做。下面会看一下将模式应用于某些CSV数据的简单示例,显然是能够作到的。

假设有一个名为testdata-csv的Kafka主题,其中有一些CSV数据,大体以下:

$ kafkacat -b localhost:9092 -t testdata-csv -C
1,Rick Astley,Never Gonna Give You Up
2,Johnny Cash,Ring of Fire

经过观察,能够猜想它有三个字段,可能为:

  • ID;
  • 艺术家;
  • 歌曲。

若是将数据保留在这样的主题中,那么任何想要使用该数据的应用程序(多是Kafka链接器接收端、定制的Kafka应用或者其它),都须要每次猜想这个模式。或者更糟糕的是,每一个消费端应用的开发者都须要不断向数据提供方确认模式及其任何变动。正如Kafka解耦系统同样,这种模式依赖性迫使团队之间存在硬性耦合,这并非一件好事。

所以要作的只是使用KSQL将模式应用于数据,并填充一个新的派生主题,其中保存模式。在KSQL中,能够像下面这样查看主题数据:

ksql> PRINT 'testdata-csv' FROM BEGINNING;
Format:STRING
11/6/18 2:41:23 PM UTC , NULL , 1,Rick Astley,Never Gonna Give You Up
11/6/18 2:41:23 PM UTC , NULL , 2,Johnny Cash,Ring of Fire

这里的前两个字段(11/6/18 2:41:23 PM UTCNULL)分别是Kafka消息的时间戳和键,而其他字段来自CSV文件。下面用KSQL注册这个主题并声明模式:

ksql> CREATE STREAM TESTDATA_CSV (ID INT, ARTIST VARCHAR, SONG VARCHAR) \
WITH (KAFKA_TOPIC='testdata-csv', VALUE_FORMAT='DELIMITED');

Message
----------------
Stream created
----------------

经过KSQL能够查看如今有一个数据流模式:

ksql> DESCRIBE TESTDATA_CSV;

Name                 : TESTDATA_CSV
 Field   | Type
-------------------------------------
 ROWTIME | BIGINT (system)
 ROWKEY  | VARCHAR(STRING) (system)
 ID      | INTEGER
 ARTIST  | VARCHAR(STRING)
 SONG    | VARCHAR(STRING)
-------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;

经过查询KSQL流确认数据是否符合预期。注意对于已有的Kafka主题,此时只是做为Kafka的消费者,还没有更改或复制任何数据。

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql> SELECT ID, ARTIST, SONG FROM TESTDATA_CSV;
1 | Rick Astley | Never Gonna Give You Up
2 | Johnny Cash | Ring of Fire

最后,建立一个新的Kafka主题,由具备模式的从新序列化的数据填充。KSQL查询是连续的,所以除了将任何已有的数据从源端主题发送到目标端主题以外,KSQL还将向主题发送任何将来的数据。

ksql> CREATE STREAM TESTDATA WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM TESTDATA_CSV;

Message
----------------------------
Stream created and running
----------------------------

这时使用Avro格式的控制台消费者对数据进行验证:

$ kafka-avro-console-consumer --bootstrap-server localhost:9092 \
                                --property schema.registry.url=http://localhost:8081 \
                                --topic TESTDATA \
                                --from-beginning | \
                                jq '.'
{
  "ID": {
    "int": 1
},
  "ARTIST": {
    "string": "Rick Astley"
},
  "SONG": {
    "string": "Never Gonna Give You Up"
  }
}
[…]

甚至能够在模式注册表中查看已经注册的模式:

$ curl -s http://localhost:8081/subjects/TESTDATA-value/versions/latest|jq '.schema|fromjson'
{
  "type": "record",
  "name": "KsqlDataSourceSchema",
  "namespace": "io.confluent.ksql.avro_schemas",
  "fields": [
    {
      "name": "ID",
      "type": [
        "null",
        "int"
      ],
      "default": null
    },
    {
      "name": "ARTIST",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "SONG",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}

任何写入原始主题(testdata-csv)的新消息都由KSQL自动处理,并写入Avro格式的名为TESTDATA的新主题。如今,任何想要使用此数据的应用或团队均可以简单地处理TESTDATA主题,并利用声明模式的Avro序列化数据。还可使用此技术更改主题中的分区数,分区键和复制因子。

结论

Kafka链接器是一个很是简单但功能强大的工具,可用于将其它系统与Kafka集成,最多见的误解是Kafka链接器提供的转换器。以前已经介绍过Kafka消息只是键/值对,了解应该使用哪一个序列化机制而后在Kafka链接器中对其进行标准化很是重要。

相关文章
相关标签/搜索