Python并发编程之多进程,多线程

基础概念

1、进程、程序和线程

  • 程序:程序只是一堆代码而已
  • 进程:指的是程序的运行过程,是对正在运行程序的一个抽象。进程是一个资源单位
  • 线程:每一个进程有一个地址空间,并且默认就有一个控制线程。线程才是cpu上的执行单位

2、并发与并行

不管是并行仍是并发,在用户看来都是'同时'运行的,无论是进程仍是线程,都只是一个任务而已,真是干活的是cpu,cpu来作这些任务,而一个cpu同一时刻只能执行一个任务python

  • 并发:是伪并行,即看起来是同时运行。单个cpu+多道技术就能够实现并发,(并行也属于并发)
  • 并行:同时运行,只有具有多个cpu才能实现并行

3、同步与异步

  • 同步:发出一个功能调用时,在没有获得结果以前,该调用就不会返回。
  • 异步:异步功能调用发出后,不会等返回,而是继续往下执行当,当该异步功能完成后,经过状态、通知或回调函数来通知调用者。

4、阻塞与非阻塞

  • 阻塞:调用结果返回以前,当前线程会被挂起(如遇到io操做)。函数只有在获得结果以后才会将阻塞的线程激活。
  • 非阻塞:在不能马上获得结果以前也会马上返回,同时该函数不会阻塞当前线程。

5、总结

  • 同步与异步针对的是函数/任务的调用方式:同步就是当一个进程发起一个函数(任务)调用的时候,一直等到函数(任务)完成,而进程继续处于激活状态。而异步状况下是当一个进程发起一个函数(任务)调用的时候,不会等函数返回,而是继续往下执行当,函数返回的时候经过状态、通知、事件等方式通知进程任务完成。
  • 阻塞与非阻塞针对的是进程或线程:阻塞是当请求不能知足的时候就将进程挂起,而非阻塞则不会阻塞当前进程

多进程

1、进程的状态

2、multiprocessing模块介绍

  • python中的多线程没法利用多核优点,若是想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分状况须要使用多进程。
  • multiprocessing模块用来开启子进程,并在子进程中执行咱们定制的任务(好比函数),该模块与多线程模块threading的编程接口相似。
  • 与线程不一样,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。

3、Process类

一、方法介绍

  • p.start():启动进程,并调用该子进程中的p.run()
  • p.run():进程启动时运行的方法,正是它去调用target指定的函数,咱们自定义类的类中必定要实现该方法
  • p.terminate():强制终止进程p,不会进行任何清理操做,若是p建立了子进程,该子进程就成了僵尸进程,使用该方法须要特别当心这种状况。若是p还保存了一个锁那么也将不会被释放,进而致使死锁
  • p.is_alive():若是p仍然运行,返回True
  • p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,须要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程

二、属性介绍

  • p.daemon:默认值为False,若是设为True,表明p为后台运行的守护进程,当p的父进程终止时,p也随之终止,而且设定为True后,p不能建立本身的新进程,必须在p.start()以前设置
  • p.name:进程的名称
  • p.pid:进程的pid

三、开启子进程的两种方式

注意:在windows中Process()必须放到# if __name__ == '__main__':下git

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#方式一:
from multiprocessing import Process
import time,os
 
def task(name):
     print ( '%s %s is running,parent id is <%s>' % (name,os.getpid(),os.getppid()))
     time.sleep( 3 )
     print ( '%s %s is running,parent id is <%s>' % (name, os.getpid(), os.getppid()))
 
if __name__ = = '__main__' :
     # Process(target=task,kwargs={'name':'子进程1'})
     p = Process(target = task,args = ( '子进程1' ,))
     p.start() #仅仅只是给操做系统发送了一个信号
 
     print ( '主进程' , os.getpid(), os.getppid())
 
#方式二
from multiprocessing import Process
import time,os
 
class MyProcess(Process):
     def __init__( self ,name):
         super ().__init__()
         self .name = name
 
     def run( self ):
         print ( '%s %s is running,parent id is <%s>' % ( self .name, os.getpid(), os.getppid()))
         time.sleep( 3 )
         print ( '%s %s is running,parent id is <%s>' % ( self .name, os.getpid(), os.getppid()))
 
if __name__ = = '__main__' :
     p = MyProcess( '子进程1' )
     p.start()
     print ( '主进程' , os.getpid(), os.getppid())

4、守护进程

主进程建立守护进程github

  • 守护进程会在主进程代码执行结束后就终止
  • 守护进程内没法再开启子进程,不然抛出异常:AssertionError: daemonic processes are not allowed to have children
  • 注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止
  • 必定要在p.start()前设置,设置p为守护进程,禁止p建立子进程,而且父进程代码执行结束,p即终止运行
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from multiprocessing import Process
 
