# ! /usr/bin/env python
# encoding=utf8
import random
import time
class WebLogGeneration(object):
#类属性,由全部类的对象共享
site_url_base = "http://www.xxx.com/"
#基本构造函数
def __init__(self):
# 前面7条是IE,因此大概浏览器类型70%为IE,接入类型上,20%为移动设备,分别是7和8条,5%为空
# https://github.com/mssola/user_agent/blob/master/all_test.go
self.user_agent_dist = {0.0:"Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)",
0.1:"Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)",
0.2:"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727)",
0.3:"Mozilla/4.0 (compatible; MSIE6.0; Windows NT 5.0; .NET CLR 1.1.4322)",
0.4:"Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko",
0.5:"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:41.0) Gecko/20100101 Firefox/41.0",
0.6:"Mozilla/4.0 (compatible; MSIE6.0; Windows NT 5.0; .NET CLR 1.1.4322)",
0.7:"Mozilla/5.0 (iPhone; CPU iPhone OS 7_0_3 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11B511 Safari/9537.53",
0.8:"Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19",
0.9:"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.85 Safari/537.36",
1:" ",}
self.ip_slice_list = [10, 29, 30, 46, 55, 63, 72, 87, 98,132,156,124,167,143,187,168,190,201,202,214,215,222]
self.url_path_list = ["login.php","view.php","list.php","upload.php","admin/login.php","edit.php","index.html"]
self.http_refer = ["http://www.baidu.com/s?wd={query}","http://www.google.cn/search?q={query}","http://www.sogou.com/web?query={query}",
"http://one.cn.yahoo.com/s?p={query}","http://cn.bing.com/search?q={query}"]
self.search_keyword = ["spark","hadoop","hive","spark mlib","spark sql"]
def sample_ip(self):
slice = random.sample(self.ip_slice_list,4) #从ip_slice_list中随机获取4个元素,做为一个片段返回
return ".".join([str(item) for item in slice])
def sample_url(self):
return random.sample(self.url_path_list,1)[0]
def sample_user_agent(self):
dist_uppon = random.uniform(0,1)
return self.user_agent_dist[float('%0.1f' % dist_uppon)]
#主要搜索引擎referrer参数
def sample_refer(self):
if random.uniform(0,1) > 0.2: #只有20% 流量有refer
return "-"
refer_str = random.sample(self.http_refer,1)
query_str = random.sample(self.search_keyword,1)
return refer_str[0].format(query=query_str[0])
def sample_one_log(self,count=3):
time_str = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
while count > 1:
query_log = "{ip} - - [{local_time}] \"GET /{url} HTTP/1.1\" 200 0 \"{refer}\" \"{user_agent}\" \"-\"".format(ip
=self.sample_ip(),local_time=time_str,url=self.sample_url(),refer=self.sample_refer(),user_agent=self.sample_user_agent())
print query_log
count = count -1
if __name__ == "__main__":
web_log_gene = WebLogGeneration()
#while True:
# time.sleep(random.uniform(0,3))
web_log_gene.sample_one_log(random.uniform(10,100))
这是一条日志的示例,为一行形式,各字段间用空格分隔,字符串类型的值用双引号包围:php
46.202.124.63 - - [2015-11-26 09:54:27] "GET /view.php HTTP/1.1" 200 0 "http://www.google.cn/search?q=hadoop" "Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)" "-"
#!/bin/bash
echo "Hello World !"
# HDFS命令
HDFS="hadoop fs"
# Streaming 程序监听的目录,注意跟后面Streaming程序的配置要保持一致
streaming_dir="/spark/streaming"
#清空旧数据
#su hdfs <<EOF
$HDFS -rm "${streaming_dir}"'/tmp/*' > /dev/null 2>&1
$HDFS -rm "${streaming_dir}"'/*' > /dev/null 2>&1
#EOF
#一直运行
while [ 1 ];do
python sample_web_log.py > test.log
# 给日志文件加上时间戳,避免重名
tmplog="access.`date +'%s'`.log"
$HDFS -put test.log ${streaming_dir}/tmp/$tmplog
$HDFS -mv ${streaming_dir}/tmp/$tmplog ${streaming_dir}/
echo "`date +"%F %T"` put $tmplog to HDFS succeed"
sleep 1
done
#EOF
Spark Streaming 程序代码以下所示,能够在 bin/spark-shell 交互式环境下运行,若是要以 Spark 程序的方式运行,按注释中的说明调整一下 StreamingContext 的生成方式便可。启动 bin/spark-shell 时,为了不因 DEBUG 日志信息太多而影响观察输出,能够将 DEBUG 日志重定向至文件,屏幕上只显示主要输出,方法是 ./bin/spark-shell 2>spark-shell-debug.log:html
// 导入类
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
// 设计计算的周期,单位秒
val batch = 10
/*
* 这是bin/spark-shell交互式模式下建立StreamingContext的方法
* 非交互式请使用下面的方法来建立
*/
val ssc = new StreamingContext(sc, Seconds(batch))
/*
// 非交互式下建立StreamingContext的方法
val conf = new SparkConf().setAppName("NginxAnay")
val ssc = new StreamingContext(conf, Seconds(batch))
*/
/*
* 建立输入DStream,是文本文件目录类型
* 本地模式下也可使用本地文件系统的目录,好比 file:///home/spark/streaming
*/
val lines = ssc.textFileStream("hdfs:///spark/streaming")
/*
* 下面是统计各项指标,调试时能够只进行部分统计,方便观察结果
*/
// 1. 总PV
lines.count().print()
// 2. 各IP的PV,按PV倒序// 空格分隔的第一个字段就是IP
lines.map(line => {(line.split(" ")(0), 1)}).reduceByKey(_ + _).transform(rdd => {
rdd.map(ip_pv => (ip_pv._2, ip_pv._1)).
sortByKey(false).
map(ip_pv => (ip_pv._2, ip_pv._1))
}).print()
// 3. 搜索引擎PV
val refer = lines.map(_.split("\"")(3))
// 先输出搜索引擎和查询关键词,避免统计搜索关键词时重复计算// 输出(host, query_keys)
val searchEnginInfo = refer.map(r => {
val f = r.split('/')
val searchEngines = Map(
"www.google.cn" -> "q",
"www.yahoo.com" -> "p",
"cn.bing.com" -> "q",
"www.baidu.com" -> "wd",
"www.sogou.com" -> "query"
)
if (f.length > 2) {
val host = f(2)
if (searchEngines.contains(host)) {
val query = r.split('?')(1)
if (query.length > 0) {
val arr_search_q = query.split('&').filter(_.indexOf(searchEngines(host)+"=") == 0)
if (arr_search_q.length > 0)
(host, arr_search_q(0).split('=')(1))
else
(host, "")
} else {
(host, "")
}
} else
("", "")
} else
("", "")
})
// 输出搜索引擎PV
searchEnginInfo.filter(_._1.length > 0).map(p => {(p._1, 1)}).reduceByKey(_ + _).print()
// 4. 关键词PV
searchEnginInfo.filter(_._2.length > 0).map(p => {(p._2, 1)}).reduceByKey(_ + _).print()
// 5. 终端类型PV
lines.map(_.split("\"")(5)).map(agent => {
val types = Seq("iPhone", "Android")
var r = "Default"
for (t <- types) {
if (agent.indexOf(t) != -1)
r = t
}
(r, 1)
}).reduceByKey(_ + _).print()
// 6. 各页面PV
lines.map(line => {(line.split("\"")(1).split(" ")(1), 1)}).reduceByKey(_ + _).print()
// 启动计算,等待执行结束(出错或Ctrl-C退出)
ssc.start()
ssc.awaitTermination()
打开两个终端,一个调用上面的 bash 脚本模拟提交日志,一个在交互式环境下运行上面的 Streaming 程序。你能够看到各项指标的输出,好比某个批次下的输出为(依次对应上面的 6 个计算项):python
1.总PVgit
-------------------------------------------
github
Time: 1448533850000 ms
web
-------------------------------------------
sql
44374
shell
2.各IP的PV,按PV倒序数据库
-------------------------------------------
apache
Time: 1448533850000 ms
-------------------------------------------
(72.63.87.30,30)
(63.72.46.55,30)
(98.30.63.10,29)
(72.55.63.46,29)
(63.29.10.30,29)
(29.30.63.46,29)
(55.10.98.87,27)
(46.29.98.30,27)
(72.46.63.30,27)
(87.29.55.10,26)
3.搜索引擎PV
-------------------------------------------
Time: 1448533850000 ms
-------------------------------------------
(cn.bing.com,1745)
(www.baidu.com,1773)
(www.google.cn,1793)
(www.sogou.com,1845)
4.关键词PV
-------------------------------------------
Time: 1448533850000 ms
-------------------------------------------
(spark,1426)
(hadoop,1455)
(spark sql,1429)
(spark mlib,1426)
(hive,1420)
5.终端类型PV
-------------------------------------------
Time: 1448533850000 ms
-------------------------------------------
(Android,4281)
(Default,35745)
(iPhone,4348)
6.各页面PV
-------------------------------------------
Time: 1448533850000 ms
-------------------------------------------
(/edit.php,6435)
(/admin/login.php,6271)
(/login.php,6320)
(/upload.php,6278)
(/list.php,6411)
(/index.html,6309)
(/view.php,6350)
查看数据更直观的作法是用图形来展现,常见作法是将结果写入外部 DB ,而后经过一些图形化报表展现系统展现出来。好比对于终端类型,咱们能够用饼图展现,如图6-11所示。
图6-11 终端类型分布图示例(另见彩插图6-11)
对于连续的数据,咱们也能够用拆线图来展现趋势。好比某页面的PV,如图6-12所示。
除了常规的每一个固定周期进行一次统计,咱们还能够对连续多个周期的数据进行统计。以统计总 PV 为例,上面的示例是每 10 秒统计一次,可能还须要每分钟统计一次,至关于 6 个 10 秒的周期。咱们能够利用窗口方法实现,不一样的代码以下:
// 窗口方法必须配置checkpint,能够这样配置: ssc.checkpoint("hdfs:///spark/checkpoint")
// 这是常规每10秒一个周期的PV统计 lines.count().print()
// 这是每分钟(连续多个周期)一次的PV统计 lines.countByWindow(Seconds(batch*6), Seconds(batch*6)).print()
使用相同的办法运行程序以后,咱们首先会看到连续 6 次 10 秒周期的 PV 统计输出:
-------------------------------------------
Time: 1448535090000 ms
-------------------------------------------
1101
-------------------------------------------
Time: 1448535100000 ms
-------------------------------------------
816
-------------------------------------------
Time: 1448535110000 ms
-------------------------------------------
892
-------------------------------------------
Time: 1448535120000 ms
-------------------------------------------
708
-------------------------------------------
Time: 1448535130000 ms
-------------------------------------------
881
-------------------------------------------
Time: 1448535140000 ms
-------------------------------------------
872
在这以后,有一个 1 分钟周期的 PV 统计输出,它的值恰好是上面 6 次计算结果的总和:
-------------------------------------------
Time: 1448535140000 ms
-------------------------------------------
5270
streaming
目录,并增设 tmp
临时文件夹。
log.sh
文件中须要填入如下内容:
#!/bin/bash
echo "Hello World !"
# HDFS命令
HDFS="hadoop fs"
# Streaming 程序监听的目录,注意跟后面Streaming程序的配置要保持一致
streaming_dir="/spark/streaming"
#清空旧数据
#su hdfs <<EOF
$HDFS -rm "${streaming_dir}"'/tmp/*' > /dev/null 2>&1
$HDFS -rm "${streaming_dir}"'/*' > /dev/null 2>&1
#EOF
#一直运行
while [ 1 ];do
python sample_web_log.py > test.log
# 给日志文件加上时间戳,避免重名
tmplog="access.`date +'%s'`.log"
$HDFS -put test.log ${streaming_dir}/tmp/$tmplog
$HDFS -mv ${streaming_dir}/tmp/$tmplog ${streaming_dir}/
echo "`date +"%F %T"` put $tmplog to HDFS succeed"
sleep 1
done
#EOF
/*
* 这是bin/spark-shell交互式模式下建立StreamingContext的方法
* 非交互式请使用下面的方法来建立
*/
val ssc = new StreamingContext(sc,Seconds(batch))
/*
// 非交互式下建立StreamingContext的方法
val conf = new SparkConf().setAppName("NginxAnay")
val ssc = new StreamingContext(conf, Seconds(batch))
*/
/*
* 建立输入DStream,是文本文件目录类型
* 本地模式下也可使用本地文件系统的目录,好比 file:///home/spark/streaming
*/
val lines = ssc.textFileStream("hdfs:///spark/streaming")
/*
* 下面是统计各项指标,调试时能够只进行部分统计,方便观察结果
*/
//1.总pv
lines.count().print()
//2. 各IP的PV,按PV倒序
// 空格分隔的第一个字段就是IP
lines.map(line => {(line.split(" ")(0),1)}).reduceByKey(_ + _).transform(rdd => {
rdd.map(ip_pv => (ip_pv._2,ip_pv._1)).
sortByKey(false).
map(ip_pv => (ip_pv._2,ip_pv._1))
}).print()
//3.搜索引擎PV
val refer = lines.map(_.split("\"")(3))
//先输出搜索引擎和查询关键词,避免统计搜索关键词时重复计算
//输出(host,query_keys)
val searchEnginInfo = refer.map(r => {
val f = r.split('/')
val searchEngines = Map(
"www.google.cn" -> "q",
"www.yahoo.com" -> "p",
"cn.bing.com" -> "q",
"www.baidu.com" -> "wd",
"www.sogou.com" -> "query"
)
if (f.length > 2) {
val host = f(2)
if(searchEngines.contains(host)) {
val query = r.split('?')(1)
if(query.length > 0) {
val arr_search_q = query.split('&').filter(_.indexOf(searchEngines(host)+"=") == 0)
if(arr_search_q.length > 0)
(host,arr_search_q(0).split('=')(1))
else
(host,"")
} else {
(host,"")
}
} else
("","")
} else
("","")
})
//输出搜索引擎PV
searchEnginInfo.filter(_._1.length > 0).map(p => {(p._1,1)}).reduceByKey(_ + _).print()
//4.关键词PV
searchEnginInfo.filter(_._2.length > 0).map(p => {(p._2,1)}).reduceByKey(_ + _).print()
//5.终端类型PV
lines.map(_.split("\"")(5)).map(agent => {
val types = Seq("iPhone","Android")
var r = "Default"
for (t <- types) {
if(agent.indexOf(t) != -1)
r = t
}
(r,1)
}).reduceByKey(_ + _).print()
//6.各页面PV
lines.map(line => {(line.split("\"")(1).split(" ")(1),1)}).reduceByKey(_ + _).print()
//启动计算,等待执行结束(出错或Ctrl+C退出)
ssc.start()