Python并发编程之协程

协程

1、协程的本质:

单线程实现并发,在应用程序里控制多个任务的切换+保存状态python

2、协程的目的:

  • 想要在单线程下实现并发
  • 并发指的是多个任务看起来是同时运行的
  • 并发=切换+保存状态

3、补充:

  • yiled能够保存状态,yield的状态保存与操做系统的保存线程状态很像,可是yield是代码级别控制的,更轻量级
  • send能够把一个函数的结果传给另一个函数,以此实现单线程内程序之间的切换
  • 如何实现检测IO,yield、greenlet都没法实现,就用到了gevent模块(select机制)

4、优势

  • 应用程序级别速度要远远高于操做系统的切换

5、缺点

  • 多个任务一旦有一个阻塞没有切,整个线程都阻塞在原地,该线程内的其余的任务都不能执行了
  • 一旦引入协程,就须要检测单线程下全部的IO行为,实现遇到IO就切换,少一个都不行,由于若是一个任务阻塞了,整个线程就阻塞了,其余的任务即使是能够计算,可是也没法运行了

注意:单纯地切换反而会下降运行效率

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#并发执行
import time
 
def producer():
     g = consumer()
     next (g)
     for i in range ( 100 ):
         g.send(i)
 
def consumer():
     while True :
         res = yield
 
start_time = time.time()
producer()
stop_time = time.time()
print (stop_time - start_time)
 
#串行
import time
 
def producer():
     res = []
     for i in range ( 10000000 ):
         res.append(i)
     return res
 
 
def consumer(res):
     pass
 
start_time = time.time()
res = producer()
consumer(res)
stop_time = time.time()
print (stop_time - start_time)

greenlet

greenlet只是提供了一种比generator更加便捷的切换方式,当切到一个任务执行时若是遇到io,那就原地阻塞,仍然是没有解决遇到IO自动切换来提高效率的问题。web

注意:单纯的切换(在没有io的状况下或者没有重复开辟内存空间的操做),反而会下降程序的执行速度

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#pip3 install greenlet
from greenlet import greenlet
import time
 
def eat(name):
     print ( '%s eat 1' % name)
     time.sleep( 2 )
     g2.switch( 'tom' )
     print ( '%s eat 2' % name)
     g2.switch()
 
def play(name):
     print ( '%s play 1' % name )
     g1.switch()
     print ( '%s play 2' % name )
 
g1 = greenlet(eat)
g2 = greenlet(play)
 
g1.switch( 'tom' )
 
"""
tom eat 1
tom play 1
tom eat 2
tom play 2
"""

gevent

遇到IO阻塞时会自动切换任务redis

1、用法:

  • g1=gevent.spawn(func,1,,2,3,x=4,y=5)建立一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面能够有多个参数,能够是位置实参或关键字实参,都是传给函数eat的
  • g2=gevent.spawn(func2)
  • g1.join() #等待g1结束
  • g2.join() #等待g2结束
  • 或者上述两步合做一步:gevent.joinall([g1,g2])
  • g1.value#拿到func1的返回值

2、补充:

  • gevent.sleep(2)模拟的是gevent能够识别的io阻塞,
  • 而time.sleep(2)或其余的阻塞,gevent是不能直接识别的须要用下面一行代码,打补丁,就能够识别了
  • from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面,如time,socket模块以前或者咱们干脆记忆成:要用gevent,须要将from gevent import monkey;monkey.patch_all()放到文件的开头
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#pip3 install gevent
from gevent import monkey;monkey.patch_all()
import gevent
import time
 
def eat(name):
     print ( '%s eat 1' % name)
     time.sleep( 3 )
     print ( '%s eat 2' % name)
 
def play(name):
     print ( '%s play 1' % name)
     time.sleep( 2 )
     print ( '%s play 2' % name)
 
start_time = time.time()
g1 = gevent.spawn(eat, 'tom' )
g2 = gevent.spawn(play, 'rose' )
 
g1.join()
g2.join()
stop_time = time.time()
print (stop_time - start_time)
"""
tom eat 1
rose play 1
rose play 2
tom eat 2
3.003171920776367
"""
 
 
 
from gevent import monkey;monkey.patch_all()
import gevent
import time
 
