替代Flume——Kafka Connect简介

file 咱们知道过去对于Kafka的定义是分布式,分区化的,带备份机制的日志提交服务。也就是一个分布式的消息队列,这也是他最多见的用法。可是Kafka不止于此,打开最新的官网。正则表达式

file

咱们看到Kafka最新的定义是:Apache Kafka® is a distributed streaming platform数据库

分布式流处理平台。apache

file

这里也清晰的描述了Kafka的特色:Kafka用于构建实时数据管道和流式应用程序。它具备水平可扩展性、容错性、速度极快,并在数千家公司投入生产。json

因此如今的Kafka已经不只是一个分布式的消息队列,更是一个流处理平台。这源于它于0.9.0.0和0.10.0.0引入的两个全新的组件Kafka Connect与Kafka Streaming。bootstrap

Kafka Connect简介

咱们知道消息队列必须存在上下游的系统,对消息进行搬入搬出。好比经典的日志分析系统,经过flume读取日志写入kafka,下游由storm进行实时的数据处理。服务器

file

Kafka Connect的做用就是替代Flume,让数据传输这部分工做能够由Kafka Connect来完成。Kafka Connect是一个用于在Apache Kafka和其余系统之间可靠且可靠地传输数据的工具。它能够快速地将大量数据集合移入和移出Kafka。架构

Kafka Connect的导入做业能够将数据库或从应用程序服务器收集的数据传入到Kafka,导出做业能够将Kafka中的数据传递到查询系统,也能够传输到批处理系统以进行离线分析。框架

Kafka Connect功能包括:分布式

  • 一个通用的Kafka链接的框架 - Kafka Connect规范化了其余数据系统与Kafka的集成,简化了链接器开发,部署和管理
  • 分布式和独立模式 - 支持大型分布式的管理服务,也支持小型生产环境的部署
  • REST界面 - 经过易用的REST API提交和管理Kafka Connect
  • 自动偏移管理 - 只需从链接器获取一些信息,Kafka Connect就能够自动管理偏移量提交过程,所以链接器开发人员无需担忧链接器开发中偏移量提交这部分的开发
  • 默认状况下是分布式和可扩展的 - Kafka Connect构建在现有的组管理协议之上。能够添加扩展集群
  • 流媒体/批处理集成 - 利用Kafka现有的功能,Kafka Connect是桥接流媒体和批处理数据系统的理想解决方案

file

运行Kafka Connect

Kafka Connect目前支持两种运行模式:独立和集群。ide

独立模式

在独立模式下,只有一个进程,这种更容易设置和使用。可是没有容错功能。

启动:
> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
独立模式配置

第一个参数config/connect-standalone.properties是一些基本的配置:

这几个在独立和集群模式下都须要设置:

#bootstrap.servers   kafka集群列表
bootstrap.servers=localhost:9092
#key.converter       key的序列化转换器  好比json的  key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter     value的序列化转换器
value.converter=org.apache.kafka.connect.json.JsonConverter

#独立模式特有的配置:
#offset.storage.file.filename       用于存储偏移量的文件
offset.storage.file.filename =/home/kafka/connect.offsets
独立模式链接器配置(配置文件)

后面的参数connector1.properties [connector2.properties ...] 能够多个,是链接器配置内容

这里咱们配置一个从文件读取数据并存入kafka的配置:

connect-file-sink.properties

  • name - 链接器的惟一名称。尝试再次使用相同名称注册将失败。

  • connector.class - 链接器的Java类 此链接器的类的全名或别名。这里咱们选择FileStreamSink

  • tasks.max - 应为此链接器建立的最大任务数。若是链接器没法达到此级别的并行性,则可能会建立更少的任务。

  • key.converter - (可选)覆盖worker设置的默认密钥转换器。

  • value.converter - (可选)覆盖worker设置的默认值转换器。

    下面两个必须设置一个:

    • topics - 以逗号分隔的主题列表,用做此链接器的输入
    • topics.regex - 用做此链接器输入的主题的Java正则表达式
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test

能够在链接器中配置转换器

须要指定参数:

  • transforms - 转换的别名列表,指定将应用转换的顺序。
  • transforms.$alias.type - 转换的彻底限定类名。
  • transforms.$alias.$transformationSpecificConfig 转换的配置属性

