腾讯大牛教你ClickHouse实时同步MySQL数据

| 做者   史鹏宙,CSIG云与智慧产业事业群研发工程师


ClickHouse做为OLAP分析引擎已经被普遍使用,数据的导入导出是用户面临的第一个问题。因为ClickHouse自己没法很好地支持单条大批量的写入,所以在实时同步数据方面须要借助其余服务协助。本文给出一种结合Canal+Kafka的方案,而且给出在多个MySQL实例分库分表的场景下,如何将多张MySQL数据表写入同一张ClickHouse表的方法,欢迎你们批评指正。java

 

首先来看看咱们的需求背景:mysql

 

1. 实时同步多个MySQL实例数据到ClickHouse,天天规模500G,记录数目亿级别,能够接受分钟级别的同步延迟;正则表达式

 

2. 某些数据库表存在分库分表的操做,用户须要跨MySQL实例跨数据库的表同步到ClickHouse的一张表中;spring

 

3. 现有的MySQL binlog开源组件(Canal),没法作到多张源数据表到一张目的表的映射关系。sql

 

基本原理数据库

 

1、使用JDBC方式同步

 

1. 使用Canal组件完成binlog的解析和数据同步;json

 

2. Canal-Server进程会假装成MySQL的slave,使用MySQL的binlog同步协议完成数据同步;bootstrap

 

3. Canal-Adapter进程负责从canal-server获取解析后的binlog,而且经过jdbc接口写入到ClickHouse;缓存

 

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

 

优势:网络

 

1. Canal组件原生支持; 

 

缺点:

 

1. Canal-Adpater写入时源表和目的表一一对应,灵活性不足;

 

2. 须要维护两个Canal组件进程;

 

2、Kafka+ClickHouse物化视图方式同步

 

1. Canal-Server完成binlog的解析,而且将解析后的json写入Kafka;

 

2. Canal-Server能够根据正则表达式过滤数据库和表名,而且根据规则写入Kafka的topic;

 

3. ClickHouse使用KafkaEngine和Materialized View完成消息消费,并写入本地表;

 

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

 

优势:

 

1. Kafka支持水平扩展,能够根据数据规模调整partition数目;

 

2. Kafka引入后将写入请求合并,防止ClickHouse生成大量的小文件,从而影响查询性能;

 

3. Canal-Server支持规则过滤,能够灵活配置上游的MySQL实例的数据库名和表名,而且指明写入的Kafka topic名称;

 

缺点:

 

1. 须要维护Kafka和配置规则;

 

2. ClickHouse须要新建相关的视图、Kafka Engine的外表等;

 

具体步骤

 

1、准备工做

 

1. 若是使用TencentDB,则在控制台确认binlog_format为ROW,无需多余操做。

 

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

 

 若是是自建MySQL,则在客户端中查询变量:

 

>   show variables like '%binlog%';

+-----------------------------------------+----------------------+

| Variable_name                           | Value                |

+-----------------------------------------+----------------------+

| binlog_format                           | ROW                  |

+-----------------------------------------+----------------------+

 

> show variables like '%log_bin%';

+---------------------------------+--------------------------------------------+

| Variable_name                   | Value                                      |

+---------------------------------+--------------------------------------------+

| log_bin                         | ON                                         |

| log_bin_basename                |  /data/mysql_root/log/20146/mysql-bin        |

| log_bin_index                   |  /data/mysql_root/log/20146/mysql-bin.index |

+---------------------------------+--------------------------------------------+

 

2. 建立帐号canal,用于同步binlog

 

CREATE USER canal IDENTIFIED BY  'canal'; 

GRANT SELECT, REPLICATION SLAVE,  REPLICATION CLIENT ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES;

 

2、Canal组件部署

 

前置条件:

 

Canal组件部署的机器须要跟ClickHouse服务和MySQL网络互通;

 

须要在机器上部署java8,配置JAVA_HOME、PATH等环境变量;

 

基本概念:

 

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

 

1. Canal-Server组件部署

 

Canal-Server的主要做用是订阅binlog信息并解析和定义instance相关信息,建议每一个Canal-Server进程对应一个MySQL实例;

 

1)下载canal.deployer-1.1.4.tar.gz,解压

 

2)修改配置文件conf/canal.properties,须要关注的配置以下:

 

...

# 端口相关信息,若是同一台机器部署多个进程须要修改

