python下高性能的网络模式: 多进程+多协程

     人生苦短,我用python。众所周知,python因为全局锁的存在,使用多线程模型并不能提升程序性能。使用协程是较好的选择。但这仍不能发挥机器多cpu的性能。因而乎,有了下文的 多进程+多协程的网络模型。大致思路是使用gevent的StreamServer监听一个端口。而后使用multiprocessing.Process启动多进程。当有客户端链接到来时,会随机分配到某个进程的ClientHandler实例中去处理,只须要继承自该实例,处理你的业务便可。若是没有重载该类的函数,默认会把全部的新链接到来,数据到达,链接关闭事件经过multiprocessing.Queue传递给主进程(即开始启动监听端口的进程),而后由主程从队列中取出事件处理便可。须要特别说明的是window不支持该模型,为了方便在windows下调试业务代码,windos下实现为普通的单程gevent多协程模式。python

具体使用例子看代码下面__main__部份,用python tcpserver.py启动便可测试。windows

------------------------------------------------------------------代码下如-------------------------------------------------------------------------------网络

import gevent
from gevent import monkey
monkey.patch_all(thread=False)
from gevent.server import StreamServer
import gevent.queue
import multiprocessing
import os多线程

EVENT_CONNECT = 0
EVENT_RECV = 1
EVENT_SEND = 2
EVENT_CLOSE = 3
EVENT_STOP  = 4app


class ClientHandler(object):
    def __init__(self,socket, address,clientprocess):
        self.socket = socket
        self.address = address
        self._sock_id = 0
        self._is_close = False
        self._buffer = ""
        self._clientprocess = clientprocess
        self._send_queues = gevent.queue.Queue() 
        self._send_spawn = gevent.spawn(self._run_send)socket

    def set_socket_id(self,socket_id):
        self._sock_id = socket_idtcp

    def on_connect(self):
        self._clientprocess.on_connect(self._sock_id,self.address)
        return True函数

    def on_recv(self,data):
        self._clientprocess.on_recv(self._sock_id ,data)
        return Trueoop

    def on_close(self):
        self._clientprocess.on_close(self._sock_id)性能

    def send(self,data):
        self._send_queues.put((EVENT_SEND,data))

    def _run_send(self):
        combine_message = b""
        while self._is_close == False:
            msg = self._send_queues.get()
            if msg is None:
                continue
            (event,data) = msg
            if event == EVENT_SEND:
                try:
                    combine_message += data
                except:
                    pass
                if self._send_queues.qsize() > 0:
                    continue
                try:
                    self.socket.sendall(combine_message)
                    combine_message = b""
                except:
                    if self._is_close == False:
                       self._clientprocess.on_close(self._sock_id)
                    break
            elif event == EVENT_STOP:
                if combine_message != b"":
                    self.socket.sendall(combine_message)
                break
            else:
                break

    def close(self):
        if self._is_close:
            return
        self._is_close = True
        self._send_queues.put((EVENT_STOP,0))
        self.socket.close()
        gevent.joinall([self._send_spawn])


class ClientProcess:
    def __init__(self,server,index,queue_cls,hanlder_cls):                   
          self._index = index                 
          self._send_queues = queue_cls()
          self._hanlder_cls = hanlder_cls
          self.server = server
          self._is_stop = False
          
    def run(self):
        self._clients = {}                
        self._send_spawn = gevent.spawn(self._run_send)

    def stop(self):
        self.put((EVENT_STOP,0,0))

    def get(self,block=True,outtime=None):
        return self._send_queues.get(block,outtime)
    
    def put(self,data,block=True,outtime=None):
        self._send_queues.put(data,block,outtime)

    def on_new_connect(self,socket, address):
        client = self._hanlder_cls(socket, address,self)
        sock_id = id(client)*100+self._index
        self._clients[sock_id] = client       
        client.set_socket_id(sock_id)        
        if client.on_connect() == False:
          return

        while True:
            try:
               data = socket.recv(1024)            
               if data is not None and len(data)>0 and client.on_recv(data):                
                  continue
               else: 
                  client.on_close()               
                  break
            except:
                client.on_close()
                break


    def on_connect(self,sock_id,address):
        self.server.put((EVENT_CONNECT,sock_id,address))


    def on_close(self,sock_id):
        self.server.put((EVENT_CLOSE,sock_id,0))

    def on_recv(self,sock_id,data):
        self.server.put((EVENT_RECV,sock_id,data))

    def send(self,sock_id,data):
        try:
           self.put((EVENT_SEND,sock_id,data),True,0.001)
        except:
            print("send put timeout sock_id:%d"%(sock_id)) 
            return False
        return True

    def close_socket(self,sock_id):
        self.put((EVENT_CLOSE,sock_id,0))

    def __send__(self,sock_id,data):
        client = self._clients.get(sock_id,None)
        if client is not None:
           client.send(data)

    def __close_socket__(self,sock_id):
        client = self._clients.get(sock_id,None)
        if client is not None:
           client.close()
           self._clients.pop(sock_id)

    def _run_send(self):        
        msg = None
        print("_run_send pid:%d" %(os.getpid()))
        while True:
            try:
               msg = self.get(False)
            except :
                gevent.sleep(0.01)
                continue

            if msg is None: 
                continue
            (event,socket_id,data) = msg
            if event == EVENT_SEND:
                self.__send__(socket_id,data)
            elif event == EVENT_CLOSE:
                if socket_id != 0:
                   self.__close_socket__(socket_id)
            elif event == EVENT_STOP:
                for sock_id,client in self._clients.items():
                       self.__close_socket__(sock_id)
                break


