进程、线程和协程

1、进程

一、多任务原理

  多任务是指操做系统同时能够运行多个任务。python

  • 单核CPU实现多任务原理:操做系统轮流让各个任务交替执行;
  • 多核CPU实现多任务原理:真正的执行多任务只能在多核CPU上实现,多出来的任务轮流调度到每一个核心上执行。
  • 并发:看上去一块儿执行,任务数多于CPU核心数;
  • 并行:真正的一块儿执行,任务数小于等于CPU核心数。

  实现多任务的方式:
    一、多进程模式
    二、多线程模式
    三、协程模式
    四、多进程+多线程模式多线程

二、进程

  对于操做系统而言,一个任务就是一个进程;并发

  进程是系统中程序执行和资源分配的基本单元,每一个进程都有本身的数据段、代码段、堆栈段。app


 

  下面是一小段程序,一个单任务的例子。在其中,有两个输出语句分别在在两个不一样的循环当中,单任务的执行方式,也就是最初学习时,当一个循环没有结束的时候,没法执行到下面的程序当中。若是想要让两个循环能够同时在执行,就是在实现多任务,固然不是说同时输出,而是两个循环都在执行着。dom

 1 from time import sleep  2 # 只能执行到那一个循环,执行不了run,因此叫单任务
 3 def run():  4     while True:  5         print("&&&&&&&&&&&&&&&")  6         sleep(1.2)  7 
 8 if __name__ == "__main__":  9     while True: 10         print("**********") 11         sleep(1) 12     run()

  接下来启用多任务,经过进程来实现。async

  multiprocessing库:跨平台版本的多进程模块,提供了一个Process类来表明一个进程对象(fork仅适用于Linux)。ide

  下面的程序是在一个父进程中建立一个子进程,让父进程和子进程能够都在执行,建立方式程序中已经很简洁了。能够本身把这两段程序复制下来运行一下,看看输出的效果。函数

 1 from multiprocessing import Process  2 from time import sleep  3 import os  4 
 5 def run(str):  6     # os.getpid()获取当前进程id号
 7     # os.getppid()获取当前进程的父进程id号
 8     while True:  9         print("&&&&&&&&&&&&&&&%s--%s--%s" % (str, os.getpid(), os.getppid())) 10         sleep(0.5) 11 
12 if __name__ == "__main__": 13     print("主(父)进程启动 %s" % (os.getpid())) 14     # 建立子进程
15     # target说明进程执行的任务
16     p = Process(target=run, args=("nice",)) 17     # 启动进程
18  p.start() 19 
20     while True: 21         print("**********") 22         sleep(1)

  我想第一个单任务的程序就没必要说了吧,就是一个死循环,一直没有执行到下面的run函数。第二段程序是经过多进程实现的多任务,两个循环都能执行到,我把结果截图放下面,最好本身去试一下。学习

三、父子进程的前后顺序

  上面的多进程的例子中输出了那么多,咱们使用的时候到底是先执行哪一个后执行哪一个呢?根据咱们的通常思惟来讲,咱们写的主函数其实就是父进程,在主函数中间,要调用的也就是子进程。大数据

 

 1 from multiprocessing import Process  2 from time import sleep  3 import os  4 
 5 def run():  6     print("启动子进程")  7     print("子进程结束")  8     sleep(3)  9 
10 if __name__ == "__main__": 11     print("父进程启动") 12     p = Process(target=run) 13  p.start() 14 
15     # 父进程的结束不能影响子进程,让进程等待子进程结束再执行父进程
16  p.join() 17 
18     print("父进程结束")

四、全局变量在多个进程中不能共享 

  在多进程的程序当中定义的全局变量在多个进程中是不能共享的,篇幅较长在这里就不举例子了,能够本身试一下。这个也是和稍后要说的线程的一个区别,在线程中,变量是能够共享的,也所以衍生出一些问题,稍后再说。

