1七、第七 - 网络编程基础 - 协程概念介绍、协程gevent模块并发爬网页


协程,又称微线程,纤程。什么是线程:协程是一种用户态的轻量级线程。html

  协程拥有本身的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其余地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。所以:协程能保留上一次调用时的状态(即全部局部状态的一个特定组合),每次过程重入时,就至关于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。python

协程的好处:编程

  • 无需线程上下文切换的开销
  • 无需原子操做锁定及同步的开销(注解:"原子操做(atomic operation)是不须要synchronized",所谓原子操做是指不会被线程调度机制打断的操做;这种操做一旦开始,就一直运行到结束,中间不会有任何 (切换到另外一个线程。原子操做能够是一个步骤,也能够是多个操做步骤,可是其顺序是不能够被打乱,或者切割掉只执行部分。视做总体是原子性的核心。
  • 方便切换控制流,简化编程模型
  • 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。因此很适合用于高并发处理。

协程的缺点:数组

  • 没法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程须要和进程配合才能运行在多CPU上.固然咱们平常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
  • 进行阻塞(Blocking)操做(如IO时)会阻塞掉整个程序

符合协程的标准网络

  1. 必须在只有一个单线程里实现并发
  2. 修改共享数据不需加锁
  3. 用户程序里本身保存多个控制流的上下文栈
  4. 一个协程遇到IO操做自动切换到其它协程

举例:并发

一、使用yield实现协程操做:

import time
import queue


def consumer(name):
    print("--->starting eating baozi...")
    while True:
        new_baozi = yield
        print("[%s] is eating baozi %s" % (name, new_baozi))
        # time.sleep(1)


def producer():
    r = con.__next__()
    r = con2.__next__()
    n = 0
    while n < 5:
        n += 1
        con.send(n)
        con2.send(n)
        print("\033[32;1m[producer]\033[0m is making baozi %s" % n)


if __name__ == '__main__':
    con = consumer("c1")
    con2 = consumer("c2")
    p = producer()
输出:
--->starting eating baozi...
--->starting eating baozi...
[c1] is eating baozi 1
[c2] is eating baozi 1
[producer] is making baozi 1
[c1] is eating baozi 2
[c2] is eating baozi 2
[producer] is making baozi 2
[c1] is eating baozi 3
[c2] is eating baozi 3
[producer] is making baozi 3
[c1] is eating baozi 4
[c2] is eating baozi 4
[producer] is making baozi 4
[c1] is eating baozi 5
[c2] is eating baozi 5
[producer] is making baozi 5

二、使用第三方模块:greenlet (手动指定执行切换协程)

greenlet是一个用C实现的协程模块,相比与python自带的yield,它可使你在任意函数之间随意切换,而不需把这个函数先声明为generator。less

  greenlet是python的并行处理的一个库。 python 有一个很是有名的库叫作 stackless ,用来作并发处理, 主要是弄了个叫作tasklet的微线程的东西, 而greenlet 跟stackless的最大区别是greenlet须要你本身来处理线程切换, 就是说,你须要本身指定如今执行哪一个greenlet再执行哪一个greenlet。至关于手动切换协程。异步

  一个 “greenlet” 是一个小型的独立伪线程。能够把它想像成一些栈帧,栈底是初始调用的函数,而栈顶是当前greenlet的暂停位置。你使用greenlet建立一堆这样的堆栈,而后在他们之间跳转执行。跳转必须显式声明的:一个greenlet必须选择要跳转到的另外一个greenlet,这会让前一个挂起,然后一个在此前挂起处恢复执行。不一样greenlets之间的跳转称为切换(switching) 。socket

  当你建立一个greenlet时,它获得一个开始时为空的栈;当你第一次切换到它时,它会执行指定的函数,这个函数可能会调用其余函数、切换跳出greenlet等等。当最终栈底的函数执行结束出栈时,这个greenlet的栈又变成空的,这个greenlet也就死掉了。greenlet也会由于一个未捕捉的异常死掉。async

 举例:

from greenlet import greenlet

def test1():
    print(12) #二、打印
    gr2.switch()#三、切换协程到 test2
    print(34)#六、打印
    gr2.switch()#七、切换到test2

def test2():
    print(56)#四、打印
    gr1.switch() #五、切换协程到 test1
    print(78)#八、打印,执行完成

gr1 = greenlet(test1) #启动一个协程
gr2 = greenlet(test2) #启动一个协程
gr1.switch()  #一、开始调用切换协程

输出:
12
56
34
78

注意:执行的步骤顺序,从1-8。

以上例子还不能实如今协程中自动切换,greenlet 只能手动指定执行,但对于生成器来讲简单不少。要实现自动监控,而且自动切换协程,如何实现?引入gevent模块。

三、使用第三方模块:gevent (自动监控并自动切换协程)

  Gevent 是一个第三方库,能够轻松经过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet所有运行在主程序操做系统进程的内部,但它们被协做式地调度。

  Gevent是第三方库,经过greenlet实现协程,其基本思想是:当一个greenlet遇到IO操做时,好比访问网络,就自动切换到其余的greenlet,等到IO操做完成,再在适当的时候切换回来继续执行。因为IO操做很是耗时,常常使程序处于等待状态,有了gevent为咱们自动切换协程,就保证总有greenlet在运行,而不是等待IO。因为切换是在IO操做时自动完成,因此gevent须要修改Python自带的一些标准库,这一过程在启动时经过monkey patch完成。

包含的特性:
1.基于libev的快速事件循环
2.基于greenlet的轻量级执行单元
3.重用Python标准库且概念类似的API
4.支持SSL的协做socket
5.经过c-ares或者线程池进行DNS查询
6.使用标准库和第三方库中使用了阻塞socket的代码的能力

 第三库须要另外,开源包进行安装。

举例:

A、验证gevent经过自动判断,选择最优的线路进行判断执行。注意:可gevent.sleep() 调整时间,进行验证测试。结论:每一个函数里面最后一次打印,看等待的时间越短越先执行。

import gevent

def foo():
    print('running foo')
    gevent.sleep(3)
    print('Explicit context switch to foo again')
def bar():
    print('running bar')
    gevent.sleep(6)
    print('Implicit context switch back to bar ')
def func3():
    print("running func3")
    gevent.sleep(1)
    print ("switch back to func3")

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
    gevent.spawn(func3)
])
输出:
running foo
running bar
running func3
switch back to func3
Explicit context switch to foo again
Implicit context switch back to bar

 B、同步与异步性能对区别,以下:

  程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,后者阻塞当前流程,并执行全部给定的greenlet。执行流程只会在 全部greenlet执行完后才会继续向下走。

