IO多路复用实现机制python
Win: selectlinux
Linux:select(效率低) poll epoll(最好)默认选择epoll安全
select缺点:dom
1 每次调用select都要讲全部的fd(文件描述符)拷贝到内核空间致使效率降低;socket
2 遍历全部fd,是否有数据访问(最重要的问题);函数
3 最大链接数(1024)。spa
Poll缺点:线程
1 最大链接数没有限制。code
epoll:对象
经过三个函数实现:
1 第一个函数:建立epoll句柄:将全部的fd(文件描述符)拷贝到内核空间,可是只需拷贝一次;
2 回调函数:某一个函数或者某一个动做成功完成以后,会触发的函数;
为全部的fd绑定一个回调函数,一旦有数据访问,触发该回调函数,回调函数将fd放到列表中。
3 第三个函数 判断列表是否为空
最大链接数没有上限
selectors:
#服务端 import selectors import socket sock=socket.socket() sock.bind(("127.0.0.1",8000)) sock.listen(5) sock.setblocking(False) sel=selectors.DefaultSelector() ##根据具体平台选择最佳IO多路机制,好比在linux,选择epoll def read(conn,mask): try: data=conn.recv(1024) print(data.decode("utf8")) msg=input(">>") conn.send(msg.encode("utf8")) except Exception: sel.unregister(conn) def accept(sock,mask): conn,addr=sock.accept() sel.register(conn,selectors.EVENT_READ,read) sel.register(sock,selectors.EVENT_READ,accept) #注册事件 while True: print("waiting......") events=sel.select() #监听 [(key,mask)] for key,mask in events: print(key.data) print(key.fileobj) func=key.data #拿到的是函数 obj=key.fileobj #拿到的是sock对象文件描述符(fd) func(obj,mask) #客户端 import socket sock=socket.socket() sock.connect(("127.0.0.1",8000)) while True: msg=input('>>') sock.send(msg.encode('utf8')) data=sock.recv(1024) print(data.decode('utf8'))
queue特色(线程是安全),也是数据类型
''' 建立一个“队列”对象 import Queue q = Queue.Queue(maxsize = 10) Queue.Queue类便是一个队列的同步实现。队列长度可为无限或者有限。可经过Queue的构造函数的可选参数 maxsize来设定队列长度。若是maxsize小于1就表示队列长度无限。 将一个值放入队列中 q.put(10) 调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值; 第二个block为可选参数,默认为 1。若是队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。若是block为0, put方法将引起Full异常。 将一个值从队列中取出 q.get() 调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。若是队列为空且 block为True,get()就使调用线程暂停,直至有项目可用。若是队列为空且block为False,队列将引起Empty异常。 '''
import queue q=queue.PriorityQueue() q.put(111) q.put(222) q.put(333) print(q.get()) print(q.get()) print(q.get()) 执行结果: 111 222 333
import queue q=queue.LifoQueue() q.put(111) q.put(222) q.put(333) print(q.get()) 执行结果: 333
import queue q=queue.PriorityQueue() q.put([4,"hello3"]) q.put([1,"hello"]) q.put([3,"hello1"]) while True: data=q.get() print(data) 执行结果: [1, 'hello'] [3, 'hello1'] [4, 'hello3']
join()阻塞进程,直到全部任务完成,须要配合另外一个task_done。
task_done()表示某个任务完成。每一条get语句后须要一条task_done。
import queue q=queue.Queue(5) q.put(111) q.put(222) print(q.get()) q.task_done() print(q.get()) q.task_done() q.join() print("ending") 执行结果: 111 222 ending
生产者消费者模式是经过一个容器来解决生产者和消费者的强藕和问题,生产者和消费者彼此之间不直接通信,而经过阻塞队列来进行通信,因此生产者生产完数据以后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就至关于一个缓冲区,平衡了生产者和消费者的处理能力。
import time,random import queue,threading q = queue.Queue() def Producer(name): count = 0 while count <10: print("making........") time.sleep(random.randrange(3)) q.put(count) print('Producer %s has produced %s baozi..' %(name, count)) count +=1 #q.task_done() #q.join() print("ok......") def Consumer(name): count = 0 while count <10: time.sleep(random.randrange(4)) if not q.empty(): data = q.get() #q.task_done() #q.join() print(data) print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data)) else: print("-----no baozi anymore----") count +=1 p1 = threading.Thread(target=Producer, args=('A',)) c1 = threading.Thread(target=Consumer, args=('B',)) # c2 = threading.Thread(target=Consumer, args=('C',)) # c3 = threading.Thread(target=Consumer, args=('D',)) p1.start() c1.start() # c2.start() # c3.start()