In 2014, three of Kafka's main developers left LinkedIn to found a company called Confluent. Like other big-data companies, Confluent named its product Confluent Platform. Kafka sits at its core, and it comes in three editions: Confluent Open Source, Confluent Enterprise, and Confluent Cloud.
I will not go into Confluent's background here; the details are available on the official site, https://www.confluent.io. This article focuses on how to use the Confluent platform to capture data in MySQL in real time.
Download the MySQL JDBC driver and add it to the CLASSPATH:

wget http://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.39.tar.gz
tar xzvf mysql-connector-java-5.1.39.tar.gz
sed -i '$a export CLASSPATH=/root/mysql-connector-java-5.1.39/mysql-connector-java-5.1.39-bin.jar:$CLASSPATH' /etc/profile
source /etc/profile
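As an alternative to editing /etc/profile, Kafka Connect also loads any jars sitting next to the JDBC connector's own libraries. A minimal sketch, assuming Confluent ends up unpacked under /usr/local/confluent-3.2.2 as in the steps below:

# Drop the MySQL driver next to the JDBC connector jars so Kafka Connect can find it
cp /root/mysql-connector-java-5.1.39/mysql-connector-java-5.1.39-bin.jar \
   /usr/local/confluent-3.2.2/share/java/kafka-connect-jdbc/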
Download the Confluent tarball, extract it, and install:
# cd /usr/local
# tar zxvf confluent.tar.gz
Default ports of the Confluent platform components:
Component | Default Port |
Zookeeper | 2181 |
Apache Kafka brokers (plain text) | 9092 |
Schema Registry REST API | 8081 |
REST Proxy | 8082 |
Kafka Connect REST API | 8083 |
Confluent Control Center | 9021 |
Create a configuration file, quickstart-mysql.properties, for Confluent to load data from MySQL:
name=mysql-whitelist-timestamp-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
connection.user=root
connection.password=root
connection.url=jdbc:mysql://192.168.248.128:3306/foodsafe?characterEncoding=utf8&useSSL=true
# Table whitelist
#table.whitelist=t1
mode=timestamp+incrementing
timestamp.column.name=modified
incrementing.column.name=id
# Topic prefix: the Confluent platform creates one topic per table, named prefix + table name
topic.prefix=mysql-test-
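Since mode=timestamp+incrementing relies on a strictly increasing id column and a modified timestamp column, it is worth confirming the table actually has both before starting the connector. A quick sanity check, assuming the mysql command-line client is available and using the host and credentials from the configuration above:

# Verify t1 exposes the auto-increment id and the modified timestamp column
mysql -h 192.168.248.128 -uroot -proot foodsafe -e "DESCRIBE t1;"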
Custom query mode:
If the service is started with the configuration above, the Confluent platform will monitor and pull data from every table. That is not always what you want, so the platform also provides a custom query mode. A reference configuration:
#User defined connector instance name
name=mysql-whitelist-timestamp-source
#The class implementing the connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
#Maximum number of tasks to run for this connector instance
tasks.max=10
connection.url=jdbc:mysql://192.168.248.128:3306/foodsafe?characterEncoding=utf8&useSSL=true
connection.user=root
connection.password=root
query=SELECT f.`name`,p.price,f.create_time from foods f join price p on (f.id = p.food_id)
mode=timestamp
timestamp.column.name=timestamp
topic.prefix=mysql-joined-data
In query mode the connector appends its own WHERE clause (filtering and ordering on the configured timestamp column) to the configured query, so a query that already contains a WHERE clause can easily lead Kafka to splice together malformed SQL. It is safer to express filters as joins.
1. Start ZooKeeper
Because ZooKeeper is a long-running service, it is best started in the background. It also needs write access to /var/lib for this and the following steps; if you run into permission problems, check that the user Confluent was installed as has write permission on /var/lib.
# cd /usr/local/confluent-3.2.2
# ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties &
# Or start it as a daemon:
# ./bin/zookeeper-server-start -daemon ./etc/kafka/zookeeper.properties
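To confirm ZooKeeper is actually up before moving on, it answers the ruok four-letter command on its client port (2181, per the table above) with imok. A quick check, assuming netcat is installed:

# A healthy ZooKeeper replies "imok"
echo ruok | nc localhost 2181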
Stop ZooKeeper:
# ./bin/zookeeper-server-stop
2. Start Kafka
# cd /usr/local/confluent-3.2.2
# ./bin/kafka-server-start ./etc/kafka/server.properties &
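To verify the broker started correctly, listing topics through ZooKeeper should return without errors (the list may well be empty at this point); this assumes the default ports from the table above:

# Returns the current topic list; no error means the broker registered in ZooKeeper
./bin/kafka-topics --list --zookeeper localhost:2181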
Stop the Kafka service:
# ./bin/kafka-server-stop
3. Start the Schema Registry
# cd /usr/local/confluent-3.2.2
# ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties &
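The Schema Registry serves a REST API on port 8081 (see the port table above), so a simple curl confirms it is up; a fresh install returns an empty list []:

# List registered schema subjects via the Schema Registry REST API
curl http://localhost:8081/subjects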
Stop the Schema Registry:
# ./bin/schema-registry-stop
4. Start the producer that monitors the MySQL data
# cd /usr/local/confluent-3.2.2
# ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-jdbc/quickstart-mysql.properties &
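Once the standalone worker is running, the Kafka Connect REST API on port 8083 can confirm the connector loaded, and the topic list should now include the prefixed per-table topics. A quick check, assuming the default ports used throughout this article:

# Should list the connector name from quickstart-mysql.properties
curl http://localhost:8083/connectors
# With t1 whitelisted, a topic named mysql-test-t1 should now exist
./bin/kafka-topics --list --zookeeper localhost:2181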
5. Start the consumer that reads the data
# cd /usr/local/confluent-3.2.2
# ./bin/kafka-avro-console-consumer --new-consumer --bootstrap-server localhost:9092 --topic mysql-test-t1 --from-beginning
Test

Create the test table and seed data:
DROP TABLE IF EXISTS `t1`;
CREATE TABLE `t1` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(200) DEFAULT NULL,
  `createtime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `modified` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  KEY `id` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of t1
-- ----------------------------
INSERT INTO `t1` VALUES ('1', 'aa', '2017-07-10 08:03:51', '2017-07-10 23:03:30');
INSERT INTO `t1` VALUES ('3', 'bb', '2017-07-10 08:03:45', '2017-07-10 23:03:34');
INSERT INTO `t1` VALUES ('4', '年内', '2017-07-10 08:05:51', '2017-07-10 23:05:45');
INSERT INTO `t1` VALUES ('5', '年内', '2017-07-10 08:44:28', '2017-07-10 23:15:45');
INSERT INTO `t1` VALUES ('6', '公共', '2017-07-18 06:05:11', '2017-07-18 21:04:58');
INSERT INTO `t1` VALUES ('7', '哈哈', '2017-07-18 19:05:04', '2017-07-18 07:32:13');
INSERT INTO `t1` VALUES ('8', '公共经济', '2017-07-27 20:33:10', '2017-07-18 07:34:43');
Insert statement:
INSERT INTO `t1` (name, createtime, modified) VALUES ('公共经济2', '2017-07-27 20:33:10', '2017-07-18 07:34:43');
After new data is inserted, the consumer prints the inserted rows in real time. Nullable columns such as name appear as Avro unions, e.g. {"string": ...}, in the JSON output:
{"id":7,"name":{"string":"哈哈"},"createtime":1500429904000,"modified":1500388333000} {"id":8,"name":{"string":"公共经济"},"createtime":1501212790000,"modified":1500388483000} {"id":9,"name":{"string":"公共经济1"},"createtime":1501212790000,"modified":1500388483000} {"id":10,"name":{"string":"公共经济2"},"createtime":1501212790000,"modified":1500388483000}
Confluent still seems to see very little use in China, and Chinese documentation is scarce. This article is a record of the hands-on work I did in July last year, following the official documentation while researching real-time data exchange technologies.