Bboss is a good elasticsearch Java rest client. It operates and accesses elasticsearch in a way similar to mybatis.html
BBoss Environmental requirements
JDK requirement: JDK 1.7+linux
Elasticsearch version requirements: 1.x,2.X,5.X,6.X,+git
Spring boot: 1.x,2.x,+github
kafka2x-Elasticsearch 数据同步工具demo
兼容 kafka_2.12-2.3.0 系列版本 ,使用本demo所带的应用程序运行容器环境,能够快速编写,打包发布可运行的数据导入工具windows
支持的 kafka_2.12-2.3.0 系列版本 到elasticsearch数据同步微信
kafka低版本(kafka_2.12-0.10.2.0系列版本)同步工具案例地址:https://gitee.com/bbossgroups/kafka1x-elasticsearchmybatis
支持的Elasticsearch版本: 1.x,2.x,5.x,6.x,7.x,+app
支持海量PB级数据同步导入功能eclipse
使用参考文档jvm
导入maven坐标
<dependency> <groupId>com.bbossgroups.plugins</groupId> <artifactId>bboss-elasticsearch-rest-kafka2x</artifactId> <version>5.9.9</version> <scope>compile</scope> </dependency>
构建部署
准备工做
须要经过gradle构建发布版本,gradle安装配置参考文档:
https://esdoc.bbossgroups.com/#/bboss-build
下载源码工程-基于gradle
https://github.com/bbossgroups/kafka2x-elasticsearch
从上面的地址下载源码工程,而后导入idea或者eclipse,根据本身的需求,修改导入程序逻辑
org.frameworkset.elasticsearch.imp.Kafka2ESdemo
若是须要测试和调试导入功能,运行Kafka2ESdemo的main方法便可便可:
public class Dbdemo { public static void main(String args[]){ Kafka2ESdemo dbdemo = new Kafka2ESdemo(); boolean dropIndice = true;//CommonLauncher.getBooleanAttribute("dropIndice",false);//同时指定了默认值 dbdemo.scheduleTimestampImportData(dropIndice); } ..... }
修改es配置-kafka2x-elasticsearch\src\main\resources\application.properties
修改完毕配置后,就能够进行功能调试了。
测试调试经过后,就能够构建发布可运行的版本了:进入命令行模式,在源码工程根目录kafka2x-elasticsearch 下运行如下gradle指令打包发布版本
release.bat
运行做业
gradle构建成功后,在build/distributions目录下会生成能够运行的zip包,解压运行导入程序
linux:
chmod +x restart.sh
./restart.sh
windows: restart.bat
做业jvm配置
修改jvm.options,设置内存大小和其余jvm参数
-Xms1g
-Xmx1g
做业参数配置
在使用kafka2x-elasticsearch 时,为了不调试过程当中不断打包发布数据同步工具,能够将部分控制参数配置到启动配置文件resources/application.properties中,而后在代码中经过如下方法获取配置的参数:
#工具主程序 mainclass=org.frameworkset.elasticsearch.imp.Kafka2ESdemo # 参数配置 # 在代码中获取方法:CommonLauncher.getBooleanAttribute("dropIndice",false);//同时指定了默认值false dropIndice=false
在代码中获取参数dropIndice方法:
boolean dropIndice = CommonLauncher.getBooleanAttribute("dropIndice",false);//同时指定了默认值false
另外能够在resources/application.properties配置控制做业执行的一些参数,例如工做线程数,等待队列数,批处理size等等:
queueSize=50 workThreads=10 batchSize=20
在做业执行方法中获取并使用上述参数:
int batchSize = CommonLauncher.getIntProperty("batchSize",10);//同时指定了默认值 int queueSize = CommonLauncher.getIntProperty("queueSize",50);//同时指定了默认值 int workThreads = CommonLauncher.getIntProperty("workThreads",10);//同时指定了默认值 importBuilder.setBatchSize(batchSize); importBuilder.setQueue(queueSize);//设置批量导入线程池等待队列长度 importBuilder.setThreadCount(workThreads);//设置批量导入线程池工做线程数量
elasticsearch技术交流群:166471282
elasticsearch微信公众号:bbossgroup
码云项目:kafka2x-elasticsearch