在具体的分析或者特征工程之中,常常会遇处处理时间好久的问题,固然必要的优化是必须的。可是显然,数据量上升,计算量过大后,处理时间是必须的此。时,若是有个能够帮助您查看任务进度的进度条,一定能够提升你抓住处理时间去作(磨)别(洋)事(工)。固然逐行打印是不错的选择,但在Jupyter notebook
/JupyterLab
中,这种实践最大的问题是,打印过多,影响整个notebook
的美观程度。python
在此探讨的是5GB级别如下的数据(之上的Spark
分析,有基于Zipplin
的分布式任务精度条),主要环境是Jupyter
下基于pandas
包的分析和特征工程任务。git
tqdm
是基于Python的精度条模块,里面提供了简单的代码行进度条和基于ipywidgets
的notebook内的进度条。因为如今tqdm
相关模块还在开发阶段,可能会用到一些私有对象,以后正式版中可能具体API会有所变化。github
固然,首先咱们得载入模块,在notebook中使用tqdm带的基于Js显示的进度条前,请务必检查是否安装ipywidgets
模块。多线程
from tqdm import tqdm_notebook, _tqdm_notebook _tqdm_notebook.tqdm_notebook.pandas()
其中第一行载入的两个方法的做用分别是:app
tqdm_notebook
:用来包装任何能够iterable
的对象,在使用其元素进行运算结束后统计时间。_tqdm_notebook
:其中含有模块能够处理pandas
的对象。第二行则是重载pandas
里面的对象,提供能够展现精度条的方法。分布式
下面咱们能够尝试直接使用tqdm_notebook
包裹iterable
对象来展现进度条,效果以下:函数
a = list(range(1, 10000)) b = range(1, 10000) _ = [(lambda x: x+1)(i) for i in tqdm_notebook(a)] _ = [(lambda x: x+1)(i) for i in tqdm_notebook(b)]
固然若是仅仅是使用range
也能够使用tqdm
自带的tnrange
:oop
from tqdm._tqdm_notebook import tnrange _ = [(lambda x: x+1)(i) for i in tnrange(1, 10000)]
效果以下:优化
在一些场合,可能写须要多个层级的迭代,此时,咱们能够经过命名每一个层级的迭代器来实现这个个效果。使用desc
参数便可:spa
for i in tnrange(1, 10, desc='i Loop'): for j in tnrange(1, 10000, desc='j Loop'): i+j
固然,若是遇到Loop过多时,可能会依旧出现打印过多的困扰。此时leave
参数是一个不错的推荐。
for i in tqdm_notebook(range(100), desc='i-Loop'): for j in tqdm_notebook(range(10000), desc='j-Loop', leave=False): i+j
固然,在具体计算中,多进程每每是常常会须要的一类扩展(因为Python只能基于一个运算核心进行计算的限制),这时候线程的运算也是常常须要考量的方式。
在使用过程当中,第一个须要注意的问题是,tqdm
每次是在从iterable
对象中取值时,进行更新,而若是在map以前的list中作进度条的包裹,是在未使用map的函数以前统计。因此在进度条完成时,可能还会有一段时间后才真的执行结束。
from multiprocessing import Pool def f(x): return x**32 p = Pool(5) _ = [i for i in p.imap(f, tnrange(1000))]
而一个更好的处理是在使用后标记时间,使用multiprocessing.Pool.imap
做为迭代对象,但这个问题是tqdm
没法识别具体数量,此时,指定tqdm
的迭代次数total
便可。
_ = [i+1 for i in tqdm_notebook(p.imap(f, range(1000)))]
_ = [i for i in tqdm_notebook(p.imap(f, range(3, 1000)), total=997)]
pandas
中的使用,也是很是简单,在重载命令执行后,Serires
、DataFrame
、GroupBy
对象都会拥有progress_apply
方法,用法和apply
一致,直接能够调取进度条。
最后,咱们结合一下以前的多线程和pandas
操做,处理更大规模的数据。基本思路是,将DataFrame
拆成若干组分,最后经过pandas.concat
合并起结果。
def parallelize_dataframe(df, func, n_jobs=3, split_num=10): ## 拆分数据表 df_split = np.array_split(df, split_num) pool = Pool(n_jobs) df_list = [] ## map操做 for df_element in tqdm_notebook(pool.imap(func, df_split), total=10000): df_list.append(df_element) ## reduce操做 df = pd.concat(df_list) ## 关闭进程 pool.close() pool.join() return df
以上实现了基本的基于tqdm
显示处理进度的操做。使用方法以下:
def apply_add_1(df): return df.apply(lambda row: row['sepal_length']+1, axis=1) _ = parallelize_dataframe(iris_df, apply_add_1)
查看了一下进度条,此次预处理我还要花一小时,能够先去冲杯咖啡了。