在系列开篇,我提到了四种将SQL SERVER数据同步到ES中的方案,本文将采用最简单的一种方案,即便用LastModifyTime来追踪DB中在最近一段时间发生了变动的数据。html
安装部分的官方文档在这里:https://www.elastic.co/guide/en/logstash/current/installing-logstash.htmljava
能够直接查看官方文档。sql
我这里使用的仍是以前文章中所述的CentOS来进行安装。数据库
首先须要安装Java(万物源于Java)服务器
输入命令找到的OpenJDK 1.8.X版本(截止我尝试时,在Java11上会有问题):jvm
yum search java | grep -i --color JDK
使用Yum进行安装:elasticsearch
yum install java-1.8.0-openjdk
配置环境变量JAVA_HOME、CLASSPATH、PATH。ide
打开/etc/profile文件:sqlserver
vi /etc/profile
将下面几行代码粘贴到该文件的最后:fetch
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-0.el7_6.x86_64/
export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin
保存并关闭,而后执行下列命令让设置当即生效。
source /etc/profile
能够输入下面的命令查看是否已生效:
java –-version
echo $JAVA_HOME
echo $CLASSPATH
echo $PATH
首先注册ELK官方的GPG-KEY:
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
而后cd /etc/yum.repos.d/文件夹下,建立一个logstash.repo文件,并将下面一段内容粘贴到该文件中保存:
[logstash-7.x] name=Elastic repository for 7.x packages baseurl=https://artifacts.elastic.co/packages/7.x/yum gpgcheck=1 gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch enabled=1 autorefresh=1 type=rpm-md
而后执行安装命令:
sudo yum install logstash
以上步骤可能比较慢,还有另一种办法,就是经过下载来安装LogStash:
官方文档在这里:https://www.elastic.co/cn/downloads/logstash
首先在上面的连接中下载LogStash的tar.gz包,这个过程有可能也很慢,个人解决方案是在本身机器上使用迅雷进行下载,完事儿Copy到Linux服务器中。
下载完成后,执行解压操做:
sudo tar -xvf logstash-7.2.0.tar.gz
解压完成后,进入解压后的logstash-7.2.0文件夹。
接着咱们安装Logstash-input-jdbc插件:
bin/logstash-plugin install logstash-input-jdbc
下载SQL SERVER jbdc组件,这里咱们从微软官网下载:https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-2017 ,固然这个连接只是目前的,若是你在尝试时这个连接失效了,那就自行百度搜索吧~
下载完成后,解压到logstash下面的lib目录下,这里我本身为了方便,把微软默认给jdbc外面包的一层语言名称的文件夹给去掉了。
接着,咱们到/config文件夹,新建一个logstash.conf文件,内容大概以下:
下面的每个参数含义均可以在官方文档中找到:
input { jdbc { jdbc_driver_library => "/usr/local/logstash-7.2.0/lib/mssql-jdbc-7.2.2/mssql-jdbc-7.2.2.jre8.jar" // 这里请灵活应变,能找到咱们上一步下载的jdbc jar包便可 jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver" // 这个名字是固定的 jdbc_connection_string => "jdbc:sqlserver: //数据库ServerIP:1433;databaseName=数据库名;" jdbc_user => "数据库帐号" jdbc_password => "数据库密码" schedule => "* * * * *" // Corn 表达式,请自行百度写法 jdbc_default_timezone => "Asia/Shanghai" jdbc_page_size => "500" // 每一批传输的数量 record_last_run => "true" //是否保存状态 use_column_value => "true" //设置为时true,使用定义的 tracking_column值做为:sql_last_value。设置为时false,:sql_last_value反映上次执行查询的时间。 tracking_column => "LastModificationTime" //配合use_column_value使用 last_run_metadata_path => "/usr/opt/logstash/config/last_id" //记录:sql_last_value的文件 lowercase_column_names => "false" //将DB中的列名自动转换为小写 tracking_column_type => "timestamp" //tracking_column的数据类型,只能是numberic和timestamp clean_run => "false" //是否应保留先前的运行状态,其实我也不知道这个字段干啥用的~~ statement => "SELECT * FROM 表 WITH(NOLOCK) WHERE LastModificationTime > :sql_last_value" //从DB中抓数据的SQL脚本 } } output { elasticsearch { index => "test" //ES集群的索引名称 document_id => "%{Id}" //Id是表里面的主键,为了拿这个主键在ES中生成document ID hosts => ["http://192.168.154.135:9200"]// ES集群的地址 } }
上面的被注释搞的乱糟糟的,给大家一个能够复制的版本吧:
input { jdbc { jdbc_driver_library => "/usr/local/logstash-7.2.0/lib/mssql-jdbc-7.2.2/mssql-jdbc-7.2.2.jre8.jar" jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver" jdbc_connection_string => "jdbc:sqlserver://SERVER_IP:1433;databaseName=DBName;" jdbc_user => "xxx" jdbc_password => "password" schedule => "* * * * *" jdbc_default_timezone => "Asia/Shanghai" jdbc_page_size => "50000" record_last_run => "true" use_column_value => "true" tracking_column => "LastModificationTime" last_run_metadata_path => "/usr/local/logstash-7.2.0/config/last_id" lowercase_column_names => "false" tracking_column_type => "timestamp" clean_run => "false" statement => "SELECT * FROM xxx WITH(NOLOCK) WHERE LastModificationTime > :sql_last_value" } } output { elasticsearch { index => "item" document_id => "%{Id}" hosts => ["http://ES集群IP:9200"] } }
回头来讲一下这个LogStash的总体思路吧,其实个人理解,LogStash就是一个数据搬运工,他的搬运数据,分为三个大的阶段:
对应的官方文档:https://www.elastic.co/guide/en/logstash/current/pipeline.html
而这每个阶段,都是经过一些插件来实现的,好比在上述的配置文件中,咱们有:
剩下的部分就简单了,切换目录到logstash的目录下,执行命令:
bin/logstash -f config/logstash.conf
最后执行的效果图大概以下:
可使用Elasticsearch-Head等插件来查看是否同步正常:
大概就是这样啦,后续我这边会继续尝试使用其余方式来进行数据同步,欢迎你们关注~