Twisted是一个事件驱动的网络框架,其中包含了诸多功能,例如:网络协议、线程、数据库管理、网络操做、电子邮件等。html
事件驱动简而言之,事件驱动分为二个部分:第一,注册事件;第二,触发事件。react
Protocols数据库
Protocols描述了如何以异步的方式处理网络中的事件。HTTP、DNS以及IMAP是应用层协议中的例子。Protocols实现了IProtocol接口,它包含以下的方法:服务器
makeConnection 在transport对象和服务器之间创建一条链接
connectionMade 链接创建起来后调用
dataReceived 接收数据时调用
connectionLost 关闭链接时调用网络
Transports框架
Transports表明网络中两个通讯结点之间的链接。Transports负责描述链接的细节,好比链接是面向流式的仍是面向数据报的,流控以及可靠性。TCP、UDP和Unix套接字可做为transports的例子。它们被设计为“知足最小功能单元,同时具备最大程度的可复用性”,并且从协议实现中分离出来,这让许多协议能够采用相同类型的传输。Transports实现了ITransports接口,它包含以下的方法:异步
write 以非阻塞的方式按顺序依次将数据写到物理链接上
writeSequence 将一个字符串列表写到物理链接上
loseConnection 将全部挂起的数据写入,而后关闭链接
getPeer 取得链接中对端的地址信息
getHost 取得链接中本端的地址信息socket
将transports从协议中分离出来也使得对这两个层次的测试变得更加简单。能够经过简单地写入一个字符串来模拟传输,用这种方式来检查。函数
EchoServer:源码分析
from twisted.internet import protocol from twisted.internet import reactor #reactor无限循环,写好了事件,reactor自动检测,相似于select class Echo(protocol.Protocol): def dataReceived(self, data): #只要twisted一收到数据,就会调用dataRecevied方法 self.transport.write(data) #把收到的数据返回给客户端 def main(): factory = protocol.ServerFactory() #定义一个基类,相似于socketserver的handler上面一层的类 factory.protocol = Echo #相似于socketserver中的handler,必须定义此Echo,代表每个客户端过来后都会调用Echo创建一个实例 reactor.listenTCP(1234,factory) #reactor相似于select,是一个触发器,检测1234端口,须要把定义的基础类传进来 reactor.run() #reactor执行 if __name__ == '__main__': main()
EchoClient:
from twisted.internet import reactor, protocol # a client protocol class EchoClient(protocol.Protocol): """Once connected, send a message, then print the result.""" def connectionMade(self): #只要链接一创建成功,就会自动调用此方法 self.transport.write("hello!") #给服务端发送hello def dataReceived(self, data): #当有数据收到时,就会调用这个方法,自动进行 "As soon as any data is received, write it back." print "Server said:", data #收到数据后打印数据 self.transport.loseConnection() #数据传送完毕后,关闭链接,执行了下面的方法 | # v # ---------<--------------<-----------------<----------------<--- # | # v def connectionLost(self, reason): #client connection断开了,会执行此方法,此为本身定义的connectionLost方法 print "connection lost" class EchoFactory(protocol.ClientFactory): protocol = EchoClient #在类中定义protocal,重写这个类;EchoClient至关于socketserver中的handle方法 def clientConnectionFailed(self, connector, reason): #若是reactor链接不上服务端,自动调用这方法 print "Connection failed - goodbye!" #打印链接失败信息 reactor.stop() #关闭链接 def clientConnectionLost(self, connector, reason): #若是client connection断开了,会自动调用此方法,相似于socketserver的handle后面的finish方法,和上面的connectionLost方法不一样。 print "Connection lost - goodbye!" #打印链接断开信息 reactor.stop() #关闭链接 # this connects the protocol to a server running on port 8000 def main(): f = EchoFactory() #建立一个客户端的基类,与服务端的ServerFactory相似 reactor.connectTCP("localhost", 1234, f) #链接'localhost',端口号,把客户端的基类传入reactor reactor.run() #运行reactor # this only runs if the module was *not* imported if __name__ == '__main__': main() #程序入口,进入主程序
运行服务器端脚本将启动一个TCP服务器,监听端口1234上的链接。服务器采用的是Echo协议,数据经TCP transport对象写出。运行客户端脚本将对服务器发起一个TCP链接,回显服务器端的回应而后终止链接并中止reactor事件循环。这里的 Factory用来对链接的双方生成protocol对象实例。两端的通讯是异步的,connectTCP负责注册回调函数到reactor事件循环中,当socket上有数据可读时通知回调处理。
RPC server:
#Project interpreter: 2.7 import pika, os, time def operate(body): sys_result=os.popen(body).read() print("%s client execute \033[1;31;0m%s\033[0m result:\n%s" % (time.strftime('%Y-%m-%d %H:%M:%S'),body,sys_result)) return sys_result def on_request(ch, method, props, body): response = operate(body) ch.basic_publish(exchange='', #basic_publish指向管道内发送数据 routing_key=props.reply_to, #指定向哪一个队列发数据 properties=pika.BasicProperties(correlation_id = props.correlation_id), body=str(response)) #body是发送的消息内容 ch.basic_ack(delivery_tag = method.delivery_tag) if __name__ == '__main__': try: connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) #这是阻塞的链接 channel = connection.channel() #生成一个管道 channel.queue_declare(queue='rpc_queue') #在管道中建立一个队列,名字叫rpc_queue channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue='rpc_queue') print("Server is waiting RPC requests...") channel.start_consuming() #开始接收数据,阻塞状态 except KeyboardInterrupt: print("Connection lost...")
RPC client:
#Project interpreter: 2.7 import pika, uuid class OperateRpcClient(object): #对类进行实例化 def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将此queue删除 self.callback_queue = result.method.queue #服务端执行完结果返回的queue名字 self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) #no_ack不须要确认,若是为False,当客户端消费完成后,会给服务端发送确认消息;queue参数指定了收取消息队列的名称 def on_response(self, ch, method, props, body): #回调方法 if self.corr_id == props.correlation_id: self.response = body def call(self): self.response = None self.corr_id = str(uuid.uuid4()) self.input = raw_input("root@client>> ") self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id = self.corr_id, ), body=self.input) while self.response is None: self.connection.process_data_events() #不断的去Queue里面接收数据,并且不是阻塞的 return self.response if __name__ == '__main__': print("This program use rabbitMQ send your OS command to server, your can use common command at here, enjoy it!\n\te.g.\n\t\t1.ls\n\t\t2.pwd\n\t\t3.free -m\n\t\t4.df -Th\n\t\t5.netstat -anplute") while True: try: operate_rpc = OperateRpcClient() response = operate_rpc.call() print(response) except KeyboardInterrupt: print("Connection lost...")