Flume + Kafka + ZooKeeper + Spark + InfluxDB + Grafana + Kapacitor monitoring platform

 

Architecture diagram (kept as simple as possible because of limited resources)

 

Download the required packages (note that Kafka and Spark have specific JDK and Scala version requirements; check the official sites)

 

wget https://dl.influxdata.com/influxdb/releases/influxdb-1.5.2.x86_64.rpm
yum localinstall influxdb-1.5.2.x86_64.rpm
wget https://dl.influxdata.com/kapacitor/releases/kapacitor-1.4.1.x86_64.rpm
yum localinstall kapacitor-1.4.1.x86_64.rpm
Because of network restrictions, the packages were downloaded in advance and then uploaded to the servers.

192.168.10.129  flume influxdb grafana kapacitor

192.168.10.130  kafka spark

Install flume-ng (requires JDK 1.8)

Flume ships as a binary package; just extract it.

cd conf; cp flume-env.sh.template flume-env.sh

echo -e 'export JAVA_HOME=/opt/jdk1.8\nexport JAVA_OPTS="-Xms500m -Xmx1000m -Dcom.sun.management.jmxremote"' >> flume-env.sh

A detailed introduction to flume-ng is available at https://blog.csdn.net/zhaodedong/article/details/52541688

Source types

Source type    Description
Avro Source    Supports the Avro protocol (actually Avro RPC); built in
Thrift Source    Supports the Thrift protocol; built in
Exec Source    Produces data from the standard output of a Unix command
JMS Source    Reads data from a JMS system (queues or topics); tested against ActiveMQ
Spooling Directory Source    Watches a given directory for new data
Twitter 1% firehose Source    Continuously downloads Twitter data through the API; experimental
Netcat Source    Listens on a port and turns each text line passing through it into an Event
Sequence Generator Source    Sequence generator source that produces sequence data
Syslog Sources    Read syslog data and produce Events; both UDP and TCP are supported
HTTP Source    Accepts data via HTTP POST or GET; supports JSON and BLOB representations
Legacy Sources    Compatible with Sources from the old Flume OG (0.9.x)

Channel types

Channel type    Description
Memory Channel    Event data is stored in memory
JDBC Channel    Event data is stored in a persistent store; Flume's built-in support is currently Derby
File Channel    Event data is stored in files on disk
Spillable Memory Channel    Event data is stored in memory and spilled to disk files when the in-memory queue fills up (currently experimental, not recommended for production)
Pseudo Transaction Channel    For testing purposes
Custom Channel    A custom Channel implementation

Sink types

Sink type    Description
HDFS Sink    Writes data to HDFS
Logger Sink    Writes data to log files
Avro Sink    Data is converted into Avro Events and sent to the configured RPC port
Thrift Sink    Data is converted into Thrift Events and sent to the configured RPC port
IRC Sink    Data is replayed on IRC
File Roll Sink    Stores data on the local file system
Null Sink    Discards all data it receives
HBase Sink    Writes data to HBase
Morphline Solr Sink    Sends data to a Solr search server (or cluster)
ElasticSearch Sink    Sends data to an Elasticsearch search server (or cluster)
Kite Dataset Sink    Writes data to a Kite Dataset; experimental
Custom Sink    A custom Sink implementation

flume.conf configuration

# logser is the name of this Flume agent; every agent consists of sources, channels and sinks.
# Multiple sources/sinks can be listed separated by spaces.
# sources are where data comes from, channels are the intermediate buffer, sinks are where the data goes.
logser.sources = src_dir
logser.sinks = sink_kfk
logser.channels = ch
# source
# source type
logser.sources.src_dir.type = TAILDIR
# file that records the read position of every tailed file
logser.sources.src_dir.positionFile = /opt/flume-1.8.0/logs/taildir_position.json
logser.sources.src_dir.filegroups = f1
logser.sources.src_dir.filegroups.f1 = /opt/logs/.*
logser.sources.src_dir.headers.f1.headerKey1 = tomcatAPP
logser.sources.src_dir.fileHeader = true
# channel
logser.channels.ch.type = memory
logser.channels.ch.capacity = 10000
logser.channels.ch.transactionCapacity = 1000
# kfk sink
# the sink type is Kafka, i.e. the logs end up in Kafka
logser.sinks.sink_kfk.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka servers; separate multiple brokers with commas
logser.sinks.sink_kfk.kafka.bootstrap.servers = 192.168.10.130:9092
logser.sinks.sink_kfk.kafka.topic = tomcatCom
# Bind the source and sink to the channel
logser.sources.src_dir.channels = ch
logser.sinks.sink_kfk.channel = ch

Start Flume (start Kafka first)

nohup bin/flume-ng agent --conf conf/ --conf-file conf/flume.conf --name logser -Dflume.root.logger=INFO,console > logs/flume-ng.log 2>&1 &

Going further: shipping Java log4j logs to Flume

log4j.properties

log4j.rootLogger=INFO,stdout,flume
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
# hostname of the remote Flume agent's Avro source
log4j.appender.flume.Hostname = flume
# port the remote Flume agent's Avro source listens on
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true

Note: the application also needs this dependency on its classpath:
<dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>same as your Flume version</version>
</dependency>

flume.conf

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink

#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=192.168.10.129
agent1.sources.avro-source.port=41414

#define channel
agent1.channels.logger-channel.type=memory

#define sink
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
# Kafka topic
agent1.sinks.kafka-sink.kafka.topic=mytest
agent1.sinks.kafka-sink.kafka.bootstrap.servers=192.168.10.130:9092
# number of messages per batch; larger batches improve throughput (default 100)
agent1.sinks.kafka-sink.flumeBatchSize=20
# default is 1
agent1.sinks.kafka-sink.kafka.producer.acks=1

# Bind the source and sink to the channel
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel
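
With the appender in place, any class that logs through log4j has its events forwarded by the Log4jAppender to the Avro source above. A minimal Scala sketch, assuming the log4j 1.x API and the flume-ng-log4jappender dependency listed earlier (the class name and messages are made up for illustration):

import org.apache.log4j.Logger

object LogDemo {
  // Events written through this logger are shipped by the Log4jAppender
  // to the Flume Avro source listening on 192.168.10.129:41414.
  private val logger = Logger.getLogger(LogDemo.getClass)

  def main(args: Array[String]): Unit = {
    var i = 0
    while (true) {
      logger.info(s"current value is $i")
      i += 1
      Thread.sleep(2000)
    }
  }
}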

 

Install Kafka

Install the Scala environment

yum localinstall scala-2.11.7.rpm 

Install the JDK

export JAVA_HOME=/opt/jdk1.8
export PATH=$PATH:/opt/jdk1.8/bin

Just extract the Kafka package.

ZooKeeper here is the one bundled with Kafka.

zookeeper.properties

dataDir=/opt/kafka_2.11/zk
dataLogDir=/opt/kafka_2.11/zk/logs
clientPort=2181
maxClientCnxns=0

server.properties (single-node local test environment)

broker.id=0
# By default a producer sending messages to a non-existent topic creates that topic automatically.
# By default deleting a topic only shows "marked for deletion"; the topic is merely flagged and still appears in the list command.
# Auto-creation is disabled here; deletion is left at the default to avoid accidental deletes.
auto.create.topics.enable=false
delete.topic.enable=false
port=9092
host.name=192.168.10.130
# Defaults to the hostname; external clients (or hosts without the hostname mapping) otherwise fail with
# org.apache.kafka.common.errors.TimeoutException: Batch Expired
advertised.host.name=192.168.10.130
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka_2.11/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.10.130:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

Producer and consumer configuration

#producer.properties
bootstrap.servers=192.168.10.130:9092
compression.type=none
#consumer.properties
bootstrap.servers=192.168.10.130:9092
group.id=test-consumer-group

Common commands

# start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties >> /opt/kafka_2.11/zk/logs/zk.log &
# start Kafka
bin/kafka-server-start.sh -daemon config/server.properties &
# create a topic
bin/kafka-topics.sh --create --zookeeper 192.168.10.130:2181 --replication-factor 1 --partitions 1 --topic tomcatCom
# list / describe topics
bin/kafka-topics.sh --list --zookeeper 192.168.10.130:2181
bin/kafka-topics.sh --describe --zookeeper 192.168.10.130:2181 --topic test
# console producer
bin/kafka-console-producer.sh --broker-list 192.168.10.130:9092 --topic test
# console consumer
bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.130:9092 --topic tomcatCom --from-beginning --consumer.config config/consumer.properties
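
Apart from the console tools, the topic can also be checked from code. A minimal Scala sketch using the kafka-clients consumer API (the group id and poll interval are arbitrary values chosen for illustration):

import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

object TopicCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "192.168.10.130:9092")
    props.put("group.id", "topic-check")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "earliest")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Arrays.asList("tomcatCom"))
    try {
      while (true) {
        // poll returns whatever records arrived since the last call
        val records = consumer.poll(1000L)
        records.asScala.foreach(r => println(s"${r.offset()} ${r.value()}"))
      }
    } finally {
      consumer.close()
    }
  }
}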

Install Spark

Install Scala, the JDK and Python; check the official site for version requirements.

With limited resources, the Spark master runs in local mode.

Install InfluxDB + Grafana + Kapacitor

yum localinstall influxdb-1.5.2.x86_64.rpm  grafana-5.1.3-1.x86_64.rpm  kapacitor-1.4.1.x86_64.rpm 

influxdb 

service influxdb start

Configuration file reference: http://www.ywnds.com/?p=10763

Basic usage reference: https://www.linuxdaxue.com/influxdb-study-series-manual.html

Authentication reference: https://blog.csdn.net/caodanwang/article/details/51967393

influx -host '192.168.10.129' -port '8086'

influx -host '192.168.10.129' -port '8086' -username 'root' -password '123456'

curl -POST http://192.168.10.129:8086/query --data-urlencode "q=CREATE DATABASE mydb"

Retention policies

name -- policy name; in this example it is "default"
duration -- how long data is kept; 0 means forever
shardGroupDuration -- the time span covered by each shard group; the shard group is a basic storage unit of InfluxDB, and queries over data older than this window may be less efficient
replicaN -- short for REPLICATION, the number of replicas
default -- whether this is the default policy
SHOW RETENTION POLICIES ON nginxlog_clear
CREATE RETENTION POLICY "2_hours" ON "nginxlog_clear" DURATION 2h REPLICATION 1 DEFAULT   -- create a default policy named 2_hours that keeps data for 2 hours --
ALTER RETENTION POLICY "2_hours" ON "nginxlog_clear" DURATION 4h DEFAULT -- change the duration to 4 hours --
DROP RETENTION POLICY "2_hours" ON "nginxlog_clear" -- drop it; writes that still reference 2_hours will fail with "policy not found" until autogen is made the default again --
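
These statements can also be issued programmatically through the /query endpoint, just as the curl example above creates a database. A rough sketch with scalaj-http (the same HTTP client the Spark job below uses); the database and policy names follow the examples above:

import scalaj.http.Http

object RetentionPolicySetup {
  def main(args: Array[String]): Unit = {
    val query = """CREATE RETENTION POLICY "2_hours" ON "nginxlog_clear" DURATION 2h REPLICATION 1 DEFAULT"""
    // POST the statement to InfluxDB's /query endpoint as a form parameter
    val resp = Http("http://192.168.10.129:8086/query")
      .postForm(Seq("q" -> query))
      .asString
    println(s"${resp.code} ${resp.body}")
  }
}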

Continuous queries

SHOW CONTINUOUS QUERIES
CREATE CONTINUOUS QUERY cq_1m ON nginxlog_clear BEGIN SELECT count(count) AS count_404 INTO current_data.two_year.ten_min_count FROM nginxlog_clear.autogen.nginx WHERE status = '404' GROUP BY time(1m) END
-- cq_1m is the continuous query name; nginxlog_clear and current_data are databases --
-- two_year and autogen are retention policies --
-- ten_min_count and nginx are measurements --
DROP CONTINUOUS QUERY <cq_name> ON <database_name>
-- to delete; a continuous query cannot be modified, only dropped and recreated --

Selected influxdb.conf settings

bind-address = "192.168.10.129:8088"
[meta]
dir = "/opt/influxdb/meta"
[data]
dir = "/opt/influxdb/data"
wal-dir = "/opt/influxdb/wal"
[http]
enabled = true
bind-address = "192.168.10.129:8086"
access-log-path = "/opt/influxdb/data/logs"
[continuous_queries]
enabled = true
log-enabled = true
run-interval = "1m"  # can be set to match the shortest continuous query interval, e.g. GROUP BY time(1m)

Grafana configuration

[server]
domain = 192.168.10.129
[smtp]
enabled = true 
host = 192.168.10.129:25  # note: set inet_interfaces = 192.168.10.129 in the postfix config
password = xian6630753  
from_address = admin@grafana.com
from_name = Grafana

Configure the graphs

 

Configure the alert email

Spark

NginxClear  // combining a regex with pattern matching would be a better way to parse here

 

package com.sgm.spark
import scala.util
import scalaj.http._
import org.apache.spark.SparkConf
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
object NginxClear {
  def main(args: Array[String]) {
    if (args.length !=2) {
      System.err.println(s"""
                            |Usage: DirectKafkaWordCount <brokers> <topics>
                            |  <bootstrap.servers> is a list of one or more Kafka brokers
                            |  <topics> is a list of one or more kafka topics to consume from
                            |
        """.stripMargin)
      System.exit(1)
    }


    val Array(brokers, topics) = args

    // Create context with 30 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(30))
    ssc.checkpoint("C:\\Users\\itlocal\\IdeaProjects\\nginxlog\\checkpoint")
    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, Object]("bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean)  //采用checkpoint提交记录的偏移量,没有的话执行auto.offset.reset
    )
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
    val logs=messages.map(_.value)
    /* Alternative: combine a regex with pattern matching
    val clearDate=logs.map(line=> {
      val info = line.split("\n")
      val infoPattern ="""(\d+\.\d+\.\d+\.\d+)(.*)(\d+/[A-Z,a-z]+/\d+\:\d+\:\d+\:\d+)(.*)([1-5]{1}[0-9]{2}) (\d+) (\"http.*\") (\".*\") (.*)""".r
      info.foreach(x=>x match {
        case infoPattern(ip,none1,currentTime,none2,respCode,requestTime,url,none3,upstreamTIME)=>
          println(ip+"\t"+currentTime+"\t"+respCode+"\t"+requestTime+"\t"+url+"\t"+upstreamTIME+"\t")
        case _=>println("none")
      })
    }) */
    val cleanDate=logs.map(mapFunc = line => {
      val influxdb_url = "http://192.168.10.129:8086/write?db=nginxlog_clear"
      val infos = line.split(" ")
      if (infos.length>10) {
        val actime = infos(3).split(" ")(0).split("\\[")(1)
        val random_num = (new util.Random).nextInt(999999)
        val curent_timestamp = (DateUtil.getTime(actime).toString + "000000").toLong + random_num  // InfluxDB timestamps are in nanoseconds; the epoch-millisecond value is padded to nanoseconds (plus a random offset), otherwise InfluxDB rejects it
        val urlPattern="\"http.*".r
        val ipPattern="^[1-5]{1}[0-9]{2}$".r
        //infos.foreach(println)
        var initUrl="\"none\""
        var initStatus="\"none\""
        infos.foreach(info=>urlPattern.findAllIn(info).foreach(x=>initUrl=Some(x).get))
        infos.foreach(info=>ipPattern.findAllIn(info).foreach(x=>initStatus=x))
        //println(initStatus)
        val date = s"nginxLog,ip=${infos(0)},acess_time=${DateUtil.parseTime(actime)},status=$initStatus,upstream_time=${infos.last} send_bytes=${infos(9)},url=$initUrl,count=1 $curent_timestamp"
        println(date)
        Http(influxdb_url).postData(date).header("content-type", "application/json").asString.code
      }
    })//.filter(clearlog=>clearlog.statusCode != 200)
    cleanDate.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}    
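
For reference, the `date` string assembled above follows InfluxDB's line protocol: the measurement name with comma-separated tags, a space, comma-separated fields, another space, and a nanosecond timestamp. A made-up point in the same shape (all values purely illustrative):

object LineProtocolSample extends App {
  // nginxLog measurement: ip/acess_time/status/upstream_time are tags,
  // send_bytes/url/count are fields, followed by a nanosecond timestamp
  val point = "nginxLog,ip=192.168.10.1,acess_time=20170504092205,status=404,upstream_time=0.001 " +
    "send_bytes=612,url=\"/index.html\",count=1 1493860925000123456"
  println(point)
}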

 

Time format conversion

DateUtil

package com.sgm.spark


import java.util.{Date, Locale}

import org.apache.commons.lang3.time.FastDateFormat

object DateUtil {
  val curent_day=FastDateFormat.getInstance("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH)
  val targe_fomat=FastDateFormat.getInstance("yyyyMMddHHmmss")
  def getTime(time:String)={    // convert to an epoch timestamp in milliseconds
    curent_day.parse(time).getTime
  }
  def parseTime(time:String)={
    targe_fomat.format(new Date(getTime(time)))
  }
  def main(args: Array[String]): Unit = {
    println(parseTime("04/MAY/2017:09:22:05"))
  }
}

For installing Maven and packaging the job, see the notes on setting up a Scala development environment.
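
The job needs Spark Streaming, the Kafka 0-10 integration, scalaj-http and commons-lang3 on the classpath. If you build with sbt instead of Maven, the dependencies look roughly like this (artifact versions are assumptions; align them with your Spark and Scala versions):

// build.sbt (sketch) -- versions below are placeholders
scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-streaming"            % "2.3.0" % "provided",
  "org.apache.spark"   %% "spark-streaming-kafka-0-10" % "2.3.0",
  "org.scalaj"         %% "scalaj-http"                % "2.3.0",
  "org.apache.commons"  % "commons-lang3"              % "3.5"
)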

Submit with spark-submit

bin/spark-submit --master local[2] --class com.sgm.spark.NginxClear --name NginxClear /root/bigdata-1.0-SNAPSHOT.jar