其实作这个Demo的目的是如何基于Elasticsearch构建网站日志处理系统,经过数据同步工具等一些列开源组件来快速构建一个日志处理系统,项目雏形初步成型中。css
日志演示网址:http://es.52itstyle.comhtml
区域演示网址:http://es.52itstyle.com/area/indexvue
固然,项目功能会逐步增长,实现一个365°全方位的Demo案例。java
JDK1.七、Maven、Eclipse、SpringBoot1.5.九、elasticsearch2.4.六、Dubbox2.8.四、zookeeper3.4.六、Redis、kafka、Vue、Iviewnode
spring-boot-starter-parent-1.5.9.RELEASE、spring-data-elasticsearch-2.1.9.RELEAS、elasticsearch-2.4.6(5.0+以上须要依赖JDK8)mysql
截止2018年1月22日,ElasticSearch目前最新的已到6.1.2,可是spring-boot的更新速度远远跟不上ElasticSearch更新的速度,目前spring-boot支持的最新版本是elasticsearch-2.4.6。jquery
参考:https://github.com/spring-projects/spring-data-elasticsearch/wiki/Spring-Data-Elasticsearch---Spring-Boot---version-matrixgit
使用spring-boot中的spring-data-elasticsearch,可使用两种内置客户端接入github
一、节点客户端(node client): 配置文件中设置为local:false,节点客户端以无数据节点(node-master或node-client)身份加入集群,换言之,它本身不存储任何数据,可是它知道数据在集群中的具体位置,而且可以直接转发请求到对应的节点上。web
二、传输客户端(Transport client): 配置文件中设置为local:true,这个更轻量的传输客户端可以发送请求到远程集群。它本身不加入集群,只是简单转发请求给集群中的节点。 两个Java客户端都经过9300端口与集群交互,使用Elasticsearch传输协议(Elasticsearch Transport Protocol)。集群中的节点之间也经过9300端口进行通讯。若是此端口未开放,你的节点将不能组成集群。
spring.data.elasticsearch.cluster-name=elasticsearch
#默认就是本机,若是要使用远程服务器,或者局域网服务器,那就须要在这里配置ip:prot;能够配置多个,以逗号分隔,至关于集群。
#Java客户端:经过9300端口与集群进行交互
#其余全部程序语言:均可以使用RESTful API,经过9200端口的与Elasticsearch进行通讯。
#spring.data.elasticsearch.cluster-nodes=192.168.1.180:9300
复制代码
须要自行安装ElasticSearch,注意ElasticSearch版本尽可能要与JAR包一致。
下载地址:https://www.elastic.co/downloads/past-releases/elasticsearch-2-4-6
安装说明:http://www.52itstyle.com/thread-20114-1-1.html
新版本不建议使用root用户启动,须要自建ElasticSearch用户,也可使用如下命令启动 elasticsearch -Des.insecure.allow.root=true -d 或者在elasticsearch中加入ES_JAVA_OPTS="-Des.insecure.allow.root=true"。
├─src
│ ├─main
│ │ ├─java
│ │ │ └─com
│ │ │ └─itstyle
│ │ │ └─es
│ │ │ │ Application.java
│ │ │ │
│ │ │ ├─common
│ │ │ │ ├─constant
│ │ │ │ │ PageConstant.java
│ │ │ │ │
│ │ │ │ └─interceptor
│ │ │ │ MyAdapter.java
│ │ │ │
│ │ │ └─log
│ │ │ ├─controller
│ │ │ │ LogController.java
│ │ │ │
│ │ │ ├─entity
│ │ │ │ Pages.java
│ │ │ │ SysLogs.java
│ │ │ │
│ │ │ ├─repository
│ │ │ │ ElasticLogRepository.java
│ │ │ │
│ │ │ └─service
│ │ │ │ LogService.java
│ │ │ │
│ │ │ └─impl
│ │ │ LogServiceImpl.java
│ │ │
│ │ ├─resources
│ │ │ │ application-dev.properties
│ │ │ │ application-prod.properties
│ │ │ │ application-test.properties
│ │ │ │ application.yml
│ │ │ │ spring-context-dubbo.xml
│ │ │ │
│ │ │ ├─static
│ │ │ │ ├─iview
│ │ │ │ │ │ iview.css
│ │ │ │ │ │ iview.min.js
│ │ │ │ │ │
│ │ │ │ │ └─fonts
│ │ │ │ │ ionicons.eot
│ │ │ │ │ ionicons.svg
│ │ │ │ │ ionicons.ttf
│ │ │ │ │ ionicons.woff
│ │ │ │ │
│ │ │ │ ├─jquery
│ │ │ │ │ jquery-3.2.1.min.js
│ │ │ │ │
│ │ │ │ └─vue
│ │ │ │ vue.min.js
│ │ │ │
│ │ │ └─templates
│ │ │ └─log
│ │ │ index.html
│ │ │
│ │ └─webapp
│ │ │ index.jsp
│ │ │
│ │ └─WEB-INF
│ │ web.xml
│ │
│ └─test
│ └─java
│ └─com
│ └─itstyle
│ └─es
│ └─test
│ Logs.java
│
复制代码
使用ElasticsearchTemplate模板插入了20万条数据,本地向外网服务器(1核1G),用时60s+,一分钟左右的时间。虽然索引库容量有增长,可是等了大约 10分钟左右的时间才能搜索出来。
分页查询到10000+的时候系统报错,Result window is too large,修改config下的elasticsearch.yml 追加如下代码便可:
# 自行定义数量
index.max_result_window : '10000000'
复制代码
参考:https://www.elastic.co/guide/en/elasticsearch/reference/current/index-modules.html
Elasticsearch为Java用户提供了两种内置客户端:
节点客户端,顾名思义,其自己也是Elasticsearch集群的一个组成部分。以无数据节点(none data node)身份加入集群,换言之,它本身不存储任何数据,可是它知道数据在集群中的具体位置,而且可以直接转发请求到对应的节点上。
这个更轻量的传输客户端可以发送请求到远程集群。它本身不加入集群,只是简单转发请求给集群中的节点。两个Java客户端都经过9300端口与集群交互,使用Elasticsearch传输协议(Elasticsearch Transport Protocol)。集群中的节点之间也经过9300端口进行通讯。若是此端口未开放,你的节点将不能组成集群。
elasticsearch-head是一个界面化的集群操做和管理工具,能够对集群进行傻瓜式操做。你能够经过插件把它集成到es(首选方式),也能够安装成一个独立webapp。
es-head主要有三个方面的操做:
插件安装方式、参考:https://github.com/mobz/elasticsearch-head
安装成功之后会在plugins目录下出现一个head目录,代表安装已经成功。
浏览截图:
Elasticsearch、Logstash 随着 Kibana 的命名升级直接从2.4跳跃到了5.0,5.x版本的 ELK 在版本对应上要求相对较高,再也不支持5.x和2.x的混搭,同时 Elastic 作了一个 package ,对本来的 marvel、watch、alert 作了一个封装,造成了 x-pack 。
安装:https://www.elastic.co/guide/en/elasticsearch/reference/6.1/installing-xpack-es.html
x-pack安装以后有一个超级用户elastic ,其默认的密码是changeme,拥有对全部索引和数据的控制权,可使用该用户建立和修改其余用户,固然这里能够经过kibana的web界面进行用户和用户组的管理。
修改elastic用户的密码:
curl -XPUT -u elastic 'localhost:9200/_xpack/security/user/elastic/_password' -d '{ "password" : "123456" }'
复制代码
下载安装:
方式一 - download pre-build package from here: https://github.com/medcl/elasticsearch-analysis-ik/releases unzip plugin to folder your-es-root/plugins/
方式一二 - use elasticsearch-plugin to install ( version > v5.5.1 ): ./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v6.0.0/elasticsearch-analysis-ik-6.0.0.zip
因为Elasticsearch版本是2.4.6,这里选择IK版本为1.10.6
wget https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v1.10.6/elasticsearch-analysis-ik-1.10.6.zip
复制代码
下载解压之后在 Elasticsearch 的config下的elasticsearch.yml文件中,添加以下代码(2.0以上能够不设置)。
index:
analysis:
analyzer:
ik:
alias: [ik_analyzer]
type: org.elasticsearch.index.analysis.IkAnalyzerProvider
ik_max_word:
type: ik
use_smart: false
ik_smart:
type: ik
use_smart: true
复制代码
或者
index.analysis.analyzer.ik.type : “ik”
复制代码
http://192.168.1.180:9200/_analyze?analyzer=standard&pretty=true&text=我爱你中国
{
"tokens" : [ {
"token" : "我",
"start_offset" : 0,
"end_offset" : 1,
"type" : "<IDEOGRAPHIC>",
"position" : 0
}, {
"token" : "爱",
"start_offset" : 1,
"end_offset" : 2,
"type" : "<IDEOGRAPHIC>",
"position" : 1
}, {
"token" : "你",
"start_offset" : 2,
"end_offset" : 3,
"type" : "<IDEOGRAPHIC>",
"position" : 2
}, {
"token" : "中",
"start_offset" : 3,
"end_offset" : 4,
"type" : "<IDEOGRAPHIC>",
"position" : 3
}, {
"token" : "国",
"start_offset" : 4,
"end_offset" : 5,
"type" : "<IDEOGRAPHIC>",
"position" : 4
} ]
}
复制代码
http://121.42.155.213:9200/_analyze?analyzer=ik&pretty=true&text=我爱你中国
{
"tokens" : [ {
"token" : "我爱你",
"start_offset" : 0,
"end_offset" : 3,
"type" : "CN_WORD",
"position" : 0
}, {
"token" : "爱你",
"start_offset" : 1,
"end_offset" : 3,
"type" : "CN_WORD",
"position" : 1
}, {
"token" : "中国",
"start_offset" : 3,
"end_offset" : 5,
"type" : "CN_WORD",
"position" : 2
} ]
}
复制代码
使用第三方工具类库elasticsearch-jdbc实现MySql到elasticsearch的同步。
centos7.五、JDK八、elasticsearch-jdbc-2.3.2.0
#!/bin/sh
# elasticsearch-jdbc 安装路径
bin=/home/elasticsearch-jdbc-2.3.2.0/bin
lib=/home/elasticsearch-jdbc-2.3.2.0/lib
echo '{ "type" : "jdbc", "jdbc": { # 若是数据库中存在Json文件 这里设置成false,不然会同步出错 "detect_json":false, "url":"jdbc:mysql://127.0.0.1:3306/itstyle_log??useUnicode=true&characterEncoding=utf-8&useSSL=false&allowMultiQueries=true", "user":"root", "password":"root", # 若是想自动生成_id,去掉第一个获取字段便可;若是想Id做为主键,把id设置为_id便可 "sql":"SELECT id AS _id,id,user_id AS userId ,username,operation,time,method,params,ip,device_type AS deviceType,log_type AS logType,exception_detail AS exceptionDetail, gmt_create AS gmtCreate,plat_from AS platFrom FROM sys_log", "elasticsearch" : { "host" : "127.0.0.1",#elasticsearch服务地址 "port" : "9300" #远程elasticsearch服务 此端口必定要开放 }, "index" : "elasticsearch",# 索引名至关于库 "type" : "sysLog" # 类型名至关于表 } }' | java \
-cp "${lib}/*" \
-Dlog4j.configurationFile=${bin}/log4j2.xml \
org.xbib.tools.Runner \
org.xbib.tools.JDBCImporter
复制代码
chmod +x mysql_import_es.sh
./mysql_import_es.sh
复制代码
Spring-data-elasticsearch是Spring提供的操做ElasticSearch的数据层,封装了大量的基础操做,经过它能够很方便的操做ElasticSearch的数据。
/**
* @param <T>
* @param <ID>
* @author Rizwan Idrees
* @author Mohsin Husen
*/
@NoRepositoryBean
public interface ElasticsearchRepository<T, ID extends Serializable> extends ElasticsearchCrudRepository<T, ID> {
<S extends T> S index(S entity);
Iterable<T> search(QueryBuilder query);
Page<T> search(QueryBuilder query, Pageable pageable);
Page<T> search(SearchQuery searchQuery);
Page<T> searchSimilar(T entity, String[] fields, Pageable pageable);
void refresh();
Class<T> getEntityClass();
}
复制代码
ElasticsearchRepository里面有几个特殊的search方法,这些是ES特有的,和普通的JPA区别的地方,用来构建一些ES查询的。 主要是看QueryBuilder和SearchQuery两个参数,要完成一些特殊查询就主要看构建这两个参数。
通常状况下,咱们不是直接是new NativeSearchQuery,而是使用NativeSearchQueryBuilder。 经过NativeSearchQueryBuilder.withQuery(QueryBuilder1).withFilter(QueryBuilder2).withSort(SortBuilder1).withXXXX().build();这样的方式来完成NativeSearchQuery的构建。
ElasticSearchTemplate更可能是对ESRepository的补充,里面提供了一些更底层的方法。
这里咱们主要实现快读批量插入的功能,插入20万条数据,本地向外网服务器(1核1G),用时60s+,一分钟左右的时间。虽然索引库容量有增长,可是等了大约10分钟左右的时间才能搜索出来。
//批量同步或者插入数据
public void bulkIndex(List<SysLogs> logList) {
long start = System.currentTimeMillis();
int counter = 0;
try {
List<IndexQuery> queries = new ArrayList<>();
for (SysLogs log : logList) {
IndexQuery indexQuery = new IndexQuery();
indexQuery.setId(log.getId()+ "");
indexQuery.setObject(log);
indexQuery.setIndexName("elasticsearch");
indexQuery.setType("sysLog");
//也可使用IndexQueryBuilder来构建
//IndexQuery index = new IndexQueryBuilder().withId(person.getId() + "").withObject(person).build();
queries.add(indexQuery);
if (counter % 1000 == 0) {
elasticSearchTemplate.bulkIndex(queries);
queries.clear();
System.out.println("bulkIndex counter : " + counter);
}
counter++;
}
if (queries.size() > 0) {
elasticSearchTemplate.bulkIndex(queries);
}
long end = System.currentTimeMillis();
System.out.println("bulkIndex completed use time:"+ (end-start));
} catch (Exception e) {
System.out.println("IndexerService.bulkIndex e;" + e.getMessage());
throw e;
}
}
复制代码
见包:com.itstyle.es.common.redis
监听配置 RedisListener:
@Component
public class RedisListener {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisListener.class);
@Bean
RedisMessageListenerContainer container(
RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
LOGGER.info("启动监听");
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, new PatternTopic("itstyle_log"));
return container;
}
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
@Bean
Receiver receiver(CountDownLatch latch) {
return new Receiver(latch);
}
@Bean
CountDownLatch latch() {
return new CountDownLatch(1);
}
@Bean
StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
return new StringRedisTemplate(connectionFactory);
}
}
复制代码
日志接收Receiver:
public class Receiver {
private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
@Autowired
private ElasticLogRepository elasticLogRepository;
private CountDownLatch latch;
@Autowired
public Receiver(CountDownLatch latch) {
this.latch = latch;
}
public void receiveMessage(String message) {
LOGGER.info("接收log消息 <{}>",message);
if(message == null){
LOGGER.info("接收log消息 <" + null + ">");
}else {
ObjectMapper mapper = new ObjectMapper();
try {
SysLogs log = mapper.readValue(message, SysLogs.class);
elasticLogRepository.save(log);
LOGGER.info("接收log消息内容 <{}>",log.getOperation());
} catch (JsonParseException e) {
e.printStackTrace();
} catch (JsonMappingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
latch.countDown();
}
}
复制代码
测试 LogController:http://lip:port/redisLog
见包: com.itstyle.es.common.kafka