canal.port = 11111

canal.metrics.pull.port = 11112

canal.admin.port = 11110

...

# 服务模式

canal.serverMode = tcp

...

# Kafka地址

canal.mq.servers = 172.21.48.11:9092

# 使用消息队列时 这两个值必须为true

canal.mq.flatMessage = true

canal.mq.flatMessage.onlyData = true

...

# instance列表,conf目录下必须有同名的目录

canal.destinations = example,example2

 

3)配置instance

 

能够参照example新增新的instance,主要修改配置文件conf/${instance_name}/instance.properties文件。    

 

样例1:  同步某个数据库的以XX前缀开头的表

订阅 172.21.48.35的MySQL的testdb数据库中的以tb_开头的表的数据变动(例如tb_20200801 、 tb_20200802等),主要的步骤以下:

步骤1:建立example2实例:cddeployer/conf && cp -r example example2

步骤2:修改deployer/conf/example2/instance.properties文件

 

...

# 上游MySQL实例地址

canal.instance.master.address=172.21.48.35:3306

...

# 同步帐户信息

canal.instance.dbUsername=canal

canal.instance.dbPassword=canal

...

# 过滤数据库名称和表名

canal.instance.filter.regex=testdb\\.tb_.*,

步骤3:在conf/canal.properties中修改 canal.destinations ,新增example2

 

样例2:  同步多个数据库的以XX前缀开头的表,且输出到Kafka

 

订阅 172.21.48.35的MySQL的empdb_0数据库的employees_20200801表,empdb_1数据库的employees_20200802表,而且数据写入Kafka;

 

步骤1:建立example2实例:cddeployer/conf && cp -r example example3

 

步骤2:修改deployer/conf/example3/instance.properties文件

 

...

# 上游MySQL实例地址

canal.instance.master.address=172.21.48.35:3306

...

# 同步帐户信息

canal.instance.dbUsername=canal

canal.instance.dbPassword=canal

...

# 过滤数据库名称和表名

canal.instance.filter.regex=empdb_.*\\.employees_.*

...

# Kafka的topic名称和匹配的规则

canal.mq.dynamicTopic=employees_topic:empdb_.*\\.employees_.*

canal.mq.partition=0

 

# Kafka topic的分区数目(即partition数目)

canal.mq.partitionsNum=3

 

# 根据employees_开头的表中的 emp_no字段来进行数据hash,分布到不一样的partition

canal.mq.partitionHash=empdb_.*\\.employees_.*:emp_no

 

步骤3:在Kafka中新建topic employees_topic,指定分区数目为3

 

步骤4:在conf/canal.properties中修改 canal.destinations ,新增example3;修改服务模式为kafka,配置kafka相关信息;

 

 

# 服务模式

canal.serverMode = kafka

...

# Kafka地址

canal.mq.servers = 172.21.48.11:9092

# 使用消息队列时 这两个值必须为true

canal.mq.flatMessage = true

canal.mq.flatMessage.onlyData = true

...

# instance列表,conf目录下必须有同名的目录

canal.destinations =  example,example2,example3

 

2. Canal-Adapter组件部署(只针对方案一)

 

Canal-Adapter的主要做用是经过JDBC接口写入ClickHouse数据,能够配置多个表的写入;

 

1)下载canal.adapter-1.1.4.tar.gz,解压;

 

2)在lib目录下新增clickhouse驱动jar包及httpclient的jar包 httpcore-4.4.13.jar、httpclient-4.3.3.jar、clickhouse-jdbc-0.2.4.jar;

 

3)修改配置文件conf/application.yml文件,修改canalServerHost、srcDataSources、canalAdapters的配置;

 

server:

   port: 8081

spring:

   jackson:

     date-format: yyyy-MM-dd HH????????ss

     time-zone: GMT+8

     default-property-inclusion: non_null

 

canal.conf:

   mode: tcp

   canalServerHost: 127.0.0.1:11111   # canal-server的服务地址

   batchSize: 500

   syncBatchSize: 1000

   retries: 0

   timeout:

   accessKey:

  secretKey:

  #  MySQL的配置,修改用户名密码及制定数据库

   srcDataSources:

     defaultDS:

       url: jdbc:mysql://172.21.48.35:3306

       username: root

       password: yourpasswordhere

   canalAdapters:

  -  instance: example

     groups:

     - groupId: g1

       outerAdapters:

       - name: logger

       - name: rdb

         key: mysql1

         # clickhouse的配置,修改用户名密码数据库

         properties:

           jdbc.driverClassName: ru.yandex.clickhouse.ClickHouseDriver

           jdbc.url: jdbc:clickhouse://172.21.48.18:8123

           jdbc.username: default

           jdbc.password:

 