def eat(name):
     print ( '%s eat 1' % name)
     time.sleep( 3 )
     print ( '%s eat 2' % name)
 
def play(name):
     print ( '%s play 1' % name)
     time.sleep( 2 )
     print ( '%s play 2' % name)
 
g1 = gevent.spawn(eat, 'tom' )
g2 = gevent.spawn(play, 'rose' )
 
# g1.join()
# g2.join()
gevent.joinall([g1,g2])

3、经过gevent实现单线程下的socket并发

from gevent import monkey;monkey.patch_all()必定要放到导入socket模块以前,不然gevent没法识别socket的阻塞算法

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
"""
服务端
#基于gevent实现
"""
from gevent import monkey,spawn;monkey.patch_all()
from socket import *
 
def communicate(conn):
     while True :
         try :
             data = conn.recv( 1024 )
             if not data: break
             conn.send(data.upper())
         except ConnectionResetError:
             break
 
     conn.close()
 
def server(ip,port):
     server = socket(AF_INET, SOCK_STREAM)
     server.bind((ip,port))
     server.listen( 5 )
 
     while True :
         conn, addr = server.accept()
         spawn(communicate,conn)
 
     server.close()
 
if __name__ = = '__main__' :
     g = spawn(server, '127.0.0.1' , 8090 )
     g.join()
     
     
"""
客户端
"""
from socket import *
from threading import Thread,currentThread
 
def client():
     client = socket(AF_INET,SOCK_STREAM)
     client.connect(( '127.0.0.1' , 8090 ))
 
     while True :
         client.send(( '%s hello' % currentThread().getName()).encode( 'utf-8' ))
         data = client.recv( 1024 )
         print (data.decode( 'utf-8' ))
 
     client.close()
 
if __name__ = = '__main__' :
     for i in range ( 500 ):
         t = Thread(target = client)
         t.start()
