多任务是指操做系统同时能够运行多个任务。python
- 单核CPU实现多任务原理:操做系统轮流让各个任务交替执行;
- 多核CPU实现多任务原理:真正的执行多任务只能在多核CPU上实现,多出来的任务轮流调度到每一个核心上执行。
- 并发:看上去一块儿执行,任务数多于CPU核心数;
- 并行:真正的一块儿执行,任务数小于等于CPU核心数。
实现多任务的方式:
一、多进程模式
二、多线程模式
三、协程模式
四、多进程+多线程模式多线程
对于操做系统而言,一个任务就是一个进程;并发
进程是系统中程序执行和资源分配的基本单元,每一个进程都有本身的数据段、代码段、堆栈段。app
下面是一小段程序,一个单任务的例子。在其中,有两个输出语句分别在在两个不一样的循环当中,单任务的执行方式,也就是最初学习时,当一个循环没有结束的时候,没法执行到下面的程序当中。若是想要让两个循环能够同时在执行,就是在实现多任务,固然不是说同时输出,而是两个循环都在执行着。dom
1 from time import sleep 2 # 只能执行到那一个循环,执行不了run,因此叫单任务
3 def run(): 4 while True: 5 print("&&&&&&&&&&&&&&&") 6 sleep(1.2) 7
8 if __name__ == "__main__": 9 while True: 10 print("**********") 11 sleep(1) 12 run()
接下来启用多任务,经过进程来实现。async
multiprocessing库:跨平台版本的多进程模块,提供了一个Process类来表明一个进程对象(fork仅适用于Linux)。ide
下面的程序是在一个父进程中建立一个子进程,让父进程和子进程能够都在执行,建立方式程序中已经很简洁了。能够本身把这两段程序复制下来运行一下,看看输出的效果。函数
1 from multiprocessing import Process 2 from time import sleep 3 import os 4
5 def run(str): 6 # os.getpid()获取当前进程id号
7 # os.getppid()获取当前进程的父进程id号
8 while True: 9 print("&&&&&&&&&&&&&&&%s--%s--%s" % (str, os.getpid(), os.getppid())) 10 sleep(0.5) 11
12 if __name__ == "__main__": 13 print("主(父)进程启动 %s" % (os.getpid())) 14 # 建立子进程
15 # target说明进程执行的任务
16 p = Process(target=run, args=("nice",)) 17 # 启动进程
18 p.start() 19
20 while True: 21 print("**********") 22 sleep(1)
我想第一个单任务的程序就没必要说了吧,就是一个死循环,一直没有执行到下面的run函数。第二段程序是经过多进程实现的多任务,两个循环都能执行到,我把结果截图放下面,最好本身去试一下。学习
上面的多进程的例子中输出了那么多,咱们使用的时候到底是先执行哪一个后执行哪一个呢?根据咱们的通常思惟来讲,咱们写的主函数其实就是父进程,在主函数中间,要调用的也就是子进程。大数据
1 from multiprocessing import Process 2 from time import sleep 3 import os 4
5 def run(): 6 print("启动子进程") 7 print("子进程结束") 8 sleep(3) 9
10 if __name__ == "__main__": 11 print("父进程启动") 12 p = Process(target=run) 13 p.start() 14
15 # 父进程的结束不能影响子进程,让进程等待子进程结束再执行父进程
16 p.join() 17
18 print("父进程结束")
在多进程的程序当中定义的全局变量在多个进程中是不能共享的,篇幅较长在这里就不举例子了,能够本身试一下。这个也是和稍后要说的线程的一个区别,在线程中,变量是能够共享的,也所以衍生出一些问题,稍后再说。
在正常工做使用的时候,固然不止有有个一个两个进程,毕竟这一两个也起不到想要的效果。那么就须要采用更多的进程,这时候须要经过进程池来实现,就是在进程池中放好你要创建的进程,而后执行的时候,把他们都启动起来,就能够同时进行了,在必定的环境下能够大大的提升效率。固然这个也和起初提到的有关,若是你的CPU是单核的,那么多进程也只是起到了让几个任务同时在执行着,并无提升效率,并且启动进程的时候还要花费一些时间,所以在多核CPU当中更能发挥优点。
在multiprocessing中有个Pool方法,能够实现进程池。在利用进程池时能够设置要启动几个进程,通常状况下,它默认和你电脑的CPU核数一致,也能够本身设置,若是设置的进程数多于CPU核数,那多出来的进程会轮流调度到每一个核心上执行。下面是启动多个进程的过程。
1 from multiprocessing import Pool 2 import os 3 import time 4 import random 5
6
7 def run(name): 8 print("子进程%s启动--%s" % (name, os.getpid())) 9 start = time.time() 10 time.sleep(random.choice([1,2,3,4,5])) 11 end = time.time() 12 print("子进程%s结束--%s--耗时%.2f" % (name, os.getpid(), end-start)) 13
14 if __name__ == "__main__": 15 print("启动父进程") 16
17 # 建立多个进程
18 # Pool 进程池 :括号里的数表示能够同时执行的进程数量
19 # Pool()默认大小是CPU核心数
20 pp = Pool(4) 21 for i in range(5): 22 # 建立进程,放入进程池,统一管理
23 pp.apply_async(run, args=(i,)) 24
25 # 在调用join以前必须先调用close,调用close以后就不能再继续添加新的进程了
26 pp.close() 27 # 进程池对象调用join还等待进程池中全部的子进程结束
28 pp.join() 29
30 print("结束父进程")
(1)单进程实现
1 from multiprocessing import Pool 2 import time 3 import os 4 5 # 实现文件的拷贝 6 def copyFile(rPath, wPath): 7 fr = open(rPath, 'rb') 8 fw = open(wPath, 'wb') 9 context = fr.read() 10 fw.write(context) 11 fr.close() 12 fw.close() 13 14 path = r'F:\python_note\线程、协程' 15 toPath = r'F:\python_note\test' 16 17 # 读取path下的全部文件 18 filesList = os.listdir(path) 19 20 # 启动for循环处理每个文件 21 start = time.time() 22 for fileName in filesList: 23 copyFile(os.path.join(path,fileName), os.path.join(toPath,fileName)) 24 25 end = time.time() 26 print('总耗时:%.2f' % (end-start))
(2)多进程实现
1 from multiprocessing import Pool 2 import time 3 import os 4 5 # 实现文件的拷贝 6 def copyFile(rPath, wPath): 7 fr = open(rPath, 'rb') 8 fw = open(wPath, 'wb') 9 context = fr.read() 10 fw.write(context) 11 fr.close() 12 fw.close() 13 14 path = r'F:\python_note\线程、协程' 15 toPath = r'F:\python_note\test' 16 17 18 if __name__ == "__main__": 19 # 读取path下的全部文件 20 filesList = os.listdir(path) 21 22 start = time.time() 23 pp = Pool(4) 24 for fileName in filesList: 25 pp.apply_async(copyFile, args=(os.path.join( 26 path, fileName), os.path.join(toPath, fileName))) 27 pp.close() 28 pp.join() 29 end = time.time() 30 print("总耗时:%.2f" % (end - start))
上面两个程序是两种方法实现同一个目标的程序,能够将其中的文件路径更换为你本身的路径,能够看到最后计算出的耗时是多少。也许有人发现并非多进程的效率就高,说的的确没错,由于建立进程也要花费时间,没准启动进程的时间远多让这一个核心运行全部核心用的时间要多。这个例子也只是演示一下如何使用,在大数据的任务下会有更深入的体验。
咱们知道Python是一个面向对象的语言。并且Python中万物皆对象,进程也能够封装成对象,来方便之后本身使用,只要把他封装的足够丰富,提供清晰的接口,之后使用时会快捷不少,这个就根据本身的需求本身能够试一下,不写了。
上面提到过进程间的变量是不能共享的,那么若是有须要该怎么办?经过队列的方式进行传递。在父进程中建立队列,而后把队列传到每一个子进程当中,他们就能够共同对其进行操做。
1 from multiprocessing import Process, Queue 2 import os 3 import time 4
5
6 def write(q): 7 print("启动写子进程%s" % (os.getpid())) 8 for chr in ['A', 'B', 'C', 'D']: 9 q.put(chr) 10 time.sleep(1) 11 print("结束写子进程%s" % (os.getpid())) 12
13 def read(q): 14 print("启动读子进程%s" % (os.getpid())) 15 while True: 16 value = q.get() 17 print("value = "+value) 18 print("结束读子进程%s" % (os.getpid())) 19
20 if __name__ == "__main__": 21 # 父进程建立队列,并传递给子进程
22 q = Queue() 23 pw = Process(target=write, args=(q,)) 24 pr = Process(target=read, args=(q,)) 25
26 pw.start() 27 pr.start() 28 # 写进程结束
29 pw.join() 30 # pr进程里是个死循环,没法等待期结束,只能强行结束
31 pr.terminate() 32 print("父进程结束")
- 在一个进程内部,要同时干多件事,就须要运行多个"子任务",咱们把进程内的多个"子任务"叫作线程
- 线程一般叫作轻型的进程,线程是共享内存空间,并发执行的多任务,每个线程都共享一个进程的资源
- 线程是最小的执行单元而进程由至少一个线程组成。如何调度进程和线程,彻底由操做系统来决定,程序本身不能决定何时执行,执行多长时间
模块:
一、_thread模块 低级模块(更接近底层)
二、threading模块 高级模块,对_thread进行了封装
一样,先给一个多线程的例子,其中,仍然使用run函数做为其中的一个子线程,主函数为父线程。经过threading的Thread方法建立线程并开启,join来等待子线程。
1 import threading 2 import time 3
4
5 def run(): 6 print("子线程(%s)启动" % (threading.current_thread().name)) 7
8 # 实现线程的功能
9 time.sleep(1) 10 print("打印") 11 time.sleep(2) 12
13 print("子线程(%s)结束" % (threading.current_thread().name)) 14
15
16 if __name__ == "__main__": 17 # 任何进程都默认会启动一个线程,称为主线程,主线程能够启动新的子线程
18 # current_thread():返回线程的实例
19 print("主线程(%s)启动" % (threading.current_thread().name)) 20
21 # 建立子线程
22 t = threading.Thread(target=run, name="runThread") 23 t.start() 24
25 # 等待线程结束
26 t.join() 27
28 print("主线程(%s)结束" % (threading.current_thread().name))
多线程和多进程最大的不一样在于,多进程中,同一个变量,各自有一份拷贝存在每一个进程中,互不影响。
而多线程全部变量都由全部线程共享。因此任何一个变量均可以被任何一个线程修改,所以,线程之间共享数据最大的危险在于多个线程同时修改一个变量,容易把内容改乱了。
1 import threading 2
3
4 num = 10
5
6 def run(n): 7 global num 8 for i in range(10000000): 9 num = num + n 10 num = num - n 11
12 if __name__ == "__main__": 13 t1 = threading.Thread(target=run, args=(6,)) 14 t2 = threading.Thread(target=run, args=(9,)) 15
16 t1.start() 17 t2.start() 18 t1.join() 19 t2.join() 20
21 print("num = ",num)
在第三小点中已经提到了,多线程的一个缺点就是数据是共享的,若是有两个线程正同时在修改这个数据,就会出现混乱,它本身也不知道该听谁的了,尤为是在运算比较复杂,次数较多的时候,这种错误的机会会更大。
固然,解决办法也是有的,那就是利用线程锁。加锁的意思就是在其中一个线程正在对数据进行操做时,让其余线程不得介入。这个加锁和释放锁是由人来肯定的。
- 确保了这段代码只能由一个线程从头至尾的完整执行
- 阻止了多线程的并发执行,要比不加锁时候效率低。包含锁的代码段只能以单线程模式执行
- 因为能够存在多个锁,不一样线程持有不一样的锁,并试图获取其余的锁,可能形成死锁致使多个线程挂起,只能靠操做系统强制终止
1 def run(n): 2 global num 3 for i in range(10000000): 4 lock.acquire() 5 try: 6 num = num + n 7 num = num - n 8 finally: 9 # 修改完释放锁
10 lock.release() 11
12 if __name__ == "__main__": 13 t1 = threading.Thread(target=run, args=(6,)) 14 t2 = threading.Thread(target=run, args=(9,)) 15
16 t1.start() 17 t2.start() 18 t1.join() 19 t2.join() 20
21 print("num = ",num)
上面这段程序是循环屡次num+n-n+n-n的过程,变量n分别设为6和9是在两个不一样的线程当中,程序中已经加了锁,你能够先去掉试一下,当循环次数较小的时候也许还能正确,但次数一旦取的较高就会出现混乱。
加锁是在循环体当中,依次执行加减法,定义中说到确保一个线程从头至尾的完整执行,也就是在计算途中,不会有其余的线程打扰。你能够想一下,若是一个线程执行完加法,正在执行减法,另外一个线程进来了,它要先进行加法时的初始sum值该是多少呢,线程二不必定在线程一的何时进来,万一刚进来时候,线程一刚好给sum赋值了,而线程二仍然用的是正准备进来时候的sum值,那从这里开始岂不已经分道扬镳了。因此,运算的次数越多,结果会越离谱。
这个说完了,还有一个小小的改进。你是否记得读写文件时候书写的一种简便形式,经过with来实现,能够避免咱们忘记关闭文件,自动帮咱们关闭。固然还有一些其余地方也用到了这个方法。这里也一样适用。
1 # 与上面代码功能相同,with lock能够自动上锁与解锁
2 with lock: 3 num = num + n 4 num = num - n
- 建立一个全局的ThreadLocal对象
- 每一个线程有独立的存储空间
- 每一个线程对ThreadLocal对象均可以读写,可是互不影响
根据名字也能够看出,也就是在本地建个链接,全部的操做在本地进行,每一个线程之间没有数据的影响。
1 import threading 2
3
4 num = 0 5 local = threading.local() 6
7 def run(x, n): 8 x = x + n 9 x = x - n 10
11 def func(n): 12 # 每一个线程都有local.x
13 local.x = num 14 for i in range(10000000): 15 run(local.x, n) 16 print("%s-%d" % (threading.current_thread().name, local.x)) 17
18
19 if __name__ == "__main__": 20 t1 = threading.Thread(target=func, args=(6,)) 21 t2 = threading.Thread(target=func, args=(9,)) 22
23 t1.start() 24 t2.start() 25 t1.join() 26 t2.join() 27
28 print("num = ",num)
1 '''
2 控制线程数量是指控制线程同时触发的数量,能够拿下来这段代码运行一下,下面启动了5个线程,可是他们会两个两个的进行 3 '''
4 import threading 5 import time 6
7 # 控制并发执行线程的数量
8 sem = threading.Semaphore(2) 9
10 def run(): 11 with sem: 12 for i in range(10): 13 print("%s---%d" % (threading.current_thread().name, i)) 14 time.sleep(1) 15
16
17 if __name__ == "__main__": 18 for i in range(5): 19 threading.Thread(target=run).start()
上面的程序是有多个线程,可是每次限制同时执行的线程,通俗点说就是限制并发线程的上限;除此以外,也能够限制线程数量的下限,也就是至少达到多少个线程才能触发。
1 import threading 2 import time 3
4
5 # 凑够必定数量的线程才会执行,不然一直等着
6 bar = threading.Barrier(4) 7
8 def run(): 9 print("%s--start" % (threading.current_thread().name)) 10 time.sleep(1) 11 bar.wait() 12 print("%s--end" % (threading.current_thread().name)) 13
14
15 if __name__ == "__main__": 16 for i in range(5): 17 threading.Thread(target=run).start()
1 import threading 2
3
4 def run(): 5 print("***********************") 6
7 # 延时执行线程
8 t = threading.Timer(5, run) 9 t.start() 10
11 t.join() 12 print("父线程结束")
1 import threading 2 import time 3
4
5 def func(): 6 # 事件对象
7 event = threading.Event() 8 def run(): 9 for i in range(5): 10 # 阻塞,等待事件的触发
11 event.wait() 12 # 重置阻塞,使后面继续阻塞
13 event.clear() 14 print("**************") 15 t = threading.Thread(target=run).start() 16 return event 17
18 e = func() 19
20 # 触发事件
21 for i in range(5): 22 time.sleep(2) 23 e.set()
这个例子是用了生产者和消费者来模拟,要进行数据通讯,还引入了队列。先来理解一下。
1 import threading 2 import queue 3 import time 4 import random 5
6
7 # 生产者
8 def product(id, q): 9 while True: 10 num = random.randint(0, 10000) 11 q.put(num) 12 print("生产者%d生产了%d数据放入了队列" % (id, num)) 13 time.sleep(3) 14 # 任务完成
15 q.task_done() 16
17 # 消费者
18 def customer(id, q): 19 while True: 20 item = q.get() 21 if item is None: 22 break
23 print("消费者%d消费了%d数据" % (id, item)) 24 time.sleep(2) 25 # 任务完成
26 q.task_done() 27
28
29 if __name__ == "__main__": 30 # 消息队列
31 q = queue.Queue() 32
33 # 启动生产者
34 for i in range(4): 35 threading.Thread(target=product, args=(i, q)).start() 36
37 # 启动消费者
38 for i in range(3): 39 threading.Thread(target=customer, args=(i, q)).start()
1 import threading 2 import time 3
4
5 # 线程条件变量
6 cond = threading.Condition() 7
8
9 def run(): 10 with cond: 11 for i in range(0, 10, 2): 12 print(threading.current_thread().name, i) 13 time.sleep(1) 14 cond.wait() # 阻塞
15 cond.notify() # 告诉另外一个线程能够执行
16
17
18 def run2(): 19 with cond: 20 for i in range(1, 10, 2): 21 print(threading.current_thread().name, i) 22 time.sleep(1) 23 cond.notify() 24 cond.wait() 25
26
27 threading.Thread(target=run).start() 28 threading.Thread(target=run2).start()
- 子程序/子函数:在全部语言中都是层级调用,好比A调用B,在B执行的工程中又能够调用C,C执行完毕返回,B执行完毕返回最后是A执行完毕。是经过栈实现的,一个线程就是一个子程序,子程序调用老是一个入口,一次返回,调用的顺序是明确的
- 协程:看上去也是子程序,但执行过程当中,在子程序的内部可中断,而后转而执行别的子程序,不是函数调用,有点相似CPU中断
1 # 这是一个子程序的调用
2 def C(): 3 print("C--start") 4 print("C--end") 5
6 def B(): 7 print("B--start") 8 C() 9 print("B--end") 10
11 def A(): 12 print("A--start") 13 B() 14 print("A--end") 15
16 A()
- 协程与子程序调用的结果相似,但不是经过在函数中调用另外一个函数
- 协程执行起来有点像线程,但协程的特色在因而一个线程
- 与线程相比的优势:协程的执行效率极高,由于只有一个线程,也不存在同时写变量的冲突,在协程中共享资源不加锁,只须要判断状态
1 # python对协程的支持是经过generator实现的
2 def run(): 3 print(1) 4 yield 10
5 print(2) 6 yield 20
7 print(3) 8 yield 30
9
10 # 协程的最简单风格,控制函数的阶段执行,节约线程或者进程的切换
11 # 返回值是一个生成器
12 m = run() 13 print(next(m)) 14 print(next(m)) 15 print(next(m))
1 # python对协程的支持是经过generator实现的
2 def run(): 3 print(1) 4 yield 10
5 print(2) 6 yield 20
7 print(3) 8 yield 30
9
10 # 协程的最简单风格,控制函数的阶段执行,节约线程或者进程的切换
11 # 返回值是一个生成器
12 m = run() 13 print(next(m)) 14 print(next(m)) 15 print(next(m))
1 def product(c): 2 c.send(None) 3 for i in range(5): 4 print("生产者产生数据%d" % (i)) 5 r = c.send(str(i)) 6 print("消费者消费了数据%s" % (r)) 7 c.close() 8
9
10 def customer(): 11 data = ""
12 while True: 13 n = yield data 14 if not n: 15 return
16 print("消费者消费了%s" % (n)) 17 data = "200"
18
19
20 c = customer() 21 product(c)