4.6 并发编程/IO模型

 并发编程/IO模型

背景概念

IO模型概念

IO模型分类

阻塞IO  (blocking IO)

特色: linux

    两个阶段(等待数据和拷贝数据两个阶段)都被block编程

设置服务器

server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)

解决方案: 多线程

    启用多线程或者多进程,要阻塞只阻塞当前线程/进程,不会影响其余进程/线程并发

不良影响:app

    当遇到过多得连接请求时会严重占用资源,下降响应效率异步

修复不良影响:socket

    启用进程池/线程池 ,下降进程/线程数量async

 仍旧未解决的不良影响:ide

    池得数量很差规范,请求数量和池大小很差对应,且池存在数量上限

    只是必定程度上限制了不良影响,没法根本解决

  总结 :

    基于池得建立能够解决小规模得服务请求带来的压力,对于大规模仍是无力回天

非阻塞IO  (nonblocking IO)

  特色: 

    基于IO阻塞模型相似,再本应阻塞得地方不在阻塞,若是未收到想要数据会返回一个 error 给用户告知无数据

    发送 error 后会进行其余得任务继续操做背后会一直对 kernel 进行轮询发送数据请求,

    这期间若是数据到了就会从新正常操做,而在轮询期间,无数据得进程会一直处于阻塞状态

    就结果而言。彻底得实现了并发,以及解决了IO阻塞带来得效率低下的问题

设置

server.setblocking()    #默认是True  
server.setblocking(False)     #False的话就成非阻塞了,这只是对于socket套接字来讲的 

不良影响:

    1. 虽然说解决了单线程并发,可是大大的占用了cpu

    2.  任务完成的响应延迟增大,任务可能在两次轮询之间的任意时间完成。这会致使总体数据吞吐量的下降

  总结 :

    彻底不推荐

多路复用IO  (IO multiplexing)

  特色: 

    当用户进程调用了select,那么整个进程会被block

    而同时,kernel会“监视”全部select负责的socket,当任何一个socket中的数据准备好了,select就会返回。

    这个时候用户进程再调用read操做,将数据从kernel拷贝到用户进程。

    这个图和blocking IO的图其实并无太大的不一样,事实上还更差一些。

    由于这里须要使用两个系统调用(select和recvfrom),而blocking IO只调用了一个系统调用(recvfrom)。可是,用select的优点在于它能够同时处理多个connection。

设置

使用 select 模块 或者 eppol (只能用于 linux 中)
最优的选择方案是用 selectors

    selectors 实例 

#服务端
from socket import *
import selectors

sel=selectors.DefaultSelector()
def accept(server_fileobj,mask):
    conn,addr=server_fileobj.accept()
    sel.register(conn,selectors.EVENT_READ,read)

def read(conn,mask):
    try:
        data=conn.recv(1024)
        if not data:
            print('closing',conn)
            sel.unregister(conn)
            conn.close()
            return
        conn.send(data.upper()+b'_SB')
    except Exception:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()



server_fileobj=socket(AF_INET,SOCK_STREAM)
server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server_fileobj.bind(('127.0.0.1',8088))
server_fileobj.listen(5)
server_fileobj.setblocking(False) #设置socket的接口为非阻塞
sel.register(server_fileobj,selectors.EVENT_READ,accept) #至关于网select的读列表里append了一个文件句柄server_fileobj,而且绑定了一个回调函数accept

while True:
    events=sel.select() #检测全部的fileobj,是否有完成wait data的
    for sel_obj,mask in events:
        callback=sel_obj.data #callback=accpet
        callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1)

#客户端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8088))

while True:
    msg=input('>>: ')
    if not msg:continue
    c.send(msg.encode('utf-8'))
    data=c.recv(1024)
    print(data.decode('utf-8'))
View Code

不良影响:

    1. 虽然说解决了单线程并发,可是大大的占用了cpu

    2.  任务完成的响应延迟增大,任务可能在两次轮询之间的任意时间完成。这会致使总体数据吞吐量的下降

  总结 :

    select的优点在于能够处理多个链接,不适用于单个链接

异步IO(asynchronous IO)

  用户进程发起read操做以后,马上就能够开始去作其它的事。

  kernel 受到一个asynchronous read以后,首先它会马上返回,因此不会对用户进程产生任何block。

  而后,kernel会等待数据准备完成,而后将数据拷贝到用户内存,

  当这一切都完成以后,kernel会给用户进程发送一个signal,告诉它read操做完成了。

socketserver 模块

 包含类

  BaseServer是基类,它不能实例化使用,

  TCPServer使用TCP协议通讯,

  UDPServer使用UDP协议通讯,

  UnixStreamServer和UnixDatagramServer使用Unix域套接字,只适用于UNIX平台。

  ForkingMixIn 多进程异步

  ThreadingMixIn 多线程异步 

如何建立一个socketserver :

  1. 建立一个请求处理的类,继承 BaseRequestHandlerclass ,要重写父类里 handle() 方法;

  2. 你必须实例化 TCPServer,而且传递server IP和你上面建立的请求处理类,给这个TCPServer;

  3. server.handle_requese()  只处理一个请求,server.server_forever() 处理多个一个请求,永远执行

  4. 关闭链接 server_close()

import socketserver

class MyTCPHandler(socketserver.BaseRequestHandler): #服务类,监听绑定等等
  
 
    def handle(self):  #请求处理类,全部请求的交互都是在handle里执行的
        # self.request is the TCP socket connected to the client
        self.data = self.request.recv(1024).strip()  #每个请求都会实例化MyTCPHandler(socketserver.BaseRequestHandler):
        print("{} wrote:".format(self.client_address[0]))
        print(self.data)
        self.request.sendall(self.data.upper())  #sendall是重复调用send.

if __name__ == "__main__":
    HOST, PORT = "localhost", 9999

    server = socketserver.TCPServer((HOST, PORT), MyTCPHandler)

    # Activate the server; this will keep running until you
    # interrupt the program with Ctrl-C
    server.serve_forever()
 
 server = socketserver.ThreadingTCPServer((HOST, PORT), MyTCPHandler)   #线程
# server  = socketserver.ForkingTCPServer((HOST, PORT), MyTCPHandler) #多进程 linux适用
# server = socketserver.TCPServer((HOST, PORT), MyTCPHandler) 单进程

ThreadingTCPServer

  服务器内部会为每一个client建立一个 “线程”,该线程用来和客户端进行交互。

import SocketServer

class MyServer(SocketServer.BaseRequestHandler):

    def handle(self):
        pass

if __name__ == '__main__':
    server = SocketServer.ThreadingTCPServer(('127.0.0.1',8766), MyServer)
    server.serve_forever()

ForkingTCPServer

  用法相似 ThreadingTCPServer 只是将线程换成了进程 

import SocketServer

class MyServer(SocketServer.BaseRequestHandler):

    def handle(self):
        pass

if __name__ == '__main__':
    server = SocketServer.ForkingTCPServer(('127.0.0.1',8009),MyServer)
    server.serve_forever()

 总结:

socketserver 完美的解决了以前的各类困扰。不管是并发问题仍是线程池大小以及IO问题都被解决。

  所以socketserver成为了最终解决方案

相关文章
相关标签/搜索