38 协程 greenlet模块实现并发 Gevent

异步回调python

### 什么是异步回调mysql

异步回调指的是:在发起一个异步任务的同时指定一个函数,在异步任务完成时会自动的调用这个函数sql

### 为何须要异步回调编程

以前在使用线程池或进程池提交任务时,若是想要处理任务的执行结果则必须调用result函数或是shutdown函数,而它们都是是阻塞的,会等到任务执行完毕后才能继续执行,这样一来在这个等待过程当中就没法执行其余任务,下降了效率,因此须要一种方案,即保证解析结果的线程不用等待,又能保证数据可以及时被解析,该方案就是异步回调服务器

### 异步回调的使用并发

先来看一个案例:app

在编写爬虫程序时,一般都是两个步骤:dom

​ 1.从服务器下载一个网页文件异步

​ 2.读取而且解析文件内容,提取有用的数据socket

按照以上流程能够编写一个简单的爬虫程序

要请求网页数据则须要使用到第三方的请求库requests能够经过pip或是pycharm来安装,在pycharm中点击settings->解释器->点击+号->搜索requests->安装

```python
import requests,re,os,random,time
from concurrent.futures import ProcessPoolExecutor

def get_data(url):
print("%s 正在请求%s" % (os.getpid(),url))
time.sleep(random.randint(1,2))
response = requests.get(url)
print(os.getpid(),"请求成功 数据长度",len(response.content))
#parser(response) # 3.直接调用解析方法 哪一个进程请求完成就那个进程解析数据 强行使两个操做耦合到一块儿了
return response

def parser(obj):
data = obj.result()
htm = data.content.decode("utf-8")
ls = re.findall("href=.*?com",htm)
print(os.getpid(),"解析成功",len(ls),"个连接")

if __name__ == '__main__':
pool = ProcessPoolExecutor(3)
urls = ["https://www.baidu.com",
"https://www.sina.com",
"https://www.python.org",
"https://www.tmall.com",
"https://www.mysql.com",
"https://www.apple.com.cn"]
# objs = []
for url in urls:
# res = pool.submit(get_data,url).result() # 1.同步的方式获取结果 将致使全部请求任务不能并发
# parser(res)

obj = pool.submit(get_data,url) #
obj.add_done_callback(parser) # 4.使用异步回调,保证了数据能够被及时处理,而且请求和解析解开了耦合
# objs.append(obj)

# pool.shutdown() # 2.等待全部任务执行结束在统一的解析
# for obj in objs:
# res = obj.result()
# parser(res)
# 1.请求任务能够并发 可是结果不能被及时解析 必须等全部请求完成才能解析
# 2.解析任务变成了串行,
```

总结:异步回调使用方法就是在提交任务后获得一个Futures对象,调用对象的add_done_callback来指定一个回调函数,

若是把任务比喻为烧水,没有回调时就只能守着水壶等待水开,有了回调至关于换了一个会响的水壶,烧水期间可用做其余的事情,等待水开了水壶会自动发出声音,这时候再回来处理。水壶自动发出声音就是回调。

注意:

1. 使用进程池时,回调函数都是主进程中执行执行
2. 使用线程池时,回调函数的执行线程是不肯定的,哪一个线程空闲就交给哪一个线程
3. 回调函数默认接收一个参数就是这个任务对象本身,再经过对象的result函数来获取任务的处理结果

 线程队列

1.Queue 先进先出队列

与多进程中的Queue使用方式彻底相同,区别仅仅是不能被多进程共享。

```python
q = Queue(3)
q.put(1)
q.put(2)
q.put(3)
print(q.get(timeout=1))
print(q.get(timeout=1))
print(q.get(timeout=1))
```

 

2.LifoQueue 后进先出队列

该队列能够模拟堆栈,实现先进后出,后进先出

```python
lq = LifoQueue()

lq.put(1)
lq.put(2)
lq.put(3)

print(lq.get())
print(lq.get())
print(lq.get())
```

 

3.PriorityQueue 优先级队列

该队列能够为每一个元素指定一个优先级,这个优先级能够是数字,字符串或其余类型,可是必须是能够比较大小的类型,取出数据时会按照从小到大的顺序取出

