canal 安装 https://github.com/alibaba/canal/wiki/QuickStart
canal-adapter 安装 https://github.com/alibaba/canal/wiki/ClientAdapter
ElasticSearch 适配器 https://github.com/alibaba/canal/wiki/Sync-ES
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
对于自建 MySQL,需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下
# my.cnf — MySQL server-side binlog settings required by canal
[mysqld]
log-bin=mysql-bin   # enable binlog
binlog-format=ROW   # canal requires ROW format
server_id=1         # MySQL replication server id; must not clash with canal's slaveId
授权 canal 连接 MySQL 的账号具有作为 MySQL slave 的权限,如果已有账户可直接 grant
-- Create the account canal uses to read the binlog as a replication slave.
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
以下配置根据 pf 部门提供的信息进行修改
解压缩
mkdir /yourpath/soft/canal.deployer-1.1.4 tar zxvf canal.deployer-1.1.4.tar.gz -C /yourpath/soft/canal.deployer-1.1.4
配置修改,复制 example 文件夹为 basic 文件夹(basic 这个名称与后面 adapter 配置对应)
cd /yourpath/soft/canal.deployer-1.1.4 cp -r example basic vi conf/basic/instance.properties
conf/basic/instance.properties 修改(数据库地址、连接数据库的账户密码)
# Keys to change in conf/basic/instance.properties.
# NOTE: .properties files only support '#' comments; the original inline
# '//...' text would be parsed as part of the value and break the config.
# database host:port
canal.instance.master.address=172.23.201.55:3313
# database account
canal.instance.dbUsername=canal
# database password
canal.instance.dbPassword=canal
# capture only tables in the pf_basic schema
canal.instance.filter.regex=pf_basic\\..*
修改后
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info — source database host:port
# (NOTE: '//' is not a comment in .properties; keep comments on '#' lines)
canal.instance.master.address=172.23.201.55:3313
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password — account used by canal to connect to the database
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex — capture only tables in the pf_basic schema
canal.instance.filter.regex=pf_basic\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
使用canal用户启动
cd /yourpath/soft/canal.deployer-1.1.4
su canal
sh bin/startup.sh   # 启动
sh bin/stop.sh      # 关闭
ls /yourpath/soft/canal.deployer-1.1.4/logs basic 日志 ls /yourpath/soft/canal.deployer-1.1.4/logs/basic/basic.log
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz
mkdir /yourpath/soft/canal.adapter-1.1.4 tar zxvf canal.adapter-1.1.4.tar.gz -C /yourpath/soft/canal.adapter-1.1.4
# cd /yourpath/soft/canal.adapter-1.1.4
# cat conf/application.yml | grep -v "^#"
# NOTE: YAML comments use '#'; the original inline '//...' text would be
# read as part of the value and break canalServerHost / the JDBC url.
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp # kafka rocketMQ
  canalServerHost: 172.23.201.59:11111  # canal server host:port
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://172.23.201.55:3313/pf_basic?useUnicode=true  # source database
      username: canal
      password: canal
  canalAdapters:
  - instance: basic # canal instance name or mq topic name; matches the folder created on the canal server
    groups:
    - groupId: g1
      outerAdapters:
      - name: es
        hosts: 172.16.195.227:9200,172.16.195.226:9200,172.16.195.228:9200,172.16.195.232:9200,172.16.195.233:9200 # es cluster addresses, comma separated
        properties:
          mode: rest # transport # or rest
adapter将会自动加载 conf/es 下的全部.yml结尾的配置文件
# Example: cat pf_basic_sms_report.yml
dataSourceKey: defaultDS
destination: basic   # matches the instance folder created on the canal server
groupId: g1
esMapping:
  _index: sms_20200602
  _type: report
  _id: _id
  upsert: true
#  pk: id
  sql: "SELECT a.id _id, a.id , a.mobile , a.supplier , a.msgid , a.status ,
        a.status_desc statusDesc, a.uid , a.charging_count chargingCount,
        a.caller , a.report_time reportTime, a.create_time createTime
        FROM t_sms_status_report a"
#  objFields:
#    _labels: array:;
#  etlCondition: "where a.c_time>={}"
  commitBatch: 3000