五、启动多个进程 

  在正常工做使用的时候,固然不止有有个一个两个进程,毕竟这一两个也起不到想要的效果。那么就须要采用更多的进程,这时候须要经过进程池来实现,就是在进程池中放好你要创建的进程,而后执行的时候,把他们都启动起来,就能够同时进行了,在必定的环境下能够大大的提升效率。固然这个也和起初提到的有关,若是你的CPU是单核的,那么多进程也只是起到了让几个任务同时在执行着,并无提升效率,并且启动进程的时候还要花费一些时间,所以在多核CPU当中更能发挥优点。

  在multiprocessing中有个Pool方法,能够实现进程池。在利用进程池时能够设置要启动几个进程,通常状况下,它默认和你电脑的CPU核数一致,也能够本身设置,若是设置的进程数多于CPU核数,那多出来的进程会轮流调度到每一个核心上执行。下面是启动多个进程的过程。

 1 from multiprocessing import Pool  2 import os  3 import time  4 import random  5 
 6 
 7 def run(name):  8     print("子进程%s启动--%s" % (name, os.getpid()))  9     start = time.time() 10     time.sleep(random.choice([1,2,3,4,5])) 11     end = time.time() 12     print("子进程%s结束--%s--耗时%.2f" % (name, os.getpid(), end-start)) 13 
14 if __name__ == "__main__": 15     print("启动父进程") 16 
17     # 建立多个进程
18     # Pool 进程池 :括号里的数表示能够同时执行的进程数量
19     # Pool()默认大小是CPU核心数
20     pp = Pool(4) 21     for i in range(5): 22         # 建立进程,放入进程池,统一管理
23         pp.apply_async(run, args=(i,)) 24 
25     # 在调用join以前必须先调用close,调用close以后就不能再继续添加新的进程了
26  pp.close() 27     # 进程池对象调用join还等待进程池中全部的子进程结束
28  pp.join() 29 
30     print("结束父进程")

六、文件拷贝(单进程与多进程对比)

(1)单进程实现

 1 from multiprocessing import Pool
 2 import time
 3 import os
 4 
 5 # 实现文件的拷贝
 6 def copyFile(rPath, wPath):
 7     fr = open(rPath, 'rb')
 8     fw = open(wPath, 'wb')
 9     context = fr.read()
10     fw.write(context)
11     fr.close()
12     fw.close()
13 
14 path = r'F:\python_note\线程、协程'
15 toPath = r'F:\python_note\test'
16 
17 # 读取path下的全部文件
18 filesList = os.listdir(path)
19 
20 # 启动for循环处理每个文件
21 start = time.time()
22 for fileName in filesList:
23     copyFile(os.path.join(path,fileName), os.path.join(toPath,fileName))
24 
25 end = time.time()
26 print('总耗时:%.2f' % (end-start))
View Code

(2)多进程实现

 1 from multiprocessing import Pool
 2 import time
 3 import os
 4 
 5 # 实现文件的拷贝
 6 def copyFile(rPath, wPath):
 7     fr = open(rPath, 'rb')
 8     fw = open(wPath, 'wb')
 9     context = fr.read()
10     fw.write(context)
11     fr.close()
12     fw.close()
13 
14 path = r'F:\python_note\线程、协程'
15 toPath = r'F:\python_note\test'
16 
17 
18 if __name__ == "__main__":
19     # 读取path下的全部文件
20     filesList = os.listdir(path)
21 
22     start = time.time()
23     pp = Pool(4)
24     for fileName in filesList:
25         pp.apply_async(copyFile, args=(os.path.join(
26             path, fileName), os.path.join(toPath, fileName)))
27     pp.close()
28     pp.join()
29     end = time.time()
30     print("总耗时:%.2f" % (end - start))
View Code

  上面两个程序是两种方法实现同一个目标的程序,能够将其中的文件路径更换为你本身的路径,能够看到最后计算出的耗时是多少。也许有人发现并非多进程的效率就高,说的的确没错,由于建立进程也要花费时间,没准启动进程的时间远多让这一个核心运行全部核心用的时间要多。这个例子也只是演示一下如何使用,在大数据的任务下会有更深入的体验。

 七、进程对象

  咱们知道Python是一个面向对象的语言。并且Python中万物皆对象,进程也能够封装成对象,来方便之后本身使用,只要把他封装的足够丰富,提供清晰的接口,之后使用时会快捷不少,这个就根据本身的需求本身能够试一下,不写了。

 八、进程间通讯

  上面提到过进程间的变量是不能共享的,那么若是有须要该怎么办?经过队列的方式进行传递。在父进程中建立队列,而后把队列传到每一个子进程当中,他们就能够共同对其进行操做。 

 1 from multiprocessing import Process, Queue  2 import os  3 import time  4 
 5 
 6 def write(q):  7     print("启动写子进程%s" % (os.getpid()))  8     for chr in ['A', 'B', 'C', 'D']:  9  q.put(chr) 10         time.sleep(1) 11     print("结束写子进程%s" % (os.getpid())) 12 
