在早期的时候,阿里巴巴公司由于杭州和美国两个地方的机房都部署了数据库实例,但由于跨机房同步数据的业务需求 ,便孕育而生出了canal,主要是基于trigger(触发器)
的方式获取增量变动。从 2010 年开始,阿里巴巴公司开始逐步尝试数据库日志解析,获取增量变动的数据进行同步,由此衍生出了增量订阅和消费业务。html
当前的 canal 支持的数据源端Mysql版本包括( 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x)java
目前广泛基于日志增量订阅和消费的业务,主要包括node
在介绍canal的原理以前,咱们先来了解下MySQL主从复制的原理mysql
MySQL主从复制原理 git
binary log
中, 其中记录的内容叫作二进制日志事件binary log events
,能够经过show binlog events
命令进行查看binary log
中的binary log events
拷贝到它的中继日志relay log
relay log
中的事件,将数据变动映射到它本身的数据库表中了解了MySQL的工做原理,咱们能够大体猜测到Canal应该也是采用相似的逻辑去实现增量数据订阅的功能,那么接下来咱们看看实际上Canal的工做原理是怎样的?github
canal工做原理 spring
binary log
给 slave (也就是 canal )binary log
对象(数据为byte
流)基于这样的原理与方式,即可以完成数据库增量日志的获取解析,提供增量数据订阅和消费,实现mysql实时增量数据传输的功能。sql
既然canal是这样的一个框架,又是纯Java语言编写而成,那么咱们接下来就开始学习怎么使用它并把它用到咱们的实际工做中。docker
由于目前容器化技术的火热,本文经过使用Docker来快速搭建开发环境,而传统方式的环境搭建,在咱们学会了Docker容器环境搭建后,也能自行依葫芦画瓢搭建成功。因为本篇主要讲解canal,因此关于Docker的内容不会涉及太多,主要会介绍Docker的基本概念和命令使用。数据库
相信绝大多数人都使用过虚拟机Vmware,在使用Vmware进行环境搭建的时候,只需提供了一个普通的系统镜像并成功安装,剩下的软件环境与应用配置仍是如咱们在本机操做同样在虚拟机里也操做一遍,并且Vmware占用宿主机的资源较多,容易形成宿主机卡顿,并且系统镜像自己也占用过多空间。
为了便于你们快速理解Docker,便与Vmware作对比来作介绍,docker 提供了一个开始,打包,运行app的平台,把app(应用)和底层infrastructure(基础设施)隔离开来。Docker中最主要的两个概念就是镜像(相似Vmware的系统镜像)与容器(相似Vmware里安装的系统)
什么是Image(镜像)
Docker的网络类型有三种:
bridge:桥接网络
默认状况下启动的Docker容器,都是使用 bridge,Docker安装时建立的桥接网络,
每次Docker容器重启时,会按照顺序获取对应的IP地址,
这个就致使重启下,Docker的IP地址就变了
复制代码
none:无指定网络
使用 --network=none ,docker 容器就不会分配局域网的IP
复制代码
host:主机网络
使用 --network=host,此时,Docker 容器的网络会附属在主机上,二者是互通的。
例如,在容器中运行一个Web服务,监听8080端口,则主机的8080端口就会自动映射到容器中。
复制代码
建立自定义网络:(设置固定IP)
docker network create --subnet=172.18.0.0/16 mynetwork
复制代码
查看存在的网络类型docker network ls
附上Docker的下载安装地址==> Docker Download
下载canal镜像docker pull canal/canal-server
docker pull mysql
,下载过的则以下图
docker images
##生成mysql容器
docker run -d --name mysql --net mynetwork --ip 172.18.0.6 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=root mysql
##生成canal-server容器
docker run -d --name canal-server --net mynetwork --ip 172.18.0.4 -p 11111:11111 canal/canal-server
## 命令介绍
--net mynetwork #使用自定义网络
--ip #指定分配ip
复制代码
查看docker中运行的容器docker ps
以上只是初步准备好了基础的环境,可是怎么让canal假装成salve并正确获取mysql中的binary log呢?
对于自建 MySQL , 须要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,经过修改mysql配置文件来开启bin_log,使用 find / -name my.cnf
查找my.cnf, 修改文件内容以下
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 须要定义,不要和 canal 的 slaveId 重复
复制代码
进入mysql容器docker exec -it mysql bash
建立连接MySQL的帐号canal
并授予做为 MySQL slave 的权限, 若是已有帐户可直接 GRANT
mysql -uroot -proot
# 建立帐号
CREATE USER canal IDENTIFIED BY 'canal';
# 授予权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
# 刷新并应用
FLUSH PRIVILEGES;
复制代码
数据库重启后, 简单测试 my.cnf 配置是否生效
show variables like 'log_bin';
show variables like 'log_bin';
show master status;
复制代码
进入canal-server容器docker exec -it canal-server bash
编辑canal-server的配置 vi canal-server/conf/example/instance.properties
docker restart canal-server
进入容器查看启动日志
docker exec -it canal-server bash
tail -100f canal-server/logs/example/example.log
复制代码
本文的ElasticSearch也是基于Docker环境搭建,因此读者可执行以下命令
# 下载对镜像
docker pull elasticsearch:7.1.1
docker pull mobz/elasticsearch-head:5-alpine
# 建立容器并运行
docker run -d --name elasticsearch --net mynetwork --ip 172.18.0.2 -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.1.1
docker run -d --name elasticsearch-head --net mynetwork --ip 172.18.0.5 -p 9100:9100 mobz/elasticsearch-head:5-alpine
复制代码
环境已经准备好了,如今就要开始咱们的编码实战部分了,怎么经过应用程序去获取canal解析后的binlog数据。首先咱们基于spring boot搭建一个canal demo应用。结构以下图所示
package com.example.canal.study.pojo;
import lombok.Data;
import java.io.Serializable;
/**
* 普通的实体domain对象
* @Data 用户生产getter、setter方法
*/
@Data
public class Student implements Serializable {
private String id;
private String name;
private int age;
private String sex;
private String city;
}
复制代码
CanalConfig.java
package com.example.canal.study.common;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.net.InetSocketAddress;
/**
* 配置一些跟canal相关到配置与公共bean
* @author haha
*/
@Configuration
public class CanalConfig {
// @Value 获取 application.properties配置中端内容
@Value("${canal.server.ip}")
private String canalIp;
@Value("${canal.server.port}")
private Integer canalPort;
@Value("${canal.destination}")
private String destination;
@Value("${elasticSearch.server.ip}")
private String elasticSearchIp;
@Value("${elasticSearch.server.port}")
private Integer elasticSearchPort;
@Value("${zookeeper.server.ip}")
private String zkServerIp;
/**
* 获取简单canal-server链接
*/
@Bean
public CanalConnector canalSimpleConnector() {
CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalIp, canalPort), destination, "", "");
return canalConnector;
}
/**
* 经过链接zookeeper获取canal-server链接
*/
@Bean
public CanalConnector canalHaConnector() {
CanalConnector canalConnector = CanalConnectors.newClusterConnector(zkServerIp, destination, "", "");
return canalConnector;
}
/**
* elasticsearch 7.x客户端
*/
@Bean
public RestHighLevelClient restHighLevelClient() {
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(new HttpHost(elasticSearchIp, elasticSearchPort))
);
return client;
}
}
复制代码
CanalDataParser.java
因为这个类的代码较多,文中则摘出其中比较重要的部分,其它部分代码可从github上获取
/**
* 元祖类型的对象定义
* @param <A>
* @param <B>
*/
public static class TwoTuple<A, B> {
public final A eventType;
public final B columnMap;
public TwoTuple(A a, B b) {
eventType = a;
columnMap = b;
}
}
/**
* 解析canal中的message对象内容
* @param entrys
* @return
*/
public static List<TwoTuple<EventType, Map>> printEntry(List<Entry> entrys) {
List<TwoTuple<EventType, Map>> rows = new ArrayList<>();
for (Entry entry : entrys) {
// binlog event的事件事件
long executeTime = entry.getHeader().getExecuteTime();
// 当前应用获取到该binlog锁延迟的时间
long delayTime = System.currentTimeMillis() - executeTime;
Date date = new Date(entry.getHeader().getExecuteTime());
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// 当前的entry(binary log event)的条目类型属于事务
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
TransactionBegin begin = null;
try {
begin = TransactionBegin.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
}
// 打印事务头信息,执行的线程id,事务耗时
logger.info(transaction_format,
new Object[]{entry.getHeader().getLogfileName(),
String.valueOf(entry.getHeader().getLogfileOffset()),
String.valueOf(entry.getHeader().getExecuteTime()),
simpleDateFormat.format(date),
entry.getHeader().getGtid(),
String.valueOf(delayTime)});
logger.info(" BEGIN ----> Thread id: {}", begin.getThreadId());
printXAInfo(begin.getPropsList());
} else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
TransactionEnd end = null;
try {
end = TransactionEnd.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
}
// 打印事务提交信息,事务id
logger.info("----------------\n");
logger.info(" END ----> transaction id: {}", end.getTransactionId());
printXAInfo(end.getPropsList());
logger.info(transaction_format,
new Object[]{entry.getHeader().getLogfileName(),
String.valueOf(entry.getHeader().getLogfileOffset()),
String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
entry.getHeader().getGtid(), String.valueOf(delayTime)});
}
continue;
}
// 当前entry(binary log event)的条目类型属于原始数据
if (entry.getEntryType() == EntryType.ROWDATA) {
RowChange rowChage = null;
try {
// 获取储存的内容
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
}
// 获取当前内容的事件类型
EventType eventType = rowChage.getEventType();
logger.info(row_format,
new Object[]{entry.getHeader().getLogfileName(),
String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
entry.getHeader().getTableName(), eventType,
String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
entry.getHeader().getGtid(), String.valueOf(delayTime)});
// 事件类型是query或数据定义语言DDL直接打印sql语句,跳出继续下一次循环
if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
logger.info(" sql ----> " + rowChage.getSql() + SEP);
continue;
}
printXAInfo(rowChage.getPropsList());
// 循环当前内容条目的具体数据
for (RowData rowData : rowChage.getRowDatasList()) {
List<CanalEntry.Column> columns;
// 事件类型是delete返回删除前的列内容,不然返回改变后列的内容
if (eventType == CanalEntry.EventType.DELETE) {
columns = rowData.getBeforeColumnsList();
} else {
columns = rowData.getAfterColumnsList();
}
HashMap<String, Object> map = new HashMap<>(16);
// 循环把列的name与value放入map中
for (Column column: columns){
map.put(column.getName(), column.getValue());
}
rows.add(new TwoTuple<>(eventType, map));
}
}
}
return rows;
}
复制代码
ElasticUtils.java
package com.example.canal.study.common;
import com.alibaba.fastjson.JSON;
import com.example.canal.study.pojo.Student;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.xcontent.XContentType;
import java.util.Map;
/**
* es的crud工具类
* @author haha
*/
@Slf4j
@Component
public class ElasticUtils {
@Autowired
private RestHighLevelClient restHighLevelClient;
/**
* 新增
* @param student
* @param index 索引
*/
public void saveEs(Student student, String index) {
IndexRequest indexRequest = new IndexRequest(index)
.id(student.getId())
.source(JSON.toJSONString(student), XContentType.JSON)
.opType(DocWriteRequest.OpType.CREATE);
try {
IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
log.info("保存数据至ElasticSearch成功:{}", response.getId());
} catch (Exception e) {
log.error("保存数据至elasticSearch失败: {}", e);
}
}
/**
* 查看
* @param index 索引
* @param id _id
* @throws Exception
*/
public void getEs(String index, String id) {
GetRequest getRequest = new GetRequest(index, id);
GetResponse response = null;
try {
response = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
Map<String, Object> fields = response.getSource();
for (Map.Entry<String, Object> entry : fields.entrySet()) {
System.out.println(entry.getKey() + ":" + entry.getValue());
}
} catch (Exception e) {
log.error("从elasticSearch获取数据失败: {}", e);
}
}
/**
* 更新
* @param student
* @param index 索引
* @throws Exception
*/
public void updateEs(Student student, String index) {
UpdateRequest updateRequest = new UpdateRequest(index, student.getId());
updateRequest.doc(JSON.toJSONString(student), XContentType.JSON);
UpdateResponse response = null;
try {
response = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
log.info("更新数据至ElasticSearch成功:{}", response.getId());
} catch (Exception e) {
log.error("更新数据至elasticSearch失败: {}", e);
}
}
/**
* 根据id删除数据
* @param index 索引
* @param id _id
* @throws Exception
*/
public void DeleteEs(String index, String id) {
DeleteRequest deleteRequest = new DeleteRequest(index, id);
DeleteResponse response = null;
try {
response = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
log.info("从elasticSearch删除数据成功:{}", response.getId());
} catch (Exception e) {
log.error("从elasticSearch删除数据失败: {}", e);
}
}
}
复制代码
BinLogElasticSearch.java
package com.example.canal.study.action;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.example.canal.study.common.CanalDataParser;
import com.example.canal.study.common.ElasticUtils;
import com.example.canal.study.pojo.Student;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* 获取binlog数据并发送到es中
*
* @author haha
*/
@Slf4j
@Component
public class BinLogElasticSearch {
@Autowired
private CanalConnector canalSimpleConnector;
@Autowired
private ElasticUtils elasticUtils;
//@Qualifier("canalHaConnector")使用名为canalHaConnector的bean
@Autowired
@Qualifier("canalHaConnector")
private CanalConnector canalHaConnector;
public void binLogToElasticSearch() throws IOException {
openCanalConnector(canalSimpleConnector);
// 轮询拉取数据
Integer batchSize = 5 * 1024;
while (true) {
// Message message = canalHaConnector.getWithoutAck(batchSize);
Message message = canalSimpleConnector.getWithoutAck(batchSize);
long id = message.getId();
int size = message.getEntries().size();
log.info("当前监控到binLog消息数量{}", size);
if (id == -1 || size == 0) {
try {
// 等待4秒
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
//1. 解析message对象
List<CanalEntry.Entry> entries = message.getEntries();
List<CanalDataParser.TwoTuple<CanalEntry.EventType, Map>> rows = CanalDataParser.printEntry(entries);
for (CanalDataParser.TwoTuple<CanalEntry.EventType, Map> tuple : rows) {
if (tuple.eventType == CanalEntry.EventType.INSERT) {
Student student = createStudent(tuple);
// 2。将解析出的对象同步到elasticSearch中
elasticUtils.saveEs(student, "student_index");
// 3.消息确认已处理
canalSimpleConnector.ack(id);
// canalHaConnector.ack(id);
}
if (tuple.eventType == CanalEntry.EventType.UPDATE) {
Student student = createStudent(tuple);
elasticUtils.updateEs(student, "student_index");
// 3.消息确认已处理
canalSimpleConnector.ack(id);
// canalHaConnector.ack(id);
}
if (tuple.eventType == CanalEntry.EventType.DELETE) {
elasticUtils.DeleteEs("student_index", tuple.columnMap.get("id").toString());
canalSimpleConnector.ack(id);
// canalHaConnector.ack(id);
}
}
}
}
}
/**
* 封装数据至Student对象中
*
* @param tuple
* @return
*/
private Student createStudent(CanalDataParser.TwoTuple<CanalEntry.EventType, Map> tuple) {
Student student = new Student();
student.setId(tuple.columnMap.get("id").toString());
student.setAge(Integer.parseInt(tuple.columnMap.get("age").toString()));
student.setName(tuple.columnMap.get("name").toString());
student.setSex(tuple.columnMap.get("sex").toString());
student.setCity(tuple.columnMap.get("city").toString());
return student;
}
/**
* 打开canal链接
*
* @param canalConnector
*/
private void openCanalConnector(CanalConnector canalConnector) {
//链接CanalServer
canalConnector.connect();
// 订阅destination
canalConnector.subscribe();
}
/**
* 关闭canal链接
*
* @param canalConnector
*/
private void closeCanalConnector(CanalConnector canalConnector) {
//关闭链接CanalServer
canalConnector.disconnect();
// 注销订阅destination
canalConnector.unsubscribe();
}
}
复制代码
CanalDemoApplication.java(spring boot 启动类)
package com.example.canal.study;
import com.example.canal.study.action.BinLogElasticSearch;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* 应用的启动类
* @author haha
*/
@SpringBootApplication
public class CanalDemoApplication implements ApplicationRunner {
@Autowired
private BinLogElasticSearch binLogElasticSearch;
public static void main(String[] args) {
SpringApplication.run(CanalDemoApplication.class, args);
}
// 程序启动则执行run方法
@Override
public void run(ApplicationArguments args) throws Exception {
binLogElasticSearch.binLogToElasticSearch();
}
}
复制代码
application.properties
server.port=8081
spring.application.name = canal-demo
canal.server.ip = localhost
canal.server.port = 11111
canal.destination = example
zookeeper.server.ip = localhost:2181
zookeeper.sasl.client = false
elasticSearch.server.ip = localhost
elasticSearch.server.port = 9200
复制代码
经过上面的学习,咱们知道了单机直连方式的canala应用。在当今互联网时代,单实例模式逐渐被集群高可用模式取代,那么canal的多实例集群方式如何搭建呢!
准备zookeeper的docker镜像与容器
docker pull zookeeper
docker run -d --name zookeeper --net mynetwork --ip 172.18.0.3 -p 2181:2181 zookeeper
docker run -d --name canal-server2 --net mynetwork --ip 172.18.0.8 -p 11113:11113 canal/canal-server
复制代码
最终效果如图
canal.port=11113
canal.zkServers=172.18.0.3:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
复制代码
canal.instance.mysql.slaveId = 1235
#以前的canal slaveId是1234,保证slaveId不重复便可
canal.instance.master.address = 172.18.0.6:3306
复制代码
注意: 两台机器上的instance目录的名字须要保证彻底一致,HA模式是依赖于instance name进行管理,同时必须都选择default-instance.xml配置
启动两个不一样容器的canal,启动后,能够经过tail -100f logs/example/example.log
查看启动日志,只会看到一台机器上出现了启动成功的日志。
好比我这里启动成功的是 172.18.0.4
[zk: localhost:2181(CONNECTED) 15] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.4:11111","cid":1}
复制代码
能够经过指定zookeeper地址和canal的instance name,canal client会自动从zookeeper中的running节点,获取当前服务的工做节点,而后与其创建连接:
[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.4:11111","cid":1}
复制代码
对应的客户端编码可使用以下形式,上文中的CanalConfig.java
中的canalHaConnector
就是一个HA链接
CanalConnector connector = CanalConnectors.newClusterConnector("172.18.0.3:2181", "example", "", "");
复制代码
连接成功后,canal server会记录当前正在工做的canal client信息,好比客户端ip,连接的端口信息等 (聪明的你,应该也能够发现,canal client也能够支持HA功能)
[zk: localhost:2181(CONNECTED) 4] get /otter/canal/destinations/example/1001/running
{"active":true,"address":"192.168.124.5:59887","clientId":1001}
复制代码
数据消费成功后,canal server会在zookeeper中记录下当前最后一次消费成功的binlog位点. (下次你重启client时,会从这最后一个位点继续进行消费)
[zk: localhost:2181(CONNECTED) 5] get /otter/canal/destinations/example/1001/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"mysql.mynetwork","port":3306}},"postion":{"included":false,"journalName":"binlog.000004","position":2169,"timestamp":1562672817000}}
复制代码
中止正在工做的172.18.0.4的canal server
docker exec -it canal-server bash
cd canal-server/bin
sh stop.sh
复制代码
这时172.18.0.8会立马启动example instance,提供新的数据服务
[zk: localhost:2181(CONNECTED) 19] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.8:11111","cid":1}
复制代码
与此同时,客户端也会随着canal server的切换,经过获取zookeeper中的最新地址,与新的canal server创建连接,继续消费数据,整个过程自动完成
es与es-head是两个独立的进程,当es-head访问es服务时,会存在一个跨域问题。因此咱们须要修改es的配置文件,增长一些配置项来解决这个问题,以下
[root@localhost /usr/local/elasticsearch-head-master]# cd ../elasticsearch-5.5.2/config/
[root@localhost /usr/local/elasticsearch-5.5.2/config]# vim elasticsearch.yml
# 文件末尾加上以下配置
http.cors.enabled: true
http.cors.allow-origin: "*"
复制代码
修改完配置文件后需重启es服务
解决方法:
一、进入head安装目录;
二、cd _site/
三、编辑vendor.js 共有两处
#6886行 contentType: "application/x-www-form-urlencoded
改为 contentType: "application/json;charset=UTF-8"
#7574行 var inspectData = s.contentType === "application/x-www-form-urlencoded" &&
改为 var inspectData = s.contentType === "application/json;charset=UTF-8" &&
复制代码
#pom中除了加入依赖
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.1.1</version>
</dependency>
#还需加入
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.1.1</version>
</dependency>
复制代码
相关参考 git hub issues
参考:为何ElasticSearch要在7.X版本去掉type?
因为本文使用的是elasticsearch7.x以上的版本,目前spring-data-elasticsearch底层采用es官方TransportClient,而es官方计划放弃TransportClient,工具以es官方推荐的RestHighLevelClient进行调用请求。 可参考RestHighLevelClient API
若是建立时未指定 --restart=always ,可经过update 命令
docker update --restart=always [containerID]
复制代码
host 模式是为了性能,可是这却对 docker 的隔离性形成了破坏,致使安全性下降。 在性能场景下,能够用 --netwokr host 开启 Host 模式,但须要注意的是,若是你用 Windows 或 Mac 本地启动容器的话,会遇到 host 模式失效的问题。缘由是 host 模式只支持 Linux 宿主机。
参见官方文档:docs.docker.com/network/hos…
出现这个错的意思是zookeeper做为外部应用须要向系统申请资源,申请资源的时候须要经过认证,而sasl是一种认证方式,咱们想办法来绕过sasl认证。避免等待,来提升效率。
在项目代码中加入System.setProperty("zookeeper.sasl.client", "false");
,若是是spring boot 项目能够在application.properties中加入zookeeper.sasl.client=false
参考:Increased CPU usage by unnecessary SASL checks
把canal的官方源码下载到本机git clone https://github.com/alibaba/canal.git
,而后修改client模块下pom.xml文件中关于zookeeper的内容,而后从新mvn install
mvn install
生产的包
修改hosts文件只能够实现域名到ip的映射(域名重定向),iptables能够实现端口的重定向,可是这个问题是要经过ip到ip的重定向能够解决,可是研究了一下没找到怎么设置(windows、mac),因此咱们修改canal的官方源码来达到咱们想要的目的。修改ClusterCanalConnector.java
中的connect()
方法。
如下是修改后内容对比图
本文示例项目源代码==>canal-elasticsearch-sync