import time
def foo():
     print ( 123 )
     time.sleep( 1 )
     print ( "end123" )
 
def bar():
     print ( 456 )
     time.sleep( 3 )
     print ( "end456" )
 
if __name__ = = '__main__' :
     p1 = Process(target = foo)
     p2 = Process(target = bar)
 
     p1.daemon = True
     p1.start()
     p2.start()
     # p2.join()
     print ( "main-------" )
 
"""
#主进程代码运行完毕,守护进程就会结束
main-------
456
end456
"""

5、互斥锁

进程之间数据不共享,可是共享同一套文件系统,因此访问同一个文件,或同一个打印终端,是没有问题的, 而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理web

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
"""
db.txt的内容为:{"count":1}
"""
from multiprocessing import Process,Lock
import json
import time
 
def search(name):
     time.sleep( 1 )
     dic = json.load( open ( 'db.txt' , 'r' ,encoding = 'utf-8' ))
     print ( '<%s> 查看到剩余票数【%s】' % (name,dic[ 'count' ]))
 
def get(name):
     time.sleep( 1 )
     dic = json.load( open ( 'db.txt' , 'r' ,encoding = 'utf-8' ))
     if dic[ 'count' ] > 0 :
         dic[ 'count' ] - = 1
         time.sleep( 3 )
         json.dump(dic, open ( 'db.txt' , 'w' ,encoding = 'utf-8' ))
         print ( '<%s> 购票成功' % name)
 
def task(name,mutex):
     search(name)
     mutex.acquire()  #=============
     get(name)
     mutex.release()  #=============
 
if __name__ = = '__main__' :
     mutex = Lock()  #=============
     for i in range ( 10 ):
         p = Process(target = task,args = ( '路人%s' % i,mutex))
         p.start()
#加锁能够保证多个进程修改同一块数据时,同一时间只能有一个任务能够进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
虽然能够用文件共享数据实现进程间通讯,但问题是:
1.效率低(共享数据基于文件,而文件是硬盘上的数据)
2.须要本身加锁处理


#所以咱们最好找寻一种解决方案可以兼顾:一、效率高(多个进程共享一块内存的数据)二、帮咱们处理好锁问题。这就是mutiprocessing模块为咱们提供的基于消息的IPC通讯机制:队列和管道。这两种方式都是使用消息传递的。
1 队列和管道都是将数据存放于内存中
2 队列又是基于(管道+锁)实现的,可让咱们从复杂的锁问题中解脱出来,
咱们应该尽可能避免使用共享数据,尽量使用消息传递和队列,避免处理复杂的同步和锁问题,并且在进程数目增多时,每每能够得到更好的可获展性。

6、队列(推荐使用)

一、Queue

  • Queue([maxsize]):建立共享的进程队列,Queue是多进程安全的队列,可使用Queue实现多进程之间的数据传递。
  • maxsize是队列中容许最大项数,省略则无大小限制。
  • q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。若是blocked为True(默认值),而且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。若是超时,会抛出Queue.Full异常。若是blocked为False,但该Queue已满,会当即抛出Queue.Full异常。
  • q.get方法能够从队列读取而且删除一个元素。一样,get方法有两个可选参数:blocked和timeout。若是blocked为True(默认值),而且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。若是blocked为False,有两种状况存在,若是Queue有一个值可用,则当即返回该值,不然,若是队列为空,则当即抛出Queue.Empty异常.
  • q.empty():调用此方法时q为空则返回True,该结果不可靠,好比在返回True的过程当中,若是队列中又加入了项目。
  • q.full():调用此方法时q已满则返回True,该结果不可靠,好比在返回True的过程当中,若是队列中的项目被取走。
  • q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()同样
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
"""
基本用法
"""
from multiprocessing import Process, Queue
import time
 
q = Queue( 3 )
 
# put ,get ,put_nowait,get_nowait,full,empty
q.put( 3 )
q.put( 3 )
q.put( 3 )
 
print (q.full())  # 满了 True
print (q.get())
print (q.get())
print (q.get())
print (q.empty())  # 空了 True
 
"""
基于队列来实习一个生产者消费者模型
问题是主进程永远不会结束,缘由是:生产者p在生产完后就结束了,
可是消费者c在取空了q以后,则一直处于死循环中且卡在q.get()这一步。
"""
from multiprocessing import Process,Queue
import time,random
def consumer(q,name):
     while True :
         res = q.get()
         time.sleep(random.randint( 1 , 3 ))
         print ( '%s 吃 %s' % (name,res))
 
