本文正在参加「Python主题月」,详情查看 活动连接java
这两件小事发生在几周前了,一直想记录下,却又一直搁置,今天抽点时间写一下~python
最近两周公司项目暂时没啥大的新需求,有时间整下技术规划,跟组长商议后决定:对APP作下 启动优化,就是:应用启动、页面加载提速。web
说到页面加载提速,那么问题来了 → 页面那么多,怎么知道有哪些页面须要优化呢?正则表达式
公司APP的设计是单Activity多Fragment,那有多少Fragment呢,这时会门脚本语言的好处就来了,直接写个脚本递归遍历文件夹统计下就知道了:chrome
import os
def search_all_fragment(path):
os.chdir(path)
items = os.listdir(os.curdir)
for item in items:
path = os.path.join(item)
# 获取路径分割后的最后部分,即文件名
path_split_last = path.split(os.path.sep)[-1]
# 判断是否为目录,是往下递归
if os.path.isdir(path):
print("[-]", path)
search_all_fragment(path)
os.chdir(os.pardir)
# 由于项目里用到了ARouter会生成对应的Fragment,不在统计范畴中,要过滤掉
elif 'Fragment' in path_split_last and '$$' not in path_split_last:
print("[!]", path)
# 只保存.java和.kt的文件
if path_split_last.endswith('.java') or path_split_last.endswith('.kt'):
result_list.append(path)
else:
print('[+]', path)
if __name__ == '__main__':
result_list = []
search_all_fragment(r'项目文件路径')
print(result_list)
print('共有Fragment:', len(result_list), "个")
复制代码
脚本一跑就知有没有:数据库
行吧,一共412个Fragment,用脚本秀了一波,回归正题,怎么知道哪些页面须要优化。json
巧了,恰好自家全埋点上有作 渲染时间埋点 的日志上报,且不关心具体的实现方案是否靠谱,开打Kibana,输入下筛选条件(仅查看这类Event),部分日志以下:浏览器
② 是页面的路径,取最后的Fragment就好,③ renderCost,这里就是渲染时间了,拿到这两个数据就行。服务器
接着,怎么分析这些日志得出有哪些待优化页面?拍拍脑门跟组长定了个不怎么靠谱的方案:markdown
统计全部Fragment的渲染时间,计算平均数,而后从长到短排序,优先优化渲染时间长的页面。
早上讨论完,看了下过去一年渲染类型的日志共有 53068377
条记录,下午去看病的路上就开始构思该怎么搞了,三条思路:
事实证实留多点后手是没错的,前两个思路+第三个思路的前半段都GG了,听我娓娓道来~
次日一早到公司,准备导出csv,看别人发的教程很简单,就三步:
输入查询条件查询 → 得出查询结果后Save → 生成CSV
可是,我死活都找不到Save,故生成CSV的按钮一直是灰的(点不了):
em...难道是权限的问题?换上了组长给的有查数据库权限的帐号,同样不醒,难道是要 开启这样的配置 ,点击Kibana的设置,各类没权限,因而去找后台大佬,获得的回复是:
不能开这个,导几百条几千条还好,导个几百万条服务器顶不住直接就挂了,有风险,因此把这个功能禁用了。
思路一惨遭滑铁卢...
上思路二,有数据库权限的帐号,执行查询语句后走脚本导出,写两句简单SQL条件查询语句还不是手到擒来!
现实跟个人认知出了点 误差,原来这个数据库权限只是:能够用Kibana的Dev Tools,在上面拼接json字符串查询而已:
em...也不是不能用,抓包发现bool块的数据于筛选日志请求提交的数据相同,复制粘贴一波,而后把size改为1000000,执行下康康:
em...返回了错误信息,大概意思是说一次最多只能查10000条,若是一次查更多只能去改配置。
卧槽,直接裂开,我还想着复制粘贴保存下Json还好,5306w条数据啊,手动复制粘贴,来算算要多久:
- 修改查询条件(起始日期时间和结束日期时间) → 10s
- 点击查询等待查询结果显示 → 20s之内
- 新建文件,复制粘贴,保存输入文件名 → 30s
每存1w条数据,我要花至少1分钟,换算成时,获取完这些数据要多少小时:5306/60≈88.5h,换算成标准工做日(8h),须要11天多一点,这还没算休息时间呢,要 花两个多星期 重复作这样重复的事,任谁也顶不住啊!!!
方案二也跪了...
em...抓下包?分析参数,而后写爬虫抓下,抓了几回请求后我就放弃了,Cookie里有个sid每请求一次变一次:
并且跟响应头Set-Cookie返回的不同,短期内捣鼓怎么构造的显然不可能,唉只能上看上去最low的模拟用户访问浏览器了。
Tips:后面闲下来发现,Set-Cookie返回的只是Cookies里的一部分,登陆后拿到cookies,而后本身每次请求后替换这部分就好~
把模拟步骤拆分下:
怎么方便怎么来,笔者直接把查询结果存txt里了,模拟访问用Selenium,直接开搞:
import time
from selenium import webdriver
base_url = 'http://kibana.xxx.xxx'
login_url = base_url + '/login'
login_data = {
'password': 'xxx',
'username': 'xxx',
}
# 初始化浏览器
def init_browser():
chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument(r'--start-maximized') # 开始就全屏
return webdriver.Chrome(options=chrome_options)
# 模拟登陆
def login():
browser.get(login_url)
time.sleep(10)
inputs = browser.find_elements_by_class_name('euiFieldText')
inputs[0].send_keys(login_data['username'])
inputs[1].send_keys(login_data['password'])
submit = browser.find_element_by_xpath('//button[1]')
submit.click()
if __name__ == '__main__':
browser = init_browser()
login()
复制代码
用过selenium的朋友可能或说:添加下述配置设置下用户数据目录,下次打开浏览器访问处于登陆态,就不用从新登陆了:
chrome_options.add_argument(r'--user-data-dir=D:\ChromeUserData')
复制代码
但实际的状况是设置了没用,仍是跳转了登陆页,我也不知道为何,索性每次跑脚本都登陆下吧...
登陆成功后,稍待片刻会跳转到主页,等待加载完毕,点击左侧这个图标,这没有采用显式或隐式等待的方式,而是笨方法休眠死等~
login()函数最后调下下面这个方法:
# 访问主页点击Tab
def click_tab():
time.sleep(8) # 假死等待页面加载完毕
browser.find_element_by_xpath('//ul[3]/li[2]').click()
复制代码
接着到输入区域写入文字:
Elements定位到目标位置:
卧槽,好像有点难搞啊,不是普通的文本输入框,获取外层ace_content的div,尝试send_keys:
def set_left_text():
inputs = browser.find_element_by_xpath('//div[@class="ace_content"]')
inputs.send_keys('测试文本')
复制代码
果真报错:
不能直接设置文本就只能另辟蹊径了,心生一计:
点击最后的游标,而后一直按backspace键清空,接着模拟键盘输入一个个字母敲进去
改动后的代码
# 设置左侧文字
def set_left_text():
time.sleep(5)
cursor_div = browser.find_element_by_xpath('//div[@class="ace_cursor"]')
cursor_div.click()
action_chains = ActionChains(browser)
for i in range(0, 500):
action_chains.context_click(cursor_div).send_keys(Keys.BACKSPACE).perform()
action_chains.context_click(cursor_div).send_keys('GET _search' + str(search_dict)).perform()
复制代码
清空后输入,有点鬼畜:
这当中其实作了不少无效操做,按回退键500下,实际上字符没那么多,还有得等它把字敲完,得办法改进下。
又心生一计:粘贴复制
,实现起来就是:
往剪切板写入本次要查询的字符串 → 点击游标或内容得到焦点 → Ctrl+A全选内容 → 回退 → Ctrl+V粘贴内容
代码实现一波:
def set_left_text():
time.sleep(5)
input_div = browser.find_element_by_xpath('//div[@class="ace_content"]')
input_div.click()
action = ActionChains(browser)
action.key_down(Keys.CONTROL).key_down('a').key_up('a').key_up(Keys.CONTROL).perform()
action.key_down(Keys.BACKSPACE).key_up(Keys.BACKSPACE).perform()
action.key_down(Keys.CONTROL).key_down('v').key_up('v').key_up(Keys.CONTROL).perform()
复制代码
看看效果:
能够的,模拟点击运行的小按钮了:
# 点击查询按钮
def click_submit():
submit_button = browser.find_element_by_xpath('//button[@data-test-subj="sendRequestButton"]')
submit_button.click()
复制代码
接着到右侧查询结果,直接处理有些麻烦,获取内容结点,递归遍历全部子节点,提取文本去空格换行等,最后拼接输出。
又又心生一计:
能不能拦截selenium浏览器接收的请求,对特定请求,直接拿响应结果写入
还真能够,经过中间人代理的方式,此处使用 browsermob-proxy
,下载完把库拷贝到项目中:
# 开启代理
server = Server(os.path.join(os.getcwd(), r'browsermob-proxy-2.1.4\bin\browsermob-proxy'))
server.start()
proxy = server.create_proxy()
# chrome加下配置
chrome_options.add_argument('--proxy-server={0}'.format(proxy.proxy))
# 抓包前:
proxy.new_har(options={
'captureContent': True,
'captureHeaders': True
})
# 抓包后过滤特定请求,并把内容保存到本地文件中:
def save_log(date_str, index):
for entry in proxy.har['log']['entries']:
if entry['request']['url'].endswith('path=_search&method=GET'):
log_file_path = os.path.join(out_dir, date_str + '_' + str(index) + '.txt')
with open(log_file_path, "w+", encoding='utf-8') as f:
f.write(str(entry['response']['content'])
.replace("\n", '').replace("\\n", "").replace(' ', ''))
print("日期日志保存完毕:", log_file_path)
复制代码
呕吼,完美,接着补齐剪贴板写入,以及查询日期的构造了:
def set_copy_text(content):
w.OpenClipboard()
w.EmptyClipboard()
w.SetClipboardData(win32con.CF_UNICODETEXT, content)
w.CloseClipboard()
# 构造生成一个从20200709到今天的日期
def init_date_list(begin_date, end_date):
date_list = []
begin_date = datetime.datetime.strptime(begin_date, "%Y%m%d")
end_date = datetime.datetime.strptime(end_date, "%Y%m%d")
while begin_date <= end_date:
date_str = begin_date.strftime("%Y-%m-%d")
date_list.append(date_str)
begin_date += datetime.timedelta(days=1)
return date_list
复制代码
最后,就是每次请求时更新请求参数写入剪贴板,打开代理抓包:
def input_query_content():
try:
for pos, date in enumerate(str_date_list[]):
for index in range(1, 3):
input_div = browser.find_element_by_xpath('//div[@class="ace_content"]')
input_div.click()
action = ActionChains(browser)
print(str(pos + 1) + "、请求日期:" + date + "-" + ("上半天" if (index == 1) else "下半天"))
update_dict_and_proxy(date, index)
action.key_down(Keys.CONTROL).key_down('a').key_up('a').key_up(Keys.CONTROL).perform()
set_copy_text('GET _search' + '\n' + str(search_dict).replace("'", '"'))
time.sleep(1)
action.key_down(Keys.BACKSPACE).key_up(Keys.BACKSPACE).perform()
action.key_down(Keys.CONTROL).key_down('v').key_up('v').key_up(Keys.CONTROL).perform()
submit_button = browser.find_element_by_xpath('//button[@data-test-subj="sendRequestButton"]')
submit_button.click()
time.sleep(20)
save_log(date, index)
except Exception as e:
print(e)
proxy.close()
browser.close()
# 更新请求字典及新建抓包
def update_dict_and_proxy(date_str, index):
gte_str = date_str + 'T00:00:00.000Z' if (index == 1) else date_str + 'T12:00:00.000Z'
lte_str = date_str + 'T12:00:01.000Z' if (index == 1) else date_str + 'T23:59:59.000Z'
search_dict['query']['bool']['filter'][20]['range']['time']['gte'] = gte_str
search_dict['query']['bool']['filter'][21]['range']['time']['lte'] = lte_str
proxy.new_har(options={
'captureContent': True,
'captureHeaders': True
})
复制代码
脚本一跑,就能够开始挂机了,建议找一台空闲的电脑挂着,由于脚本会 占用剪切板,会影响正常工做哦!另外,这里把查询时间划分红上跟下,是想尽量多的查询到所需数据。
会门脚本语言真香啊!把本来的工做委托给了自动化脚本,效率也高一倍,2w条数据只要1分钟,采集完全部数据的耗时骤降至少44个小时,机器还能24小时跑,因此其实只须要两天,还不影响我作 工(mo)做(yu),固然还能够再优化,将脚本部署到多台机子上同时执行,一减再减小,原本两个多星期的活,一天不到干完。还不香?
更幸运的是,实际上有效数据只有600w条,原来渲染事件的埋点是前年10月份才加的,因此单机跑了5个钟就把数据爬完了。
早上写脚本,下午就跑完,中途把统计脚本也写下,这一Part就很Easy了,正则表达式 yyds
!
data_pattern = re.compile('pagePath":"(.*?)".*?"renderCost","value":(.*?)}', re.S)
复制代码
读取文件内容,全文匹配,遍历匹配结果,依次对两个分组作处理,而后把渲染时间写入页面.txt文件中:
for log in log_list:
print("解析文件:", log)
with open(log, 'r', encoding='utf8') as f:
content = f.read()
data_result = data_pattern.findall(content)
if data_result is not None:
for data in data_result:
page_name = ''
page_render_time = 0
page_split = data[0].split('-')
if page_split is not None and len(page_split) > 0:
other_page_split = page_split[-1].split(",")
if other_page_split is not None and len(other_page_split) > 0:
page_name = other_page_split[-1]
else:
page_name = page_split[-1]
else:
other_page_split = data[0].split(",")
if other_page_split is not None and len(other_page_split) > 0:
page_name = other_page_split[-1]
else:
page_name = data[0]
page_render_time = data[1].replace('"', '')
if page_name == 'i':
print(data)
break
cp_utils.write_str_data(page_render_time, os.path.join(page_dir, page_name + ".txt"))
复制代码
写入示例以下:
再接着又是遍历文件夹子,字典存储数据 (页面:平均值),保存统计数据:
def average_time(file_path):
page_dir_split = file_path.split(os.path.sep)
if page_dir_split is not None and len(page_dir_split) > 0:
result_average = 0
render_time_list = cp_utils.load_list_from_file(file_path)
for render_time in render_time_list:
if render_time != '0':
result_average = int((result_average + int(render_time)) / 2)
print(page_dir_split[-1] + "结果计算完毕...")
cp_utils.write_str_data(page_dir_split[-1] + "-" + str(result_average), result_file)
else:
print("异常退出")
复制代码
最后按照倒序输出到文件中:
def order_list(file_path):
time_info_list = cp_utils.load_list_from_file(file_path)
order_list_result = sorted(time_info_list, key=lambda x: int(x.split('-')[-1]), reverse=True)
cp_utils.write_list_data(order_list_result, result_order_file)
复制代码
这样就能够获得页面的平均渲染时间了~固然,后面以为看这个平均数不靠谱,由于影响变量太多了:
设备硬件不一样,加载速度确定是有差距的,有些用户活跃,有些不活跃,还有版本等等...
不靠谱但又想依赖这个全埋点的数据作点什么,想了想又定了另一个方案:
按照页面使用频度排序,优先针对用户经常使用的页面进行优化,好比这个版本优化2个经常使用页面,挑几个典型特定设备进行跟踪,发版一段时间后那这两个页面的新数据跟旧数据作对比,就能够对优化的收益作量化了~
固然,这些是后话了,不敢想象,若是我不是Python玩得还能够的话,该怎么解决这些问题...
第二次小事比起第一件来讲就小巫见大巫了,基友小A,让我帮忙给他搞点行业报告,一会儿给了好几个网址,开头几个还好,就是模拟请求,解析页面,拿个ID啥的拼接,得出真实的PDF下载连接,而后下载。
后面的几个网站,就很鸡贼了,直接把PDF的每一页做为图片贴出来,如:
想把图片转成PDF,若是不会脚本语言,须要一张张图片右键保存到本地,最后用合成工具把图片合成成PDF。
不过巧了,我恰好 会点Python,因此这件事就变成了爬图片,找个图片转PDF的库了,找到个 img2pdf库
,API简单,用着还行,不过若是图片有Alpha通道,会直接报错,因此须要本身去下,简单,用 pillow库
就能够作:
from PIL import Image, ImageFont, ImageDraw
# 批量对RGBA图片进行转换,同时删除无效文件
def remove_pic_alpha(pic):
try:
img = Image.open(pic)
if img.mode == "RGBA":
img = img.convert("RGB")
img.save(pic)
print("转换图片:", pic)
except Exception as e:
print("文件异常,移除:" + pic)
os.remove(pic)
复制代码
简单转换代码以下:
import img2pdf
try:
with open(pdf_path, "wb+") as f:
f.write(img2pdf.convert(pic_path_list))
print("输出PDF文件:", pdf_path)
except Exception as e:
print("发生异常:", pdf_path)
复制代码
后面发现了一个大块头,851页,总共13950个有效的报告,有一些报告的页面结构不是纯图片,而是相似于:文字-图片-文字-图片这样,不想把文字漏掉,能够把它转换成图片,就是利用 pillow库
,按照必定的规则,把文字绘制到一个白色的背景上。
def font2pic(content, pic_path):
# 先转换为列表
content_list = list(content)
i = 30
while i < len(content_list):
content_list.insert(i, "\n")
i += 30
content_after = ''.join(content_list)
im = Image.new("RGB", (960, 720), (255, 255, 255))
dr = ImageDraw.Draw(im)
font = ImageFont.truetype(os.path.join("fonts", "msyh.ttf"), 24)
dr.text((50, 50), content_after, font=font, fill="#000000")
im.save(pic_path)
复制代码
爬取处理页面是,还得记录顺序,而且把它做为图片名,一个爬取的临时文件示例以下:
而后就是遍历文件每一行,文字生成图片,图片连接执行下载(也能够批量下载后替换url),得出的PDF样例以下:
文本的渲染比较无脑,不是很美观,具体的渲染规则还得从长计议下,不过这些也是后话了,数据到手,你想怎么处理,均可以~
当咱们须要作一些重复性任务,且量比较大的时候,脚本的优点就出来了:只要程序足够稳健,24小时不间断跑还不会累,把脚本部署到多台机子上,还能够缩短完成时间。固然,脚本是死的,人是活的,有些问题没考虑到,脚本跑到中途就挂了,因此大型任务还须要引入 告警及日志系统,以便及时跟进及对错误进行排查能够快速定位到问题。
脚本语言除了Python还有不少:Windows的.bat
、Linux的.sh
、C Shell
、JavaScript
、Lua
等。而笔者偏心Python的缘由主要仍是由于它的 类库丰富
,你能想到的基本都能找到对应的第三方库。
人生苦短,我用Python~