# Reference: all options of an adapter es-mapping file.
dataSourceKey: defaultDS        # key of the source datasource; one of srcDataSources in application.yml
outerAdapterKey: exampleKey     # key of the es adapter configured in application.yml
destination: example            # canal instance name, or MQ topic
groupId:                        # MQ mode only: sync only data of this groupId
esMapping:
  _index: mytest_user           # es index name
  _type: _doc                   # es type name; not needed on es7
  _id: _id                      # es _id; if omitted, pk below must be set and es auto-assigns the id
#  pk: id                       # when _id is not wanted, designate a primary-key column instead
  # sql mapping
  sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
        a.c_time as _c_time, c.labels as _labels
        from user a
        left join role b on b.id=a.role_id
        left join (select user_id, group_concat(label order by id desc separator ';') as labels
                   from label group by user_id) c on c.user_id=a.id"
#  objFields:
#    _labels: array:;           # array or object field; array:; means a ';'-separated string
#    _obj: object               # json object
  etlCondition: "where a.c_time>='{0}'"   # parameterized condition used by etl
  commitBatch: 3000                       # commit batch size
cd /yourpath/soft/canal.adapter-1.1.4 bin/startup.sh
# cd /yourpath/soft/logstash/config/conf.d
# cat config-jdbc_mysql-to-es.conf
input {
  jdbc {
    codec => "json_lines"
    add_field => { "@doc_type" => "send" }
    # path to the JDBC driver jar
    jdbc_driver_library => "/yourpath/soft/logstash/driver/mysql-connector-java-5.1.46.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # source database
    jdbc_connection_string => "jdbc:mysql://172.23.201.55:3313/pf_basic"
    jdbc_user => "logstash"
    jdbc_password => "Logstash@yourpath"
    jdbc_paging_enabled => "true"
    jdbc_page_size => 10000
    # :sql_last_value is a built-in variable holding the last tracked value
    #statement => "select * from t_sms_status_report where create_time > :sql_last_value"
    statement_filepath => "/yourpath/soft/logstash/config/conf.d/sql/sms_send.sql"
    # how often to sync: once per minute
    schedule => "* * * * *"
    # persist the tracking state between runs
    record_last_run => "true"
    # file holding :sql_last_value (defaults: 1970-01-01 for time, 0 for numbers)
    last_run_metadata_path => "/yourpath/soft/logstash/config/conf.d/state/last_run.pf-basic-sms-send"
    # whether to wipe last_run_metadata_path on every start
    clean_run => "false"
    # track a result column instead of the run timestamp
    use_column_value => "true"
    # column to track
    tracking_column => "createTime"
    # keep column-name case as-is (default lowercases names)
    lowercase_column_names => "false"
    # tracked column type: numeric or timestamp (default numeric)
    tracking_column_type => "timestamp"
    # timezone used for timestamp conversion
    jdbc_default_timezone => "Asia/Shanghai"
  }
}
filter {
  mutate {
    remove_field => ["@version", "@timestamp"]
    split => { "reqPhone" => "," "sendPhone" => "," }
  }
}
output {
  elasticsearch {
    hosts => ["172.16.195.227:9200", "172.16.195.226:9200", "172.16.195.228:9200", "172.16.195.232:9200", "172.16.195.233:9200"]
    index => "sms"
    document_type => "%{@doc_type}"
    document_id => "%{id}"
    # write action, one of: (index), delete, create, update
    action => "index"
    # update mode only: use the current document source as upsert content; default false
    doc_as_upsert => "false"
  }
}
/yourpath/soft/logstash/config/conf.d/state 数据同步的状态文件
/yourpath/soft/logstash/config/conf.d/sql 执行数据同步的sql文件
-- Example: cat sms_report.sql
-- Incremental extract: :sql_last_value is injected by the logstash jdbc input.
SELECT
    a.id,
    a.mobile,
    a.supplier,
    a.msgid,
    a.status,
    a.status_desc AS statusDesc,
    a.uid,
    a.charging_count AS chargingCount,
    a.caller,
    DATE_FORMAT(a.report_time, '%Y-%m-%dT%T+08:00') AS reportTime,
    DATE_FORMAT(a.create_time, '%Y-%m-%dT%T+08:00') AS createTime
FROM t_sms_status_report AS a
WHERE a.create_time > :sql_last_value
sudo /yourpath/soft/logstash/bin/logstash -f /yourpath/soft/logstash/config/conf.d/config-jdbc_mysql-to-es.conf
该方法进行的是全量同步,之后使用 canal 进行增量同步