def producer(q,name,food):
     for i in range ( 3 ):
         time.sleep(random.randint( 1 , 3 ))
         res = '%s%s' % (food,i)
         q.put(res)
         print ( '%s 生产了 %s' % (name,res))
 
if __name__ = = '__main__' :
     q = Queue()
     #生产者们:即厨师们
     p1 = Process(target = producer,args = (q, 'egon' , '包子' ))
     #消费者们:即吃货们
     c1 = Process(target = consumer,args = (q, 'alex' ))
     #开始
     p1.start()
     c1.start()
     print ( '主' )
 
"""
解决方式:让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就能够break出死循环
"""
from multiprocessing import Process,Queue
import time,random,os
def consumer(q,name):
     while True :
         res = q.get()
         if res is None : break
         time.sleep(random.randint( 1 , 3 ))
         print ( '%s 吃 %s' % (name,res))
 
def producer(q,name,food):
     for i in range ( 3 ):
         time.sleep(random.randint( 1 , 3 ))
         res = '%s%s' % (food,i)
         q.put(res)
         print ( '%s 生产了 %s' % (name,res))
 
if __name__ = = '__main__' :
     q = Queue()
     #生产者们:即厨师们
     p1 = Process(target = producer,args = (q, 'egon' , '包子' ))
 
     #消费者们:即吃货们
     c1 = Process(target = consumer,args = (q, 'alex' ))
 
     #开始
     p1.start()
     c1.start()
 
     p1.join()
     q.put( None )
     print ( '主' )
 
 
"""
有多个生产者和多个消费者时,有几个消费者就须要发送几回结束信号:至关low,须要使用JoinableQueue
"""
from multiprocessing import Process,Queue
import time,random,os
def consumer(q,name):
     while True :
         res = q.get()
         if res is None : break
         time.sleep(random.randint( 1 , 3 ))
         print ( '\%s 吃 %s' % (name,res))
 
def producer(q,name,food):
     for i in range ( 3 ):
         time.sleep(random.randint( 1 , 3 ))
         res = '%s%s' % (food,i)
         q.put(res)
         print ( '%s 生产了 %s' % (name,res))
 
if __name__ = = '__main__' :
     q = Queue()
     #生产者们:即厨师们
     p1 = Process(target = producer,args = (q, 'egon1' , '包子' ))
     p2 = Process(target = producer,args = (q, 'egon2' , '骨头' ))
     p3 = Process(target = producer,args = (q, 'egon3' , '泔水' ))
 
     #消费者们:即吃货们
     c1 = Process(target = consumer,args = (q, 'alex1' ))
     c2 = Process(target = consumer,args = (q, 'alex2' ))
 
     #开始
     p1.start()
     p2.start()
     p3.start()
     c1.start()
     c2.start()
 
     p1.join()
     p2.join()
     p3.join()
     q.put( None )
     q.put( None )
     q.put( None )
     print ( '主' )

二、JoinableQueue

JoinableQueue([maxsize]):这就像是一个Queue对象,但队列容许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。编程

  • maxsize是队列中容许最大项数,省略则无大小限制。
  • JoinableQueue的实例p除了与Queue对象相同的方法以外还具备:
  • q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。若是调用此方法的次数大于从队列中删除项目的数量,将引起ValueError异常
  • q.join():生产者调用此方法进行阻塞,直到队列中全部的项目均被处理。阻塞将持续到队列中的每一个项目均调用q.task_done()方法为止
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
from multiprocessing import Process,JoinableQueue
import time
 
def producer(q):
     for i in range ( 2 ):
         res = '包子%s' % i
         time.sleep( 0.5 )
         print ( '生产者生产了%s' % res)
 
         q.put(res)
     q.join()
 
def consumer(q):
     while True :
         res = q.get()
         if res is None : break
         time.sleep( 1 )
         print ( '消费者吃了%s' % res)
         q.task_done()  #向q.join()发送一次信号,证实一个数据已经被取走了
 
 
if __name__ = = '__main__' :
     #容器
     q = JoinableQueue()
 
     #生产者们
     p1 = Process(target = producer,args = (q,))
     p2 = Process(target = producer,args = (q,))
     p3 = Process(target = producer,args = (q,))
 
     #消费者们
     c1 = Process(target = consumer,args = (q,))
     c2 = Process(target = consumer,args = (q,))
     c1.daemon = True
     c2.daemon = True
 
     # p1.start()
     # p2.start()
     # p3.start()
     # c1.start()
     # c2.start()
 
     # 开始
     p_l = [p1, p2, p3, c1, c2]
     for p in p_l:
         p.start()
 
     p1.join()
     p2.join()
     p3.join()
     print ( '主' )
 
