nginx的日志输出为以下的格式:python
183.69.210.164 - - [07/Apr/2017:09:32:39 +0800] "GET /member/ HTTP/1.1" 302 31 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0
如今须要把remote ip
,请求时间,请求方法,请求url以及请求协议等使用正则表达式的分组功能一一匹配出来,正则的匹配模式书写以下:linux
(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[^\[\]]+)\] "(?P<method>\w+) (?P<url>\S+) (?P<protocol>[\w+/\d.]+)" (?P<status>\d+) (?P<size>\d+) .+ "(?P<useragent>.+)"
在书写相似的正则时不要陷入目标数据对应的正则书写,如[07/Apr/2017:09:32:39 +0800]
,能够理解为在一个[]
中括号中出现了许多字符,但不会包含[
,]
这样的符号,因此就可写成\[(?P<datetime>[^\[\]]+)\]
这样的形式。nginx
代码验证:正则表达式
import re import datetime logfile = '''183.69.210.164 - - [07/Apr/2017:09:32:39 +0800] "GET /member/ HTTP/1.1" 302 31 "-" "Mozilla/5.0 \ (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0"''' pattern = '''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[^\[\]]+)\] "(?P<method>\w+) (?P<url>\S+) \ (?P<protocol>[\w+/\d.]+)" (?P<status>\d+) (?P<size>\d+) .+ "(?P<useragent>.+)"''' regex = re.compile(pattern) matcher = regex.match(logfile) if matcher: print(matcher.groupdict().items())
输出后端
dict_items([('remote', '183.69.210.164'), ('datetime', '07/Apr/2017:09:32:39 +0800'), ('method', 'GET'), ('url', '/member/'), ('protocol', 'HTTP/1.1'), ('status', '302'), ('size', '31'), ('useragent', 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0')])
这样就获得一个类字典的数据,把一条日志是的各个部份进行了分解,但各个分组中的数据类型都是字符串类型。像('datetime', '07/Apr/2017:09:32:39 +0800')
是一个时间,能够转换成时间类型,('status', '302')
是状态码,能够转换为整形,以便后续在分析时直接就拿到相应的数据类型。浏览器
对字符串时间进行数据转换缓存
import datetime s = '07/Apr/2017:09:32:39 +0800' dt = datetime.datetime.strptime(s, '%d/%b/%Y:%H:%M:%S %z') print(type(dt), dt)
输出多线程
<class 'datetime.datetime'> 2017-04-07 09:32:39+08:00
对status和size能够直接使用int进行数据类型转换,这种类型转换函数能够单独定义在一个字典中,当一个功能来提供,以下:app
ops = { 'datetime': lambda time_str: datetime.datetime.strptime(time_str, '%d/%b/%Y:%H:%M:%S %z'), 'status': int, 'size': int }
这样当日志格式变化后,能够很灵活在opt
这个字典中增长相应的转换函数。dom
结合上边的转换函数就能够把一行日志转换成字典,该字典存放了正则匹配出的分组信息,并相应的数据已进行了类型转换,发便后期分析时使用。完整代码以下:
import re import datetime logfile = '''183.69.210.164 - - [07/Apr/2017:09:32:39 +0800] "GET /member/ HTTP/1.1" 302 31 "-" "Mozilla/5.0 \ (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0"''' ops = { 'datetime': lambda time_str: datetime.datetime.strptime(time_str, '%d/%b/%Y:%H:%M:%S %z'), 'status': int, 'size': int } pattern = '''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[^\[\]]+)\] "(?P<method>\w+) (?P<url>\S+) \ (?P<protocol>[\w+/\d.]+)" (?P<status>\d+) (?P<size>\d+) .+ "(?P<useragent>.+)"''' regex = re.compile(pattern) def extract(line): matcher = regex.match(line) if matcher: info = {} for k, v in matcher.groupdict().items(): info[k] = ops.get(k, lambda x: x)(v) # 巧用字典的get方法及默认值是一个匿名函数 return info # 上边的代码能够用一行实现 # return {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()} else: return None print(extract(logfile))
运行后输出
{'remote': '183.69.210.164', 'datetime': datetime.datetime(2017, 4, 7, 9, 32, 39, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), 'method': 'GET', 'url': '/member/', 'protocol': 'HTTP/1.1', 'status': 302, 'size': 31, 'useragent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0'}
输出一个字典,相应的数据类型也完成了转换。
日志文件被读取并一条一条的送达给窗口函数,窗口函数须要缓存指定一段时间内的的日志,因日志是以时间序列产生的,当这个日志中的时间差值大于定义的窗口
大小时,代表这时咱们就须要对已缓存的日志进行分析,如:在这个时间窗口
内访问的url
有哪些,各个url
访问的数量,若是在一个时间窗口内对一个url访问的数量特别的多,那可能这个url在被cc***,这时就能够触发告警,告知相关人员。
先看一个窗口函数的事例:
import random import datetime import time def source(): # 模拟数据源 while True: tz_utc_8 = datetime.timezone(datetime.timedelta(hours=8)) # 建立时区UTC+8:00 datetime_now = datetime.datetime.now() dt = datetime_now.replace(tzinfo=tz_utc_8) # 强制设置为UTC+8:00 yield {'value': random.randint(1, 100), 'datetime': dt} time.sleep(1) def window(src, handler, width: int, interval: int): """ 窗口函数 :param src: 数据源,一个生成器 :param handler: 数据处理函数 :param width: 时间窗口宽度,秒 :param interval: 处理时间间隔,秒,表示在nginx的日志文件中的请求时间的差值在于interval时,就须要调用handler函数进行处理在这段时间内的这一批日志 :return: """ # start和current都设置为linux元年时间,后边会被日志中的时间替代,做初始化值用 start = datetime.datetime.strptime('1970/01/01 00:00:00 +0800', '%Y/%m/%d %H:%M:%S %z') current = datetime.datetime.strptime('1970/01/01 00:00:00 +0800', '%Y/%m/%d %H:%M:%S %z') delta = datetime.timedelta(seconds=width - interval) buffer = [] # 时间窗口内日志的缓存列表 while True: for x in src: if x: # {'value': 31, 'datetime': datetime.datetime(2020, 5, 3, 16, 4, 33, 501124)} buffer.append(x) current = x['datetime'] # 数据中的时间与定义的start相减后大于等于interval后,处理buffer中的时间间隔就到了 if (current - start).total_seconds() >= interval: ret = handler(buffer) # 数据处理函数 start = current # 时间点移动 # buffer中的数据处理 # x['datetime'] > current - delta 知足这个条件的数据须要保留 buffer = [x for x in buffer if x['datetime'] > current - delta] def do_nothing_handler(iterable: list): # 模拟处理函数 print(iterable) print(len(iterable)) window(source(), do_nothing_handler, 8, 6)
运行后输出
[{'value': 82, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 7, 963440, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}] 1 [{'value': 82, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 7, 963440, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 56, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 8, 967087, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 70, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 9, 971149, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 64, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 10, 975855, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 77, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 11, 978261, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 64, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 12, 980210, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 14, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 13, 983762, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}] 7 [{'value': 64, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 12, 980210, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 14, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 13, 983762, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 7, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 14, 987289, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 13, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 15, 990374, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 96, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 16, 992201, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 58, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 17, 992857, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 30, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 18, 995864, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 47, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 19, 998886, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}] 8 [{'value': 30, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 18, 995864, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 47, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 19, 998886, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 89, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 21, 3320, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 45, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 22, 7064, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 58, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 23, 9548, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 64, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 24, 11991, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 67, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 25, 15751, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 8, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 26, 18675, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}] 8 [{'value': 67, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 25, 15751, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 8, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 26, 18675, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 64, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 27, 23564, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 11, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 28, 27336, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 29, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 29, 29979, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 21, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 30, 34198, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 79, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 31, 37143, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 62, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 32, 39595, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}] 8
当运行稳定后,每次处理的buffer中的数据为8条,即为width
的值,而interval
设置为6,这说明上一次buffer中的数据和下一次buffer中的数据有重复的,这在某些场景是容许有数据重复,当width
= interval
时,buffer中的数据不会有重复,若是width
< interval
时,这时就有时间丢失了,这是不能容许出现的。
import re import datetime logfile = '''183.69.210.164 - - [07/Apr/2017:09:32:39 +0800] "GET /member/ HTTP/1.1" 302 31 "-" "Mozilla/5.0 \ (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0"''' ops = { 'datetime': lambda time_str: datetime.datetime.strptime(time_str, '%d/%b/%Y:%H:%M:%S %z'), 'status': int, 'size': int } pattern = '''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[^\[\]]+)\] "(?P<method>\w+) (?P<url>\S+) \ (?P<protocol>[\w+/\d.]+)" (?P<status>\d+) (?P<size>\d+) .+ "(?P<useragent>.+)"''' regex = re.compile(pattern) # 这里还会面临一个问题,若是有一行日志因一些缘由不是按照定义的格式记录的,那“regex.match(line)”会是一个None, # 那上边的代码就会报异常。因此这里还会面临异常的处理,加判断便可 def extract(line): matcher = regex.match(line) if matcher: return {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()} # ops.get(k, lambda x: x)(v) 转换相应的值 else: return None def window(src, handler, width: int, interval: int): """ 窗口函数 :param src: 数据源,生成器 :param handler: 数据处理函数 :param width: 时间窗口宽度,秒 :param interval: 处理时间间隔,秒,表示在nginx的日志文件中的请求时间的差值在于interval时,就须要调用handler函数进行处理在这段时间内的这一批日志 :return: None """ start = datetime.datetime.strptime('1970/01/01 00:00:00 +0800', '%Y/%m/%d %H:%M:%S %z') # 也能够设置为None current = datetime.datetime.strptime('1970/01/01 00:00:00 +0800', '%Y/%m/%d %H:%M:%S %z') delta = datetime.timedelta(seconds=width - interval) buffer = [] for x in src: if x: # {'value': 31, 'datetime': datetime.datetime(2020, 5, 3, 16, 4, 33, 501124)} buffer.append(x) current = x['datetime'] if (current - start).total_seconds() >= interval: # 数据中的时间与定义的start相减后大于等于interval后,处理buffer中的时间间隔就到了 ret = handler(buffer) # 数据处理函数 start = current # 时间点移动 # buffer中的数据处理 # x['datetime'] > current - delta 知足这个条件的数据须要保留 buffer = [x for x in buffer if x['datetime'] > current - delta] # 装载函数 def load(path): with open(path) as f: for line in f: field = extract(line) if field: yield field else: continue # 解析失败时抛弃或打印日志 # 处理函数 def do_nothing_handler(iterable: list): print(iterable) print(len(iterable)) # test.log 日志文件为测试文件 window(load('test.log'), do_nothing_handler, 300, 300)
对日志的分析函数多种多样,如:在时间窗口内请求的状态码的占比,在时间窗口内请求url的数量,各类useragent访问的数量(不是时间窗口内的统计)等。而这些分析函数每每是须要并行执行的,因此这里也会引入分发器,在分发器中使用多线程来执行相应的分析处理函数。
def dispatcher(src): # src参数是一个生成器,便是数据源,通过extract函数处理过一行数据 queues = [] threads = [] def reg(handler, width, interval): # handler: 分析函数 # width: 时间窗口 # interval: 分析函数调用时间间隔 # 注册时须要分配消费者函数window各自的队列,建立各自的线程对象 q = Queue() queues.append(q) t = threading.Thread(target=window, args=(q, handler, width, interval)) threads.append(t) def run(): # 启动线程 for t in threads: t.start() # 把日志装载函数load返回的数据put进各个消费者的队列,相似于广播形式 for x in src: for q in queues: q.put(x) return reg, run # 返回注册函数和运行函数
原window
函数的第一个参数为src
一个生成器,如今加入分发器后,第一参数变成了一个q
队列,因此window
函数会作相应的变动。
在一个时间窗口内请求状态码的占比能反应出后端服务的健康情况。在window
这个窗口函数中会把一个指定时间内的日志append到一个list中,若是触发了时间窗口,那就会调用相应的handler
函数对list中的日志进行分析,因此状态码分析函数接收一个list做为参数。
from collections import defaultdict def status_handler(iterable: list): status = defaultdict(lambda: 0) # 使用默认字典,初始化值为0 for item in iterable: key = item['status'] status[key] += 1 total = sum(status.values()) # total = len(iterable) # 数据总条数求值有多种方式,也能够上for循环中计数 result = {k: v/total*100 for k, v in status.items()} # 求状态码在该时间窗口内所占的百分比 # print(result) return result # 返回一个字典, {'200': 95, '502': 2.4}
客户端请求时使用的是什么浏览器也能够进行统计,只是这个统计不针对时间窗口内的日志进行统计,只针对时间窗口内统计意义不太大,而是须要进行累计统计。对useragent有专门的包能够对其进行解析,名称为user_agents
,先看一个事例:
from user_agents import parse # pip install user-agents ua = "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0" u = parse(ua) print(u.browser) print(u.browser.family, u.browser.version_string)
输出
Browser(family='Sogou Explorer', version=(1, 0), version_string='1.0') Sogou Explorer 1.0
因此能够利用user_agents
包解析出user agent的名称和相应的版本。
要对user agent进行统计,那在对日志文件中的一行日志文件进行转换时,须要增长对user agent作相应的转换,在opt
这个字典中增长相应的转换函数,以下:
from user_agents import parse ops = { 'datetime': lambda time_str: datetime.datetime.strptime(time_str, '%d/%b/%Y:%H:%M:%S %z'), 'status': int, 'size': int, 'useragent': lambda useragent: parse(useragent) }
User agent统计代码以下:
# 统计结果放在函数useragent_handler以外,记录当前处理的日志文件,若是放在useragent_handler内则是统计时间窗口内的日志条数 ua_dict = defaultdict(lambda: 0) # useragent分析函数 def useragent_handler(iterable: list): for item in iterable: ua = item['useragent'] # ua是一个 user_agents.parsers.UserAgent 对象 key = (ua.browser.family, ua.browser.version_string) # key为一个元组 ua_dict[key] += 1 # print(sorted(ua_dict.items(), key=lambda x: x[1], reverse=True)) # 以useragent的数量升序排序 return ua_dict
import re import datetime from queue import Queue import threading from collections import defaultdict from pathlib import Path from user_agents import parse ops = { 'datetime': lambda time_str: datetime.datetime.strptime(time_str, '%d/%b/%Y:%H:%M:%S %z'), 'status': int, 'size': int, 'useragent': lambda useragent: parse(useragent) } pattern = '''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[^\[\]]+)\] "(?P<method>\w+) (?P<url>\S+) \ (?P<protocol>[\w+/\d.]+)" (?P<status>\d+) (?P<size>\d+) .+ "(?P<useragent>.+)"''' regex = re.compile(pattern) # 日志抽取函数 def extract(line) -> dict: matcher = regex.match(line) if matcher: return {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()} else: return None def window(src: Queue, handler, width: int, interval: int): """ 窗口函数 :param src: 是一个queue :param handler: 数据处理函数 :param width: 时间窗口宽度,秒 :param interval: 处理时间间隔,秒 :return: """ start = datetime.datetime.strptime('1970/01/01 00:00:00 +0800', '%Y/%m/%d %H:%M:%S %z') # 也能够设置为None current = datetime.datetime.strptime('1970/01/01 00:00:00 +0800', '%Y/%m/%d %H:%M:%S %z') delta = datetime.timedelta(seconds=width - interval) buffer = [] while True: data = src.get() # 阻塞模式 if data: buffer.append(data) current = data['datetime'] if (current - start).total_seconds() >= interval: # 数据中的时间与定义的start相减后大于等于interval后,处理buffer中的时间间隔就到了 ret = handler(buffer) # 数据处理函数 start = current # 时间点移动 # buffer中的数据处理 # x['datetime'] > current - delta 知足这个条件的数据须要保留,结合“01-日志分析回顾及数据载入”的图形看 buffer = [x for x in buffer if x['datetime'] > current - delta] print(ret) # ret是触发调用函数的执行返回值,在此能够窗口内的分析结果进行相应的判断,好比502的状态码占比大于多少时作什么 # 抽取出日志文件读取函数,是一个生成器函数 def open_file(path: str): with open(str(path)) as f: for line in f: field = extract(line) if field: yield field else: continue # 解析失败时抛弃或打印日志 # 装载函数,可受多个路径,能够是目录,若是是目录只迭代一该目录下的全部文件,目录忽略 def load(*path): for item in path: p = Path(item) if not p.exists(): continue if p.is_dir(): for file in p.iterdir(): if file.is_file(): # 只处理文件,目录无论 yield from open_file(str(file)) elif p.is_file(): yield from open_file(str(p)) # 处理函数 def do_nothing_handler(iterable: list): print(iterable) print(len(iterable)) # 状态码分析处理函数,返回各状态码的占比 def status_handler(iterable: list): status = defaultdict(lambda: 0) for item in iterable: key = item['status'] status[key] += 1 total = sum(status.values()) # total = len(iterable) # 数据总条数求值有多种方式,也能够上for循环中计数 result = {k: v/total*100 for k, v in status.items()} # print(result) return result # 统计结果放在函数useragent_handler以外,记录当前处理的日志文件,若是放在useragent_handler内则是统计时间窗口内的日志条数 ua_dict = defaultdict(lambda: 0) # useragent分析函数 def useragent_handler(iterable: list): for item in iterable: ua = item['useragent'] # ua是一个 user_agents.parsers.UserAgent 对象 key = (ua.browser.family, ua.browser.version_string) # key为一个元组 ua_dict[key] += 1 # print(sorted(ua_dict.items(), key=lambda x: x[1], reverse=True)) # 以useragent的数量升序排序 return ua_dict # 分发器函数 # 1. 每个window函数拥有本身的队列,相应的处理函数,以及时间窗口及处理时间间隔 # 2. window函数至关于就是生产者消费者模型中的消费者,在实际的业务环境中消费者可能许多,拿到相同的一份数据后进行各自的handler处理 def dispatcher(src): # src参数是一个生成器,便是数据源 queues = [] threads = [] def reg(handler, width, interval): # 注册时须要分配消费都函数window各息的队列,建立各息的线程对象 q = Queue() queues.append(q) t = threading.Thread(target=window, args=(q, handler, width, interval)) threads.append(t) def run(): # 启动线程 for t in threads: t.start() # 把日志装载函数load返回的数据put进各个消费者的队列 for x in src: for q in queues: q.put(x) return reg, run path = 'test.log' # 接受可变参数传递,如 path = 'test.log, /var/logs/' regs, runs = dispatcher(load(path)) # 注册窗口,能够注册多个,根据各自的日志处理逻辑传入handler, width, interval便可。这样每一个window函数都在各自的线程中运行,互不影响。 # regs(do_nothing_handler, 10, 5) # 注册状态码处理函数 regs(status_handler, 10, 5) # 注册useragent分析函数 regs(useragent_handler, 10, 10) # useragent分析不是对时间窗口内的数据求和或求平均值,数据不须要有重复 # 运行 runs()