[Python] Containers | Iterables | Iterators | Generators | Generator Expressions | Coroutines | Futures | Tasks

Python's asyncio is similar to C++'s Boost.Asio.

"Asynchronous I/O" means you start an I/O operation without waiting for it to finish; you can keep doing other work, and you get notified when it completes.

Asyncio is one form of concurrency. In Python, concurrency can also be achieved with threads (threading) and multiple processes (multiprocessing).

Asyncio does not provide true parallelism. Then again, because of the GIL (Global Interpreter Lock), Python's multithreading cannot achieve true parallelism either.

A unit of work that can be handed to asyncio is called a coroutine. A coroutine can give up execution and yield the chance to run to other coroutines (via yield from or await).

Defining a coroutine. A coroutine is defined with the async def statement.

```python
async def do_some_work(x):
    pass
```

do_some_work is a coroutine. More precisely, do_some_work is a coroutine function, which can be verified with asyncio.iscoroutinefunction:

```python
print(asyncio.iscoroutinefunction(do_some_work))  # True
```

This coroutine does nothing; let's make it sleep for a few seconds to simulate some actual work:

```python
async def do_some_work(x):
    print("Waiting " + str(x))
    await asyncio.sleep(x)
```

Before explaining await, it is worth listing what a coroutine can do. A coroutine can:

  • wait for a future to complete
  • wait for another coroutine (to produce a result or raise an exception)
  • produce a result for the coroutine that is waiting on it
  • raise an exception in the coroutine that is waiting on it

asyncio.sleep is itself a coroutine, so await asyncio.sleep(x) simply waits for another coroutine. See the asyncio.sleep documentation:

```
sleep(delay, result=None, *, loop=None)
Coroutine that completes after a given time (in seconds).
```

Running a coroutine. Calling a coroutine function does not start it running; it merely returns a coroutine object, which can be verified with asyncio.iscoroutine:

```python
print(asyncio.iscoroutine(do_some_work(3)))  # True
```

This also triggers a warning:

```
async1.py:16: RuntimeWarning: coroutine 'do_some_work' was never awaited
  print(asyncio.iscoroutine(do_some_work(3)))
```

There are two ways to get this coroutine object running:

  • await it from another coroutine that is already running
  • schedule its execution with the ensure_future function

Simply put, a coroutine can only run once the loop is running. Below we first obtain the current thread's default loop, then hand the coroutine object to loop.run_until_complete; the coroutine will then run inside the loop.

```python
loop = asyncio.get_event_loop()
loop.run_until_complete(do_some_work(3))
```

run_until_complete is a blocking call: it does not return until the coroutine has finished, as the name suggests. Its parameter is a future, but what we passed here is a coroutine object. That works because it checks internally and wraps the coroutine object into a future via ensure_future. We can make this explicit:

```python
loop.run_until_complete(asyncio.ensure_future(do_some_work(3)))
```

Full code:

```python
import asyncio

async def do_some_work(x):
    print("Waiting " + str(x))
    await asyncio.sleep(x)

loop = asyncio.get_event_loop()
loop.run_until_complete(do_some_work(3))
```

Output:

```
Waiting 3
<program exits after three seconds>
```

Callbacks. Suppose the coroutine performs an I/O read; once it has read the data, we want to be notified so we can process it next. This can be done by adding a callback to the future.

```python
def done_callback(futu):
    print('Done')

futu = asyncio.ensure_future(do_some_work(3))
futu.add_done_callback(done_callback)

loop.run_until_complete(futu)
```

Multiple coroutines. Real projects often run several coroutines in one loop at the same time. To hand multiple coroutines to the loop, use the asyncio.gather function.

```python
loop.run_until_complete(asyncio.gather(do_some_work(1), do_some_work(3)))
```

Or store the coroutines in a list first:

```python
coros = [do_some_work(1), do_some_work(3)]
loop.run_until_complete(asyncio.gather(*coros))
```

Output:

```
Waiting 3
Waiting 1
<three-second wait>
Done
```

These two coroutines run concurrently, so the total wait is not 1 + 3 = 4 seconds; it is determined by the longer-running coroutine.

See the documentation of the gather function:

```
gather(*coros_or_futures, loop=None, return_exceptions=False)
Return a future aggregating results from the given coroutines or futures.
```

It turns out futures can also be passed to it:

```python
futus = [asyncio.ensure_future(do_some_work(1)),
         asyncio.ensure_future(do_some_work(3))]

loop.run_until_complete(asyncio.gather(*futus))
```

gather aggregates: it wraps multiple futures into a single future, because loop.run_until_complete accepts only a single future.

run_until_complete and run_forever. So far we have run the loop with run_until_complete; once the future completes, run_until_complete returns.

```python
async def do_some_work(x):
    print('Waiting ' + str(x))
    await asyncio.sleep(x)
    print('Done')

loop = asyncio.get_event_loop()

coro = do_some_work(3)
loop.run_until_complete(coro)
```

Output:

```
Waiting 3
<three-second wait>
Done
<program exits>
```

Now switch to run_forever:

```python
async def do_some_work(x):
    print('Waiting ' + str(x))
    await asyncio.sleep(x)
    print('Done')

loop = asyncio.get_event_loop()

coro = do_some_work(3)
asyncio.ensure_future(coro)

loop.run_forever()
```

Output:

```
Waiting 3
<three-second wait>
Done
<program does not exit>
```

After three seconds the future completes, but the program does not exit. run_forever keeps running until stop is called; however, you cannot call stop like this:

```python
loop.run_forever()
loop.stop()
```

run_forever never returns, so stop would never be called. The only option is to call stop inside a coroutine:

```python
async def do_some_work(loop, x):
    print('Waiting ' + str(x))
    await asyncio.sleep(x)
    print('Done')
    loop.stop()
```

This is not without problems. Suppose multiple coroutines are running in the loop:

```python
asyncio.ensure_future(do_some_work(loop, 1))
asyncio.ensure_future(do_some_work(loop, 3))

loop.run_forever()
```

The loop stops before the second coroutine finishes, shut down by the coroutine that finished first. To solve this, use gather to merge the coroutines into a single future, add a callback, and stop the loop inside the callback.

```python
import asyncio
import functools

async def do_some_work(loop, x):
    print('Waiting ' + str(x))
    await asyncio.sleep(x)
    print('Done')

def done_callback(loop, futu):
    loop.stop()

loop = asyncio.get_event_loop()

futus = asyncio.gather(do_some_work(loop, 1), do_some_work(loop, 3))
futus.add_done_callback(functools.partial(done_callback, loop))

loop.run_forever()
```

This is essentially the implementation of run_until_complete; internally, run_until_complete also calls run_forever.

Close the loop? None of the examples above called loop.close, and nothing seemed to go wrong. So should loop.close be called or not? Simply put, as long as the loop is not closed, it can be run again:

```python
loop.run_until_complete(do_some_work(loop, 1))
loop.run_until_complete(do_some_work(loop, 3))
loop.close()
```

But once it is closed, it cannot run again:

```python
loop.run_until_complete(do_some_work(loop, 1))
loop.close()
loop.run_until_complete(do_some_work(loop, 3))  # raises here
```

It is recommended to call loop.close, to fully clean up the loop object and prevent misuse.
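As a small sketch of what that failure looks like (my own snippet, using a fresh loop and a trivial coroutine): the second run_until_complete raises RuntimeError with the message "Event loop is closed".

```python
import asyncio

async def noop():
    return 42

loop = asyncio.new_event_loop()
print(loop.run_until_complete(noop()))  # 42
loop.close()

coro = noop()
try:
    loop.run_until_complete(coro)
except RuntimeError as exc:
    print(exc)  # Event loop is closed
finally:
    coro.close()  # suppress the "never awaited" warning for the unused coroutine
```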

gather vs. wait. asyncio.gather and asyncio.wait offer similar functionality.

```python
coros = [do_some_work(loop, 1), do_some_work(loop, 3)]
loop.run_until_complete(asyncio.wait(coros))
```

For the exact differences, see the StackOverflow discussion: Asyncio.gather vs asyncio.wait.
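One practical difference worth knowing is the shape of what you get back. A minimal sketch (my own example, run with asyncio.run, available since Python 3.7): gather returns the result values in the order the awaitables were passed, while wait returns two sets of Task objects, (done, pending).

```python
import asyncio

async def work(x):
    await asyncio.sleep(0)  # yield control once; no real delay
    return x * 10

async def main():
    # gather returns the results themselves, in input order
    results = await asyncio.gather(work(1), work(2))
    # wait returns two sets of Task objects: (done, pending)
    tasks = [asyncio.ensure_future(work(3)), asyncio.ensure_future(work(4))]
    done, pending = await asyncio.wait(tasks)
    return results, sorted(t.result() for t in done), pending

results, done_results, pending = asyncio.run(main())
print(results)       # [10, 20]
print(done_results)  # [30, 40]
print(pending)       # set()
```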

Timer. C++ Boost.Asio provides a timer I/O object, but Python has no native timer; one can, however, be simulated with asyncio.sleep.

```python
async def timer(x, cb):
    futu = asyncio.ensure_future(asyncio.sleep(x))
    futu.add_done_callback(cb)
    await futu

t = timer(3, lambda futu: print('Done'))
loop.run_until_complete(t)
```

End of part one.

I have long been interested in asyncio; after all, it is a module the official documentation strongly recommends for high concurrency, and Python introduced coroutine support in Python 3.4. Writing this up is also a way to understand the module more deeply.

What is asyncio for?

异步网络操做 并发 协程 python3.0时代,标准库里的异步网络模块:select(很是底层) python3.0时代,第三方异步网络库:Tornado python3.4时代,asyncio:支持TCP,子进程

Today many modules already build on asyncio: aiohttp, aiodns, aioredis, and so on. https://github.com/aio-libs lists what is supported, and the list keeps growing.

Of course, asyncio is not the only coroutine implementation to date; tornado and gevent offer similar functionality.

Some key asyncio terms:

event_loop, the event loop: the program starts an infinite loop, functions are registered on it, and when the relevant event occurs, the corresponding coroutine function is called.

coroutine: a coroutine object, i.e. a function defined with the async keyword. Calling it does not execute the function immediately; it returns a coroutine object. The coroutine object must be registered with the event loop, which then invokes it.

task: a coroutine object is a function that can natively be suspended; a task wraps the coroutine further and tracks its various states.

future: represents the result of work that will run or has not yet run. There is no essential difference between it and a task.

async/await: keywords introduced in Python 3.5 for defining coroutines; async defines a coroutine, and await suspends on a blocking asynchronous call.

After reading those terms you may want to turn around and leave. I initially felt some resistance to studying asyncio myself, without really knowing why, and as a result I ignored the module for a long time. But after running into one Python performance problem after another at work, I told myself it was time to learn it properly.

Defining a coroutine

```python
import time
import asyncio

now = lambda: time.time()

async def do_some_work(x):
    print("waiting:", x)

start = now()

# This is a coroutine object; at this point do_some_work has not executed
coroutine = do_some_work(2)
print(coroutine)

# Create an event loop
loop = asyncio.get_event_loop()

# Add the coroutine to the event loop
loop.run_until_complete(coroutine)

print("Time:", now() - start)
```

In the code above we define a coroutine with the async keyword. Naturally, a coroutine cannot run by itself; it must be added to the event loop.

asyncio.get_event_loop creates an event loop; run_until_complete then registers the coroutine with the event loop and starts the loop.

Creating a task. A coroutine object cannot run directly; when it is registered with the event loop, it is actually the run_until_complete method that wraps it into a task object. The task object is a subclass of the Future class; it stores the coroutine's state after it runs and can be used to retrieve the coroutine's result later.

```python
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print("waiting:", x)

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
print("Time:", now() - start)
```

Result:

```
<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex2.py:13>>
waiting: 2
<Task finished coro=<do_some_work() done, defined at /app/py_code/study_asyncio/simple_ex2.py:13> result=None>
Time: 0.0003514289855957031
```

After the task is created and before it runs in the event loop, it is in the pending state; once it completes, its state is finished.

Besides loop.create_task(coroutine) as above, a task can equally be created with asyncio.ensure_future(coroutine).
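A quick check (my own sketch, not from the original) that both calls produce a Task, and that Task is indeed a subclass of Future:

```python
import asyncio

async def coro():
    return 1

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

t1 = loop.create_task(coro())
t2 = asyncio.ensure_future(coro(), loop=loop)

print(isinstance(t1, asyncio.Task), isinstance(t2, asyncio.Task))  # True True
print(issubclass(asyncio.Task, asyncio.Future))                    # True

loop.run_until_complete(asyncio.gather(t1, t2))
loop.close()
```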

The official documentation for these two calls: https://docs.python.org/3/library/asyncio-task.html#asyncio.ensure_future

```
asyncio.ensure_future(coro_or_future, *, loop=None)
Schedule the execution of a coroutine object: wrap it in a future. Return a Task object.

If the argument is a Future, it is returned directly.
```

https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.AbstractEventLoop.create_task

```
AbstractEventLoop.create_task(coro)
Schedule the execution of a coroutine object: wrap it in a future. Return a Task object.

Third-party event loops can use their own subclass of Task for interoperability. In this case, the result type is a subclass of Task.

This method was added in Python 3.4.2. Use the async() function to support also older Python versions.
```

Binding a callback. By binding a callback you can retrieve the result when the task finishes executing. The last argument of the callback is the future object, through which the coroutine's return value can be obtained.

```python
import time
import asyncio

now = lambda: time.time()

async def do_some_work(x):
    print("waiting:", x)
    return "Done after {}s".format(x)

def callback(future):
    print("callback:", future.result())

start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
print(task)
task.add_done_callback(callback)
print(task)
loop.run_until_complete(task)

print("Time:", now() - start)
```

Result:

```
<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex3.py:13>>
<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex3.py:13> cb=[callback() at /app/py_code/study_asyncio/simple_ex3.py:18]>
waiting: 2
callback: Done after 2s
Time: 0.00039196014404296875
```

The add_done_callback method attaches a callback to the task; when the task (that is, the coroutine) finishes, the callback is invoked, and the coroutine's result can be read through its future parameter. The task we created and the future object inside the callback are in fact the same object.
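The callback signature is fixed (asyncio appends the future as the last argument), so any extra context is usually bound in with functools.partial. A small sketch of that pattern, with a hypothetical label argument of my own:

```python
import asyncio
import functools

async def do_some_work(x):
    return "Done after {}s".format(x)

def callback(label, future):
    # label was bound via functools.partial; future is appended by asyncio
    print(label, future.result())

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
task = loop.create_task(do_some_work(2))
task.add_done_callback(functools.partial(callback, "job-1:"))
loop.run_until_complete(task)  # prints: job-1: Done after 2s
loop.close()
```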

Blocking and await. async defines a coroutine object, and await suspends on time-consuming operations, just like yield in a generator: the function yields control. When a coroutine hits await, the event loop suspends that coroutine and runs other coroutines, until those in turn suspend or finish, and then the next coroutine is scheduled.

耗时的操做通常是一些IO操做,例如网络请求,文件读取等。咱们使用asyncio.sleep函数来模拟IO操做。协程的目的也是让这些IO操做异步化。

```python
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print("waiting:", x)
    # what follows await is the time-consuming call
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)

print("Task ret:", task.result())
print("Time:", now() - start)
```

The await asyncio.sleep(x) sleeps here to simulate a blocking or time-consuming operation, at which point the coroutine yields control. In other words, on hitting a blocking call, await hands the coroutine's control back so the loop can run other coroutines.

Concurrency and parallelism. Concurrency means a system has multiple activities in progress at the same time.

Parallelism means using concurrency to make a system run faster; parallelism can be exploited at multiple abstraction levels of an operating system.

So concurrency usually means multiple tasks need to make progress in the same period, while parallelism means multiple tasks execute at the same instant.

The following analogy is quite vivid:

Concurrency is one teacher tutoring different students within the same time period. Parallelism is several teachers simultaneously tutoring several students. In short, it is the difference between one person eating three steamed buns and three people each eating one, where eating one bun counts as one task.

```python
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print("Waiting:", x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

start = now()

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print("Task ret:", task.result())

print("Time:", now() - start)
```

Result:

```
Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
Time: 4.004154920578003
```

The total time is about 4s. That 4-second block is long enough for the first two coroutines to finish; a synchronous, sequential version would need at least 7s. So here we used asyncio to achieve concurrency. asyncio.wait(tasks) can also be written as asyncio.gather(*tasks); the former accepts a list of tasks, the latter takes the tasks as separate arguments.

The official documentation on asyncio.gather and asyncio.wait:

https://docs.python.org/3/library/asyncio-task.html#asyncio.gather

```
Return a future aggregating results from the given coroutine objects or futures.

All futures must share the same event loop. If all the tasks are done successfully, the returned future's result is the list of results (in the order of the original sequence, not necessarily the order of results arrival). If return_exceptions is true, exceptions in the tasks are treated the same as successful results, and gathered in the result list; otherwise, the first raised exception will be immediately propagated to the returned future.
```

https://docs.python.org/3/library/asyncio-task.html#asyncio.wait

```
Wait for the Futures and coroutine objects given by the sequence futures to complete. Coroutines will be wrapped in Tasks. Returns two sets of Future: (done, pending).

The sequence futures must not be empty.

timeout can be used to control the maximum number of seconds to wait before returning. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.

return_when indicates when this function should return.
```

Nesting coroutines. async defines coroutines for time-consuming I/O, and we can also wrap larger I/O workflows, producing nested coroutines: one coroutine awaits another, and they chain together.

```python
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print("waiting:", x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    dones, pendings = await asyncio.wait(tasks)
    for task in dones:
        print("Task ret:", task.result())

    # results = await asyncio.gather(*tasks)
    # for result in results:
    #     print("Task ret:", result)

start = now()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print("Time:", now() - start)
```

If we take this part of the code above:

```python
dones, pendings = await asyncio.wait(tasks)
for task in dones:
    print("Task ret:", task.result())
```

and replace it with:

```python
results = await asyncio.gather(*tasks)
for result in results:
    print("Task ret:", result)
```

then what we get back is a plain list of results.

Instead of processing the results inside the main coroutine, we can also return the awaited value directly; the outermost run_until_complete then returns main's result. Change the code above to:

```python
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print("waiting:", x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    return await asyncio.gather(*tasks)

start = now()

loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())
for result in results:
    print("Task ret:", result)

print("Time:", now() - start)
```

Alternatively, return the result of suspending on asyncio.wait.

Change the code to:

```python
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print("waiting:", x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    return await asyncio.wait(tasks)

start = now()

loop = asyncio.get_event_loop()
done, pending = loop.run_until_complete(main())
for task in done:
    print("Task ret:", task.result())

print("Time:", now() - start)
```

asyncio's as_completed method can also be used:

```python
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print("waiting:", x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    for task in asyncio.as_completed(tasks):
        result = await task
        print("Task ret: {}".format(result))

start = now()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print("Time:", now() - start)
```

As the examples above show, coroutine invocation and composition are very flexible, especially around result handling: how to return results, and how to suspend.

Stopping coroutines. A future object has several states:

  • Pending
  • Running
  • Done
  • Cancelled

When a future (task) is created it is pending; while the event loop is executing it, it is running; on completion it is naturally done. To stop the event loop, the tasks need to be cancelled first. asyncio.Task can be used to obtain the event loop's tasks.

```python
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print("Waiting:", x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(2)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3),
]

start = now()

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
    print(asyncio.Task.all_tasks())
    for task in asyncio.Task.all_tasks():
        print(task.cancel())
    loop.stop()
    loop.run_forever()
finally:
    loop.close()

print("Time:", now() - start)
```

Right after the event loop starts, press ctrl+c; this triggers a KeyboardInterrupt inside run_until_complete. We then loop over asyncio.Task's tasks and cancel the futures. The output looks like this:

```
Waiting: 1
Waiting: 2
Waiting: 2
^C{<Task finished coro=<do_some_work() done, defined at /app/py_code/study_asyncio/simple_ex10.py:13> result='Done after 1s'>, <Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex10.py:15> wait_for=... cb=[_wait.._on_completion() at /usr/local/lib/python3.5/asyncio/tasks.py:428]>, <Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex10.py:15> wait_for=... cb=[_wait.._on_completion() at /usr/local/lib/python3.5/asyncio/tasks.py:428]>, <Task pending coro=<wait() running at /usr/local/lib/python3.5/asyncio/tasks.py:361> wait_for=...>}
False
True
True
True
Time: 1.0707225799560547
```

True means the cancel succeeded. After loop.stop, the event loop has to be started once more before it is finally closed; otherwise an exception is raised.

Looping over the tasks and cancelling them one by one is one option. But as above, if we wrap the task list inside a main function and drive the event loop from outside main, then main is effectively the outermost task, and it is enough to handle that wrapping main.
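A sketch of that wrapper approach (my own example; the cancel scheduled with call_later stands in for a real ctrl+c, and asyncio.all_tasks is the modern spelling of asyncio.Task.all_tasks):

```python
import asyncio

async def do_some_work(x):
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main():
    tasks = [asyncio.ensure_future(do_some_work(i)) for i in (1, 2, 2)]
    try:
        done, pending = await asyncio.wait(tasks)
        return [t.result() for t in done]
    except asyncio.CancelledError:
        # cancelling the wrapper: pass the cancellation on to the inner tasks
        for t in tasks:
            t.cancel()
        raise

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
wrapper = asyncio.ensure_future(main(), loop=loop)
loop.call_later(0.1, wrapper.cancel)  # stand-in for pressing ctrl+c

cancelled = False
try:
    loop.run_until_complete(wrapper)
except asyncio.CancelledError:
    cancelled = True

# one more pass so the inner tasks can finish processing their cancellation
leftover = [t for t in asyncio.all_tasks(loop) if not t.done()]
loop.run_until_complete(asyncio.gather(*leftover, return_exceptions=True))
loop.close()
print("cancelled:", cancelled)  # cancelled: True
```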

Event loops in different threads. Often our event loop is only used to register coroutines, but some coroutines need to be added to the loop dynamically. A simple way is to use multiple threads: create an event loop in the current thread, then start a new thread and run the event loop there, so the current thread is not blocked.

```python
import asyncio
from threading import Thread
import time

now = lambda: time.time()

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)
    print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

new_loop.call_soon_threadsafe(more_work, 6)
new_loop.call_soon_threadsafe(more_work, 3)
```

After this code starts, the current thread is not blocked. The new thread executes the more_work calls registered with call_soon_threadsafe in order; because time.sleep is a synchronous, blocking operation, finishing both more_work calls takes roughly 6 + 3 seconds.

Coroutines in a new thread

```python
import asyncio
import time
from threading import Thread

now = lambda: time.time()

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def do_some_work(x):
    print('Waiting {}'.format(x))
    await asyncio.sleep(x)
    print('Done after {}s'.format(x))

def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)
    print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)
```

In this example, the main thread creates new_loop and then starts an endless event loop in a child thread. The main thread registers new coroutine objects via run_coroutine_threadsafe. This way the coroutines run concurrently in the child thread's event loop while the main thread is not blocked. Total execution time is about 6 seconds.

# -*- coding:utf-8 -*-

```python
# 14.1 Sentence class, take #1: a sequence of words
# Example 14-1: splitting a sentence into a sequence of words
"""
import re
import reprlib

RE_WORD = re.compile('\w+')

class Sentence:
    def __init__(self, text):
        self.text = text
        self.words = RE_WORD.findall(text)  # returns a list of strings
    def __getitem__(self, item):
        return self.words[item]
    def __len__(self):
        # implemented to complete the sequence protocol; not required for iteration
        return len(self.words)
    def __repr__(self):
        # reprlib.repr produces an abridged string for large data structures
        return 'Sentence(%s)' % reprlib.repr(self.text)

# Example 14-2: testing whether a Sentence is iterable
s = Sentence('"The tiem has come,",the walrus said,')
print(s)  # Sentence('"The tiem ha... walrus said,')
for word in s:
    print(word)
'''
The
tiem
has
come
the
walrus
said
'''
print(list(s))  # ['The', 'tiem', 'has', 'come', 'the', 'walrus', 'said']

# [Analysis] Why sequences are iterable:
'''
(1) Check whether the object implements __iter__; if so, call it to get an iterator.
(2) If __iter__ is not implemented but __getitem__ is, Python creates an iterator
    that tries to fetch items in order, starting from index 0.
(3) If that fails, Python raises TypeError, usually saying "C object is not iterable".
'''
```

#14.2 Iterables vs. iterators

The Iterable and Iterator abstract base classes: the former is the parent of the latter; Iterator adds a __next__ method on top of Iterable's __iter__.
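A quick check of that relationship (my own snippet, not from the book) using collections.abc:

```python
from collections.abc import Iterable, Iterator

items = [1, 2, 3]
it = iter(items)

print(isinstance(items, Iterable))     # True  -- a list is iterable...
print(isinstance(items, Iterator))     # False -- ...but is not an iterator itself
print(isinstance(it, Iterator))        # True  -- iter() returns an iterator
print(issubclass(Iterator, Iterable))  # True  -- Iterator extends Iterable
print(next(it))                        # 1     -- iterators add __next__ on top of __iter__
```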

```python
# A method defined in Iterator (see Lib/_collections_abc.py)
import abc

@classmethod
def __subclasshook__(cls, C):
    if cls is abc.Iterator:
        if (any("__next__" in B.__dict__ for B in C.__mro__) and
                any("__iter__" in B.__dict__ for B in C.__mro__)):
            return True
    return NotImplemented

# Given the advice in Lib/types.py and the logic in Lib/_collections_abc.py, the best
# way to check whether an object x is an iterator is isinstance(x, abc.Iterator).
# Thanks to Iterator.__subclasshook__, this works even when the class of x is not a
# real or virtual subclass of Iterator.

# Using the class from example 14-1: build an iterator with iter(), consume it with next()
s3 = Sentence('Pig and Pepper')
it = iter(s3)
print(it)         # <iterator object at 0x0000000002948A58>
print(next(it))   # Pig
print(next(it))   # and
print(next(it))   # Pepper
# print(next(it)) # StopIteration
print(list(it))   # [] -- once exhausted, the iterator is useless
```

```python
# The first version of Sentence is iterable because the built-in iter(...) function
# special-cases sequences. Next we implement the standard iterable protocol.
# Sentence implemented with the iterator pattern
import re
import reprlib

RE_WORD = re.compile('\w+')

class Sentence:
    def __init__(self, text):
        self.text = text
        self.words = RE_WORD.findall(text)
    def __repr__(self):
        return 'Sentence(%s)' % reprlib.repr(self.text)
    __str__ = __repr__
    def __iter__(self):
        # Compared with the previous version, only __iter__ is new. This version has
        # no __getitem__, to make clear that the class is iterable because it
        # implements __iter__.
        # Per the iterable protocol, __iter__ instantiates and returns an iterator:
        return SentenceIterator(self.words)

class SentenceIterator:
    def __init__(self, words):
        self.words = words  # the iterator holds a reference to the word list
        self.index = 0      # self.index determines the next word to fetch
    def __next__(self):
        try:
            word = self.words[self.index]
        except IndexError:
            raise StopIteration()
        self.index += 1
        return word
    def __iter__(self):
        return self

print(list(iter(s3)))  # ['Pig', 'and', 'Pepper'] -- to iterate again, build a new iterator
```

# [Note] For this example, implementing __iter__ in SentenceIterator is not strictly necessary, but doing so is correct: an iterator should implement both __next__ and __iter__, and it also lets the iterator pass the issubclass(SentenceIterator, abc.Iterator) test. If SentenceIterator subclassed abc.Iterator, it would inherit the concrete abc.Iterator.__iter__ method.

# [Note]
'''
Turning Sentence itself into an iterator: bad idea.
The iterator pattern is meant to:
  - access the contents of an aggregate object without exposing its internal representation
  - support multiple traversals of the aggregate object
  - provide a uniform interface for traversing different aggregate structures
    (i.e. support polymorphic iteration)
To "support multiple traversals", it must be possible to obtain several independent
iterators from the same iterable instance, each maintaining its own internal state.
So the correct implementation of the pattern is that every call to iter(my_iterable)
creates a new, independent iterator. That is why this example needs the
SentenceIterator class.
An iterable must never be its own iterator. In other words, an iterable must
implement __iter__, but must not implement __next__.
'''

```python
# Example 14-5: Sentence implemented with a generator function
import re
import reprlib

RE_WORD = re.compile('\w+')

class Sentence:
    def __init__(self, text):
        self.text = text
        self.words = RE_WORD.findall(self.text)
    def __repr__(self):
        return 'Sentence(%s)' % reprlib.repr(self.text)
    def __iter__(self):
        # The iterator is really a generator object, built automatically on each
        # call, because this __iter__ method is a generator function
        for word in self.words:
            yield word
        return

# How generator functions work: any Python function whose body contains the yield
# keyword is a generator function. Calling a generator function returns a generator
# object. In other words, a generator function is a generator factory.
def gen_123():
    yield 1
    yield 2
    yield 3

print(gen_123)    # <function gen_123 at 0x0000000002093E18>
print(gen_123())  # <generator object gen_123 at 0x000000000367A990>
for i in gen_123():
    print(i)
'''
1
2
3
'''
g = gen_123()
print(next(g))  # 1
print(next(g))  # 2
print(next(g))  # 3
print(next(g))  # StopIteration
```

#Example 14-6: a generator function that prints messages as it runs

```python
def gen_AB():
    print('start')
    yield 'A'
    print('continue')
    yield 'B'
    print('END')

for c in gen_AB():
    print('-->', c)
```

```
start
--> A
continue
--> B
END
```

# This version of Sentence is much shorter than the previous one, but it is not as lazy as it could be. These days, laziness is considered a good trait, at least in programming languages and APIs. A lazy implementation postpones producing values as long as possible; this saves memory and may avoid useless processing.

```python
# Example 14-9: Sentence implemented with a generator expression
import re
import reprlib

RE_WORD = re.compile('\w+')

class Sentence:
    def __init__(self, text):
        self.text = text
    def __repr__(self):
        return 'Sentence(%s)' % reprlib.repr(self.text)
    __str__ = __repr__
    def __iter__(self):
        return (match.group() for match in RE_WORD.finditer(self.text))
```

# [Remark] If a generator expression has to span several lines, I prefer to define a generator function instead, for readability. Besides, a generator function has a name and can be reused.
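To illustrate that remark with a minimal example of my own (the squares generator is hypothetical, not from the book): the two forms are equivalent, but the named function can be called again to get a fresh generator.

```python
# generator expression: concise, but anonymous and single-use
squares_expr = (n * n for n in range(4))

# generator function: named and reusable
def squares(limit):
    for n in range(limit):
        yield n * n

print(list(squares_expr))  # [0, 1, 4, 9]
print(list(squares(4)))    # [0, 1, 4, 9]
print(list(squares(4)))    # [0, 1, 4, 9] -- a fresh generator each call
```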

# Below we run some console tests of the ArithmeticProgression class implemented shortly, as shown in example 14-10. The constructor signature is ArithmeticProgression(begin, step[, end]). The range() function is similar to this ArithmeticProgression class, but its signature is range(start, stop[, step]). I chose a different signature because in an arithmetic progression the step is mandatory while the end is optional. I also renamed the parameters from start/stop to begin/end to make the different signature explicit. In each test in example 14-10, list() is called on the result to inspect the generated values.

```python
# Example 14-10: the ArithmeticProgression class
class ArithmeticProgression:
    def __init__(self, begin, step, end=None):
        self.begin = begin
        self.step = step
        self.end = end
    def __iter__(self):
        # coerce the first result to the type of begin + step (e.g. int + float -> float)
        result = type(self.begin + self.step)(self.begin)
        forever = self.end is None
        index = 0
        while forever or result < self.end:
            yield result
            index += 1
            result = self.begin + self.step * index
```

```python
# Tests
ap = ArithmeticProgression(0, 1, 3)
print(list(ap))
ap = ArithmeticProgression(1, .5, 3)
print(list(ap))
ap = ArithmeticProgression(0, 1/3, 1)
print(list(ap))
from fractions import Fraction
ap = ArithmeticProgression(0, Fraction(1, 3), 1)
print(list(ap))
from decimal import Decimal
ap = ArithmeticProgression(0, Decimal('.1'), .3)
print(list(ap))
'''
[0, 1, 2]
[1.0, 1.5, 2.0, 2.5]
[0.0, 0.3333333333333333, 0.6666666666666666]
[Fraction(0, 1), Fraction(1, 3), Fraction(2, 3)]
[Decimal('0'), Decimal('0.1'), Decimal('0.2')]
'''
```

```python
# Example 14-12 defines a generator function named aritprog_gen that does the same
# job as ArithmeticProgression with less code. If ArithmeticProgression is replaced
# by aritprog_gen, the tests from example 14-10 all still pass.
# Example 14-13: the aritprog_gen generator function
def aritprog_gen(begin, step, end=None):
    result = type(begin + step)(begin)
    forever = end is None
    index = 0
    while forever or result < end:
        yield result
        index += 1
        result = begin + step * index
```

```python
# The itertools module provides 19 generator functions that can be combined in many
# interesting ways.
# itertools.count returns a generator that yields numbers. With no arguments, it
# yields integers starting from 0.
import itertools
gen = itertools.count(1, .5)
print(next(gen))
print(next(gen))
print(next(gen))
'''
1
1.5
2.0
'''
# itertools.takewhile is different: it produces a generator that consumes another
# generator, and stops when a given predicate evaluates to False. So the two can be
# combined:
gen = itertools.takewhile(lambda n: n < 3, itertools.count(1, .5))
print(list(gen))  # [1, 1.5, 2.0, 2.5]
```

```python
# aritprog_gen can also be implemented this way
# Example 14-13: using takewhile and count yields fluent, short code
import itertools

def aritprog_gen(begin, step, end=None):
    first = type(begin + step)(begin)
    ap_gen = itertools.count(first, step)
    if end is not None:
        ap_gen = itertools.takewhile(lambda n: n < end, ap_gen)
    return ap_gen
"""
```

[Fluent Python] [Control Flow] [Context Managers] See the "context managers" notes.

[Fluent Python] [Control Flow] [Coroutines]
"""

16.2 Basic behavior of a generator used as a coroutine

```python
# Example 16-1: possibly the simplest demonstration of a coroutine in action
def simple_coroutine():
    print('-> coroutine started')
    x = yield
    print('-> coroutine received:', x)

my_coro = simple_coroutine()
print(my_coro)  # <generator object simple_coroutine at 0x10900f9e8>
# print(next(my_coro))
'''
-> coroutine started
None
'''
# my_coro.send(42)
'''
-> coroutine received: 42
StopIteration
'''

# [Remark] A coroutine can be in one of four states. The current state can be
# determined with inspect.getgeneratorstate(), which returns one of these strings:
#   'GEN_CREATED'   waiting to start execution
#   'GEN_RUNNING'   currently being executed by the interpreter
#   'GEN_SUSPENDED' suspended at a yield expression
#   'GEN_CLOSED'    execution has completed
# Because the argument to send becomes the value of the pending yield expression,
# send can only be called while the coroutine is suspended. That is not the case if
# the coroutine has never been activated (state 'GEN_CREATED'), so always call
# next() first to prime the coroutine; calling my_coro.send(None) has the same
# effect. If a value other than None is sent to a freshly created coroutine object,
# this error occurs:
my_coro = simple_coroutine()
my_coro.send(1729)  # TypeError: can't send non-None value to a just-started generator
```

Example 16-2: a coroutine that yields two values

```python
def simple_coro2(a):
    print('-> Started: a =', a)
    b = yield a
    print('-> Received: b =', b)
    c = yield a + b
    print('-> Received: c =', c)

my_coro2 = simple_coro2(14)
from inspect import getgeneratorstate
print(getgeneratorstate(my_coro2))  # GEN_CREATED
print(next(my_coro2))
'''
-> Started: a = 14
14
'''
print(getgeneratorstate(my_coro2))  # GEN_SUSPENDED
print(my_coro2.send(28))
'''
-> Received: b = 28
42
'''
print(my_coro2.send(99))
'''
-> Received: c = 99
Traceback (most recent call last):
  File "/Users/suren/PycharmProjects/fluentPython/kongzhiliucheng/xiecheng.py", line 47, in <module>
    my_coro2.send(99)
StopIteration
'''
print(getgeneratorstate(my_coro2))  # GEN_CLOSED
```

```python
# Example 16-3: a coroutine that computes a running average
def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        # This infinite loop means the coroutine keeps accepting values and
        # producing results as long as the caller sends them. It only terminates
        # when the caller calls .close() on it, or when it is garbage-collected
        # because there are no more references to it.
        term = yield average
        total += term
        count += 1
        average = total / count

coro_avg = averager()
print(next(coro_avg))     # None
print(coro_avg.send(10))  # 10.0
print(coro_avg.send(30))  # 20.0
print(coro_avg.send(5))   # 15.0
```

```python
# 16.4 A decorator to prime coroutines
from functools import wraps

def coroutine(func):
    @wraps(func)
    def primer(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)  # prime the generator so it is ready to receive values
        return gen
    return primer

@coroutine
def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield average
        total += term
        count += 1
        average = total / count

coro_avg = averager()
from inspect import getgeneratorstate
print(getgeneratorstate(coro_avg))  # GEN_SUSPENDED
print(coro_avg.send(10))  # 10.0
print(coro_avg.send(30))  # 20.0
print(coro_avg.send(5))   # 15.0
```

```python
# 16.5 Coroutine termination and exception handling
# Example 16-7: an unhandled exception terminates the coroutine
from functools import wraps

def coroutine(func):
    @wraps(func)
    def primer(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)
        return gen
    return primer

@coroutine
def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield average
        total += term
        count += 1
        average = total / count

coro_avg = averager()
print(coro_avg.send(40))      # 40.0
print(coro_avg.send(50))      # 45.0
print(coro_avg.send('spam'))  # TypeError: unsupported operand type(s) for +=: 'float' and 'str'
# The exception is not handled inside the coroutine, so the coroutine terminates.
# Trying to reactivate it raises StopIteration:
print(coro_avg.send(60))      # never handled
```

Example 16-8: handling exceptions inside a coroutine

```python
class DemoException(Exception):
    '''An exception type for this demonstration.'''

def demo_exc_handling():
    print('-> coroutine started')
    while True:
        try:
            x = yield
        except DemoException:
            print('*** DemoException handled. Continuing...')
        else:
            # if there was no exception, display the received value
            print('-> coroutine received: {!s}'.format(x))
    # This line never runs, because only an unhandled exception can break the
    # infinite loop:
    raise RuntimeError('This line should never run.')

# Activate and close demo_exc_handling, with no exception
exc_coro = demo_exc_handling()
next(exc_coro)     # -> coroutine started
exc_coro.send(11)  # -> coroutine received: 11
exc_coro.send(22)  # -> coroutine received: 22
exc_coro.close()
from inspect import getgeneratorstate
print(getgeneratorstate(exc_coro))  # GEN_CLOSED

# Throwing DemoException into demo_exc_handling does not terminate the coroutine
exc_coro = demo_exc_handling()
next(exc_coro)     # -> coroutine started
exc_coro.send(11)  # -> coroutine received: 11
exc_coro.throw(DemoException)  # *** DemoException handled. Continuing...
print(getgeneratorstate(exc_coro))  # GEN_SUSPENDED

# If the coroutine cannot handle the thrown exception, it terminates
exc_coro = demo_exc_handling()
next(exc_coro)     # -> coroutine started
exc_coro.send(11)
print(exc_coro.throw(ZeroDivisionError))
'''
Traceback (most recent call last):
  File "/Users/suren/PycharmProjects/fluentPython/kongzhiliucheng/xiecheng.py", line 172, in <module>
    print(exc_coro.throw(ZeroDivisionError))
  File "/Users/suren/PycharmProjects/fluentPython/kongzhiliucheng/xiecheng.py", line 145, in demo_exc_handling
    x = yield
ZeroDivisionError
'''
print(getgeneratorstate(exc_coro))  # GEN_CLOSED
```

```python
# Example 16-12: use try/finally to run cleanup code when a coroutine terminates
class DemoException(Exception):
    '''An exception type for this demonstration.'''

def demo_finally():
    print('-> coroutine started')
    try:
        while True:
            try:
                x = yield
            except DemoException:
                print('*** DemoException handled. Continuing...')
            else:
                print('-> coroutine received: {!s}'.format(x))
    finally:
        # runs whenever the coroutine terminates, via close() or an unhandled exception
        print('-> coroutine ending')

# Activate and close demo_finally, with no exception
exc_coro = demo_finally()
next(exc_coro)     # -> coroutine started
exc_coro.send(11)  # -> coroutine received: 11
exc_coro.send(22)  # -> coroutine received: 22
exc_coro.close()   # -> coroutine ending
from inspect import getgeneratorstate
print(getgeneratorstate(exc_coro))  # GEN_CLOSED

# Throwing DemoException into demo_finally does not terminate the coroutine
exc_coro = demo_finally()
next(exc_coro)     # -> coroutine started
exc_coro.send(11)  # -> coroutine received: 11
exc_coro.throw(DemoException)  # *** DemoException handled. Continuing...
print(getgeneratorstate(exc_coro))  # GEN_SUSPENDED

# If the coroutine cannot handle the thrown exception, it terminates,
# and the finally clause still prints '-> coroutine ending'
exc_coro = demo_finally()
next(exc_coro)     # -> coroutine started
exc_coro.send(11)  # -> coroutine received: 11

exc_coro.throw(ZeroDivisionError)
'''
Traceback (most recent call last):
  File "/Users/suren/PycharmProjects/fluentPython/kongzhiliucheng/xiecheng.py", line 220, in <module>
    print(exc_coro.throw(ZeroDivisionError))
  File "/Users/suren/PycharmProjects/fluentPython/kongzhiliucheng/xiecheng.py", line 192, in demo_finally
    x = yield
-> coroutine ending
ZeroDivisionError
'''
from inspect import getgeneratorstate
print(getgeneratorstate(exc_coro))  # never printed: the line above raises
```

```python
# Returning a value from a coroutine
# Example 16-14
from collections import namedtuple

Result = namedtuple('Result', 'count average')

def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield
        if term is None:
            break
        total += term
        count += 1
        average = total / count
    return Result(count, average)

coro_avg = averager()
next(coro_avg)        # no output
coro_avg.send(10)     # no output
coro_avg.send(30)     # no output
coro_avg.send(6.5)    # no output
# coro_avg.send(None) # StopIteration: Result(count=3, average=15.5)
```

#例子16-15 from collections import namedtuple Result = namedtuple('Result','count average')

def averager(): total = 0.0 count = 0 average = None while True: term = yield if term is None: break total += term count += 1 average = total/count return Result(count,average)

coro_avg = averager() next(coro_avg) #无产出 coro_avg.send(10) #无产出 coro_avg.send(30) #无产出 coro_avg.send(6.5) #无产出 try: coro_avg.send(None) except StopIteration as exc: result = exc.value print(result) #Result(count=3, average=15.5)

"""

# Comparing yield from with plain yield
def gen():
    yield from 'AB'
    yield from range(1, 3)
print(list(gen()))  # ['A', 'B', 1, 2]

def gen():
    yield 'AB'
    yield range(1, 3)
print(list(gen()))  # ['AB', range(1, 3)]

# 16.7 yield from
def gen():
    yield from 'AB'
    yield from range(1, 3)
print(list(gen()))  # ['A', 'B', 1, 2]

# Example 16-16: chaining iterables with yield from
def chain(*iterables):
    for it in iterables:
        yield from it

s = 'ABC'
t = tuple(range(3))
print(list(chain(s, t)))  # ['A', 'B', 'C', 0, 1, 2]

# Example 16-17: demonstrating how yield from works
from collections import namedtuple

Result = namedtuple('Result', 'count average')

# the subgenerator
def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield
        if term is None:
            break
        total += term
        count += 1
        average = total/count
    return Result(count, average)

# the delegating generator
def grouper(results, key):
    while True:
        results[key] = yield from averager()

# the client code, i.e. the caller
def main(data):
    results = {}
    for key, values in data.items():
        group = grouper(results, key)
        next(group)
        for value in values:
            group.send(value)
        group.send(None)  # important!

    print(results)
    report(results)

# output the report
def report(results):
    for key, result in sorted(results.items()):
        group, unit = key.split(';')
        print('{:2} {:5} averaging {:.2f}{}'.format(
            result.count, group, result.average, unit))

data = {'girls;kg': [40.9, 38.5, 44.3],
        'girls;m': [1.6, 1.51, 1.4],
        'boys;kg': [50.6, 60, 70.33],
        'boys;m': [1.7, 1.89, 1.78],
        }

if __name__ == '__main__':
    main(data)

# [Analysis]
'''
A brief account of how Example 16-17 works, including what happens if the
group.send(None) call in main (the line marked "important!") is removed.

- Each iteration of the outer for loop creates a new grouper instance, bound
  to the group variable; group is the delegating generator.
- next(group) primes the delegating generator grouper, which enters its
  while True loop, invokes the subgenerator averager, and suspends at the
  yield from expression.
- The inner for loop calls group.send(value), feeding each value directly to
  the subgenerator averager. Meanwhile the current grouper instance (group)
  stays suspended at the yield from expression.
- When the inner loop ends, group is still suspended at yield from, so the
  assignment to results[key] in the body of grouper has not happened yet.
- Without the final group.send(None) in the outer loop, the averager
  subgenerator would never terminate, the delegating generator group would
  never be reactivated, and results[key] would never be assigned.
- When the outer for loop iterates again, a new grouper instance is created
  and bound to group. The previous grouper instance (and the unfinished
  averager it created) is garbage-collected.
'''
# Output (averages computed from the data above):
'''
 3 boys  averaging 60.31kg
 3 boys  averaging 1.79m
 3 girls averaging 41.23kg
 3 girls averaging 1.50m
'''
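The role of `group.send(None)` described above can be demonstrated in isolation. A stripped-down sketch (the names `subgen`, `delegator`, and `feed` are invented for this demo): without the terminating `send(None)`, the delegating generator never resumes, so the assignment to `results` never happens.

```python
def subgen():
    # simplified stand-in for averager: sums values until None is sent
    total = 0
    while True:
        term = yield
        if term is None:
            break
        total += term
    return total

def delegator(results, key):
    # suspended at yield from until subgen returns
    while True:
        results[key] = yield from subgen()

def feed(values, terminate):
    results = {}
    d = delegator(results, 'k')
    next(d)               # prime: suspends inside subgen's yield
    for v in values:
        d.send(v)         # values go straight through to subgen
    if terminate:
        d.send(None)      # subgen returns; delegator assigns results['k']
    return results

print(feed([1, 2, 3], terminate=True))   # {'k': 6}
print(feed([1, 2, 3], terminate=False))  # {} -- the assignment never ran
```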

# 16.9 Use case: coroutines for discrete event simulation
from collections import namedtuple

Event = namedtuple('Event', 'time proc action')

# taxi_process implements the activities of one taxi
def taxi_process(ident, trips, start_time):
    time = yield Event(start_time, ident, 'leave garage')
    for i in range(trips):
        time = yield Event(time, ident, 'pick up passenger')
        time = yield Event(time, ident, 'drop off passenger')
    yield Event(time, ident, 'going home')

'''
# Driving the taxi_process coroutine by hand
taxi = taxi_process(ident=13, trips=2, start_time=0)
result1 = next(taxi)
print(result1)  # Event(time=0, proc=13, action='leave garage')
result2 = taxi.send(result1.time + 7)
print(result2)  # Event(time=7, proc=13, action='pick up passenger')
result3 = taxi.send(result2.time + 23)
print(result3)  # Event(time=30, proc=13, action='drop off passenger')
result4 = taxi.send(result3.time + 5)
print(result4)  # Event(time=35, proc=13, action='pick up passenger')
result5 = taxi.send(result4.time + 48)
print(result5)  # Event(time=83, proc=13, action='drop off passenger')
result6 = taxi.send(result5.time + 1)
print(result6)  # Event(time=84, proc=13, action='going home')
#result7 = taxi.send(result6.time + 10)
#print(result7)  # StopIteration
'''

# To instantiate the Simulator class, the main function of taxi_sim.py builds
# a taxis dictionary like this:
DEPARTURE_INTERVAL = 5
num_taxis = 3
taxis = {i: taxi_process(i, (i+1)*2, i*DEPARTURE_INTERVAL)
         for i in range(num_taxis)}
#sim = Simulator(taxis)
'''
DEPARTURE_INTERVAL is 5; if num_taxis is 3, as in the sample run, those three
lines do the same as:
taxis = {0: taxi_process(ident=0, trips=2, start_time=0),
         1: taxi_process(ident=1, trips=4, start_time=5),
         2: taxi_process(ident=2, trips=6, start_time=10)}
sim = Simulator(taxis)
'''

# Simulator, a simple discrete event simulation class; the point of interest
# is its run method.

'''
So the values of the taxis dict are three generator objects with different
parameters. For example, taxi 1 starts at start_time=5 and picks up four
passengers. This dict is the only argument needed to build a Simulator
instance. The Simulator.__init__ method is shown in Example 16-22.

The main data structures of the Simulator class:

self.events
  A PriorityQueue holding Event instances. Items go in with put and come out
  with get, ordered by item[0] -- the time attribute of the Event.

self.procs
  A dict mapping each taxi number to the active process in the simulation
  (the generator object representing the taxi). It is bound to a copy of the
  taxis dict shown above.

Example 16-22 taxi_sim.py: initializer of the Simulator class

class Simulator:

    def __init__(self, procs_map):
        self.events = queue.PriorityQueue()  # (1)
        self.procs = dict(procs_map)         # (2)

(1) The PriorityQueue holding the scheduled events, ordered by increasing
    time.
(2) procs_map is a dict (or any mapping), but a new dict is built from it to
    make a local copy: during the simulation, taxis that go home are removed
    from self.procs, and we don't want to mutate the object passed in by the
    caller.

A priority queue is a fundamental building block of discrete event
simulation: events are created in arbitrary order, put in the queue, and
later retrieved in order of their scheduled times. For example, the queue may
hold these two events:

Event(time=14, proc=0, action='pick up passenger')
Event(time=11, proc=1, action='pick up passenger')

meaning taxi 0 picks up its first passenger at time 14, while taxi 1 (which
departed at time=10) picks one up a minute later, at time 11. With these two
events in the queue, the first one the main loop gets from the priority queue
is Event(time=11, proc=1, action='pick up passenger').

Now for the main algorithm of the simulation, the Simulator.run method. It is
invoked by the main function right after the Simulator is instantiated:

sim = Simulator(taxis)
sim.run(end_time)

The annotated listing of the Simulator class is Example 16-23; here is an
overview of the algorithm implemented in Simulator.run:

(1) Iterate over the processes representing the taxis.
  a. Prime each coroutine by calling next() on it. This yields the first
     Event of each taxi.
  b. Put each event in self.events, the queue of the Simulator.
(2) Run the main loop of the simulation while sim_time < end_time.
  a. Check whether self.events is empty; if so, break out of the loop.
  b. Get the current event (current_event) from self.events: the Event
     object with the smallest time in the PriorityQueue.
  c. Display the Event just retrieved.
  d. Update the simulation time with the time attribute of current_event.
  e. Send the time to the coroutine identified by the proc attribute of
     current_event, which yields the next event (next_event).
  f. Schedule next_event by putting it in self.events.

The full Simulator class (Example 16-23):

class Simulator:

    def __init__(self, procs_map):
        self.events = queue.PriorityQueue()
        self.procs = dict(procs_map)

    def run(self, end_time):  # (1)
        """Schedule and display events until time is up"""
        # schedule the first event for each taxi
        for _, proc in sorted(self.procs.items()):  # (2)
            first_event = next(proc)  # (3)
            self.events.put(first_event)  # (4)

        # main loop of the simulation
        sim_time = 0  # (5)
        while sim_time < end_time:  # (6)
            if self.events.empty():  # (7)
                print('*** end of events ***')
                break
            current_event = self.events.get()  # (8)
            sim_time, proc_id, previous_action = current_event  # (9)
            print('taxi:', proc_id, proc_id * '   ', current_event)  # (10)
            active_proc = self.procs[proc_id]  # (11)
            next_time = sim_time + compute_duration(previous_action)  # (12)
            try:
                next_event = active_proc.send(next_time)  # (13)
            except StopIteration:
                del self.procs[proc_id]  # (14)
            else:
                self.events.put(next_event)  # (15)
        else:  # (16)
            msg = '*** end of simulation time: {} events pending ***'
            print(msg.format(self.events.qsize()))

(1) run needs a single argument: the end time of the simulation (end_time).
(2) Use sorted to get the items of self.procs ordered by key; the key itself
    is not needed, so it is assigned to _.
(3) Prime each coroutine with next(proc), advancing it to the first yield
    expression so it is ready to receive data. An Event object is produced.
(4) Add each event to self.events, the PriorityQueue. As in the sample run of
    Example 16-20, the first event of every taxi is 'leave garage'.
(5) Zero sim_time, the simulation clock.
(6) Main loop of the simulation: run while sim_time is less than end_time.
(7) If there are no pending events in the queue, exit the main loop.
(8) Get the Event with the smallest time in the priority queue; this is the
    current event (current_event).
(9) Unpack the data in the Event. This line updates the simulation clock
    sim_time to the time when the event happened. This is typical of
    discrete event simulation: the clock does not advance by a fixed amount
    each loop, but jumps according to the duration of each event.
(10) Display the Event, identifying the taxi and indenting by taxi number.
(11) Retrieve the coroutine of the currently active taxi from self.procs.
(12) Compute the next activation time by calling compute_duration(...) with
     the previous action (e.g. 'pick up passenger', 'drop off passenger')
     and adding the result to sim_time.
(13) Send the computed time to the taxi coroutine, which yields the next
     event (next_event) or raises StopIteration when it is finished.
(14) If StopIteration was raised, delete that coroutine from self.procs.
(15) Otherwise, put next_event in the queue.
(16) If the loop exits because the simulation time is up, display the number
     of pending events (which may occasionally happen to be zero).

Note that Simulator.run uses the else blocks introduced in Chapter 15 in two
places, neither attached to an if statement:

- The main while loop has an else clause to report that the simulation ended
  because end_time was reached, and not because there were no more events to
  process.
- The try statement near the bottom of the while loop sends next_time to the
  current taxi process, trying to get the next event (next_event); on
  success, the else block puts next_event into the self.events queue.

Without these two else blocks, I find the code of Simulator.run somewhat
harder to read. The point of this example is to show a main loop processing
events and driving coroutines by sending them data. This is the basic idea
underlying asyncio, which we study in Chapter 18.
'''
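The claim above — that a PriorityQueue hands back Event tuples ordered by their time field — is easy to check directly, since namedtuples compare field by field, starting with the first. A minimal sketch using the same two events from the text:

```python
import queue
from collections import namedtuple

Event = namedtuple('Event', 'time proc action')

events = queue.PriorityQueue()
# insertion order is arbitrary...
events.put(Event(time=14, proc=0, action='pick up passenger'))
events.put(Event(time=11, proc=1, action='pick up passenger'))

# ...but retrieval is ordered by the first field, time
first = events.get()
second = events.get()
print(first)   # Event(time=11, proc=1, action='pick up passenger')
print(second)  # Event(time=14, proc=0, action='pick up passenger')
```

One caveat of this design: if two events carry the same time, comparison falls through to the proc and action fields, so those must also be comparable.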

[Fluent Python] [Control flow] [Concurrency with futures]

# -*- coding: utf-8 -*-

"""
# Warm-up exercise
'''
import requests, sys, os
BASE_URL = 'http://images.cnblogs.com/cnblogs_com/suren2017/1102909'

cc = 'T_JINGSE2'
url = '{}/{cc}.PNG'.format(BASE_URL, cc=cc.lower())

resp = requests.get(url)
print(resp.content)

path = os.path.join(sys.path[0], 'downloads', 't_jingse2.PNG')
path = path.replace('\\', '/')
with open(path, 'wb') as fp:
    fp.write(resp.content)
'''

# Example: the image URLs saved in my cnblogs album sometimes expire. The
# example below uses the most recently working ones; the examples after it
# use the older URLs.

import os, time, sys, requests
from enum import Enum
import collections
from collections import namedtuple
import asyncio
from concurrent import futures
from tqdm import tqdm

Result = namedtuple('Result', 'status cc')
HTTPStatus = Enum('Status', 'ok not_found error')

BASE_URL = 'https://images2018.cnblogs.com/blog/1239321/201808'

DEST_DIR = 'downloads'

MAX_WORKERS = 20

POP20_CC1 = ('aaaa1239321-20180802153722539-931669752 bbbbb1239321-20180802154629131-632652627 1239321-20180802154726137-51175796 '
             '1239321-20180802155107376-2107990591 1239321-20180802155118312-440576328 1239321-20180802155128983-1994377345 '
             '1239321-20180802155159195-1272984827 1239321-20180802155208608-1491921327 1239321-20180802155214231-346476157 '
             '1239321-20180802155220096-549202876 1239321-20180802155224859-827179894 1239321-20180802155231862-1759423827 '
             '1239321-20180802155622645-1918814285 1239321-20180802155627878-380113558 1239321-20180802155635750-1829416793 '
             '1239321-20180802155641947-721209039 1239321-20180802155712392-1928969512 1239321-20180802155717674-571154640 '
             '1239321-20180802155724893-1505320081 1239321-20180802155729597-1637208554 1239321-20180802155734238-2129250463 '
             '1239321-20180802155738782-1538050338 1239321-20180802155743498-1050604720 1239321-20180802155748466-57033362 '
             '1239321-20180802155754337-1037641153').split()
POP20_CC = ('aaaa1239321-20180802153722539-931669752 bbbbb1239321-20180802154629131-632652627 '
            '1239321-20180802153722539-931669752').split()

def save_flag(img, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)

def get_flag(cc):
    url = '{}/{cc}.png'.format(BASE_URL, cc=cc)
    resp = requests.get(url)
    if 'PNG' in resp.text:
        return resp.content
    elif 'not found' in resp.text or 'not exist' in resp.text:
        resp.status_code = 404
        resp.raise_for_status()

def download_one(cc, verbose=False):
    try:
        image = get_flag(cc)
    except requests.exceptions.HTTPError as exc:
        res = exc.response
        if res.status_code == 404:
            status = HTTPStatus.not_found
            msg = 'not found'
            res.reason = 'NOT FOUND'
            raise
        else:
            raise
    else:
        save_flag(image, cc)
        status = HTTPStatus.ok
        msg = 'OK'
    if verbose:
        print(cc, msg)
    return Result(status, cc)

def download_many(cc_list, verbose):
    counter = collections.Counter()
    with futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        to_do_map = {}
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc, verbose)
            to_do_map[future] = cc
        done_iter = futures.as_completed(to_do_map)
        if not verbose:
            done_iter = tqdm(done_iter, total=len(cc_list))
        for future in done_iter:
            try:
                res = future.result()
            except requests.exceptions.HTTPError as exc:
                error_msg = 'HTTP {res.status_code} - {res.reason}'
                error_msg = error_msg.format(res=exc.response)
            except requests.exceptions.ConnectionError as exc:
                error_msg = 'Connection error'
            else:
                error_msg = ''
                status = res.status
            if error_msg:
                status = HTTPStatus.error
            counter[status] += 1
            if not verbose and error_msg:
                print('*** Error for {}: {}'.format(to_do_map[future], error_msg))
    return counter

def main():
    t0 = time.time()
    count = download_many(POP20_CC, verbose=False)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

if __name__ == '__main__':
    main()

# Example 17-2: sequential download script
'''
Setup: my cnblogs album has 20 images saved in it.
'''
import os
import sys
import time

import requests

BASE_URL = 'http://images.cnblogs.com/cnblogs_com/suren2017/1102909'

POP20_CC = ('T_JINGSE2 T_JINGSE3 T_JINGSE4 T_JINGSE5 t_jingse6 t_jingse7 t_jingse8 t_jingse9 '
            't_jingse10 t_jingse11 t_jingse12 t_jingse13 T_jingse14 T_jingse15 T_jingse16 '
            'T_jingse17 T_jingse18 T_jingse19 T_jingse20').split()

DEST_DIR = 'downloads'

def save_flag(img, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    path = path.replace('\\', '/')
    with open(path, 'wb') as fp:
        fp.write(img)

def get_flag(cc):
    url = '{}/{cc}.PNG'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    return resp.content

def show(text):
    print(text, end=' ')
    sys.stdout.flush()

def download_many(cc_list):
    for cc in sorted(cc_list):
        image = get_flag(cc)
        show(cc)
        save_flag(image, cc.lower() + '.PNG')
    return len(cc_list)

def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

if __name__ == '__main__':
    main(download_many)  # 19 flags downloaded in 1.25s

# Example 17-3: multithreaded download script using futures.ThreadPoolExecutor

import os
import sys
import time

import requests

BASE_URL = 'http://images.cnblogs.com/cnblogs_com/suren2017/1102909'

POP20_CC = ('T_JINGSE2 T_JINGSE3 T_JINGSE4 T_JINGSE5 t_jingse6 t_jingse7 t_jingse8 t_jingse9 '
            't_jingse10 t_jingse11 t_jingse12 t_jingse13 T_jingse14 T_jingse15 T_jingse16 '
            'T_jingse17 T_jingse18 T_jingse19 T_jingse20').split()

DEST_DIR = 'downloads'

MAX_WORKERS = 20

def save_flag(img, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    path = path.replace('\\', '/')
    with open(path, 'wb') as fp:
        fp.write(img)

def get_flag(cc):
    url = '{}/{cc}.PNG'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    return resp.content

def show(text):
    print(text, end=' ')
    sys.stdout.flush()

def download_one(cc):
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.PNG')
    return cc

from concurrent import futures

def download_many(cc_list):
    workers = min(MAX_WORKERS, len(cc_list))
    with futures.ThreadPoolExecutor(workers) as executor:
        res = executor.map(download_one, sorted(cc_list))
        # The map method works like the built-in map, except that the
        # download_one function is called concurrently from multiple threads;
        # it returns a generator that can be iterated to retrieve the value
        # returned by each call.
    return len(list(res))

def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

if __name__ == '__main__':
    main(download_many)  # 19 flags downloaded in 0.25s

# Example 17-4: a closer look at what futures actually are

import os
import sys
import time

import requests

BASE_URL = 'http://images.cnblogs.com/cnblogs_com/suren2017/1102909'

POP20_CC = ('T_JINGSE2 T_JINGSE3 T_JINGSE4 T_JINGSE5 t_jingse6 t_jingse7 t_jingse8 t_jingse9 '
            't_jingse10 t_jingse11 t_jingse12 t_jingse13 T_jingse14 T_jingse15 T_jingse16 '
            'T_jingse17 T_jingse18 T_jingse19 T_jingse20').split()

DEST_DIR = 'downloads'

MAX_WORKERS = 20

def save_flag(img, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    path = path.replace('\\', '/')
    with open(path, 'wb') as fp:
        fp.write(img)

def get_flag(cc):
    url = '{}/{cc}.PNG'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    return resp.content

def show(text):
    print(text, end=' ')
    sys.stdout.flush()

def download_one(cc):
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.PNG')
    return cc

from concurrent import futures

def download_many(cc_list):
    cc_list = cc_list[:5]
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        to_do = []
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc)
            to_do.append(future)
            msg = 'Scheduled for {}:{}'
            print(msg.format(cc, future))
        results = []
        for future in futures.as_completed(to_do):
            res = future.result()
            msg = '{} result: {!s}'
            print(msg.format(future, res))
            results.append(res)
    return len(results)

def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

if __name__ == '__main__':
    main(download_many)

''' Scheduled for T_JINGSE2:<Future at 0x3670240 state=running> Scheduled for T_JINGSE3:<Future at 0x36dfc88 state=running> Scheduled for T_JINGSE4:<Future at 0x37022b0 state=running> Scheduled for T_JINGSE5:<Future at 0x37028d0 state=pending> Scheduled for t_jingse6:<Future at 0x3702978 state=pending> T_JINGSE2 T_JINGSE4 T_JINGSE3 <Future at 0x3670240 state=finished returned str> result: T_JINGSE2 <Future at 0x37022b0 state=finished returned str> result: T_JINGSE4 <Future at 0x36dfc88 state=finished returned str> result: T_JINGSE3 T_JINGSE5 <Future at 0x37028d0 state=finished returned str> result: T_JINGSE5 t_jingse6 <Future at 0x3702978 state=finished returned str> result: t_jingse6

5 flags downloaded in 0.14s
'''
# [Analysis]
'''
Strictly speaking, none of the concurrent scripts we have tested so far can
download in parallel. The two examples using concurrent.futures are limited
by the GIL (Global Interpreter Lock), and the flags_asyncio.py script runs in
a single thread.

This raises two questions about the informal benchmarks above:

- If Python threads are limited by the GIL, which lets only one thread run at
  any time, how can the flags_threadpool.py script download 5x faster than
  flags.py?
- How can flags_asyncio.py be 5x faster than flags.py, when both run in a
  single thread?

The second question is answered in Section 18.3. The GIL is nearly harmless
for I/O-bound processing; the reason is in the next section.
'''
'''
17.2 Blocking I/O and the GIL

The CPython interpreter is not thread-safe internally, so it has a Global
Interpreter Lock (GIL) which allows only one thread at a time to execute
Python bytecode. That is why a single Python process usually cannot use
multiple CPU cores at the same time.

This is a limitation of the CPython interpreter, not of the Python language
itself. Jython and IronPython have no such limitation, though PyPy -- currently
the fastest Python interpreter -- also has a GIL.

Python code cannot control the GIL, but a built-in function or an extension
written in C can release it while running time-consuming tasks. In fact, a
C-coded Python library can manage the GIL, launch its own OS threads, and use
all available CPU cores. Doing so complicates the library code considerably,
so most library authors don't do it.

However, every function in the standard library that performs blocking I/O
releases the GIL while waiting for a result from the OS. This means that at
the Python level you can use multiple threads, and I/O-bound Python programs
benefit: while one Python thread waits for a network response, the blocking
I/O function releases the GIL so another thread can run. That is why David
Beazley says: "Python threads are great at doing nothing." (From "Generators:
The Final Frontier", http://www.dabeaz.com/finalgenerator/, slide 106.)

Every blocking I/O function in the Python standard library releases the GIL,
allowing other threads to run. The time.sleep() function also releases the
GIL. So despite the GIL, Python threads are perfectly usable in I/O-bound
applications.

Next, a brief look at how to side-step the GIL for CPU-bound jobs with the
concurrent.futures module.
'''
'''
# The change below roughly doubled the running time in my tests.
Downloading flags, or any other I/O-bound job, gains nothing from
ProcessPoolExecutor. This is easy to verify: change these lines of
Example 17-3:

def download_many(cc_list):
    workers = min(MAX_WORKERS, len(cc_list))
    with futures.ThreadPoolExecutor(workers) as executor:

to:

def download_many(cc_list):
    with futures.ProcessPoolExecutor() as executor:

After several runs, downloading the 20 flags took about 1.8s with a
ProcessPoolExecutor, versus 1.4s with the original ThreadPoolExecutor
version. The likely main reason: my machine has a four-core CPU, so there
were only 4 concurrent downloads, while the thread-pool version had 20 worker
threads.

The value of ProcessPoolExecutor is in CPU-bound jobs. I ran some performance
tests with two CPU-bound scripts.

'''
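To make the CPU-bound point concrete without the author's arcfour/sha scripts, here is a generic stand-in: the same driver code works for both executor classes, and only the process pool can run a pure-Python function on several cores at once. The names `cpu_task` and `run_all` are invented for this sketch.

```python
from concurrent import futures

def cpu_task(n):
    # pure-Python CPU-bound work: sum of squares below n
    return sum(i * i for i in range(n))

def run_all(executor_cls, jobs):
    # the Executor API is identical for threads and processes
    with executor_cls() as executor:
        return list(executor.map(cpu_task, jobs))

if __name__ == '__main__':
    jobs = [200_000] * 4
    # threads: serialized by the GIL for this workload (no blocking I/O to
    # release it), so wall-clock time is roughly the sequential time
    t_res = run_all(futures.ThreadPoolExecutor, jobs)
    # processes: one interpreter (and one GIL) per worker, true parallelism
    p_res = run_all(futures.ProcessPoolExecutor, jobs)
    print(t_res == p_res)  # True -- same results, different elapsed time
```

Swapping `ThreadPoolExecutor` for `ProcessPoolExecutor` is a one-line change precisely because both implement the same `Executor` interface.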

'''
If you do CPU-bound work in Python, try PyPy (http://pypy.org). Running the
arcfour_futures.py script under PyPy was 3.8x to 5.1x faster, depending on
the number of workers. I tested with PyPy 2.4.0, which is compatible with
Python 3.2.5 and therefore has the concurrent.futures module in its standard
library.
'''

# 17.4 Experimenting with Executor.map
# Example 17-6
from time import sleep, strftime

from concurrent import futures

def display(*args):
    print(strftime('[%H:%M:%S]'), end=' ')
    print(*args)

def loiter(n):
    msg = '{}loiter({}):doing nothing for {}s'
    display(msg.format('\t'*n, n, n))
    sleep(n)
    msg = '{}loiter({}):done.'
    display(msg.format('\t'*n, n))
    return n * 10

def main():
    display('Script starting.')
    executor = futures.ThreadPoolExecutor(max_workers=3)
    results = executor.map(loiter, range(5))
    display('results:', results)
    display('Waiting for individual results:')
    for i, result in enumerate(results):
        display('result {}:{}'.format(i, result))

main()

'''
[11:52:00] Script starting.
[11:52:00] loiter(0):doing nothing for 0s
[11:52:00] loiter(0):done.
[11:52:00]      loiter(1):doing nothing for 1s
[11:52:00]              loiter(2):doing nothing for 2s
[11:52:00] results: <generator object Executor.map.<locals>.result_iterator at 0x0000000002CC6888>
[11:52:00]                      loiter(3):doing nothing for 3s
[11:52:00] Waiting for individual results:
[11:52:00] result 0:0
[11:52:01]      loiter(1):done.
[11:52:01] result 1:10
[11:52:01]                              loiter(4):doing nothing for 4s
[11:52:02]              loiter(2):done.
[11:52:02] result 2:20
[11:52:03]                      loiter(3):done.
[11:52:03] result 3:30
[11:52:05]                              loiter(4):done.
[11:52:05] result 4:40
'''
# [Note]
'''
The combination of executor.submit and futures.as_completed is more flexible
than executor.map: submit can schedule different callables with different
arguments, while executor.map is designed to run the same callable over
different arguments. In addition, the set of futures passed to
futures.as_completed may come from more than one Executor -- some created by a
ThreadPoolExecutor instance, others by a ProcessPoolExecutor.
'''

# 17.5 Handling errors
# 17.5.1 Sequential download
# Example 17-12

import os, time, sys, requests
from enum import Enum
import collections
from collections import namedtuple
import asyncio
from concurrent import futures
from tqdm import tqdm

Result = namedtuple('Result', 'status cc')
HTTPStatus = Enum('Status', 'ok not_found error')

BASE_URL = 'http://images.cnblogs.com/cnblogs_com/suren2017/1102909'
POP20_CC = 'T_JINGSE200 T_JINGSE3'.split()
#POP20_CC = 'T_JINGSE992'.split()
DEST_DIR = 'downloads'
MAX_WORKERS = 20

def save_flag(img, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    path = path.replace('\\', '/')
    with open(path, 'wb') as fp:
        fp.write(img)

def get_flag(cc):
    url = '{}/{cc}.PNG'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    if 'PNG' in resp.text:
        return resp.content
    elif '404' in resp.text:
        resp.status_code = 404
        resp.raise_for_status()

def download_one(cc, verbose=False):
    try:
        image = get_flag(cc)
    except requests.exceptions.HTTPError as exc:
        res = exc.response
        if res.status_code == 404:
            status = HTTPStatus.not_found
            msg = 'not found'
            res.reason = 'NOT FOUND'
            raise
        else:
            raise
    else:
        save_flag(image, cc.lower() + '.PNG')
        status = HTTPStatus.ok
        msg = 'OK'

    if verbose:
        print(cc, msg)
    return Result(status, cc)

def download_many(cc_list, verbose):
    counter = collections.Counter()
    with futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        to_do_map = {}
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc, verbose)
            to_do_map[future] = cc
        done_iter = futures.as_completed(to_do_map)
        if not verbose:
            done_iter = tqdm(done_iter, total=len(cc_list))
        for future in done_iter:
            try:
                res = future.result()
            except requests.exceptions.HTTPError as exc:
                error_msg = 'HTTP error {res.status_code} - {res.reason}'
                error_msg = error_msg.format(res=exc.response)
            except requests.exceptions.ConnectionError as exc:
                error_msg = 'Connection error'
            else:
                error_msg = ''
                status = res.status
            if error_msg:
                status = HTTPStatus.error

            counter[status] += 1
            if not verbose and error_msg:
                cc = to_do_map[future]
                print('*** Error for {} :{}'.format(cc, error_msg))
    return counter

def main():
    t0 = time.time()
    count = download_many(POP20_CC, verbose=False)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

if __name__ == '__main__':
    main()

'''
# Test run 1: first POP20_CC entry replaced with a bogus name, verbose=False,
# and the guard written as 'if not verbose and error_msg:' (note 1). Output:
  0%|          | 0/19 [00:00<?, ?it/s]*** Error for T_JINGSE200: HTTP error 404 - NOT FOUND
100%|██████████| 19/19 [00:14<00:00,  1.29it/s]

Counter({<Status.ok: 1>: 18, <Status.error: 3>: 1}) flags downloaded in 14.68s
'''
'''
# Test run 2: first POP20_CC entry replaced with a bogus name, verbose=True,
# and the guard written as 'if verbose and error_msg:' (note 1). Output:
*** Error for T_JINGSE200: HTTP error 404 - NOT FOUND
T_JINGSE3 OK
T_JINGSE4 OK
T_JINGSE5 OK
T_jingse14 OK
T_jingse15 OK
T_jingse16 OK
T_jingse17 OK
T_jingse18 OK
T_jingse19 OK
T_jingse20 OK
t_jingse10 OK
t_jingse11 OK
t_jingse12 OK
t_jingse13 OK
t_jingse6 OK
t_jingse7 OK
t_jingse8 OK
t_jingse9 OK

Counter({<Status.ok: 1>: 18, <Status.error: 3>: 1}) flags downloaded in 14.96s
'''

'''
# Test run 3: first POP20_CC entry replaced with a bogus name, verbose=False,
# and the guard written as 'if verbose and error_msg:' (note 1). Output:
100%|██████████| 19/19 [00:15<00:00,  1.26it/s]

Counter({<Status.ok: 1>: 18, <Status.error: 3>: 1}) flags downloaded in 15.14s
'''

## 17.5.2 Using the futures.as_completed function

from enum import Enum
HTTPStatus = Enum('Status', 'ok not_found error')
import collections
from collections import namedtuple
Result = namedtuple('Result', 'status cc')

import os
import sys
import time

import requests

BASE_URL = 'http://images.cnblogs.com/cnblogs_com/suren2017/1102909'

POP20_CC = ('T_JINGSE200 T_JINGSE3 T_JINGSE4 T_JINGSE5 t_jingse6 t_jingse7 t_jingse8 t_jingse9 '
            't_jingse10 t_jingse11 t_jingse12 t_jingse13 T_jingse14 T_jingse15 T_jingse16 '
            'T_jingse17 T_jingse18 T_jingse19 T_jingse20').split()

DEST_DIR = 'downloads'

DEFAULT_CONCUR_REQ = 30  # default limit on concurrent requests
MAX_CONCUR_REQ = 1000    # hard safeguard backing the limit above

MAX_WORKERS = 20

def save_flag(img, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    path = path.replace('\\', '/')
    with open(path, 'wb') as fp:
        fp.write(img)

def get_flag(cc):
    url = '{}/{cc}.PNG'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    if 'PNG' in resp.text:
        return resp.content
    elif '404' in resp.text:
        resp.status_code = 404
        resp.raise_for_status()

def download_one(cc, verbose=False):
    try:
        image = get_flag(cc)
    except requests.exceptions.HTTPError as exc:
        res = exc.response
        if res.status_code == 404:
            status = HTTPStatus.not_found
            msg = 'not found'
            res.status_code = 404
            res.reason = 'NOT FOUND'
            raise
        else:
            raise
    else:
        save_flag(image, cc.lower() + '.PNG')
        status = HTTPStatus.ok
        msg = 'OK'

    if verbose:  # if the -v/--verbose option is set on the command line,
                 # print the country code and status message; this is the
                 # progress display seen in verbose mode
        print(cc, msg)
    return Result(status, cc)

from concurrent import futures
from tqdm import tqdm

def download_many(cc_list, verbose, concur_req):
    counter = collections.Counter()
    with futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        to_do_map = {}
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc, verbose)
            to_do_map[future] = cc
        done_iter = futures.as_completed(to_do_map)
        if not verbose:
            done_iter = tqdm(done_iter, total=len(cc_list))
        for future in done_iter:
            try:
                res = future.result()
            except requests.exceptions.HTTPError as exc:
                error_msg = 'HTTP error {res.status_code} - {res.reason}'
                error_msg = error_msg.format(res=exc.response)
            except requests.exceptions.ConnectionError as exc:
                error_msg = 'Connection error'
            else:
                error_msg = ''
                status = res.status
            if error_msg:
                status = HTTPStatus.error

            counter[status] += 1
            if verbose and error_msg:
                cc = to_do_map[future]
                print('*** Error for {}: {}'.format(cc, error_msg))

    return counter

def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC, verbose=False, concur_req=DEFAULT_CONCUR_REQ)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

if __name__ == '__main__':
    main(download_many)

'''
Python threads are particularly well suited to I/O-bound applications, and
the concurrent.futures module greatly simplifies using them in certain
scenarios. This concludes our introduction to the basics of
concurrent.futures. Next we discuss the alternatives for cases where
ThreadPoolExecutor or ProcessPoolExecutor are not a good fit.
'''
"""
# 17.5.3 Alternatives to threads and multiprocessing

[Fluent Python] [Control flow] [asyncio]

# -*- coding: utf-8 -*-

""" #18.1 线程&协程 #栗子18-1 threading import sys import time import itertools import threading

class Signal: go = True

def spin(msg, signal): write, flush = sys.stdout.write, sys.stdout.flush for char in itertools.cycle('|/-\'): status = char + ' ' + msg write(status) flush() write('\x08' * len(status)) time.sleep(.1) if not signal.go: break write(' '* len(status) + '\x08'*len(status))

def slow_function(): time.sleep(1) return 42

def supervisor(): signal = Signal() spinner = threading.Thread(target=spin,args=('thinking!',signal)) print('spinner object:',spinner) spinner.start() result = slow_function() signal.go = False spinner.join() return result

def main(): result = supervisor() print('Answer:',result) if name == 'main': main()

'''
spinner object: <Thread(Thread-1, initial)>
| thinking! / thinking! - thinking! \ thinking! | thinking! / thinking!
- thinking! \ thinking! | thinking! / thinking!
Answer: 42
'''

# Example 18-2: asyncio implementation

import asyncio
import sys
import itertools

@asyncio.coroutine
def spin(msg):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        # the trick behind the text animation: the backspace character
        # (\x08) moves the cursor back
        write('\x08' * len(status))
        try:
            yield from asyncio.sleep(.1)
        except asyncio.CancelledError:
            break
    # clear the status message with spaces and move the cursor to the start
    write(' ' * len(status) + '\x08' * len(status))

@asyncio.coroutine
def slow_function():
    # pretend to wait on I/O for a while; the yield from asyncio.sleep(...)
    # expression hands control to the main loop, which resumes this
    # coroutine when the sleep is over
    yield from asyncio.sleep(1)
    return 42

@asyncio.coroutine
def supervisor():
    spinner = asyncio.async(spin('thinking!'))  # asyncio.async was later renamed ensure_future
    print('spinner object:', spinner)
    # drive slow_function(); when it finishes, get its return value.
    # Meanwhile the event loop keeps running, because slow_function hands
    # control back to the main loop with yield from asyncio.sleep(...)
    result = yield from slow_function()
    spinner.cancel()
    return result

def main():
    loop = asyncio.get_event_loop()  # get a reference to the event loop
    # drive the supervisor coroutine to completion; its return value is the
    # return value of this call
    result = loop.run_until_complete(supervisor())
    loop.close()
    print('Answer :', result)

if __name__ == '__main__':
    main()

'''
spinner object: <Task pending coro=<spin() running at C:/Users/wangxue1/PycharmProjects/fluentPython/kongzhiliucheng/asyncio/__init__.py:69>>
| thinking! / thinking! - thinking! \ thinking! | thinking! / thinking!
- thinking! \ thinking! | thinking! / thinking!
Answer : 42
'''

# Example: async/await implementation

import asyncio
import sys
import itertools

async def spin(msg):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + msg
        write(status)
        flush()
        write('\x08' * len(status))
        try:
            await asyncio.sleep(.3)
        except asyncio.CancelledError:
            break
    write('\x08' * len(status))

async def slow_function():
    await asyncio.sleep(1)
    return 42

async def supervisor():
    spinner = asyncio.ensure_future(spin('thinking!'))
    print(type(spinner))
    print('spinner object:', spinner)
    result = await slow_function()
    spinner.cancel()
    return result

def main():
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(supervisor())
    loop.close()
    print('Answer:', result)

main()

'''
# [Comparison]
The main differences between the two supervisor implementations:

- An asyncio.Task is roughly equivalent to a threading.Thread. Victor Stinner
  (special technical reviewer of this chapter) notes that "a Task is like a
  green thread in libraries that implement cooperative multitasking, such as
  gevent."
- A Task drives a coroutine; a Thread invokes a callable.
- You don't instantiate Task objects yourself: you get one by passing a
  coroutine to the asyncio.async(...) function or to loop.create_task(...).
- A Task you obtain is already scheduled to run (e.g. by the asyncio.async
  function); a Thread instance must be explicitly told to run by calling its
  start method.
- In the threaded supervisor, slow_function is a plain function invoked
  directly by a thread. In the asyncio supervisor, slow_function is a
  coroutine driven by yield from.
- There is no API to terminate a thread from the outside, because a thread
  could be interrupted at any point, leaving the system in an invalid state.
  To terminate a task, use the Task.cancel() instance method, which raises
  CancelledError inside the coroutine. The coroutine can catch the exception
  at the yield where it is suspended and handle the termination request.
- The supervisor coroutine must be executed by loop.run_until_complete in
  the main function.

This comparison should help you see how asyncio orchestrates concurrent jobs,
compared with the more familiar threading model.

A final point about threads versus coroutines: if you have done any
nontrivial programming with threads, you know how hard it is to reason about
a program, because the scheduler can interrupt a thread at any time. You must
remember to hold locks to protect the critical sections of your program,
preventing multi-step operations from being interrupted midway and leaving
data in an invalid state.

Coroutines, by default, are fully protected against interruption: you must
explicitly yield to let the rest of the program run. For coroutines there is
no need to hold locks to synchronize operations across threads; coroutines
synchronize themselves, because only one of them runs at any time. To give up
control, use yield or yield from to hand control back to the scheduler. That
is why coroutines can be safely cancelled: by definition, a coroutine can
only be cancelled while it is suspended at a yield, so you can catch
CancelledError and perform cleanup.
'''
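The cancellation contract described above can be shown end to end in a few lines. This is a self-contained sketch, not the book's code: the names `ticker` and `log` are invented, and it uses `asyncio.run` (available since Python 3.7) in place of the explicit event-loop calls used in the examples above.

```python
import asyncio

async def ticker(log):
    # runs until cancelled; CancelledError surfaces at the await
    try:
        while True:
            await asyncio.sleep(0.01)
            log.append('tick')
    except asyncio.CancelledError:
        log.append('cleaned up')  # cleanup runs at the suspension point
        raise  # re-raise so the task is properly marked as cancelled

async def supervisor():
    log = []
    task = asyncio.ensure_future(ticker(log))
    await asyncio.sleep(0.05)
    task.cancel()          # schedules CancelledError inside ticker
    try:
        await task
    except asyncio.CancelledError:
        pass               # expected: the task was cancelled
    return log

log = asyncio.run(supervisor())
print(log[-1])  # cleaned up
```

Note that the cleanup code runs exactly where the coroutine was suspended, which is precisely the guarantee the text contrasts with threads.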

# 18.1.1 asyncio.Future: nonblocking by design
'''
The asyncio.Future class has essentially the same interface as
concurrent.futures.Future, but the implementations are different and the two
are not interchangeable. "PEP 3156 -- Asynchronous IO Support Rebooted: the
'asyncio' Module" (https://www.python.org/dev/peps/pep-3156/) says this about
that unfortunate situation:

    In the future we may unify asyncio.Future and
    concurrent.futures.Future (e.g. by adding an __iter__ method to the
    latter that is compatible with yield from).

In short, because asyncio.Future is designed to work with yield from, you
usually do not need the following methods:

- No need for my_future.add_done_callback(...): simply put whatever should
  run after the future completes right after the yield from my_future
  expression in a coroutine. That is the big advantage of coroutines:
  they are functions that can be suspended and resumed.
- No need for my_future.result(): the value yield from produces from a
  future is its result (e.g. result = yield from my_future).
'''

#18.2 使用asyncio和aiohttp下载

import os, sys, time
import requests
import asyncio
import aiohttp

BASE_URL = 'http://images.cnblogs.com/cnblogs_com/suren2017/1102909'
POP20_CC = 'T_JINGSE2 T_JINGSE3'.split()
DEST_DIR = 'downloads'
MAX_WORKERS = 20

def save_flag(img, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    path = path.replace('\\', '/')
    with open(path, 'wb') as fp:
        fp.write(img)

async def get_flag(cc):
    url = '{}/{cc}.PNG'.format(BASE_URL, cc=cc.lower())
    resp = await aiohttp.request('GET', url)
    image = await resp.read()
    return image

def show(text):
    print(text, end=' ')
    sys.stdout.flush()

async def download_one(cc):
    image = await get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.PNG')
    return cc

def download_many(cc_list):
    loop = asyncio.get_event_loop()
    to_do = [download_one(cc) for cc in sorted(cc_list)]
    wait_coro = asyncio.wait(to_do)
    # Despite its name, wait is not a blocking function: it is a coroutine
    # that finishes once all the coroutines passed to it are done.
    '''
    The asyncio.wait(...) coroutine takes an iterable of futures or
    coroutines; wait wraps each coroutine in a Task. The net effect is that
    every object handled by wait becomes, one way or another, an instance of
    Future. Because wait is a coroutine function, calling it returns a
    coroutine/generator object, which is what the wait_coro variable holds.
    To drive the coroutine, we pass it to loop.run_until_complete(...).
    '''
    res, _ = loop.run_until_complete(wait_coro)
    # Run the event loop until wait_coro finishes; the script blocks here
    # while the loop runs. We ignore the second item that
    # run_until_complete returns.
    '''
    loop.run_until_complete accepts a future or a coroutine. Given a
    coroutine, run_until_complete wraps it into a Task, just like wait does.
    Coroutines, futures, and tasks can all be driven by yield from, which is
    what run_until_complete does with the wait_coro object returned by wait.
    When wait_coro finishes, it returns a 2-tuple: the first item is the set
    of completed futures, the second the set of those not completed. In
    Example 18-5 the second set is always empty, so we bind it to _ and
    ignore it. But wait has two keyword-only arguments, timeout and
    return_when, that may cause it to return even if some futures are not
    complete.
    '''
    loop.close()
    return len(res)

def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

if __name__ == '__main__':
    main(download_many)  # 19 flags downloaded in 0.25s
'''
Sample output: each download is interleaved with an "Unclosed client session"
warning such as

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000000038C9470>

t_jingse7 t_jingse11 T_JINGSE4 t_jingse8 T_jingse17 t_jingse6 t_jingse10
T_JINGSE3 t_jingse9 T_JINGSE5 t_jingse13 T_jingse20 T_jingse16 T_JINGSE2
t_jingse12 T_jingse19 T_jingse15 T_jingse18 T_jingse14
19 flags downloaded in 0.45s
'''
#[Summary]
'''
With the asyncio package, the asynchronous code we write contains coroutines
(delegating generators) driven by asyncio itself, and those coroutines
ultimately delegate to coroutines in the asyncio package or in third-party
libraries such as aiohttp. This arrangement builds a pipeline in which the
asyncio event loop, through the coroutines we write, drives the library
functions that perform the low-level asynchronous I/O operations.
'''

'''
18.3 Avoiding blocking calls

Ryan Dahl, the inventor of Node.js, introduced the philosophy behind his
project by saying: "We're doing I/O completely wrong." He defined functions
that perform disk or network I/O as blocking functions, and argued that they
must not be treated like non-blocking functions. To explain why, he showed
the first two columns of Table 18-1.
("Introduction to Node.js" video at 4:55,
https://www.youtube.com/watch?v=M-sc73Y-zQA)

Table 18-1. Latency of reading data from different storage media with a
modern computer; the third column scales the numbers into times a human can
relate to.

Storage medium   CPU cycles      Scaled to "human time"
L1 cache         3               3 seconds
L2 cache         14              14 seconds
RAM              250             250 seconds
Disk             41,000,000      1.3 years
Network          240,000,000     7.6 years

To make sense of Table 18-1, remember that modern CPUs have clocks running at
GHz rates, billions of cycles per second. Suppose a CPU runs exactly one
billion cycles per second: it could do 333,333,333 L1 cache reads in one
second, but only four (yes, four) network reads. The third column multiplies
the second column by a constant factor. So in another universe, where one L1
cache read takes 3 seconds, one network read would take 7.6 years!

There are two ways to prevent blocking calls from halting the progress of the
entire application:

* run each blocking operation in a separate thread;
* turn every blocking operation into a non-blocking asynchronous call.
'''
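The first of those two approaches can be sketched with the event loop's run_in_executor method, which hands a blocking callable to a thread pool; blocking_io below is a hypothetical stand-in for a disk or network call, not code from the book.

```python
import asyncio
import time

def blocking_io():
    # Stand-in for a blocking disk or network call.
    time.sleep(0.05)
    return 'done'

async def main(loop):
    # Run the blocking call in the loop's default ThreadPoolExecutor,
    # so the event-loop thread itself is never blocked.
    return await loop.run_in_executor(None, blocking_io)

loop = asyncio.new_event_loop()
result = loop.run_until_complete(main(loop))
loop.close()
print(result)  # done
```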

# My example 1
import asyncio
import time

now = lambda: time.time()

@asyncio.coroutine
def do_some_work(x):
    print('Waiting: ', x)

start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)

print('TIME: ', now() - start)

# My example 2
import asyncio
import time

now = lambda: time.time()

@asyncio.coroutine
def do_some_work(x):
    print('Waiting: ', x)

start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
print('TIME: ', now() - start)
'''
<Task pending coro=<do_some_work() running at C:\Python36\lib\asyncio\coroutines.py:208>>
Waiting:  2
<Task finished coro=<do_some_work() done, defined at C:\Python36\lib\asyncio\coroutines.py:208> result=None>
TIME:  0.0010001659393310547
'''

# My example 3
'''
A coroutine object cannot run on its own. When it is registered with the
event loop, run_until_complete actually wraps the coroutine into a task
object. A task is a subclass of Future; it holds the state of the coroutine
as it runs and can be used later to fetch the coroutine's result.
'''

import asyncio
import time

now = lambda: time.time()

@asyncio.coroutine
def do_some_work(x):
    print('Waiting: ', x)

start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
print('TIME: ', now() - start)
'''
Both asyncio.ensure_future(coroutine) and loop.create_task(coroutine) create
a task. run_until_complete takes a future object; given a coroutine, it wraps
it into a task internally. Task is a subclass of Future, so
isinstance(task, asyncio.Future) prints True.
'''
print(isinstance(task, asyncio.Future))
'''
<Task pending coro=<do_some_work() running at C:\Python36\lib\asyncio\coroutines.py:208>>
Waiting:  2
<Task finished coro=<do_some_work() done, defined at C:\Python36\lib\asyncio\coroutines.py:208> result=None>
TIME:  0.0009999275207519531
True
'''

# My example 4: attaching a callback

import asyncio
import time

now = lambda: time.time()

@asyncio.coroutine
def do_some_work(x):
    print('Waiting: ', x)
    return 'Done after {}s'.format(x)

def callback(future):
    print('Result: ', future)

start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
loop.run_until_complete(task)

print('TIME: ', now() - start)
'''
Waiting:  2
Result:  <Task finished coro=<do_some_work() done, defined at C:\Python36\lib\asyncio\coroutines.py:208> result='Done after 2s'>
TIME:  0.002000093460083008
'''

# My example 5: attaching a callback that takes extra arguments

import asyncio
import time

now = lambda: time.time()

@asyncio.coroutine
def do_some_work(x):
    print('Waiting: ', x)
    return 'Done after {}s'.format(x)

def callback(t, future):
    print('Result: ', t, future)

start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)

import functools
task.add_done_callback(functools.partial(callback, 2))
loop.run_until_complete(task)

print('TIME: ', now() - start)

'''
Waiting:  2
Result:  2 <Task finished coro=<do_some_work() done, defined at C:\Python36\lib\asyncio\coroutines.py:208> result='Done after 2s'>
TIME:  0.002000093460083008
'''

# My example 6: future and result. Endless callbacks are the nightmare of a
# lot of asynchronous programming; programmers would rather write
# asynchronous code in a synchronous style.

import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting {}'.format(x))
    return 'Done after {}s'.format(x)

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)

print('Task result: {}'.format(task.result()))
print('TIME: {}'.format(now() - start))

'''
Waiting 2
Task result: Done after 2s
TIME: 0.002000093460083008
'''

# My example 7: blocking and await

import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting {}'.format(x))
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)

print('Task result: {}'.format(task.result()))
print('TIME: {}'.format(now() - start))

'''
Waiting 2
Task result: Done after 2s
TIME: 2.001114845275879
'''

# My example 8: concurrency & parallelism
# Use await whenever there is a blocking operation.

import asyncio
import time

now = lambda: time.time()

start = now()

async def do_some_work(x):
    print('Waiting : ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print('Task result: ', task.result())

print('Time: ', now() - start)
'''
Waiting :  1
Waiting :  2
Waiting :  4
Task result:  Done after 1s
Task result:  Done after 2s
Task result:  Done after 4s
Time:  3.9912283420562744
'''

# Example

import asyncio
import time

now = lambda: time.time()

start = now()

async def do_some_work(x):
    print('Waiting:', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

tasks = [
    asyncio.ensure_future(do_some_work(1)),
    asyncio.ensure_future(do_some_work(2)),
    asyncio.ensure_future(do_some_work(4))
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print('Task result:', task.result())

'''
Waiting: 2
Waiting: 1
Waiting: 4
Traceback (most recent call last):
  File "/Users/suren/PycharmProjects/untitled1/asyn.py", line 38, in <module>
    print('task result:', asyncio.ensure_future(coro).result())
asyncio.base_futures.InvalidStateError: Result is not ready.
'''
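The traceback above is what happens when result() is called on a task that has not finished yet. A small, self-contained sketch of the same failure (the names here are my own, not from the transcript):

```python
import asyncio

async def work():
    await asyncio.sleep(0.01)
    return 'finished'

loop = asyncio.new_event_loop()
task = loop.create_task(work())      # scheduled, but the loop has not run yet

try:
    task.result()                    # too early: the task is still pending
    err = None
except asyncio.InvalidStateError as exc:
    err = type(exc).__name__

loop.run_until_complete(task)        # now drive the task to completion
print(err, task.result())            # InvalidStateError finished
loop.close()
```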

# My example 9: nested coroutines [1]: dones, pendings = await asyncio.wait(tasks)

import asyncio
import time

now = lambda: time.time()

start = now()

async def do_some_work(x):
    print('Waiting : ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    dones, pendings = await asyncio.wait(tasks)

    for task in dones:
        print('Task result: ', task.result())

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

print('TIME: ', now() - start)
'''
Waiting :  1
Waiting :  2
Waiting :  4
Task result:  Done after 2s
Task result:  Done after 4s
Task result:  Done after 1s
TIME:  4.007229328155518
'''

# My example 10: nested coroutines [2]: when asyncio.gather builds the
# coroutine object, the value of the await expression is the list of the
# coroutines' results.

import asyncio
import time

now = lambda: time.time()

start = now()

async def do_some_work(x):
    print('Waiting : ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    results = await asyncio.gather(*tasks)

    for result in results:
        print('Task result: ', result)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

print('TIME: ', now() - start)
'''
Waiting :  1
Waiting :  2
Waiting :  4
Task result:  Done after 1s
Task result:  Done after 2s
Task result:  Done after 4s
TIME:  3.9892282485961914
'''

# My example 11: nested coroutines [3]: instead of handling results inside
# the main coroutine, return what await produces; the outermost
# run_until_complete then returns main's result.

import asyncio
import time

now = lambda: time.time()

start = now()

async def do_some_work(x):
    print('Waiting : ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    return await asyncio.gather(*tasks)

loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())

for result in results:
    print('Task result: ', result)

print('TIME: ', now() - start)
'''
Waiting :  1
Waiting :  2
Waiting :  4
Task result:  Done after 1s
Task result:  Done after 2s
Task result:  Done after 4s
TIME:  4.0052289962768555
'''

# My example 12: nested coroutines [4]: as above, return what await produces
# from main, but suspend the coroutines with asyncio.wait.

import asyncio
import time

now = lambda: time.time()

start = now()

async def do_some_work(x):
    print('Waiting : ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    return await asyncio.wait(tasks)

loop = asyncio.get_event_loop()
dones, pendings = loop.run_until_complete(main())

for task in dones:
    print('Task result: ', task.result())

print('TIME: ', now() - start)
'''
Waiting :  1
Waiting :  2
Waiting :  4
Task result:  Done after 2s
Task result:  Done after 4s
Task result:  Done after 1s
TIME:  3.9912283420562744
'''

# My example 13: nested coroutines [5]: using asyncio.as_completed

import asyncio
import time

now = lambda: time.time()

start = now()

async def do_some_work(x):
    print('Waiting : ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    for task in asyncio.as_completed(tasks):
        result = await task
        print('Task result: {}'.format(result))

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

print('TIME: ', now() - start)
'''
Waiting :  1
Waiting :  2
Waiting :  4
Task result: Done after 1s
Task result: Done after 2s
Task result: Done after 4s
TIME:  3.9912281036376953
'''

# My example 14: stopping coroutines [1]: the event loop is driven outside
# the main function, so main is in effect the outermost task and it is
# enough to handle that wrapper.
'''
The examples above showed the common coroutine idioms, all revolving around
the event loop. A future object has several states:

Pending
Running
Done
Cancelled

When a future is created, the task is pending; while the event loop is
executing it, it is running; when the call completes, it is done. To stop the
event loop, the tasks must be cancelled first. asyncio.Task can be used to
obtain the event loop's tasks.

Right after starting the event loop, press Ctrl+C: this makes
run_until_complete raise KeyboardInterrupt. We then iterate over asyncio.Task
and cancel the futures.
'''
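The states can be observed directly on a task. A minimal sketch (the names here are my own):

```python
import asyncio

async def work():
    await asyncio.sleep(10)

loop = asyncio.new_event_loop()
task = loop.create_task(work())
before = (task.done(), task.cancelled())   # pending: (False, False)

task.cancel()                              # request cancellation
try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass
after = (task.done(), task.cancelled())    # cancelled: (True, True)
loop.close()
print(before, after)
```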

import asyncio
import time

now = lambda: time.time()

start = now()

async def do_some_work(x):
    print('Waiting : ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    done, pending = await asyncio.wait(tasks)
    for task in done:
        print('Task result: ', task.result())

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(main())
try:
    loop.run_until_complete(task)
except KeyboardInterrupt as e:
    print(asyncio.Task.all_tasks())
    print('*******************')
    print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())
    # True means the cancel succeeded. After loop.stop() the event loop must
    # be started again before it is closed, otherwise an exception is raised.
    loop.stop()
    loop.run_forever()
finally:
    loop.close()

print('TIME: ', now() - start)

'''
# Ctrl+C cannot be delivered inside PyCharm; run this in an interactive
# Python session.
Waiting:  1
Waiting:  2
Waiting:  4
{<Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x101230648>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>,
 <Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1032b10a8>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>,
 <Task pending coro=<wait() running at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:307> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x103317d38>()]> cb=[_run_until_complete_cb() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py:176]>,
 <Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x103317be8>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>}

True
TIME: 2.0158370780944824
'''

# My example 15: stopping coroutines [2]: the tasks live at the top level
# instead of being wrapped inside a main function

import asyncio
import time

now = lambda: time.time()
start = now()

async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

start = now()

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
    print(asyncio.Task.all_tasks())
    for task in asyncio.Task.all_tasks():
        print(task.cancel())
    loop.stop()
    loop.run_forever()
finally:
    loop.close()

print('TIME: ', now() - start)

'''
Four Trues are printed rather than three, because asyncio.Task.all_tasks()
also includes the task wrapping the asyncio.wait(...) coroutine, visible as
the wait() task in the set below.
Waiting: 1
Waiting: 2
Waiting: 4
{<Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x101230648>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>,
 <Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1032b10a8>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>,
 <Task pending coro=<wait() running at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:307> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x103317d38>()]> cb=[_run_until_complete_cb() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py:176]>,
 <Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x103317be8>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>}
True
True
True
True
TIME: 0.8858370780944824
'''

# My example 16: event loops in different threads
'''
Often our event loop is used to register coroutines up front, but some
coroutines need to be added to the loop dynamically. A simple way to do this
is with multiple threads: create an event loop, then start a new thread and
run the event loop there. The current thread is not blocked.

After the code below starts, the current thread is not blocked. The new
thread runs the more_work calls registered via call_soon_threadsafe in order;
since time.sleep is a synchronous, blocking call, finishing both takes
roughly 6 + 4 seconds.
'''

from threading import Thread
import asyncio

import time

now = lambda: time.time()
start = now()

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)
    print('Finished more work {}'.format(x))

new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()

new_loop.call_soon_threadsafe(more_work, 6)
new_loop.call_soon_threadsafe(more_work, 4)

'''
More work 6
Finished more work 6
More work 4
Finished more work 4
'''

# My example 17: coroutines in a new thread
from threading import Thread
import asyncio

import time

now = lambda: time.time()
start = now()

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def do_some_work(x):
    print('Waiting {}'.format(x))
    await asyncio.sleep(x)
    print('Done after {}s'.format(x))

new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()

asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)

'''
Waiting 6
Waiting 4
Done after 4s
Done after 6s
'''

【aiohttp exercises】

  1. Preface

This section is translated from the official aiohttp documentation; please
point out any mistakes.

aiohttp has a server side and a client side; only the client is covered here.

I have also wrapped aiohttp and asyncio in a small package; see my GitHub
repository:

https://github.com/web-trump/ahttp

Because of the async context, request code must run inside an asynchronous
function:

async def fn():

    pass

  1. Installing aiohttp

pip3 install aiohttp

1.1. Basic requests

async with aiohttp.get('https://github.com') as r:

    await r.text()

r.text() accepts the encoding to decode with, for example

await resp.text(encoding='windows-1251')

or you can skip decoding altogether, which suits undecodable content such as
images:

await resp.read()

2. Making a session request

First, import the aiohttp module:

import aiohttp

Then try fetching a web page, using GitHub's public time-line page as an
example:

async with aiohttp.ClientSession() as session:
    async with session.get('https://api.github.com/events') as resp:
        print(resp.status)
        print(await resp.text())

The code above creates a ClientSession object named session, then uses its
get method to obtain a ClientResponse object named resp. The get method takes
one required parameter, url: the HTTP URL whose content we want. That
completes an asynchronous GET request driven by coroutines.

Besides get there is of course post, which is also a coroutine:

session.post('http://httpbin.org/post', data=b'data')

It works like get, except that post takes an extra parameter, data: the data
to be posted.

The other HTTP methods work the same way:

session.put('http://httpbin.org/put', data=b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=b'data')

Notes:

Do not create a new session for every request. Normally you create one
session and use it for all requests.

Each session object holds an internal connection pool, with keep-alive and
connection reuse enabled by default, which speeds things up overall.

3. Passing parameters in the URL

We often need a GET request to pass parameters in the URL; they are sent to
the server as the part after the question mark. In aiohttp requests, those
parameters can be expressed as a dict. For example, to pass
key1=value1 key2=value2 to httpbin.org/get you can use:

params = {'key1': 'value1', 'key2': 'value2'}
async with session.get('http://httpbin.org/get', params=params) as resp:
    assert resp.url == 'http://httpbin.org/get?key2=value2&key1=value1'

The code runs correctly, so the parameters were passed through; this works
for one, two, or any number of parameters. Alternatively, you can pass a list
of pairs, which also allows special cases such as two equal keys:

params = [('key', 'value1'), ('key', 'value2')]
async with session.get('http://httpbin.org/get', params=params) as r:
    assert r.url == 'http://httpbin.org/get?key=value2&key=value1'

Besides those two forms, you can pass the query string directly, but note
that special characters in a plain string are not encoded:

async with session.get('http://httpbin.org/get', params='key=value+1') as r:
    assert r.url == 'http://httpbin.org/get?key=value+1'

4. Response content

Again using GitHub's public time-line page as the example, we can read the
response body:

async with session.get('https://api.github.com/events') as resp:
    print(await resp.text())

which prints something like:

'[{"created_at":"2015-06-12T14:06:22Z","public":true,"actor":{...

resp.text() automatically decodes the bytes returned by the server; we can
also pick the encoding ourselves:

await resp.text(encoding='gb2312')

Besides text(), which returns decoded content, we can also get the content as
bytes:

print(await resp.read())

which prints:

b'[{"created_at":"2015-06-12T14:06:22Z","public":true,"actor":{...

The gzip and deflate transfer encodings are decoded for you automatically.

Note:

text() and read() load the whole response body into memory. If you are
fetching a lot of data, consider a streaming response instead.

5. Special response content: JSON

If the response body is JSON, aiohttp has a better built-in way to handle it:

async with session.get('https://api.github.com/events') as resp:
    print(await resp.json())

If resp.json() fails to parse the body for some reason, for example because
it is not a JSON string, it raises an error. You can also give json() an
encoding:

print(await resp.json(encoding='gb2312'))

or pass in a custom loads function:

print(await resp.json(loads=lambda s: s.replace('a', 'b')))

6. Reading the response as a byte stream

json(), text(), and read() conveniently load the response into memory, but we
should still use them with care, because they read the entire response body.
Even if you only want a few bytes of a file, these methods load all of the
data. Instead, you can limit how much of the response is read into memory:

async with session.get('https://api.github.com/events') as resp:
    await resp.content.read(10)  # read the first 10 bytes

In general, use the following pattern to save a streamed body to a file:

with open(filename, 'wb') as fd:
    while True:
        chunk = await resp.content.read(chunk_size)
        if not chunk:
            break
        fd.write(chunk)

7. Custom request headers

To add request headers, pass them as a dict, just like GET parameters, as an
argument to get or post:

import json

url = 'https://api.github.com/some/endpoint'
payload = {'some': 'data'}
headers = {'content-type': 'application/json'}

await session.post(url, data=json.dumps(payload), headers=headers)

8. Custom cookies

To send cookies to the server, pass a cookies parameter to ClientSession:

url = 'http://httpbin.org/cookies'
cookies = {'cookies_are': 'working'}
async with ClientSession(cookies=cookies) as session:
    async with session.get(url) as resp:
        assert await resp.json() == {
            "cookies": {"cookies_are": "working"}}

You can visit "httpbin.org/cookies" directly to inspect the current cookies;
for accessing the cookies stored in a session, see section 10.

9. Several ways to POST data

(1) Simulating a form POST

payload = {'key1': 'value1', 'key2': 'value2'}
async with session.post('http://httpbin.org/post', data=payload) as resp:
    print(await resp.text())

Note: data posted as a dict is form-encoded, exactly like an HTML form
submission. If you do not want that encoding, pass a plain string (data=str)
instead, which is sent as-is.

(2) POSTing JSON

import json

url = 'https://api.github.com/some/endpoint'
payload = {'some': 'data'}

async with session.post(url, data=json.dumps(payload)) as resp:
    ...

json.dumps(payload) simply returns a string, but one that can be recognized
as JSON.
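That last claim is easy to verify with the standard json module alone: dumps produces a plain str, and that string round-trips through loads.

```python
import json

payload = {'some': 'data'}
body = json.dumps(payload)

print(type(body).__name__)           # str
print(json.loads(body) == payload)   # True
```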

(3) POSTing small files

url = 'http://httpbin.org/post'
files = {'file': open('report.xls', 'rb')}

await session.post(url, data=files)

You can also set the file name and content type explicitly:

url = 'http://httpbin.org/post'
data = FormData()
data.add_field('file',
               open('report.xls', 'rb'),
               filename='report.xls',
               content_type='application/vnd.ms-excel')

await session.post(url, data=data)

If a file object is passed as the data parameter, aiohttp automatically
streams it to the server as bytes.

(4) POSTing large files

aiohttp can upload many kinds of files as streams, so large files can be sent
without first reading them into memory:

@aiohttp.streamer
def file_sender(writer, file_name=None):
    with open(file_name, 'rb') as f:
        chunk = f.read(2 ** 16)
        while chunk:
            yield from writer.write(chunk)
            chunk = f.read(2 ** 16)

Then you can use file_sender as a data provider:

async with session.post('http://httpbin.org/post',
                        data=file_sender(file_name='huge_file')) as resp:
    print(await resp.text())

We can also fetch a file from one URL and post it straight to another URL,
computing a hash along the way:

async def feed_stream(resp, stream):
    h = hashlib.sha256()

    while True:
        chunk = await resp.content.readany()
        if not chunk:
            break
        h.update(chunk)
        stream.feed_data(chunk)

    return h.hexdigest()

resp = session.get('http://httpbin.org/post')
stream = StreamReader()
loop.create_task(session.post('http://httpbin.org/post', data=stream))

file_hash = await feed_stream(resp, stream)

Because the response content type is StreamReader, a GET and a POST can be
chained together, doing both at the same time:

r = await session.get('http://python.org')
await session.post('http://httpbin.org/post', data=r.content)

(5) POSTing pre-compressed data

To send data that was already compressed before handing it to aiohttp, set
the content-encoding header to the name of the compression function used
(usually deflate or zlib):

async def my_coroutine(session, headers, my_data):
    data = zlib.compress(my_data)
    headers = {'Content-Encoding': 'deflate'}
    async with session.post('http://httpbin.org/post',
                            data=data,
                            headers=headers):
        pass

10. keep-alive, connection pooling, and shared cookies

ClientSession shares cookies across multiple requests:

async with aiohttp.ClientSession() as session:
    await session.get(
        'http://httpbin.org/cookies/set?my_cookie=my_value')
    filtered = session.cookie_jar.filter_cookies('http://httpbin.org')
    assert filtered['my_cookie'].value == 'my_value'
    async with session.get('http://httpbin.org/cookies') as r:
        json_body = await r.json()
        assert json_body['cookies']['my_cookie'] == 'my_value'

You can also set common request headers for every request in the session:

async with aiohttp.ClientSession(
        headers={"Authorization": "Basic bG9naW46cGFzcw=="}) as session:
    async with session.get("http://httpbin.org/headers") as r:
        json_body = await r.json()
        assert json_body['headers']['Authorization'] == \
            'Basic bG9naW46cGFzcw=='

ClientSession also supports keep-alive connections and connection pooling.

11. Cookie safety

By default, ClientSession uses a strict aiohttp.CookieJar. RFC 2109
explicitly forbids accepting cookies from URLs that use an IP address instead
of a DNS name. This can be relaxed by constructing aiohttp.CookieJar with
unsafe=True:

jar = aiohttp.CookieJar(unsafe=True)
session = aiohttp.ClientSession(cookie_jar=jar)

12. Limiting the number of simultaneous connections (connection pool)

This can also be understood as the number of simultaneous requests. To limit
the number of connections open at the same time, pass a limit to the
connector:

conn = aiohttp.TCPConnector(limit=30)
# at most 30 simultaneous connections; the default is 100, and limit=0 means
# no limit

To limit the number of simultaneous connections to the same endpoint (keyed
by the (host, port, is_ssl) triple), set the limit_per_host parameter:

conn = aiohttp.TCPConnector(limit_per_host=30)  # default is 0 (no limit)

13. Custom DNS resolution

We can specify the name servers used to resolve the URLs we GET or POST:

from aiohttp.resolver import AsyncResolver

resolver = AsyncResolver(nameservers=["8.8.8.8", "8.8.4.4"])
conn = aiohttp.TCPConnector(resolver=resolver)

14. Using a proxy

aiohttp supports fetching pages through a proxy:

async with aiohttp.ClientSession() as session:
    async with session.get("http://python.org",
                           proxy="http://some.proxy.com") as resp:
        print(resp.status)

Proxies that require authorization are also supported:

async with aiohttp.ClientSession() as session:
    proxy_auth = aiohttp.BasicAuth('user', 'pass')
    async with session.get("http://python.org",
                           proxy="http://some.proxy.com",
                           proxy_auth=proxy_auth) as resp:
        print(resp.status)

or the credentials can go into the proxy URL itself:

session.get("http://python.org", proxy="http://user:pass@some.proxy.com")

15. Response status code

resp.status tells you whether the status code is 200:

async with session.get('http://httpbin.org/get') as resp:
    assert resp.status == 200

16. Response headers

resp.headers gives the response headers as a dict-like object:

resp.headers
{'ACCESS-CONTROL-ALLOW-ORIGIN': '*',
 'CONTENT-TYPE': 'application/json',
 'DATE': 'Tue, 15 Jul 2014 16:49:51 GMT',
 'SERVER': 'gunicorn/18.0',
 'CONTENT-LENGTH': '331',
 'CONNECTION': 'keep-alive'}

or you can inspect the raw response headers:

resp.raw_headers
((b'SERVER', b'nginx'),
 (b'DATE', b'Sat, 09 Jan 2016 20:28:40 GMT'),
 (b'CONTENT-TYPE', b'text/html; charset=utf-8'),
 (b'CONTENT-LENGTH', b'12150'),
 (b'CONNECTION', b'keep-alive'))

17. Inspecting cookies

url = 'http://example.com/some/cookie/setting/url'
async with session.get(url) as resp:
    print(resp.cookies)

18. Redirect history

If a request was redirected, we can still inspect the response(s) received
before the redirect:

resp = await session.get('http://example.com/some/redirect/')
resp
<ClientResponse(http://example.com/some/other/url/) [200]>
resp.history
(<ClientResponse(http://example.com/some/redirect/) [301]>,)

19. Timeouts

By default, I/O operations have a five-minute timeout; we can override it
with the timeout parameter:

async with session.get('https://github.com', timeout=60) as r:
    ...

With timeout=None or timeout=0, no timeout check is performed, i.e. the
request may take arbitrarily long.
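The same idea exists at the asyncio level: asyncio.wait_for cancels an awaitable that takes too long and raises asyncio.TimeoutError. A minimal sketch with no network involved; the slow coroutine is a stand-in for a request:

```python
import asyncio

async def slow():
    # Stand-in for a request that takes too long.
    await asyncio.sleep(10)

async def main():
    try:
        await asyncio.wait_for(slow(), timeout=0.05)
        return 'completed'
    except asyncio.TimeoutError:
        return 'timed out'

loop = asyncio.new_event_loop()
result = loop.run_until_complete(main())
loop.close()
print(result)  # timed out
```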

#18.4 Improving the asyncio downloader
#Example 18-7. flags2_asyncio.py: top half of the script; the rest is in Example 18-8

import os, time, sys
import aiohttp
from aiohttp import web
import asyncio
import async_timeout
import collections
from collections import namedtuple
from enum import Enum
from tqdm import tqdm

BASE_URL = 'https://images2018.cnblogs.com/blog/1239321/201808'
POP20_CC1 = ('1239321-20180808065117364-1539273796 1239321-20180808065129112-103367989 '
             '1239321-20180808065136786-868892759 '
             '1239321-20180808065146211-1880907820 1239321-20180808065155072-1392342345 '
             '1239321-20180808065222347-1439669487 '
             '1239321-20180808065232562-1454112423 1239321-20180808065246215-1857827340 '
             '1239321-20180808065301480-1707393818 '
             '1239321-20180808065312201-964077895 1239321-20180808065326211-1590046138 '
             '1239321-20180808065342568-448845 '
             '1239321-20180808065358869-366577464 1239321-20180808065410900-539910454 '
             '1239321-20180808065422695-222625730 '
             '1239321-20180808065430991-1182951067 1239321-20180808065437898-138307299 '
             '1239321-20180808065444387-1849567433 '
             '1239321-20180808065454537-30405473 1239321-20180808065506470-995044385 ').split()
POP20_CC = 'aaaa1239321-20180808065117364-1539273796 1239321-20180808065129112-103367989'.split()
DEST_DIR = 'downloads'
MAX_WORKERS = 20

HTTPStatus = Enum('Status', 'ok not_found error')
Result = namedtuple('Result', 'status cc')

class FetchError(Exception):
    def __init__(self, country_code):
        self.country_code = country_code

def save_flag(image, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(image)

async def get_flag(cc):
    url = '{}/{cc}.png'.format(BASE_URL, cc=cc)

    async with aiohttp.ClientSession() as session:
        with async_timeout.timeout(3000):
            async with session.get(url, verify_ssl=False) as resp:
                # Without verify_ssl=False an SSL error is raised; the root
                # cause is that urllib/requests verify the certificate when
                # opening an https site.

                #print(await resp.text())
                if b'PNG' in (await resp.read()):
                    # We cannot use resp.status == 404 here to detect a
                    # missing resource: not every site formats its responses
                    # the same way. Nor can we use
                    # '404' not in (await resp.text()), because when the
                    # resource exists that call fails with
                    # "UnicodeDecodeError: 'utf-8' codec can't decode byte
                    # 0x89 in position 0: invalid start byte".
                    # So we use this check, found by trial and error: when
                    # the resource does not exist, the body length is 191.
                    # It is not very robust; a more rigorous check is left
                    # for later. Another option:
                    # if resp.status == 200 and ((await resp.read())[2] == 78):
                    image = await resp.read()
                    return image
                elif ('not found' in (await resp.text())
                        or 'not exist' in (await resp.text())):
                    # FetchError later catches this and prints
                    # *** Error for T_JINGSE200: Not Found
                    raise web.HTTPNotFound()
                else:
                    raise aiohttp.HttpProcessingError(
                        code=resp.status, message=resp.reason,
                        headers=resp.headers)
                    # FetchError later catches this and prints
                    # *** Error for T_JINGSE200: module 'aiohttp' has no attribute ...

async def download_one(cc,semaphore,verbose): try: with (await semaphore):#在 yield from 表达式中把 semaphore 当成上下文管理器使用,防止阻塞整个系统:若是 semaphore 计数器的值是所容许的最大值,只有这个协程会阻塞。 image = await get_flag(cc) except Exception as exc:

raise FetchError(cc) from exc#引入的raise X from Y 句法连接原来的异常
else:
    save_flag(image,cc + '.PNG')
    status = HTTPStatus.ok
    msg = 'OK'
if verbose and msg:#若是在命令行中设定了 -v/--verbose 选项,显示国家代码和状态消息;这就是详细模式中看到的进度信息
    print (cc,msg)
return Result(status,cc)

async def download_coro(cc_list, verbose, concur_req):
    counter = collections.Counter()
    semaphore = asyncio.Semaphore(concur_req)
    to_do = [download_one(cc, semaphore, verbose) for cc in cc_list]
    # Get an iterator that yields each future as it finishes
    to_do_iter = asyncio.as_completed(to_do)
    if not verbose:
        # Wrap the iterator in tqdm to display progress
        to_do_iter = tqdm(to_do_iter, total=len(cc_list))
    for future in to_do_iter:  # iterate over the finished futures
        try:
            res = await future
        except FetchError as exc:
            country_code = exc.country_code
            try:
                # args sometimes looks like ("module 'aiohttp' has no attribute 'HttpProcessingError'",)
                # and sometimes like (1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed (_ssl.c:749)')
                error_msg = exc.__cause__.args[0]
                print(exc.__cause__.args)
            except IndexError:
                error_msg = exc.__cause__.__class__.__name__
            if not verbose and error_msg:
                msg = '*** Error for {} : {}'
                print(msg.format(country_code, error_msg))
            status = HTTPStatus.error
        else:
            status = res.status
        counter[status] += 1
    return counter

def download_many(cc_list, verbose, concur_req):
    # download_many just instantiates the download_coro coroutine and hands it
    # to the event loop via run_until_complete
    loop = asyncio.get_event_loop()
    coro = download_coro(cc_list, verbose, concur_req)
    counts = loop.run_until_complete(coro)
    loop.close()
    return counts

def main():
    t0 = time.time()
    count = download_many(POP20_CC, verbose=False, concur_req=2)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

main()

'''
Resources that come back normally are saved to the target path; the rest
raise the corresponding error.
'''

18.4.2 Using an Executor object to avoid blocking the event loop

''' In Example 18-7, the blocking function is save_flag. In the threaded version of this script (Example 17-14), save_flag blocks the thread running download_one, but that is only one of many worker threads. Behind the scenes, blocking I/O calls release the GIL, so another thread can proceed. But in flags2_asyncio.py, save_flag blocks the single thread that the client code shares with the asyncio event loop, so the whole application freezes while a file is being saved. The solution is the event loop's run_in_executor method. Behind the scenes, the asyncio event loop maintains a ThreadPoolExecutor object, and we can call run_in_executor to send callables to it for execution. To use this feature here, only a few lines of the download_one coroutine need to change. '''
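A minimal, self-contained sketch of that change, using the modern asyncio.run / get_running_loop spellings (Python 3.7+) and a stand-in blocking_save function in place of the real save_flag:

```python
# Sketch of run_in_executor: the blocking file write runs in the loop's
# default ThreadPoolExecutor, so the event loop thread is not frozen.
import asyncio

def blocking_save(data, filename):
    # Plain blocking file I/O, like save_flag.
    with open(filename, 'wb') as fp:
        fp.write(data)

async def save_async(data, filename):
    loop = asyncio.get_running_loop()
    # The first argument None means: use the loop's default ThreadPoolExecutor.
    await loop.run_in_executor(None, blocking_save, data, filename)

asyncio.run(save_async(b'fake-png-bytes', 'demo_flag.png'))
```

Note that awaiting the run_in_executor future keeps the coroutine suspended while the worker thread writes; other coroutines continue to run on the loop in the meantime.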

Example: asynchronous download using an Executor object; compared with the previous example, I did not see much of a performance gain.

from enum import Enum
import os, sys, time
import collections
from collections import namedtuple
import asyncio
import async_timeout
import aiohttp
from aiohttp import web
from tqdm import tqdm

HTTPStatus = Enum('Status', 'ok not_found error')
Result = namedtuple('Result', 'status cc')

BASE_URL = 'http://images.cnblogs.com/cnblogs_com/suren2017/1102909'
POP20_CC = 'T_JINGSE200 T_JINGSE3'.split()
DEST_DIR = 'download'
MAX_WORKERS = 20

class FetchError(Exception):
    def __init__(self, country_code):
        self.country_code = country_code

def save_flag(img, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    path = path.replace('\\', '/')
    with open(path, 'wb') as fp:
        fp.write(img)

async def get_flag(cc):
    url = '{}/{cc}.PNG'.format(BASE_URL, cc=cc.lower())
    async with aiohttp.ClientSession() as session:
        with async_timeout.timeout(10000):
            async with session.get(url) as resp:
                if b'PNG' in (await resp.read()):
                    image = await resp.read()
                    return image
                elif '404' in (await resp.text()):
                    raise web.HTTPNotFound()
                else:
                    raise aiohttp.HttpProcessingError(
                        code=resp.status, message=resp.reason, headers=resp.headers)

async def download_one(cc, semaphore, verbose):
    try:
        with (await semaphore):
            image = await get_flag(cc)
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        loop = asyncio.get_event_loop()
        # Hand the blocking save_flag call to the loop's default ThreadPoolExecutor
        loop.run_in_executor(None, save_flag, image, cc.lower() + '.PNG')
        status = HTTPStatus.ok
        msg = 'OK'
    if verbose and msg:
        print(cc, msg)
    return Result(status, cc)

async def download_coro(cc_list, verbose, concur_req):
    counter = collections.Counter()
    semaphore = asyncio.Semaphore(concur_req)
    to_do = [download_one(cc, semaphore, verbose) for cc in sorted(cc_list)]
    to_do_iter = asyncio.as_completed(to_do)
    if not verbose:
        to_do_iter = tqdm(to_do_iter, total=len(cc_list))
    for future in to_do_iter:
        try:
            res = await future
        except FetchError as exc:
            country_code = exc.country_code
            try:
                error_msg = exc.__cause__.args[0]
                print(exc.__cause__.args)
            except IndexError:
                error_msg = exc.__cause__.__class__.__name__
            if not verbose and error_msg:
                msg = '*** Error for {} : {}'
                print(msg.format(country_code, error_msg))
            status = HTTPStatus.error
        else:
            status = res.status
        counter[status] += 1
    return counter

def download_many(cc_list, verbose, concur_req):
    loop = asyncio.get_event_loop()
    coro = download_coro(cc_list, verbose, concur_req)
    counts = loop.run_until_complete(coro)
    loop.close()
    return counts

def main():
    t0 = time.time()
    count = download_many(POP20_CC, verbose=True, concur_req=2)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

main()

# Example: sequential download, writing 10x the bytes per file, without an Executor object

from enum import Enum
HTTPStatus = Enum('Status', 'ok not_found error')
import collections
from collections import namedtuple
Result = namedtuple('Result', 'status cc')

import os
import sys
import time

import requests

BASE_URL = 'http://images.cnblogs.com/cnblogs_com/suren2017/1102909'

POP20_CC = 'T_JINGSE200 T_JINGSE3 T_JINGSE4 T_JINGSE5 t_jingse6 t_jingse7 t_jingse8 t_jingse9 t_jingse10 t_jingse11 t_jingse12 t_jingse13 T_jingse14 T_jingse15 T_jingse16 T_jingse17 T_jingse18 T_jingse19 T_jingse20'.split()

DEST_DIR = 'downloads'

MAX_WORKERS = 20

def save_flag(img, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    path = path.replace('\\', '/')
    with open(path, 'wb') as fp:
        fp.write(img * 10)  # write 10x the bytes to make the saving cost visible

def get_flag(cc):
    url = '{}/{cc}.PNG'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    if '404' in resp.text:
        resp.status_code = 404
        resp.raise_for_status()
    return resp.content

def download_one(cc, verbose=False):
    try:
        image = get_flag(cc)
    except requests.exceptions.HTTPError as exc:
        res = exc.response
        if res.status_code == 404:
            status = HTTPStatus.not_found
            msg = 'not found'
            res.status_code = 404
            res.reason = 'NOT FOUND'
            raise
        else:
            raise
    else:
        save_flag(image, cc.lower() + '.PNG')
        status = HTTPStatus.ok
        msg = 'OK'

    if verbose:  # with -v/--verbose set on the command line, show the country code and status message
        print(cc, msg)
    return Result(status, cc)

from concurrent import futures
from tqdm import tqdm

def download_many(cc_list, verbose, max_req):
    counter = collections.Counter()
    cc_iter = sorted(cc_list)
    if not verbose:
        cc_iter = tqdm(cc_iter)
    for cc in cc_iter:
        try:
            res = download_one(cc, verbose)
        except requests.exceptions.HTTPError as exc:
            error_msg = 'HTTP error {res.status_code} - {res.reason}'
            error_msg = error_msg.format(res=exc.response)
        except requests.exceptions.ConnectionError as exc:
            error_msg = 'Connection error'
        else:
            error_msg = ''
            status = res.status
        if error_msg:
            status = HTTPStatus.error
        counter[status] += 1
        if verbose and error_msg:  # note 1
            print('*** Error for {} : {}'.format(cc, error_msg))
    return counter

def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC, verbose=False, max_req=10)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))
    # Counter({<Status.ok: 1>: 18, <Status.error: 3>: 1}) flags downloaded in 33.58s

if __name__ == '__main__':
    main(download_many)

Example: sequential download, writing 10x the bytes per file, using an Executor object

from enum import Enum
HTTPStatus = Enum('Status', 'ok not_found error')
import collections
from collections import namedtuple
Result = namedtuple('Result', 'status cc')

import os
import sys
import time

import requests

BASE_URL = 'http://images.cnblogs.com/cnblogs_com/suren2017/1102909'

POP20_CC = 'T_JINGSE200 T_JINGSE3 T_JINGSE4 T_JINGSE5 t_jingse6 t_jingse7 t_jingse8 t_jingse9 t_jingse10 t_jingse11 t_jingse12 t_jingse13 T_jingse14 T_jingse15 T_jingse16 T_jingse17 T_jingse18 T_jingse19 T_jingse20'.split()

DEST_DIR = 'downloads'

MAX_WORKERS = 20

def save_flag(img, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    path = path.replace('\\', '/')
    with open(path, 'wb') as fp:
        fp.write(img * 10)

def get_flag(cc):
    url = '{}/{cc}.PNG'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    if '404' in resp.text:
        resp.status_code = 404
        resp.raise_for_status()
    return resp.content

def download_one(cc, verbose=False):
    try:
        image = get_flag(cc)
    except requests.exceptions.HTTPError as exc:
        res = exc.response
        if res.status_code == 404:
            status = HTTPStatus.not_found
            msg = 'not found'
            res.status_code = 404
            res.reason = 'NOT FOUND'
            raise
        else:
            raise
    else:
        # save_flag(image, cc.lower() + '.PNG')
        import asyncio
        loop = asyncio.get_event_loop()
        loop.run_in_executor(None, save_flag, image, cc.lower() + '.PNG')
        status = HTTPStatus.ok
        msg = 'OK'

    if verbose:  # with -v/--verbose set on the command line, show the country code and status message
        print(cc, msg)
    return Result(status, cc)

from concurrent import futures
from tqdm import tqdm

def download_many(cc_list, verbose, max_req):
    counter = collections.Counter()
    cc_iter = sorted(cc_list)
    if not verbose:
        cc_iter = tqdm(cc_iter)
    for cc in cc_iter:
        try:
            res = download_one(cc, verbose)
        except requests.exceptions.HTTPError as exc:
            error_msg = 'HTTP error {res.status_code} - {res.reason}'
            error_msg = error_msg.format(res=exc.response)
        except requests.exceptions.ConnectionError as exc:
            error_msg = 'Connection error'
        else:
            error_msg = ''
            status = res.status
        if error_msg:
            status = HTTPStatus.error
        counter[status] += 1
        if verbose and error_msg:  # note 1
            print('*** Error for {} : {}'.format(cc, error_msg))
    return counter

def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC, verbose=False, max_req=10)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))
    # Writing 10x the bytes per file (just change fp.write(img) to fp.write(img*10))
    # makes the effect visible:
    # Counter({<Status.ok: 1>: 18, <Status.error: 3>: 1}) flags downloaded in 20.68s

if __name__ == '__main__':
    main(download_many)

"""

【流畅的Python】【控制流程】【五】【asyncio】

-- coding:utf-8 --

""" #18.1 线程&协程 #栗子18-1 threading import sys import time import itertools import threading

class Signal:
    go = True

def spin(msg, signal):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))
        time.sleep(.1)
        if not signal.go:
            break
    write(' ' * len(status) + '\x08' * len(status))

def slow_function():
    time.sleep(1)
    return 42

def supervisor():
    signal = Signal()
    spinner = threading.Thread(target=spin, args=('thinking!', signal))
    print('spinner object:', spinner)
    spinner.start()
    result = slow_function()
    signal.go = False
    spinner.join()
    return result

def main():
    result = supervisor()
    print('Answer:', result)

if __name__ == '__main__':
    main()

'''
spinner object: <Thread(Thread-1, initial)>
| thinking! / thinking! - thinking! \ thinking! | thinking! / thinking!
Answer: 42
'''

# Example 18-2: asyncio implementation

import asyncio
import sys
import itertools

@asyncio.coroutine
def spin(msg):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        # The trick for text-mode animation: backspace characters (\x08)
        # move the cursor back
        write('\x08' * len(status))
        try:
            yield from asyncio.sleep(.1)
        except asyncio.CancelledError:
            break
    # Clear the status line with spaces and move the cursor back to the start
    write(' ' * len(status) + '\x08' * len(status))

@asyncio.coroutine
def slow_function():
    # Pretend to wait for I/O; the yield from asyncio.sleep(1) expression
    # hands control to the main loop, and the coroutine resumes when the
    # sleep is over
    yield from asyncio.sleep(1)
    return 42

@asyncio.coroutine
def supervisor():
    # Note: asyncio.async was later deprecated in favor of asyncio.ensure_future
    spinner = asyncio.async(spin('thinking!'))
    print('spinner object:', spinner)
    # Drive slow_function(); when it finishes, get the return value.
    # Meanwhile the event loop keeps running, because slow_function yields
    # control back to the main loop via yield from asyncio.sleep(1)
    result = yield from slow_function()
    spinner.cancel()
    return result

def main():
    loop = asyncio.get_event_loop()  # get a reference to the event loop
    # Drive the supervisor coroutine to completion; its return value is the
    # return value of this call
    result = loop.run_until_complete(supervisor())
    loop.close()
    print('Answer :', result)

if __name__ == '__main__':
    main()

'''
spinner object: <Task pending coro=<spin() running at C:/Users/wangxue1/PycharmProjects/fluentPython/kongzhiliucheng/asyncio/__init__.py:69>>
| thinking! / thinking! - thinking! \ thinking! | thinking! / thinking!
Answer : 42
'''

# Example: async/await implementation

import asyncio
import sys
import itertools

async def spin(msg):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + msg
        write(status)
        flush()
        write('\x08' * len(status))
        try:
            await asyncio.sleep(.3)
        except asyncio.CancelledError:
            break
    write('\x08' * len(status))

async def slow_function():
    await asyncio.sleep(1)
    return 42

async def supervisor():
    spinner = asyncio.ensure_future(spin('thinking!'))
    print(type(spinner))
    print('spinner object:', spinner)
    result = await slow_function()
    spinner.cancel()
    return result

def main():
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(supervisor())
    loop.close()
    print('Answer:', result)

main()

'''
# [Comparison] The main differences between these two supervisor implementations:

An asyncio.Task object is roughly equivalent to a threading.Thread object.
Victor Stinner (special technical reviewer of this chapter) points out that
"a Task is like a green thread in libraries that implement cooperative
multitasking, such as gevent."

A Task drives a coroutine; a Thread invokes a callable.

You don't instantiate Task objects yourself; you get them by passing a
coroutine to asyncio.async(...) or loop.create_task(...).

A Task object you get back is already scheduled to run (e.g. by the
asyncio.async function); a Thread instance must be explicitly told to run by
calling its start method.

In the threaded supervisor, slow_function is a plain function invoked
directly by a thread. In the asynchronous supervisor, slow_function is a
coroutine driven by yield from.

There is no API to terminate a thread from the outside, because a thread
could be interrupted at any point, leaving the system in an invalid state.
To terminate a task, use the Task.cancel() instance method, which raises
CancelledError inside the coroutine; the coroutine can catch it at the
yield where it is suspended and handle the termination request.

The supervisor coroutine must be executed with loop.run_until_complete in
the main function.

This comparison should help you understand how asyncio orchestrates
concurrent jobs compared with the more familiar threading model.

One final point about threads versus coroutines: if you have done any
non-trivial programming with threads, you know how hard it is to reason
about a program, because the scheduler can interrupt a thread at any time.
You must remember to hold locks to protect the critical sections of your
program, to keep multi-step operations from being interrupted part way
through, which could leave data in an invalid state.

Coroutines, by default, are fully protected against interruption: we must
explicitly yield to let the rest of the program run. Instead of holding
locks to synchronize operations across multiple threads, coroutines
synchronize themselves, because only one of them runs at any time. To give
up control, use yield or yield from to hand control back to the scheduler.
That is why it is safe to cancel a coroutine: by definition, a coroutine can
only be cancelled when it is suspended at a yield, so you can handle
CancelledError and perform cleanup.
'''

# 18.1.1 Nonblocking by design
'''
The asyncio.Future class has essentially the same interface as
concurrent.futures.Future, but the implementations differ and the two are
not interchangeable. "PEP 3156 - Asynchronous IO Support Rebooted: the
'asyncio' Module" (https://www.python.org/dev/peps/pep-3156/) has this to
say about that unfortunate situation:

    In the future we may unify asyncio.Future and concurrent.futures.Future
    (e.g. by adding an __iter__ method to the latter that is compatible
    with yield from).
'''

'''
In short, because asyncio.Future is designed to work with yield from, you
usually do not need the following methods:

- You don't need my_future.add_done_callback(...): simply put whatever
  should run after the future finishes right after the
  yield from my_future expression in the coroutine. That is the big
  advantage of coroutines: they are functions that can be suspended and
  resumed.

- You don't need my_future.result(): the value that yield from produces
  from a future is its result (e.g. result = yield from my_future).
'''
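Both points can be seen in a small sketch (modern syntax, using a bare future that a hypothetical call_later callback fills in):

```python
# Sketch: awaiting a Future directly replaces both add_done_callback and an
# explicit result() call; the value produced by await *is* the result.
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()                 # a bare asyncio.Future
    loop.call_later(0.01, fut.set_result, 42)  # something fills it in later
    result = await fut                         # suspend until the future is done
    return result                              # no callback, no fut.result()

print(asyncio.run(main()))  # 42
```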

# 18.2 Downloading with asyncio and aiohttp

import os, sys, time
import requests
import asyncio
import aiohttp

BASE_URL = 'http://images.cnblogs.com/cnblogs_com/suren2017/1102909'
POP20_CC = 'T_JINGSE2 T_JINGSE3'.split()
DEST_DIR = 'downloads'
MAX_WORKERS = 20

def save_flag(img, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    path = path.replace('\\', '/')
    with open(path, 'wb') as fp:
        fp.write(img)

async def get_flag(cc):
    url = '{}/{cc}.PNG'.format(BASE_URL, cc=cc.lower())
    # Note: the module-level aiohttp.request coroutine was removed in aiohttp 3.x
    resp = await aiohttp.request('GET', url)
    image = await resp.read()
    return image

def show(text):
    print(text, end=' ')
    sys.stdout.flush()

async def download_one(cc):
    image = await get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.PNG')
    return cc

def download_many(cc_list):
    loop = asyncio.get_event_loop()
    to_do = [download_one(cc) for cc in sorted(cc_list)]
    # Despite its name, wait is not a blocking function; it is a coroutine
    # that finishes when all the coroutines passed to it are done
    wait_coro = asyncio.wait(to_do)
    '''
    asyncio.wait(...) takes an iterable of futures or coroutines; wait wraps
    each coroutine in a Task. The net effect is that everything handled by
    wait becomes, one way or another, an instance of Future. Because wait is
    a coroutine function, calling it returns a coroutine/generator object,
    which is what the wait_coro variable holds. To drive the coroutine, we
    pass it to loop.run_until_complete(...).
    '''
    # Run the event loop until wait_coro is done; this is where the script
    # blocks while the loop runs. We ignore the second element that
    # run_until_complete returns.
    res, _ = loop.run_until_complete(wait_coro)
    '''
    loop.run_until_complete takes a future or a coroutine. Given a
    coroutine, run_until_complete wraps it in a Task, just like wait does.
    Coroutines, futures and tasks can all be driven by yield from, which is
    what run_until_complete does with the wait_coro object returned by wait.
    When wait_coro finishes, it returns a tuple: the first element is the
    set of completed futures, the second the set of those not completed. In
    Example 18-5 the second set is always empty, so we assign it to _ and
    ignore it. But wait has two keyword-only arguments, timeout and
    return_when, which may cause it to return even if some futures are not
    complete.
    '''
    loop.close()
    return len(res)

def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

if __name__ == '__main__':
    main(download_many)
# 19 flags downloaded in 0.25s
'''
(The run also prints many "Unclosed client session" warnings, e.g.
client_session: <aiohttp.client.ClientSession object at 0x00000000038C9470>,
interleaved with the country codes, ending with:
19 flags downloaded in 0.45s)
'''
# [Summary]
'''
When we use the asyncio package, our asynchronous code consists of
coroutines that are delegating generators driven by asyncio itself, which in
turn delegate to coroutines in the asyncio package or in third-party
libraries such as aiohttp. This arrangement sets up a pipeline that lets the
asyncio event loop drive, through our coroutines, the library functions that
perform the low-level asynchronous I/O.
'''

'''
18.3 Avoiding blocking calls

Ryan Dahl, the inventor of Node.js, introduced the philosophy of his project
by saying: "We're doing I/O all wrong." He defines functions that do disk or
network I/O as blocking functions, and argues that we cannot treat them the
way we treat non-blocking functions. To explain why, he shows the first two
columns of Table 18-1.
("Introduction to Node.js", https://www.youtube.com/watch?v=M-sc73Y-zQA, at 4:55.)

Table 18-1. Latency of reading data from different storage media on a modern
computer; the third column shows the values on a proportional "human" scale.

    Storage medium | CPU cycles    | Proportional "human" time
    L1 cache       | 3             | 3 seconds
    L2 cache       | 14            | 14 seconds
    RAM            | 250           | 250 seconds
    Disk           | 41,000,000    | 1.3 years
    Network        | 240,000,000   | 7.6 years

To make sense of Table 18-1, remember that modern CPUs with GHz clocks run
billions of cycles per second. Suppose a CPU runs exactly one billion cycles
per second: it could read the L1 cache 333,333,333 times in a second, but
the network only 4 times (yes, only 4). The third column multiplies the
second column by a constant factor. So, in an alternative world where
reading the L1 cache took 3 seconds, reading from the network would take
7.6 years!

There are two ways to prevent blocking calls from halting the progress of
the entire application:

- Run each blocking operation in a separate thread.
- Turn every blocking operation into a non-blocking asynchronous call.
'''
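Both approaches can be sketched in a few lines. This is a toy illustration, not from the book: blocking_io is a stand-in for disk/network I/O, and asyncio.to_thread (Python 3.9+, a convenience wrapper around run_in_executor) represents the "separate thread" approach:

```python
# Three 0.1 s blocking calls would take ~0.3 s serially; with either
# approach they overlap and the total stays near 0.1 s.
import asyncio
import time

def blocking_io():
    time.sleep(0.1)   # stands in for disk or network I/O

async def main():
    t0 = time.time()
    # Method 1: run each blocking call in a separate worker thread.
    await asyncio.gather(*(asyncio.to_thread(blocking_io) for _ in range(3)))
    threads = time.time() - t0

    t0 = time.time()
    # Method 2: replace the blocking call with a non-blocking coroutine.
    await asyncio.gather(*(asyncio.sleep(0.1) for _ in range(3)))
    coros = time.time() - t0
    return threads, coros

threads, coros = asyncio.run(main())
print('threads: %.2fs  coroutines: %.2fs' % (threads, coros))
```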

# My example 1
import asyncio
import time

now = lambda: time.time()

@asyncio.coroutine
def do_some_work(x):
    print('Waiting: ', x)

start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)

print('TIME: ', now() - start)

# My example 2
import asyncio
import time

now = lambda: time.time()

@asyncio.coroutine
def do_some_work(x):
    print('Waiting: ', x)

start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
print('TIME: ', now() - start)
'''
<Task pending coro=<do_some_work() running at C:\Python36\lib\asyncio\coroutines.py:208>>
Waiting:  2
<Task finished coro=<do_some_work() done, defined at C:\Python36\lib\asyncio\coroutines.py:208> result=None>
TIME:  0.0010001659393310547
'''

# My example 3
'''
A coroutine object cannot run on its own. When it is registered with the
event loop, run_until_complete actually wraps the coroutine in a task
object. A task is a subclass of Future; it stores the state of the coroutine
after it runs, so the result can be retrieved later.
'''

import asyncio
import time

now = lambda: time.time()

@asyncio.coroutine
def do_some_work(x):
    print('Waiting: ', x)

start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
print('TIME: ', now() - start)
'''
Both asyncio.ensure_future(coroutine) and loop.create_task(coroutine) create
a task. run_until_complete takes a future object; given a coroutine, it
wraps it in a task internally. Task is a subclass of Future, so
isinstance(task, asyncio.Future) prints True.
'''
print(isinstance(task, asyncio.Future))
'''
<Task pending coro=<do_some_work() running at C:\Python36\lib\asyncio\coroutines.py:208>>
Waiting:  2
<Task finished coro=<do_some_work() done, defined at C:\Python36\lib\asyncio\coroutines.py:208> result=None>
TIME:  0.0009999275207519531
True
'''

# My example 4: attaching a callback

import asyncio
import time

now = lambda: time.time()

@asyncio.coroutine
def do_some_work(x):
    print('Waiting: ', x)
    return 'Done after {}s'.format(x)

def callback(future):
    print('Result: ', future)

start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
loop.run_until_complete(task)

print('TIME: ', now() - start)
'''
Waiting:  2
Result:  <Task finished coro=<do_some_work() done, defined at C:\Python36\lib\asyncio\coroutines.py:208> result='Done after 2s'>
TIME:  0.002000093460083008
'''

# My example 5: attaching a callback that takes extra arguments

import asyncio
import time

now = lambda: time.time()

@asyncio.coroutine
def do_some_work(x):
    print('Waiting: ', x)
    return 'Done after {}s'.format(x)

def callback(t, future):
    print('Result: ', t, future)

start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
import functools
task.add_done_callback(functools.partial(callback, 2))
loop.run_until_complete(task)

print('TIME: ', now() - start)

'''
Waiting:  2
Result:  2 <Task finished coro=<do_some_work() done, defined at C:\Python36\lib\asyncio\coroutines.py:208> result='Done after 2s'>
TIME:  0.002000093460083008
'''

# My example 6: future and result. Endless callbacks are the nightmare of a
# lot of asynchronous programming; programmers prefer to write asynchronous
# code in a synchronous style.

import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting {}'.format(x))
    return 'Done after {}s'.format(x)

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)

# Note: the original printed task.result without calling it, which shows the
# bound method (<built-in method result of _asyncio.Task object ...>);
# task.result() returns the actual result.
print('Task result: {}'.format(task.result()))
print('TIME: {}'.format(now() - start))

'''
Waiting 2
Task result: Done after 2s
TIME: 0.002000093460083008
'''

# My example 7: blocking and await

import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting {}'.format(x))
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)

print('Task result: {}'.format(task.result()))
print('TIME: {}'.format(now() - start))

'''
Waiting 2
Task result: Done after 2s
TIME: 2.001114845275879
'''

# My example 8: concurrency & parallelism
# Use await whenever there is a blocking task

import asyncio
import time

now = lambda: time.time()

start = now()

async def do_some_work(x):
    print('Waiting : ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print('Task result: ', task.result())

print('Time: ', now() - start)
'''
Waiting :  1
Waiting :  2
Waiting :  4
Task result:  Done after 1s
Task result:  Done after 2s
Task result:  Done after 4s
Time:  3.9912283420562744
'''

# Example

import asyncio
import time

now = lambda: time.time()

start = now()

async def do_some_work(x):
    print('Waiting:', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

tasks = [
    asyncio.ensure_future(do_some_work(1)),
    asyncio.ensure_future(do_some_work(2)),
    asyncio.ensure_future(do_some_work(4))
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print('Task result:', task.result())

'''
Waiting: 2
Waiting: 1
Waiting: 4
Traceback (most recent call last):
  File "/Users/suren/PycharmProjects/untitled1/asyn.py", line 38, in <module>
    print('task result:', asyncio.ensure_future(coro).result())
asyncio.base_futures.InvalidStateError: Result is not ready.
'''

# My example 9: nested coroutines [1] dones, pendings = await asyncio.wait(tasks)

import asyncio
import time

now = lambda: time.time()

start = now()

async def do_some_work(x):
    print('Waiting : ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    dones, pendings = await asyncio.wait(tasks)

    for task in dones:
        print('Task result: ', task.result())

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

print('TIME: ', now() - start)
'''
Waiting :  1
Waiting :  2
Waiting :  4
Task result:  Done after 2s
Task result:  Done after 4s
Task result:  Done after 1s
TIME:  4.007229328155518
'''

# My example 10: nested coroutines [2]. When the coroutine objects are
# combined with asyncio.gather, the value of await is the list of the
# coroutines' results.

import asyncio
import time

now = lambda: time.time()

start = now()

async def do_some_work(x):
    print('Waiting : ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    results = await asyncio.gather(*tasks)

    for result in results:
        print('Task result: ', result)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

print('TIME: ', now() - start)
'''
Waiting :  1
Waiting :  2
Waiting :  4
Task result:  Done after 1s
Task result:  Done after 2s
Task result:  Done after 4s
TIME:  3.9892282485961914
'''

# My example 11: nested coroutines [3]. Instead of processing the results
# inside the main coroutine, return the awaited value directly; the
# outermost run_until_complete then returns main's result.

import asyncio
import time

now = lambda: time.time()

start = now()

async def do_some_work(x):
    print('Waiting : ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    return await asyncio.gather(*tasks)

loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())

for result in results:
    print('Task result: ', result)

print('TIME: ', now() - start)
'''
Waiting :  1
Waiting :  2
Waiting :  4
Task result:  Done after 1s
Task result:  Done after 2s
Task result:  Done after 4s
TIME:  4.0052289962768555
'''

# My example 12: nested coroutines [4]. As in the previous example, return
# the awaited value directly so the outermost run_until_complete returns
# main's result, but suspend the coroutines with asyncio.wait.

import asyncio
import time

now = lambda: time.time()

start = now()

async def do_some_work(x):
    print('Waiting : ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    return await asyncio.wait(tasks)

loop = asyncio.get_event_loop()
dones, pendings = loop.run_until_complete(main())

for task in dones:
    print('Task result: ', task.result())

print('TIME: ', now() - start)
'''
Waiting :  1
Waiting :  2
Waiting :  4
Task result:  Done after 2s
Task result:  Done after 4s
Task result:  Done after 1s
TIME:  3.9912283420562744
'''

# My example 13: nested coroutines [5], using asyncio's as_completed method

import asyncio
import time

now = lambda: time.time()

start = now()

async def do_some_work(x):
    print('Waiting : ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    for task in asyncio.as_completed(tasks):
        result = await task
        print('Task result: {}'.format(result))

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

print('TIME: ', now() - start)
'''
Waiting :  1
Waiting :  2
Waiting :  4
Task result: Done after 1s
Task result: Done after 2s
Task result: Done after 4s
TIME:  3.9912281036376953
'''

# My example 14: stopping coroutines [1]. The event loop is driven outside
# the main function, so main is effectively the outermost task; it is enough
# to wrap and cancel the main task.
'''
We have now seen the common usages of coroutines, all revolving around the
event loop. A future object has several states:

    Pending, Running, Done, Cancelled

When a future is created, the task is pending; while the event loop is
executing it, it is running; when the call completes, it is done. To stop
the event loop, the tasks must be cancelled first. Use asyncio.Task to get
the tasks of the event loop.

Right after starting the event loop, press Ctrl+C: run_until_complete raises
KeyboardInterrupt. Then loop over asyncio.Task to cancel the futures.
'''
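Those states and the cancellation mechanism can be shown in a minimal sketch (not from the original notes): the task is cancelled while suspended, catches CancelledError at its await point, and finishes cleanly instead of propagating the cancellation.

```python
# Sketch: Task.cancel() raises CancelledError inside the coroutine at its
# suspension point; catching it lets the coroutine clean up and return.
import asyncio

async def sleeper():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        return 'cancelled cleanly'   # suppressing the error keeps the task "done", not "cancelled"

async def main():
    task = asyncio.ensure_future(sleeper())
    await asyncio.sleep(0)   # give the task a chance to start and suspend
    task.cancel()            # request cancellation
    result = await task
    return task.done(), task.cancelled(), result

print(asyncio.run(main()))   # (True, False, 'cancelled cleanly')
```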

import asyncio
import time

now = lambda: time.time()

start = now()

async def do_some_work(x):
    print('Waiting : ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    done, pending = await asyncio.wait(tasks)
    for task in done:
        print('Task result: ', task.result())

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(main())
try:
    loop.run_until_complete(task)
except KeyboardInterrupt as e:
    print(asyncio.Task.all_tasks())
    print('*******************')
    print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())
    loop.stop()
    # True means the cancel succeeded; after loop.stop() the loop must be
    # run again and only then closed, or an exception is raised
    loop.run_forever()
finally:
    loop.close()

print('TIME: ', now() - start)

'''
# Ctrl+C cannot be sent from PyCharm; run this in an interactive Python session.
Waiting: 1
Waiting: 2
Waiting: 4
{<Task pending coro=<do_some_work() running at async-main.py:18> ...>,
 <Task pending coro=<do_some_work() ...>,
 <Task pending coro=<wait() running at asyncio/tasks.py:307> ...>,
 <Task pending coro=<do_some_work() ...>}

True
TIME: 2.0158370780944824
'''

# My example 15: stopping coroutines [2]. The tasks live at the top level
# instead of being wrapped in a main function.
import asyncio

import time

now = lambda: time.time()
start = now()

async def do_some_work(x):
    print('Waiting: ', x)

    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

start = now()

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
    print(asyncio.Task.all_tasks())
    for task in asyncio.Task.all_tasks():
        print(task.cancel())
    loop.stop()
    loop.run_forever()
finally:
    loop.close()

print('TIME: ', now() - start)

'''
Four Trues are printed rather than three: besides the three do_some_work
tasks, the asyncio.wait(...) wrapper itself runs as a fourth task.
Waiting: 1
Waiting: 2
Waiting: 4
{<Task pending coro=<do_some_work() ...>,
 <Task pending coro=<do_some_work() ...>,
 <Task pending coro=<wait() ...>,
 <Task pending coro=<do_some_work() ...>}
True
True
True
True
TIME: 0.8858370780944824
'''

# Example 16: event loops in different threads
'''
Often we register coroutines on an event loop up front, but some coroutines need
to be added to the loop dynamically. A simple approach is multithreading: the
current thread creates a new event loop, then starts a new thread and runs the
event loop there, so the current thread is never blocked.
After the code below starts, the current thread is not blocked; the new thread
runs the callbacks registered via call_soon_threadsafe in order. Because
more_work uses the synchronous, blocking time.sleep, running both calls takes
roughly 6 + 4 seconds.
'''

from threading import Thread
import asyncio
import time

now = lambda: time.time()
start = now()

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)
    print('Finished more work {}'.format(x))

new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()

new_loop.call_soon_threadsafe(more_work, 6)
new_loop.call_soon_threadsafe(more_work, 4)

''' More work 6 Finished more work 6 More work 4 Finished more work 4 '''

# Example 17: coroutines in a new thread
from threading import Thread
import asyncio
import time

now = lambda: time.time()
start = now()

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def do_some_work(x):
    print('Waiting {}'.format(x))
    await asyncio.sleep(x)
    print('Done after {}s'.format(x))

new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()

asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)

''' Waiting 6 Waiting 4 Done after 4s Done after 6s '''
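Unlike call_soon_threadsafe, run_coroutine_threadsafe returns a concurrent.futures.Future, so the submitting thread can also collect the coroutine's result. A minimal sketch (the compute coroutine and the 0.01 s delay are invented for illustration):

```python
import asyncio
import threading

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def compute(x):
    await asyncio.sleep(0.01)
    return x * 2

new_loop = asyncio.new_event_loop()
t = threading.Thread(target=start_loop, args=(new_loop,), daemon=True)
t.start()

# run_coroutine_threadsafe returns a concurrent.futures.Future,
# so the submitting thread can block on .result() to get the value back
fut = asyncio.run_coroutine_threadsafe(compute(21), new_loop)
print(fut.result())  # 42
new_loop.call_soon_threadsafe(new_loop.stop)
```

This is the bridge between synchronous code in one thread and an event loop running in another.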

【aiohttp exercises】

  1. Preface

This section is adapted from the official aiohttp documentation; corrections for any omissions are welcome.

aiohttp has a server side and a client side; only the client is covered here.

I have also written a wrapper around aiohttp and asyncio; see my GitHub repository:

https://github.com/web-trump/ahttp

Because of the async context, request code must run inside an asynchronous function:

async def fn():

    pass

  1. Installing aiohttp

pip3 install aiohttp

1.1. Basic requests

async with aiohttp.get('https://github.com') as r:
    await r.text()

r.text() accepts a decoding/encoding argument in the parentheses, for example:

await resp.text(encoding='windows-1251')

Or you can skip decoding entirely, which suits content such as images that cannot be decoded as text:

await resp.read()

2. Making a session request

First import the aiohttp module:

import aiohttp

Then try fetching a web page, using GitHub's public time-line page as an example:

async with aiohttp.ClientSession() as session:
    async with session.get('https://api.github.com/events') as resp:
        print(resp.status)
        print(await resp.text())

The code above creates a ClientSession object named session, then uses session's get method to obtain a ClientResponse object named resp. The get method takes one required parameter, url: the HTTP URL whose content we want. With that, a coroutine has completed an asynchronous GET request.

Besides GET there is of course POST, and a POST request is also a coroutine:

session.post('http://httpbin.org/post', data=b'data')

Usage is the same as for get; the difference is that post takes an extra parameter, data: the payload to be posted.

The other HTTP methods work the same way:

session.put('http://httpbin.org/put', data=b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=b'data')

Notes:

Do not create a new session for every connection. Usually one session is enough; use it for all requests.

Each session object holds an internal connection pool and keeps connections alive for reuse (enabled by default), which speeds things up overall.

3. Passing parameters in the URL

We often need to pass parameters in the URL of a GET request; they are sent to the server as the part after the question mark. In aiohttp requests, the parameters after the question mark may be given as a dict. For example, to pass key1=value1 and key2=value2 to httpbin.org/get:

params = {'key1': 'value1', 'key2': 'value2'}
async with session.get('http://httpbin.org/get', params=params) as resp:
    assert resp.url == 'http://httpbin.org/get?key2=value2&key1=value1'

The code runs correctly, so the parameters were passed through properly; this works for one, two, or any number of parameters. Alternatively, a list of tuples can be used. This form allows some special cases, such as passing the same key twice:

params = [('key', 'value1'), ('key', 'value2')]
async with session.get('http://httpbin.org/get', params=params) as r:
    assert r.url == 'http://httpbin.org/get?key=value2&key=value1'

Besides the two forms above, a plain string can be passed directly, but note that special characters in the string are not encoded:

async with session.get('http://httpbin.org/get', params='key=value+1') as r:
    assert r.url == 'http://httpbin.org/get?key=value+1'
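The dict vs. list-of-tuples distinction mirrors how the standard library encodes query strings; urllib.parse.urlencode shows the same behavior and can be checked without any network access:

```python
from urllib.parse import urlencode

# dict form: each key appears once
print(urlencode({'key1': 'value1', 'key2': 'value2'}))
# key1=value1&key2=value2

# list-of-tuples form: the same key may appear multiple times
print(urlencode([('key', 'value1'), ('key', 'value2')]))
# key=value1&key=value2

# special characters are percent-encoded (space becomes +)
print(urlencode({'key': 'value 1'}))
# key=value+1
```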

4. Response content

Using GitHub's public time-line page again, we can read the response body:

async with session.get('https://api.github.com/events') as resp:
    print(await resp.text())

Running this prints something like:

'[{"created_at":"2015-06-12T14:06:22Z","public":true,"actor":{...

resp's text method automatically decodes the content returned by the server; we can also specify the encoding ourselves:

await resp.text(encoding='gb2312')

Besides text, which returns decoded content, we can get the content as bytes:

print(await resp.read())

The result is:

b'[{"created_at":"2015-06-12T14:06:22Z","public":true,"actor":{...

gzip and deflate transfer encodings are decoded for you automatically.

Notes:

text() and read() load the whole response body into memory. If you are fetching a large amount of data, consider a streaming response instead.

5. Special response content: JSON

If the response content is JSON, aiohttp has a better built-in method to handle it:

async with session.get('https://api.github.com/events') as resp:
    print(await resp.json())

If resp.json() fails to parse for some reason — for example, the body is not a JSON string — it raises an error. The json() method can also be given an encoding:

print(await resp.json(encoding='gb2312'))

or a custom loads function:

print(await resp.json(loads=lambda x: x.replace('a', 'b')))

6. Reading the response as a byte stream

Although json(), text() and read() conveniently read the response data into memory, they should be used with care, because they read the entire response body. Even if you only want a few bytes of a file, these methods load all of the data. Instead, we can control how many bytes of the response are read into memory:

async with session.get('https://api.github.com/events') as resp:
    await resp.content.read(10)  # read the first 10 bytes

In general, the following pattern should be used to save a byte stream to a file:

with open(filename, 'wb') as fd:
    while True:
        chunk = await resp.content.read(chunk_size)
        if not chunk:
            break
        fd.write(chunk)
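The same chunked-read pattern works with any file-like object. This stdlib-only sketch (with io.BytesIO standing in for the response stream and for the file on disk) shows the loop's termination condition: read() returning an empty bytes object:

```python
import io

src = io.BytesIO(b'x' * 25)   # stand-in for resp.content
dst = io.BytesIO()            # stand-in for the output file
chunk_size = 10

# read fixed-size chunks until read() returns b'', copying as we go
while True:
    chunk = src.read(chunk_size)
    if not chunk:
        break
    dst.write(chunk)

print(len(dst.getvalue()))  # 25
```

At no point do more than chunk_size bytes of payload sit in the loop at once, which is the whole point of streaming.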

7. Custom request headers

To add request headers, pass them as a dict to get or post, just like URL parameters:

import json

url = 'https://api.github.com/some/endpoint'
payload = {'some': 'data'}
headers = {'content-type': 'application/json'}

await session.post(url, data=json.dumps(payload), headers=headers)

8. Custom cookies

To send cookies to the server, pass a cookies parameter to ClientSession:

url = 'http://httpbin.org/cookies'
cookies = {'cookies_are': 'working'}
async with ClientSession(cookies=cookies) as session:
    async with session.get(url) as resp:
        assert await resp.json() == {
            "cookies": {"cookies_are": "working"}}

You can visit "httpbin.org/cookies" directly to see the current cookies; for accessing the cookies stored in the session, see section 10.

9. Ways to POST data

(1) Simulating a form POST

payload = {'key1': 'value1', 'key2': 'value2'}
async with session.post('http://httpbin.org/post', data=payload) as resp:
    print(await resp.text())

Note: with data=dict, the data is form-encoded, exactly like an HTML form submission. If you do not want that, pass a string directly (data=str) and it will be sent without encoding.

(2) POSTing JSON

import json

url = 'https://api.github.com/some/endpoint'
payload = {'some': 'data'}

async with session.post(url, data=json.dumps(payload)) as resp:
    ...

json.dumps(payload) also returns a string — just one that can be recognized as JSON.

(3) POSTing a small file

url = 'http://httpbin.org/post'
files = {'file': open('report.xls', 'rb')}

await session.post(url, data=files)

The file name and content type can also be set explicitly:

url = 'http://httpbin.org/post'
data = FormData()
data.add_field('file',
               open('report.xls', 'rb'),
               filename='report.xls',
               content_type='application/vnd.ms-excel')

await session.post(url, data=data)

If a file object is passed as the data parameter, aiohttp automatically sends it to the server as a byte stream.

(4) POSTing a large file

aiohttp supports streaming uploads for many kinds of files, so large files can be sent without first being read into memory:

@aiohttp.streamer
def file_sender(writer, file_name=None):
    with open(file_name, 'rb') as f:
        chunk = f.read(2**16)
        while chunk:
            yield from writer.write(chunk)
            chunk = f.read(2**16)

Then you can use file_sender as a data provider:

async with session.post('http://httpbin.org/post',
                        data=file_sender(file_name='huge_file')) as resp:
    print(await resp.text())

We can also fetch a file from one URL and POST it straight to another, computing a hash along the way:

async def feed_stream(resp, stream):
    h = hashlib.sha256()

    while True:
        chunk = await resp.content.readany()
        if not chunk:
            break
        h.update(chunk)
        stream.feed_data(chunk)

    return h.hexdigest()

resp = session.get('http://httpbin.org/post')
stream = StreamReader()
loop.create_task(session.post('http://httpbin.org/post', data=stream))

file_hash = await feed_stream(resp, stream)

Because the response content is a StreamReader, get and post can be chained, performing a POST and a GET at the same time:

r = await session.get('http://python.org')
await session.post('http://httpbin.org/post', data=r.content)

(5) POSTing pre-compressed data

For data compressed before being sent through aiohttp, set content-encoding to the name of the compression function used (usually deflate or zlib):

async def my_coroutine(session, headers, my_data):
    data = zlib.compress(my_data)
    headers = {'Content-Encoding': 'deflate'}
    async with session.post('http://httpbin.org/post',
                            data=data,
                            headers=headers) as resp:
        pass

10. Keep-alive, connection pooling, and shared cookies

ClientSession can be used to share cookies across multiple connections:

async with aiohttp.ClientSession() as session:
    await session.get(
        'http://httpbin.org/cookies/set?my_cookie=my_value')
    filtered = session.cookie_jar.filter_cookies('http://httpbin.org')
    assert filtered['my_cookie'].value == 'my_value'
    async with session.get('http://httpbin.org/cookies') as r:
        json_body = await r.json()
        assert json_body['cookies']['my_cookie'] == 'my_value'

You can also set request headers shared by all connections:

async with aiohttp.ClientSession(
        headers={"Authorization": "Basic bG9naW46cGFzcw=="}) as session:
    async with session.get("http://httpbin.org/headers") as r:
        json_body = await r.json()
        assert json_body['headers']['Authorization'] == \
            'Basic bG9naW46cGFzcw=='

ClientSession also supports keep-alive connections and connection pooling.

11. Cookie safety

By default ClientSession uses a strict-mode aiohttp.CookieJar. RFC 2109 explicitly forbids accepting cookies from URLs using IP addresses; only cookies from hosts resolved via DNS are accepted. This can be changed by constructing aiohttp.CookieJar with unsafe=True:

jar = aiohttp.CookieJar(unsafe=True)
session = aiohttp.ClientSession(cookie_jar=jar)

12. Limiting the number of simultaneous connections (connection pool)

This can also be understood as the number of simultaneous requests. To limit the number of open connections, pass a limit to the connector:

conn = aiohttp.TCPConnector(limit=30)  # at most 30 simultaneous connections; the default is 100, and limit=0 means unlimited

To limit the number of simultaneous connections to the same endpoint (the (host, port, is_ssl) triple), set the limit_per_host parameter:

conn = aiohttp.TCPConnector(limit_per_host=30)  # the default is 0 (unlimited)

13. Custom domain name resolution

We can specify the IP addresses of the name servers used to resolve the URLs of our get or post requests:

from aiohttp.resolver import AsyncResolver

resolver = AsyncResolver(nameservers=["8.8.8.8", "8.8.4.4"])
conn = aiohttp.TCPConnector(resolver=resolver)

14. Setting a proxy

aiohttp supports fetching pages through a proxy:

async with aiohttp.ClientSession() as session:
    async with session.get("http://python.org",
                           proxy="http://some.proxy.com") as resp:
        print(resp.status)

It also supports proxies that require authorization:

async with aiohttp.ClientSession() as session:
    proxy_auth = aiohttp.BasicAuth('user', 'pass')
    async with session.get("http://python.org",
                           proxy="http://some.proxy.com",
                           proxy_auth=proxy_auth) as resp:
        print(resp.status)

or the credentials can be given in the proxy URL itself:

session.get("http://python.org", proxy="http://user:pass@some.proxy.com")

15. Response status code

resp.status can be used to check whether the status code is 200:

async with session.get('http://httpbin.org/get') as resp:
    assert resp.status == 200

16. Response headers

resp.headers can be used directly to inspect the response headers; the value is dict-like:

resp.headers
{'ACCESS-CONTROL-ALLOW-ORIGIN': '*',
 'CONTENT-TYPE': 'application/json',
 'DATE': 'Tue, 15 Jul 2014 16:49:51 GMT',
 'SERVER': 'gunicorn/18.0',
 'CONTENT-LENGTH': '331',
 'CONNECTION': 'keep-alive'}

Or we can look at the raw response headers:

resp.raw_headers
((b'SERVER', b'nginx'),
 (b'DATE', b'Sat, 09 Jan 2016 20:28:40 GMT'),
 (b'CONTENT-TYPE', b'text/html; charset=utf-8'),
 (b'CONTENT-LENGTH', b'12150'),
 (b'CONNECTION', b'keep-alive'))

17. Inspecting cookies

url = 'http://example.com/some/cookie/setting/url'
async with session.get(url) as resp:
    print(resp.cookies)

18. Response headers after a redirect

If a request was redirected, we can still inspect the response information from before the redirect:

resp = await session.get('http://example.com/some/redirect/')
resp
<ClientResponse(http://example.com/some/other/url/) [200]>
resp.history
(<ClientResponse(http://example.com/some/redirect/) [301]>,)

19. Timeout handling

IO operations have a default timeout of 5 minutes, which we can override with timeout:

async with session.get('https://github.com', timeout=60) as r:
    ...

With timeout=None or timeout=0 no timeout check is performed, i.e. there is no time limit.
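Independently of aiohttp's timeout parameter, asyncio itself provides asyncio.wait_for for bounding any coroutine. A small self-contained sketch (the timeout values are arbitrary; slow() stands in for a long request):

```python
import asyncio

async def slow():
    await asyncio.sleep(10)  # stands in for a request that takes too long

async def main():
    try:
        # wait_for cancels slow() and raises TimeoutError after 0.05 s
        await asyncio.wait_for(slow(), timeout=0.05)
    except asyncio.TimeoutError:
        return 'timed out'
    return 'completed'

result = asyncio.run(main())
print(result)  # timed out
```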

# 18.4 Improving the asyncio download script
# Example 18-7 flags2_asyncio.py: first half of the script; the rest is in Example 18-8

import os, time, sys
import aiohttp
from aiohttp import web
import asyncio
import async_timeout
import collections
from collections import namedtuple
from enum import Enum
from tqdm import tqdm

BASE_URL = 'https://images2018.cnblogs.com/blog/1239321/201808'
POP20_CC1 = ('1239321-20180808065117364-1539273796 1239321-20180808065129112-103367989 '
             '1239321-20180808065136786-868892759 '
             '1239321-20180808065146211-1880907820 1239321-20180808065155072-1392342345 '
             '1239321-20180808065222347-1439669487 '
             '1239321-20180808065232562-1454112423 1239321-20180808065246215-1857827340 '
             '1239321-20180808065301480-1707393818 '
             '1239321-20180808065312201-964077895 1239321-20180808065326211-1590046138 '
             '1239321-20180808065342568-448845 '
             '1239321-20180808065358869-366577464 1239321-20180808065410900-539910454 '
             '1239321-20180808065422695-222625730 '
             '1239321-20180808065430991-1182951067 1239321-20180808065437898-138307299 '
             '1239321-20180808065444387-1849567433 '
             '1239321-20180808065454537-30405473 1239321-20180808065506470-995044385 ').split()
POP20_CC = 'aaaa1239321-20180808065117364-1539273796 1239321-20180808065129112-103367989'.split()
DEST_DIR = 'downloads'
MAX_WORKERS = 20

HTTPStatus = Enum('Status', 'ok not_found error')
Result = namedtuple('Result', 'status cc')

class FetchError(Exception):
    def __init__(self, country_code):
        self.country_code = country_code

def save_flag(image, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(image)

async def get_flag(cc):
    url = '{}/{cc}.png'.format(BASE_URL, cc=cc)

    async with aiohttp.ClientSession() as session:
        with async_timeout.timeout(3000):
            async with session.get(url, verify_ssl=False) as resp:
                # Without the verify_ssl parameter an SSL error is raised; the root
                # cause is that urllib/requests verify the certificate when opening
                # an https site

                # print(await resp.text())
                if b'PNG' in (await resp.read()):
                    # We cannot use resp.status == 404 here to decide whether the resource
                    # is missing: not every site formats its responses the same way. Nor can
                    # we use '404' not in (await resp.text()), because when the resource does
                    # exist that call fails with "UnicodeDecodeError: 'utf-8' codec can't
                    # decode byte 0x89 in position 0: invalid start byte".
                    # So this check was found by trial and error (when the resource is
                    # missing, the body length happens to be 191). It is not very robust;
                    # a proper check is left for later.
                    # Another option: if resp.status == 200 and ((await resp.read())[2] == 78):
                    image = await resp.read()
                    return image
                elif 'not found' in (await resp.text()) or 'not exist' in (await resp.text()):
                    # FetchError catches this later and prints: *** Error for T_JINGSE200: Not Found
                    raise web.HTTPNotFound()
                else:
                    # FetchError catches this later and prints:
                    # *** Error for T_JINGSE200: module 'aiohttp' has no attribute ...
                    raise aiohttp.HttpProcessingError(
                        code=resp.status,
                        message=resp.reason,
                        headers=resp.headers)

async def download_one(cc, semaphore, verbose):
    try:
        with (await semaphore):
            # Using the semaphore as a context manager in the await expression keeps the
            # whole system from blocking: only this coroutine is blocked when the
            # semaphore counter is at its maximum.
            image = await get_flag(cc)
    except Exception as exc:
        raise FetchError(cc) from exc  # the "raise X from Y" syntax chains the original exception
    else:
        save_flag(image, cc + '.PNG')
        status = HTTPStatus.ok
        msg = 'OK'
    if verbose and msg:
        # If -v/--verbose was given on the command line, print the country code and
        # status message; this is the progress information seen in verbose mode
        print(cc, msg)
    return Result(status, cc)

async def download_coro(cc_list, verbose, concur_req):
    counter = collections.Counter()
    semaphore = asyncio.Semaphore(concur_req)
    to_do = [download_one(cc, semaphore, verbose) for cc in cc_list]
    to_do_iter = asyncio.as_completed(to_do)  # an iterator that yields futures as they finish
    if not verbose:
        to_do_iter = tqdm(to_do_iter, total=len(cc_list))  # pass the iterator to tqdm to show progress
    for future in to_do_iter:  # iterate over the finished futures
        try:
            res = await future
        except FetchError as exc:
            country_code = exc.country_code
            try:
                error_msg = exc.__cause__.args[0]
                # Sometimes args looks like ("module 'aiohttp' has no attribute
                # 'HttpProcessingError'",), in which case take its only element;
                # sometimes it looks like (1, '[SSL: CERTIFICATE_VERIFY_FAILED]
                # certificate verify failed (_ssl.c:749)').
                print(exc.__cause__.args)
            except IndexError:
                error_msg = exc.__cause__.__class__.__name__
            if not verbose and error_msg:
                msg = '*** Error for {}: {}'
                print(msg.format(country_code, error_msg))
            status = HTTPStatus.error
        else:
            status = res.status
        counter[status] += 1
    return counter

def download_many(cc_list, verbose, concur_req):
    loop = asyncio.get_event_loop()
    # download_many just instantiates the download_coro coroutine and hands it to
    # the event loop via run_until_complete
    coro = download_coro(cc_list, verbose, concur_req)
    counts = loop.run_until_complete(coro)
    loop.close()
    return counts

def main():
    t0 = time.time()
    count = download_many(POP20_CC, verbose=False, concur_req=2)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

main()

'''
Resources that come back normally are downloaded to the given path;
abnormal ones report the corresponding error.
'''
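The Semaphore-plus-as_completed pattern at the heart of the script can be isolated from the network code. A stdlib-only sketch, with asyncio.sleep standing in for the real fetch and invented country codes:

```python
import asyncio

async def download_one(cc, semaphore):
    # the semaphore caps how many coroutines run the "download" at once
    async with semaphore:
        await asyncio.sleep(0.01)  # stands in for the real network fetch
        return cc

async def download_all(cc_list, concur_req):
    semaphore = asyncio.Semaphore(concur_req)
    to_do = [download_one(cc, semaphore) for cc in cc_list]
    results = []
    # as_completed yields awaitables in the order they finish
    for future in asyncio.as_completed(to_do):
        results.append(await future)
    return results

codes = asyncio.run(download_all(['aa', 'bb', 'cc', 'dd'], concur_req=2))
print(sorted(codes))  # ['aa', 'bb', 'cc', 'dd']
```

Note that results arrive in completion order, not submission order, which is why the real script carries the country code inside each Result.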

18.4.2 Using an Executor object to avoid blocking the event loop

'''
In Example 18-7 the blocking function is save_flag. In the threaded version of the
script (Example 17-14), save_flag blocks the thread running download_one, but that is
only one of many worker threads. Blocking I/O calls release the GIL behind the scenes,
so another thread can proceed. In flags2_asyncio.py, however, save_flag blocks the
single thread that the client code shares with the asyncio event loop, so the whole
application freezes while a file is being saved. The solution is the event loop's
run_in_executor method. The asyncio event loop keeps a ThreadPoolExecutor behind the
scenes, and we can call run_in_executor to hand callables to it for execution. Only a
few lines of the download_one coroutine need to change to use this in our example.
'''
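The run_in_executor idea can be demonstrated without aiohttp. In this sketch, blocking_save is an invented stand-in for save_flag; awaiting run_in_executor keeps the event loop thread free while the blocking call runs in the loop's default ThreadPoolExecutor:

```python
import asyncio
import time

def blocking_save(data):
    # stands in for save_flag: a synchronous, blocking call
    time.sleep(0.05)
    return len(data)

async def main():
    loop = asyncio.get_running_loop()
    # run_in_executor(None, ...) uses the loop's default ThreadPoolExecutor,
    # so the event loop is not blocked while blocking_save runs
    result = await loop.run_in_executor(None, blocking_save, b'PNG data')
    return result

saved = asyncio.run(main())
print(saved)  # 8
```

The real script fires run_in_executor without awaiting it; awaiting the returned future, as here, is how you get the callable's result back.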

Example: asynchronous download using an Executor object; compared with the previous example, no great performance gain was observed.

from enum import Enum
import os, sys, time
import collections
from collections import namedtuple
import asyncio
import async_timeout
import aiohttp
from aiohttp import web
from tqdm import tqdm

HTTPStatus = Enum('Status', 'ok not_found error')
Result = namedtuple('Result', 'status cc')

BASE_URL = 'http://images.cnblogs.com/cnblogs_com/suren2017/1102909'
POP20_CC = 'T_JINGSE200 T_JINGSE3'.split()
DEST_DIR = 'download'
MAX_WORKERS = 20

class FetchError(Exception):
    def __init__(self, country_code):
        self.country_code = country_code

def save_flag(img, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    path = path.replace('\\', '/')
    with open(path, 'wb') as fp:
        fp.write(img)

async def get_flag(cc):
    url = '{}/{cc}.PNG'.format(BASE_URL, cc=cc.lower())
    async with aiohttp.ClientSession() as session:
        with async_timeout.timeout(10000):
            async with session.get(url) as resp:
                if b'PNG' in (await resp.read()):
                    image = await resp.read()
                    return image
                elif '404' in (await resp.text()):
                    raise web.HTTPNotFound()
                else:
                    raise aiohttp.HttpProcessingError(
                        code=resp.status,
                        message=resp.reason,
                        headers=resp.headers)

async def download_one(cc, semaphore, verbose):
    try:
        with (await semaphore):
            image = await get_flag(cc)
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        loop = asyncio.get_event_loop()
        loop.run_in_executor(None, save_flag, image, cc.lower() + '.PNG')
        status = HTTPStatus.ok
        msg = 'OK'
    if verbose and msg:
        print(cc, msg)
    return Result(status, cc)

async def download_coro(cc_list, verbose, concur_req):
    counter = collections.Counter()
    semaphore = asyncio.Semaphore(concur_req)
    to_do = [download_one(cc, semaphore, verbose) for cc in sorted(cc_list)]
    to_do_iter = asyncio.as_completed(to_do)
    if not verbose:
        to_do_iter = tqdm(to_do_iter, total=len(cc_list))
    for future in to_do_iter:
        try:
            res = await future

except FetchError as exc:
        country_code = exc.country_code
        try:
            error_msg = exc.__cause__.args[0]
            print(exc.__cause__.args)
        except IndexError:
            error_msg = exc.__cause__.__class__.__name__
        if not verbose and error_msg:
            msg = '*** Error for {} : {}'
            print(msg.format(country_code,error_msg))
        status = HTTPStatus.error
    else:
        status = res.status
    counter[status] += 1
return counter

def download_many(cc_list, verbose, concur_req):
    loop = asyncio.get_event_loop()
    coro = download_coro(cc_list, verbose, concur_req)
    counts = loop.run_until_complete(coro)
    loop.close()
    return counts

def main():
    t0 = time.time()
    count = download_many(POP20_CC, verbose=True, concur_req=2)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

main()

# Example: sequential download, saving 10x the bytes of each file, without an Executor object

from enum import Enum
import collections
from collections import namedtuple
import os
import sys
import time

import requests

HTTPStatus = Enum('Status', 'ok not_found error')
Result = namedtuple('Result', 'status cc')

BASE_URL = 'http://images.cnblogs.com/cnblogs_com/suren2017/1102909'

POP20_CC = ('T_JINGSE200 T_JINGSE3 T_JINGSE4 T_JINGSE5 t_jingse6 t_jingse7 '
            't_jingse8 t_jingse9 t_jingse10 t_jingse11 t_jingse12 t_jingse13 '
            'T_jingse14 T_jingse15 T_jingse16 T_jingse17 T_jingse18 T_jingse19 '
            'T_jingse20').split()

DEST_DIR = 'downloads'
MAX_WORKERS = 20

def save_flag(img, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    path = path.replace('\\', '/')
    with open(path, 'wb') as fp:
        fp.write(img * 10)

def get_flag(cc):
    url = '{}/{cc}.PNG'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    if '404' in resp.text:
        resp.status_code = 404
    resp.raise_for_status()
    return resp.content

def download_one(cc, verbose=False):
    try:
        image = get_flag(cc)
    except requests.exceptions.HTTPError as exc:
        res = exc.response
        if res.status_code == 404:
            status = HTTPStatus.not_found
            msg = 'not found'
            res.status_code = 404
            res.reason = 'NOT FOUND'
            raise
        else:
            raise
    else:
        save_flag(image, cc.lower() + '.PNG')
        status = HTTPStatus.ok
        msg = 'OK'

    if verbose:  # if -v/--verbose was given, print the country code and status message
        print(cc, msg)
    return Result(status, cc)

from concurrent import futures
from tqdm import tqdm

def download_many(cc_list, verbose, max_req):
    counter = collections.Counter()
    cc_iter = sorted(cc_list)
    if not verbose:
        cc_iter = tqdm(cc_iter)
    for cc in cc_iter:
        try:
            res = download_one(cc, verbose)
        except requests.exceptions.HTTPError as exc:
            error_msg = 'HTTP error {res.status_code} - {res.reason}'
            error_msg = error_msg.format(res=exc.response)
        except requests.exceptions.ConnectionError as exc:
            error_msg = 'Connection error'
        else:
            error_msg = ''
            status = res.status
        if error_msg:
            status = HTTPStatus.error
        counter[status] += 1
        if verbose and error_msg:  # note 1
            print('*** Error for {}: {}'.format(cc, error_msg))
    return counter

def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC, verbose=False, max_req=10)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))
    # Counter({<Status.ok: 1>: 18, <Status.error: 3>: 1}) flags downloaded in 33.58s

if __name__ == '__main__':
    main(download_many)

Example: sequential download, saving 10x the bytes of each file, using an Executor object

from enum import Enum
import collections
from collections import namedtuple
import os
import sys
import time

import requests

HTTPStatus = Enum('Status', 'ok not_found error')
Result = namedtuple('Result', 'status cc')

BASE_URL = 'http://images.cnblogs.com/cnblogs_com/suren2017/1102909'

POP20_CC = ('T_JINGSE200 T_JINGSE3 T_JINGSE4 T_JINGSE5 t_jingse6 t_jingse7 '
            't_jingse8 t_jingse9 t_jingse10 t_jingse11 t_jingse12 t_jingse13 '
            'T_jingse14 T_jingse15 T_jingse16 T_jingse17 T_jingse18 T_jingse19 '
            'T_jingse20').split()

DEST_DIR = 'downloads'
MAX_WORKERS = 20

def save_flag(img, filename):
    path = os.path.join(sys.path[0], DEST_DIR, filename)
    path = path.replace('\\', '/')
    with open(path, 'wb') as fp:
        fp.write(img * 10)

def get_flag(cc):
    url = '{}/{cc}.PNG'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    if '404' in resp.text:
        resp.status_code = 404
    resp.raise_for_status()
    return resp.content

def download_one(cc, verbose=False):
    try:
        image = get_flag(cc)
    except requests.exceptions.HTTPError as exc:
        res = exc.response
        if res.status_code == 404:
            status = HTTPStatus.not_found
            msg = 'not found'
            res.status_code = 404
            res.reason = 'NOT FOUND'
            raise
        else:
            raise
    else:
        # save_flag(image, cc.lower() + '.PNG')
        import asyncio
        loop = asyncio.get_event_loop()
        loop.run_in_executor(None, save_flag, image, cc.lower() + '.PNG')
        status = HTTPStatus.ok
        msg = 'OK'

    if verbose:  # if -v/--verbose was given, print the country code and status message
        print(cc, msg)
    return Result(status, cc)

from concurrent import futures
from tqdm import tqdm

def download_many(cc_list, verbose, max_req):
    counter = collections.Counter()
    cc_iter = sorted(cc_list)
    if not verbose:
        cc_iter = tqdm(cc_iter)
    for cc in cc_iter:
        try:
            res = download_one(cc, verbose)
        except requests.exceptions.HTTPError as exc:
            error_msg = 'HTTP error {res.status_code} - {res.reason}'
            error_msg = error_msg.format(res=exc.response)
        except requests.exceptions.ConnectionError as exc:
            error_msg = 'Connection error'
        else:
            error_msg = ''
            status = res.status
        if error_msg:
            status = HTTPStatus.error
        counter[status] += 1
        if verbose and error_msg:  # note 1
            print('*** Error for {}: {}'.format(cc, error_msg))
    return counter

def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC, verbose=False, max_req=10)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))
    # Saving 10x the bytes per file (fp.write(img) changed to fp.write(img*10)) makes
    # the effect visible:
    # Counter({<Status.ok: 1>: 18, <Status.error: 3>: 1}) flags downloaded in 20.68s

if __name__ == '__main__':
    main(download_many)


【Generators | generator expressions | iterators | iterables | containers】【Concepts】

# 【【Slicing】】
L = ['Michael', 'Sarah', 'Tracy', 'Bob', 'Jack']
print(L[0:3])   # ['Michael', 'Sarah', 'Tracy']
print(L[:3])    # ['Michael', 'Sarah', 'Tracy']
print(L[1:3])   # ['Sarah', 'Tracy']
print(L[-2:])   # ['Bob', 'Jack']
print(L[-2:-1]) # ['Bob']

# Create the sequence 0..99
L = list(range(100))
print(L)
# First 10 numbers
print(L[:10])   # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# Last 10 numbers
print(L[-10:])  # [90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
# Numbers 11 through 20
print(L[10:20]) # [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
# Of the first 10, every second one
print(L[:10:2]) # [0, 2, 4, 6, 8]
# Of all numbers, every fifth one
print(L[::5])   # [0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95]
# With nothing written, a plain copy
print(L[:])

# A tuple is also a kind of list; the only difference is that a tuple is immutable.
tuple = (0, 1, 2, 3, 4, 5)
print(tuple[:3]) # (0, 1, 2)

# Strings can be sliced too
# Exercise: implement trim using slicing
def trim(s):
    if len(s) == 0:
        return ''
    elif s[:1] == ' ':
        return trim(s[1:])
    elif s[-1:] == ' ':
        return trim(s[:-1])
    return s

# Tests
if trim('hello ') != 'hello':
    print('test failed!')
elif trim(' hello') != 'hello':
    print('test failed!')
elif trim(' hello ') != 'hello':
    print('test failed!')
elif trim(' hello world ') != 'hello world':
    print('test failed!')
elif trim('') != '':
    print('test failed!')
elif trim(' ') != '':
    print('test failed!')
else:
    print('test passed!')

# 【【Iteration】】
# Check whether an object is iterable
from collections import Iterable
print(isinstance('abc', Iterable))     # True
print(isinstance([1, 2, 3], Iterable)) # True
print(isinstance(123, Iterable))       # False

# What if we want a Java-style indexed loop over a list?
for i, value in enumerate(['A', 'B', 'C']):
    print(i, value)
'''
0 A
1 B
2 C
'''

# Referencing two variables at once in a for loop
for x, y in [(1, 1), (2, 4), (3, 9)]:
    print(x, y)
'''
1 1
2 4
3 9
'''

# Exercise: use iteration to find the minimum and maximum of a list, returned as a tuple
# Method 1
def findMinAndMax1(L):
    if L is None or len(L) == 0:
        return (None, None)
    my_min = L[0]
    my_max = L[0]
    for val in L:
        my_min = min(my_min, val)
        my_max = max(my_max, val)
    return (my_min, my_max)

# Method 2
def findMinAndMax2(L):
    if L is None or len(L) == 0:
        return (None, None)
    return (min(L), max(L))

print(findMinAndMax1(list(range(10)))) # (0, 9)
print(findMinAndMax2(list(range(10)))) # (0, 9)

# 【【List comprehensions】】
# List all files and directories in the current directory
import os
print([d for d in os.listdir('.')]) # ['.idea', 'def.py', 'dict&set.py', 'list&tuple.py', 'senior_pro.py']

# Using two variables
dict = {'x': 'A', 'y': 'B', 'z': 'C'}
print([k + '=' + v for k, v in dict.items()]) # ['x=A', 'y=B', 'z=C']

# Lowercase every string in a list
L = ['Hello', 'World', 'IBM', 'Apple']
print([x.lower() for x in L]) # ['hello', 'world', 'ibm', 'apple']

# 【【Generators】】 generator

Method 1: change the [] of a list comprehension into ()

L = [x * x for x in range(3)]
print(L) # [0, 1, 4]
g = (x * x for x in range(3))
print(g) # <generator object at 0x102a2e360>

Printing the elements:

print(next(g)) # 0
print(next(g)) # 1
print(next(g)) # 4
# print(next(g)) # StopIteration

# [Note] The approach above is impractical; the normal way is a for loop, which
# does not need to care about the StopIteration error
g = (x * x for x in range(3))
for n in g:
    print(n)
'''
0
1
4
'''

Generators are extremely powerful. When the logic cannot be expressed with a list-comprehension-style for loop, a function can be used.

Fibonacci:

def fib(max):
    n, a, b = 0, 0, 1
    while n < max:
        print(b)
        a, b = b, a + b
        n = n + 1
    return 'done'

fib(5)
'''
1
1
2
3
5
'''
# To turn this function into a generator, just change print to yield
def fib(max):
    n, a, b = 0, 0, 1
    while n < max:
        yield b
        a, b = b, a + b
        n = n + 1
    return 'done'

f = fib(5)
print(f) # <generator object fib at 0x101a2e360>

# [Key point] A generator and a function have different execution flows. A function runs
# sequentially and returns at a return statement or at its last line.

A function turned into a generator, by contrast, executes on each call to next(), returns at a yield statement, and on the next call resumes right after the yield it last returned from.

Example: define a generator that returns 1, 3, 5 in turn:

def odd():
    print('step 1:')
    yield 1
    print('step 2:')
    yield 3
    print('step 3:')
    yield 5

o = odd()
print(next(o))
print(next(o))
print(next(o))
'''
step 1:
1
step 2:
3
step 3:
5
'''
# print(next(o)) # StopIteration
# [Analysis] As you can see, odd is not an ordinary function but a generator: execution
# pauses at each yield and resumes from there on the next call.

After three yields there is no yield left to execute, so the fourth call to next(o) raises an error.

# Back to fib: yield is reached repeatedly inside the loop, so execution keeps pausing.
# Of course the loop needs an exit condition, or it produces an infinite sequence.
for n in fib(5):
    print(n)
'''
1
1
2
3
5
'''

# [Note] A for loop normally cannot obtain the generator's return value. To get it, catch
# the StopIteration error: the return value is held in StopIteration's value attribute
g = fib(5)
while True:
    try:
        n = next(g)
        print('next g:', n)
    except StopIteration as e:
        print('Generator return value:', e.value)
        break
'''
next g: 1
next g: 1
next g: 2
next g: 3
next g: 5
Generator return value: done
'''

Example: Pascal's triangle

def trangles():
    N = [1]
    while True:
        yield N
        N.append(0)
        N = [N[i - 1] + N[i] for i in range(len(N))]

n = 0
for t in trangles():
    print(t)
    n = n + 1
    if n == 10:
        break
'''
[1]
[1, 1]
[1, 2, 1]
[1, 3, 3, 1]
[1, 4, 6, 4, 1]
[1, 5, 10, 10, 5, 1]
[1, 6, 15, 20, 15, 6, 1]
[1, 7, 21, 35, 35, 21, 7, 1]
[1, 8, 28, 56, 70, 56, 28, 8, 1]
[1, 9, 36, 84, 126, 126, 84, 36, 9, 1]
'''

# [Summary]
'''
Generators are a very powerful tool. In Python, a list comprehension can simply be
turned into a generator, and generators with complex logic can be implemented as
functions.

How generators work: during a for loop they compute the next element on demand, and
end the loop under the appropriate condition. For a generator made from a function,
reaching a return statement or the last line of the function body is the signal to
finish, and the for loop ends with it.

Note the difference between ordinary functions and generator functions: calling an
ordinary function returns a result directly, while calling a generator function
actually returns a generator object.
'''

# 【【Iterables, containers, iterators, and generators】】

Iterable, container, iterator, and generator are all concepts, not data structures.

A container is a collection of elements; if an object can be asked whether it contains some element, it can be regarded as a container.

A container is usually also an iterable. It satisfies two conditions: 1⃣️ it can test whether an element is contained in it; 2⃣️ being an iterable gives the container the ability to produce each of its values.

Although most containers offer some way to obtain each of their elements, that ability is not provided by the container itself; it is the iterable protocol that grants it.

Of course, not every container is iterable. A Bloom filter, for example, can test whether an element is in it but cannot yield each of its values, because a Bloom filter does not actually store the elements: it maps them through hash functions into values kept in an array.

Iterable: any object that can be used directly in a for loop is collectively called an iterable (Iterable).

An iterable implements the __iter__ method, which returns an iterator object.

One kind is the (special) majority of containers: str, list, set, tuple, dict, files, sockets, and so on.

The other kind is generators.

Test with isinstance():

from collections import Iterable
print(isinstance([], Iterable))    # True
print(isinstance((), Iterable))    # True
print(isinstance({}, Iterable))    # True
print(isinstance('abc', Iterable)) # True
print(isinstance((x for x in range(10)), Iterable)) # True
print(isinstance(100, Iterable))   # False

# Iterators
from collections import Iterator
print(isinstance((x for x in range(10)), Iterator)) # True
print(isinstance([], Iterator))    # False
print(isinstance({}, Iterator))    # False
print(isinstance('abc', Iterator)) # False

An iterator holds an internal state field recording the value to return on the next iteration, and implements the __next__ and __iter__ methods.

# Any object that works with the next() function is an Iterator: a lazily computed sequence.
# An iterator does not load all elements into memory at once; results are generated only
# when needed.
# Collection types such as list, str and dict are Iterable but not Iterator, though an
# Iterator can be obtained from them with the iter() function.
# A generator is a special kind of iterator whose values come from yield rather than return.
# [Extra point] Python's for loop is essentially implemented by calling next() repeatedly
for x in [1, 2, 3]:
    pass
# is equivalent to:
# first obtain an Iterator object
it = iter([1, 2, 3])
# then loop
while True:
    try:
        '''get the next value'''
        x = next(it)
    except StopIteration:
        # exit the loop on StopIteration
        break

【Examples by category】

1⃣️【Containers】

print(1 in [1, 2, 3])     # True
print(4 not in [1, 2, 3]) # True
print(1 in {1, 2, 3})     # True
print(4 not in {1, 2, 3}) # True
print(1 in (1, 2, 3))     # True
print(4 not in (1, 2, 3)) # True

d = {1: 'foo', 2: 'bar', 3: 'qux'}
print(1 in d)         # True
print('foo' not in d) # True

s = 'foobar'
print('b' in s)     # True
print('x' not in s) # True
print('foo' in s)   # True

# 2⃣️【Iterables】
x = [1, 2, 3]
y = iter(x)
z = iter(x)
print(next(y)) # 1
print(next(y)) # 2
print(next(z)) # 1
print(type(x)) # <class 'list'>
print(type(y)) # <class 'list_iterator'>

# 3⃣️【Iterators】 the itertools functions all return iterator objects

Generating an infinite sequence:

from itertools import count
counter = count(start=13)
print(next(counter)) # 13
print(next(counter)) # 14

# Generating an infinite sequence from a finite one
from itertools import cycle
colors = cycle(['red', 'white', 'blue'])
print(next(colors)) # red
print(next(colors)) # white
print(next(colors)) # blue
print(next(colors)) # red

# Generating a finite sequence from an infinite one
from itertools import cycle
from itertools import islice
colors = cycle(['red', 'white', 'blue'])
myslice = slice(4)
# limits = colors[myslice] # TypeError: 'itertools.cycle' object is not subscriptable
# [Analysis] i.e. its elements cannot be accessed with bracketed indices

limits1 = islice(colors, 0, 6)
list1 = []
print('****************')
for x in limits1:
    print(x)
    list1.append(x)
myslice = slice(4)
print(list1[myslice])
'''
red
white
blue
red
white
blue
['red', 'white', 'blue', 'red']
'''

# For a more intuitive view:

【Looping techniques】

# 【1】Generators  [Note] These three methods can all be run at once and each prints
# non-empty output, because every call creates a new generator object
def gen_123():
    yield 1
    yield 2
    yield 3

print(gen_123)   # <function gen_123 at 0x00000000037A3840>
print(gen_123()) # <generator object gen_123 at 0x000000000379E780>

# Looping method 1
print(list(gen_123())) # [1, 2, 3]

# Looping method 2
for i in gen_123():
    print(i)
'''
1
2
3
'''

# Looping method 3
a = gen_123()
print(next(a))
print(next(a))
print(next(a))
'''
1
2
3
'''

# Looping method 4: unpacking
def t():
    yield 1
    yield 2
    yield 3

x, y, z = t()
print(x, y, z)

【2】Generator expressions

a = (format(a, '.3e') for a in (3.4444444, 78.4534534))
# Looping method 1  [Note] without assigning a to g, next(a) works just the same
g = a
print(next(g)) # 3.444e+00
print(next(g)) # 7.845e+01
print(a)       # <generator object at 0x000000000227F620>

# Looping method 2  [Note] methods 1 and 2 cannot both be run on the same generator
for i in a:
    print(i)
'''
3.444e+00
7.845e+01
'''

# Looping method 3
# print(list(a)) # ['3.444e+00', '7.845e+01']

# Looping method 4: unpacking
a = (format(a, '.3e') for a in (3.444444, 78.54899, 987.345555))
a, b, c = a
print(a, b, c)

【3】Iterators  [Note] These methods cannot all be run at once; pick one

An iterator is a stateful object that returns the next value in the container each time you call next(). Any object that implements the __iter__ and __next__ methods (next() in Python 2) is an iterator: __iter__ returns the iterator itself, __next__ returns the next value in the container, and when no elements remain it raises a StopIteration exception.

# So an iterator is an object implementing the factory pattern: it returns a value each
# time you ask for the next one. There are many iterator examples; the itertools
# functions, for instance, all return iterator objects.
import itertools
print('*************')
b = itertools.chain([1, 2], (3, 4))
print(b) # <itertools.chain object at 0x0000000002924C18>

# Looping method 1
print(next(b))
print(next(b))
print(next(b))
print(next(b))
'''
1
2
3
4
'''

# Looping method 2
for i in b:
    print(i)

# Looping method 3
print(list(b)) # [1, 2, 3, 4]

# Looping method 4: unpacking
import itertools
a = itertools.chain([1, 2], (3,))
a, b, c = a
print(a, b, c)

【4】Iterables
# Looping method 1: for loop

# Looping method 2: unpacking
a, b, c = [1, 2, 3]
print(a, b, c)

【The join function】

【1】Iterators

b = itertools.chain([1, 2], (3,))
print(','.join(str(a) for a in b)) # 1,2,3

【2】Iterables
print('*'.join(['5', '6', '7'])) # 5*6*7

【3】Generators

def gen_123():
    yield 1
    yield 2
    yield 3

print(','.join(str(a) for a in gen_123())) # 1,2,3

【4】Generator expressions
b = (format(a, '.3e') for a in (3.4444444, 78.4534534))
print(','.join(str(a) for a in b)) # 3.444e+00,7.845e+01