```python
pq = PriorityQueue()
# 数字优先级
pq.put((10,"a"))
pq.put((11,"a"))
pq.put((-11111,"a"))

print(pq.get())
print(pq.get())
print(pq.get())
# 字符串优先级
pq.put(("b","a"))
pq.put(("c","a"))
pq.put(("a","a"))

print(pq.get())
print(pq.get())
print(pq.get())
```

.线程事件Event

### 什么是事件

事件表示在某个时间发生了某个事情的通知信号,用于线程间协同工做。

由于不一样线程之间是独立运行的状态不可预测,因此一个线程与另外一个线程间的数据是不一样步的,当一个线程须要利用另外一个线程的状态来肯定本身的下一步操做时,就必须保持线程间数据的同步,Event就能够实现线程间同步

### Event介绍

Event象包含一个可由线程设置的信号标志,它容许线程等待某些事件的发生。在 初始状况下,Event对象中的信号标志被设置为假。若是有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程若是将一个Event对象的信号标志设置为真,它将唤醒全部等待这个Event对象的线程。若是一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行

可用方法:

```python
event.isSet():返回event的状态值;
event.wait():将阻塞线程;知道event的状态为True
event.set(): 设置event的状态值为True,全部阻塞池的线程激活进入就绪状态, 等待操做系统调度;
event.clear():恢复event的状态值为False。
```

使用案例:

```python
# 在连接mysql服务器前必须保证mysql已经启动,而启动须要花费一些时间,因此客户端不能当即发起连接 须要等待msyql启动完成后当即发起连接
from threading import Event,Thread
import time

boot = False
def start():
global boot
print("正正在启动服务器.....")
time.sleep(5)
print("服务器启动完成!")
boot = True

def connect():
while True:
if boot:
print("连接成功")
break
else:
print("连接失败")
time.sleep(1)

Thread(target=start).start()
Thread(target=connect).start()
Thread(target=connect).start()
```

使用Event改造后:

```python
from threading import Event,Thread
import time

e = Event()
def start():
global boot
print("正正在启动服务器.....")
time.sleep(3)
print("服务器启动完成!")
e.set()

def connect():
e.wait()
print("连接成功")

Thread(target=start).start()
Thread(target=connect).start()
Thread(target=connect).start()
```

增长需求,每次尝试连接等待1秒,尝试次数为3次

```python
from threading import Event,Thread
import time

e = Event()
def start():
global boot
print("正正在启动服务器.....")
time.sleep(5)
print("服务器启动完成!")
e.set()

def connect():
for i in range(1,4):
print("第%s次尝试连接" % i)
e.wait(1)
if e.isSet():
print("连接成功")
break
else:
print("第%s次连接失败" % i)
else:
print("服务器未启动!")

Thread(target=start).start()
Thread(target=connect).start()
# Thread(target=connect).start()
```





单线程实现并发
并发:指的是多个任务同时发生,看起来好像是同时都在进行

并行:指的是多个任务真正的同时进行

若是一个线程可以检测IO操做而且将其设置为非阻塞,并自动切换到其余任务就能够提升CPU的利用率,指的就是在单线程下实现并发。
并发 = 切换任务+保存状态,只要找到一种方案,可以在两个任务之间切换执行而且保存状态,那就能够实现单线程并发
python中的生成器就具有这样一个特色,每次调用next都会回到生成器函数中执行代码,这意味着任务之间能够切换,而且是基于上一次运行的结果,这意味着生成器会自动保存执行状态!
利用生成器来实现并发执行:
def task1():
while True:
yield
print("task1 run")

def task2():
g = task1()
while True:
next(g)
print("task2 run")
task2()
 两个计算任务一个采用生成器切换并发执行  一个直接串行调用
import time
def task1():
a = 0
for i in range(10000000):
a += i
yield

def task2():
g = task1()
b = 0
for i in range(10000000):
b += 1
next(g)
s = time.time()
task2()
print("并发执行时间",time.time()-s)