#主进程等--->p1,p2,p3等---->c1,c2
#p1,p2,p3结束了,证实c1,c2确定全都收完了p1,p2,p3发到队列的数据
#于是c1,c2也没有存在的价值了,应该随着主进程的结束而结束,因此设置成守护进程

7、进程池

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
"""
服务端多进程
"""
from socket import *
from multiprocessing import Process
 
server = socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR, 1 )
server.bind(( '127.0.0.1' , 8080 ))
server.listen( 5 )
 
def talk(conn,client_addr):
     while True :
         try :
             msg = conn.recv( 1024 )
             if not msg: break
             conn.send(msg.upper())
         except Exception:
             break
 
if __name__ = = '__main__' : #windows下start进程必定要写到这下面
     while True :
         conn,client_addr = server.accept()
         p = Process(target = talk,args = (conn,client_addr))
         p.start()
 
"""
服务端进程池
#Pool内的进程数默认是cpu核数,假设为4(查看方法os.cpu_count())
#开启6个客户端,会发现2个客户端处于等待状态
#在每一个进程内查看pid,会发现pid使用为4个,即多个客户端公用4个进程
"""
from socket import *
from multiprocessing import Pool
import os
 
server = socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR, 1 )
server.bind(( '127.0.0.1' , 8080 ))
server.listen( 5 )
 
def talk(conn,client_addr):
     print ( '进程pid: %s' % os.getpid())
     while True :
         try :
             msg = conn.recv( 1024 )
             if not msg: break
             conn.send(msg.upper())
         except Exception:
             break
 
if __name__ = = '__main__' :
     p = Pool()
     while True :
         conn,client_addr = server.accept()
         p.apply_async(talk,args = (conn,client_addr))
         # p.apply(talk,args=(conn,client_addr)) #同步的话,则同一时间只有一个客户端能访问
 
"""
客户端都同样
"""
from socket import *
 
client = socket(AF_INET,SOCK_STREAM)
client.connect(( '127.0.0.1' , 8080 ))
 
 
while True :
     msg = input ( '>>: ' ).strip()
     if not msg: continue
 
     client.send(msg.encode( 'utf-8' ))
     msg = client.recv( 1024 )
     print (msg.decode( 'utf-8' ))

多线程

多线程指的是,在一个进程中开启多个线程,简单的讲:若是多个任务共用一块地址空间,那么必须在一个进程内开启多个线程。json

1、进程与线程的区别

  • 开进程的开销远大于开线程
  • 同一进程内的多个线程共享该进程的地址空间
  • from multiprocessing import Process p1=Process(target=task,) 换成 from threading import Thread t1=Thread(target=task,)
  • 计算密集型的多线程不能加强性能,多进程才能够,I/O密集型的多线程会加快程序的执行速度

2、threading模块介绍

multiprocess模块的彻底模仿了threading模块的接口,两者在使用层面,有很大的类似性windows

一、Thread实例对象的方法

  • isAlive(): 返回线程是否活动的。
  • getName(): 返回线程名。
  • setName(): 设置线程名。

二、threading模块提供的一些方法:

  • threading.currentThread(): 返回当前的线程变量。
  • threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
  • threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from threading import Thread,currentThread,active_count, enumerate
import time
 
def task():
     print ( '%s is ruuning' % currentThread().getName())
     time.sleep( 2 )
     print ( '%s is done' % currentThread().getName())
 
if __name__ = = '__main__' :
     # 在主进程下开启线程
     t = Thread(target = task,name = '子线程1' )
     t.start()
     t.setName( '儿子线程1' )
     # t.join()
     print (t.getName())
 
     currentThread().setName( '主线程' )
     print (t.isAlive())
 
     print ( '主线程' ,currentThread().getName())
     print (active_count())
     print ( enumerate ()) #连同主线程在内有两个运行的线程
 
"""
子线程1 is ruuning
儿子线程1
True
主线程 主线程
2
[<_MainThread(主线程, started 8672)>, <Thread(儿子线程1, started 7512)>]
儿子线程1 is done
"""

三、开启子线程的两种方式

注意:在windows中Process()必须放到# if __name__ == '__main__':下安全

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#方式一
from threading import Thread
import time
def sayhi(name):
     time.sleep( 2 )
     print ( '%s say hello' % name)
 
if __name__ = = '__main__' :
     t = Thread(target = sayhi,args = ( 'egon' ,))
     t.start()
     print ( '主线程' )
     
