logstash将Kafka中的日志数据订阅到HDFS

前言:一般状况下,咱们将Kafka的日志数据经过logstash订阅输出到ES,而后用Kibana来作可视化分析,这就是咱们一般用的ELK日志分析模式。可是基于ELK的日志分析,一般比较经常使用的是实时分析,日志存个十天半个月都会删掉。那么在一些状况下,我须要将日志数据也存一份到我HDFS,积累到比较久的时间作半年、一年甚至更长时间的大数据分析。下面就来讲如何最简单的经过logstash将kafka中的数据订阅一份到hdfs。node

wKiom1d8ekGgX_HBAABasyVQt-E025.png-wh_50

一:安装logstash(下载tar包安装也行,我直接yum装了)git

#yum install logstash-2.1.1


二:从github上克隆代码github

#git clone  https://github.com/heqin5136/logstash-output-webhdfs-discontinued.git
#ls
logstash-output-webhdfs-discontinued


三:安装logstash-output-webhdfs插件web

#cd logstash-output-webhdfs-discontinued
logstash的bin目录下有个plugin,使用plugin来安装插件
#/opt/logstash/bin/plugin install logstash-output-webhdfs

wKiom1d8b9HzWVqnAAAN1y8C7dQ438.png-wh_50


四:配置logstashjson

#vim /etc/logstash/conf.d/logstash.conf
input {
  kafka {
    zk_connect => '10.10.10.1:2181,10.10.10.2:2181,10.10.10.3:2181'   #kafka的zk集群地址
    group_id => 'hdfs'                     #消费者组,不要和ELK上的消费者同样
    topic_id => 'apiAppWebCms-topic'       #topic 
    consumer_id => 'logstash-consumer-10.10.8.8'   #消费者id,自定义,我写本机ip。
    consumer_threads => 1
    queue_size => 200
    codec => 'json'
  }
}

output {            
#若是你一个topic中会有好几种日志,能够提取出来分开存储在hdfs上。
if [type] == "apiNginxLog" {
    webhdfs {
           workers => 2
           host => "10.10.8.1"        #hdfs的namenode地址    
           port => 50070              #webhdfs端口
           user => "hdfs"             #hdfs运行的用户啊,以这个用户的权限去写hdfs。
           path => "/data/logstash/apiNginxLog-%{+YYYY}-%{+MM}-%{+dd}/logstash-%{+HH}.log 
             #按天建目录,按小时建log文件。
           flush_size => 500
#       compression => "snappy"             #压缩格式,能够不压缩
        idle_flush_time => 10
        retry_interval => 0.5
       }
   }
if [type] == "apiAppLog" {
    webhdfs {
        workers => 2
        host => "10.64.8.1"
        port => 50070
        user => "hdfs"
        path => "/data/logstash/api/apiAppLog-%{+YYYY}-%{+MM}-%{+dd}.log"
        flush_size => 500
#        compression => "snappy"
        idle_flush_time => 10
        retry_interval => 0.5
       }
   }
  stdout { codec => rubydebug }
}

 

五:启动logstashvim

#/etc/init.d/logstash start


已经能够成功写入了。api

wKiom1d8d97DWrGPAABNBo8GGGY014.png

wKioL1d8eFTSO0dwAACYhM0Li-E358.png-wh_50

相关文章
相关标签/搜索