例如,咱们把刚才的文件转换器的内容添加字段

首先设置connect-standalone.properties

key.converter.schemas.enable = false
value.converter.schemas.enable = false

设置connect-file-source.properties

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
transforms=MakeMap, InsertSource
transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.MakeMap.field=line
transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource.static.field=data_source
transforms.InsertSource.static.value=test-file-source

没有转换前的结果:

"foo"
"bar"
"hello world"

转换后:

{"line":"foo","data_source":"test-file-source"}
{"line":"bar","data_source":"test-file-source"}
{"line":"hello world","data_source":"test-file-source"}

经常使用转换类型:

  • InsertField - 使用静态数据或记录元数据添加字段
  • ReplaceField - 过滤或重命名字段
  • MaskField - 用类型的有效空值替换字段(0,空字符串等)
  • ValueToKey Value转换为Key
  • HoistField - 将整个事件做为单个字段包装在Struct或Map中
  • ExtractField - 从Struct和Map中提取特定字段,并在结果中仅包含此字段
  • SetSchemaMetadata - 修改架构名称或版本
  • TimestampRouter - 根据原始主题和时间戳修改记录主题
  • RegexRouter - 根据原始主题,替换字符串和正则表达式修改记录主题

集群模式

集群模式下,能够扩展,容错。

启动:
> bin/connect-distributed.sh config/connect-distributed.properties

在集群模式下,Kafka Connect在Kafka主题中存储偏移量,配置和任务状态。

集群模式配置

connect-distributed.properties

#也须要基本的配置
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

#还有一些配置要注意
#group.id(默认connect-cluster) - Connect的组id 请注意,这不得与使用者的组id 冲突
group.id=connect-cluster

#用于存储偏移的主题; 此主题应具备许多分区
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1

#用于存储链接器和任务配置的主题  只能一个分区
config.storage.topic=connect-configs
config.storage.replication.factor=1

#用于存储状态的主题; 此主题能够有多个分区
status.storage.topic=connect-status
status.storage.replication.factor=1

在集群模式下,配置并不会在命令行传进去,而是须要REST API来建立,修改和销毁链接器。

集群模式链接器配置(REST API)

能够配置REST API服务器,支持http与https

listeners=http://localhost:8080,https://localhost:8443

默认状况下,若是未listeners指定,则REST服务器使用HTTP协议在端口8083上运行。

如下是当前支持的REST API:

  • GET /connectors - 返回活动链接器列表
  • POST /connectors - 建立一个新的链接器; 请求主体应该是包含字符串name字段的JSON对象和包含config链接器配置参数的对象字段
  • GET /connectors/{name} - 获取有关特定链接器的信息
  • GET /connectors/{name}/config - 获取特定链接器的配置参数
  • PUT /connectors/{name}/config - 更新特定链接器的配置参数
  • GET /connectors/{name}/status - 获取链接器的当前状态,包括它是否正在运行,失败,暂停等,分配给哪一个工做人员,错误信息(若是失败)以及全部任务的状态
  • GET /connectors/{name}/tasks - 获取当前为链接器运行的任务列表
  • GET /connectors/{name}/tasks/{taskid}/status - 获取任务的当前状态,包括它是否正在运行,失败,暂停等,分配给哪一个工做人员,以及错误信息是否失败
  • PUT /connectors/{name}/pause - 暂停链接器及其任务,这将中止消息处理,直到恢复链接器
  • PUT /connectors/{name}/resume - 恢复暂停的链接器(若是链接器未暂停,则不执行任何操做)
  • POST /connectors/{name}/restart - 从新启动链接器(一般是由于它已经失败)
  • POST /connectors/{name}/tasks/{taskId}/restart - 重启个别任务(一般由于失败)
  • DELETE /connectors/{name} - 删除链接器,暂停全部任务并删除其配置

链接器开发指南

kakfa容许开发人员本身去开发一个链接器。

核心概念

要在Kafka和其余系统之间复制数据,用户须要建立一个Connector

Connector有两种形式:

SourceConnectors从另外一个系统导入数据,例如,JDBCSourceConnector将关系数据库导入Kafka

SinkConnectors导出数据,例如,HDFSSinkConnector将Kafka主题的内容导出到HDFS文件

和对应的Task:

SourceTaskSinkTask

