#!/usr/bin/env python #-*- coding: utf-8 -*- #Asynchronous Echo Server - Chapter 22 - echoserver.py #Compare to echo server in Chapter 3 import socket import traceback import os import sys import select class stateclass: stdmask = select.POLLERR | select.POLLHUP | select.POLLNVAL def __init__(self, mastersock): """ Initialize the state class """ """ select.poll() Returns a polling object, which supports registering and unregistering file descriptors, and then polling them for I/O events;(Not supported by all operating systems) The poll() system call, supported on most Unix systems, poll() scales better because the system call only requires listing the file descriptors of interest, while select() builds a bitmap, turns on bits for the fds of interest, and then afterward the whole bitmap has to be linearly scanned again. """ self.p = select.poll() self.mastersock = mastersock self.watchread(mastersock) self.buffers = {} self.sockets = {mastersock.fileno():mastersock} def fd2socket(self, fd): """ Given a file descriptor, return a socket """ return self.sockets[fd] def watchread(self, fd): """ Note interest in reading and register it """ """ poll.register(fd,[,eventmask]) Register a file descriptor with the polling object. Future calls to the poll() method will then check whether the file descriptor has any pending I/O events. fd can be either an integer, or an object with a fineno() method that returns an integer. File objects implement fileno(), so they can also be used as the argument. eventmask is an optional bitmask describing the type of events you want to check for, and can be a combination of the constants POLLIN, POLLPRI, and POLLOUT, described in the table below. If not specified, the default value used will check for all 3 types of events. Constant Meaning POLLIN There is data to read POLLPRI There is urgent data to read POLLOUT Ready for output:writing will not block POLLERR Error condition of some sort POLLHUP Hung up POLLNVAL Invalid request: descriptor not open """ self.p.register(fd, select.POLLIN | self.stdmask) def watchwrite(self, fd): """ Note interest in writing and register it """ self.p.register(fd, select.POLLOUT | self.stdmask) def watchboth(self, fd): """ Note interest in reading and writing and register them """ self.p.register(fd, select.POLLIN | select.POLLOUT | self.stdmask) def dontwatch(self, fd): """ Don't watch anything about this fd and unregister it """ self.p.unregister(fd) def newconn(self, sock): """ Process a new connection """ fd = sock.fileno() #Start out watching both since there will be an outgoing message self.watchboth(fd) #Put a greeting message as default value into the buffer self.buffers[fd] = "Welcome to the echoserver, %s\n" %str(sock.getpeername()) self.sockets[fd] = sock def readevent(self, fd): """ Called when data is ready to read """ try: #Read the data and append it to the write buffer self.buffers[fd] += self.fd2socket(fd).recv(4096) except: self.closeout(fd) #Interest in reading and writing, because if data received, then meaning writing maybe needed self.watchboth(fd) def writeevent(self, fd): """ Called when data is ready to write """ if not len(self.buffers[fd]): #No data to send? Take it out of the write list and return self.watchread(fd) return try: """ socket.send(string[,flags]) send data to the socket. The socket must be connected to a remote socket. The optional flags argument has the same meaning as for recv() above. Returns the number of bytes sent. Applications are responsible for checking that all data has been sent; if only some of the data was transmitted, the application needs to attempt delivery of the remaining data. """ #send() function will send all data with non-blocking, and return the number of bytes sent byteswritten = self.fd2socket(fd).send(self.buffers[fd]) except: self.closeout(fd) #Delete the text send from the buffer self.buffers[fd] = self.buffers[fd][byteswritten:] #If the buffer is empty, we don't care about writing in the future if not len(self.buffers[fd]): self.watchread(fd) def errorevent(self, fd): """ Called when an error occurs """ self.closeout(fd) def closeout(self, fd): """ Closes out a connection and removes it from data structures """ self.dontwatch(fd) try: """ socket.close() Close the socket. All future opertaions on the socket object will fail. The remote end will receive no more data(after queued data is flushed). Sockets are automaticcly closed when they are garbage-collected. Note: close() releases the resource associated with a connection but does not necessarily close the connection immediately. If you want to close the connection in a timely fashion, call shutdown() before close(). """ self.fd2socket(fd).close() except: pass del self.buffers[fd] del self.sockets[fd] def loop(self): """ Main loop for the program """ while 1: """ poll.poll([timeout]) polls the set of registered file descriptors, and returns a possibly-empty list containing(fd, event) 2-tuples for the descriptors that have events or errors to report. fd is the file descriptor, and event is a bitmask with bits set for the reported events for that descriptor - POLLIN for waiting input, POLLOUT to indicate that the descriptor can be written to, and so forth. An empty list indicates that the call timed out and no file descriptors had any events to report. If timeout is given, is specifies the length of time in milliseconds which the system will wait for events before returning. If timeout is ommited, negative or None, the call will block until there is an event for this poll object. """ """ (1)首先,对p.poll()的调用返回master socket,以及能够读的socket列表。服务器经过新的客户socket调用newconn()函数。newconn()函数会初始化数据结构,并在为客户准备的缓存器中添加“问候语”。最后,它在返回以前会观察来自客户端的读和写,控制接着被返回给p.poll()。 (2)当客户端准备好接收数据的时候,p.poll()将和客户socket一块儿再次返回,同时还有一个准备好写的socket列表。服务器会传输一些数据,若是缓存器的全部内容都被发送,它将把客户从写的socket列表中去掉。控制再次返回循环。 (3)当客户端发送数据到服务器的时候,p.poll()将返回,并代表要从客户端读取一些数据,readevent()方法被调用。它接收数据,把它们加到缓存器的结尾处,并确保服务器已经准备好把数据写回给客户端。当客户端准备好接收数据了,数据就像开始时候发送欢迎语那样发送出去。 (4)当客户端关闭链接的时候,服务器会被告知出现了一个错误,所以会调用errorevent()函数,它关闭服务器端的socket,并把客户从数据结构中去掉。 """ #The step take time and blocked until received client connection result = self.p.poll() for fd, event in result: if fd == self.mastersock.fileno() and event == select.POLLIN: #mastersock events mean a new client connection. #Accept it(get the new socket of client), configure it, and pass it over to newconn() try: newsock, addr = self.fd2socket(fd).accept() """ socket.setblocking(flag) set blocking or non-blocking mode of the socket: if flag is 0, the socket is set to non-blocking, else to blocking mode. Initially all sockets are in blocking mode. In non-blocking mode, if a recv() call doesn't find any data, or if a send() call can't immediately dispose of the data, a error exception is raised; in blocking mode, the calls block until they can proceed. s.setblocking(0) is equivalent to s.settimeout(0.0); s.setblocking(1) is equivalent to s.settimeout(None). """ newsock.setblocking(0) print "Got connection from", newsock.getpeername() self.newconn(newsock) except: pass elif event == select.POLLIN: self.readevent(fd) elif event == select.POLLOUT: self.writeevent(fd) else: self.errorevent(fd) host = '' port = 51423 """ socket() function returns a socket object whose methods implement the various socket system calls. """ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) """ socket.setsockopt(level, optname, value) set the value of the given socket option. """ s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind((host, port)) s.listen(1) s.setblocking(0) state = stateclass(s) state.loop()
2、高级的服务器端使用 python
不少异步服务器实际上针对每一个客户端使用两个缓存器---一个是为到来的指令,一个是为发送的数据。这样就可使服务器把那些不在一个单独信息包中的指令合并在一块儿。下面是一个很是简单的聊天系统。服务器只有在收到文本SEND以后,才会把收到的数据发送给全部链接的客户端。 缓存
这个程序的框架和前一个相似。可是,请注意对于加入的读缓存器和处理它的代码。代码主要处理3种不一样的输入感兴趣: 服务器
(1)没有结束指令(SEND)的数据; 数据结构
(2)一个或多个的结束指令; app
(3)一个或多个结束指令外加一条未结束的指令。 框架
经过异步I/O,使用了常规的指令,好比没法读取一整行。所以,必须本身来实现把输入放入缓存器,并准备好同时接收部分行或多行。 异步
#!/usr/bin/env python #-*- coding: utf-8 -*- #Asynchronous Chat Server - Chapter 22 - chatserver.py import socket, traceback, os, sys, select class stateclass: stdmask = select.POLLERR | select.POLLHUP | select.POLLNVAL def __init__(self, mastersock): """ select.poll() Returns a polling object, which supports registering and unregistering file descriptors, and then polling them for I/O events; The poll() system call, supported on most Unix systems, provides better scalability for network servers that service many, many clients at the same time. poll()scales better because the system call only requires listing the file descriptors of interest, while select() builds a bitmap, turns on bits for the fds of interest, and the afterward the whole bitmap has to be linearly scanned again. """ self.p = select.poll() self.mastersock = mastersock self.watchread(mastersock) self.readbuffers = {} self.writebuffers = {} self.sockets = {mastersock.fileno():mastersock} def fd2socket(self, fd): return self.sockets[fd] def watchread(self, fd): """ poll.register(fd[,eventmask]) Register a file descriptor with the polling object. Future calls to the poll() method will then check whether the file descriptor has any pending I/O events. fd can be either an integer, or an object with a finelno() method that returns an integer. File objects implement fileno(), so they can also be used as the argument. eventmask is an optional bitmask describing the type of events you want to check for, and can be a combination of the constants POLLIN, POLLPRI, and POLLOUT, described in the table below. If not specified, the default value used will check for all 3 types of events. Constant Meaning POLLIN There is data to read POLLPRI There is urgent data to read POLLOUT Ready for output: wirting will not block POLLERR Error condition of some sort POLLHUP Hung up POLLNVAL Invalid request:descriptor not open Registering a file descriptor that's already registered is not an error, and has the same effect as registering the descriptor exactly once. """ self.p.register(fd, select.POLLIN | self.stdmask) def watchwrite(self, fd): self.p.register(fd, select.POLLOUT | self.stdmask) def watchboth(self, fd): self.p.register(fd, select.POLLIN | select.POLLOUT | self.stdmask) def dontwatch(self, fd): """ Remove a file descriptor being tracked by a polling object. Just like the register() method, fd can be an integer or an object with a fileno() method that returns an integer. """ self.p.unregister(fd) def sendtoall(self, text, originfd): for line in text.split("\n"): line = line.strip() transmittext = str(self.fd2socket(originfd).getpeername())+ ": " + line + "\n" for fd in self.writebuffers.keys(): self.writebuffers[fd] += transmittext self.watchboth(fd) def newconn(self, sock): """ Return the socket's file descriptor(a small integer). """ fd = sock.fileno() self.watchboth(fd) self.writebuffers[fd] = "Welcome to the chat server %s\n" %str(sock.getpeername()) self.readbuffers[fd] = "" self.sockets[fd] = sock def readevent(self, fd): try: """ socket.recv(bufsize[,flags]) Receive data from the socket. The return value is a string representing the data received. The maximum amount of data to be received at once is specified by bufsize. Note: For best match with hadrware and network realities, the value of bufsize should be a relatively small power of 2, for example, 4096 """ #Read the data and append it to the write buffer. self.readbuffers[fd] += self.fd2socket(fd).recv(4096) except: self.closeout(fd) parts = self.readbuffers[fd].split("SEND") if len(parts) < 2: #No SEND command received return elif parts[-1] == '': #Nothing follows the SEND command,send what we have and ignore the rest. self.readbuffers[fd] = "" sendlist = parts[:-1] else: #The last element has data for which a SEND has not yet been seen; #push it onto the buffer and process the rest. self.readbuffers[fd] = parts[-1] sendlist = parts[:-1] for item in sendlist: self.sendtoall(item.strip(), fd) def writeevent(self, fd): if not len(self.writebuffers[fd]): #No data to send? Take it out of the write list and return. self.watchread(fd) return try: """ socket.send(string[,flags]) send data to the socket. The socket must be connected to a remote socket. The optional flags argument has the same meaning as for recv() above. Returns the number of bytes sent. Applications are responsible for checking that all data has been sent; if only some of the data was tranmitted, the application needs to attempt delivery of the remaining data. """ byteswritten = self.fd2socket(fd).send(self.writebuffers[fd]) except: self.closeout(fd) self.writebuffers[fd] = self.writebuffers[fd][byteswritten:] if not len(self.writebuffers[fd]): self.watchread(fd) def errorevent(self, fd): self.closeout(fd) def closeout(self, fd): self.dontwatch(fd) try: """ soket.close() Close the socket. All future operations on the socket object will fail. The remote end will receive no more data(after queued data is flushed). Sockets are automatically closed when they are garbage-collected. Note: close() releases the resource associated with a connection but does not necessarily close the connection immediately. If you want to close the connection in a timely fashion, call shutdown() before close(). socket.shutdown(how) shut down one or both halves of the connection. If how is SHUT_RD, further receives are disallowed. If how is SHUT_WR, further sends are disallowed. If how is SHUT_RDWR, further sends and receives are disallowed. Depending on the platform, shutting down one half of the connection can also close the opposite half. """ self.fd2socket(fd).close() except: pass del self.writebuffers[fd] del self.sockets[fd] def loop(self): while 1: """ poll.poll([timeout]), polls the set of registered file descriptors, and returns a possibly-empty list containing(fd, event) 2-tuples for the descriptors that have events or errors report. fd is the file descriptor, and event is a bitmask with bits set for the reported events for that descriptor - POLLIN for waiting input, POLLOUT to indicate that the descriptor can be written to, and so forth. An empty list indicates that the call timed out and no file descriptors had any events to report. If timeout is given, it specifies the length of time in milliseconds which the system will wait for events before returning. If timeout is omitted, negative, or None, the call will block until there is an event for this poll object. """ result = self.p.poll() for fd, event in result: if fd == self.mastersock.fileno() and event == select.POLLIN: try: """ socket.accept() Accept a connection. The socket must be bound to an address and listening for connections. The return value is a pair(conn, address) where conn is a new socket object usable to send and receive data on the connection, and address is the address bound to the socket on the other end of the connection. """ newsock, addr = self.fd2socket(fd).accept() newsock.setblocking(0) """ socket.getpeername() Return the remote address to which the socket is connected. This is useful to find out the port number of a remote IPv4/v6 socket, for instance.(The format of the address returned depends on the address family) """ print "Got connection from", newsock.getpeername() self.newconn(newsock) except: pass elif event == select.POLLIN: self.readevent(fd) elif event == select.POLLOUT: self.writeevent(fd) else: self.errorevent(fd) host = '' #Bind to all interfaces port = 51423 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind((host, port)) s.listen(1) """ socket.setblocking(flag) Set blocking or non-blocking mode of the socket: if flag is 0, the socket is set to non-blocking, else to blocking mode. Initially all sockets are in blocking mode. In non-blocking mode, if a recv() call does't find any data, or if a send() call can't immediately dispose of the data, a error exception is raised; in blocking mode, the calls block until they can proceed. s.setblocking(0) is equivalent to s.settimeout(0.0); s.setblocking(1) is equivalent to s.settimeout(None). """ s.setblocking(0) state = stateclass(s) state.loop()
3、监控多个master socket socket
使用一个单任务服务器来侦听多个不一样的端口。事实上,标准UNIX的"superserver"守护进程(inetd)就是这样作的。 ide
守护进程侦听多个端口。当有链接到来的时候,它会启动一个能够处理这个链接的程序。经过这种方法,一个单独的进程能够处理许多socket。在那些有不少进程,可是用得不是不少的系统上,这时有一个优势,由于可使用一个进程侦听多个不一样的socket。 函数
实现相似守护进程服务器的一种方法是使用poll()函数来检测全部的master socket。当接收到链接的时候,它会转向一个已经知道的文件描述符,并把它转给一个实际的处理程序。
在实际当中,相似守护进程的服务器将是一些东西的混合体。它使用poll()来监视master socket,可是也会使用fork()把它们传递给处理程序。
首先,它创建一个stateclass类实例,接着打开它的配置文件inetd.txt,并读取它。每一行都给出了一个TCP端口号和一个当有客户端链接到该端口的时候会运行的指令。因此,对于配置文件的每一行,一个新的socket对象被创建、绑定、配置以及被加入到stateclass中。最后,配置文件被所有处理以后,它被关闭,程序也进入主循环。守护进程的循环只须要处理一个事件---那就是客户端的链接。当有链接的时候,客户端被传递给self.newconn(),同时还有将要运行的指令。
newconn()是实际操做发生的地方。经过检查pid的值,若是正在父进程中,新的客户端socket将被关闭,而且代码返回循环。在子进程一端,它作的第一件事情是关闭每一个单独的master socket。
#!/usr/bin/env python #-*- coding: utf-8 -*- #Asynchronous Inetd-like Server - Chapter 22 - inetd.py import socket, traceback, os, sys, select class stateclass: def __init__(self): """ Returns a polling object, which supports registering and unregistering file descriptors, and then polling them for I/O events. The poll() system call, supported on most Unix systems, provides better scalability for network servers that service many, many clients at the same time. poll() scales better because the system call only requires listing the file descriptors of interest, while select() builds a bitmap, turns on bits for the fds of interest, and then afterward the whole bitmap has to be linearly scanned again. """ self.p = select.poll() self.mastersocks = {} self.commands = {} def fd2socket(self, fd): return self.mastersocks[fd] def addmastersock(self, sockobj, command): self.mastersocks[sockobj.fileno()] = sockobj self.commands[sockobj.fileno()] = command self.watchread(sockobj) def watchread(self, fd): """ poll.register(fd[,eventmask]) Register a file descriptor with the polling object. Future calls to the poll() method will then check whether the file descriptor has any pending I/O events. fd can be either an integer, or an object with a fileno() method that returns an integer. File objects implement fileno(), so they can also be used as the argument. eventmask is an optional bitmask describing the type of events you want to check for, and can be a combination of the constants POLLIN, POLLPRI and POLLOUT, described in the table below. If not specified, the default value used will check for all 3 types of events. Constant Meaning POLLIN There is data to read POLLPRI There is urgent data to read POLLOUT Ready for output: writing will not block POLLERR Error condition of some sort POLLHUP Hung up POLLNVAL Invalid request: descriptor not open """ self.p.register(fd, select.POLLIN) def dontwatch(self, fd): """ poll.unregister(fd) Remove a file descriptor being tracked by a polling object. Just like the register() method, fd can be an integer or an object with a fineno() method that returns an integer. Attempting to remove a fild descriptor that was never registerd causes a KeyError exception to be raised. """ self.p.unregister(fd) def newconn(self, newsock, command): try: pid = os.fork() except: try: newsock.close() except: pass return if pid: #Parent process newsock.close() return #Child process from here on #First, close all the master sockets. for sock in self.mastersocks.values(): """ socket.close() Close the socket. All future operations on the socket object will fail. The remote end will receive no more data(after queued data is flushed). Sockets are automatically closed when they are garbage-collected. Note: close() releases the resource associated with a connection but does not necessarily close the connection immediately. If you want to close the connection in a timely fashion, call shutdown() before close(). """ sock.close() #Next, copy the socket's file descriptor to standard input(0), #standard output(1), and standard error(2). fd = newsock.fileno() """ os.dup2(fd, fd2) Duplicate file descriptor fd to fd2, closing the latter first if necessary. """ os.dup2(fd, 0) os.dup2(fd, 1) os.dup2(fd, 2) #Finally, call the command. program = command.split('')[0] args = command.split('')[1:] try: """ os.execvp(file, args) os.execvpe(file, args, env) These functions all execute a new program, replacing the current process; they do not return. On Unix, the new executable is loaded into the current process, and will have the same process id as the caller. Errors will be reported as OSError exceptions. The current process is replaced immediately. Open file objects and descriptors are not flushed, so if there may be data buffered on these open files, you should flush them using sys.stdout.flush() or os.fsync() before calling an exec*() function. """ os.execvp(program, [program]+args) except: sys.exit(1) def loop(self): while 1: """ poll.poll([timeout]) Polls the set of registered file descriptors, and returns a possibly-empty list containing(id, event) 2-tuples for the descriptors that have events or errors to report. fd is the file descriptor, and event is a bitmask with bits set for the reported events for that descriptor ---POLLIN for waiting input, POLLOUT to indicate that the descriptor can be written to, and so forth. An empty list indicates that the call timed out and no file descriptors had any events to report. If timeout is given, it specifies the lenght of time in milliseconds which the system will wait for events before returning. If timeout is omitted, negative, or None, the call will block until there is an event for this poll object. """ result = self.p.poll() for fd, event in result: print "Received a child connection" try: """ socket.accept() Accept a connection. The socket must be bound to an address and listening for connections. The return value is a pair(conn, address) where conn is a new socket object usable to send and receive data on the connection, and address is the address bound to the socket on the other end of the connection. """ newsock, addr = self.fd2socket(fd).accept() self.newconn(newsock, self.commands[fd]) except: pass host = '' #Bind to all interfaces state = stateclass() config = open("inetd.txt") for line in config: line = line.strip() """ string.split(s[,sep[,maxsplit]]) Return a list of the words of the string s. If the optional second argument sep is absent or None, the words are separated by arbitrary strings of whitespace characters(space, tab, newline, return, formfeed. If the second argument sep is present and not None, it specifies a string to be used as the word separator. The returned list will then have one more item than the number of non-overlapping occurrences of the separtor inthe string. If maxsplit is given, at most maxsplit number of splits occur, and the remainder of the string is returned as the final element of the list(thus, the list will have at most maxlist+1 elements). If maxplit is not specified or -1, then there is no limit on the number of splits(all possible splits are made). The behavior of split on empty string depends on the value of sep. If sep is not specified, or specified as None, the result will be an empty list. If sep is specified as any string, the result will be a list containing one element which is an empty string. """ port, command = line.split(":",1) port = int(port) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) """ socket.setsockopt(level, optname, value) Set the value of the given option. The needed symbolic constants are defined in the socket module. The value can be an integer or a string representing a buffer. """ s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) """ socket.bind(address) Bind the socket to address. The socket must not alredy be bound.(The format of address depends on the address family) """ s.bind((host, port)) """ socket.listen(backlog) Listen for connections made to the socket. The backlog argument specifies the maximum number of queued connections and should be at least 0; the maximum value is system-dependent(usually 5), the minimum value is forced to 0. """ s.listen(1) """ socket.setblocking(flag) set blocking or non-blocking mode of the socket: if flag is 0, the socket is set to non-blocking, else to blocking mode. Initially all sockets are in blocking mode. In non-blocking mode, if a recv() call doesn't find any data, or if a send() call can't immediately dispose of the data, a error exception is raised; in blocking mode, the calls block until they can proceed. s.setblocking(0) is equivalent to s.settimeout(0.0); s.setblocking(1) is equivalent to s.settimeout(None). """ s.setblocking(0) state.addmastersock(s, command) config.close() state.loop()