Kafka Connect是在0.9之后加入的功能,主要是用来将其余系统的数据导入到Kafka,而后再将Kafka中的数据导出到另外的系统。
能够用来作实时数据同步的ETL,数据实时分析处理等。html
主要有2种模式:Standalone(单机模式)和Distribute(分布式模式)。
单机主要用来开发,测试,分布式的用于生产环境。apache
启动命令: bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties …]
这么多的配置,搞毛线啊。说真的,官网的这个,当时我真没看懂。下面解释下:
执行单机启动脚本connect-standalone.sh,将connect-standalone.properties属性文件传递进去做为Worker的配置,
另外的配置就是属于Connector的配置,会被所有传递给SouceConnector或者SinkConnector。json
bootstrap.servers:kafka集群地址,例如:10.33.2.1:9092,10.33.2.9:9092,10.33.1.13:9092bootstrap
key.converter:用来转换写入或者读出kafka中消息的key的,例如:org.apache.kafka.connect.json.JsonConverter。
效果是对指定了key是id=1000,转换成{“id” : 1000},也可使用Avro的格式。我作的项目使用的maxwell,发现使用Json转换后,
字段的双引号全没了,冒号变成等号,变成了这种鬼东西{id=1000}。后面直接改为使用String的转换器:org.apache.kafka.connect.storage.StringConverter。并发
value.converter:同上,只是用来转换消息的value的,也就是传输的具体数据。分布式
offset.storage.file.filename:默认是:/tmp/connect.offsets。
这个配置要注意,单机模式是须要本身持久化offset的。 Kafka Connect会用这里配置的文件保存offset。
并且针对producer和consumer(也就是source和sink)须要单独分别配置:
producer.offset.storage.file.filename=/temp/source-offset
consuer.offset.storage.file.filename=/temp/sink-offset
可是,我单机好像历来没成功过,每次重启,都是重头消费。学习
启动命令:
bin/connect-distributed.sh config/connect-distributed.properties
分布式模式下,connector类及其配置都是经过Rest API接口提交给kafka的。
但不须要配置保存offset的文件,由于分布式下,都是将offsets,configs和status保存到topics中的。
而后由Worker决定如何存储配置,分配工做,存储offsets和task的状态信息。
切记,为了程序的高可用,这3个topics最好手动建立。
具体命令,请看另一篇博客:Kafka命令 。测试
group.id:也就是connect-cluster的group id,这个不能和consumer的goup id冲突。
config.storage.topic:用来保存connector和task的配置的。 单分片,多副本,压实类型(compacted) 的topic。
由于非压实的topic在必定配置,触发条件下,会删除!!!
这里的多副本是为了配置一直均可用,建议数量等于Kafka Brokers的数量。单分片,应该是刚开始启动,初始化的时候, 只有一个线程消费。spa
offset.storage.topic:用来保存offset的,既有source connect的,也有sink connect的offset。多分片,多副本,压实的topic。
status.storage.topic:用来存储task状态的,多分片,多副本,压实的topic。
这两个多分片,多副本配置和通常的topic相同就好了,好比咱们是3个副本,5个partition。插件
上面的3个topic,你均可以用console-consumer进行消费看看,特别是status.storage.topic很是有用,
由于分布式模式下,task由于一些配置,异常关掉,只会显示xxxTask closed,可是不会显示异常信息。
而从status.storage.topic中消费出来的消息能够看到具体异常信息。
name:connector的惟一名字
connector.class:用来链接Kafka集群的类名,也就是你继承SourceConnector或SinkConnector的实现类,
也就是Connector程序的入口,尽可能使用全量路径名。
tasks.max:task的数量,一个task就是一个线程。task数量设置要小于等于分片partition的数量,多了并发度没法提升。
key.converter:覆盖掉传递给Worker的消息的key转换类,也就是connect-stadalone.properties
和connect-distirbute.properties中key.converter。
value.converter:同上。
topics:要消费的topic列表,对于sink connector才须要配置。
用来将消息进行修改,转换,以及路由的。能够将多个组合起来,做为一个转换链。
我的建议是,这些代码直接在connector中写,除非能够部署上去,多个系统公用。
还有一方面是,黑盒的类太多,出问题了后不知道是哪里出问题了,并且也不能本地debug。
再就是着他妈的Transformations配置也太多了,学习成本好高啊。这块的详细内容没看,详情请看官网。
这块就是用来查询Connector和Task的状态,主要用于Connector集群的监控。
GET /connectors - 查询全部connectors
POST /connectors - 提交一个connector。好比是JSON格式,例子:
{ "name": "dis-maxwell-sink", "config": { "name" : "maxwell-sink-song", "connector.class" : "com.cimc.maxwell.sink.MySqlSinkConnector", "tasks.max": 1, "topics": "estation.db_ez.t_parcel,estation.db_ez.t_box", } }
GET /connectors/{name} - 查询指定connector信息的
GET /connectors/{name}/config - 查询指定connector配置的
PUT /connectors/{name}/config - 更新指定connector配置的
GET /connectors/{name}/status - 查询指定connector状态的
GET /connectors/{name}/tasks - 查询指定connector的全部tasks
GET /connectors/{name}/tasks/{taskid}/status - 查询指定connector的指定task的状态的,taskid通常是0,1,2之类
PUT /connectors/{name}/pause - 暂停指定connector的,慎用,好比由于系统更新升级,想停掉source connector拉取消息
PUT /connectors/{name}/resume - 恢复上面暂停的connector的
POST /connectors/{name}/restart - 重启一个connector(connector由于一些缘由挂掉了,好比被强行杀死,通常不是异常形成)
POST /connectors/{name}/tasks/{taskId}/restart - 重启一个指定的task的
DELETE /connectors/{name} - 删除一个connector
GET /connector-plugins - 获取全部已安装的connector插件
PUT /connector-plugins/{connector-type}/config/validate - 校验connector的配置的属性类型。
详见个人另一篇博客:Kafka Connect 开发详解 。
其实Kafka Connect的本质就是将Kafka Client包装了一层,并对开发者提供统一的实现接口。
Source Connector对应Producer,Sink Connector对应Consumer。
1.对开发者提供了统一的实现接口
2.开发,部署和管理都很是方便,统一
3.使用分布式模式进行水平扩展,毫无压力
4.在分布式模式下能够经过Rest Api提交和管理Connectors
5.对offset自动管理,只须要很简单的配置,而不像Consumer中须要开发者处理
6.流式/批式处理的支持
这是已经获得支持的组件,不须要作额外的开发: https://www.confluent.io/product/connectors/
括号中的Source表示将数据从其余系统导入Kafka,Sink表示将数据从Kafka导出到其余系统。
其余的我没看,可是JDBC的实现比较的坑爹,是经过primary key(如id)和时间戳(如updateTime)字段,
来判断数据是否更新,这样的话应用范围很是受局限。