4)修改配置文件conf/rdb/mytest_user.yml文件

 

dataSourceKey: defaultDS

destination: example

groupId: g1

outerAdapterKey: mysql1

concurrent: true

dbMapping:

  database:  testdb

   mirrorDb: true

上述的配置文件中,因为开启了mirrorDb: true,目的端的ClickHouse必须有相同的数据库名和表名。

 

样例1:源数据库与目标数据库名字不一样,源表名与目标表名不一样

 

修改adapter的conf/rdb/mytest_user.yml配置文件,指定源数据库和目标数据库

 

dataSourceKey: defaultDS

destination: example

groupId: g1

outerAdapterKey: mysql1

concurrent: true

dbMapping:

   database: source_database_name

   table: source_table

   targetTable: destination_database_name.destination_table

   targetColumns:

     id:

     name:

  commitBatch:  3000 # 批量提交的大小

 

样例2:多个源数据库表写入目的端的同一张表

 

在conf/rdb 目录配置多个yml文件,分别指明不一样的table名称。


 

Kafka 服务配置

 

1、调整合理的producer参数

 

确认Canal-Server里的canal.properties文件,重要参数见下表;

 

配置项

Kafka SDK配置项

配置说明

canal.mq.servers

bootstrap.servers

Kafka服务地址

canal.mq.retries

retries

producer在发送失败的时候会重试次数,默认为0

canal.mq.batchSize

batch.size

producer尝试发送同一个partition的请求数据量,默认为16K

canal.mq.maxRequestSize

max.request.size

producer请求的最大大小,默认为1M

canal.mq.lingerMs

linger.ms

producer等待发送的延迟,默认为100ms

canal.mq.bufferMemory

buffer.memory

producer使用的缓存消息的最大内存,默认为30M

canal.mq.flatMessage

-

Canal-Server 将binlog解析结果转为json;下游为ClickHouse Kafka Engine表时必须为true

canal.mq.flatMessage.onlyData

-

Canal-Server 只发送binlog解析结果中的data部分;下游为ClickHouse Kafka Engine表时必须为true

canal.mq.acks

acks

producer但愿leader返回的用于确认请求完成的确认数量. 可选值 all, -1, 0 1. 默认值为all

 

2、新建相关的topic名称

 

根据Canal-Server里instance里配置文件instance.properties,注意分区数目与canal.mq.partitionsNum 保持一致;

 

partition数目须要考虑如下因素:

 

1. 上游的MySQL的数据量。原则上数据写入量越大,应该分配更多的partition数目;

 

2. 考虑下游ClickHouse的实例数目。topic的partition分区总数 最好 不大于 下游ClickHouse的总实例数目,保证每一个ClickHouse实例都能至少分配到一个partition;


 

ClickHouse服务配置

 

根据上游MySQL实例的表的schema新建数据表;

 

引入Kafka时须要额外新建Engine=Kafka的外表以及相关的物化视图表;

 

建议:

 

1. 为每一个外表新增不一样的 kafka_group_name,防止相互影响;

 

2. 设置kafka_skip_broken_messages 参数为合理值,遇到没法解析数据会跳过;

 

3. 设置合理的kafka_num_consumers值,最好保证全部ClickHouse实例该值的总和大于 topic的partition数目;

 

新建相关的分布式查询表;


 

服务启动

 

启动相关的Canal组件进程;

 

1. canal-server:  sh bin/startup.sh

 

2. canal-adapter: sh bin/startup.sh

 

在MySQL中插入数据,观察日志是否能够正常运行;

 

若是使用Kafka,能够经过kafka-console-consumer.sh脚本观察binlog数据解析;

 

观察ClickHouse数据表中是否正常写入数据;


 

实际案例

 

需求:实时同步MySQL实例的empdb_0.employees_20200801表和empdb_1.employees_20200802数据表

 

方案:使用方案二

 

环境及参数:

 

MySQL地址

172.21.48.35:3306

CKafka地址

172.21.48.11:9092

Canal instance名称

employees

Kafka目的topic