"""
网络编程
B/S和C/S架构
osi五层协议
应用层
传输层  端口/服务相关  四次路由器 四层交换机
网络层 ip 路由器 三层交换机
数据链路层 arp mac相关 网卡 交换机
物理层
socket/socketserver
tcp/udp
    tcp协议:面向链接的,可靠的,全双工的,流失传输协议
    三次握手
    四次挥手
    粘包现象
        在接受度粘 发送信息无边界,在接受端没有及时接受,在缓存端粘在一块儿
        在发送端粘,时间短,因为优化机制粘在一块儿的
    udp协议:面向数据报的,不可靠 无链接的协议
ip
arp

I/O操做 输入会让输出
    input输入到内存 input  read  recv  recvfrom   accept   connect  close
    output 从内存输出 print write  send   sendto   accept   connect  close

阻塞和非阻塞
阻塞: cpu不工做 accept recv connect等待
非阻塞:cpu工做


##############################

人机矛盾
    cpu利用率低
磁带存储+批处理
    下降数据的读取时间
    提升cpu的利用率
多道操做系统------在一个任务遇到io的时候主动让出cpu
    数据隔离
    时空复用
    可以在一个任务遇到io操做的时候主动把cpu让出来,给其余的任务使用
        切换:占时间,
        切换:操做系统作的
分时操做系统------给时间分片,让多个任务轮流使用cpu
    cpu的轮转
    每个程序分配一个时间片
    切换:占时间
    反而下降来cpu的使用率
    提升了用户体验
分时操做系统+多道操做系统+实时操做系统
    多个程序一块儿在计算机中执行
    一个程序若是遇到io操做,切出去让出cpu
    一个程序没有遇到io,可是时间片到时了,切出去让出cpu
网络操做系统
分布式操做系统



##############################
进程:运行中的程序
程序和进程之间的区别
    程序只是一个文件
    进程是这个文件被cpu运行起来了
进程是计算机中最小的资源分配单位
每个进程都有一个在操做系统中的惟一标识符:pid

操做系统调度进程的算法
    短做业优先
    先来先服务
    时间片轮转
    多级反馈算法(少在程序的一开始就使用io)

操做系统负责什么?
    调度进程前后执行的顺序,控制执行的时间等等
    资源的分配

并行与并发
    并行:两个程序两个cpu,每一个程序分别占用一个cpu本身执行本身的
         看起来是同时执行,实际在每个时间点上都在各自执行着
    并发:两个程序一个cpu每一个程序交替在一个cpu上执行,看起来在同时执行,可是实际上仍然是串行
同步:
    调用这个函数,并等待这个函数结束
异步:
    调用一个函数,不等待这个方法结束,也不关心这个方法作了什么
阻塞:cpu不工做
非阻塞:cpu工做
同步阻塞:
    conn.recv
    调用函数的这个过程,有io操做
同步非阻塞:
    func() 没有io操做
    socket 非阻塞的tcp协议的时候
    调用函数(这个函数内部不存在io操做)
异步非阻塞:
    把func仍到其余任务里去执行里,没有io操做
    自己的任务和func任务各自执行各自的,没有io操做
异步阻塞:


双击图标发生的事情:
程序在开始运行以后,并非当即开始执行代码,而是会进入就绪状态,等待操做系统调度开始运行
操做系统为其建立进程,分配一块空间,pid---》就绪(ready)---》运行(runing)---》阻塞(blocking)
运行(没有遇到io操做时间片到了)---》就绪
运行(遇到io)--》阻塞
运行--》运行完了--》结束进程
阻塞io结束---》就绪
阻塞影响了程序运行的效率

##############################
进程是计算机中最小的资源分配单位(进程负责圈资源)
    数据隔离的
    建立进程,时间开销大
    销毁进程,时间开销大
    进程之间切换,时间开销大
线程:是计算机中能被cpu调度的最小单位(线程是负责执行具体代码的)
    线程的建立,也须要一些开销(一个存储局部变量的结构,记录状态)
    建立,销毁,切换开销远远小于进程
    每一个进程中至少有一个线程
    一个进程中的多个线程是能够共享这个进程的数据的

是进程中的一部分
每个进程中至少有一个线程,线程是负责执行具体代码的
只负责执行代码,不负责存储共享的数据,也不负责资源分配


python中的线程比较特殊,因此进程也有可能被用到。

pid子进程
ppid父进程
在pycharm中启动的全部py程序都是pycharm的子进程


tcp: 可靠
https: 安全
单核 并发
多核 多个程序跑在多个cpu上,在同一时刻并行

##############################

操做系统
    1。计算机中全部的资源都是由操做系统分配的
    2。操做系统调度任务:时间分片,多道机制
    3。cpu的利用率是咱们努力的指标

进程:开销大 数据隔离 资源分配单位 cpython下能够利用多核  由操做系统调度的
    进程的三状态:就绪 运行 阻塞
    multiprocessing模块
    Process-开启进程
    Lock-互斥锁
        为何要在进程中加锁
        由于进程操做文件也会发生数据不安全
    Queue -队列 IPC机制(Pipe,redis,memcache,rabbitmq,kafka)
    Manager-提供数据共享机制

线程:开销小 数据共享 cpu调度单位 cpython下不能利用多核  由操做系统调度的
    GIL锁:全局解释器锁,Cpython解释器提供的,致了一个进程中多个线程同一时刻只有一个线程能访问CPU--多线程不能利用多核
    Thread类--能开启线程start,等待线程结束join
    Lock-互斥锁  互斥锁能在一个线程中连续acquire,效率相对高
    Rlock--递归锁 能够在一个线程中连续acquire,效率相对低
    死锁现象如何发生,如何避免?
    线程队列queue模块
        Queue
        LifoQueue
        PriorityQueue
池
    实例化一个池
    提交任务到池中,返回一个对象
        使用这个对象获取返回值
        回调函数
    阻塞等待池中的任务都结束

数据安全问题
数据隔离和通讯


协程:多个任务在一条线程上来回切换,用户级别的,由咱们本身写的python代码来控制切换的,是操做系统不可见的
在Cpython解释器下----协程和线程都不能利用多核,都是在一个CPU上轮流切换
    因为多线程自己就不能利用多核
    因此即使是开启来多个线程也只能轮流在一个cpu上执行
    协程若是把全部任务的IO操做都规避掉,只剩下须要使用CPU的操做
    就意味着协程就能够作到提升CPU利用率的效果

多线程和协程
    线程 切换须要操做系统,开销大 给操做系统的压力大
        操做系统对IO操做的感知更加灵敏
    协程 切换须要 python代码 开销小,用户操做可控  彻底不会增长操做系统的压力
        用户级别可以对IO操做的感知比较低

协程:可以在一个线程下的多个任务之间来回切换,那么每个任务都是一个协程
两种切换方式
    原生python完成  yield asyncio
    c语言完成的python模块 greenlet  gevent

1。一个线程中的阻塞都被其余的各类任务占满了
2。让操做系统以为这个线程很忙
尽可能的减小这个线程进入阻塞的状态
提升了单线程对CPU的利用率
3。多个任务在同一个线程中执行
也达到了一个并发的效果
规避了每个任务的io操做
减小了线程的个数,减轻了操做系统的负担



咱们写协程:在一条线程上最大限度的提升CPU的使用率
           在一个任务中遇到IO的时候就切换到其余任务
特色:
    开销很小,是用户级的,只能从用户级别感知的IO操做
    不能利用多核,数据共享,数据安全
模块和用法:
    gevent (扩展模块)基于greenlet(内置模块)切换   aiohttp爬虫模块基于asyncio    sanic异步框架
        先导入模块
        导入monkey,执行patch all
        写一个函数看成协程要执行的任务
        协程对象 = gevent.spawn(函数名,参数,)
        协程对象.join(),gevent.joinall([g1,g2...])

    分辨gevent是否识别了咱们写的代码中的io操做的方法
        在patchall以前打印一下涉及到io操做的函数地址
        在patchall以后打印一下涉及到io操做的函数地址
        若是两个地址一致,说明gevent没有识别这个io,若是不一致说明识别了

    asyncio 基于yield机制切换的
    async 标识一个协程函数
    await 后面跟着一个asyncio模块提供的io操做的函数
    loop 事件循环,负责在多个任务之间进行切换的

    最简单的协程函数是如何完成的


"""