#方式二
from threading import Thread
import time
class Sayhi(Thread):
     def __init__( self ,name):
         super ().__init__()
         self .name = name
     def run( self ):
         time.sleep( 2 )
         print ( '%s say hello' % self .name)
 
 
if __name__ = = '__main__' :
     t = Sayhi( 'egon' )
     t.start()
     print ( '主线程' )

3、守护线程

  • 不管是进程仍是线程,都遵循:守护xxx会等待主xxx运行完毕后被销毁,须要强调的是:运行完毕并不是终止运行
  • 对主进程来讲,运行完毕指的是主进程代码运行完毕---主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),而后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(不然会产生僵尸进程),才会结束,
  • 对主线程来讲,运行完毕指的是主线程所在的进程内全部非守护线程通通运行完毕,主线程才算运行完毕---主线程在其余非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。由于主线程的结束意味着进程的结束,进程总体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
  • 必定要在t.start()前设置,设置t为守护线程
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from threading import Thread
import time
def foo():
     print ( 123 )
     time.sleep( 1 )
     print ( "end123" )
 
def bar():
     print ( 456 )
     time.sleep( 3 )
     print ( "end456" )
 
t1 = Thread(target = foo)
t2 = Thread(target = bar)
 
t1.daemon = True
t1.start()
t2.start()
print ( "main-------" )
"""
123
456
main-------
end123
end456
"""

4、 GIL

  • 在Cpython解释器中,同一个进程下开启的多线程,由于有GIL的存在,同一时刻同一进程中只能有一个线程被执行
  • GIL本质就是一把互斥锁,既然是互斥锁,全部互斥锁的本质都同样,都是将并发运行变成串行,以此来控制同一时间内共享数据只能被一个任务所修改,进而保证数据安全。
  • 保护不一样的数据的安全,就应该加不一样的锁。
  • 在一个python的进程内,不只有test.py的主线程或者由该主线程开启的其余线程,还有解释器开启的垃圾回收等解释器级别的线程,总之,全部线程都运行在这一个进程内
  • 全部数据都是共享的,这其中,代码做为一种数据也是被全部线程共享的(test.py的全部代码以及Cpython解释器的全部代码)
  • 全部线程的任务,都须要将任务的代码当作参数传给解释器的代码去执行,即全部的线程要想运行本身的任务,首先须要解决的是可以访问到解释器的代码。
  • GIL 与Lock是两把锁,保护的数据不同,前者是解释器级别的(固然保护的就是解释器级别的数据,好比垃圾回收的数据),后者是保护用户本身开发的应用程序的数据,很明显GIL不负责这件事,只能用户自定义加锁处理,即Lock
若是多个线程的target=work,那么执行流程是:

多个线程先访问到解释器的代码,即拿到执行权限,而后将target的代码交给解释器的代码去执行

解释器的代码是全部线程共享的,因此垃圾回收线程也可能访问到解释器的代码而去执行,这就致使了一个问题:对于同一个数据100,可能线程1执行x=100的同时,而垃圾回收执行的是回收100的操做,解决这种问题没有什么高明的方法,就是加锁处理,以下图的GIL,保证python解释器同一时间只能执行一个任务的代码

#分析:
咱们有四个任务须要处理,处理方式确定是要玩出并发的效果,解决方案能够是:
方案一:开启四个进程
方案二:一个进程下,开启四个线程

#单核状况下,分析结果: 
  若是四个任务是计算密集型,没有多核来并行计算,方案一徒增了建立进程的开销,方案二胜
  若是四个任务是I/O密集型,方案一建立进程的开销大,且进程的切换速度远不如线程,方案二胜

#多核状况下,分析结果:
  若是四个任务是计算密集型,多核意味着并行计算,在python中一个进程中同一时刻只有一个线程执行用不上多核,方案一胜
  若是四个任务是I/O密集型,再多的核也解决不了I/O问题,方案二胜

 
#结论:如今的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多大性能上的提高,甚至不如串行(没有大量切换),可是,对于IO密集型的任务效率仍是有显著提高的。

由于Python解释器帮你自动按期进行内存回收,你能够理解为python解释器里有一个独立的线程,
每过一段时间它起wake up作一次全局轮询看看哪些内存数据是能够被清空的,此时你本身的程序 里的线程和 py解释器本身的线程是并发运行的,
假设你的线程删除了一个变量,py解释器的垃圾回收线程在清空这个变量的过程当中的clearing时刻,
可能一个其它线程正好又从新给这个还没来及得清空的内存空间赋值了,结果就有可能新赋值的数据被删除了,
为了解决相似的问题,python解释器简单粗暴的加了锁,即当一个线程运行时,其它人都不能动,这样就解决了上述的问题,  
这能够说是Python早期版本的遗留问题。 