employees_topic 

 

1.在MySQL新建相关表

 

# MySQL表的建表语句

CREATE DATABASE `empdb_0`;

CREATE DATABASE `empdb_1`;

 

CREATE TABLE  `empdb_0`.`employees_20200801` (

   `emp_no` int(11) NOT NULL,

   `birth_date` date NOT NULL,

   `first_name` varchar(14) NOT NULL,

   `last_name` varchar(16) NOT NULL,

   `gender` enum('M','F') NOT NULL,

   `hire_date` date NOT NULL,

   PRIMARY KEY (`emp_no`)

);

 

CREATE TABLE  `empdb_1`.`employees_20200802` (

   `emp_no` int(11) NOT NULL,

   `birth_date` date NOT NULL,

   `first_name` varchar(14) NOT NULL,

   `last_name` varchar(16) NOT NULL,

   `gender` enum('M','F') NOT NULL,

   `hire_date` date NOT NULL,

   PRIMARY KEY (`emp_no`)

);

 

2. Canal-Server配置

 

步骤1. 修改conf/canal.properties文件

 

canal.serverMode = kafka

...

canal.destinations = example,employees

...

canal.mq.servers = 172.21.48.11:9092

canal.mq.retries = 0

canal.mq.batchSize = 16384

canal.mq.maxRequestSize = 1048576

canal.mq.lingerMs = 100

canal.mq.bufferMemory = 33554432

canal.mq.canalBatchSize = 50

canal.mq.canalGetTimeout = 100

canal.mq.flatMessage = true

canal.mq.flatMessage.onlyData = true

canal.mq.compressionType = none

canal.mq.acks = all

canal.mq.producerGroup = cdbproducer

canal.mq.accessChannel = local

...

 

步骤2. 新增employees实例,修改employees/instances.properties配置

 

...

canal.instance.master.address=172.21.48.35:3306

...

canal.instance.dbUsername=canal

canal.instance.dbPassword=canal

...

canal.instance.filter.regex=empdb_.*\\.employees_.*

...

canal.mq.dynamicTopic=employees_topic:empdb_.*\\.employees_.*

canal.mq.partition=0

canal.mq.partitionsNum=3

canal.mq.partitionHash=empdb_.*\\.employees_.*:emp_no

 

3. Kafka配置

 

4. 新增topic employees_topic,分区数为3

 

5. ClickHouse建表

 

CREATE DATABASE testckdb ON CLUSTER  default_cluster;

 

CREATE TABLE IF NOT EXISTS  testckdb.ck_employees ON CLUSTER default_cluster (

   `emp_no` Int32,

   `birth_date` String,

   `first_name` String,

   `last_name` String,

   `gender` String,

   `hire_date` String

) ENGINE=MergeTree() ORDER BY (emp_no)

SETTINGS index_granularity = 8192;

 

 

CREATE TABLE IF NOT EXISTS  testckdb.ck_employees_stream ON CLUSTER default_cluster (

   `emp_no` Int32,

   `birth_date` String,

   `first_name` String,

   `last_name` String,

   `gender` String,

   `hire_date` String

) ENGINE = Kafka()

SETTINGS

   kafka_broker_list = '172.21.48.11:9092',

   kafka_topic_list = 'employees_topic',

   kafka_group_name = 'employees_group',

   kafka_format = 'JSONEachRow',

   kafka_skip_broken_messages = 1024,

  kafka_num_consumers  = 1;

 

 

CREATE MATERIALIZED VIEW IF NOT EXISTS  testckdb.ck_employees_mv ON CLUSTER default_cluster TO testckdb.ck_employees(

   `emp_no` Int32,

   `birth_date` String,

   `first_name` String,

   `last_name` String,

   `gender` String,

  `hire_date`  String

) AS SELECT

   `emp_no`,

   `birth_date`,

   `first_name`,

   `last_name`,

   `gender`,

   `hire_date`

FROM

   testckdb.ck_employees_stream;

 

CREATE TABLE IF NOT EXISTS  testckdb.ck_employees_dis ON CLUSTER default_cluster AS testckdb.ck_employees  

ENGINE=Distributed(default_cluster,  testckdb, ck_employees);

 

6. 启动Canal-Server服务

 

MySQL实例上游插入数据,观察数据是否在Canal-Server解析正常,是否在ClickHouse中完成同步。

相关文章
相关标签/搜索