13 def read(q): 14     print("启动读子进程%s" % (os.getpid())) 15     while True: 16         value = q.get() 17         print("value = "+value) 18     print("结束读子进程%s" % (os.getpid())) 19 
20 if __name__ == "__main__": 21     # 父进程建立队列,并传递给子进程
22     q = Queue() 23     pw = Process(target=write, args=(q,)) 24     pr = Process(target=read, args=(q,)) 25 
26  pw.start() 27  pr.start() 28     # 写进程结束
29  pw.join() 30     # pr进程里是个死循环,没法等待期结束,只能强行结束
31  pr.terminate() 32     print("父进程结束")

 2、线程

一、线程

  • 在一个进程内部,要同时干多件事,就须要运行多个"子任务",咱们把进程内的多个"子任务"叫作线程
  • 线程一般叫作轻型的进程,线程是共享内存空间,并发执行的多任务,每个线程都共享一个进程的资源
  • 线程是最小的执行单元而进程由至少一个线程组成。如何调度进程和线程,彻底由操做系统来决定,程序本身不能决定何时执行,执行多长时间

模块:

一、_thread模块 低级模块(更接近底层)

二、threading模块 高级模块,对_thread进行了封装

二、启动一个线程

  一样,先给一个多线程的例子,其中,仍然使用run函数做为其中的一个子线程,主函数为父线程。经过threading的Thread方法建立线程并开启,join来等待子线程。

 1 import threading  2 import time  3 
 4 
 5 def run():  6     print("子线程(%s)启动" % (threading.current_thread().name))  7 
 8     # 实现线程的功能
 9     time.sleep(1) 10     print("打印") 11     time.sleep(2) 12 
13     print("子线程(%s)结束" % (threading.current_thread().name)) 14 
15 
16 if __name__ == "__main__": 17     # 任何进程都默认会启动一个线程,称为主线程,主线程能够启动新的子线程
18     # current_thread():返回线程的实例
19     print("主线程(%s)启动" % (threading.current_thread().name)) 20 
21     # 建立子线程
22     t = threading.Thread(target=run, name="runThread") 23  t.start() 24 
25     # 等待线程结束
26  t.join() 27 
28     print("主线程(%s)结束" % (threading.current_thread().name))

三、线程间数据共享

  多线程和多进程最大的不一样在于,多进程中,同一个变量,各自有一份拷贝存在每一个进程中,互不影响。

  而多线程全部变量都由全部线程共享。因此任何一个变量均可以被任何一个线程修改,所以,线程之间共享数据最大的危险在于多个线程同时修改一个变量,容易把内容改乱了。

 1 import threading  2 
 3 
 4 num = 10
 5 
 6 def run(n):  7     global num  8     for i in range(10000000):  9         num = num + n 10         num = num - n 11 
12 if __name__ == "__main__": 13     t1 = threading.Thread(target=run, args=(6,)) 14     t2 = threading.Thread(target=run, args=(9,)) 15 
16  t1.start() 17  t2.start() 18  t1.join() 19  t2.join() 20 
21     print("num = ",num)

四、线程锁

  在第三小点中已经提到了,多线程的一个缺点就是数据是共享的,若是有两个线程正同时在修改这个数据,就会出现混乱,它本身也不知道该听谁的了,尤为是在运算比较复杂,次数较多的时候,这种错误的机会会更大。

  固然,解决办法也是有的,那就是利用线程锁。加锁的意思就是在其中一个线程正在对数据进行操做时,让其余线程不得介入。这个加锁和释放锁是由人来肯定的。

  • 确保了这段代码只能由一个线程从头至尾的完整执行
  • 阻止了多线程的并发执行,要比不加锁时候效率低。包含锁的代码段只能以单线程模式执行
  • 因为能够存在多个锁,不一样线程持有不一样的锁,并试图获取其余的锁,可能形成死锁致使多个线程挂起,只能靠操做系统强制终止
 1 def run(n):  2     global num  3     for i in range(10000000):  4  lock.acquire()  5         try:  6             num = num + n  7             num = num - n  8         finally:  9             # 修改完释放锁