应用:

多线程用于IO密集型,如socket,爬虫,web
多进程用于计算密集型,如金融分析

5、互斥锁

  • 线程抢的是GIL锁,GIL锁至关于执行权限,拿到执行权限后才能拿到互斥锁Lock,其余线程也能够抢到GIL,但若是发现Lock仍然没有被释放则阻塞,即使是拿到执行权限GIL也要马上交出来
  • join是等待全部,即总体串行,而锁只是锁住修改共享数据的部分,即部分串行,要想保证数据安全的根本原理在于让并发变成串行,join与互斥锁均可以实现,毫无疑问,互斥锁的部分串行效率要更高
过程分析:全部线程抢的是GIL锁,或者说全部线程抢的是执行权限

  线程1抢到GIL锁,拿到执行权限,开始执行,而后加了一把Lock,尚未执行完毕,即线程1还未释放Lock,有可能线程2抢到GIL锁,开始执行,执行过程当中发现Lock尚未被线程1释放,因而线程2进入阻塞,被夺走执行权限,有可能线程1拿到GIL,而后正常执行到释放Lock。。。这就致使了串行运行的效果

  既然是串行,那咱们执行

  t1.start()

  t1.join

  t2.start()

  t2.join()

  这也是串行执行啊,为什么还要加Lock呢,需知join是等待t1全部的代码执行完,至关于锁住了t1的全部代码,而Lock只是锁住一部分操做共享数据的代码。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from threading import Thread,Lock
import time
 
n = 100
 
def task():
     global n
     mutex.acquire()
     temp = n
     time.sleep( 0.1 )
     n = temp - 1
     mutex.release()
     
if __name__ = = '__main__' :
     mutex = Lock()
     t_l = []
     for i in range ( 100 ):
         t = Thread(target = task)
         t_l.append(t)
         t.start()
 
     for t in t_l:
         t.join()
 
     print ( '主' ,n)   #主 0

6、死锁现象与递归锁

  • 进程也有死锁与递归锁
  • 所谓死锁: 是指两个或两个以上的进程或线程在执行过程当中,因争夺资源而形成的一种互相等待的现象,若无外力做用,它们都将没法推动下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程
  • 解决方法,递归锁,在Python中为了支持在同一线程中屡次请求同一资源,python提供了可重入锁RLock。 这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源能够被屡次require。直到一个线程全部的acquire都被release,其余的线程才能得到资源。上面的例子若是使用RLock代替Lock,则不会发生死锁
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# 死锁
from threading import Thread,Lock
import time
 
mutexA = Lock()
mutexB = Lock()
 
class MyThread(Thread):
     def run( self ):
         self .f1()
         self .f2()
 
     def f1( self ):
         mutexA.acquire()
         print ( '%s 拿到了A锁' % self .name)
 
         mutexB.acquire()
         print ( '%s 拿到了B锁' % self .name)
         mutexB.release()
 
         mutexA.release()
 
 
     def f2( self ):
         mutexB.acquire()
         print ( '%s 拿到了B锁' % self .name)
         time.sleep( 0.1 )
 
         mutexA.acquire()
         print ( '%s 拿到了A锁' % self .name)
         mutexA.release()
 
         mutexB.release()
 
if __name__ = = '__main__' :
     for i in range ( 10 ):
         t = MyThread()
         t.start()
 
 
#互斥锁只能acquire一次
from threading import Thread,Lock
 
mutexA = Lock()
 
mutexA.acquire()
mutexA.release()
 
 
# 递归锁:能够连续acquire屡次,每acquire一次计数器+1,只有计数为0时,才能被抢到acquire
from threading import Thread,RLock
import time
 
mutexB = mutexA = RLock()  #一个线程拿到锁,counter加1,该线程内又碰到加锁的状况,则counter继续加1,这期间全部其余线程都只能等待,等待该线程释放全部锁,即counter递减到0为止
 
class MyThread(Thread):
     def run( self ):
         self .f1()
         self .f2()
 
     def f1( self ):
         mutexA.acquire()
         print ( '%s 拿到了A锁' % self .name)
 
         mutexB.acquire()
         print ( '%s 拿到了B锁' % self .name)
         mutexB.release()
 
         mutexA.release()
 
 
     def f2( self ):
         mutexB.acquire()
         print ( '%s 拿到了B锁' % self .name)
         time.sleep( 7 )
 
         mutexA.acquire()
         print ( '%s 拿到了A锁' % self .name)
         mutexA.release()
 
         mutexB.release()
 
