python3是经过multiprocessing模块来开启子进程,并执行相应的定制任务python
multiprocessing模块能够支持子进程、通讯和数据共享、执行不一样形式的同步,提供了process、Queue、Pipe、lock等组件。git
在这里咱们强调的是多线程有共享状态,进程没有任何共享状态,程序员
咱们经过Process类来建立子进程。下面是关于Process类中的参数的解释github
Process(group=None, target=None, name=None, args=(), kwargs={})编程
在这些参数中group参数未使用,始终未None安全
target至关于咱们要开启的子进程的任务的位置也就是调用的对象多线程
args使咱们给这个子进程传递参数,这个必须是元组的方式,若是只有一个参数就写成(num,)这种形式记得后面有一个“,” args = (num,)并发
kwargs表示调用对象的字典,kwargs={"name":"Tom","age":18}app
name为子进程的名称异步
记住上面只是建立一个子进程这个子进程属于未开启状态
如上面我盟建立了一个子进程如
p = multiprocessing.Process() #这个只是建立了一个子进程可是他并无运行。
使用p.start(): 这个表示启动这个子进程,并调用了该子进程的p.run()
p.run(): 进程启动时的方法,正是它去调用target指定的函数,在multiprocessing里面已经实现,要是咱们自定义类的话一应要实现该方法。
p.is_alive():主要是看p是否在运行,若是还在运行那么返回True。
p.join([timeout]): 主进程等待子进程p执行完毕才会执行,因此在p执行完毕以前主进程处于等待状态。timeout是可选的超时时间,在这里咱们要记住p.join只能等待start开启的进程,而对run开启的进程没有效果。
p.daemon:默认值为Flash,若是设为True,表明p为后台云逊的守护进程,当p的父进程终止时,p也随之终止,而且设置为True后,p不能建立本身的子进程,必须在p.start()以前设置。
p.name:进程的名称。
p.pid: 进程的pid
#!/usr/bin/env python # -*-coding:utf-8-*- import multiprocessing import time def run(id): time.sleep(2) print("hello process{}".format(id)) if __name__ == "__main__": l = [] i = 1 for i in range(5): p = multiprocessing.Process(target=run, args=(i,))#args是一个元组因此必需要有, p.start() i += 1 l.append(p) for i in l: i.join() print("主进程") #若是没有i.join()的话就会出现子进程没有运行完,下面的代码就会运行结果。 上面的运行结果是: hello process1 hello process0 hello process4 hello process2 hello process3 主进程
若是没有i.join()的话,运行结果为 主进程 hello process0 hello process3 hello process4 hello process1 hello process2
从上能够看出当有i.join()时咱们的程序就不会往下走。它会等待join()过的子进程执行完毕才会继续往下走
下面的演示是关于在子进程中开启多线程
#!/usr/bin/env python # -*-coding:utf-8-*- import multiprocessing,time import threading def run(): time.sleep(2) print("hello process") t = threading.Thread(target=trun,) t.start() def trun(): time.sleep(1) print("hello threading", threading.get_ident()) def main(): l = [] for i in range(10): p = multiprocessing.Process(target=run) p.start() print(p.name) l.append(p) for i in l: i.join() if __name__ == "__main__": main() #在Windows中的process()执行必须放到if __name__ == '__main__':下如上面的代码,要否则就会出错。
#!/usr/bin/env python # -*-coding:utf-8-*- import multiprocessing import time def run(id): time.sleep(2) print("hello process{}".format(id)) if __name__ == "__main__": l = [] i = 1 for i in range(5): p = multiprocessing.Process(target=run, args=(i,)) p.daemon = True #记住这个必定要在start()方法以前设置
#同时记住当咱们设置守护进程时p进程就不能建立子进程,而且父进程结束,p也结束无论p有没有运行完 p.start() i += 1 l.append(p) for i in l: i.join() #去掉这个后就会发现开启守护进程的区别 print("主进程") #上面的加上守护进程咱们不会发现什么不一样和咱们前面没加上同样的结果。 #可是当咱们去掉join()方法的时候就会发现子进程尚未运行完毕整个进程就结束了
上面咱们也说了进程之间的数据是不共享的,可是共享同一套文件系统,因此访问同一个文件或者同一个打印终端是没问题的。当咱们同一个终端打印的时候会发现多行打印到一行的现象,出现这种现象就是多个进程共享并抢占同一个终端打印致使打印乱了。
在这里咱们加锁的就是为了保证同一时间同一个数据只能被一个进程修改,这样作速度慢了可是却也保证了数据的正确性
#!/usr/bin/env pytho#
-*-coding:utf-8-*
from multiprocessing import Process,Lock def run(l, i): l.acquire() print("process:", i) # 在屏幕上打印的时候打印完整 l.release() if __name__ == "__main__": lock = Lock() p_list = [] for i in range(10): p = Process(target=run, args=(lock, i)) p.start() p_list.append(p) for i in p_list: i.join() 运行结果为: process: 1 process: 0 process: 2 process: 8 process: 7 process: 5 process: 3 process: 6 process: 4 process: 9
这个程序我运行了不少次都没有出现过打印错乱的状况,
好比第一行没打印完,第二行就插在中间的这种状况。
不过我我在Linux上弄了好屡次包括python3和python2
都没有出现过相应的状况。有可能我试验少了,可是看别
人的博客上说python2运行的时候有可能出现错乱的现象。
无论怎么样记住加上进程锁就是为了防止在屏幕上打印错乱。
上面咱们说了不一样的进程是不可以共享内存的,可是为了实现两个进程之间的数据交换咱们可使用一下方法。
Queue
这里使用的queue和线程threading的queue的用法差很少。
队列属于先进先出的原则的
建立队列的类:
Queue([maxsize]): 建立共享的进程队列,Queue是多进程安全的队列,可使用Queue实现多进程之间的数据传递。
关于里面的maxsize表示的是队列容许的最大项数,省略表示无大小限制。
关于Queue的方法介绍:
q.put 是用来向队列中插入数据的
q.get 是向队列中去除一个元素,并删除掉
q.empty():调用此方法时,q为空则返回True
q.full():调用此方法时q已满则返回True
q.qsize():返回队列中目前元素的数目
#!/usr/bin/env python # -*-coding:utf-8-*- from multiprocessing import Process, Queue def run(arg): arg.put([41, None, 'miss']) if __name__ == "__main__": q = Queue() p = Process(target=run, args=(q,)) p.start() print("进程:", q.get()) p.join() 运行的结果为 进程: [41, None, 'miss']
进程间的通讯方式除了队列还有管道。
建立管道的类:
Pipe([duplex]):在进程间建立一条管道,并返回元组(conn1, conn2),其中conn1,conn2表示管道两端的链接对象,必须在产生process对象以前,产生管道。
duplex:默认管道是双全工,若是duplex设置为Flash那么conn1只能用于接受。conn2只能用于发送
关于Pipe其中方法的介绍:
conn1.recv():接收conn2.send(obj)发送的对象。若是没有消息可接受,recv会一直阻塞,若是链接的另外一端关闭了那么recv方法会抛出EOFError。
conn2.send(obj):经过链接发送对象,obj是与序列化兼容的对象
#!/usr/bin/env python # -*-coding:utf-8-*- from multiprocessing import Process, Pipe def run(arg): arg.send([1, None, "chengshun"]) arg.send([2, None, "chengjie"]) if __name__ == "__main__": parent_conn, child_conn = Pipe() 在建立进程以前使用Pipe()方法 p = Process(target=run, args=(child_conn,)) p.start() print(parent_conn.recv()) print(parent_conn.recv()) p.join()
主要是实如今进程间实现list,dict,Lock等等这种类型的数据的共享
#!/usr/bin/env python # -*-coding:utf-8-*- from multiprocessing import Process, Manager import os def run(d, l): d[os.getpid()] = os.getpid() l.append(os.getpid()) #print(l) if __name__ == "__main__": with Manager() as manager: # 把Manager命名为manager d = manager.dict() # 建立一个共享的字典d l = manager.list(range(10)) # 建立一个共享的列表l process = [] for i in range(10): p = Process(target=run, args=(d, l)) # 建立进程 p.start() process.append(p) for i in process: i.join() print(l) print(d)
咱们开多进程就是为了并发执行任务,当咱们有多少核就开多少个进程,可是在日程中咱们经常须要的并发执行任务远大于核数,这个是后咱们就能够经过维护一个进程池来控制进程的数目。
进程池内部维护一个进程序列,当时用的,则取进程池中获取一个进程,若是进程池中没有可用序列的进程,那么程序就会等待知道进程池中有可用的进程序列为止。
Pool对象调用join方法会等待全部的子进程执行完毕,
在调用join方法以前,必须调用close
调用close以后就不可以继续添加新的process了
Pool的中2中方法
apply_async:该方法用来同步执行进程,也就是说容许同时多个进程进入到进程池中。
apply:该方法只能容许一个进程进入池子,
#!/usr/bin/env python # -*-coding:utf-8-*- from multiprocessing import Pool,process import os, time def Foo(i): time.sleep(1) print("子进程号:", os.getpid()) return i + 100 def back(arg): # 这个回调函数是主进程调用的 print("---exit---", arg) # 这里的arg就是func中的return返回值,若是没有,那么arg为None if __name__ == "__main__": pool = Pool(processes=5) # 进程池中同时能够容许多少个进程 for i in range(40): pool.apply_async(func=Foo, args=(i,), callback=back) # callback为回调函数 # pool.apply(func=Foo, args=(i,)) # 也就是说在没次执行完func这个函数的时候就会调用callback函数。 pool.close() pool.join() # join()以前必需要close(),同时close()后就不可以在添加新的process
# 若是直接注释掉pool.join()那么程序就不会等待子进程结束就自动结束了
上面代码中的pool.apply_async()这个运行的结果咱们会看见会出现运行的时候去掉---exit---会出现5个5个一组打印在屏幕上。若是是pool.apply()这个运行的时候咱们会发现会一行一行的打印。也就是串行的结果。
协程,又称为微线程,纤程。英文名字Coroutine。咱们能够说协程是一种用户态的轻量级线程。
线程是系统级别的它由操做系统调度,而协程则是应用程序级别的由程序根据需求本身调度。一个线程中会有不少函数,咱们把这些函数称之为子程序,在子程序执行过程当中能够去执行别的子程序,而别的子程序也能够中断回来继续执行以前的子程序,这个过程我么称之为协程。也就是说在同一线程内一段代码在执行过程当中会中断而后跳转执行别的代码,接着在以前中断的地方继续执行,相似于yield操做。
协程拥有本身的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其余地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。所以:协程能保留上一次调用时的状态(即全部局部状态的一个特定组合),每次过程重入时,就至关于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置
协程的优势:
(1)无需线程上下文切换的开销,协程避免了无心义的调度,由此能够提升性能(但也所以,程序员必须本身承担调度的责任,同时,协程也失去了标准线程使用多CPU的能力)
(2)无需原子操做锁定及同步的开销
(3)方便切换控制流,简化编程模型
(4)高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。因此很适合用于高并发处理。
协程的缺点:
(1)没法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程须要和进程配合才能运行在多CPU上.固然咱们平常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
(2)进行阻塞(Blocking)操做(如IO时)会阻塞掉整个程序
一、关于yield实现协程效果
import time import queue def consumer(name): print("--->开始吃包子..." ) while True: print("%s 须要包子" % name) new_baozi = yield print("[%s] 吃了包子%s" % (name, new_baozi)) def producer(): r = con.__next__() r = con2.__next__() n = 0 while n < 5: n += 1 con.send(n) con2.send(n) print("\033[32;1m[producer]\033[0m 是包子 %s" % n) if __name__ == '__main__': con = consumer("c1") con2 = consumer("c2") p = producer()
二、greenlet模块实现模块之间的切换
#!/usr/bin/env python # -*-coding:utf-8-*- from greenlet import greenlet def fun1(): print("1") # 程序先执行这一行 gre2.switch() # 这表示咱们切换到gre2,也就是fun2.同时保留如今这个状态,下次切回继续执行 print(4) # 有下面的gre1.switch()切换到这执行该行 gre2.switch() # 在切换到gre2 def fun2(): print("2") # 由gre2.switch()切换到这执行 gre1.switch() # 这个表示切换到上面gre1,若是上面有保留状态则从保留状态开始 print(3) if __name__ == "__main__": gre1 = greenlet(fun1) # 启动一个协程 gre2 = greenlet(fun2) # 启动一个协程 gre1.switch() # 一开始运行gre1也就是fun1
上面的模块咱们能够知道这个须要咱们手动切换,不可以实现IO自动切换。
三、gevent
gevent是一个第三方库,能够轻松经过gevent实现协程编程,在gevent用到的主要模块时greenlet,它是以C扩展模块形式接入到Python的轻量级协程。 greenlet所有运行在主程序操做系统的内部,但它们协做式的调度。下面咱们看看
#!/usr/bin/env python # -*-coding:utf-8-*- import gevent def f1(): print("1") gevent.sleep(2) # 到这里程序会自动的切换到f2函数 print("6") def f2(): print("2") gevent.sleep(1) # 自动切换到f3哪怕这个sleep了0秒,它也会切换到下一个,至关于触发了自动切换。 print("5") def f3(): print("3") gevent.sleep(0) # 这个会切换到f1看看有没有运行完,若是没有运行完就按着顺序继续运行下去。 print("4") if __name__ == "__main__": gevent.joinall([ gevent.spawn(f1), # 至关于启动一个协程,第一位就是一开始运行 gevent.spawn(f2), gevent.spawn(f3) ])
上面的程序运行的时候咱们能够看到运行的结果是
1
2
3
4
5
6
它基本的实现了IO的自动切换
下面一个例子时候关于协程,为了体现差别咱们同时也执行了同步时间
#!/usr/bin/env python # -*-coding:utf-8-*- import gevent, time from urllib import request # 包下面还有一个包,因此必须这样不可以使用urllib.request from gevent import monkey def run(url, name): res = request.urlopen(url) data = res.read() f = open('{}.txt'.format(name), 'wb') f.write(data) f.close() print("{} bytes received from {}".format(len(data), url)) if __name__ == "__main__": # 同步时间 get = [ ("https://www.python.org", 'python'), ("https://www.yahoo.com", 'yahoo'), ("https://www.github.com", 'github') ] start_time = time.time() for i, j in get: run(i, j) done_time = time.time() print("同步时间", done_time - start_time) # 异步时间 monkey.patch_all() # urllib默认状况下是串行的使用这个是把当前程序的全部的IO操做的打上标记这个样就能够异步了 asy_start_time = time.time() gevent.joinall([ gevent.spawn(run, "https://www.python.org", 'python'), gevent.spawn(run, "https://www.yahoo.com", 'yahoo'), gevent.spawn(run, "https://www.github.com", 'github') ]) # 从第一位开始运行 asy_done_time = time.time() print("异步时间", asy_done_time - asy_start_time) 运行的结果是 48872 bytes received from https://www.python.org 532554 bytes received from https://www.yahoo.com 52840 bytes received from https://www.github.com 同步时间 6.285885572433472 48872 bytes received from https://www.python.org 525992 bytes received from https://www.yahoo.com 52840 bytes received from https://www.github.com 异步时间 3.48797869682312 这个有时候时间是不对的,由于和网速有关,这几个网址有时候会出现打开速度慢。
四、协程实现socket多并发
gevent_socket_server
#!/usr/bin/env python # -*-coding:utf-8-*- import socket, gevent from gevent import monkey monkey.patch_all() # 必需要有这个,要否则就实现多并发 def Server(): server = socket.socket() server.bind(("localhost", 9999)) server.listen(500) while True: conn, add = server.accept() gevent.spawn(handle_request, conn) def handle_request(conns): while True: received_data = conns.recv(1024).decode("utf-8") print("received:", received_data) if not received_data: conns.shutdown(socket.SHUT_WR) conns.send(received_data.encode("utf-8")) conns.close() if __name__ == "__main__": Server()
socket_client
#!/usr/bin/env python # -*-coding:utf-8-*- import socket client = socket.socket() client.connect(("localhost", 9999)) while True: acquire = input(">>>").strip().encode("utf-8") print(acquire) if acquire.decode("utf-8") == "exit": break if len(acquire) == 0: continue client.send(acquire) print(2) data = client.recv(1024).decode("utf-8") print(3) print(data) client.close()
这个就是记录本身学习中的知识,同时也知道写的也不是很全。这个权当本身对知识的巩固。