本节内容 :html
1.Socket实现多链接处理linux
根据上一篇文章多socket的基本介绍,咱们大体能明白怎么去创建一个客户端和服务器通信,可是你会发现,若是客户端断开,服务器也会跟着短了,若是咱们想要客户端断开,服务器还能够为下一个客户端服务,该如何实现呢?缓存
1 conn,addr = server.accept() #接收并创建与客户端的连接,一直阻塞直到客户端连进来
咱们想要实现多链接处理,上面这句代码很关键。只要咱们同一个客户端断开链接后,它还能够循环监听,那么多链接就实现了。服务器
1 import socket 2 3 server = socket.socket() #获取socket实例 4 5 server.bind(('localhost',9999)) #绑定 ip + port 6 7 server.listen() # 开始监听 8 9 while True: #第一层循环用来持续接收新连接 10 print('waiting for new conn...') 11 conn,addr = server.accept() #等待新连接 12 print('new conn:',addr) 13 while True: #第二层循环用来保持和连到的连接持续通信 14 data = conn.recv(1024) 15 if not data: 16 print('client is off...') 17 break #断开已创建的通信,回到等新连接的循环 18 print('recv data:',data) 19 conn.send(data.upper()) 20 server.close()
注意了, 此时服务器端依然只能同时为一个客户服务,其客户来了,得排队(链接挂起),要想同时处理多个连接,请继续日后看。网络
2. Socket实现简单的ssh多线程
出了能够发送一些简单消息,咱们还能够干点更高级的,好比来作一个简单的ssh(不明白的能够百度这是干吗的!),它就是经过客户端连上服务器后,让服务器执行命令,并把结果返回给客户端。并发
1 import os 2 import socket 3 4 server = socket.socket() 5 server.bind(('localhost',7777)) 6 server.listen() 7 8 while True: 9 print("等待接收。。") 10 conn,addr = server.accept() 11 print("接收到客户端:",addr) 12 while True: 13 cmd = conn.recv(1024).decode() #接收客户端发送的命令 14 data = os.popen(cmd).read() #用poen内置函数执行指令,并读取结果 15 print(data) 16 if not data: #若是没有内容,说明没有此指令 17 data = "mei you ci ming ling" 18 conn.send(data.encode("utf-8")) #将执行的结果返回给客户端 19 break 20 21 server.close()
1 import socket 2 3 client = socket.socket() 4 client.connect(('localhost',7777)) 5 while True: 6 7 cmd = input(">>>") #输入指令 8 if len(cmd) == 0: #用长短来判断用户是否输入为空 9 continue 10 elif cmd == 'exit': #判断是否为退出 11 print("退出!") 12 break 13 client.send(cmd.encode("utf-8")) #发送指令须要编码成bytes类型 14 data = client.recv(6094).decode() 15 print(data)
很好,咱们已经实现了一个简单的ssh,但你若是尝试的指令够多,就会发现,程序仍是有问题的app
在开始解决上面问题3以前,咱们要考虑,客户端要循环接收服务器端的大量数据返回直到一条命令的结果所有返回为止, 但问题是客户端知道服务器端返回的数据有多大么?答案是不知道,那既然不知道服务器的要返回多大的数据,那客户端怎么知道要循环接收多少次呢?答案是不知道,擦,那咋办? 总不能靠猜吧?呵呵。。。 固然不能,那只能让服务器在发送数据以前主动告诉客户端,要发送多少数据给客户端,而后再开始发送数据,yes, 机智如我,搞起。ssh
1 import os 2 import socket 3 4 server = socket.socket() 5 server.bind(('localhost',7777)) 6 server.listen() 7 8 while True: 9 print("等待接收。。") 10 conn,addr = server.accept() 11 print("接收到客户端:",addr) 12 while True: 13 cmd = conn.recv(1024).decode() 14 if cmd == 'exit': 15 print('client is off...') 16 break 17 data = os.popen(cmd).read() 18 print(data) 19 if not data: 20 data = "cmd exec success,has no output" 21 conn.send(str(len(data)).encode()) #数据发送前,先告诉客户端数据的大小 22 conn.sendall(data.encode("utf-8")) #发送端也有最大数据量限制,因此这里用sendall,至关于重复循环调用conn.send,直至数据发送完毕 23 24 server.close()
1 import socket 2 3 client = socket.socket() 4 client.connect(('localhost',7777)) 5 while True: 6 7 cmd = input(">>>") 8 if len(cmd) == 0: 9 continue 10 elif cmd == 'exit': 11 client.send(cmd.encode("utf-8")) 12 print("退出!") 13 break 14 client.send(cmd.encode("utf-8")) 15 res_return_size = client.recv(1024) #接收这条命令执行结果的大小 16 #data = client.recv(6094).decode() 17 print(res_return_size)
若是你够幸运,会发现程序执行会报错,客户端本想只接服务器端命令的执行结果,但实际上却连命令结果也跟着接收了一部分。这又是为何呢?异步
这里就引入了一个重要的概念,“粘包”, 即服务器端你调用时 ’send ’ 2次,但你send调用时,数据其实并无马上被发送给客户端,而是放到了系统的socket发送缓冲区里,等缓冲区满了、或者数据等待超时了,数据才会被send到客户端,这样就把好几回的小数据拼成一个大数据,统一发送到客户端了,这么作的目地是为了提升io利用效率,一次性发送总比连发好几回效率高嘛。 但也带来一个问题,就是“粘包”,即2次或屡次的数据粘在了一块儿统一发送了。就是报错的状况 。
咱们在这里必需要想办法把粘包分开, 由于不分开,咱们就没办法取出来服务器端返回的命令执行结果的大小呀。so ,那怎么分开呢?首先咱们是没办法让缓冲区强制刷新把数据发给客户端的。 能作的,只有一个。让缓冲区超时,超时了,系统就不会等缓冲区满了,会直接把数据发走,由于不能一个劲的等后面的数据呀,等过久,会形成数据延迟了,那但是极很差的。如何让缓冲区超时呢?
答案就是:
1 import os 2 import socket 3 4 server = socket.socket() 5 server.bind(('localhost',7777)) 6 server.listen() 7 8 while True: 9 print("等待接收。。") 10 conn,addr = server.accept() 11 print("接收到客户端:",addr) 12 while True: 13 cmd = conn.recv(1024).decode() 14 if cmd == 'exit': 15 print('client is off...') 16 break 17 data = os.popen(cmd).read() 18 print(data) 19 if not data: 20 data = "cmd exec success,has no output" 21 conn.send(str(len(data)).encode()) 22 conn.recv(1024) #接收客户端回执信息,不作处理,防粘包 23 conn.sendall(data.encode("utf-8")) 24 25 server.close()
1 import socket 2 3 client = socket.socket() 4 client.connect(('localhost',7777)) 5 while True: 6 7 cmd = input(">>>") 8 if len(cmd) == 0: 9 continue 10 elif cmd == 'exit': 11 client.send(cmd.encode("utf-8")) 12 print("退出!") 13 break 14 client.send(cmd.encode("utf-8")) 15 16 res_return_size = client.recv(1024) #接收这条命令执行结果的大小 17 print("getting cmd result , ", res_return_size) 18 total_rece_size = int(res_return_size) 19 print("total size:",res_return_size) 20 client.send("准备好接收了,发吧loser".encode("utf-8")) #发送回执信息 21 received_size = 0 #已接收到的数据 22 cmd_res = b'' 23 f = open("test_copy.html","wb")#把接收到的结果存下来,一会看看收到的数据 对不对 24 while received_size != total_rece_size: #表明还没收完 25 data = client.recv(1024) 26 received_size += len(data) #为何不是直接1024,还判断len干吗,注意,实际收到的data有可能比1024少 27 cmd_res += data 28 else: 29 print("数据收完了",received_size) 30 #print(cmd_res.decode()) 31 f.write(cmd_res) #把接收到的结果存下来,一会看看收到的数据 对不对 32 #print(data.decode()) #命令执行结果 33 34 client.close()
3.SocketServer实现真正的并发
SocketServer内部使用 IO多路复用 以及 “多线程” 和 “多进程” ,从而实现并发处理多个客户端请求的Socket服务端。即:每一个客户端请求链接到服务器时,Socket服务端都会在服务器是建立一个“线程”或者“进程” 专门负责处理当前客户端的全部请求。
ThreadingTCPServer
ThreadingTCPServer实现的Soket服务器内部会为每一个client建立一个 “线程”,该线程用来和客户端进行交互。
一、ThreadingTCPServer基础
使用ThreadingTCPServer:
1 import socketserver 2 3 class MyTCPHandler(socketserver.BaseRequestHandler): 4 5 """ 6 The request handler class for our server. 7 8 It is instantiated once per connection to the server, and must 9 override the handle() method to implement communication to the 10 client. 11 """ 12 def handle(self): 13 # self.request is the TCP socket connected to the client 14 while True: 15 try: 16 self.data = self.request.recv(1024).strip() 17 print("{} wrote:".format(self.client_address[0])) 18 print(self.data) 19 # just send back the same data, but upper-cased 20 self.request.sendall(self.data.upper()) 21 except ConnectionResetError as e: 22 print("error:",e) 23 break 24 25 if __name__ == "__main__": 26 HOST, PORT = "localhost", 9999 27 28 # Create the server, binding to localhost on port 9999 29 server = socketserver.ThreadingTCPServer((HOST, PORT), MyTCPHandler) 30 31 # Activate the server; this will keep running until you 32 # interrupt the program with Ctrl-C 33 server.serve_forever()
1 import socket 2 #客户端没什么特别,跟以前都同样 3 client = socket.socket() 4 client.connect(("localhost",9999)) 5 6 while True: 7 data = input(">>>").strip() 8 client.send(data.encode("utf-8")) 9 recv_data = client.recv(1024).decode() 10 print(recv_data)
二、ThreadingTCPServer源码剖析
ThreadingTCPServer的类图关系以下:
内部调用流程为:
ThreadingTCPServer相关源码:
class BaseServer: """Base class for server classes. Methods for the caller: - __init__(server_address, RequestHandlerClass) - serve_forever(poll_interval=0.5) - shutdown() - handle_request() # if you do not use serve_forever() - fileno() -> int # for select() Methods that may be overridden: - server_bind() - server_activate() - get_request() -> request, client_address - handle_timeout() - verify_request(request, client_address) - server_close() - process_request(request, client_address) - shutdown_request(request) - close_request(request) - handle_error() Methods for derived classes: - finish_request(request, client_address) Class variables that may be overridden by derived classes or instances: - timeout - address_family - socket_type - allow_reuse_address Instance variables: - RequestHandlerClass - socket """ timeout = None def __init__(self, server_address, RequestHandlerClass): """Constructor. May be extended, do not override.""" self.server_address = server_address self.RequestHandlerClass = RequestHandlerClass self.__is_shut_down = threading.Event() self.__shutdown_request = False def server_activate(self): """Called by constructor to activate the server. May be overridden. """ pass def serve_forever(self, poll_interval=0.5): """Handle one request at a time until shutdown. Polls for shutdown every poll_interval seconds. Ignores self.timeout. If you need to do periodic tasks, do them in another thread. """ self.__is_shut_down.clear() try: while not self.__shutdown_request: # XXX: Consider using another file descriptor or # connecting to the socket to wake this up instead of # polling. Polling reduces our responsiveness to a # shutdown request and wastes cpu at all other times. r, w, e = _eintr_retry(select.select, [self], [], [], poll_interval) if self in r: self._handle_request_noblock() finally: self.__shutdown_request = False self.__is_shut_down.set() def shutdown(self): """Stops the serve_forever loop. Blocks until the loop has finished. This must be called while serve_forever() is running in another thread, or it will deadlock. """ self.__shutdown_request = True self.__is_shut_down.wait() # The distinction between handling, getting, processing and # finishing a request is fairly arbitrary. Remember: # # - handle_request() is the top-level call. It calls # select, get_request(), verify_request() and process_request() # - get_request() is different for stream or datagram sockets # - process_request() is the place that may fork a new process # or create a new thread to finish the request # - finish_request() instantiates the request handler class; # this constructor will handle the request all by itself def handle_request(self): """Handle one request, possibly blocking. Respects self.timeout. """ # Support people who used socket.settimeout() to escape # handle_request before self.timeout was available. timeout = self.socket.gettimeout() if timeout is None: timeout = self.timeout elif self.timeout is not None: timeout = min(timeout, self.timeout) fd_sets = _eintr_retry(select.select, [self], [], [], timeout) if not fd_sets[0]: self.handle_timeout() return self._handle_request_noblock() def _handle_request_noblock(self): """Handle one request, without blocking. I assume that select.select has returned that the socket is readable before this function was called, so there should be no risk of blocking in get_request(). """ try: request, client_address = self.get_request() except socket.error: return if self.verify_request(request, client_address): try: self.process_request(request, client_address) except: self.handle_error(request, client_address) self.shutdown_request(request) def handle_timeout(self): """Called if no new request arrives within self.timeout. Overridden by ForkingMixIn. """ pass def verify_request(self, request, client_address): """Verify the request. May be overridden. Return True if we should proceed with this request. """ return True def process_request(self, request, client_address): """Call finish_request. Overridden by ForkingMixIn and ThreadingMixIn. """ self.finish_request(request, client_address) self.shutdown_request(request) def server_close(self): """Called to clean-up the server. May be overridden. """ pass def finish_request(self, request, client_address): """Finish one request by instantiating RequestHandlerClass.""" self.RequestHandlerClass(request, client_address, self) def shutdown_request(self, request): """Called to shutdown and close an individual request.""" self.close_request(request) def close_request(self, request): """Called to clean up an individual request.""" pass def handle_error(self, request, client_address): """Handle an error gracefully. May be overridden. The default is to print a traceback and continue. """ print '-'*40 print 'Exception happened during processing of request from', print client_address import traceback traceback.print_exc() # XXX But this goes to stderr! print '-'*40 BaseServer
class TCPServer(BaseServer): """Base class for various socket-based server classes. Defaults to synchronous IP stream (i.e., TCP). Methods for the caller: - __init__(server_address, RequestHandlerClass, bind_and_activate=True) - serve_forever(poll_interval=0.5) - shutdown() - handle_request() # if you don't use serve_forever() - fileno() -> int # for select() Methods that may be overridden: - server_bind() - server_activate() - get_request() -> request, client_address - handle_timeout() - verify_request(request, client_address) - process_request(request, client_address) - shutdown_request(request) - close_request(request) - handle_error() Methods for derived classes: - finish_request(request, client_address) Class variables that may be overridden by derived classes or instances: - timeout - address_family - socket_type - request_queue_size (only for stream sockets) - allow_reuse_address Instance variables: - server_address - RequestHandlerClass - socket """ address_family = socket.AF_INET socket_type = socket.SOCK_STREAM request_queue_size = 5 allow_reuse_address = False def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True): """Constructor. May be extended, do not override.""" BaseServer.__init__(self, server_address, RequestHandlerClass) self.socket = socket.socket(self.address_family, self.socket_type) if bind_and_activate: try: self.server_bind() self.server_activate() except: self.server_close() raise def server_bind(self): """Called by constructor to bind the socket. May be overridden. """ if self.allow_reuse_address: self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.bind(self.server_address) self.server_address = self.socket.getsockname() def server_activate(self): """Called by constructor to activate the server. May be overridden. """ self.socket.listen(self.request_queue_size) def server_close(self): """Called to clean-up the server. May be overridden. """ self.socket.close() def fileno(self): """Return socket file number. Interface required by select(). """ return self.socket.fileno() def get_request(self): """Get the request and client address from the socket. May be overridden. """ return self.socket.accept() def shutdown_request(self, request): """Called to shutdown and close an individual request.""" try: #explicitly shutdown. socket.close() merely releases #the socket and waits for GC to perform the actual close. request.shutdown(socket.SHUT_WR) except socket.error: pass #some platforms may raise ENOTCONN here self.close_request(request) def close_request(self, request): """Called to clean up an individual request.""" request.close()
class ThreadingMixIn: """Mix-in class to handle each request in a new thread.""" # Decides how threads will act upon termination of the # main process daemon_threads = False def process_request_thread(self, request, client_address): """Same as in BaseServer but as a thread. In addition, exception handling is done here. """ try: self.finish_request(request, client_address) self.shutdown_request(request) except: self.handle_error(request, client_address) self.shutdown_request(request) def process_request(self, request, client_address): """Start a new thread to process the request.""" t = threading.Thread(target = self.process_request_thread, args = (request, client_address)) t.daemon = self.daemon_threads t.start()
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
ForkingTCPServer
ForkingTCPServer和ThreadingTCPServer的使用和执行流程基本一致,只不过在内部分别为请求者创建 “线程” 和 “进程”。
基本使用:
import socketserver class MyTCPHandler(socketserver.BaseRequestHandler): """ The request handler class for our server. It is instantiated once per connection to the server, and must override the handle() method to implement communication to the client. """ def handle(self): # self.request is the TCP socket connected to the client while True: try: self.data = self.request.recv(1024).strip() print("{} wrote:".format(self.client_address[0])) print(self.data) # just send back the same data, but upper-cased self.request.sendall(self.data.upper()) except ConnectionResetError as e: print("error:",e) break if __name__ == "__main__": HOST, PORT = "localhost", 9999 # Create the server, binding to localhost on port 9999 server = socketserver.ForkingTCPServer((HOST, PORT), MyTCPHandler) # Activate the server; this will keep running until you # interrupt the program with Ctrl-C server.serve_forever()
import socket client = socket.socket() client.connect(("localhost",9999)) while True: data = input(">>>").strip() client.send(data.encode("utf-8")) recv_data = client.recv(1024).decode() print(recv_data)