Architecture diagram (kept as simple as possible because of limited resources)
Download the required packages (note that Kafka and Spark have JDK and Scala version requirements; check the official sites)
192.168.10.129 flume influxdb grafana kapacitor
192.168.10.130 kafka spark
Install flume-ng (requires JDK 1.8)
Flume ships as a binary package; just extract it.
cd conf; cp flume-env.sh.template flume-env.sh
echo -e 'export JAVA_HOME=/opt/jdk1.8\nexport JAVA_OPTS="-Xms500m -Xmx1000m -Dcom.sun.management.jmxremote"'>>flume-env.sh
For a detailed introduction to flume-ng, see https://blog.csdn.net/zhaodedong/article/details/52541688
Source details
Source type Description
Avro Source Supports the Avro protocol (actually Avro RPC); built in
Thrift Source Supports the Thrift protocol; built in
Exec Source Produces data from the standard output of a Unix command
JMS Source Reads data from a JMS system (queues or topics); tested with ActiveMQ
Spooling Directory Source Monitors a given directory for new data
Twitter 1% firehose Source Continuously downloads Twitter data through the API; experimental
Netcat Source Listens on a port and turns each line of text flowing through it into an Event
Sequence Generator Source Sequence generator source that produces sequence data
Syslog Sources Read syslog data and produce Events; both UDP and TCP are supported
HTTP Source Accepts data via HTTP POST or GET; JSON and BLOB representations are supported
Legacy Sources Compatible with Sources from the old Flume OG (0.9.x)
Channel details
Channel type Description
Memory Channel Events are stored in memory
JDBC Channel Events are stored in a persistent store; Flume currently ships with built-in Derby support
File Channel Events are stored in files on disk
Spillable Memory Channel Events are stored in memory and on disk; when the in-memory queue fills up they spill to disk files (currently experimental, not recommended for production)
Pseudo Transaction Channel For testing only
Custom Channel A user-defined Channel implementation
Sink details
Sink type Description
HDFS Sink Writes data to HDFS
Logger Sink Writes data to log files
Avro Sink Converts data into Avro Events and sends them to the configured RPC port
Thrift Sink Converts data into Thrift Events and sends them to the configured RPC port
IRC Sink Replays data on IRC
File Roll Sink Stores data on the local filesystem
Null Sink Discards all data
HBase Sink Writes data to HBase
Morphline Solr Sink Sends data to a Solr search server (or cluster)
ElasticSearch Sink Sends data to an Elasticsearch search server (or cluster)
Kite Dataset Sink Writes data to a Kite Dataset; experimental
Custom Sink A user-defined Sink implementation
flume.conf configuration
# logser is the name of this Flume agent; every agent consists of sources, channels and sinks
# (multiple sources/sinks can be listed, separated by spaces)
# sources are where the data comes from, channels are the intermediate buffer, sinks are where the data goes
logser.sources = src_dir
logser.sinks = sink_kfk
logser.channels = ch

# source
# source type
logser.sources.src_dir.type = TAILDIR
# file that records the read position of every monitored file
logser.sources.src_dir.positionFile = /opt/flume-1.8.0/logs/taildir_position.json
logser.sources.src_dir.filegroups = f1
logser.sources.src_dir.filegroups.f1 = /opt/logs/.*
logser.sources.src_dir.headers.f1.headerKey1 = tomcatAPP
logser.sources.src_dir.fileHeader = true

# channel
logser.channels.ch.type = memory
logser.channels.ch.capacity = 10000
logser.channels.ch.transactionCapacity = 1000

# kfk sink
# the sink type is Kafka, i.e. the logs end up in Kafka
logser.sinks.sink_kfk.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka servers, comma separated if more than one
logser.sinks.sink_kfk.kafka.bootstrap.servers = 192.168.10.130:9092
logser.sinks.sink_kfk.kafka.topic = tomcatCom

# Bind the source and sink to the channel
logser.sources.src_dir.channels = ch
logser.sinks.sink_kfk.channel = ch
Start Flume (start Kafka first)
nohup bin/flume-ng agent --conf conf/ --conf-file conf/flume.conf --name logser -Dflume.root.logger=INFO,console >logs/flume-ng.log 2>&1 &
Java log4j extension
log4j.properties
log4j.rootLogger=INFO,stdout,flume
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
# hostname of the remote flume agent's avro source
log4j.appender.flume.Hostname = flume
# port the remote flume agent's avro source listens on
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true

# Note: the application needs this dependency on its classpath
# <dependency>
#     <groupId>org.apache.flume.flume-ng-clients</groupId>
#     <artifactId>flume-ng-log4jappender</artifactId>
#     <version>same version as your Flume</version>
# </dependency>
flume.conf
agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink

# define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=192.168.10.129
agent1.sources.avro-source.port=41414

# define channel
agent1.channels.logger-channel.type=memory

# define sink
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
# kafka topic
agent1.sinks.kafka-sink.kafka.topic=mytest
agent1.sinks.kafka-sink.kafka.bootstrap.servers=192.168.10.130:9092
# number of messages handled per batch; larger batches improve throughput, default 100
agent1.sinks.kafka-sink.flumeBatchSize=20
# default is 1
agent1.sinks.kafka-sink.kafka.producer.acks=1

# Bind the source and sink to the channel
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel
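To exercise this path end to end, here is a minimal sketch of an application logging through the appender. It assumes log4j 1.x plus the flume-ng-log4jappender dependency from the note above are on the classpath, and that log4j.properties attaches the flume appender as configured; the Log4jFlumeDemo object and its messages are purely illustrative.

import org.apache.log4j.Logger

// Illustrative only: each log call becomes a Flume event on the avro source (port 41414),
// which the kafka-sink then forwards to the "mytest" topic.
object Log4jFlumeDemo {
  private val logger = Logger.getLogger(getClass)

  def main(args: Array[String]): Unit = {
    (1 to 10).foreach(i => logger.info(s"test message $i"))
  }
}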
Install Kafka
Install the Scala environment
yum localinstall scala-2.11.7.rpm
Install the JDK environment
export JAVA_HOME=/opt/jdk1.8
export PATH=$PATH:/opt/jdk1.8/bin
Extract the Kafka package directly
ZooKeeper is the one bundled with Kafka
zookeeper.properties
dataDir=/opt/kafka_2.11/zk
dataLogDir=/opt/kafka_2.11/zk/logs
clientPort=2181
maxClientCnxns=0
server.properties (single-node local test environment)
broker.id=0
# By default a producer sending a message to a non-existent topic auto-creates that topic.
# By default deleting a topic only prints "marked for deletion"; the topic is merely marked and still shows up in the list command.
# Disable auto-creation; leave deletion at its default to avoid removing a topic by mistake.
auto.create.topics.enable=false
delete.topic.enable=false
port=9092
host.name=192.168.10.130
# defaults to the hostname; from an external network or without a hostname mapping you get
# org.apache.kafka.common.errors.TimeoutException: Batch Expired
advertised.host.name=192.168.10.130
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka_2.11/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.10.130:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
Producer and consumer configuration
# producer.properties
bootstrap.servers=192.168.10.130:9092
compression.type=none

# consumer.properties
bootstrap.servers=192.168.10.130:9092
group.id=test-consumer-group
Common commands
# start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties >> /opt/kafka_2.11/zk/logs/zk.log &
# start Kafka
bin/kafka-server-start.sh -daemon config/server.properties &
# create a topic
bin/kafka-topics.sh --create --zookeeper 192.168.10.130:2181 --replication-factor 1 --partitions 1 --topic tomcatCom
# list / describe topics
bin/kafka-topics.sh --list --zookeeper 192.168.10.130:2181
bin/kafka-topics.sh --describe --zookeeper 192.168.10.130:2181 --topic test
# console producer
bin/kafka-console-producer.sh --broker-list 192.168.10.130:9092 --topic test
# console consumer
bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.130:9092 --topic tomcatCom --from-beginning --consumer.config config/consumer.properties
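As a programmatic counterpart to the console producer above, the sketch below sends a single record to the tomcatCom topic. It assumes the kafka-clients library is on the classpath; the ProducerDemo name and the test message are illustrative.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Illustrative only: push one test record into the tomcatCom topic created above.
object ProducerDemo {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "192.168.10.130:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("acks", "1")

    val producer = new KafkaProducer[String, String](props)
    producer.send(new ProducerRecord[String, String]("tomcatCom", "test message"))
    producer.close()
  }
}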
Install Spark
Install Scala, the JDK and Python; check the official site for version requirements
Resources are limited, so the master runs in local mode
Install influxdb + grafana + kapacitor
yum localinstall influxdb-1.5.2.x86_64.rpm grafana-5.1.3-1.x86_64.rpm kapacitor-1.4.1.x86_64.rpm
influxdb
service influxdb start
For the configuration file, see http://www.ywnds.com/?p=10763
For basic usage, see https://www.linuxdaxue.com/influxdb-study-series-manual.html
For authentication, see https://blog.csdn.net/caodanwang/article/details/51967393
influx -host '192.168.10.129' -port '8086'
influx -host '192.168.10.129' -port '8086' -username 'root' -password '123456'
curl -POST http://192.168.10.129:8086/query --data-urlencode "q=CREATE DATABASE mydb"
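The write endpoint that the Spark job uses later can also be exercised directly over HTTP. The sketch below assumes the scalaj-http library (the same one imported by NginxClear further down); the measurement name cpu and the sample point are made up for illustration.

import scalaj.http._

// Illustrative only: write a single point into the "mydb" database using InfluxDB's line protocol.
object InfluxWriteDemo {
  def main(args: Array[String]): Unit = {
    val line = "cpu,host=server01 value=0.64"   // measurement,tag field
    val resp = Http("http://192.168.10.129:8086/write?db=mydb").postData(line).asString
    println(resp.code)                          // 204 means the write succeeded
  }
}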
Retention policies
name -- policy name; in this example it is default
duration -- retention duration; 0 means keep forever
shardGroupDuration -- the time span covered by each shard group (a basic InfluxDB storage structure); queries over data older than this span should be somewhat less efficient
replicaN -- short for REPLICATION, the number of replicas
default -- whether this is the default policy
SHOW RETENTION POLICIES ON nginxlog_clear
-- create a default policy named 2_hours that keeps data for 2 hours
CREATE RETENTION POLICY "2_hours" ON "nginxlog_clear" DURATION 2h REPLICATION 1 DEFAULT
-- change it to 4 hours
ALTER RETENTION POLICY "2_hours" ON "nginxlog_clear" DURATION 4h DEFAULT
-- drop it; writes that still reference 2_hours then fail with "policy not found",
-- so set the autogen policy's default back to true
DROP RETENTION POLICY "2_hours" ON "nginxlog_clear"
Continuous queries
SHOW CONTINUOUS QUERIES
CREATE CONTINUOUS QUERY cq_1m ON nginxlog_clear BEGIN SELECT count(count) AS count_404 INTO current_data.two_year.ten_min_count FROM nginxlog_clear.autogen.nginx WHERE status = '404' GROUP BY time(1m) END
-- cq_1m is the continuous query name; nginxlog_clear and current_data are databases
-- two_year and autogen are retention policies
-- ten_min_count and nginx are measurements
DROP CONTINUOUS QUERY <cq_name> ON <database_name>
-- drop; a continuous query cannot be modified, only dropped and recreated
Some influxdb.conf settings
bind-address = "192.168.10.129:8088"

[meta]
  dir = "/opt/influxdb/meta"

[data]
  dir = "/opt/influxdb/data"
  wal-dir = "/opt/influxdb/wal"

[http]
  enabled = true
  bind-address = "192.168.10.129:8086"
  access-log-path = "/opt/influxdb/data/logs"

[continuous_queries]
  enabled = true
  log-enabled = true
  # can be set to the shortest GROUP BY time() of your continuous queries, here time(1m)
  run-interval = "1m"
Grafana configuration
[server]
domain = 192.168.10.129

[smtp]
enabled = true
# note: set inet_interfaces = 192.168.10.129 in the postfix configuration
host = 192.168.10.129:25
password = xian6630753
from_address = admin@grafana.com
from_name = Grafana
Configure the graphs
Configure the alert mailbox
spark
NginxClear // the matching here would be cleaner with a regex combined with pattern matching
package com.sgm.spark

import scala.util
import scalaj.http._
import org.apache.spark.SparkConf
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._

object NginxClear {
  def main(args: Array[String]) {
    if (args.length != 2) {
      System.err.println(s"""
        |Usage: DirectKafkaWordCount <brokers> <topics>
        |  <bootstrap.servers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }
    val Array(brokers, topics) = args

    // Create context with 30 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(30))
    ssc.checkpoint("C:\\Users\\itlocal\\IdeaProjects\\nginxlog\\checkpoint")

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      // offsets are restored from the checkpoint; auto.offset.reset only applies when none exist
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

    val logs = messages.map(_.value)

    /* Alternative: regex matching combined with pattern matching
    val clearDate = logs.map(line => {
      val info = line.split("\n")
      val infoPattern = """(\d+\.\d+\.\d+\.\d+)(.*)(\d+/[A-Z,a-z]+/\d+\:\d+\:\d+\:\d+)(.*)([1-5]{1}[0-9]{2}) (\d+) (\"http.*\") (\".*\") (.*)""".r
      info.foreach(x => x match {
        case infoPattern(ip, none1, currentTime, none2, respCode, requestTime, url, none3, upstreamTIME) =>
          println(ip + "\t" + currentTime + "\t" + respCode + "\t" + requestTime + "\t" + url + "\t" + upstreamTIME + "\t")
        case _ => println("none")
      })
    })
    */

    val cleanDate = logs.map(line => {
      val influxdb_url = "http://192.168.10.129:8086/write?db=nginxlog_clear"
      val infos = line.split(" ")
      if (infos.length > 10) {
        val actime = infos(3).split(" ")(0).split("\\[")(1)
        val random_num = (new util.Random).nextInt(999999)
        // InfluxDB expects nanosecond precision but the parsed timestamp is in milliseconds,
        // so pad it to nanoseconds or InfluxDB will not accept it
        val curent_timestamp = (DateUtil.getTime(actime).toString + "000000").toLong + random_num
        val urlPattern = "\"http.*".r
        val ipPattern = "^[1-5]{1}[0-9]{2}$".r
        //infos.foreach(println)
        var initUrl = "\"none\""
        var initStatus = "\"none\""
        infos.foreach(info => urlPattern.findAllIn(info).foreach(x => initUrl = Some(x).get))
        infos.foreach(info => ipPattern.findAllIn(info).foreach(x => initStatus = x))
        //println(initStatus)
        val date = s"nginxLog,ip=${infos(0)},acess_time=${DateUtil.parseTime(actime)},status=$initStatus,upstream_time=${infos.last} send_bytes=${infos(9)},url=$initUrl,count=1 $curent_timestamp"
        println(date)
        Http(influxdb_url).postData(date).header("content-type", "application/json").asString.code
      }
    }) //.filter(clearlog => clearlog.statusCode != 200)

    cleanDate.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
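As the heading above notes, the positional split could be replaced by the regex-plus-pattern-matching approach sketched in the commented block. A minimal sketch of that idea, reusing the exact regex and group order from that block, might look like the following; the NginxLineParser name and the Option return type are assumptions for illustration, not part of the original job.

package com.sgm.spark

// Minimal sketch: parse one access-log line with the regex from the commented block,
// returning the same fields it prints (ip, currentTime, respCode, requestTime, url, upstreamTime).
object NginxLineParser {
  private val infoPattern =
    """(\d+\.\d+\.\d+\.\d+)(.*)(\d+/[A-Z,a-z]+/\d+\:\d+\:\d+\:\d+)(.*)([1-5]{1}[0-9]{2}) (\d+) (\"http.*\") (\".*\") (.*)""".r

  def parse(line: String): Option[(String, String, String, String, String, String)] = line match {
    case infoPattern(ip, _, currentTime, _, respCode, requestTime, url, _, upstreamTime) =>
      Some((ip, currentTime, respCode, requestTime, url, upstreamTime))
    case _ => None
  }
}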
Time format conversion
DateUtil
package com.sgm.spark

import java.util.{Date, Locale}
import org.apache.commons.lang3.time.FastDateFormat

object DateUtil {
  val curent_day = FastDateFormat.getInstance("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH)
  val targe_fomat = FastDateFormat.getInstance("yyyyMMddHHmmss")

  // convert the nginx time string to a millisecond timestamp
  def getTime(time: String) = {
    curent_day.parse(time).getTime
  }

  def parseTime(time: String) = {
    targe_fomat.format(new Date(getTime(time)))
  }

  def main(args: Array[String]): Unit = {
    println(parseTime("04/MAY/2017:09:22:05"))
  }
}
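A small worked example of the millisecond-to-nanosecond padding used in NginxClear above: DateUtil.getTime returns milliseconds since the epoch, and appending six zeros multiplies the value by 1,000,000, which gives the nanosecond precision InfluxDB expects. The TimestampDemo object is illustrative only.

package com.sgm.spark

// Illustrative only: the ms -> ns padding NginxClear applies before writing to InfluxDB.
object TimestampDemo {
  def main(args: Array[String]): Unit = {
    val ms = DateUtil.getTime("04/May/2017:09:22:05") // milliseconds since the epoch
    val ns = (ms.toString + "000000").toLong          // appending "000000" = multiplying by 1,000,000
    println(s"$ms ms -> $ns ns")
  }
}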
Build and package with Maven; refer to the Scala development environment setup
Submit with spark-submit
bin/spark-submit --master local[2] --class com.sgm.spark.NginxClear --name NginxClear /root/bigdata-1.0-SNAPSHOT.jar