import os
import time
from multiprocessing import Process
from multiprocessing import Queue
def func(exp,q):
    print('start',os.getpid())
    ret = eval(exp)
    q.put(ret)
    time.sleep(1)
    print('end',os.getpid())

if __name__ == '__main__': #window下须要加上这一句
    q=Queue() #先进先出
    p=Process(target=func,args=('1+2+3',q)) #建立一个即将要执行func函数的进程对象
    # p.daemon=True #守护进程是随着主进程的代码的结束而结束的
    p.start() #异步非阻塞    调用开启进程的方法,可是并不等待这个进程真的开启
    # p.join() #同步阻塞直到p对应的进程结束以后才结束阻塞
    # print(p.is_alive())
    # p.terminate() # 异步非阻塞  强制结束一个子进程
    # print(p.is_alive()) #子进程还活着,由于操做系统还没来得及关闭进程
    # time.sleep(0.01)
    # print(p.is_alive())#操做系统已经响应了咱们要关闭进程的需求,再去检测的时候,获得的结束是进程已经结束了
    print('main',os.getpid()) #主进程代码执行完毕,可是主进程没有结束,等待子进程结束
    print(q.get())

"""
操做系统建立进程的方式不一样
Windows操做系统执行开启进程的代码
    实际上新的子进程须要经过import父进程的代码来完成数据的导入工做
    因此有一些内容咱们只但愿在父进程中完成,就写在if __name__ == '__main__': 下面

iOS和Linux是直接从内存级别去拷贝代码(fork),从父进程拷贝数据到子进程

主进程没有结束,等待子进程结束
主进程负责回收子进程的资源
若是子进程执行结束,父进程没有回收资源,那么这个子进程会变成一个僵尸进程    

主进程的结束逻辑:
    主进程的代码结束
    全部的子进程结束
    给子进程回收资源
    主进程结束
主进程怎么知道子进程结束的呢? 监控文件
    基于网络,文件    
join方法
    把一个进程的结束事件封装成一个join方法
    执行join方法的效果就是阻塞直到这个子进程执行结束就结束阻塞
    
    在多个子进程中执行join

p=Process(target=函数名,args=(参数,))
进程对象和进程并无直接的关系,只是存储来一些和进程相关的内容,此时此刻,操做系统尚未接到建立进程的指令 
p.start()开启来一个进程----这个方法至关于给来操做系统一条指令
satrt方法的非阻塞和异步的特色
    在执行开启进程这个方法的时候
    咱们既不等待这个进程开启,也不等待操做系统给咱们的相应
    这里只是负责通知操做系统去开启一个进程
    开启了一个子进程以后,主进程的代码和子进程的代码彻底异步
    
p.daemon 守护进程是随着主进程代码的结束而结束的
全部的子进程都必须在主进程结束以前结束,由主进程来负责回收资源 

p.is_alive()
p.terminate() 强制结束一个子进程的


什么是异步非阻塞?
    terminate,start  
什么是同步阻塞
    join      
    
1.若是在一个并发的场景下,涉及到某部份内容是须要修改一些全部进程共享的数据资源,须要加锁来维护数据的安全
2.在数据安全的基础上,才考虑效率问题
3.同步存在的意义就是数据安全   

在主进程中实例化 lock=Lock()
在子进程中,对须要加锁的代码进行with lock:
    with lock: 至关于lock.acquire()和lock.release()
在进程中须要加锁的场景
    1.操做共享的数据资源(文件,数据库)
    2.对资源进行修改,删除操做    


进程之间的通信--IPC(inter process communication) 
队列和管道都是IPC机制,队列进程之间数据安全,管道进程之间数据不安全
第三方工具(软件)提供给咱们的IPC机制,集群的概念(高可用)
redis
memcache
kafka
rabbitmq
并发需求,高可用,断电保存数据,解耦


from multiprocessing import Queue
Queue(天生就是数据安全的) 基于 文件家族的socket,pickle和lock实现的
队列就是管道加锁
get():阻塞的
get_nowait():非阻塞的

下面三个方法不推荐使用,多进程中不许确()
q.empty() 判断队列是否为空
q.qsize() 返回队列中的数据个数
q.full() 判断队列是否为满



pipe 管道(不安全的)基于 文件家族的socket,pickle实现的
from multiprocessing import Pipe
pip=Pipe()
pip.send()
pip.recv()

解耦:修改  复用  可读性
把写在一块儿的大的功能分开成多个小的功能处理

joinablequeue
q.join() 阻塞  直到这个队列中全部的内容都被取走且task_done

multiprocessing中有一个manager类
封装了全部和进程相关的,数据共享,数据传递相关的数据类型
可是对于字典列表这一类的数据操做的时候会产生数据不安全
须要加锁解决问题,而且须要尽可能少的使用这种方式

线程自己是能够利用多核的
cpython解释器中不能实现多线程利用多核 
垃圾回收机制 gc  引用计数+分代回收
专门有一条线程来完成垃圾回收
对每个在程序中的变量统计引用计数

锁:GIL全局解释器锁
cpython解释器中的机制,致使了在同一个进程中多个线程不能同时利用多核。
python的多线程只能是并发不能是并行

线程即使有GIL,也会出现数据不安全的问题
1.操做的是全局变量
2.作如下操做:
    +=,-=,*=,/=先计算再赋值才容易出现数据不安全的问题
    包括list[0]+=1,dic['key']-=1



a=0
def func():
    global a
    a+=1

import dis
dis.dis(func)  #查看func的cpu指令

互斥锁是锁中的一种:在同一个线程中,不能连续acquire屡次,(在一个线程中连续屡次acquire会死锁)
死锁现象:有多把锁+多把锁交替使用,互斥锁在一个线程中连续acquire
递归锁 (在一个线程中连续屡次acquire不会死锁)
好:在同一个进程中屡次acquire也不会发生阻塞
很差:占用了更多资源

递归锁---将多把互斥锁变成了一把递归锁,递归锁也会发生死锁现象(多把递归锁交替使用的时候),之后的代码尽可能用一把锁。
mutexB=mutexA=RLock()
注意不是:mutexB=RLock()  mutexA=RLock() 否则仍是同样会发生死锁现象



保证了整个python程序中,只能有一个线程被CPU执行
缘由;cpython解释器中特殊的垃圾回收机制
GIL锁致使来线程不能并行,可是能够并发
因此使用多线程并不影响高io型的操做
只会对高计算型的程序有效率上的影响
遇到高计算的:多进程+多线程  或者 分布式


遇到IO操做的时候
5亿条cpu指令/s
5-6cpu指令==一句python代码
几千万条python代码
web框架,几乎都是多线程


主线程何时结束?等待全部子线程结束以后才结束
主线程若是结束了,主进程也就结束了
t.ident  线程id
from threading import Thread,current_thread,active_count,enumerate
active_count()==len(enumerate())
current_thread.ident  线程id   在哪个线程里,current_thread获得的就是这个当前线程的信息
在线程中不能从主线程结束一个子线程 没有terminate
不要在子线程里随便修改全局变量
守护线程一直等到全部的非守护线程都结束以后才结束
除了守护了主线程的代码以外也会守护子线程

p.daemon 守护进程是随着主进程代码的结束而结束的
非守护线程不结束,主线程也不结束,主线程结束了,主进程也结束
结束顺序:
非守护线程结束--》主线程结束--》主进程结束——》守护线程也结束



什么是生产者消费者模型
把一个产生数据而且处理数据的过程解耦
让生产数据的过程和处理数据的过程达到一个工做效率上的平衡
中间的容器,在多进程中使用队列或者可被join的队列,作到控制数据的量
当数据过剩的时候,队列的大小会控制消费者的行为
当数据严重不足的时候,队列会控制消费者的行为
而且还能够经过按期检测队列中元素的个数来调节生产者消费者的个数
爬虫:
    请求网页的平均时间是0.3s
    处理网页代码的时候是0.003s
    100倍,每启动100个线程生产数据,就启动一个线程来处理数据
web程序的server端
    每秒钟有6w条请求
    一个服务每s中只能处理2000条
    先写一个web程序,只负责一件事情,就是接收请求,而后把请求放到队列中(消息中间件)
    再写不少个server端,从队列中获取请求,而后处理,而后返回结果

# from queue import Queue #先进先出队列
import queue  #线程之间的通讯,线程安全

先进先出:写一个server全部的用户的请求放在队列里,先来先服务的思想  q=queue.Queue(3)
后进先出:算法中    q=queue.LifoQueue(3) #后进先出->堆栈
优先级队列:自动的排序(vip号码段)告警级别    q=queue.PriorityQueue(3) #优先级队列   q.put((10,'one'))

池:预先的开启固定个数的进程数,当任务来临的时候,直接提交给已经开好的进程/线程,让其去执行就能够了
节省了进程,线程的开启 关闭 切换锁花费的时间
而且减轻了操做系统调度的负担

shutdown 关闭池以后就不能继续提交任务,而且会阻塞,直到已经提交的任务完成
result() 同步阻塞
submit() 异步非阻塞

5*20*500

是单独开启线程进程仍是池?
若是只是开启一个子进程作一件事情,就能够单独开线程
有大量的任务等待程序去作,要达到必定的并发数,开启线程池
根据你程序的io操做也能够断定是用池仍是不用池
socket大量的阻塞io ,socket server就没有用池,用的是线程
爬虫的时候,池

回调函数:执行完子线程任务以后直接调用对应的回调函数
        爬取网页 须要等待数据传输和网络上的响应高IO的--子线程
        分析网页,没有什么IO操做--这个操做不必在子线程完成,交给回调函数
两件事情:存在IO操做的事情,基本不存在IO操做的事情
obj=tp.submit(io操做对应的函数)
obj.result() 是一个阻塞方法 获取返回值
obj.add_done_callback(计算型的事情)

obj=tp.submit(须要在子线程执行的函数名,参数)
obj.add_done_callback(子线程执行完毕以后要执行的代码对应的函数)

进程和线程都有锁
全部在线程中能工做的基本都不能在进程中工做
在进程中可以使用的基本在线程中也可使用

没一个进程下面都有一个独立的垃圾回收机制的线程,不须要担忧开了多进程之后,垃圾回收机制怎么处理


"""