if __name__ = = '__main__' :
     for i in range ( 10 ):
         t = MyThread()
         t.start()

7、信号量Semaphore

信号量与进程池的概念很像,可是要区分开,信号量涉及到加锁的概念
进程池Pool(4),最大只能产生4个进程,并且从头至尾都只是这四个进程,不会产生新的,而信号量是产生一堆线程/进程

同进程的同样
Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其余线程调用release()。

互斥锁同时只容许一个线程更改数据,而Semaphore是同时容许必定数量的线程更改数据
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from threading import Thread,Semaphore,currentThread
import time,random
 
sm = Semaphore( 3 )
 
def task():
     # sm.acquire()
     # print('%s in' %currentThread().getName())
     # sm.release()
     with sm:
         print ( '%s in' % currentThread().getName())
         time.sleep(random.randint( 1 , 3 ))
 
if __name__ = = '__main__' :
     for i in range ( 4 ):
         t = Thread(target = task)
         t.start()

8、Event

线程的一个关键特性是每一个线程都是独立运行且状态不可预测。
若是程序中的其余线程须要经过判断某个线程的状态来肯定本身下一步的操做,这时线程同步问题就会变得很是棘手。
为了解决这些问题,咱们须要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它容许线程等待某些事件的发生。
在初始状况下,Event对象中的信号标志被设置为假。若是有线程等待一个Event对象,而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。
一个线程若是将一个Event对象的信号标志设置为真,它将唤醒全部等待这个Event对象的线程。若是一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行
  • event.isSet():返回event的状态值;
  • event.wait():若是 event.isSet()==False将阻塞线程;
  • event.set(): 设置event的状态值为True,全部阻塞池的线程激活进入就绪状态, 等待操做系统调度;
  • event.clear():恢复event的状态值为False。
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
from threading import Thread,Event
import time
 
event = Event()
 
def student(name):
     print ( '学生%s 正在听课' % name)
     event.wait( 2 )
     print ( '学生%s 课间活动' % name)
 
def teacher(name):
     print ( '老师%s 正在授课' % name)
     time.sleep( 7 )
     event. set ()
 
if __name__ = = '__main__' :
     stu1 = Thread(target = student,args = ( 'tom' ,))
     stu2 = Thread(target = student,args = ( 'rose' ,))
     stu3 = Thread(target = student,args = ( 'jack' ,))
     t1 = Thread(target = teacher,args = ( 'tony' ,))
 
     stu1.start()
     stu2.start()
     stu3.start()
     t1.start()
 
 
from threading import Thread,Event,currentThread
import time
 
event = Event()
 
def conn():
     n = 0
     while not event.is_set():
         if n = = 3 :
             print ( '%s try too many times' % currentThread().getName())
             return
         print ( '%s try %s' % (currentThread().getName(),n))
         event.wait( 0.5 )
         n + = 1
 
     print ( '%s is connected' % currentThread().getName())
 
def check():
     print ( '%s is checking' % currentThread().getName())
     time.sleep( 5 )
     event. set ()
 
if __name__ = = '__main__' :
     for i in range ( 3 ):
         t = Thread(target = conn)
         t.start()
     t = Thread(target = check)
     t.start()

9、定时器

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
"""
60s后换验证码
"""
from threading import Timer
import random
 
class Code:
     def __init__( self ):
         self .make_cache()
 
     def make_cache( self ,interval = 60 ):
         self .cache = self .make_code()
         print ( self .cache)
         self .t = Timer(interval, self .make_cache)
         self .t.start()
 
     def make_code( self ,n = 4 ):
         res = ''
         for i in range (n):
             s1 = str (random.randint( 0 , 9 ))
             s2 = chr (random.randint( 65 , 90 ))
             res + = random.choice([s1,s2])
         return res
 
     def check( self ):
         while True :
             code = input ( '请输入你的验证码>>: ' ).strip()
             if code.upper() = = self .cache:
                 print ( '验证码输入正确' )
                 self .t.cancel()
                 break
 
obj = Code()
obj.check()

10、线程queue

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import queue
 
q = queue.Queue( 3 ) #先进先出->队列
 
q.put( 'first' )
q.put( 2 )
q.put( 'third' )
# q.put(4)
# q.put(4,block=False) #q.put_nowait(4)
# q.put(4,block=True,timeout=3)
 
print (q.get())
print (q.get())
print (q.get())
# print(q.get(block=False)) #q.get_nowait()
# print(q.get_nowait())
# print(q.get(block=True,timeout=3))
 
#======================================
q = queue.LifoQueue( 3 ) #后进先出->堆栈
q.put( 'first' )
q.put( 2 )
q.put( 'third' )
 