import gevent

def task(pid):
    gevent.sleep(1)
    print('Task %s done' % pid)

def synchronous():
    for i in range(1, 6): #range从1到5打印
        task(i)

def asynchronous():
    threads = [gevent.spawn(task, i) for i in range(5)]
    gevent.joinall(threads) #


print('Synchronous:')
synchronous() #正常函数,串行的调用会每一个1s打印一次

print(" ")

print('Asynchronous:')
asynchronous() #并行打印,等待一次性打印出来
输出:
每相隔一秒打印
Synchronous:
Task 1 done
Task 2 done
Task 3 done
Task 4 done
Task 5 done
 
等待同一时间打印
Asynchronous:
Task 0 done
Task 1 done
Task 2 done
Task 3 done
Task 4 done

  C、gevent协程爬取网页,遇到IO阻塞时会自动切换业务。举例以下:

from gevent import monkey;
monkey.patch_all()

import gevent,time
from  urllib.request import urlopen

def f(url):
    print('GET: %s' % url)
    resp = urlopen(url)
    data = resp.read()
    #print(data) #打印爬取到的网页内容
    print('%d bytes received from %s.' % (len(data), url))

time_start = time.time()

urls = ['http://www.cnblogs.com/alex3714/articles/5248247.html',
	    'http://www.cnblogs.com/chen170615/p/8797609.html',
	    'http://www.cnblogs.com/chen170615/p/8761768.html',
        ]