from multiprocessing import process
from threading import Thread
import os
def tfunc():
    print(os.getpid())

def pfunc():
    print('pfunc-->',os.getpid())
    Thread(target=tfunc).start()

if __name__ == '__main__':
    Process(target=pfunc).start()




from multiprocessing import Queue
import queue
q=Queue(3)
q.put(1)
q.put(2)
q.put(3)
print('aaa')
# q.put(44444)  #当队列为满的时候再向队列中放数据,队列会阻塞
try:
    q.put_nowait(6) #当队列为满的时候再向队列中放数据,会报错而且会丢失数据
except queue.Full:
    pass
print('bbb')

print(q.get())
print(q.get())
print(q.get())
# print(q.get()) #在队列为空的时候会发生阻塞
try:
    q.get_nowait() #当队列为空的时候直接报错
except queue.Empty:
    pass





"""
##############################
协程:多个任务在一条线程上来回切换
咱们写协程:在一条线程上最大限度的提升CPU的使用率
           在一个任务中遇到IO的时候就切换到其余任务
特色:
    开销很小,是用户级的,只能从用户级别感知的IO操做
    不能利用多核,数据共享,数据安全
模块和用法:
    gevent 基于greenlet切换
        先导入模块
        导入monkey,执行patch all
        写一个函数看成协程要执行的任务
        协程对象 = gevent.spawn(函数名,参数,)
        协程对象.join(),gevent.joinall([g1,g2...])

    分辨gevent是否识别了咱们写的代码中的io操做的方法
        在patchall以前打印一下涉及到io操做的函数地址
        在patchall以后打印一下涉及到io操做的函数地址
        若是两个地址一致,说明gevent没有识别这个io,若是不一致说明识别了

    asyncio 基于yield机制切换的
    async 标识一个协程函数
    await 后面跟着一个asyncio模块提供的io操做的函数
    loop 事件循环,负责在多个任务之间进行切换的

    最简单的协程函数是如何完成的
"""



