好程序员 Big Data Learning Path: Logstash vs. Flume

Neither tool has the concept of a cluster; both Logstash and Flume run as standalone agents (components).
Logstash is developed in JRuby.
Component comparison:
  Logstash: input → filter → output
  Flume: source → channel → sink
Pros and cons:
Logstash:
Simple to install, with a small footprint.
Has a filter component, which gives it data-filtering and data-splitting capabilities.
Integrates seamlessly with Elasticsearch (ES).
Fault-tolerant during collection: if the process crashes or the connection drops, it resumes where it left off, because it records the read offset (see the sketch after this list).
  In short, this tool is mainly used to collect log data.
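A minimal sketch of that offset bookkeeping, assuming Logstash's file input; sincedb_path is the option that controls where read offsets are persisted (the log path and sincedb filename here are hypothetical):

input {
  file {
    path => "/var/log/app.log"                    # hypothetical log file
    start_position => "beginning"
    sincedb_path => "/var/lib/logstash/sincedb"   # offsets recorded here, so a restart resumes from the last position
  }
}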
Flume:
Stronger than Logstash in high availability.
Flume has always emphasized data safety: data transfer in Flume is controlled by transactions.
Flume can be applied to many kinds of data-transfer scenarios.
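For contrast, a minimal sketch of Flume's source → channel → sink wiring, using the standard netcat-source quickstart example (agent and component names are arbitrary):

# flume.conf: one agent a1 with a netcat source, memory channel, and logger sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.channels.c1.type = memory

a1.sinks.k1.type = logger

# wire the source and sink to the channel; the channel's transactions guard each hop
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Started with: bin/flume-ng agent --conf conf --conf-file flume.conf --name a1 -Dflume.root.logger=INFO,console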
Data integration
Upload and extract the Logstash .gz archive.
You can create a conf directory under the Logstash directory to hold configuration files.
I. Starting from the command line
1. bin/logstash -e 'input { stdin {} } output { stdout{} }'
  stdin/stdout are the standard input and output streams
hello xixi
2018-09-12T21:58:58.649Z hadoop01 hello xixi
hello haha
2018-09-12T21:59:19.487Z hadoop01 hello haha
2. bin/logstash -e 'input { stdin {} } output { stdout{codec => rubydebug} }'
hello xixi
{
       "message" => "hello xixi",
      "@version" => "1",
    "@timestamp" => "2018-09-12T22:00:49.612Z",
          "host" => "hadoop01"
}
3. With an ES cluster (the ES cluster must be running):
  bin/logstash -e 'input { stdin {} } output { elasticsearch {hosts => ["192.168.88.81:9200"]} stdout{} }'
Once data is entered, ES automatically creates the index and the mapping.
hello haha
2018-09-12T22:13:05.361Z hadoop01 hello haha
  The same works with multiple ES hosts:
  bin/logstash -e 'input { stdin {} } output { elasticsearch {hosts => ["192.168.88.81:9200", "192.168.88.82:9200"]} stdout{} }'
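To confirm that the index was created automatically, a quick check against the cluster (address taken from the commands above):

curl 'http://192.168.88.81:9200/_cat/indices?v'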
4. With a Kafka cluster (start the Kafka cluster first), switch the output to kafka; the options mirror the config file used below:
  bin/logstash -e 'input { stdin {} } output { kafka { topic_id => "test1" bootstrap_servers => "node01:9092,node02:9092,node03:9092" } stdout{} }'
II. Starting with configuration files
You need the ZooKeeper cluster, Kafka cluster, and ES cluster running.
1. Connecting data to Kafka
vi logstash-kafka.conf
  Start it:
  bin/logstash -f logstash-kafka.conf (-f specifies the config file)
  On another node, start a Kafka console consumer (see the command after the config below).
input {
  file {
    path => "/root/data/test.log"
    discover_interval => 5
    start_position => "beginning"
  }
}

output {
  kafka {
    topic_id => "test1"
    codec => plain {
      format => "%{message}"
      charset => "UTF-8"
    }
    bootstrap_servers => "node01:9092,node02:9092,node03:9092"
  }
}
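The consumer mentioned above can be the stock Kafka console consumer; this assumes the ZooKeeper-era Kafka CLI that matches the zk_connect settings used later in this article:

bin/kafka-console-consumer.sh --zookeeper node01:2181 --topic test1 --from-beginning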
2. Connecting data to Elasticsearch
vi logstash-es.conf

Start Logstash:

bin/logstash -f logstash-es.conf
  Then verify the resulting index in Elasticsearch (see the query sketch after the config below).
input {
  file {
    type => "gamelog"
    path => "/log/*/*.log"
    discover_interval => 10
    start_position => "beginning"
  }
}

output {
  elasticsearch {
    index => "gamelog-%{+YYYY.MM.dd}"
    hosts => ["node01:9200", "node02:9200", "node03:9200"]
  }
}
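A quick way to verify the daily index, assuming the hosts from the config above (the wildcard matches the date-suffixed index names):

curl 'http://node01:9200/gamelog-*/_search?pretty&size=2'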
Data pipeline walkthrough
Logstash placement: run it on whichever node has the most idle resources (placement is flexible).
1. Start a Logstash instance that monitors the logserver directory and collects the data into Kafka.
2. Start another Logstash instance that monitors a Kafka topic and collects its data into Elasticsearch.
Data pipeline example
You need to start two Logstash instances, each with its own config file, to wire up the two hops.
1. Collect data into Kafka
  cd conf
  Create the config file: vi gs-kafka.conf
input {
  file {
    codec => plain {
      charset => "GB2312"
    }
    path => "/root/basedir/*/*.txt"
    discover_interval => 5
    start_position => "beginning"
  }
}

output {
  kafka {
    topic_id => "gamelogs"
    codec => plain {
      format => "%{message}"
      charset => "GB2312"
    }
    bootstrap_servers => "node01:9092,node02:9092,node03:9092"
  }
}
  Create the corresponding Kafka topic:
bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 1 --topic gamelogs
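To double-check that the topic exists:

bin/kafka-topics.sh --list --zookeeper hadoop01:2181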
2. Start Logstash on hadoop01:
  bin/logstash -f conf/gs-kafka.conf
3. Start another Logstash on hadoop02:
  cd logstash/conf
  vi kafka-es.conf
input {
  kafka {
    type => "accesslogs"
    codec => "plain"
    auto_offset_reset => "smallest"
    group_id => "elas1"
    topic_id => "accesslogs"
    zk_connect => "node01:2181,node02:2181,node03:2181"
  }

  kafka {
    type => "gamelogs"
    auto_offset_reset => "smallest"
    codec => "plain"
    group_id => "elas2"
    topic_id => "gamelogs"
    zk_connect => "node01:2181,node02:2181,node03:2181"
  }
}

filter {
  if [type] == "accesslogs" {
    json {
      source => "message"
      remove_field => [ "message" ]
      target => "access"
    }
  }

  if [type] == "gamelogs" {
    mutate {
      # split each raw line into an array, then lift positional entries into named fields
      split => { "message" => "    " }
      add_field => {
        "event_type" => "%{message[3]}"
        "current_map" => "%{message[4]}"
        "current_X" => "%{message[5]}"
        "current_y" => "%{message[6]}"
        "user" => "%{message[7]}"
        "item" => "%{message[8]}"
        "item_id" => "%{message[9]}"
        "current_time" => "%{message[12]}"
      }
      remove_field => [ "message" ]
    }
  }
}

output {
  if [type] == "accesslogs" {
    elasticsearch {
      index => "accesslogs"
      codec => "json"
      hosts => ["node01:9200", "node02:9200", "node03:9200"]
    }
  }

  if [type] == "gamelogs" {
    elasticsearch {
      index => "gamelogs1"
      codec => plain {
        charset => "UTF-16BE"
      }
      hosts => ["node01:9200", "node02:9200", "node03:9200"]
    }
  }
}
   bin/logstash -f conf/kafka-es.conf
4. Modify any file under basedir and Logstash will pick up the change, which produces the ES index.
5. The ES data shown in the web UI is stored on disk under the configured /data/esdata (see the setting sketch after this list).
6. Searching for a given field in the web UI
  The default analyzer indexes Chinese text character by character, so a term query can only match a single character; a query_string query can match whole Chinese phrases (see the query sketch below).
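The storage location in step 5 corresponds to Elasticsearch's standard path.data setting; a minimal sketch of the line in elasticsearch.yml on each node (path taken from the text above):

path.data: /data/esdata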
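And a query_string search of the kind step 6 describes, assuming the gamelogs1 index from the config above (the search phrase "游戏" is a placeholder):

curl -XPOST 'http://node01:9200/gamelogs1/_search?pretty' -H 'Content-Type: application/json' -d '
{
  "query": {
    "query_string": { "query": "游戏" }
  }
}'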