上次讲了因为GIL锁的存在,Python的多线程是假的,用的仍是CPU的单核。Python的多线程只是利用了CPU的上下文切换,上下分切换也是占用CPU的。那么何时用多行程?
Python的多线程,适合IO密集型的任务,不适合CPU密集型的任务。
IO操做不占用CPU,好比socket这种网络编程的情景。
计算占用CPU,因此大量计算的情景下多线程反而更慢,额外消耗了CPU切换上下文的计算。html
多进程的基本语法和多线程差很少:python
import multiprocessing import time def show(name): time.sleep(2) print('hello', name) if __name__ == '__main__': p = multiprocessing.Process(target=show, args=('Jack',)) p.start() p.join() # join的效果就是等待子进程执行完毕 print('执行结束')
上面的例子,只模块名变了,其余都和多线程差很少。linux
下面的例子打印了进程的id号:git
import multiprocessing import os, time def info(title): print(title, 'module name:', __name__) # 模块名 time.sleep(0.3) # 加点停顿,能够看出来,全部进程真的是并行处理的 print(title, 'parent process:', os.getppid()) # 父进程号 time.sleep(0.3) print(title, 'process id:', os.getpid()) # 进程号 def f(title): info(title) if __name__ == '__main__': info('main') for i in range(10): # 此次起10个进程 p = multiprocessing.Process(target=f, args=('p%s' % i,)) p.start()
能够适当修改加长info的延时,,能够去系统里查看一下全部进程的状况,以下图:
上面起了10个子进程,加上主进程,一个11个python进程。
我是用pycharm执行的代码,主进程的ID是8036,主进程的父进程是pycharm7832。
而后,全部的子进程,都是经过8036这个python的父进程开启的。8036就是这些子进程的父进程。github
进程间的内存是独立的,若是进程间须要交换数据,就须要借助其余方法web
下面的例子经过Queue实现了进程间的通讯,数据库
from multiprocessing import Process, Queue import time def f(q): q.put([42, None, 'hello']) time.sleep(30) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" time.sleep(30)
这里的Queue是 multiprocessing 多线程模块里的,不是以前的独立的queue模块。编程
from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() # 实例化管道后会生成2个对象,这2个对象是同样的 p = Process(target=f, args=(child_conn,)) # 把管道的1头交给子进程,本身操做另一头 p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']"
上面实例化管道以后的两个对象是同样的,不要被名字误导,这么取名是随进程的关系。
实例化后的两个对象就是管道的两头。通信的双方任意各取一头操做,就能实现管道两头的通信。这里是父进程留下一头,把另外一头传递给子进程操做。
这里管道的操做使用send和recv,相似socket(不过没有黏包)。这边send一次对端就recv一次获取数据,若是一边send屡次,那么对端也只能一次recv取到一次的数据,因此也得recv屡次才能取到所有的数据。若是数据取完了,再recv则会阻塞,等待管道对端send数据进来。windows
上面的2个方法,只是实现了数据的传递。
经过Manager生成的数据对象,能够在多个进程间共享。如下的数据类型均可以经过Manager来生成,实现共享:
list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array
举例说明:缓存
from multiprocessing import Process, Manager import os def f(d, l): pid = os.getpid() d['pid'] = pid # 每次这个key都会被重写,最后打印的必定是最后一个操做的进程的结果 d[pid] = pid # 每次都是生成一个新的key,添加到字典里 l.append(pid) # 每次往列表中添加一个元素 if __name__ == '__main__': with Manager() as manager: # 等于manager = Manager(),最后还省了一个manager.close() d = manager.dict() # 生成一个共享的字典 l = manager.list() # 生成一个共享的列表 p_list = [] for i in range(10): p = Process(target=f, args=(d, l)) p.start() p_list.append(p) for res in p_list: res.join() print(d) print(l)
线程有线程锁,进程也有进程锁。用法也和线程锁同样,以下:
from multiprocessing import Process, Lock import time def f(l, n): l.acquire() print('hello world', n, end='', flush=True) time.sleep(0.1) print(" Finished", n) l.release() time.sleep(0.1) if __name__ == '__main__': lock = Lock() for n in range(10): Process(target=f, args=(lock, n)).start()
有锁,可是咱们要锁什么?上面的代码就算不加锁应该也不必定会有什么问题。线程锁咱们锁的是内存,由于线程共享的是同一块内存空间。进程锁锁的是资源,进程就是各类资源的集合。好比例子中的print用到的就是屏幕输出的资源,若是不加锁,可能几个进程同时想操做屏幕输出内容,那么就有可能会形成最终输出的字符错乱。不过打印数据应该是一串一次性进缓存的,应该也不会出现被插队吧。
虽然试不出字符错乱的状况,可是对两次print之间不要插入别的进程的内容。好比例子中去掉进程锁就会出现两段内容混乱的状况。
和线程锁的状况同样,加了锁以后,其实就应该是暂时变成了串行执行了。
每起一个进程,就是克隆一份父进程的基本数据给子进程用。这样开销会很大(好比内存),系统资源是有限的,无限起进程,可能会致使系统瘫痪。因此要有进程池。
学习线程的时候也有相似的问题,不过线程占用资源小,不容易致使系统瘫痪,可是必定会致使CPU频繁切换上下文致使效率反而会下降。因此要有信号量。
设定进程池,能够限制一次起的进程的数量。这点有点像信号量
进程池有两个方法:
先看apply的例子:
from multiprocessing import Pool import time def Foo(i): time.sleep(1) print("In Foo", i) if __name__ == '__main__': pool = Pool(5) for i in range(10): pool.apply(func=Foo, args=(i,)) print('end') pool.close() # 老师说进程池这里要先close再join,至于为啥不知道 pool.join() # 进程池中进程执行完毕后再关闭,若是注释,那么程序直接关闭。
执行的时候就是同时起5个进程(Pool中设置了进程池的大小),可是是一个一个依次执行的。执行完毕后会退出而后能够将以后的进程放入进程池等待执行。
注意最后要先close再join。能够试验一下,若是不执行close,可是直接执行join的话会报错。
使用这个方法就是并行效果了,一次并行执行5个。每完成1个会再将下一个放入进程池立刻执行:
from multiprocessing import Pool import time def Foo(i): time.sleep(1) print("In Foo", i) if __name__ == '__main__': pool = Pool(5) for i in range(10): pool.apply_async(func=Foo, args=(i,)) print('end') pool.close() pool.join() # 进程池中进程执行完毕后再关闭,若是注释,那么程序直接关闭。
注意:仍是close和join的问题,刚才异步的时候其实不写也没问题。可是这里因为是并行的,主进程执行完毕以后若是没有join就会直接关闭了。子进程也不会再执行了,就像守护线程那样。因此这里必定要加上join,等待pool里的进程都执行完毕。而后join前必需要close。
callback参数 :这里还有一个callback参数,函数执行完成后能够调用另外一个函数,这个叫回调函数。
回调函数 :就是前面的方法执行完以后,就会自动对用执行这个回调函数。而且是由主进程调用执行的。
举例说明:
from multiprocessing import Pool import os import time def Foo(i): time.sleep(1) # raise return os.getpid(), os.getppid() def Bar(arg): print(os.getpid(), 'Foo 执行完毕,结果:', arg) if __name__ == '__main__': pool = Pool(5) for i in range(10): pool.apply_async(func=Foo, args=(i,), callback=Bar) print('end') pool.close() pool.join() # 进程池中进程执行完毕后再关闭,若是注释,那么程序直接关闭。
回调函数只有在函数正常执行完以后才会被调用。Foo中有一句raise,主动抛出一个错误,若是去掉注释致使函数没有正常执行完成,rais以前的print仍是会正常执行,可是不会调用callback的函数执行。
另外,这里打印了每一个进程id,从id中能够看到,Foo函数是由主进程启动的子进程执行的。而callback的函数是由主进程来执行的。Foo的父进程id就是Bar的进程id
回调函数的意义,主要就是由于回调函数是由主进程执行的。若是子进程的执行结果须要记录保留,那么这部分工做就经过调用回调函数,由回调函数在主进程中来处理。好比将结果写入数据库,咱们就要让每一个子进程都链接数据库写入数据,而是在主进程里创建一个与数据库的链接,统一将执行结果写入数据库。虽然调用的是同一个函数,可是经过回调函数调用在主进程中执行效率会更高。好比例子中的作法,Foo负责返回数据,回调函数统一打印Foo的执行结果。
协程,又称微线程,纤程。英文名Coroutine。一句话说明:协程是一种用户态的轻量级线程。
协程的好处:
所谓原子操做是指不会被线程调度机制打断的操做;这种操做一旦开始,就一直运行到结束,中间不会有任何
协程的缺点:
以前学习生成器的时候,经过yield实现了单线程下多并发。可是那也不是真正的协程。
协程的定义:
总结:使用协程就是为了高效。协程如何实现高效?一遇到IO操做就切换,由于IO操做耗时可是不占用CPU,此时切换到另外一个协程,高效的利用CPU。
问题:什么时候切换回来?IO操做结束了就能够切换回来。如何知道IO操做结束了?往下学 ...
这里写个例子,回顾一下yield的用法。可是yield并不知足咱们前面对协程的定义。
下面的例子会先启动B,B会启动A。B中打印后切换到A执行,A返回后循环。A中打印后经过yield返回,循环。A和B之间经过yield和send来传递count的值,每次都自增1。
import time def print_A(): count = 0 while True: print('A'.center(9, '-'), count) time.sleep(0.1) count = yield count+1 # 从B那里send过来的值,赋值给count。而后回到开头执行打印,把count值自增1后再返回给B def print_B(func): count = next(func) # 要先next一下,启动A,这样A会先运行一次到yield的地方返回 while True: print('B'.center(9, '-'), count) time.sleep(0.1) count = func.send(count+1) # 将值传递给A的yield,并获取A的返回值 if __name__ == '__main__': a = print_A() print_B(a)
greenlet是一个用C实现的协程模块,相比与python自带的yield,它可使你在任意函数之间随意切换,而不需把这个函数先声明为generator。
这个是第三方模块,因此须要安装。安装的话直接安装下面要讲的gevent模块就行了。
greenlet 模块实现的是协程的手动切换,其实就和yield差很少。不过用起来更好理解了。
gevent 模块才能实现咱们要的自动切换,可是gevent是在greenlet的基础上进行了封装,实现了自动切换。因此安装的时候顺便把有依赖关系的greenlet模块一块儿装好了。如今天然也是先看一下greenlet模块的用法。
from greenlet import greenlet def test1(): print('A1') gr2.switch() print('A2') gr2.switch() def test2(): print("B1") gr1.switch() print('B2') if __name__ == '__main__': gr1 = greenlet(test1) # 启动一个协程 gr2 = greenlet(test2) # 再启动一个协程 gr1.switch() # 手动切换一下
执行一下switch,就完成了切换的操做。比yield更直观。不太重点是学下一个模块。
Gevent 是一个第三方库,能够轻松经过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet,它是以C扩展模块形式接入Python的轻量级协程。Greenlet所有运行在主程序操做系统进程的内部,但它们被协做式地调度。
import gevent def func1(): print("This is in func1") gevent.sleep(2) print("End of func1") def func2(): print("This is in func2") gevent.sleep(1) print("End of func2") if __name__ == '__main__': gevent.joinall([gevent.spawn(func1), gevent.spawn(func2)]) # 上面是把要启动的方法加到列表里一块儿处理了,推荐就这么作 # 其实也可使用start和join一个一个启动, f1 = gevent.spawn(func1) f2 = gevent.spawn(func2) f1.start() # 没有start,貌似也同样,注释掉试一下 f2.start() f1.join() # 没有join主线程会直接退出,就不会等待上面的协程的执行结果了 f2.join()
要启动全部的协程,经过gevent.joinall。参数是一个列表,列表中依次启动须要进行自动切换操做的协程。而且阻塞等待全部协程处理完毕。至关于start和join
上面的gevent.sleep是一个模拟IO操做,不会像time.sleep那样停在那里,而是会当作有一个几秒的IO操做。上面的输出结果是:
This is in func1 This is in func2 End of func2 End of func1
首先执行了func1,打印了第一行。而后以后是一个IO操做,因此切换到了下一个协程。
切换到func2,打印了一行,以后又是一个IO操做,此时再切换。不过此时已经没有可操做的协程了。没别的协程了,fun1也没好。
以后是fun2的IO操做先执行完毕,因此最终切换到fun2的时候,打印了func2的第二行,打印前会顿1秒。
最后func1的IO操做也结束了,因而切换到fun1,打印fun1的第二行,打印前会再顿1秒。
这根本就不是爬虫,这里先讲如何将一个网页保存到本地,由于这就是一个比较耗时的IO操做。刚才的例子中咱们是用sleep来模拟的。直接上代码就行了:
from urllib import request def f(url): print('GET: %s' % url) resp = request.urlopen(url) # 给网址发一个请求 data = resp.read() # 读取到的就是整个网页的内容 with open('url.html', 'wb') as file: # 将网页保存下来 file.write(data) print('%d betes received from %s' % (len(data), url)) if __name__ == '__main__': f('http://www.python.org/')
而后咱们来多爬几个网页,看下协程的效果。此次计算一下整个过程的时间:
from urllib import request import gevent import time def f(url): print('GET: %s' % url) resp = request.urlopen(url) # 给网址发一个请求 data = resp.read() # 读取到的就是整个网页的内容 # with open('url.html', 'wb') as file: # 保存网页的操做就先不用了 # file.write(data) print('%d betes received from %s' % (len(data), url)) if __name__ == '__main__': g_list = [] url_list = ['http://www.python.org/', 'http://www.yahoo.com/', 'http://github.com/'] for url in url_list: g_list.append(gevent.spawn(f, url)) start_time = time.time() gevent.joinall(g_list) print('运行时间:', time.time()-start_time)
上面这段虽然用了gevent,可是仍是串行的。究竟是串行仍是并行,只要看f函数开始的时候的第一句print是何时出现的就知道了。这里之因此是串行,是由于,并无看到IO切换的命令,就是f函数里没有相似gevent.sleep这样的切换命令。可是,其实gevent是能够自动判断是否有IO操做的。因此这里的问题是gevent发现不了urllib模块里的IO操做。
因此真正要作的是在开头加上一句,让gevent可以发现这些IO操做。
from urllib import request import gevent import time # 导入模块,添加下面这2句 from gevent import monkey monkey.patch_all() # 把当前程序的全部的IO操做给我单独的作上标记 def f(url): print('GET: %s' % url) resp = request.urlopen(url) # 给网址发一个请求 data = resp.read() # 读取到的就是整个网页的内容 # with open('url.html', 'wb') as file: # 保存网页的操做就先不用了 # file.write(data) print('%d betes received from %s' % (len(data), url)) if __name__ == '__main__': g_list = [] url_list = ['http://www.python.org/', 'http://www.yahoo.com/', 'http://github.com/'] for url in url_list: g_list.append(gevent.spawn(f, url)) start_time = time.time() gevent.joinall(g_list) print('运行时间:', time.time()-start_time)
gevent里有专门的socket模块,固然其中大部分都是import原生的socket模块。咱们可使用gevent的socket实现一个单线程下高并发的socket_server。
服务端:
# 就算这里不导入socket,也会在gevent模块里导入。可是pycharm里下面的socket.socket会显示个错误,不影响运行 # 并且到下面也是要导入的,这里显示的声明一下,提早导入也不会影响效率 # import socket # 建议导入 # 下面两句也能够不要,这里不加也能识别到socket的IO操做 # 只因此能识别,是由于下面的socket已经不是原生的socket了,而是gevent修改后的socket # from gevent import monkey # monkey.patch_all() import gevent from gevent import socket def server(port): # 这里pycharm里会显示个错误,不影响运行。由于gevent的socket模块里没有socket这个方法 # 可是其实gevent会把原生的socket所有导入的。就是运行的时候会有socket.socket这个方法 server = socket.socket() server.bind(('localhost', port)) server.listen(500) print("监听已经开始") while True: conn, addr = server.accept() print("发现链接请求:\n%s\n%s" % (conn, addr)) gevent.spawn(handle_request, conn) # 上面的函数创建了链接后,就将链接做为handle_request的参数,启动一个gevent的协程 # 下面的方法是经过协程启动的,是协程并发运行的 def handle_request(conn): while True: data = conn.recv(1024) if not data: break print("recv:", data.decode('utf-8')) conn.send(data.upper()) conn.close() print("断开链接:", conn) if __name__ == '__main__': server(8002)
客户端只须要以前的客户端就能够了。这里测试一下效率,起100个线程,每一个线程发送100条消息:
import socket import threading HOST = 'localhost' # The remote host PORT = 8002 # The same port as used by the server def client(i): client = socket.socket() client.connect((HOST, PORT)) for j in range(100): msg = "hello %s %s" % (i, j) client.send(msg.encode('utf-8')) data = client.recv(1024) print('Received:', data.decode('utf-8')) client.close() if __name__ == '__main__': for i in range(100): t = threading.Thread(target=client, args=(i,)) t.start()
这里有一个问题,记一下。 gevent.spawn()
一般,咱们写服务器处理模型的程序时,有如下几种模型:
三种方法各有千秋,以前应该都说过,这里就当总结一下:
事件驱动模型,目前大部分的UI编程都是事件驱动模型,如不少UI平台都会提供onClick()事件(好比web页面),这个事件就表明鼠标按下事件。事件驱动模型大致思路以下:
事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特色是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。另外两种常见的编程范式是(单线程)同步以及多线程编程。
在面对以下的环境时,事件驱动模型一般是一个好的选择:
此处重申一下协程开篇提出的问题,只要一遇到IO就注册一个事件,而后主程序就能够继续干其它的事情了,直到IO处理完毕后,继续恢复以前中断的任务,这本质上是怎么实现的呢?
用户空间与内核空间:如今操做系统都是采用虚拟存储器,那么对32位操做系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。操做系统的核心是内核,独立于普通的应用程序,能够访问受保护的内存空间,也有访问底层硬件设备的全部权限。为了保证用户进程不能直接操做内核(kernel),保证内核的安全,操做系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对linux操做系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。
缓存 I/O :又被称做标准 I/O,大多数文件系统的默认 I/O 操做都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操做系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操做系统内核的缓冲区中,而后才会从操做系统内核的缓冲区拷贝到应用程序的地址空间。
对于一次IO访问(以read举例),数据会先被拷贝到操做系统内核的缓冲区中,而后才会从操做系统内核的缓冲区拷贝到应用程序的地址空间。因此说,当一个read操做发生时,它会经历两个阶段:
正是由于上面的两个阶段,linux系统产生了下面五种网络模式的方案:
五种模型的比较:
用的最多的是IO多路复用。虽然看似异步IO更好,反正用的很少。另外多线程+阻塞模式也是一个方案,可是多线程的开销较大(相对于单线程),更适合处理少许的并发(多少算少?看你系统能起多少个线程,不过和进程比线程的开销还不算特别大)。要处理高并发,推荐仍是使用IO多路复用。
文件描述符(File descriptor) :是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者建立一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写每每会围绕着文件描述符展开。可是文件描述符这一律念每每只适用于UNIX、Linux这样的操做系统。
I/O多路复用就是经过一种机制,一个进程能够监视多个描述符,一旦某个描述符就绪(通常是读就绪或者写就绪),就通知程序进行相应的读写操做。
IO多路复用的三种机制:
select :最先出现,有些缺点,优点就是几乎在全部平台上都支持。
pool :解决了部分缺点,可是本质上没多大差异,能够认为是个过分阶段。
epool :如今用这个,性能最好的。可是不是全部系统都支持,windows就不支持。
Python的select()方法直接调用操做系统的IO接口,它监控sockets,open files,and pipes(全部带fileno()方法的文件句柄)什么时候变成readable 和writeable,或者通讯错误,select()使得同时监控多个链接变的简单,而且这比写一个长循环来等待和监控多客户端链接要高效,由于select直接经过操做系统提供的C的网络接口进行操做,而不是经过Python的解释器。
写一个socket的例子来理解
注意:socket必须得运行在非阻塞模式下。以前用的都是默认的阻塞模式,阻塞模式下,若是没有数据会等待。非阻塞模式下,若是没有数据就会抛出异常。因此咱们须要用select来帮咱们监视
先写到accept以前,accept以前只是设置,执行到accept在阻塞模式下会进入阻塞。咱们暂时只要能收到客户端请求创建起链接就好。如今是阻塞模式,因此直接accept没有数据就会报错。因此就须要select来解决了,监视到有活动的链接再返回并继续执行accept。
import select import socket server = socket.socket() server.bind(('localhost', 9000)) server.listen() print("监听已经开启") server.setblocking(False) # 设置为非阻塞 inputs = [server, ] outputs = [] # 3个参数,读列表,写列表,异常列表,就是你想让内核监视哪些连接 # 异常列表监视的仍是连接返回的内容,那么仍是在inputs里,因此第三个参数仍是填inputs # 开始什么都没有,只有server,先把server加到inputs里 # 就是select监视到server活动了,就能够返回了 # 返回3个数据,监视到有活动的3个列表(读列表,写列表,异常列表) readable, writeable, exceptional = select.select(inputs, outputs, inputs) # 这里就是阻塞的,直到select监视到列表中有活动的连接,才会继续 # 非阻塞socket就是经过select来实现阻塞 print(readable, writeable, exceptional) # 下面有打印的内容 # [<socket.socket fd=472, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9000)>] [] [] # 第一个fd,就是文件描述符,非负整数。 conn, addr = server.accept() print(conn, addr)
测试的客户端,只发不收:
import socket HOST, PORT = "localhost", 9000 client = socket.socket() client.connect((HOST, PORT)) while True: msg = input(">>:").strip() if len(msg) == 0: break client.send(msg.encode('utf-8')) # 下面2行是接收服务的返回是要用的,暂时先注释掉,到后面再启用测试返回数据 # data = client.recv(1024) # print('Received:', data.decode('utf-8')) client.close()
上面连接已经能够创建起来了,那么来接收一条数据吧。这里recv以前固然仍是要用select来监视是否有数据进来,有才会执行到recv。
import select import socket server = socket.socket() server.bind(('localhost', 9000)) server.listen() print("监听已经开启") server.setblocking(False) # 设置为非阻塞 inputs = [server, ] outputs = [] # 3个参数,读列表,写列表,异常列表,就是你想让内核监视哪些连接 # 异常列表监视的仍是连接返回的内容,那么仍是在inputs里,因此第三个参数仍是填inputs # 若是inputs里是读的消息,会返回到readable列表里,若是是异常消息,就返回到exceptional列表里 # 开始什么都没有,只有server,先把server加到inputs里 # 就是select监视到server活动了,就能够返回了 # 返回3个数据,监视到活动的3个列表(读列表,写列表,异常列表) readable, writeable, exceptional = select.select(inputs, outputs, inputs) # 这里就是阻塞的,直到select监视到列表中有活动的连接,才会继续 # 非阻塞socket就是经过select来实现阻塞 print(readable, writeable, exceptional) # 打印一下监视到的活动连接 for r in readable: # 可能一次返回多个链接啊,因此得写个循环 conn, addr = server.accept() # 已是非阻塞了,不通过select监视就会报错。能够注释掉select试一下 print(conn, addr) inputs.append(conn) # 把连接加入监视列表 # 如今conn加到了inputs的监视列表里了,就能够经过select监视conn是否有数据进来了 readable, writeable, exceptional = select.select(inputs, outputs, inputs) print(readable, writeable, exceptional) # 看看打印内容,inputs里的活动连接是conn,其实也多是server data = conn.recv(1024) print(data.decode('utf-8'))
这里能够再试一下,应该能够接收到数据。可是这里有个问题。由于如今inputs列表里监视的内容是2个了,一个是server,一个是conn。若是再监视到server活动,说明又有新连接进来,若是监视到的是conn的活动,那么才是收到数据了。因此咱们要在for循环里判断收到的活动连接是谁的。
再多作一步,加上一层while循环,让服务端始终处于这么一个循环之中:select返回活动连接 ==> for循环处理全部的活动连接 循环继续。
import select import socket server = socket.socket() server.bind(('localhost', 9000)) server.listen() print("监听已经开启") server.setblocking(False) # 设置为非阻塞 inputs = [server, ] outputs = [] # 3个参数,读列表,写列表,异常列表,就是你想让内核监视哪些连接 # 异常列表监视的仍是连接返回的内容,那么仍是在inputs里,因此第三个参数仍是填inputs # 开始什么都没有,只有server,先把server加到inputs里 # 就是select监视到server活动了,就能够返回了 # 返回3个数据,监视到活动的3个列表(读列表,写列表,异常列表) while True: # select返回活动连接==>for循环处理全部的活动连接,循环往复 readable, writeable, exceptional = select.select(inputs, outputs, inputs) # 这里就是阻塞的,直到select监视到列表中有活动的连接,才会继续 # 非阻塞socket就是经过select来实现阻塞 print(readable, writeable, exceptional) # 下面有打印的内容 # [<socket.socket fd=472, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9000)>] [] [] # 第一个fd,就是文件描述符,非负整数。 for r in readable: # 可能一次返回多个链接啊,因此得写个循环 # 如今inputs里有server 和 conn了, # 若是readable返回的是server的活动,表示来了一个新连接 # 若是readable返回的是conn的活动,表示收到了conn发来的数据 if r is server: conn, addr = server.accept() # 已是非阻塞了,不通过select监视就会报错。能够注释掉select试一下 print(conn, addr) inputs.append(conn) # 把连接加入监视列表 else: data = conn.recv(1024) print(data.decode('utf-8'))
如今新连接也能再连上来了,可是问题是旧的链接没有另外保存,最新连上的连接会再次赋值给conn。就连接发来的数据,致使select返回,可是会用conn去尝试recv。如今conn是新的链接,因此是空的,因而就报错。那么解决这个事情就是要保存每个conn,就是说要再用一个列表保存全部的conn,再写一个for循环?我一开始是这么想的。
其实全部的链接都保存在inputs里了,for循环的时候就是取出每个连接,在for循环里面应该使用变量r,而不是conn。难怪以前用conn的时候,pycharm会提示 ‘name can not be defined’ 。第一次循环的时候没有conn这个变量,不过也不会进到那个if里
接下来继续,咱们能够直接把数据发回去,这里没有阻塞的问题,就不搞了。换个方法,不直接发回去,先把消息保存到队列里。而后统一发送,这样就是发消息也是多路复用的形式。
import select import socket import queue server = socket.socket() server.bind(('localhost', 9000)) server.listen() print("监听已经开启") server.setblocking(False) # 设置为非阻塞 inputs = [server, ] outputs = [] data_queue = {} # 存放返回给客户端消息的队列,每一个客户端链接一个队列,就是一个item # 3个参数,读列表,写列表,异常列表,就是你想让内核监视哪些连接 # 异常列表监视的仍是连接返回的内容,那么仍是在inputs里,因此第三个参数仍是填inputs # 开始什么都没有,只有server,先把server加到inputs里 # 就是select监视到server活动了,就能够返回了 # 返回3个数据,监视到活动的3个列表(读列表,写列表,异常列表) while True: # select返回活动连接==>for循环处理全部的活动连接,循环往复 readable, writeable, exceptional = select.select(inputs, outputs, inputs) # 这里就是阻塞的,直到select监视到列表中有活动的连接,才会继续 # 非阻塞socket就是经过select来实现阻塞 print('select返回:\n', readable, writeable, exceptional) # 下面有打印的内容 # [<socket.socket fd=472, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9000)>] [] [] # 第一个fd,就是文件描述符,非负整数。 for r in readable: # 可能一次返回多个链接啊,因此得写个循环 # 如今inputs里有server 和 conn了, # 若是readable返回的是server的活动,表示来了一个新连接 # 若是readable返回的是conn的活动,表示收到了conn发来的数据 if r is server: conn, addr = r.accept() # r就是server print('接收到新的客户端链接:\n', conn, addr) inputs.append(conn) data_queue[conn] = queue.Queue() # 建立链接的消息队列 else: data = r.recv(1024) # r就是conn print(data.decode('utf-8')) # 接下来处理发数据 # 把有数据返回的链接当道outputs列表里,这样下次循环select就会监视到 # 要发的数据也得保存,这里用队列来保存准备发送的数据 # 要为每一个链接分别建一个队列,不能把消息搞混,这里用字典 # 字典的key就是链接,value就是该链接的消息队列 outputs.append(r) # data_queue = {} 在使用前,要先创建一个空字典。这句放到While循环外 # data_queue[conn] = queue.Queue # 在客户端创建链接时,就建立好链接的消息队列。这句放在上面处理server.accept()里面 data_queue[r].put(data.upper()) # 而后就先无论了。等到select再监视的时候,会返回到writeable列表里。 # 因此后面还要写一个writeable的for循环 # 虽然是在上面的for循环里添加的,可是要等到在执行一次select后才会在writeable里有返回值 for w in writeable: data = data_queue[w].get() # 从队列里取出数据。这里get了以后,这条消息就从队列里移除了 w.send(data) # 发数据,注意data的数据类型 outputs.remove(w) # 从outputs里移除这个活动的链接,不然下次过来还有尝试在发数据,可是消息队列里是空的
这里把以前客户端注释掉的内容去掉测试一下收数据。而且已经能够接入多个客户端了,开2个试一下就行了。
剩下就是客户端断开的问题了。断开有两种状况:
一种是正常断开,客户端close(),会发送一个空给服务的,那么要在 data = r.recv(1024)
以后判断一下是否是空,就和以前写的socket服务器同样。
还有一种是强行关闭客户端,这时inputs仍然会收到活跃链接,可是recv的时候会抛出异常“ConnectionResetError”,这里大概要把recv放到try里,若是捕获到异常,就断开客户端。
另外还有一个exceptional异常列表有返回的状况,这里也粗暴的断开客户端处理了好了。
客户端断开就是要清除掉字典和列表中的这个链接的信息。
import select import socket import queue server = socket.socket() server.bind(('localhost', 9000)) server.listen() print("监听已经开启") server.setblocking(False) # 设置为非阻塞 inputs = [server, ] outputs = [] data_queue = {} # 存放返回给客户端消息的队列,每一个客户端链接一个队列,就是一个item while True: # select返回活动连接==>for循环处理全部的活动连接,循环往复 # 返回3个数据,监视到活动的3个列表(读列表,写列表,异常列表) readable, writeable, exceptional = select.select(inputs, outputs, inputs) # 这里就是阻塞的,直到select监视到列表中有活动的连接,才会继续 print('select返回:\n', readable, writeable, exceptional) for r in readable: # 可能一次返回多个链接啊,因此得写个循环 # 如今inputs里有server 和 conn了, # 若是readable返回的是server的活动,表示来了一个新连接 # 若是readable返回的是conn的活动,表示收到了conn发来的数据 if r is server: conn, addr = r.accept() # r就是server print('接收到新的客户端链接:\n', conn, addr) inputs.append(conn) # 将新链接加入到select监视列表中 data_queue[conn] = queue.Queue() # 建立新链接的消息队列 else: # 这里有3中状况,有数据,有空数据(正常断开),无数据(一旦recv就报错) try: data = r.recv(1024) except Exception as error: print("recv时捕获到异常:%s" % error) # 清除链接的4个操做,这段代码重复用了3次,应该专门写个函数引用 # 1 从读列表中清除,这里其实不用判断,可是后面的for循环里可能会尝试重复remove # 2 若是还有没发出去的消息,把链接从写列表中清除 # 3 关闭链接 # 4 若是还有没发出去的消息,把消息队列的对象从字典里清除 if r in inputs: inputs.remove(r) if r in outputs: outputs.remove(r) r.close() if r in data_queue: del data_queue[r] else: # 这里处理能recv到数据,收到数据就加入outputs列表。为空就清除链接 if data: print(data.decode('utf-8')) outputs.append(r) data_queue[r].put(data.upper()) else: print("客户端已断开:\n", r) inputs.remove(r) if r in outputs: outputs.remove(r) if r in data_queue: del data_queue[r] # 虽然是上面的for循环里添加的,可是要等到在执行一次select后才会在writeable里有返回值 for w in writeable: data = data_queue[w].get() # 从队列里取出数据。这里get了以后,这条消息就从队列里移除了 w.send(data) # 发数据,注意data的数据类型 outputs.remove(w) # 从outputs里移除这个活动的链接,不然下次过来还有尝试在发数据,可是消息队列里是空的 # 还有一个exceptional没处理,仍是和上面同样,再写一个for循环 # 异常处理这里仍是简单粗暴把异常列表中的链接清除就行了 for e in exceptional: print("异常列表有返回:", e) if e in inputs: inputs.remove(e) if e in outputs: outputs.remove(e) e.close() if e in data_queue: del data_queue[e]
如今就能够处理高并发了,写一个多线程的客户端,每一个线程循环发数据测试一下。
测试客户端:
import socket import threading HOST = 'localhost' PORT = 9000 def client(i): client = socket.socket() client.connect((HOST, PORT)) for j in range(100): msg = "hello %s %s" % (i, j) client.send(msg.encode('utf-8')) data = client.recv(1024) print('Received:', data.decode('utf-8')) client.close() if __name__ == '__main__': for i in range(100): t = threading.Thread(target=client, args=(i,)) t.start()
到这里感受好了,可是还有3地方有问题,下面一个一个说明。上面的代码经不起强行断开客户端的考验。
非阻塞模式下,若是调用recv()没有发现任何数据,或send()调用没法当即发送数据,那么将引发异常。我这里看到的是 "ConnectionResetError" 。
产生的缘由是强行断开客户端,致使这个链接已经失效,可是链接还在select返回的列表里。这时以后的for循环里还会尝试去send或recv这个链接就会抛出异常。
解决办法 :send 和 recv 的时候都得用try,而后捕获到异常后,就把这个链接清理掉
data = data_queue[w].get()
这里有时候会报错 "KeyError" ,就是字典里已经没有这个key了。那就是在另外2个for循环里已经将这个key清除了。这里清除的时候没清writeable列表,因此在清除链接的时候加一句:
if r in writeable: writeable.remove(r)
或者是用字典的get方法读取,这样在读取不存在的key的时候,会返回None:
data = data_queue.get(w) if data: data = data.get() else: if w in outputs: outputs.remove(w) continue
上面有3处地方会清除链接,这里能够另外写一个函数,须要清除的时候调用函数就行了。另外可能会再两个for循环里同时会要清除同一个链接,这样在第二次清除的时候若是不作if判断,就会报错没法删除列表里不存在的元素。这里保险起见仍是在remove以前都加上if判断。应该只有第一个for循环里的inputs是必定有的不用判断,别的地方均可能会报错。
说好了3个问题,这里还有一个暂时没事出问题,就是可能发数据的时候一次发不完。如今的写法都是默认一次send就是发完的,send以后直接从outputs里remove掉。
也能够换一种方法,send以后,不从移出outputs列表。那么下次while循环还会进来,此时get队列的时候要用get_nowait无阻塞模式取队列,若是空会抛出队列空的异常,那么这个必定是发完了。此时再把连接移出outputs列表。若是须要能够把上面字典 "KeyError" 一块儿处理了,继续用 data = data_queue[w].get()
取列表,放到try里,捕获2种异常分别处理掉。
后面的最终版本没这么用,我怕再踩到坑,先把想法记着。
下面给出一个全部遇到的问题都解决了的最终版本:
import select import socket import queue server = socket.socket() server.bind(('localhost', 9000)) server.listen() print("监听已经开启") server.setblocking(False) # 设置为非阻塞 inputs = [server, ] outputs = [] data_queue = {} # 存放返回给客户端消息的队列,每一个客户端链接一个队列,就是一个item while True: # select返回活动连接==>for循环处理全部的活动连接,循环往复 # 返回3个数据,监视到活动的3个列表(读列表,写列表,异常列表) readable, writeable, exceptional = select.select(inputs, outputs, inputs) # 这里就是阻塞的,直到select监视到列表中有活动的连接,才会继续 print('select返回:\n', readable, writeable, exceptional) for r in readable: # 可能一次返回多个链接啊,因此得写个循环 # 如今inputs里有server 和 conn了, # 若是readable返回的是server的活动,表示来了一个新连接 # 若是readable返回的是conn的活动,表示收到了conn发来的数据 if r is server: conn, addr = r.accept() # r就是server print('接收到新的客户端链接:\n', conn, addr) conn.setblocking(False) inputs.append(conn) # 将新链接加入到select监视列表中 data_queue[conn] = queue.Queue() # 建立新链接的消息队列 else: # 这里有3中状况,有数据,有空数据(正常断开),无数据(一旦recv就报错) try: data = r.recv(1024) except Exception as error: print("recv时捕获到异常:%s" % error) # 清除链接的4个操做,这段代码重复用了3次,应该专门写个函数引用 # 1 从读列表中清除,这里其实不用判断,可是后面的for循环里可能会尝试重复remove # 2 若是还有没发出去的消息,把链接从写列表中清除 # 3 关闭链接 # 4 若是还有没发出去的消息,把消息队列的对象从字典里清除 if r in inputs: inputs.remove(r) if r in outputs: outputs.remove(r) r.close() if r in data_queue: del data_queue[r] else: # 这里处理能recv到数据,收到数据就加入outputs列表。为空就清除链接 if data: print(data.decode('utf-8')) if r not in outputs: outputs.append(r) data_queue[r].put(data.upper()) else: print("客户端已断开:\n", r if r in inputs: inputs.remove(r) if r in outputs: outputs.remove(r) r.close() if r in data_queue: del data_queue[r] # 虽然是上面的for循环里添加的,可是要等到在执行一次select后才会在writeable里有返回值 for w in writeable: data = data_queue.get(w) # 先从字典里取出这个队列 if data: # 队列存在,取数据 data = data.get() # 从队列里取出数据。这里get了以后,这条消息就从队列里移除了 else: # 队列不存在,这个链接已经被清除了,remove掉,下一个循环 if w in outputs: outputs.remove(w) continue try: w.send(data) # 发数据,注意data的数据类型 except Exception as error: print("send时捕获到异常:%s" % error) if r in inputs: inputs.remove(r) if r in outputs: outputs.remove(r) r.close() if r in data_queue: del data_queue[r] else: if w in outputs: outputs.remove(w) # 有可能一次send不完,这里也能够不remove,用另一个方法 # 开头用get_nowait无阻塞模式取队列,捕获到队列为空的异常,再remove掉这个链接 # 还有一个exceptional没处理,仍是和上面同样,再写一个for循环 # 异常处理这里仍是简单粗暴把异常列表中的链接清除就行了 for e in exceptional: print("异常列表有返回:", e) if e in inputs: inputs.remove(e) if e in outputs: outputs.remove(e) e.close() if e in data_queue: del data_queue[e]
学了那么多都没用,应该也用不到,并且可能还会有别的问题。最可怕的是别人都不这么用,有问题都找不到人解决。主要是经过select的使用来了解IO多路复用。除了select,还有poll和epoll。
epoll 更高效,可是代码也更复杂。不过这些咱们都用不到,都太底层了。平时用用已经封装好的模块就行了。这里经过学习大概了解一下底层是怎么实现的。剩下的会用模块就行了,如今至少知道模块中是怎么运做的了。
最后就是学一下下面的已经封装的selectors模块,底层都清楚了,学习使用下面的模块已经没有难度了。
平时直接用已经封装好的模块,简单坑又少。selectors模块,默认会用epoll,若是系统不支持,就用select,完美。
理解了前面select的机制,在使用这个模块就简单了。步骤都同样。步骤都同样,可是都封装好了。
import selectors import socket def accept(sock, mask): """创建新链接并注册""" conn, addr = sock.accept() print('接收到新的客户端链接:\n', conn, addr) conn.setblocking(False) # sock里已经设置为False,这里貌似没意义,反正没差 # 上面已经创建好链接了,把新链接注册到sel里,这是第二次注册了 # 第一次注册是注册server接受客户端链接请求的链接 # 这里是链接创建后收发数据的链接,这个链接若是发现是活动的,调用的就是read方法了 sel.register(conn, selectors.EVENT_READ, read) def read(conn, mask): """接收数据""" data = conn.recv(1024) if data: print(data.decode('utf-8')) conn.send(data.upper()) else: print("客户端已断开:\n", conn) sel.unregister(conn) # 注销这个链接 conn.close() def server(port): sock = socket.socket() sock.bind(('localhost', port)) sock.listen() print("监听已经开启") sock.setblocking(False) # 下面就是select方法,也多是epoll # 注册你的socket,就是让select监视,监视到有活动的链接,就调用accept函数 sel.register(sock, selectors.EVENT_READ, accept) # 上面尚未开始监视,只是先把select准备好 while True: events = sel.select() # 这里就仍是监视了 # 这里会阻塞,一旦有活动的链接,就会返回给events列表 for key, mask in events: callback = key.data # key.data就是sel.register里的accpet这个函数 # 如今callback就是accpet这个函数了,下面加上括号填上参数就执行了 callback(key.fileobj, mask) # key.fileobj就是sel.register里的sock if __name__ == '__main__': sel = selectors.DefaultSelector() # 老套路,用以前先实例化一个对象 server(10001)
仍是有那个老问题,客户端强制断开,服务端会报错 "ConnectionResetError" 。
SELECT版FTP :