for i in urls:
    f(i)

print("同步执行时间:",time.time() - time_start)
print (" ")
async_time_start = time.time()
gevent.joinall([
    gevent.spawn(f, 'http://www.cnblogs.com/alex3714/articles/5248247.html'),
    gevent.spawn(f, 'http://www.cnblogs.com/chen170615/p/8797609.html'),
    gevent.spawn(f,'http://www.cnblogs.com/chen170615/p/8761768.html')
])
print("异步执行时间:",time.time() - async_time_start)
输出:
GET: http://www.cnblogs.com/alex3714/articles/5248247.html
92147 bytes received from http://www.cnblogs.com/alex3714/articles/5248247.html.
GET: http://www.cnblogs.com/chen170615/p/8797609.html
10930 bytes received from http://www.cnblogs.com/chen170615/p/8797609.html.
GET: http://www.cnblogs.com/chen170615/p/8761768.html
11853 bytes received from http://www.cnblogs.com/chen170615/p/8761768.html.
同步执行时间 20.319132089614868
 
GET: http://www.cnblogs.com/alex3714/articles/5248247.html
GET: http://www.cnblogs.com/chen170615/p/8797609.html
GET: http://www.cnblogs.com/chen170615/p/8761768.html
11853 bytes received from http://www.cnblogs.com/chen170615/p/8761768.html.
10930 bytes received from http://www.cnblogs.com/chen170615/p/8797609.html.
92147 bytes received from http://www.cnblogs.com/alex3714/articles/5248247.html.
异步执行时间: 0.28768205642700195

 以上例子能够看出,gevent协程异步并发执行的性能高于同步串行的执行,遇到会等待的IO同时,异步的性能就表现的优异起来。(多执行几回,就能看出对比。)

 D、过gevent实现单线程下的多socket并发

 举例:

服务端:

协程gevent_socket_server.py

import sys,socket,time,gevent

from gevent import socket,monkey

monkey.patch_all()

def server(port):
    gevent_server = socket.socket()
    gevent_server.bind(('0.0.0.0',port))
    gevent_server.listen()
    while True:
        cli,addr = gevent_server.accept()
        gevent.spawn(handle_request,cli)

def handle_request(conn):
    try:
        while True:
            data = conn.recv(1024)
            print("recv:",data)
            conn.send(data)
            if not data:
                conn.shutdown(socket.SHUT_WR)

    except Exception as ex:
        print(ex)

    finally:
        conn.close()

if __name__ == "__main__":
    server(8001)

客户端两种类型,以下: 

客户端1:协程gevent_socket_client.py(普通的手工输入模式)

import socket

HOST = "localhost"
PORT = 8001

s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect((HOST,PORT))

while True:
    msg = bytes(input(">>:").strip(),encoding="utf-8")
    s.sendall(msg)
    data = s.recv(1024)

    print('Received',repr(data))

s.close()

客户端2:协程gevent_socket_cli.py(经过起进程的方式,并发执行)

import socket,threading

HOST = "localhost"
PORT = 8001

def sock_conn():

   s = socket.socket()
   s.connect((HOST,PORT))
   count = 0

   while True:
       s.sendall(("hello %s" % count).encode("utf-8"))
       data = s.recv(1024)

       print("[%s]recv from server:" % threading.get_ident(),data.decode())
       count += 1
   s.close()

for i in range(10): #测试注意数值,不要设置太大。要否则,机器回被卡死
    t = threading.Thread(target=sock_conn)
    t.start()
相关文章
相关标签/搜索