print (q.get())
print (q.get())
print (q.get())
 
#======================================
q = queue.PriorityQueue( 3 ) #优先级队列
 
q.put(( 10 , 'one' ))
q.put(( 40 , 'two' ))
q.put(( 30 , 'three' ))
 
print (q.get())
print (q.get())
print (q.get())

11、线程池

#1 介绍
concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用
Both implement the same interface, which is defined by the abstract Executor class.

#2 基本方法
#submit(fn, *args, **kwargs)
异步提交任务

#map(func, *iterables, timeout=None, chunksize=1) 
取代for循环submit的操做

#shutdown(wait=True) 
至关于进程池的pool.close()+pool.join()操做
wait=True,等待池内全部任务执行完毕回收完资源后才继续
wait=False,当即返回,并不会等待池内的任务执行完毕
但无论wait参数为什么值,整个程序都会等到全部任务执行完毕
submit和map必须在shutdown以前

#result(timeout=None)
取得结果

#add_done_callback(fn)
回调函数
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
"""
shutdown
把ProcessPoolExecutor换成ThreadPoolExecutor,其他用法所有相同
"""
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import os,time,random
def task(n):
     print ( '%s is runing' % os.getpid())
     time.sleep(random.randint( 1 , 3 ))
     return n * * 2
 
if __name__ = = '__main__' :
     pool = ProcessPoolExecutor(max_workers = 3 )
     futures = []
     for i in range ( 11 ):
         future = pool.submit(task,i)
         futures.append(future)
     pool.shutdown( True )
     print ( '+++>' )
     for future in futures:
         print (future.result())
 
"""
map方法
"""
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
 
import os,time,random
def task(n):
     print ( '%s is runing' % os.getpid())
     time.sleep(random.randint( 1 , 3 ))
     return n * * 2
 
if __name__ = = '__main__' :
     pool = ThreadPoolExecutor(max_workers = 3 )
     pool. map (task, range ( 1 , 12 )) #map取代了for+submit
"""
add_done_callback
"""
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import requests
import os
 
def get_page(url):
     print ( '<进程%s> get %s' % (os.getpid(),url))
     respone = requests.get(url)
     if respone.status_code = = 200 :
         return { 'url' :url, 'text' :respone.text}
 
def parse_page(res):
     res = res.result()
     print ( '<进程%s> parse %s' % (os.getpid(),res[ 'url' ]))
     parse_res = 'url:<%s> size:[%s]\n' % (res[ 'url' ], len (res[ 'text' ]))
     with open ( 'db.txt' , 'a' ) as f:
         f.write(parse_res)
 
 
if __name__ = = '__main__' :
     urls = [
         'https://www.baidu.com' ,
         'https://www.python.org' ,
         'https://www.openstack.org' ,
         'https://help.github.com/' ,
         'http://www.sina.com.cn/'
     ]
 
     p = ProcessPoolExecutor( 3 )
     for url in urls:
         p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一个future对象obj,须要用obj.result()拿到结果

  

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#基于多线程实现
from socket import *
from threading import Thread          #多线程
# from multiprocessing import Process  #多进程
 
def communicate(conn):
     while True :
         try :
             data = conn.recv( 1024 )
             if not data: break
             conn.send(data.upper())
         except ConnectionResetError:
             break
 
     conn.close()
 
def server(ip,port):
     server = socket(AF_INET, SOCK_STREAM)
     server.bind((ip,port))
     server.listen( 5 )
 
     while True :
         conn, addr = server.accept()
         t = Thread(target = communicate,args = (conn,))       # 多线程
         # t = Process(target=communicate,args=(conn,))  # 多进程
         t.start()
 
     server.close()
 
if __name__ = = '__main__' :
     server( '127.0.0.1' , 8081 )
 
 
#基于线程池实现
from socket import *
from concurrent.futures import ThreadPoolExecutor   #线程池
# from concurrent.futures import ProcessPoolExecutor #进程池
 
def communicate(conn):
     while True :
         try :
             data = conn.recv( 1024 )
             if not data: break
             conn.send(data.upper())
         except ConnectionResetError:
             break
 
     conn.close()
 
def server(ip,port):
     server = socket(AF_INET, SOCK_STREAM)
     server.bind((ip,port))
     server.listen( 5 )
 
     while True :
         conn, addr = server.accept()
         pool.submit(communicate,conn)                      # #####
 
     server.close()
 
if __name__ = = '__main__' :
     pool = ThreadPoolExecutor( 2 )    #线程池
     #pool=ProcessPoolExecutor()   #进程池
     server( '127.0.0.1' , 8081 )
相关文章
相关标签/搜索