1.如何实现mysql与elasticsearch的数据同步?java
逐条转换为json显然不合适,须要借助第三方工具或者本身实现。核心功能点:同步增、删、改、查同步。node
二、mysql与elasticsearch同步的方法有哪些?优缺点对比?mysql
目前该领域比较牛的插件有:git
1)、elasticsearch-jdbc,严格意义上它已经不是第三方插件。已经成为独立的第三方工具。https://github.com/jprante/elasticsearch-jdbc 2)、elasticsearch-river-mysql插件 https://github.com/scharron/elasticsearch-river-mysql 3)、go-mysql-elasticsearch(国内做者siddontang) https://github.com/siddontang/go-mysql-elasticsearchgithub
1-3同步工具/插件对比:sql
go-mysql-elasticsearch仍处理开发不稳定阶段。 为何选择elasticsearch-jdbc而不是elasticsearch-river-mysql插件的缘由?(参考:http://stackoverflow.com/questions/23658534/using-elasticsearch-river-mysql-to-stream-data-from-mysql-database-to-elasticsea) 1)通用性角度:elasticsearch-jdbc更通用, 2)版本更新角度:elasticsearch-jdbc GitHub活跃度很高,最新的版本2.3.3.02016年5月28日兼容Elasticsearch2.3.3版本。 而elasticsearch-river-mysql 2012年12月13往后便再也不更新。 综上,选择elasticsearch-jdbc做为mysql同步Elasticsearch的工具理所固然。数据库
elasticsearch-jdbc的缺点与不足(他山之石):json
1)、go-mysql-elasticsearch做者siddontang在博客提到的: elasticsearch-river-jdbc的功能是很强大,但并无很好的支持增量数据更新的问题,它须要对应的表只增不减,而这个几乎在项目中是不可能办到的。 http://www.jianshu.com/p/05cff717563c 2)、 博主leotse90在博文中提到elasticsearch-jdbc的缺点:那就是删除操做不能同步(物理删除)! http://leotse90.com/2015/11/11/ElasticSearch与MySQL数据同步以及修改表结构/app
我截止2016年6月16日没有测试到,不妄加评论。curl
这里写图片描述
三、elasticsearch-jdbc如何使用?要不要安装?
3.1 和早期版本不一样点
elasticsearch-jdbcV2.3.2.0版本不须要安装。如下笔者使用的elasticsearch也是2.3.2测试。 操做系统:CentOS release 6.6 (Final) 看到这里,你可能会问早期的版本有什么不一样呢?很大不一样。从我搜集资料来看,不一样点以下: 1)早期1.x版本,做为插件,须要安装。 2)配置也会有不一样。
3.2 elasticsearch-jdbc使用(同步方法一)
前提: 1)elasticsearch 2.3.2 安装成功,测试ok。 2)mysql安装成功,能实现增、删、改、查。 可供测试的数据库为test,表为cc,具体信息以下:
mysql> select * from cc; +----+------------+ | id | name | +----+------------+ | 1 | laoyang | | 2 | dluzhang | | 3 | dlulaoyang | +----+------------+ 3 rows in set (0.00 sec)
第一步:下载工具。 址:http://xbib.org/repository/org/xbib/elasticsearch/importer/elasticsearch-jdbc/2.3.2.0/elasticsearch-jdbc-2.3.2.0-dist.zip 第二步:导入Centos。路径本身定,笔者放到根目录下,解压。unzip elasticsearch-jdbc-2.3.2.0-dist.zip 第三步:设置环境变量。
[root@5b9dbaaa148a /]# vi /etc/profile export JDBC_IMPORTER_HOME=/elasticsearch-jdbc-2.3.2.0
使环境变量生效: [root@5b9dbaaa148a /]# source /etc/profile 第四步:配置使用。详细参考:https://github.com/jprante/elasticsearch-jdbc 1)、根目录下新建文件夹odbc_es 以下:
[root@5b9dbaaa148a /]# ll /odbc_es/ drwxr-xr-x 2 root root 4096 Jun 16 03:11 logs -rwxrwxrwx 1 root root 542 Jun 16 04:03 mysql_import_es.sh
2)、新建脚本mysql_import_es.sh,内容以下;
[root@5b9dbaaa148a odbc_es]# cat mysql_import_es.sh ’#!/bin/sh bin=$JDBC_IMPORTER_HOME/bin lib=$JDBC_IMPORTER_HOME/lib echo '{ "type" : "jdbc", "jdbc": { "elasticsearch.autodiscover":true, "elasticsearch.cluster":"my-application", #簇名,详见:/usr/local/elasticsearch/config/elasticsearch.yml "url":"jdbc:mysql://10.8.5.101:3306/test", #mysql数据库地址 "user":"root", #mysql用户名 "password":"123456", #mysql密码 "sql":"select * from cc", "elasticsearch" : { "host" : "10.8.5.101", "port" : 9300 }, "index" : "myindex", #新的index "type" : "mytype" #新的type } }'| java
-cp "${lib}/*"
-Dlog4j.configurationFile=${bin}/log4j2.xml
org.xbib.tools.Runner
org.xbib.tools.JDBCImporter
3)、为 mysql_import_es.sh 添加可执行权限。 [root@5b9dbaaa148a odbc_es]# chmod a+x mysql_import_es.sh 4)执行脚本mysql_import_es.sh [root@5b9dbaaa148a odbc_es]# ./mysql_import_es.sh
第五步:测试数据同步是否成功。 使用elasticsearch检索查询:
[root@5b9dbaaa148a odbc_es]# curl -XGET 'http://10.8.5.101:9200/myindex/mytype/_search?pretty'
{ "took" : 4, "timed_out" : false, "_shards" : { "total" : 8, "successful" : 8, "failed" : 0 }, "hits" : { "total" : 3, "max_score" : 1.0, "hits" : [ { "_index" : "myindex", "_type" : "mytype", "_id" : "AVVXKgeEun6ksbtikOWH", "_score" : 1.0, "_source" : { "id" : 1, "name" : "laoyang" } }, { "_index" : "myindex", "_type" : "mytype", "_id" : "AVVXKgeEun6ksbtikOWI", "_score" : 1.0, "_source" : { "id" : 2, "name" : "dluzhang" } }, { "_index" : "myindex", "_type" : "mytype", "_id" : "AVVXKgeEun6ksbtikOWJ", "_score" : 1.0, "_source" : { "id" : 3, "name" : "dlulaoyang" } } ] } }
出现以上包含mysql数据字段的信息则为同步成功。
四、 elasticsearch-jdbc 同步方法二
[root@5b9dbaaa148a odbc_es]# cat mysql_import_es_simple.sh #!/bin/sh bin=$JDBC_IMPORTER_HOME/bin lib=$JDBC_IMPORTER_HOME/lib java
-cp "${lib}/*"
-Dlog4j.configurationFile=${bin}/log4j2.xml
org.xbib.tools.Runner
org.xbib.tools.JDBCImporter statefile.json
[root@5b9dbaaa148a odbc_es]# cat statefile.json
{ "type" : "jdbc", "jdbc": { "elasticsearch.autodiscover":true, "elasticsearch.cluster":"my-application", "url":"jdbc:mysql://10.8.5.101:3306/test", "user":"root", "password":"123456", "sql":"select * from cc", "elasticsearch" : { "host" : "10.8.5.101", "port" : 9300 }, "index" : "myindex_2", "type" : "mytype_2" } }
脚本和json文件分开,脚本执行前先加载json文件。 执行方式:直接运行脚本 ./mysql_import_es_simple.sh 便可。
五、Mysql与elasticsearch等价查询
目标:实现从表cc中查询id=3的name信息。 1)MySQL中sql语句查询:
mysql> select * from cc where id=3; +----+------------+ | id | name | +----+------------+ | 3 | dlulaoyang | +----+------------+ 1 row in set (0.00 sec)
2)elasticsearch检索:
[root@5b9dbaaa148a odbc_es]# curl http://10.8.5.101:9200/myindex/mytype/_search?pretty -d '
{ "filter" : { "term" : { "id" : "3" } } }' { "took" : 3, "timed_out" : false, "_shards" : { "total" : 8, "successful" : 8, "failed" : 0 }, "hits" : { "total" : 1, "max_score" : 1.0, "hits" : [ { "_index" : "myindex", "_type" : "mytype", "_id" : "AVVXKgeEun6ksbtikOWJ", "_score" : 1.0, "_source" : { "id" : 3, "name" : "dlulaoyang" } } ] } }
常见错误:
错误日志位置:/odbc_es/logs 日志内容: [root@5b9dbaaa148a logs]# tail -f jdbc.log [04:03:39,570][INFO ][org.xbib.elasticsearch.helper.client.BaseTransportClient][pool-3-thread-1] after auto-discovery connected to [{5b9dbaaa148a}{aksn2ErNRlWjUECnp_8JmA}{10.8.5.101}{10.8.5.101:9300}{master=true}]
Bug一、[02:46:23,894][ERROR][importer.jdbc ][pool-3-thread-1] error while processing request: cluster state is RED and not YELLOW, from here on, everything will fail! 缘由: you created an index with replicas but you had only one node in the cluster. One way to solve this problem is by allocating them on a second node. Another way is by turning replicas off. 你建立了带副本 replicas 的索引,可是在你的簇中只有一个节点。
解决方案: 方案一:容许分配‘它们’到第二个节点。 方案二:关闭副本replicas(很是可行)。以下:
curl -XPUT 'localhost:9200/_settings' -d ' { "index" : { "number_of_replicas" : 0 } }
Bug二、[13:00:37,137][ERROR][importer.jdbc ][pool-3-thread-1] error while processing request: no cluster nodes available, check settings {autodiscover=false, client.transport.ignore_cluster_name=false, client.transport.nodes_sampler_interval=5s, client.transport.ping_timeout=5s, cluster.name=elasticsearch, org.elasticsearch.client.transport.NoNodeAvailableException: no cluster nodes available, check 解决方案: 见上脚本中新增: “elasticsearch.cluster”:”my-application”, #簇名,和/usr/local/elasticsearch/config/elasticsearch.yml 簇名保持一致。
下载地址 http://xbib.org/repository/org/xbib/elasticsearch/importer/elasticsearch-jdbc/2.3.3.0/elasticsearch-jdbc-2.3.3.0-dist.zip 解压, 设置环境变量 修改bin中脚本 运行。
注意:包下载下来没有包含statefile.json 文件,第一次运行sh文件生成该配置,后面使用都用该文件配置
./mysql-goodstaxi.sh & touch jdbc.log
#!/bin/sh DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" bin=/httx/run/elasticsearch-jdbc-2.3.3.0/bin lib=/httx/run/elasticsearch-jdbc-2.3.3.0/lib echo ' { "type" : "jdbc", "jdbc" : { "url" : "jdbc:mysql://10.7.*.*:8066/tete?useUnicode=true&characterEncoding=utf-8", "statefile" : "statefile.json", "schedule" : "0 0-59 0-23 ? * *", "user" : "54645", "password" : "456456", "sql" : [ { "statement" : "select *,TradeId as _id from Trade where stampDate > ?", "parameter" : [ "$metrics.lastexecutionstart" ] } ], "index_settings" : { "analysis" : { "analyzer" : { "ik" : { "tokenizer" : "ik" } } } }, "elasticsearch" : { "cluster" : "565", "host" : "10.7.*.*", "port" : 9300 }, "index" : "goods", "type" : "goods", "index_settings" : { "index" : { "number_of_shards" : 1 } } } } ' | java \ -cp "${lib}/*" \ -Dlog4j.configurationFile=${bin}/log4j2.xml \ org.xbib.tools.Runner \ org.xbib.tools.JDBCImporte
16546
#!/bin/sh DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" bin=/httx/6/elasticsearch-jdbc-2.3.3.0/bin lib=/httx/6/elasticsearch-jdbc-2.3.3.0/lib echo ' { "type" : "jdbc", "jdbc" : { "url" : "jdbc:mysql://10.7.*.*:8066/good?useUnicode=true&characterEncoding=utf-8", "statefile" : "statefile.json", "schedule" : "0 0-59 0-23 ? * *", "user" : "admin", "password" : "45456", "sql" : "select *,6TradeId as _id from 6Trade", "elasticsearch" : { "cluster" : "6", "host" : "10.7.*.*", "port" : 9300 }, "index" : "good", "type" : "goods", "index_settings" : { "index" : { "number_of_shards" : 1 } } } } ' | java \ -cp "${lib}/*" \ -Dlog4j.configurationFile=${bin}/log4j2.xml \ org.xbib.tools.Runner \ org.xbib.tools.JDBCImporter