Trafodion与Kafka集成(Trafodion + Kafka = Trafka)

Apache Kafka的Apache Trafodion消费者git

本文介绍了如何实现Apache Trafodion与Apache Kafka的无缝结合。咱们展现了Trafodion如何轻松地获取数据,如何结合不一样的开源组件,从而使用 Apache Kafka、 Trafodion、 HBase 和Hadoop建立近实时的流式处理工做流。github

如何实现各组件的结合?编程

什么是Kafka?Kafka是一个分布式、分区、多复本的日志提交服务。Kafka维护按类区分的消息,称为主题(topic)。生产者(producer)向Kafka的主题发布消息。消费者(consumer)订阅主题,接收发布到这些主题的消息。一个主题就是一个类别或者一个可订阅的条目名称。对每一个主题来讲,Kafka维护的是一个分区日志(partitioned log)。客户端控制将消息发布到哪一个分区。服务器

clipboard.png

Kafka集群包含一个或多个服务器,每台服务器被称为broker。消费者向分区的leader broker发出fetch请求。在每一个请求中,消费者指定偏移量(offset),从该位置返回日志块。若是有须要,消费者能够将偏移量倒回,从新消费数据。同时,Kafka保留消费者在日志中的位置,即偏移量(offset)。消费者在读取消息时,会提升其偏移量。消费者也能够按照任意的顺序消费消息。分区容许日志扩展到超过单个服务器,可是每一个分区的大小必须适应于其服务器。咱们能够对给定主题的数据进行分区,以便处理大量数据。架构

接下来,关于Trafodion SQL引擎的工做原理。Trafodion SQL编译器为全部的关系操做使用运算符模型,包括进程间的消息传递、用于横向扩展处理的可扩展分区功能。编译器根据region的边界或统计数据信息,生成使用表的分区布局的并行查询计划。分布式

在同一进程中以及在跨多个节点从新分区或收集数据时,查询引擎在操做符间使用数据流模型。Trafodion工做负载在运行时使用分区并行,从而并行处理多个数据分区。oop

在分区并行计划中,多个运算符为相同的计划工做。使用多队列或管道合并结果,再保存输入分区的sort顺序。因为数据被划分红多个独立执行的单元,因此分区也被称为“数据并行”。表映射UDF容许在Trafodion使用MapReduce模型编程。这些UDF可使用可选的表值参数并生成表值输出。布局

如同Trafodion中的其余运算符,表映射UDF能够并行执行。可选的优化器接口容许表映射UDF的编写人员实现多态性,以便在编译时肯定结果列(名称和类型),将谓词下推到UDF或其输入表,从而影响UDF的并行度并执行各类其余优化。fetch

Trafodion优化器采用从上而下的方法,很是适合经过运算符优化查询(例如,表映射UDF);它不会假设查询树的运算符是硬编码的。除了MapReduce模型编程的常规应用程序,表映射UDF也是一个简便而强大的机制,能够将其余数据源整合到Trafodion。优化

结合Trafodion和Kafka的并行架构是很是有益的。例如,能够直接使用示例的Trafodion Kafka UDF,也能够进行一些修改,用于消费Kafka生产者发出的数据。一旦执行了查询,Kafka UDF就将启动等待中的Trafodion执行程序。

下图展现了完整的流程:

clipboard.png

示例:将数据导入Trafodion

如下步骤将连续数据加载到8节点集群的Trafodion表:

1.建立用于加载数据的表。

CREATE TABLE employee (id int not null,
name varchar(20),
email varchar(20),
primary key(id))
SALT USING 4 PARTITIONS ON (id);

上表有4个分区;每一个节点一个分区。

2.建立一个主题。
建立一个名为employee的主题,有4个分区和2个副本。

> bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 4 –topic employee

3.开启进程。
建立一个生产者。示例代码:

Producer producer = new Producer(config);
String topic = “employee” ;
// Produce strings in delimited form “id|name|email”
for (int i = 1 ; i < = 1000 ; i++) String msg = ; System.out.println(msg); data = new KeyedMessage(topic, String.valueOf(i), msg);
producer.send(data);
}

使用易鲸捷github资料库中的示例UDF(https://github.com/esgyn/code...),消费Trafodion中的数据。注意:该简单的示例UDF不是并行执行的。

SELECT *FROM udf(kafka(‘localhost:2181′, — zookeeper connection
0, — Kafka group id
’employee’, — Kafka topic
‘IC20C20’, — int, and two char output cols
‘|’, — field delimiter
100, — max. rows to read
10000)) — timeout 10 seconds
KafkaResult(id, name, email);– name the output columns
相关文章
相关标签/搜索