The nginx log directory is /usr/local/nginx/logs. The log format looks like this:
123.13.17.13 - - [25/Aug/2016:00:00:01 +0800] "GET /AppFiles/apk/studynet/icon_v120/apk_80111_1.jpg HTTP/1.1" 206 51934 "http://img.xxx.com:8080/AppFiles/apk/studynet/icon_v120/apk_80111_1.jpg" "Dalvik/1.6.0 (Linux; U; Android 4.4.2; S100 Build/KOT49H)"
120.210.166.150 - - [25/Aug/2016:00:00:01 +0800] "GET /AppFiles/apk/studynet/products/product_lc01.zip HTTP/1.1" 206 16631 "http://img.xxx.com:8080/AppFiles/apk/studynet/products/product_lc01.zip" "Dalvik/1.6.0 (Linux; U; Android 4.4.2; S908 Build/KVT49L)"
123.13.17.13 - - [25/Aug/2016:00:00:01 +0800] "GET /AppFiles/apk/studynet/icon_v120/apk_80111_0.jpg HTTP/1.1" 206 53119 "http://img.xxx.com:8080/AppFiles/apk/studynet/icon_v120/apk_80111_0.jpg" "Dalvik/1.6.0 (Linux; U; Android 4.4.2; S100 Build/KOT49H)"
219.137.119.16 - - [25/Aug/2016:00:00:01 +0800] "GET /AppFiles/apk/gamenet/icon/icon_0_506_0.jpg HTTP/1.1" 404 1035 "-" "Dalvik/v3.3.110_update3 (Linux; U; Android 2.2.1-R-20151127.1131; ET_35 Build/KTU84Q)"
120.210.166.150 - - [25/Aug/2016:00:00:01 +0800] "GET /AppFiles/apk/studynet/products/product_lc01.zip HTTP/1.1" 206 40719 "http://img.xxx.com:8080/AppFiles/apk/studynet/products/product_lc01.zip" "Dalvik/1.6.0 (Linux; U; Android 4.4.2; S908 Build/KVT49L)"
Each log line is space-delimited into 12 fields:
1. Client IP
2. Blank (remote login name)
3. Blank (authenticated remote user)
4. Request time
5. Time zone (UTC offset)
6. Request method
7. Requested resource
8. HTTP protocol version
9. Status code
10. Bytes sent
11. Referer
12. Client user-agent string (not split further)
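Since the mapper scripts below index into line.split(), it helps to see where the interesting fields land after a whitespace split. A minimal sketch (the sample line is one of the records above; this snippet is not part of the original scripts):

# Note: the quoted referer and user-agent fields contain embedded spaces, so a
# plain split() yields more than 12 tokens; the indexes below are the ones the
# mrjob mappers in this article rely on.
sample = ('123.13.17.13 - - [25/Aug/2016:00:00:01 +0800] '
          '"GET /AppFiles/apk/studynet/icon_v120/apk_80111_1.jpg HTTP/1.1" '
          '206 51934 "http://img.xxx.com:8080/AppFiles/apk/studynet/icon_v120/apk_80111_1.jpg" '
          '"Dalvik/1.6.0 (Linux; U; Android 4.4.2; S100 Build/KOT49H)"')
fields = sample.split()
print(fields[0])    # client IP:    123.13.17.13
print(fields[3])    # request time: [25/Aug/2016:00:00:01
print(fields[8])    # status code:  206
print(fields[9])    # bytes sent:   51934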
An HDFS upload script is deployed on the nginx server to push the nginx logs to the HDFS platform on a schedule:
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
import subprocess
import datetime

webid = 'test1'    # tag for this host's logs in HDFS; the other web server uses 'test2'
currdate = datetime.datetime.now().strftime('%Y%m%d')
logspath = '/usr/local/nginx/logs/access.log'    # local log path
logname = 'access.log.' + webid

try:
    # Create the HDFS directory, e.g. nginx/20160825. The wait() makes the parent
    # process block until the child finishes (by default subprocess.Popen returns
    # immediately without waiting for the child's result).
    subprocess.Popen(['/usr/local/hadoop-2.6.4/bin/hadoop', 'fs', '-mkdir', '-p',
                      'hdfs:///user/root/nginx/' + currdate],
                     stdout=subprocess.PIPE).wait()
except Exception:
    pass

# Upload the local log to HDFS
putinfo = subprocess.Popen(['/usr/local/hadoop-2.6.4/bin/hadoop', 'fs', '-put', logspath,
                            'hdfs:///user/root/nginx/' + currdate + '/' + logname],
                           stdout=subprocess.PIPE)
for line in putinfo.stdout:
    print line
Deploy the upload script in crontab so it runs on a schedule:
0 0 * * * /usr/bin/python /root/hadooptest/hdfsput.py >> /dev/null 2>&1
Once the logs are uploaded to HDFS, the listing looks like this:
[root@wx ~]# hadoop fs -ls /user/root/nginx/20160825
Found 2 items
-rw-r--r--   1 root supergroup         15 2016-08-25 15:58 /user/root/nginx/20160825/access.log.test1
-rw-r--r--   1 root supergroup         28 2016-08-25 15:58 /user/root/nginx/20160825/access.log.test2
To count site traffic down to the minute, the mapper takes each log line's "hour:minute" as the key and the line's bytes-sent figure as the value; the reducer then sums the values of identical keys.
Using mrjob:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from mrjob.job import MRJob
import re

class MRCounter(MRJob):
    def mapper(self, key, line):
        i = 0
        for flow in line.split():
            if i == 3:
                # Column 4 is the time field, e.g. "[24/Aug/2016:00:00:02"
                timerow = flow.split(':')
                hm = timerow[1] + ':' + timerow[2]    # "hour:minute" becomes the key
            if i == 9 and re.match(r'\d{1,}', flow):
                # Column 10 is the number of bytes sent; it becomes the value
                yield hm, int(flow)
            i += 1

    def reducer(self, key, occurrences):
        # Sum the values that share the same "hour:minute" key
        yield key, sum(occurrences)

if __name__ == '__main__':
    MRCounter.run()
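Before submitting to the cluster, the job can be sanity-checked locally; by default mrjob uses its inline runner and needs no Hadoop at all (a quick check, assuming a local copy of the log):

python httpflow.py access.log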
Generate the Hadoop job and run it:
python /root/hadoop/httpflow.py -r hadoop -o hdfs:///output/httpflow hdfs:///user/root/nginx
The analysis results can be periodically imported into MySQL to generate reports.
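mrjob writes its reducer output as tab-separated lines with a JSON-encoded key, so the import can be a small script. A minimal sketch, assuming pymysql is installed, a hypothetical logstats.httpflow(minute, bytes) table, and that the output file has first been fetched locally with hadoop fs -get:

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# Hypothetical loader: table name, credentials and file path are placeholders.
import json
import pymysql

conn = pymysql.connect(host='localhost', user='root', password='secret', db='logstats')
with conn.cursor() as cur:
    with open('part-00000') as f:
        for row in f:
            # mrjob reducer output: JSON-encoded key, a tab, then the value
            minute, nbytes = row.rstrip('\n').split('\t')
            cur.execute('INSERT INTO httpflow (minute, bytes) VALUES (%s, %s)',
                        (json.loads(minute), int(nbytes)))
conn.commit()
conn.close()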
Counting the HTTP status codes in the logs can help us understand the site's health; here we implement it using mrjob's multi-step invocation.
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
from mrjob.job import MRJob
from mrjob.step import MRStep
import re

class MRCounter(MRJob):
    def mapper(self, key, line):
        i = 0
        for httpcode in line.split():
            if i == 8 and re.match(r'\d{1,3}', httpcode):
                # Column 9 is the HTTP status code; emit it as the key with a
                # count of 1 so the reducer can simply sum
                yield httpcode, 1
            i += 1

    def reducer(self, httpcode, occurrences):
        # Sum the counts for each status code
        yield httpcode, sum(occurrences)

    def steps(self):
        # Register the mapper and reducer as separate steps in the job's queue
        return [MRStep(mapper=self.mapper), MRStep(reducer=self.reducer)]

if __name__ == '__main__':
    MRCounter.run()
Generate the Hadoop job and run it:
python httpstatus.py -r hadoop -o hdfs:///output/httpstatus hdfs:///user/root/nginx
The results:
[root@wx hadooptest]# hadoop fs -cat /output/httpstatus/part-00000
"200"   608997
"206"   2802574
"302"   1
"304"   34600
"400"   30
"401"   1
"404"   1653791
"416"   180358
"499"   2689
Counting source IPs shows the distribution of the site's users and helps security staff trace attack origins. A regex that matches IP addresses produces the key, each value is initialized to 1, and the reducer sums the counts.
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
from mrjob.job import MRJob
import re

IP_RE = re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}')    # IP address regex

class MRCounter(MRJob):
    def mapper(self, key, line):
        # Every regex match becomes a key with an initial value of 1
        for ip in IP_RE.findall(line):
            yield ip, 1

    def reducer(self, ip, occurrences):
        # Sum the counts for each IP
        yield ip, sum(occurrences)

if __name__ == '__main__':
    MRCounter.run()
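One caveat: IP_RE.findall scans the whole line, so any IP-shaped token anywhere in the record would be counted, not just the client address. A stricter drop-in replacement for the mapper above (my sketch, not in the original):

    def mapper(self, key, line):
        fields = line.split()
        # In this log format only the first whitespace-delimited field is the
        # client IP, so key on it instead of scanning the whole line
        if fields and IP_RE.match(fields[0]):
            yield fields[0], 1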
Run the job:
python ipstat.py -r hadoop -o hdfs:///output/ipstat hdfs:///user/root/nginx
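To eyeball the heaviest sources afterwards, the output can be sorted by count with standard shell tools (a usage sketch, assuming the job has finished):

hadoop fs -cat /output/ipstat/part-00000 | sort -k2 -rn | head -n 20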