from gevent import monkey
monkey.patch_all()
import gevent
import time


def eat(name):
    print('%s eat 1' % name)
    time.sleep(3)
    print('%s eat 2' % name)


def play(name):
    print('%s play 1' % name)
    time.sleep(2)
    print('%s play 2' % name)
    return '返回值'

g_l=[]
for i in range(10):
    g = gevent.spawn(eat, 'tom')  #注意要遇到阻塞才会切换
    g_l.append(g)
gevent.joinall(g_l) #这就是阻塞的

g2=gevent.spawn(play,'rose')
g2.join()
g2.value #获取返回值,须要注意的是这个是一个属性,须要在join后才能获取到,否则就是None  此处打印 返回值


import asyncio
async def func(): #协程方法
    print("start")
    await asyncio.sleep(1)  #阻塞  须要一个关键字await告知是阻塞须要执行一个协程了,若是用await须要在函数以前加上async表示是一个协程函数
    print("end")
    return 123

async def func1():
    print("start")
    await asyncio.sleep(1)
    print("end")
    return 456

async def func2():
    print("start")
    await asyncio.sleep(1)
    print("end")
    return 789

#启动一个任务
loop=asyncio.get_event_loop() #建立一个事件循环
loop.run_until_complete(func()) #把func任务丢到事件循环中去执行    执行一个协程函数  和join差很少意思