10  lock.release() 11 
12 if __name__ == "__main__": 13     t1 = threading.Thread(target=run, args=(6,)) 14     t2 = threading.Thread(target=run, args=(9,)) 15 
16  t1.start() 17  t2.start() 18  t1.join() 19  t2.join() 20 
21     print("num = ",num)

  上面这段程序是循环屡次num+n-n+n-n的过程,变量n分别设为6和9是在两个不一样的线程当中,程序中已经加了锁,你能够先去掉试一下,当循环次数较小的时候也许还能正确,但次数一旦取的较高就会出现混乱。

  加锁是在循环体当中,依次执行加减法,定义中说到确保一个线程从头至尾的完整执行,也就是在计算途中,不会有其余的线程打扰。你能够想一下,若是一个线程执行完加法,正在执行减法,另外一个线程进来了,它要先进行加法时的初始sum值该是多少呢,线程二不必定在线程一的何时进来,万一刚进来时候,线程一刚好给sum赋值了,而线程二仍然用的是正准备进来时候的sum值,那从这里开始岂不已经分道扬镳了。因此,运算的次数越多,结果会越离谱。

  这个说完了,还有一个小小的改进。你是否记得读写文件时候书写的一种简便形式,经过with来实现,能够避免咱们忘记关闭文件,自动帮咱们关闭。固然还有一些其余地方也用到了这个方法。这里也一样适用。

1 # 与上面代码功能相同,with lock能够自动上锁与解锁
2 with lock: 3     num = num + n 4     num = num - n

五、ThreadLocal

  • 建立一个全局的ThreadLocal对象
  • 每一个线程有独立的存储空间
  • 每一个线程对ThreadLocal对象均可以读写,可是互不影响

  根据名字也能够看出,也就是在本地建个链接,全部的操做在本地进行,每一个线程之间没有数据的影响。

 1 import threading  2 
 3 
 4 num = 0  5 local = threading.local()  6 
 7 def run(x, n):  8     x = x + n  9     x = x - n 10 
11 def func(n): 12     # 每一个线程都有local.x
13     local.x = num 14     for i in range(10000000): 15  run(local.x, n) 16     print("%s-%d" % (threading.current_thread().name, local.x)) 17 
18 
19 if __name__ == "__main__": 20     t1 = threading.Thread(target=func, args=(6,)) 21     t2 = threading.Thread(target=func, args=(9,)) 22 
23  t1.start() 24  t2.start() 25  t1.join() 26  t2.join() 27 
28     print("num = ",num)

六、控制线程数量

 1 '''
 2 控制线程数量是指控制线程同时触发的数量,能够拿下来这段代码运行一下,下面启动了5个线程,可是他们会两个两个的进行  3 '''
 4 import threading  5 import time  6 
 7 # 控制并发执行线程的数量
 8 sem = threading.Semaphore(2)  9 
10 def run(): 11  with sem: 12         for i in range(10): 13             print("%s---%d" % (threading.current_thread().name, i)) 14             time.sleep(1) 15 
16 
17 if __name__ == "__main__": 18     for i in range(5): 19         threading.Thread(target=run).start()

  上面的程序是有多个线程,可是每次限制同时执行的线程,通俗点说就是限制并发线程的上限;除此以外,也能够限制线程数量的下限,也就是至少达到多少个线程才能触发。

 1 import threading  2 import time  3 
 4 
 5 # 凑够必定数量的线程才会执行,不然一直等着
 6 bar = threading.Barrier(4)  7 
 8 def run():  9     print("%s--start" % (threading.current_thread().name)) 10     time.sleep(1) 11  bar.wait() 12     print("%s--end" % (threading.current_thread().name)) 13 
14 
15 if __name__ == "__main__": 16     for i in range(5): 17         threading.Thread(target=run).start()

七、定时线程

 1 import threading  2 
 3 
 4 def run():  5     print("***********************")  6 
 7 # 延时执行线程
 8 t = threading.Timer(5, run)  9 t.start() 10 
11 t.join() 12 print("父线程结束")

八、线程通讯

 1 import threading  2 import time  3 
 4 
 5 def func():  6     # 事件对象
 7     event = threading.Event()  8     def run():  9         for i in range(5): 10             # 阻塞,等待事件的触发
11  event.wait() 12             # 重置阻塞,使后面继续阻塞
13  event.clear() 14             print("**************") 15     t = threading.Thread(target=run).start() 16     return event 17 
18 e = func() 19 
20 # 触发事件
21 for i in range(5): 22     time.sleep(2) 23     e.set()

