Python全栈【进程、线程、IO多路复用】 |
本节内容:javascript
进程 |
一、进程就是一个程序在一个数据集上的一次动态执行过程,进程是资源分配的最小单元。html
二、进程通常由程序、数据集、进程控制块三部分组成。java
编写的程序用来描述进程要完成哪些功能以及如何完成;
数据集则是程序在执行过程当中所须要使用的资源;
进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统能够利用它来控制和管理进程,它是系统感知进程存在的惟一标志。
三、线程的上一级就是进程,进程可包含不少线程,进程和线程的区别是进程间的数据不共享,多进程也能够用来处理多任务,不过多进程很消耗资源,python
计算型的任务最好交给多进程来处理,IO密集型最好交给多线程来处理,此外进程的数量应该和cpu的核数保持一致。linux
线程 |
一、线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程当中的最小单元,由线程ID、程序计数器、寄存器集合和堆栈共同组成。
二、线程的引入减少了程序并发执行时的开销,提升了操做系统的并发性能。
三、线程没有本身的系统资源。
四、多任务能够由多进程完成,也能够由一个进程内的多线程完成,一个进程内的全部线程,共享同一块内存python中建立线程比较简单,导入threading模块,下面来看一下代码中如何建立多线程。web
建立一个新线程:编程
import time import threading def f1(i): time.sleep(1) print(i) if __name__ == '__main__': for i in range(5): t = threading.Thread(target=f1, args=(i,)) t.start() print('start') # 主线程等待子线程完成,子线程并发执行 # 每次数字顺序不一 # start # 0 # 1 # 2 # 4 # 3
主线程从上到下执行,建立5个子线程,打印出'start',而后等待子线程执行完结束,若是想让线程要一个个依次执行完,而不是并发操做,那么就要使用join方法。windows
import time import threading def f1(i): time.sleep(1) print(i) if __name__ == '__main__': li = [] for i in range(5): t = threading.Thread(target=f1, args=(i,)) t.start() li.append(t) for t in li: t.join() print('start') # 每次数字顺序不一 # 0 # 4 # 3 # 2 # 1 # start
上面的代码不适用join的话,主线程会默认等待子线程结束,才会结束,还有一种守护线程,即子线程守护主线程,主线程结束守护线程也结束。数组
import time import threading def f1(i): time.sleep(1) print(i) if __name__ == '__main__': for i in range(5): t = threading.Thread(target=f1, args=(i,)) t.setDaemon(True) #守护线程设置在start以前 t.start() print('start') # start
除此以外,本身还能够为线程自定义名字,经过 t = threading.Thread(target=f1, args=(i,), name='mythread{}'.format(i)) 中的name参数,除此以外,Thread还有一下一些方法缓存
t.getName() : 获取线程的名称
t.setName() : 设置线程的名称
t.name : 获取或设置线程的名称
t.is_alive() : 判断线程是否为激活状态
t.isAlive() :判断线程是否为激活状态
t.isDaemon() : 判断是否为守护线程
线程锁 |
死锁示例:
import threading,time class MyThread(threading.Thread): def funcA(self): A.acquire() print(self.name,'got A',time.ctime()) time.sleep(2) B.acquire() print(self.name,'got B',time.ctime()) B.release() A.release() def funcB(self): B.acquire() print(self.name,'got A',time.ctime()) time.sleep(1) A.acquire() print(self.name,'got B',time.ctime()) A.release() B.release() def run(self): self.funcA() self.funcB() li = [] A = threading.Lock() B = threading.Lock() for i in range(5): t = MyThread() li.append(t) for j in li: j.start() for k in li: k.join() # Thread-1 got A # Thread-1 got B # Thread-1 got A # Thread-2 got A #线程阻塞,1与2都不先释放锁,就出现了死锁 # 在线程间共享多个资源的时候,若是两个线程分别占有一部分资源而且同时等待对方的资源, # 就会形成死锁,由于系统判断这部分资源都正在使用,全部这两个线程在无外力做用下将一直等待下去。
解决方法:递归锁
import threading,time class MyThread(threading.Thread): def funcA(self): r_lock.acquire() print(self.name,'got A',time.ctime()) time.sleep(2) r_lock.acquire() print(self.name,'got B',time.ctime()) r_lock.release() r_lock.release() def funcB(self): r_lock.acquire() print(self.name,'got A',time.ctime()) time.sleep(1) r_lock.acquire() print(self.name,'got B',time.ctime()) r_lock.release() r_lock.release() def run(self): self.funcA() self.funcB() li = [] r_lock = threading.RLock() for i in range(5): t = MyThread() li.append(t) for j in li: j.start() for k in li: k.join()
Lock若是屡次获取锁的时候会出错,而RLock容许在同一线程中被屡次acquire,可是须要用n次的release才能真正释放所占用的琐,一个线程获取了锁在释放以前,其余线程只有等待。
为了支持在同一线程中屡次请求同一资源,python提供了“可重入锁”:threading.RLock。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源能够被屡次acquire。
线程间通信Event |
Event是线程间通讯最多见的机制之一,主要用于主线程控制其余线程的执行,主要用过wait,clear,set,这三个方法来实现的
红绿灯示例:
import time import threading def lighter(): count = 0 while 1: if count<30: if not event.is_set(): event.set() print('\33[32;1m绿灯\33[1m') elif count<34: print('\33[33;1m黄灯\33[1m') elif count<60: event.clear() print('\33[31;1m红灯\33[1m') else: count = 0 count+=1 time.sleep(0.2) def car(n): count =0 while 1: event.wait() print('汽车【%s】经过'%n) count+=1 time.sleep(1) event = threading.Event() l1 =threading.Thread(target=lighter) l1.start() c1 = threading.Thread(target=car,args=('奔驰',)) c1.start() # 绿灯 # 汽车【奔驰】经过 # 绿灯 # 绿灯 # 绿灯 # 绿灯 # 黄灯 # 汽车【奔驰】经过 # 黄灯 # 黄灯 # 黄灯 # 红灯 # 红灯 # 红灯
import threading,time class Boss(threading.Thread): def run(self): print("BOSS:今晚你们都要加班到22:00。") print(event.isSet()) event.set() time.sleep(5) print("BOSS:<22:00>能够下班了。") print(event.isSet()) event.set() class Worker(threading.Thread): def run(self): event.wait() print("Worker:哎……命苦啊!") time.sleep(1) event.clear() event.wait() print("Worker:OhYeah!") if __name__=="__main__": event=threading.Event() threads=[] for i in range(5): threads.append(Worker()) threads.append(Boss()) for t in threads: t.start() for t in threads: t.join()
信号量(semaphore) |
信号量用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数 器,每当调用acquire()时-1,调用release()时+1。
计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其余线程调用release()。(相似于停车位的概念)
BoundedSemaphore与Semaphore的惟一区别在于前者将在调用release()时检查计数 器的值是否超过了计数器的初始值,若是超过了将抛出一个异常。
import threading,time class myThread(threading.Thread): def run(self): if semaphore.acquire(): print(self.name) time.sleep(5) semaphore.release() if __name__=="__main__": semaphore=threading.Semaphore(5) thrs=[] for i in range(100): thrs.append(myThread()) for t in thrs: t.start()
队列 |
队列的方法:
q = queue.Queue(maxsize=0) # 构造一个先进显出队列,maxsize指定队列长度,为0时,表示队列长度无限制。 q.join() # 等到队列为kong的时候,在执行别的操做 q.qsize() # 返回队列的大小 (不可靠) q.empty() # 当队列为空的时候,返回True 不然返回False (不可靠) q.full() # 当队列满的时候,返回True,不然返回False (不可靠) q.put(item, block=True, timeout=None) # 将item放入Queue尾部,item必须存在,参数block默认为True,表示当队列满时,会等待 # 为False时为非阻塞,此时若是队列已满,会引起queue.Full 异常。 可选参数timeout,表示会阻塞设置的时间, # 若是在阻塞时间里 队列仍是没法放入,则引起 queue.Full 异常 q.get(block=True, timeout=None) # 移除并返回队列头部的一个值,可选参数block默认为True,表示获取值的时候,若是队列为空,则阻塞 # 阻塞的话若此时队列为空,则引起queue.Empty异常。 可选参数timeout,表示会阻塞设置的时间, q.get_nowait() # 等效于 get(item,block=False)
队列的三种进出模式:
import queue q= queue.Queue(3) q.put(12) q.put('hello') q.put({'name':'alex'}) while 1: data = q.get() print(data) print('================') ###################################################### import queue q= queue.LifoQueue(3) q.put(12) q.put('hello') q.put({'name':'alex'}) q.qsize() while 1: data = q.get() print(data) print('================') ####################################################### import queue q= queue.PriorityQueue() q.put([2,12]) q.put([1,'hello']) q.put([3,{'name':'alex'}]) q.qsize() while 1: data = q.get() print(data[1]) print('================')
生产者消费者模型:
def producer(num): for i in range(num): q.put(i) print('将{}添加到队列中'.format(i)) time.sleep(1) def consumer(num): count = 0 while count < num: i = q.get() print('将{}从队列取出'.format(i)) time.sleep(2) count += 1 q = queue.Queue(10) t1 = threading.Thread(target=producer, args=(10,)) t1.start() t2 = threading.Thread(target=consumer, args=(10,)) t2.start()
进程与线程的区别 |
一个程序至少有一个进程,一个进程至少有一个线程.(进程能够理解成线程的容器)
进程在执行过程当中拥有独立的内存单元,而多个线程共享内存,从而极大地提升了程序的运行效率。
线程在执行过程当中与进程仍是有区别的。每一个独立的线程有一个程序运行的入口、顺序执行序列和程序的出口。
可是线程不可以独立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制。
进程是具备必定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位.
线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.线程本身基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈)可是它可与同属一个进程的其余的线程共享进程所拥有的所有资源.
一个线程能够建立和撤销另外一个线程;同一个进程中的多个线程之间能够并发执行.
协程 |
协程,又称微线程,协程执行看起来有点像多线程,可是事实上协程就是只有一个线程,所以,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优点就越明显,此外由于只有一个线程,不须要多线程的锁机制,也不存在同时写变量冲突。协程的适用场景:当程序中存在大量不须要CPU的操做时(IO)下面来看一个利用协程例子
import time import queue def consumer(name): print("--->ready to eat baozi...") while True: new_baozi = yield print("[%s] is eating baozi %s" % (name,new_baozi)) #time.sleep(1) def producer(): r = con.__next__() r = con2.__next__() n = 0 while 1: time.sleep(1) print("\033[32;1m[producer]\033[0m is making baozi %s and %s" %(n,n+1) ) con.send(n) con2.send(n+1) n +=2 if __name__ == '__main__': con = consumer("c1") con2 = consumer("c2") p = producer()
import gevent import requests,time start=time.time() def f(url): print('GET: %s' % url) resp =requests.get(url) data = resp.text print('%d bytes received from %s.' % (len(data), url)) gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), gevent.spawn(f, 'https://www.yahoo.com/'), gevent.spawn(f, 'https://www.baidu.com/'), gevent.spawn(f, 'https://www.sina.com.cn/'), ]) # f('https://www.python.org/') # # f('https://www.yahoo.com/') # # f('https://baidu.com/') # # f('https://www.sina.com.cn/') print("cost time:",time.time()-start)
事件驱动 |
传统的编程是以下线性模式的:
开始--->代码块A--->代码块B--->代码块C--->代码块D--->......--->结束
每个代码块里是完成各类各样事情的代码,但编程者知道代码块A,B,C,D...的执行顺序,惟一可以改变这个流程的是数据。输入不一样的数据,根据条件语句判断,流程或许就改成A--->C--->E...--->结束。每一次程序运行顺序或许都不一样,但它的控制流程是由输入数据和你编写的程序决定的。若是你知道这个程序当前的运行状态(包括输入数据和程序自己),那你就知道接下来甚至一直到结束它的运行流程。
对于事件驱动型程序模型,它的流程大体以下:
开始--->初始化--->等待
与上面传统编程模式不一样,事件驱动程序在启动以后,就在那等待,等待什么呢?等待被事件触发。传统编程下也有“等待”的时候,好比在代码块D中,你定义了一个input(),须要用户输入数据。但这与下面的等待不一样,传统编程的“等待”,好比input(),你做为程序编写者是知道或者强制用户输入某个东西的,或许是数字,或许是文件名称,若是用户输入错误,你还须要提醒他,并请他从新输入。事件驱动程序的等待则是彻底不知道,也不强制用户输入或者干什么。只要某一事件发生,那程序就会作出相应的“反应”。这些事件包括:输入信息、鼠标、敲击键盘上某个键还有系统内部定时器触发。
一般,咱们写服务器处理模型的程序时,有如下几种模型:
(1)每收到一个请求,建立一个新的进程,来处理该请求;
(2)每收到一个请求,建立一个新的线程,来处理该请求;
(3)每收到一个请求,放入一个事件列表,让主进程经过非阻塞I/O方式来处理请求
第三种就是协程、事件驱动的方式,通常广泛认为第(3)种方式是大多数网络服务器采用的方式
示例:
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <p onclick="fun()">点我呀</p> <script type="text/javascript"> function fun() { alert('约吗?') } </script> </body> </html>
在UI编程中,经常要对鼠标点击进行相应,首先如何得到鼠标点击呢? 两种方式:
那么这个方式有如下几个缺点:
目前大部分的UI编程都是事件驱动模型,如不少UI平台都会提供onClick()事件,这个事件就表明鼠标按下事件。事件驱动模型大致思路以下:
事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特色是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。
另外两种常见的编程范式是(单线程)同步以及多线程编程。
最初的问题:怎么肯定IO操做完了切回去呢?经过回调函数
1.要理解事件驱动和程序,就须要与非事件驱动的程序进行比较。实际上,现代的程序大可能是事件驱动的,好比多线程的程序,确定是事件驱动的。早期则存在许多非事件驱动的程序,这样的程序,在须要等待某个条件触发时,会不断地检查这个条件,直到条件知足,这是很浪费cpu时间的。而事件驱动的程序,则有机会释放cpu从而进入睡眠态(注意是有机会,固然程序也可自行决定不释放cpu),当事件触发时被操做系统唤醒,这样就能更加有效地使用cpu. 2.再说什么是事件驱动的程序。一个典型的事件驱动的程序,就是一个死循环,并以一个线程的形式存在,这个死循环包括两个部分,第一个部分是按照必定的条件接收并选择一个要处理的事件,第二个部分就是事件的处理过程。程序的执行过程就是选择事件和处理事件,而当没有任何事件触发时,程序会因查询事件队列失败而进入睡眠状态,从而释放cpu。 3.事件驱动的程序,一定会直接或者间接拥有一个事件队列,用于存储未能及时处理的事件。 4.事件驱动的程序的行为,彻底受外部输入的事件控制,因此,事件驱动的系统中,存在大量这种程序,并以事件做为主要的通讯方式。 5.事件驱动的程序,还有一个最大的好处,就是能够按照必定的顺序处理队列中的事件,而这个顺序则是由事件的触发顺序决定的,这一特性每每被用于保证某些过程的原子化。 6.目前windows,linux,nucleus,vxworks都是事件驱动的,只有一些单片机多是非事件驱动的。
注意,事件驱动的监听事件是由操做系统调用的cpu来完成的
IO多路复用 |
基础:
1.用户空间和内核空间(用户态和内核态)
内核态:就是系统的最高指令集,控制权限由cpu控制
用户态:最高指令为系统,用户操做控制系统,再又系统去执行操做内核态。
2. 进程切换
为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复之前挂起的某个进程的执行。这种行为被称为进程切换,这种切换是由操做系统来完成的。所以能够说,任何进程都是在操做系统内核的支持下运行的,是与内核紧密相关的。
从一个进程的运行转到另外一个进程上运行,这个过程当中通过下面这些变化:
保存处理机上下文,包括程序计数器和其余寄存器。
更新PCB信息。
把进程的PCB移入相应的队列,如就绪、在某事件阻塞等队列。
选择另外一个进程执行,并更新其PCB。
更新内存管理的数据结构。
恢复处理机上下文。
注:总而言之就是很耗资源的
3.进程的阻塞
正在执行的进程,因为期待的某些事件未发生,如请求系统资源失败、等待某种操做的完成、新数据还没有到达或无新工做作等,则由系统自动执行阻塞原语(Block),使本身由运行状态变为阻塞状态。可见,进程的阻塞是进程自身的一种主动行为,也所以只有处于运行态的进程(得到CPU),才可能将其转为阻塞状态。当进程进入阻塞状态,是不占用CPU资源的。
4.文件描述符
文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。
文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者建立一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写每每会围绕着文件描述符展开。可是文件描述符这一律念每每只适用于UNIX、Linux这样的操做系统。
5. 缓存 I/O
缓存 I/O 又被称做标准 I/O,大多数文件系统的默认 I/O 操做都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操做系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操做系统内核的缓冲区中,而后才会从操做系统内核的缓冲区拷贝到应用程序的地址空间。用户空间无法直接访问内核空间的,内核态到用户态的数据拷贝
思考:为何数据必定要先到内核区,直接到用户内存不是更直接吗?
缓存 I/O 的缺点:
数据在传输过程当中须要在应用程序地址空间和内核进行屡次数据拷贝操做,这些数据拷贝操做所带来的 CPU 以及内存开销是很是大的。
I/O
对于一个network IO (这里咱们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process (or thread),另外一个就是系统内核(kernel)。当一个read操做发生时,它会经历两个阶段:
一、 等待数据准备 (Waiting for the data to be ready)
二、 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)
记住这两点很重要,由于这些IO Model的区别就是在两个阶段上各有不一样的状况。
当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于network io来讲,不少时候数据在一开始尚未到达(好比,尚未收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,而后kernel返回结果,用户进程才解除block的状态,从新运行起来。
因此,blocking IO的特色就是在IO执行的两个阶段都被block了。
从图中能够看出,当用户进程发出read操做时,若是kernel中的数据尚未准备好,那么它并不会block用户进程,而是马上返回一个error。从用户进程角度讲 ,它发起一个read操做后,并不须要等待,而是立刻就获得了一个结果。用户进程判断结果是一个error时,它就知道数据尚未准备好,因而它能够再次发送read操做。一旦kernel中的数据准备好了,而且又再次收到了用户进程的system call,那么它立刻就将数据拷贝到了用户内存,而后返回。
因此,用户进程实际上是须要不断的主动询问kernel数据好了没有。
注意:
在网络IO时候,非阻塞IO也会进行recvform系统调用,检查数据是否准备好,与阻塞IO不同,”非阻塞将大的整片时间的阻塞分红N多的小的阻塞, 因此进程不断地有机会 ‘被’ CPU光顾”。
即每次recvform系统调用之间,cpu的权限还在进程手中,这段时间是能够作其余事情的。
也就是说非阻塞的recvform系统调用调用以后,进程并无被阻塞,内核立刻返回给进程,若是数据还没准备好,此时会返回一个error。进程在返回以后,能够干点别的事情,而后再发起recvform系统调用。重复上面的过程,循环往复的进行recvform系统调用。这个过程一般被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。
须要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。
优势:可以在等待任务完成的时间里干其余活了(包括提交其余任务,也就是 “后台” 能够有多个任务在同时执行)。
缺点:任务完成的响应延迟增大了,由于每过一段时间才去轮询一次read操做,而任务可能在两次轮询之间的任意时间完成。这会致使总体数据吞吐量的下降。
当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”全部select负责的socket,当任何一个socket中的数据准备好了,select就会返回。
这个时候用户进程再调用read操做,将数据从kernel拷贝到用户进程。
这个图和blocking IO的图其实并无太大的不一样,事实上,还更差一些。由于这里须要使用两个system call (select 和 recvfrom),而blocking IO只调用了一个system call (recvfrom)。
可是,用select的优点在于它能够同时处理多个connection。(多说一句。因此,若是处理的链接数不是很高的话,使用select/epoll的web server不必定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优点并非对于单个链接能处理得更快,而是在于能处理更多的链接。)
在IO multiplexing Model中,实际中,对于每个socket,通常都设置成为non-blocking,可是,如上图所示,整个用户的process实际上是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。
注意1:select函数返回结果中若是有文件可读了,那么进程就能够经过调用accept()或recv()来让kernel将位于内核中准备到的数据copy到用户区。
注意2: select的优点在于能够处理多个链接,不适用于单个链接
I/O多路复用指经过一种机制,能够监视多个描述符,一旦某个描述符就绪(通常是读就绪或者写就绪),可以通知程序进行相应的读写操做。下面看一下 select,poll,epoll的介绍
select select最先于1983年出如今4.2BSD中,它经过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程能够得到这些文件描述符从而进行后续的读写操做。 select目前几乎在全部的平台上支持,其良好跨平台支持也是它的一个优势,事实上从如今看来,这也是它所剩很少的优势之一。 select的一个缺点在于单个进程可以监视的文件描述符的数量存在最大限制,在Linux上通常为1024,不过能够经过修改宏定义甚至从新编译内核的方式提高这一限制。 另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增加。同时,因为网络响应时间的延迟使得大量TCP链接处于非活跃状态,但调用select()会对全部socket进行一次线性扫描,因此这也浪费了必定的开销。 poll poll在1986年诞生于System V Release 3,它和select在本质上没有多大差异,可是poll没有最大文件描述符数量的限制。 poll和select一样存在一个缺点就是,包含大量文件描述符的数组被总体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增长而线性增大。 另外,select()和poll()将就绪的文件描述符告诉进程后,若是进程没有对其进行IO操做,那么下次调用select()和poll()的时候将再次报告这些文件描述符,因此它们通常不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。 epoll 直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具有了以前所说的一切优势,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。 epoll能够同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,若是咱们没有采起行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,可是代码实现至关复杂。 epoll一样只告知那些就绪的文件描述符,并且当咱们调用epoll_wait()得到就绪文件描述符时,返回的不是实际的描述符,而是一个表明就绪描述符数量的值,你只须要去epoll指定的一个数组中依次取得相应数量的文件描述符便可,这里也使用了内存映射(mmap)技术,这样便完全省掉了这些文件描述符在系统调用时复制的开销。 另外一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用必定的方法后,内核才对全部监视的文件描述符进行扫描,而epoll事先经过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用相似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便获得通知。
windows下只支持select,示例:
#*************************server.py import socket import select sk=socket.socket() sk.bind(("127.0.0.1",9904)) sk.listen(5) while True: r,w,e=select.select([sk,],[],[],5) for i in r: # conn,add=i.accept() #print(conn) print("hello") print('>>>>>>') #*************************client.py import socket sk=socket.socket() sk.connect(("127.0.0.1",9904)) while 1: inp=input(">>").strip() sk.send(inp.encode("utf8")) data=sk.recv(1024) print(data.decode("utf8"))
请思考:为何不调用accept,会反复print?
Answer:由于select是水平触发
select实现并发聊天示例:
#***********************server.py import socket import select sk=socket.socket() sk.bind(("127.0.0.1",8801)) sk.listen(5) inputs=[sk,] while True: r,w,e=select.select(inputs,[],[],5) print(len(r)) for obj in r: if obj==sk: conn,add=obj.accept() print(conn) inputs.append(conn) else: data_byte=obj.recv(1024) print(str(data_byte,'utf8')) inp=input('回答%s号客户>>>'%inputs.index(obj)) obj.sendall(bytes(inp,'utf8')) print('>>',r) #***********************client.py import socket sk=socket.socket() sk.connect(('127.0.0.1',8801)) while True: inp=input(">>>>") sk.sendall(bytes(inp,"utf8")) data=sk.recv(1024) print(str(data,'utf8'))
此处的Socket服务端相比与原生的Socket,他支持当某一个请求再也不发送数据时,服务器端不会等待而是能够去处理其余请求的数据。可是,若是每一个请求的耗时比较长时,select版本的服务器端也没法完成同时操做。并且select,实现的是一个伪并发。
selectors |
selectors经过单线程实现并发,示例:
import selectors import socket sel = selectors.DefaultSelector() def accept(sock, mask): conn, addr = sock.accept() # 接收连接 print('accepted', conn, 'from', addr) conn.setblocking(False)#设置为非阻塞 sel.register(conn, selectors.EVENT_READ, read)#注册连接 def read(conn, mask): data = conn.recv(1000) # 接受消息 if data: print('echoing', repr(data), 'to', conn) conn.send(data) # 返回消息 else: print('closing', conn) sel.unregister(conn) conn.close() sock = socket.socket() sock.bind(('localhost', 1234)) sock.listen(100) sock.setblocking(False) sel.register(sock, selectors.EVENT_READ, accept) while True: events = sel.select() for key, mask in events: callback = key.data callback(key.fileobj, mask)
selectors实现简单FTP文件上传下载(多用户同时上传下载):
import os,socket,selectors BASE_DIR = os.path.dirname(os.path.abspath(__file__)) class FtpServer: def __init__(self): self.dic = {} # 建立状态字典 self.sel = selectors.DefaultSelector() #建立selectors对象 self.sock() self.main() def sock(self): '''建立socket对象''' s = socket.socket() s.bind(('127.0.0.1',8090)) s.listen(10) s.setblocking(False) self.sel.register(s, selectors.EVENT_READ, self.accept) print('服务端已开启') def main(self): '''监听主函数''' while True: events = self.sel.select() for key, mask in events: callback = key.data callback(key.fileobj, mask) def accept(self,sock, mask): '''接收函数''' conn, addr = sock.accept() conn.setblocking(False) self.sel.register(conn, selectors.EVENT_READ, self.read) self.dic[conn] = {} def read(self, conn, mask): try: if not self.dic[conn] : data = conn.recv(1024).decode() cmd,filename,filesize = data.split() self.dic[conn]={'cmd': cmd, 'filename': filename,'filesize': int(filesize)} if cmd == 'put': conn.send('100'.encode()) if cmd == 'get': file = os.path.join(BASE_DIR,'download',filename) if os.path.exists(file): fileSize = os.path.getsize(file) info = '%s %s'%('200',fileSize) #文件存在 conn.send(info.encode()) else: info = '%s %s'%('201',0) #文件不存在 conn.send(info.encode()) else: if self.dic[conn].get('cmd',None): cmd=self.dic[conn].get('cmd') if hasattr(self, cmd): func = getattr(self,cmd) func(conn) else: print('指令错误') conn.close() else: print('指令错误') conn.close() except Exception as e: print(e) self.sel.unregister(conn) conn.close() def put(self, conn): '''上传''' self.have_rec = 0 fileName = self.dic[conn]['filename'] fileSize = self.dic[conn]['filesize'] file = os.path.join(BASE_DIR,'upload',fileName) recv_data = conn.recv(1024) self.have_rec += len(recv_data) with open(file, 'ab') as f: f.write(recv_data) if fileSize == self.have_rec: if conn in self.dic.keys(): self.dic[conn] = {} #置空字典 def get(self,conn): '''下载''' filename = self.dic[conn]['filename'] path = os.path.join(BASE_DIR,'download',filename) if conn.recv(1024).decode() == '300': with open(path, 'rb') as f: for line in f: conn.send(line) self.dic[conn] = {} #置空字典 if __name__ == '__main__': FtpServer()
import os,sys,socket,time BASE_DIR = os.path.dirname(os.path.abspath(__file__)) class FtpClient: def __init__(self): self.port=('127.0.0.1',8090) self.sock() self.interact() def sock(self): '''建立socket对象函数''' try: self.sk = socket.socket() self.sk.connect(self.port) print('链接FTP服务器成功!') except Exception as e: print(e) def interact(self): '''客户端与服务端交互函数''' while True: cmd = input('>>>').strip() if cmd == 'q': break cmd,file = cmd.split() if hasattr(self, cmd): func = getattr(self, cmd) func(cmd,file) else: print('输入命令错误!') def put(self,cmd,file): if os.path.isfile(file): fileName= os.path.basename(file) fileSize = os.path.getsize(file) fileInfo ='%s %s %s'%(cmd,fileName,fileSize) self.sk.send(fileInfo.encode()) recv_mes = self.sk.recv(1024).decode() # print('recvMes', recv_mes) have_send = 0 if recv_mes == '100': with open(file, 'rb') as f: while fileSize > have_send : data = f.read(1024) self.sk.send(data) have_send += len(data) self.show_process(have_send, fileSize) sys.stdout.write('\n') print('%s文件上传成功!' % fileName) else: print('文件不存在') def get(self, cmd,file): info = '%s %s %s'%(cmd,file,'0') self.sk.send(info.encode()) fileInfo = self.sk.recv(1024).decode() fileMes, fileSize = fileInfo.split() fileSize=int(fileSize) if fileMes == '200': #文件存在 self.sk.send('300'.encode()) path = os.path.join(BASE_DIR,file) have_recv = 0 with open(path, 'wb') as f: while have_recv < int(fileSize): data = self.sk.recv(1024) have_recv += len(data) f.write(data) self.show_process(have_recv, fileSize) sys.stdout.write('\n') print('%s下载完成!' % file) else: print("文件不存在!") def show_process(self,have_send,file_size): '''显示进度条''' k = int((have_send / file_size * 100)) space = (100 - k) * ' ' flag = k * '>' sys.stdout.write('\r|%s| [%s%%]' % ((flag + space), k)) sys.stdout.flush() time.sleep(0.2) if __name__ == '__main__': FtpClient()
socketserver |
ThreadingTCPServer实现的Soket服务器内部会为每一个client建立一个 “线程”,该线程用来和客户端进行交互。首先来看一下继承关系图
socketserver搭建:
import SocketServer class MyServer(SocketServer.BaseRequestHandler): def handle(self): pass if __name__ == '__main__': server = SocketServer.ThreadingTCPServer(('127.0.0.1',8090), MyServer) server.serve_forever()
上述代码的内部调用流程为:
实战:socketserver搭建实现FTP(实现用户注册登陆、断点续传、简单命令)
部分主要代码:
import os import pickle,configparser,time import subprocess import socketserver from socket import * from FtpServer.conf import settings class MyServer(socketserver.BaseRequestHandler): # def __init__(self): # pass def handle(self): while True: try: data = self.request.recv(1024).decode() if '|' in data: cmd,argv= data.split('|') else: cmd = data argv = None self.process(cmd, argv) # process处理接收的命令 except Exception as e: print(e) break def post(self, argv=None): argv = eval(argv) file_info = pickle.loads(argv) # 获取客户端传来的消息 file_name = file_info['file_name'] file_size = int(file_info['file_size']) file_path = os.path.join(settings.USER_HOME, self.user, 'upload', file_name) have_down = 0 # 已经上传的位置 if os.path.isfile(file_path): self.request.sendall('exist'.encode()) ret = self.request.recv(1024).decode() if ret == 'Y': # 续传 have_send = os.stat(file_path).st_size # 获取已经上传文件的大小 self.request.sendall(str(have_send).encode()) if have_send ==file_size: return else: f = open(file_path, 'ab') # 续传以a模式打开文件, else: f = open(file_path, 'wb') # 不续传以w模式打开, else: self.request.sendall('N'.encode()) # 直接上传 f = open(file_path, 'wb') while True: if have_down == file_size: # 一旦接受到的内容等于文件大小,直接退出循环 break try: ret = self.request.recv(1024) except Exception as e: break f.write(ret) have_down += len(ret) def process(self, cmd, argv=None): '''使用反射处理客户端传过来的命令''' if hasattr(self, cmd): func = getattr(self, cmd) func(argv) else: if cmd.startswith('cd'): # 处理cd命令 argv = cmd.split(' ')[1] if argv =='..': self.request.sendall((os.path.join(settings.USER_HOME)).encode()) os.chdir(os.path.join(settings.USER_HOME)) return else: self.request.sendall(argv.encode()) os.chdir(argv) return elif cmd.startswith('ls'): #处理ls命令 i = pickle.dumps(os.listdir()) self.request.sendall(i) return elif cmd.startswith('login'): user,pwd = argv.split(':') if self.checklogin(user,pwd): self.user =user return elif cmd.startswith('register'): user,pwd = argv.split(':') if self.checkregister(user, pwd): self.request.sendall('ok'.encode()) else: try: data = subprocess.getoutput(cmd) # subprocess处理其余命令 self.request.sendall(data.encode()) return except Exception as e: print(e) def checkregister(self,username,password): '''校验注册信息''' while True: config = configparser.ConfigParser() config.read(settings.USER_INFO) if username in config.sections(): self.request.sendall('exist'.encode) else: config.add_section(username) config.set(username,'password',password) config.set(username, 'space_size', '5000000') config.set(username, 'use_size', '0') config.write(open(settings.USER_INFO,'w')) os.mkdir(os.path.join(settings.USER_HOME, username)) os.mkdir(os.path.join(settings.USER_HOME, username,'download')) os.mkdir(os.path.join(settings.USER_HOME, username, 'upload')) return 1 def checklogin(self,username,password): '''校验登陆数据是否一致''' while True: config = configparser.ConfigParser() config.read(settings.USER_INFO) if username in config.sections(): pwd = config[username]['password'] if pwd == password: self.request.sendall('100'.encode()) print(username,'客户端验证经过!') self.request.sendall((os.path.join(settings.USER_HOME,username)).encode()) #家目录路径返回给客户端 os.chdir(os.path.join(settings.USER_HOME,username)) #cd到客户家目录下 return 1 else: self.request.sendall('101'.encode()) break else: self.request.sendall('101'.encode()) def get(self,argv =None): '''接收客户端发来的数据''' file_name = argv file_path = os.path.join(settings.USER_HOME, self.user, 'download', file_name) if os.path.exists(file_path): file_size = os.stat(file_path).st_size self.request.sendall('ok'.encode()) file_info = { 'file_name': argv, 'file_size': file_size, } info = pickle.dumps(file_info) self.request.sendall(info) ret = self.request.recv(1024).decode() have_down = 0 if ret == 'exist': # 返回值 # res = (self.request.recv(1024)).encode() have_down = int((self.request.recv(1024)).decode()) # 接收服务端已下载文件大小 if have_down ==file_size: return with open(file_path, 'rb') as f: f.seek(have_down) while True: if have_down ==file_size: break else: data =f.read(1024) self.request.sendall(data) have_down+=len(data) return else: self.request.sendall('no'.encode()) return @classmethod def start(cls): '''启动服务器''' print('\33[32;1m服务器已启动!\33[1m') server = socketserver.ThreadingTCPServer((settings.IP, settings.PORT), MyServer) server.serve_forever() if __name__=='__main__': MyServer.start()
import sys,os,hashlib,pickle,time from socket import * from FtpClient.conf import settings class MyClient: def __init__(self): self.addr = (settings.IP,settings.PORT) self.start() self.c_d = '' #用户当前路径 def post(self,argv = None): #传入文件名称 '''上传文件''' if len(argv) ==0: print('请输入上传文件名称!') return print('上传前请确保文件在用户的upload文件夹下!') file_path = os.path.join(settings.USER_HOME,self.user,'upload',argv) if os.path.exists(file_path): #判断文件是否存在 file_size = os.stat(file_path).st_size file_info = { 'file_name': argv, 'file_size': file_size, } info = pickle.dumps(file_info) self.socket.sendall(('post|%s'%info).encode()) # 将上传的文件信息做为参数发给服务端 ret = self.socket.recv(1024).decode() have_send = 0 if ret =='exist': choice = input('文件已存在,是否续传?(Y/N)').strip() if choice.upper() =='Y': self.socket.sendall('Y'.encode()) current_size= int(self.socket.recv(1024).decode()) #接收服务端已存文件大小 if current_size ==file_size: print('文件完整不需从新上传') return else: have_send =current_size else: self.socket.sendall('N'.encode()) with open(file_path,'rb') as f: f.seek(have_send) for line in f: self.socket.sendall(line) have_send += len(line) self.show_process(have_send,file_size) sys.stdout.write('\n') def show_process(self,have_send,file_size): '''显示进度条''' k = int((have_send / file_size * 100)) space = (100 - k) * ' ' flag = k * '>' sys.stdout.write('\r|%s| [%s%%]' % ((flag + space), k)) sys.stdout.flush() time.sleep(0.2) def get(self,argv =None): '''下载文件''' if len(argv) == 0: print('请输入下载文件名称!') return print('下载前请确保服务端download下有该文件!') self.socket.sendall(('get|%s' % argv).encode()) # 将下载的文件名做为参数发给服务端 ret = self.socket.recv(1024).decode() have_down = 0 if ret=='ok': #服务端存在文件,可下载 file_info = pickle.loads(self.socket.recv(1024)) # 获取服务端传来的消息 file_name = file_info['file_name'] file_size = int(file_info['file_size']) file_path = os.path.join(settings.USER_HOME, self.user, 'download', file_name) if os.path.exists(file_path): # 客户端存在文件 self.socket.sendall('exist'.encode()) choice = input('文件已存在,是否继续下载?(Y/N)').strip() if choice.upper() == 'Y': have_down = os.stat(file_path).st_size # 获取已经下载文件的大小 self.socket.sendall((str(have_down).encode())) if have_down ==file_size: print('文件完整不需从新下载!') return f = open(file_path, 'ab') # 续传以a模式打开文件 else: self.socket.sendall((str(0).encode())) f = open(file_path, 'wb') # 不续传以w模式打开 else: self.socket.sendall('reload'.encode()) # 从新下载 f = open(file_path, 'wb') while True: if have_down == file_size: break try: ret = self.socket.recv(1024) except Exception as e: break f.write(ret) have_down += len(ret) self.show_process(have_down, file_size) sys.stdout.write('\n') else: print('所下载文件不存在') return def register(self): '''注册用户''' while True: user = input('请输入用户名:').strip() if len(user) ==0:continue password = input('请输入密码:').strip() if len(password) ==0:continue pd =hashlib.md5() pd.update(password.encode()) pwd = pd.hexdigest() #加密后的信息 # pwd = password self.socket.sendall(('register|%s:%s'%(user, pwd)).encode())# 发送加密后的帐户信息 ret = self.socket.recv(1024).decode() if ret =='ok': print('注册成功,请登陆!') os.mkdir(os.path.join(settings.USER_HOME, user)) os.mkdir(os.path.join(settings.USER_HOME, user,'download')) os.mkdir(os.path.join(settings.USER_HOME, user, 'upload')) return 1 else: print('注册用户名已存在!') def login(self): '''客户端用户登陆''' try_times = 0 while try_times < 3: user = input('请输入用户名:') self.user = user if len(user) == 0: continue password = input('请输入用密码:') if len(password) == 0: continue pd =hashlib.md5() pd.update(password.encode()) pwd = pd.hexdigest() #加密后的信息 # pwd = password self.socket.sendall('login|{}:{}'.format(user, pwd).encode()) # 发送加密后的帐户信息 ret = self.socket.recv(1024).decode() if ret == '100': print('登录成功!') self.c_d = self.socket.recv(1024).decode() return 1 else: print('用户或密码错误!') try_times += 1 sys.exit('尝试太屡次!') def interact(self): '''客户端与服务端交互''' print('服务器链接成功!') while True: choice = input("[%s]:"%self.c_d).strip() if len(choice) == 0:continue if '|' in choice: cmd,argv = choice.split('|') else: cmd = choice argv = None if hasattr(self,'%s'%cmd): func = getattr(self,'%s'%cmd) func(argv) elif cmd.startswith('cd'): self.socket.sendall(choice.encode()) self.c_d = self.socket.recv(1024).decode() continue elif cmd.startswith('ls'): self.socket.sendall(choice.encode()) ret = pickle.loads(self.socket.recv(1024)) for i in ret: print(i) continue else: self.socket.sendall(choice.encode()) ret = self.socket.recv(1024) print(ret.decode()) def start(self): '''启动主函数''' self.socket = socket(AF_INET,SOCK_STREAM) try: self.socket.connect(self.addr) except Exception as e: sys.exit("链接服务端错误:%s" % e) while True: msg = '''1.注册FTP\n2.登陆FTP\n3.退出''' print('\33[32;1m欢迎登陆MyFtpClient!\33[1m') print(msg) choice = input('>>>:').strip() if choice =='1': if self.register(): if self.login(): self.interact() elif choice =='2': if self.login(): self.interact() elif choice =='3': sys.exit() else:print('选项错误,请从新输入!') if __name__=='__main__': client = MyClient() # client.interact()