multiprocessing
is a package that supports spawning processes using an API similar to the threading
module. The multiprocessing
package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing
module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.html
多进程是一个支持使用相似于线程模块的API来支持生成进程的包。多进程包提供本地和远程并发性,经过使用子进程代替线程,有效地避免了全局解释器锁。因为这个缘由,多进程模块容许程序员在给定的机器上充分利用多个处理器。它在Unix和Windows上运行。python
一个简单的使用栗子:git
from multiprocessing import Process import time def f(name): time.sleep(2) print('hello', name) if __name__ == '__main__': p = Process(target=f, args=('bob',)) p.start() p.join()
上面是用多进程执行了一遍函数,效果和单进程并没什么不一样,都是等待了2s,下面咱们看看执行屡次程序员
from multiprocessing import Process import time def f(name): time.sleep(2) print('hello', name) if __name__ == '__main__': for i in range(10): p = Process(target=f, args=('bob',)) p.start() p.join()
结果显示执行了10次函数等待之间依旧2s,若是是单线程可想而知就是2*10s了。github
为了展现父进程与子进程之间的关系,下面的栗子能够直观看到:编程
from multiprocessing import Process import os def info(title): print(title) print('module name:', __name__) print('parent process:', os.getppid()) print('process id:', os.getpid()) print("\n\n") def f(name): info('\033[31;1mfunction f\033[0m') print('hello', name) if __name__ == '__main__': info('\033[32;1mmain process line\033[0m') p = Process(target=f, args=('bob',)) p.start() p.join()
不一样进程间内存是不共享的,要想实现两个进程间的数据交换,能够用如下方法:数组
Queues网络
使用方法跟threading里的queue差很少数据结构
from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()
这里注意,看似好像主进程与子进程共用一个queue,然而并非,实际上是子进程clone了一个queue,有的骚年就说了,那不就是俩queue了,怎么同步俩queue的数据啊,实际是用pickle进行序列化,一个queue先序列化后,给另外一个queue反序列化进行数据传递。并发
Pipes
顾名思义,管道,其实就是像一根电话线,一人拿这头,一人拿另外一头,互相通讯
from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()
Managers
queue和pipes只是数据的传递,不是真正的数据共享,manager就厉害了,能实现真正的数据共享
A manager object returned by Manager()
controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager()
will support types list
, dict
, Namespace
, Lock
, RLock
, Semaphore
, BoundedSemaphore
, Condition
, Event
, Barrier
, Queue
, Value
and Array
. For example,
from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.append(1) print(l) if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(5)) p_list = [] for i in range(10): p = Process(target=f, args=(d, l)) p.start() p_list.append(p) for res in p_list: res.join() print(d) print(l)
进程同步
进程其实也有锁,他这个锁实际上是屏幕锁,就是输出锁,多个进程输出内容时,有可能会打印乱,这个锁就是为了解决这个问题。
from multiprocessing import Process, Lock def f(l, i): l.acquire() try: print('hello world', i) finally: l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()
协程在手,说走就走,咳咳。好了,这个协程就厉害了,先了解概念。
协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程。
协程拥有本身的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其余地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。所以:
协程能保留上一次调用时的状态(即全部局部状态的一个特定组合),每次过程重入时,就至关于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。
协程的好处:
缺点:
使用yield实现协程操做例子:
import time import queue def consumer(name): print("--->starting eating baozi...") while True: new_baozi = yield print("[%s] is eating baozi %s" % (name,new_baozi)) #time.sleep(1) def producer(): r = con.__next__() # 一开始带yield的函数实例化时只是变成生成器,并不执行,用__next__来执行下一步 r = con2.__next__() n = 0 while n < 5: n +=1 con.send(n) con2.send(n) print("\033[32;1m[producer]\033[0m is making baozi %s" %n ) if __name__ == '__main__': con = consumer("c1") con2 = consumer("c2") p = producer()
有没有很快~~其实上面的栗子不算是真正的协程,嘿嘿,那什么才算标准的协程呢?只要符合下面四条,才能够成为协程:
那么问题来了,上面的栗子为啥不属于协程嘞,关键就在于第四条,若是我把consumer的sleep打开,速度会立刻变慢,由于每次消费都要等待IO。这样cpu的利用率就大大下降了。那么这个问题如何解决?请各位看官继续往下看:
greenlet是一个用C实现的协程模块,相比与python自带的yield,它可使你在任意函数之间随意切换,而不需把这个函数先声明为generator
from greenlet import greenlet def test1(): print(12) gr2.switch() print(34) gr2.switch() def test2(): print(56) gr1.switch() print(78) gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch()
感受确实用着比generator还简单了呢,然而仍是那个问题,就是遇到IO并无自动切换啊。。。。
Gevent 是一个第三方库,能够轻松经过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet所有运行在主程序操做系统进程的内部,但它们被协做式地调度。
import gevent def func1(): print('\033[31;1mA在跟B搞...\033[0m') gevent.sleep(2) print('\033[31;1mA又回去继续跟B搞...\033[0m') def func2(): print('\033[32;1mA切换到了跟C搞...\033[0m') gevent.sleep(1) print('\033[32;1mA搞完了B,回来继续跟C搞...\033[0m') gevent.joinall([ gevent.spawn(func1), gevent.spawn(func2), ])
遇到IO完美切换,完美~~
同步与异步的性能区别
import gevent def task(pid): """ Some non-deterministic task """ gevent.sleep(0.5) print('Task %s done' % pid) def synchronous(): for i in range(1,10): task(i) def asynchronous(): threads = [gevent.spawn(task, i) for i in range(10)] gevent.joinall(threads) print('Synchronous:') synchronous() print('Asynchronous:') asynchronous()
上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn
。 初始化的greenlet列表存放在数组threads
中,此数组被传给gevent.joinall
函数,后者阻塞当前流程,并执行全部给定的greenlet。执行流程只会在 全部greenlet执行完后才会继续向下走。
遇到IO阻塞时会自动切换任务
来个实际爬页面的小栗子:
from gevent import monkey import gevent from urllib.request import urlopen monkey.patch_all() def f(url): print('GET: %s' % url) resp = urlopen(url) data = resp.read() print('%d bytes received from %s.' % (len(data), url)) gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), gevent.spawn(f, 'https://www.yahoo.com/'), gevent.spawn(f, 'https://github.com/'), ])
这里有个小重点,在默认状况下,gevent并不能识别出urllib的IO操做,致使协程没起到异步做用。这里须要一个小补丁,就是monkey模块的patch_all(),他会把urllib中的IO操做进行标记,使gevent能识别,这样就好用了。
经过gevent实现单线程下的多socket并发
server端:
import sys import socket import time import gevent from gevent import socket,monkey monkey.patch_all() def server(port): s = socket.socket() s.bind(('0.0.0.0', port)) s.listen(500) while True: cli, addr = s.accept() gevent.spawn(handle_request, cli) def handle_request(conn): try: while True: data = conn.recv(1024) print("recv:", data) conn.send(data) if not data: conn.shutdown(socket.SHUT_WR) except Exception as ex: print(ex) finally: conn.close() if __name__ == '__main__': server(8001)
客户端:
import socket HOST = 'localhost' # The remote host PORT = 8001 # The same port as used by the server s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((HOST, PORT)) while True: msg = bytes(input(">>:"),encoding="utf8") s.sendall(msg) data = s.recv(1024) #print(data) print('Received', repr(data)) s.close()
模拟100个client同时访问:
import socket import threading def sock_conn(): client = socket.socket() client.connect(("localhost",8001)) count = 0 while True: #msg = input(">>:").strip() #if len(msg) == 0:continue client.send( ("hello %s" %count).encode("utf-8")) data = client.recv(1024) print("[%s]recv from server:" % threading.get_ident(),data.decode()) #结果 count +=1 client.close() for i in range(100): t = threading.Thread(target=sock_conn) t.start()
首先列一下,sellect、poll、epoll三者的区别
select
select最先于1983年出如今4.2BSD中,它经过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程能够得到这些文件描述符从而进行后续的读写操做。
select目前几乎在全部的平台上支持,其良好跨平台支持也是它的一个优势,事实上从如今看来,这也是它所剩很少的优势之一。
select的一个缺点在于单个进程可以监视的文件描述符的数量存在最大限制,在Linux上通常为1024,不过能够经过修改宏定义甚至从新编译内核的方式提高这一限制。
另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增加。同时,因为网络响应时间的延迟使得大量TCP链接处于非活跃状态,但调用select()会对全部socket进行一次线性扫描,因此这也浪费了必定的开销。
poll
poll在1986年诞生于System V Release 3,它和select在本质上没有多大差异,可是poll没有最大文件描述符数量的限制。
poll和select一样存在一个缺点就是,包含大量文件描述符的数组被总体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增长而线性增大。
另外,select()和poll()将就绪的文件描述符告诉进程后,若是进程没有对其进行IO操做,那么下次调用select()和poll()的时候将再次报告这些文件描述符,因此它们通常不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。
epoll
直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具有了以前所说的一切优势,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。
epoll能够同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,若是咱们没有采起行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,可是代码实现至关复杂。
epoll一样只告知那些就绪的文件描述符,并且当咱们调用epoll_wait()得到就绪文件描述符时,返回的不是实际的描述符,而是一个表明就绪描述符数量的值,你只须要去epoll指定的一个数组中依次取得相应数量的文件描述符便可,这里也使用了内存映射(mmap)技术,这样便完全省掉了这些文件描述符在系统调用时复制的开销。
另外一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用必定的方法后,内核才对全部监视的文件描述符进行扫描,而epoll事先经过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用相似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便获得通知。
select 多并发socket 实例
select socket server:
#_*_coding:utf-8_*_ __author__ = 'Alex Li' import select import socket import sys import queue server = socket.socket() server.setblocking(0) server_addr = ('localhost',10000) print('starting up on %s port %s' % server_addr) server.bind(server_addr) server.listen(5) inputs = [server, ] #本身也要监测呀,由于server自己也是个fd outputs = [] message_queues = {} while True: print("waiting for next event...") readable, writeable, exeptional = select.select(inputs,outputs,inputs) #若是没有任何fd就绪,那程序就会一直阻塞在这里 for s in readable: #每一个s就是一个socket if s is server: #别忘记,上面咱们server本身也当作一个fd放在了inputs列表里,传给了select,若是这个s是server,表明server这个fd就绪了, #就是有活动了, 什么状况下它才有活动? 固然 是有新链接进来的时候 呀 #新链接进来了,接受这个链接 conn, client_addr = s.accept() print("new connection from",client_addr) conn.setblocking(0) inputs.append(conn) #为了避免阻塞整个程序,咱们不会马上在这里开始接收客户端发来的数据, 把它放到inputs里, 下一次loop时,这个新链接 #就会被交给select去监听,若是这个链接的客户端发来了数据 ,那这个链接的fd在server端就会变成就续的,select就会把这个链接返回,返回到 #readable 列表里,而后你就能够loop readable列表,取出这个链接,开始接收数据了, 下面就是这么干 的 message_queues[conn] = queue.Queue() #接收到客户端的数据后,不马上返回 ,暂存在队列里,之后发送 else: #s不是server的话,那就只能是一个 与客户端创建的链接的fd了 #客户端的数据过来了,在这接收 data = s.recv(1024) if data: print("收到来自[%s]的数据:" % s.getpeername()[0], data) message_queues[s].put(data) #收到的数据先放到queue里,一会返回给客户端 if s not in outputs: outputs.append(s) #为了避免影响处理与其它客户端的链接 , 这里不马上返回数据给客户端 else:#若是收不到data表明什么呢? 表明客户端断开了呀 print("客户端断开了",s) if s in outputs: outputs.remove(s) #清理已断开的链接 inputs.remove(s) #清理已断开的链接 del message_queues[s] ##清理已断开的链接 for s in writeable: try : next_msg = message_queues[s].get_nowait() except queue.Empty: print("client [%s]" %s.getpeername()[0], "queue is empty..") outputs.remove(s) else: print("sending msg to [%s]"%s.getpeername()[0], next_msg) s.send(next_msg.upper()) for s in exeptional: print("handling exception for ",s.getpeername()) inputs.remove(s) if s in outputs: outputs.remove(s) s.close() del message_queues[s] select socket server
select socket client:
#_*_coding:utf-8_*_ __author__ = 'Alex Li' import socket import sys messages = [ b'This is the message. ', b'It will be sent ', b'in parts.', ] server_address = ('localhost', 10000) # Create a TCP/IP socket socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM), socket.socket(socket.AF_INET, socket.SOCK_STREAM), ] # Connect the socket to the port where the server is listening print('connecting to %s port %s' % server_address) for s in socks: s.connect(server_address) for message in messages: # Send messages on both sockets for s in socks: print('%s: sending "%s"' % (s.getsockname(), message) ) s.send(message) # Read responses on both sockets for s in socks: data = s.recv(1024) print( '%s: received "%s"' % (s.getsockname(), data) ) if not data: print(sys.stderr, 'closing socket', s.getsockname() ) select socket client
selectors模块
用Python提供的模块写出更简单高效的服务端:
import selectors import socket sel = selectors.DefaultSelector() def accept(sock, mask): conn, addr = sock.accept() # Should be ready print('accepted', conn, 'from', addr) conn.setblocking(False) sel.register(conn, selectors.EVENT_READ, read) def read(conn, mask): data = conn.recv(1000) # Should be ready if data: print('echoing', repr(data), 'to', conn) conn.send(data) # Hope it won't block else: print('closing', conn) sel.unregister(conn) conn.close() sock = socket.socket() sock.bind(('localhost', 10000)) sock.listen(100) sock.setblocking(False) sel.register(sock, selectors.EVENT_READ, accept) while True: events = sel.select() for key, mask in events: callback = key.data callback(key.fileobj, mask)