九、一个小栗子

  这个例子是用了生产者和消费者来模拟,要进行数据通讯,还引入了队列。先来理解一下。

 1 import threading  2 import queue  3 import time  4 import random  5 
 6 
 7 # 生产者
 8 def product(id, q):  9     while True: 10         num = random.randint(0, 10000) 11  q.put(num) 12         print("生产者%d生产了%d数据放入了队列" % (id, num)) 13         time.sleep(3) 14     # 任务完成
15  q.task_done() 16 
17 # 消费者
18 def customer(id, q): 19     while True: 20         item = q.get() 21         if item is None: 22             break
23         print("消费者%d消费了%d数据" % (id, item)) 24         time.sleep(2) 25     # 任务完成
26  q.task_done() 27 
28 
29 if __name__ == "__main__": 30     # 消息队列
31     q = queue.Queue() 32 
33     # 启动生产者
34     for i in range(4): 35         threading.Thread(target=product, args=(i, q)).start() 36 
37     # 启动消费者
38     for i in range(3): 39         threading.Thread(target=customer, args=(i, q)).start()

十、线程调度

 1 import threading  2 import time  3 
 4 
 5 # 线程条件变量
 6 cond = threading.Condition()  7 
 8 
 9 def run(): 10  with cond: 11         for i in range(0, 10, 2): 12             print(threading.current_thread().name, i) 13             time.sleep(1) 14             cond.wait()  # 阻塞
15             cond.notify()  # 告诉另外一个线程能够执行
16 
17 
18 def run2(): 19  with cond: 20         for i in range(1, 10, 2): 21             print(threading.current_thread().name, i) 22             time.sleep(1) 23  cond.notify() 24  cond.wait() 25 
26 
27 threading.Thread(target=run).start() 28 threading.Thread(target=run2).start()

3、协程

一、协程

  • 子程序/子函数:在全部语言中都是层级调用,好比A调用B,在B执行的工程中又能够调用C,C执行完毕返回,B执行完毕返回最后是A执行完毕。是经过栈实现的,一个线程就是一个子程序,子程序调用老是一个入口,一次返回,调用的顺序是明确的
  • 协程:看上去也是子程序,但执行过程当中,在子程序的内部可中断,而后转而执行别的子程序,不是函数调用,有点相似CPU中断
 1 # 这是一个子程序的调用
 2 def C():  3     print("C--start")  4     print("C--end")  5 
 6 def B():  7     print("B--start")  8  C()  9     print("B--end") 10 
11 def A(): 12     print("A--start") 13  B() 14     print("A--end") 15 
16 A()
  • 协程与子程序调用的结果相似,但不是经过在函数中调用另外一个函数
  • 协程执行起来有点像线程,但协程的特色在因而一个线程
  • 与线程相比的优势:协程的执行效率极高,由于只有一个线程,也不存在同时写变量的冲突,在协程中共享资源不加锁,只须要判断状态

二、协程的原理

 1 # python对协程的支持是经过generator实现的
 2 def run():  3     print(1)  4     yield 10
 5     print(2)  6     yield 20
 7     print(3)  8     yield 30
 9 
10 # 协程的最简单风格,控制函数的阶段执行,节约线程或者进程的切换
11 # 返回值是一个生成器
12 m = run() 13 print(next(m)) 14 print(next(m)) 15 print(next(m))

三、数据传输

 1 # python对协程的支持是经过generator实现的
 2 def run():  3     print(1)  4     yield 10
 5     print(2)  6     yield 20
 7     print(3)  8     yield 30
 9 
10 # 协程的最简单风格,控制函数的阶段执行,节约线程或者进程的切换
11 # 返回值是一个生成器
12 m = run() 13 print(next(m)) 14 print(next(m)) 15 print(next(m))

四、小栗子

 1 def product(c):  2  c.send(None)  3     for i in range(5):  4         print("生产者产生数据%d" % (i))  5         r = c.send(str(i))  6         print("消费者消费了数据%s" % (r))  7  c.close()  8 
 9 
10 def customer(): 11     data = ""
12     while True: 13         n = yield data 14         if not n: 15             return
16         print("消费者消费了%s" % (n)) 17         data = "200"
18 
19 
20 c = customer() 21 product(c)
相关文章
相关标签/搜索