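ClickHouse was briefly introduced in my previous (reposted) post. Recently, a project at work ran into slow queries over large volumes of data in our database, so I took the opportunity to put ClickHouse into practice. This post focuses on data synchronization.
Below I walk through using Flume to sync Oracle data into ClickHouse.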
Download and extract Flume 1.5.2:
wget http://www.apache.org/dist/flume/1.5.2/apache-flume-1.5.2-bin.tar.gz
tar zxvf apache-flume-1.5.2-bin.tar.gz
Download the flume-ng-sql-source code: https://github.com/keedio/flume-ng-sql-source
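(1) For the data to sync correctly into ClickHouse, the code has to be modified, as shown in the figure:
Change the default delimiter from ',' to '\t'. Without this change, inserting the data into ClickHouse fails with an error.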
(2) Build the package: mvn package -Dmaven.test.skip=true
(3) Upload the resulting jar, flume-ng-sql-source-1.5.2.jar, to Flume's lib directory.
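The delimiter change in step (1) matters because the ClickHouse sink configured later writes rows in ClickHouse's TabSeparated format:

- Columns are separated by a tab character.
- Each row ends with a newline.
- Tabs, newlines, carriage returns and backslashes inside a value are written as `\t`, `\n`, `\r` and `\\`.

ClickHouse cannot split a comma-joined row into columns, so the insert is rejected. The sketch below shows what a correctly formatted row looks like. `TsvRow` is a hypothetical helper, not code from flume-ng-sql-source.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: formats one row in ClickHouse's TabSeparated format. */
public class TsvRow {

    /** Escapes the characters that TabSeparated treats specially inside a value. */
    static String escape(String value) {
        StringBuilder sb = new StringBuilder(value.length());
        for (char c : value.toCharArray()) {
            switch (c) {
                case '\\': sb.append("\\\\"); break; // backslash -> \\
                case '\t': sb.append("\\t"); break;  // tab -> \t
                case '\n': sb.append("\\n"); break;  // newline -> \n
                case '\r': sb.append("\\r"); break;  // carriage return -> \r
                default:   sb.append(c);
            }
        }
        return sb.toString();
    }

    /** Joins escaped column values with tabs and terminates the row with a newline. */
    static String format(List<String> columns) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < columns.size(); i++) {
            if (i > 0) {
                sb.append('\t'); // column separator
            }
            sb.append(escape(columns.get(i)));
        }
        return sb.append('\n').toString(); // row terminator
    }

    public static void main(String[] args) {
        // A row shaped like: SELECT PRODUCT_PL, TXTSH FROM M_PRODUCT_PL
        System.out.print(format(Arrays.asList("1001", "产品\t说明")));
    }
}
```

Without the escaping, a raw tab inside a value would be read as a column boundary.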
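Developing the ClickHouse sink is somewhat more involved, and there is little detailed material about it online. I will describe it as thoroughly as I can; feel free to contact me if you run into problems.
Download flume-ng-kafka-sink: https://github.com/camathieu/flume-ng-kafka-sink.git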
flume-ng-kafka-sink is downloaded mainly as a reference: the project's kafka-sink is turned into a clickhouse-sink.
(1) Comment out the entire KafkaSink class in the org.apache.flume.sink.kafka package.
(2) Modify the pom as shown in the figure.
Its contents are as follows:
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <name>Apache Flume ClickHouse Sink</name>
    <description>Kafka 0.8+ sink for Apache Flume NG</description>
    <groupId>org.apache.flume.sink.clickhouse</groupId>
    <artifactId>flume-clickhouse-sink</artifactId>
    <version>1.5.2</version>
    <packaging>jar</packaging>

    <parent>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-sinks</artifactId>
        <version>1.4.0</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>19.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
```
(3) Download the ClickHouse sink code
Download from: https://reviews.apache.org/r/50692/diff/1#2
As shown in the figure below, copy the code into your local project.
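(4) Modify the code
During testing, Chinese characters in the synced data came out garbled. To fix this, modify the ClickHouseSink class so that StreamUtils.UTF_8 is specified as the charset when the HttpEntity is built, as shown in the figure: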
This involves three utility classes (StreamUtils, StringUtils, Utils); simply copy them into the project.
All three can be found in this project: https://github.com/yandex/clickhouse-jdbc.git
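Chinese text gets garbled when the request body is encoded with a charset that cannot represent Chinese characters. For example, Apache HttpClient's `StringEntity(String)` constructor defaults to ISO-8859-1. In that case every Chinese character is replaced by `?` before it reaches ClickHouse.

The JDK-only sketch below is not the ClickHouseSink code. It assumes the sink POSTs TabSeparated rows to ClickHouse's HTTP interface (port 8123, with `host` including the `http://` prefix, as in the configuration later in this post). It shows:

- the lossy ISO-8859-1 encoding;
- an explicit UTF-8 body;
- an insert URL built from `database`, `table` and `format`.

`ClickHouseRequest` and its methods are hypothetical names.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: builds a ClickHouse HTTP insert request (URL + body bytes). */
public class ClickHouseRequest {

    /** Percent-encodes a URL component; uses %20 for spaces instead of '+'. */
    private static String urlEncode(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8").replace("+", "%20");
        } catch (UnsupportedEncodingException e) {
            throw new AssertionError("UTF-8 is always supported", e);
        }
    }

    /** e.g. http://host:8123/?database=db&query=INSERT%20INTO%20t%20FORMAT%20TabSeparated */
    static String insertUrl(String host, int port, String database, String table, String format) {
        String query = "INSERT INTO " + table + " FORMAT " + format;
        return host + ":" + port + "/?database=" + urlEncode(database) + "&query=" + urlEncode(query);
    }

    /** Encodes the TabSeparated payload explicitly as UTF-8 so Chinese text survives. */
    static byte[] body(String tsv) {
        return tsv.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String row = "1001\t产品说明\n";
        // Lossy: ISO-8859-1 cannot represent Chinese characters, so each one becomes '?'.
        System.out.print(new String(row.getBytes(StandardCharsets.ISO_8859_1), StandardCharsets.ISO_8859_1));
        // Correct: UTF-8 round-trips the text unchanged.
        System.out.print(new String(body(row), StandardCharsets.UTF_8));
        System.out.println(insertUrl("http://10.122.1.229", 8123, "store_analysis", "M_PRODUCT_PL", "TabSeparated"));
    }
}
```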
Run mvn package to build flume-clickhouse-sink-1.5.2.jar.
Upload this jar, flume-clickhouse-sink-1.5.2.jar, to Flume's lib directory.
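Create the configuration file oracle-clickhouse.conf in Flume's conf directory.
For the file format, see the official Flume documentation; I won't go into detail here. Below is an example that syncs two tables, each with its own channel, source and sink: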
```properties
agent.channels = channelMProductPL channelMProductPP
agent.sources = sourceMProductPL sourceMProductPP
agent.sinks = sinkMProductPL sinkMProductPP

###########sql source#################
# For each Test of the sources, the type is defined
agent.sources.sourceMProductPL.type = org.keedio.flume.source.SQLSource
agent.sources.sourceMProductPL.hibernate.connection.url = jdbc:oracle:thin:@10.10.10.10:1521:orcl
# Hibernate Database connection properties
agent.sources.sourceMProductPL.hibernate.connection.user = user
agent.sources.sourceMProductPL.hibernate.connection.password = password
agent.sources.sourceMProductPL.hibernate.connection.autocommit = true
agent.sources.sourceMProductPL.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
agent.sources.sourceMProductPL.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver
#agent.sources.sourceMProductPL.hibernate.table = M_PRODUCT_PL
agent.sources.sourceMProductPL.run.query.delay=10000
agent.sources.sourceMProductPL.enclose.by.quotes = false
agent.sources.sourceMProductPL.status.file.path = /tmp/flume/apache-flume-1.5.2-bin
agent.sources.sourceMProductPL.status.file.name = agent.sqlSource.status.mProductPL
agent.sources.sourceMProductPL.inputCharset = UTF-8
# Custom query
agent.sources.sourceMProductPL.start.from = 0
agent.sources.sourceMProductPL.custom.query = SELECT PRODUCT_PL,TXTSH FROM M_PRODUCT_PL WHERE "TO_NUMBER"(PRODUCT_PL) > $@$
agent.sources.sourceMProductPL.batch.size = 1
#1000
agent.sources.sourceMProductPL.max.rows = 1
#1000
agent.sources.sourceMProductPL.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agent.sources.sourceMProductPL.hibernate.c3p0.min_size=1
agent.sources.sourceMProductPL.hibernate.c3p0.max_size=10

##############################
# For each Test of the sources, the type is defined
agent.sources.sourceMProductPP.type = org.keedio.flume.source.SQLSource
agent.sources.sourceMProductPP.hibernate.connection.url = jdbc:oracle:thin:@10.10.10.10:1521:orcl
# Hibernate Database connection properties
agent.sources.sourceMProductPP.hibernate.connection.user = user
agent.sources.sourceMProductPP.hibernate.connection.password = password
agent.sources.sourceMProductPP.hibernate.connection.autocommit = true
agent.sources.sourceMProductPP.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
agent.sources.sourceMProductPP.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver
#agent.sources.sourceMProductPP.hibernate.table = M_PRODUCT_PP
agent.sources.sourceMProductPP.run.query.delay=10000
agent.sources.sourceMProductPP.enclose.by.quotes = false
agent.sources.sourceMProductPP.status.file.path = /tmp/flume/apache-flume-1.5.2-bin
agent.sources.sourceMProductPP.status.file.name = agent.sqlSource.status.mProductPP
agent.sources.sourceMProductPP.inputCharset = UTF-8
# Custom query
agent.sources.sourceMProductPP.start.from = 0
agent.sources.sourceMProductPP.custom.query = SELECT PRODUCT_PP,TXTSH FROM M_PRODUCT_PP WHERE "TO_NUMBER"(PRODUCT_PP) > $@$
agent.sources.sourceMProductPP.batch.size = 1
#1000
agent.sources.sourceMProductPP.max.rows = 1
#1000
agent.sources.sourceMProductPP.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agent.sources.sourceMProductPP.hibernate.c3p0.min_size=1
agent.sources.sourceMProductPP.hibernate.c3p0.max_size=10

##############################
agent.channels.channelMProductPL.type = memory
agent.channels.channelMProductPL.capacity = 1000
agent.channels.channelMProductPL.transactionCapacity = 1000
agent.channels.channelMProductPL.byteCapacityBufferPercentage = 20
agent.channels.channelMProductPL.byteCapacity = 1600000

agent.channels.channelMProductPP.type = memory
agent.channels.channelMProductPP.capacity = 1000
agent.channels.channelMProductPP.transactionCapacity = 1000
agent.channels.channelMProductPP.byteCapacityBufferPercentage = 20
agent.channels.channelMProductPP.byteCapacity = 1600000

agent.sinks.sinkMProductPL.type = org.apache.flume.sink.clickhouse.ClickHouseSink
agent.sinks.sinkMProductPL.host = http://10.122.1.229
agent.sinks.sinkMProductPL.port = 8123
agent.sinks.sinkMProductPL.database = store_analysis
agent.sinks.sinkMProductPL.table = M_PRODUCT_PL
agent.sinks.sinkMProductPL.batchSize = 1
#3000
agent.sinks.sinkMProductPL.format = TabSeparated

agent.sinks.sinkMProductPP.type = org.apache.flume.sink.clickhouse.ClickHouseSink
agent.sinks.sinkMProductPP.host = http://10.122.1.229
agent.sinks.sinkMProductPP.port = 8123
agent.sinks.sinkMProductPP.database = store_analysis
agent.sinks.sinkMProductPP.table = M_PRODUCT_PP
agent.sinks.sinkMProductPP.batchSize = 1
#3000
agent.sinks.sinkMProductPP.format = TabSeparated

agent.sinks.sinkMProductPL.channel = channelMProductPL
agent.sources.sourceMProductPL.channels=channelMProductPL
agent.sinks.sinkMProductPP.channel = channelMProductPP
agent.sources.sourceMProductPP.channels=channelMProductPP
```
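The `$@$` in `custom.query` is what makes each poll incremental. Per the flume-ng-sql-source README, `$@$` is replaced with an incremental value that starts at `start.from` and is then taken from the first column of the rows read. This is why PRODUCT_PL and PRODUCT_PP are selected first. The source keeps this position in the status file named by `status.file.path` and `status.file.name`.

The sketch below is a simplified model of that bookkeeping, not the library's code. The class `IncrementalQuery` is hypothetical.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of the $@$ placeholder handling in an incremental SQL source. */
public class IncrementalQuery {
    private final String template;
    private String lastValue; // initialised from start.from on the first run

    IncrementalQuery(String template, String startFrom) {
        this.template = template;
        this.lastValue = startFrom;
    }

    /** The query for the next poll, with $@$ replaced literally by the last value read. */
    String nextQuery() {
        return template.replace("$@$", lastValue);
    }

    /** After a batch, remember the first column of the last row as the new position. */
    void onBatch(List<List<String>> rows) {
        if (!rows.isEmpty()) {
            lastValue = rows.get(rows.size() - 1).get(0);
        }
    }

    public static void main(String[] args) {
        IncrementalQuery q = new IncrementalQuery(
                "SELECT PRODUCT_PL,TXTSH FROM M_PRODUCT_PL WHERE \"TO_NUMBER\"(PRODUCT_PL) > $@$", "0");
        System.out.println(q.nextQuery()); // ends with "> 0"
        q.onBatch(Arrays.asList(Arrays.asList("1001", "a"), Arrays.asList("1002", "b")));
        System.out.println(q.nextQuery()); // ends with "> 1002"
    }
}
```

In this model, the last row's first column becomes the new position. Rows therefore need to come back in increasing order of that column; otherwise some rows would be skipped on the next poll.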
Adjust the parameters to suit your own environment.
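When adjusting these parameters, note that Flume loads this file with `java.util.Properties`, where `#` starts a comment only at the beginning of a line. A trailing note such as `batchSize = 1 #3000` therefore becomes part of the value (`1 #3000`), which is no longer a valid number. The small JDK-only check below demonstrates this with property names taken from the example configuration. `InlineCommentCheck` is a hypothetical class name.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Shows how java.util.Properties (used to load Flume config files) treats '#'. */
public class InlineCommentCheck {

    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e); // not expected with an in-memory reader
        }
        return props;
    }

    public static void main(String[] args) {
        Properties p = load(
                "# a comment line is ignored\n"
                + "agent.sinks.sinkMProductPL.batchSize = 1 #3000\n"
                + "agent.sources.sourceMProductPL.batch.size = 1\n"
                + "#1000\n");
        // The trailing "#3000" is kept as part of the value.
        System.out.println("[" + p.getProperty("agent.sinks.sinkMProductPL.batchSize") + "]");    // [1 #3000]
        // A comment on its own line does not affect the value.
        System.out.println("[" + p.getProperty("agent.sources.sourceMProductPL.batch.size") + "]"); // [1]
    }
}
```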
From Flume's bin directory, run:
./flume-ng agent --conf ../conf --conf-file ../conf/oracle-clickhouse.conf --name agent -Dflume.root.logger=INFO,console
To run it in the background, prefix the command with nohup and append &.
At this point, the synchronization setup is complete!