#启动多个任务,而且没有返回值
loop=asyncio.get_event_loop()
wait_l=asyncio.wait([func(),func1(),func2()]) #执行多个协程函数
loop.run_until_complete(wait_l)


#启动多个任务,而且有返回值
loop=asyncio.get_event_loop()
f=loop.create_task(func())
f1=loop.create_task(func1())
f2=loop.create_task(func2())
task_l=[f,f1,f2] # 按照这个顺序取返回值
wait_l=asyncio.wait([f,f1,f2])
loop.run_until_complete(wait_l)
for i in task_l:
    print(i.result()) #返回值

#谁先回来先取谁的值
import asyncio
async def demo(i):
    print("start")
    await asyncio.sleep(10-i)
    print("end")
    return i,456

async def main():
    task_l=[]
    for i in range(10):
        task = asyncio.ensure_future(demo(i))
        task_l.append(task)

    for ret in asyncio.as_completed(task_l):
        res = await ret
        print(res)

loop=asyncio.get_event_loop()
loop.run_until_complete(main())

"""
await 阻塞 协程函数这里要切换出去,还能保证一下子在切回来
await必须写在async函数里,async函数是协程函数
loop事件循环
全部的协程的执行,调度都离不开这个loop 
"""
asyncio
a=0
def func():
    global a
    a+=1

import dis
dis.dis(func)  #查看func的cpu指令
dis
相关文章
相关标签/搜索