Python全栈之路系列----之-----协程(单线程并发)/Greenlet

协程

概念

协程:是单线程下的并发,又称微线程,纤程。英文名Coroutinepython

协程:协程是一种用户态的轻量级线程,即协程是由用户程序本身控制调度的git

对于单线程下,咱们不可避免程序中出现io操做,但若是咱们能在本身的程序中(即用户程序级别,而非操做系统级别)控制单线程下的多个任务能在一个任务遇到io阻塞时就切换到另一个任务去计算github

这样就保证了该线程可以最大限度地处于就绪态,即随时均可以被cpu执行的状态,至关于咱们在用户程序级别将本身的io操做最大限度地隐藏起来编程

从而能够迷惑操做系统,让其看到:该线程好像是一直在计算,io比较少,从而更多的将cpu的执行权限分配给咱们的线程。并发

 

单线程实现并发即在一个主线程内实现并发;本质是:切换+保存状态,这种并发只基于io阻塞app

1. 能够控制多个任务之间的切换,切换以前将任务的状态保存下来,以便从新运行时,能够基于暂停的位置继续执行。
2. 能够检测io操做,在遇到io操做的状况下才发生切换

在任务一遇到io状况下,切到任务二去执行,这样就能够利用任务一阻塞的时间完成任务二的计算,效率的提高就在于此。socket

#1 yiled能够保存状态,yield的状态保存与操做系统的保存线程状态很像,可是yield是代码级别控制的,更轻量级
#2 send能够把一个函数的结果传给另一个函数,以此实现单线程内程序之间的切换 



单纯地切换反而会下降运行效率
#串行执行
import time
def consumer(res):
    '''任务1:接收数据,处理数据'''
    pass

def producer():
    '''任务2:生产数据'''
    res=[]
    for i in range(10000000):
        res.append(i)
    return res

start=time.time()
#串行执行
res=producer()
consumer(res) #写成consumer(producer())会下降执行效率
stop=time.time()
print(stop-start) #1.5536692142486572



#基于yield并发执行
import time
def consumer():
    '''任务1:接收数据,处理数据'''
    while True:
        x=yield

def producer():
    '''任务2:生产数据'''
    g=consumer()
    next(g)
    for i in range(10000000):
        g.send(i)

start=time.time()
#基于yield保存状态,实现两个任务直接来回切换,即并发的效果
#PS:若是每一个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.
producer()

stop=time.time()
print(stop-start) #2.0272178649902344
单纯地切换反而会下降运行效率
import time
def consumer():
    '''任务1:接收数据,处理数据'''
    while True:
        x=yield

def producer():
    '''任务2:生产数据'''
    g=consumer()
    next(g)
    for i in range(10000000):
        g.send(i)
        time.sleep(2)

start=time.time()
producer() #并发执行,可是任务producer遇到io就会阻塞住,并不会切到该线程内的其余任务去执行

stop=time.time()
print(stop-start)
yield并不能实现遇到io切换

协程特色

协程的优缺点:ide

优势函数

  1. 无需线程上下文切换的开销
  2. 无需原子操做锁定及同步的开销(更改一个变量)
  3. 方便切换控制流,简化编程模型
  4. 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。因此很适合用于高并发处理。

缺点:高并发

  1. 没法利用多核资源:协程的本质是个单线程,它不能多核,协程须要和进程配合才能运行在多CPU上,固然咱们平常所编写的绝大部分应用都没有这个必要,除非是CPU密集型应用。
  2. 进行阻塞(Blocking)操做(如IO时)会阻塞掉整个程序

Greenlet

greenlet模块能够很是简单地实现多个任务的切换,可是检测不到io阻塞,须要手动添加

实现协程实例

def consumer(name): print("--->starting eating baozi...") while True: new_baozi = yield  # 直接返回
        print("[%s] is eating baozi %s" % (name, new_baozi)) def producer(): r = con.__next__() r = con2.__next__() n = 0 while n < 5: n += 1 con.send(n) # 唤醒生成器的同时传入一个参数
 con2.send(n) print("\033[32;1m[producer]\033[0m is making baozi %s" % n) if __name__ == '__main__': con = consumer("c1") con2 = consumer("c2") p = producer()
yield

Greenlet

安装greenlet  >>>     pip3 install greenlet

 

from greenlet import greenlet