# 单线程下串行执行两个计算任务 效率反而比并发高 由于并发须要切换和保存
def task1():
a = 0
for i in range(10000000):
a += i
def task2():
b = 0
for i in range(10000000):
b += 1
s = time.time()
task1()
task2()
print("串行执行时间",time.time()-s)
```
能够看到对于纯计算任务而言,单线程并发反而使执行效率降低了一半左右,因此这样的方案对于纯计算任务而言是没有必要的

 

协程

协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。

协程:协程是一种用户态的轻量级线程,即协程是由用户程序本身控制调度的。

协程的本质就是在单线程下,由用户本身控制一个任务遇到io阻塞了就切换另一个任务去执行,以此来提高效率。为了实现它,咱们须要找寻一种能够同时知足如下条件的解决方案:

1.能够控制多个任务之间的切换,切换以前将任务的状态保存下来,以便从新运行时,能够基于暂停的位置继续执行。

2. 做为1的补充:能够检测io操做,在遇到io操做的状况下才发生切换

优势以下:

1. 协程的切换开销更小,属于程序级别的切换,操做系统彻底感知不到,于是更加轻量级
2. 单线程内就能够实现并发的效果,最大限度地利用cpu

缺点以下:

1. 协程的本质是单线程下,没法利用多核,能够是一个程序开启多个进程,每一个进程内开启多个线程,每一个线程内开启协程
2. 协程指的是单个线程,于是一旦协程出现阻塞,将会阻塞整个线程

总结协程特色:

  1. 必须在只有一个单线程里实现并发
  2. 修改共享数据不需加锁
  3. 用户程序里本身保存多个控制流的上下文栈
  4. 附加:一个协程遇到IO操做自动切换到其它协程(如何实现检测IO,yield、greenlet都没法实现,就用到了gevent模块(select机制))

能够看到对于纯计算任务而言,单线程并发反而使执行效率降低了一半左右,因此这样的方案对于纯计算任务而言是没有必要的
greenlet模块实现并发

```python
def task1(name):
print("%s task1 run1" % name)
g2.switch(name) # 切换至任务2
print("task1 run2")
g2.switch() # 切换至任务2

def task2(name):
print("%s task2 run1" % name)
g1.switch() # 切换至任务1
print("task2 run2")

g1 = greenlet.greenlet(task1)
g2 = greenlet.greenlet(task2)
g1.switch("jerry") # 为任务传参数
```

如今咱们须要一种方案 便可检测IO 又可以实现单线程并发,因而gevent闪亮登场
Gevent 是一个第三方库,能够轻松经过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet所有运行在主程序操做系统进程的内部,但它们被协做式地调度。
#用法
g1=gevent.spawn(func,1,,2,3,x=4,y=5)建立一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面能够有多个参数,能够是位置实参或关键字实参,都是传给函数eat的

g2=gevent.spawn(func2)

g1.join() #等待g1结束

g2.join() #等待g2结束

#或者上述两步合做一步:gevent.joinall([g1,g2])

g1.value#拿到func1的返回值
遇到IO阻塞时会自动切换任务
# gevent 不具有检测IO的能力  须要为它打补丁  打上补丁以后就能检测IO
# 注意补丁必定打在最上面 必须保证导入模块前就打好补丁
from gevent import monkey
monkey.patch_all()

from threading import current_thread
import gevent,time


def task1():
print(current_thread(),1)
print("task1 run")
# gevent.sleep(3)
time.sleep(3)
print("task1 over")

def task2():
print(current_thread(),2)
print("task2 run")
print("task2 over")

# spawn 用于建立一个协程任务
g1 = gevent.spawn(task1)
g2 = gevent.spawn(task2)

# 任务要执行,必须保证主线程没挂 由于全部协程任务都是主线在执行 ,必须调用join来等待协程任务
# g1.join()
# g2.join()
# 理论上等待执行时间最长的任务就行 , 可是不清楚谁的时间长 能够所有join

gevent.joinall([g1,g2])
print("over")

gevent.sleep(3)模拟的是gevent能够识别的io阻塞,

而time.sleep(3)或其余的阻塞,gevent是不能直接识别的须要用下面一行代码,打补丁,就能够识别了

from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面,如time,socket模块以前

相关文章
相关标签/搜索