Task造成输入输出流,开发Task要注意偏移量的问题。

每一个流应该是一系列键值记录。还须要按期提交已处理的数据的偏移量,以便在发生故障时,处理能够从上次提交的偏移量恢复。Connector还须要是动态的,实现还负责监视外部系统是否存在任何更改。

开发一个简单的链接器

开发链接器只须要实现两个接口,即ConnectorTask

这里咱们简单开发一个FileStreamConnector。

此链接器是为在独立模式下使用,SourceConnectorSourceTask读取文件的每一行,SinkConnectorSinkTask每一个记录写入一个文件。

链接器示例:

继承SourceConnector,添加字段(要读取的文件名和要将数据发送到的主题)

public class FileStreamSourceConnector extends SourceConnector {
    private String filename;
    private String topic;

定义实际读取数据的类

@Override
public Class<? extends Task> taskClass() {
    return FileStreamSourceTask.class;
}

FileStreamSourceTask下面定义该类。接下来,咱们添加一些标准的生命周期方法,start()stop()

@Override
public void start(Map<String, String> props) {
    // The complete version includes error handling as well.
    filename = props.get(FILE_CONFIG);
    topic = props.get(TOPIC_CONFIG);
}
 
@Override
public void stop() {
    // Nothing to do since no background monitoring is required.
}

最后,实施的真正核心在于taskConfigs()

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    ArrayList<Map<String, String>> configs = new ArrayList<>();
    // Only one input stream makes sense.
    Map<String, String> config = new HashMap<>();
    if (filename != null)
        config.put(FILE_CONFIG, filename);
    config.put(TOPIC_CONFIG, topic);
    configs.add(config);
    return configs;
}

任务示例:

源任务

实现SourceTask 建立FileStreamSourceTask继承SourceTask

public class FileStreamSourceTask extends SourceTask {
    String filename;
    InputStream stream;
    String topic;
 
    @Override
    public void start(Map<String, String> props) {
        filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
        stream = openOrThrowError(filename);
        topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
    }
 
    @Override
    public synchronized void stop() {
        stream.close();
    }

接下来,咱们实现任务的主要功能,即poll()从输入系统获取事件并返回如下内容的方法List

@Override
public List<SourceRecord> poll() throws InterruptedException {
    try {
        ArrayList<SourceRecord> records = new ArrayList<>();
        while (streamValid(stream) && records.isEmpty()) {
            LineAndOffset line = readToNextLine(stream);
            if (line != null) {
                Map<String, Object> sourcePartition = Collections.singletonMap("filename", filename);
                Map<String, Object> sourceOffset = Collections.singletonMap("position", streamOffset);
                records.add(new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line));
            } else {
                Thread.sleep(1);
            }
        }
        return records;
    } catch (IOException e) {
        // Underlying stream was killed, probably as a result of calling stop. Allow to return
        // null, and driving thread will handle any shutdown if necessary.
    }
    return null;
}
接收任务

不像SourceConnectorSinkConnectorSourceTaskSinkTask有很是不一样的接口,由于SourceTask采用的是拉接口,并SinkTask使用推接口。二者共享公共生命周期方法,但SinkTask彻底不一样:

public abstract class SinkTask implements Task {
    public void initialize(SinkTaskContext context) {
        this.context = context;
    }
 
    public abstract void put(Collection<SinkRecord> records);
 
    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
    }

这是一个简单的例子,它们有简单的结构化数据 - 每一行只是一个字符串。几乎全部实用的链接器都须要具备更复杂数据格式的模式。要建立更复杂的数据,您须要使用Kafka Connect dataAPI。

Schema schema = SchemaBuilder.struct().name(NAME)
    .field("name", Schema.STRING_SCHEMA)
    .field("age", Schema.INT_SCHEMA)
    .field("admin", new SchemaBuilder.boolean().defaultValue(false).build())
    .build();
 
Struct struct = new Struct(schema)
    .put("name", "Barbara Liskov")
    .put("age", 75);

更多Kafka相关技术文章:

什么是Kafka? Kafka监控工具汇总 Kafka快速入门 Kafka核心之Consumer Kafka核心之Producer

更多实时计算,Flink,Kafka等相关技术博文,欢迎关注实时流式计算

file

相关文章
相关标签/搜索