def func1():
    print(12)
    #遇到switch时切换,手动切换
    gr2.switch()
    print(34)
    gr2.switch()
def func2():
    print(56)
    gr1.switch()
    print(78)

#建立两个协程
gr1=greenlet(func1)
gr2=greenlet(func2)
gr1.switch()
Greenlet

单纯的切换(在没有io的状况下或者没有重复开辟内存空间的操做),反而会下降程序的执行速度

顺序执行
import time
def f1():
    res=1
    for i in range(100000000):
        res+=i

def f2():
    res=1
    for i in range(100000000):
        res*=i

start=time.time()
f1()
f2()
stop=time.time()
print('run time is %s' %(stop-start)) #10.985628366470337

#切换
from greenlet import greenlet
import time
def f1():
    res=1
    for i in range(100000000):
        res+=i
        g2.switch()

def f2():
    res=1
    for i in range(100000000):
        res*=i
        g1.switch()

start=time.time()
g1=greenlet(f1)
g2=greenlet(f2)
g1.switch()
stop=time.time()
print('run time is %s' %(stop-start)) # 52.763017892837524
单纯的切换

 

Gevent

单线程里的这多个任务的代码一般会既有计算操做又有阻塞操做,咱们彻底能够在执行任务1时遇到阻塞,就利用阻塞的时间去执行任务2。。。。如此,才能提升效率,这就用到了Gevent模块

#用法
g1=gevent.spawn(func,1,,2,3,x=4,y=5)建立一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面能够有多个参数,能够是位置实参或关键字实参,都是传给函数eat的

g2=gevent.spawn(func2)

g1.join() #等待g1结束

g2.join() #等待g2结束

#或者上述两步合做一步:gevent.joinall([g1,g2])

g1.value#拿到func1的返回值
用法

遇到IO阻塞时会自动切换任务

import gevent
def eat(name):
    print('%s eat 1' %name)
    gevent.sleep(2)
    print('%s eat 2' %name)

def play(name):
    print('%s play 1' %name)
    gevent.sleep(1)
    print('%s play 2' %name)


g1=gevent.spawn(eat,'egon')
g2=gevent.spawn(play,name='egon')
g1.join()
g2.join()
#或者gevent.joinall([g1,g2])
print('')
gevent

gevent.sleep(2)模拟的是gevent能够识别的io阻塞,补丁必须放在开头位置

而time.sleep(2)或其余的阻塞,gevent是不能直接识别的须要用下面一行代码,打补丁,就能够识别了

咱们能够用threading.current_thread().getName()来查看每一个g1和g2,查看的结果为DummyThread-n,即假线程

from gevent import monkey;monkey.patch_all()

import gevent
import time
def eat():
    print('eat food 1')
    time.sleep(2)
    print('eat food 2')

def play():
    print('play 1')
    time.sleep(1)
    print('play 2')

g1=gevent.spawn(eat)
g2=gevent.spawn(play_phone)
gevent.joinall([g1,g2])
print('')
补丁
#协程:单线程下实现并发,用户从应用程序级别控制单线程下任务的切换,注意必定是遇到IO才切


# import gevent
# #1.检测IO
# #2.自动切换
# import time
# def eat(name):
#     print('%s eat 1' %name)
#     gevent.sleep(2)
#     print('%s eat 2' %name)
# def play(name):
#     print('%s play 1' %name)
#     gevent.sleep(1)
#     print('%s play 2' %name)
#
# start=time.time()
# g1=gevent.spawn(eat,'alex')
# g2=gevent.spawn(play,'egon')
#
# # g1.join()
# # g2.join()
# gevent.joinall([g1,g2])
# stop=time.time()
# print(stop-start)





# import gevent
# import os
# #1.检测IO
# #2.自动切换
# import time
# def eat():
#     print('%s eat 1' %os.getpid())
#     gevent.sleep(2)
#     print('%s eat 2' %os.getpid())
# def play():
#     print('%s play 1' %os.getpid())
#     gevent.sleep(1)
#     print('%s play 2' %os.getpid())
#
# start=time.time()
# g1=gevent.spawn(eat,)
# g2=gevent.spawn(play,)
#
# # g1.join()
# # g2.join()
# gevent.joinall([g1,g2])
# stop=time.time()
# print(stop-start)