class TcpServer:

    def __init__(self):
        self._server = StreamServer(('0.0.0.0',0), self.on_new_connect,backlog=100000)
        self._server.reuse_addr = 1
        self._recv_queues = None
        self._clent_process = []
        self._process_num = 0

    def start(self,ip,port,process_count = 1,queue_cls = gevent.queue.Queue,hanlder_cls = ClientHandler):
        self._recv_queues = queue_cls()
        self._process_num = process_count        
        self._server.set_listener((ip,port))        
        self._server.start()
        for i in range(process_count):
           self._clent_process.append(ClientProcess(self,i,queue_cls,hanlder_cls))
        self.event_loop()

    def event_loop(self):
        self.serve_forever(0)
    
    def get_recv_queue(self):
        return self._recv_queues

    def serve_forever(self,i):
        print("server:%d,pid:%d,index = %d" %(id(self),os.getpid(),i))
        self._index = i
        self._clent_process[i].run()
        self._send_spawn = gevent.spawn(self._server.serve_forever)
        

    def stop_server(self):        
        for client in self._clent_process:
            client.stop()
        self._server.close()

    def send(self,sock_id,data):
        index = sock_id %100
        if index >= self._process_num:
            return False
        client = self._clent_process[index]
        if client is not None:
           client.send(sock_id,data)
        return True

    def close_socket(self,sock_id):
        index = sock_id %100
        if index >= self._process_num:
            return False
        client = self._clent_process[index]
        if client is not None:
           client.close_socket(sock_id)
        return True

    def get(self,block=True,outtime=None):
        return self._recv_queues.get(block,outtime)
    
    def put(self,data,block=True,outtime=None):
        self._recv_queues.put(data,block,outtime)

    def on_new_connect(self,sock, address):         
        print("on_new_connect server:%d,pid:%d" %(id(self),os.getpid()))
        self._clent_process[self._index].on_new_connect(sock,address) 

class MultiprocessingTcpServer(TcpServer):

    def start(self,ip,port,process_count = 1,queue_cls =multiprocessing.Queue,hanlder_cls = ClientHandler):
        self._hanlder_cls = hanlder_cls
        self._queue_cls = queue_cls
        TcpServer.start(self,ip,port,process_count,queue_cls,hanlder_cls)

    def event_loop(self):
        for i in range(self._process_num):
           multiprocessing.Process(target=self.serve_forever, args=(i,)).start()

    def serve_forever(self,i):
        TcpServer.serve_forever(self,i)
        gevent.joinall([self._send_spawn])

    def add_process(self):
        index = len(self._clent_process)
        process = ClientProcess(self,index,self._queue_cls,self._hanlder_cls)
        self._clent_process.append(process)
        self._process_num += 1
        multiprocessing.Process(target=self.serve_forever, args=(index,)).start()
        

if __name__ == "__main__":        
    import sys
    process_num = 2    
    server = None
    if sys.platform == 'win32':
       server = TcpServer()  #windows下为单程进程的gevent
    else:
       server = MultiprocessingTcpServer()   #其余平台是多进程+多协程

    server.start('0.0.0.0',8089,process_num)    #监听端口     msg = None     while True:         try:             msg = server.get(True)     #获取事件         except KeyboardInterrupt:             break         except :                 gevent.sleep(0.1)                 continue         if msg is not None:             (event,socket_id,data) = msg             if event == EVENT_RECV:   #有数据到来                server.send(socket_id,data)   #数据原样返回                print("socket_id:%d ,recv data:%d" %(socket_id,len(data)))             elif event == EVENT_CONNECT:   #新链接到来                 print("new connect socket_id:%d,address:%s" %(socket_id,data))             else:    #链接断开                server.close_socket(socket_id)                print("socket_id:%d close" %(socket_id))     server.stop_server()

相关文章
相关标签/搜索