项目状况介绍:
基于Python 3.6.6 ,实现对nginx访问的日志分析代码,实现了对日志中code的占比统计和浏览器类型和访问状况统计
实现的代码段有:
1.编写窗户函数,实如今必定的时间内对数据进行分析
2.经过正则表达式对日志进行匹配,加载日志文件,提取出文本里每行的日志信息
3.编写消费端代码,即便得提取到的数据可以按照消费端的代码进行处理
4.消息分发代码实现,经过queue,将提取的的文本放到队列里,供消费端代码处理
项目代码以下nginx
import random import datetime import time from queue import Queue import threading import re from pathlib import Path from user_agents import parse """ 这段代码,实现了再一段时间内得到数据,经过不一样的handler(即消费端函数) 对获取到的同一份数据进行处理,主要是两段消费函数,网页返回的code的统计和浏览器的分析 这段代码,窗口函数中,data = src.get(),使得没有新的数据产生时,该代码会阻塞,直到有新的数据生成,再次进行处理 """ pattern = '''(?P<remote>[\d.]{7,}\s-\s-\s\[(?P<datetime>[^\[\]]+)\])\s\ "(?P<method>.*)\s(?P<url>.*)\s(?P<protocol>.*)"\s(?P<status>\d{3})\s(?P<size>\d+)\s"[^"]+"\s"(?P<useragent>[^"]+)"''' #编译 regex = re.compile(pattern) #构造字典 ops = { 'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'), 'status': int, 'size': int, 'useragent': lambda ua: parse(ua) } #提取信息 def extract(line: str) -> dict: matcher = regex.match(line) if matcher: return {name: ops.get(name, lambda x: x)(data) for name, data in matcher.groupdict().items()} # 打开文件 def openfile(path: str): """装载日志文件""" with open(path) as f: for line in f: fields = extract(line) if fields: yield fields else: continue #装载文件,判断文件类型已是否存在 def load(*paths): for item in paths: p = Path(item) if not p.exists(): continue if p.is_dir(): for file in p.iterdir(): if file.is_file(): yield from openfile(str(file)) elif p.is_file(): yield from openfile(str(p)) # 随机生成100之内的数字 def source(second=1): """生成数据""" while True: yield { 'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))), 'value': random.randint(1, 100) } time.sleep(second) # 滑动窗口函数 def window(src: Queue, handler, width: int, interval: int): ''' 窗口函数,表示间隔一段时间取出必定的数据进行处理 :param src:数据源,这里是缓存队列,用于获取数据 :param handler:数据处理的函数 :param width:时间窗口函数,秒 :param interval:处理时间间隔,秒 ''' start = datetime.datetime.strptime('20170101 000000 +0800', '%Y%m%d %H%M%S %z') current = datetime.datetime.strptime('20170101 010000 +0800', '%Y%m%d %H%M%S %z') buffer = [] delta = datetime.timedelta(seconds=width - interval) while True: # 从数据源获取数据 data = src.get() # 这个代码会阻塞,等待数据输入,没有数据输入就阻塞 if data: buffer.append(data) current = data['datetime'] # 存入临时缓冲等待计算 # 每隔interval从新计算buffer中的一次数据 if (current - start).total_seconds() >= interval: ret = handler(buffer) start = current # 清除超出width的数据 buffer = [x for x in buffer if x['datetime'] > current - delta] # 随机数平均的测算函数 source() def handler(iterable): #return sum(map(lambda x: x['value'], iterable)) / len(iterable) print(sum(map(lambda x:x['value'],iterable))/len(iterable)) # 测试函数 def donothing_handler(iterable): #return iterable print(iterable) # 状态码占比 def status_handler(iterable): # 时间窗口内的一批数据 status = {} for item in iterable: key = item['status'] status[key] = status.get(key, 0) + 1 total = len(iterable) print({k:float( "{:.2f}".format(status[k] / total)) for k, v in status.items()}) return {k: status[k] / total for k, v in status.items()} # 浏览器分析 allbrowsers = {} def browser_handler(iterable): browsers = {} for item in iterable: ua = item['useragent'] key = (ua.browser.family, ua.browser.version_string) browsers[key] = browsers.get(key, 0) + 1 allbrowsers[key] = allbrowsers.get(key, 0) + 1 print(sorted(allbrowsers.items(), key=lambda x: x[1], reverse=True)[:10]) return browsers # 分发器 def dispatcher(src): # 分发器中记录handler,同时保存各自的队列 handlers = [] queues = [] def reg(handler, width: int, interval: int): """ 注册窗口处理函数 :param handler:注册数据处理函数 :param width:时间窗口宽度 :param interval:时间间隔 """ q = Queue() queues.append(q) # 多线程,数据并行 h = threading.Thread(target=window, args=(q, handler, width, interval)) handlers.append(h) def run(): # 启动线程处理数据 for t in handlers: t.start() # 将获取到的数据分发到全部的队列中 for item in src: for q in queues: q.put(item) # print(q.get()) return reg, run if __name__ == "__main__": import sys path = '/tmp/test.log' """ 如下的代码为测试用的,用于统计每隔5s统计10s内的随机数字的平均值 reg, run = dispatcher(source()) reg(handler, 10, 5) """ reg, run = dispatcher(load(path)) #每隔5s返回过去10s的数据,可是不作处理 reg(donothing_handler, 10, 5) #每隔5s统计10s内的返回状态码的占比状况 reg(status_handler, 10, 5) # 每隔5s统计10s内的浏览器类型占比状况,展现排行10s内访问量前十的浏览器 reg(browser_handler,10,5) run()