# import gevent
# import os
# from threading import current_thread
# #1.检测IO
# #2.自动切换
# import time
# def eat():
#     print('%s eat 1' %current_thread().getName())
#     gevent.sleep(2)
#     print('%s eat 2' %current_thread().getName())
# def play():
#     print('%s play 1' %current_thread().getName())
#     gevent.sleep(1)
#     print('%s play 2' %current_thread().getName())
#
# start=time.time()
# g1=gevent.spawn(eat,)
# g2=gevent.spawn(play,)
#
# # g1.join()
# # g2.join()
# gevent.joinall([g1,g2])
# stop=time.time()
# print(stop-start)


from gevent import monkey;monkey.patch_all()
import gevent
import os
from threading import current_thread
#1.检测IO
#2.自动切换
import time
def eat():
    print('%s eat 1' %current_thread().getName())
    time.sleep(2)
    print('%s eat 2' %current_thread().getName())
def play():
    print('%s play 1' %current_thread().getName())
    time.sleep(1)
    print('%s play 2' %current_thread().getName())

start=time.time()
g1=gevent.spawn(eat,)
g2=gevent.spawn(play,)

# g1.join()
# g2.join()
gevent.joinall([g1,g2])
stop=time.time()
print(stop-start)
总结性代码

例子

from urllib import request
from gevent import monkey
import gevent
import time

monkey.patch_all()  # 当前程序中只要设置到IO操做的都作上标记

def wget(url):
    print('GET: %s' % url)
    resp = request.urlopen(url)
    data = resp.read()
    print('%d bytes received from %s.' % (len(data), url))

urls = [
    'https://www.python.org/',
    'https://www.python.org/',
    'https://github.com/',
    'https://blog.ansheng.me/',
]

# 串行抓取
start_time = time.time()
for n in urls:
    wget(n)
print("串行抓取使用时间:", time.time() - start_time)

# 并行抓取
ctrip_time = time.time()
gevent.joinall([
    gevent.spawn(wget, 'https://www.python.org/'),
    gevent.spawn(wget, 'https://www.python.org/'),
    gevent.spawn(wget, 'https://github.com/'),
    gevent.spawn(wget, 'https://blog.ansheng.me/'),
])
print("并行抓取使用时间:", time.time() - ctrip_time)



输出
GET: https://www.python.org/
47424 bytes received from https://www.python.org/.
GET: https://www.python.org/
47424 bytes received from https://www.python.org/.
GET: https://github.com/
25735 bytes received from https://github.com/.
GET: https://blog.ansheng.me/
82693 bytes received from https://blog.ansheng.me/.
串行抓取使用时间: 15.143015384674072
GET: https://www.python.org/
GET: https://www.python.org/
GET: https://github.com/
GET: https://blog.ansheng.me/
25736 bytes received from https://github.com/.
47424 bytes received from https://www.python.org/.
82693 bytes received from https://blog.ansheng.me/.
47424 bytes received from https://www.python.org/.
并行抓取使用时间: 3.781306266784668
页面抓取
from gevent import monkey;monkey.patch_all()
import gevent
from multiprocessing import Process
from socket import *

def server(ip,port):
    s = socket(AF_INET, SOCK_STREAM)
    s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    s.bind((ip,port))
    s.listen(5)
    while True:
        conn,addr=s.accept()
        print('%s:%s' % (addr[0], addr[1]))
        g1=gevent.spawn(talk,conn,addr)

def talk(conn,addr):
    while True:
        try:
            data=conn.recv(1024)
            print('%s:%s [%s]' %(addr[0],addr[1],data))
            if not data:break
            conn.send(data.upper())
        except ConnectionResetError:
            break
    conn.close()
if __name__ == '__main__':
    server('127.0.0.1',8091)
服务端
# from socket import *
# c=socket(AF_INET,SOCK_STREAM)
# c.connect(('127.0.0.1',8090))
#
# while True:
#     msg=input('>>: ').strip()
#     if not msg:continue
#     c.send(msg.encode('utf-8'))
#     data=c.recv(1024)
#     print(data.decode('utf-8'))


from threading import Thread
from socket import *

def client():
    c=socket(AF_INET,SOCK_STREAM)
    c.connect(('127.0.0.1',8091))

    while True:
        c.send('hello'.encode('utf-8'))
        data=c.recv(1024)
        print(data.decode('utf-8'))

if __name__ == '__main__':
    for i in range(500):
        t=Thread(target=client)
        t.start()
客户端
相关文章
相关标签/搜索