咱们知道过去对于Kafka的定义是分布式,分区化的,带备份机制的日志提交服务。也就是一个分布式的消息队列,这也是他最多见的用法。可是Kafka不止于此,打开最新的官网。正则表达式
咱们看到Kafka最新的定义是:Apache Kafka® is a distributed streaming platform数据库
分布式流处理平台。apache
这里也清晰的描述了Kafka的特色:Kafka用于构建实时数据管道和流式应用程序。它具备水平可扩展性、容错性、速度极快,并在数千家公司投入生产。json
因此如今的Kafka已经不只是一个分布式的消息队列,更是一个流处理平台。这源于它于0.9.0.0和0.10.0.0引入的两个全新的组件Kafka Connect与Kafka Streaming。bootstrap
咱们知道消息队列必须存在上下游的系统,对消息进行搬入搬出。好比经典的日志分析系统,经过flume读取日志写入kafka,下游由storm进行实时的数据处理。服务器
Kafka Connect的做用就是替代Flume,让数据传输这部分工做能够由Kafka Connect来完成。Kafka Connect是一个用于在Apache Kafka和其余系统之间可靠且可靠地传输数据的工具。它能够快速地将大量数据集合移入和移出Kafka。架构
Kafka Connect的导入做业能够将数据库或从应用程序服务器收集的数据传入到Kafka,导出做业能够将Kafka中的数据传递到查询系统,也能够传输到批处理系统以进行离线分析。框架
Kafka Connect功能包括:分布式
Kafka Connect目前支持两种运行模式:独立和集群。ide
在独立模式下,只有一个进程,这种更容易设置和使用。可是没有容错功能。
> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
第一个参数config/connect-standalone.properties是一些基本的配置:
这几个在独立和集群模式下都须要设置:
#bootstrap.servers kafka集群列表 bootstrap.servers=localhost:9092 #key.converter key的序列化转换器 好比json的 key.converter=org.apache.kafka.connect.json.JsonConverter #value.converter value的序列化转换器 value.converter=org.apache.kafka.connect.json.JsonConverter #独立模式特有的配置: #offset.storage.file.filename 用于存储偏移量的文件 offset.storage.file.filename =/home/kafka/connect.offsets
后面的参数connector1.properties [connector2.properties ...] 能够多个,是链接器配置内容
这里咱们配置一个从文件读取数据并存入kafka的配置:
connect-file-sink.properties
name
- 链接器的惟一名称。尝试再次使用相同名称注册将失败。
connector.class
- 链接器的Java类 此链接器的类的全名或别名。这里咱们选择FileStreamSink
tasks.max
- 应为此链接器建立的最大任务数。若是链接器没法达到此级别的并行性,则可能会建立更少的任务。
key.converter
- (可选)覆盖worker设置的默认密钥转换器。
value.converter
- (可选)覆盖worker设置的默认值转换器。
下面两个必须设置一个:
topics
- 以逗号分隔的主题列表,用做此链接器的输入topics.regex
- 用做此链接器输入的主题的Java正则表达式name=local-file-sink connector.class=FileStreamSink tasks.max=1 file=test.sink.txt topics=connect-test
能够在链接器中配置转换器
须要指定参数:
transforms
- 转换的别名列表,指定将应用转换的顺序。transforms.$alias.type
- 转换的彻底限定类名。transforms.$alias.$transformationSpecificConfig
转换的配置属性例如,咱们把刚才的文件转换器的内容添加字段
首先设置connect-standalone.properties
key.converter.schemas.enable = false value.converter.schemas.enable = false
设置connect-file-source.properties
name=local-file-source connector.class=FileStreamSource tasks.max=1 file=test.txt topic=connect-test transforms=MakeMap, InsertSource transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value transforms.MakeMap.field=line transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value transforms.InsertSource.static.field=data_source transforms.InsertSource.static.value=test-file-source
没有转换前的结果:
"foo" "bar" "hello world"
转换后:
{"line":"foo","data_source":"test-file-source"} {"line":"bar","data_source":"test-file-source"} {"line":"hello world","data_source":"test-file-source"}
经常使用转换类型:
集群模式下,能够扩展,容错。
> bin/connect-distributed.sh config/connect-distributed.properties
在集群模式下,Kafka Connect在Kafka主题中存储偏移量,配置和任务状态。
connect-distributed.properties
#也须要基本的配置 bootstrap.servers=localhost:9092 key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter #还有一些配置要注意 #group.id(默认connect-cluster) - Connect的组id 请注意,这不得与使用者的组id 冲突 group.id=connect-cluster #用于存储偏移的主题; 此主题应具备许多分区 offset.storage.topic=connect-offsets offset.storage.replication.factor=1 #用于存储链接器和任务配置的主题 只能一个分区 config.storage.topic=connect-configs config.storage.replication.factor=1 #用于存储状态的主题; 此主题能够有多个分区 status.storage.topic=connect-status status.storage.replication.factor=1
在集群模式下,配置并不会在命令行传进去,而是须要REST API来建立,修改和销毁链接器。
能够配置REST API服务器,支持http与https
listeners=http://localhost:8080,https://localhost:8443
默认状况下,若是未listeners
指定,则REST服务器使用HTTP协议在端口8083上运行。
如下是当前支持的REST API:
GET /connectors
- 返回活动链接器列表POST /connectors
- 建立一个新的链接器; 请求主体应该是包含字符串name
字段的JSON对象和包含config
链接器配置参数的对象字段GET /connectors/{name}
- 获取有关特定链接器的信息GET /connectors/{name}/config
- 获取特定链接器的配置参数PUT /connectors/{name}/config
- 更新特定链接器的配置参数GET /connectors/{name}/status
- 获取链接器的当前状态,包括它是否正在运行,失败,暂停等,分配给哪一个工做人员,错误信息(若是失败)以及全部任务的状态GET /connectors/{name}/tasks
- 获取当前为链接器运行的任务列表GET /connectors/{name}/tasks/{taskid}/status
- 获取任务的当前状态,包括它是否正在运行,失败,暂停等,分配给哪一个工做人员,以及错误信息是否失败PUT /connectors/{name}/pause
- 暂停链接器及其任务,这将中止消息处理,直到恢复链接器PUT /connectors/{name}/resume
- 恢复暂停的链接器(若是链接器未暂停,则不执行任何操做)POST /connectors/{name}/restart
- 从新启动链接器(一般是由于它已经失败)POST /connectors/{name}/tasks/{taskId}/restart
- 重启个别任务(一般由于失败)DELETE /connectors/{name}
- 删除链接器,暂停全部任务并删除其配置kakfa容许开发人员本身去开发一个链接器。
要在Kafka和其余系统之间复制数据,用户须要建立一个Connector
Connector有两种形式:
SourceConnectors
从另外一个系统导入数据,例如,JDBCSourceConnector
将关系数据库导入Kafka
SinkConnectors
导出数据,例如,HDFSSinkConnector
将Kafka主题的内容导出到HDFS文件
和对应的Task:
SourceTask
和SinkTask
Task造成输入输出流,开发Task要注意偏移量的问题。
每一个流应该是一系列键值记录。还须要按期提交已处理的数据的偏移量,以便在发生故障时,处理能够从上次提交的偏移量恢复。Connector还须要是动态的,实现还负责监视外部系统是否存在任何更改。
开发链接器只须要实现两个接口,即Connector
和Task
。
这里咱们简单开发一个FileStreamConnector。
此链接器是为在独立模式下使用,SourceConnector/
SourceTask读取文件的每一行,
SinkConnector/
SinkTask每一个记录写入一个文件。
继承SourceConnector,添加字段(要读取的文件名和要将数据发送到的主题)
public class FileStreamSourceConnector extends SourceConnector { private String filename; private String topic;
定义实际读取数据的类
@Override public Class<? extends Task> taskClass() { return FileStreamSourceTask.class; }
在FileStreamSourceTask
下面定义该类。接下来,咱们添加一些标准的生命周期方法,start()
和stop()
@Override public void start(Map<String, String> props) { // The complete version includes error handling as well. filename = props.get(FILE_CONFIG); topic = props.get(TOPIC_CONFIG); } @Override public void stop() { // Nothing to do since no background monitoring is required. }
最后,实施的真正核心在于taskConfigs()
@Override public List<Map<String, String>> taskConfigs(int maxTasks) { ArrayList<Map<String, String>> configs = new ArrayList<>(); // Only one input stream makes sense. Map<String, String> config = new HashMap<>(); if (filename != null) config.put(FILE_CONFIG, filename); config.put(TOPIC_CONFIG, topic); configs.add(config); return configs; }
实现SourceTask
建立FileStreamSourceTask继承SourceTask
public class FileStreamSourceTask extends SourceTask { String filename; InputStream stream; String topic; @Override public void start(Map<String, String> props) { filename = props.get(FileStreamSourceConnector.FILE_CONFIG); stream = openOrThrowError(filename); topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG); } @Override public synchronized void stop() { stream.close(); }
接下来,咱们实现任务的主要功能,即poll()
从输入系统获取事件并返回如下内容的方法List
:
@Override public List<SourceRecord> poll() throws InterruptedException { try { ArrayList<SourceRecord> records = new ArrayList<>(); while (streamValid(stream) && records.isEmpty()) { LineAndOffset line = readToNextLine(stream); if (line != null) { Map<String, Object> sourcePartition = Collections.singletonMap("filename", filename); Map<String, Object> sourceOffset = Collections.singletonMap("position", streamOffset); records.add(new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line)); } else { Thread.sleep(1); } } return records; } catch (IOException e) { // Underlying stream was killed, probably as a result of calling stop. Allow to return // null, and driving thread will handle any shutdown if necessary. } return null; }
不像SourceConnector
和SinkConnector
,SourceTask
并SinkTask
有很是不一样的接口,由于SourceTask
采用的是拉接口,并SinkTask
使用推接口。二者共享公共生命周期方法,但SinkTask
彻底不一样:
public abstract class SinkTask implements Task { public void initialize(SinkTaskContext context) { this.context = context; } public abstract void put(Collection<SinkRecord> records); public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) { }
这是一个简单的例子,它们有简单的结构化数据 - 每一行只是一个字符串。几乎全部实用的链接器都须要具备更复杂数据格式的模式。要建立更复杂的数据,您须要使用Kafka Connect data
API。
Schema schema = SchemaBuilder.struct().name(NAME) .field("name", Schema.STRING_SCHEMA) .field("age", Schema.INT_SCHEMA) .field("admin", new SchemaBuilder.boolean().defaultValue(false).build()) .build(); Struct struct = new Struct(schema) .put("name", "Barbara Liskov") .put("age", 75);
更多Kafka相关技术文章:
什么是Kafka? Kafka监控工具汇总 Kafka快速入门 Kafka核心之Consumer Kafka核心之Producer
更多实时计算,Flink,Kafka等相关技术博文,欢迎关注实时流式计算