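Python Twisted, Reactor

Catalogue

1. Twisted Fundamentals
2. Asynchronous Programming Patterns and the Reactor
3. Network Programming with Twisted
4. Process Management with the reactor
5. Concurrent Connections in Twisted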

 

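1. Twisted Fundamentals

0x1: The Asynchronous Programming Model

Event-driven programming is a programming paradigm in which the program's flow of execution is determined by external events. Its defining feature is an event loop that, when an external event occurs, uses callbacks to trigger the corresponding handling. The two other common paradigms are (single-threaded) synchronous programming and multithreaded programming.

The asynchronous model

In this model, tasks are interleaved with one another, and, notably, this happens under a single thread of control. This is much simpler than the multithreaded model, because the programmer can always assume that only one task is running while the others are suspended.
There is one more difference between the asynchronous and multithreaded models: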

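1. In a multithreaded program, the decision to suspend one thread and start another lies with the operating system, not the programmer, so the programmer must assume that at any moment a thread may be suspended and another one started.
2. In the asynchronous model, by contrast, events arrive asynchronously and the CPU takes them from a queue and processes them one at a time; a task can only run once the currently running task explicitly gives up control. This is what makes the model simpler than the multithreaded one.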
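A minimal sketch of this explicit hand-off, using plain Python generators rather than Twisted (the names `task` and `run_round_robin` are illustrative, not from any library): each task runs until it executes `yield`, and a single-threaded loop decides which task runs next, so no task is ever interrupted mid-step.

```python
from collections import deque


def task(name, steps, log):
    """Hypothetical task: does `steps` units of work, yielding control after each."""
    for i in range(steps):
        log.append(f"{name}{i}")
        yield  # explicitly give up control; nothing can preempt us mid-step


def run_round_robin(tasks):
    """Run generator-based tasks in one thread, switching only at `yield` points."""
    ready = deque(tasks)
    while ready:
        current = ready.popleft()
        try:
            next(current)          # run the task until it yields
        except StopIteration:
            continue               # the task finished; drop it
        ready.append(current)      # it still has work; requeue it


log = []
run_round_robin([task("A", 3, log), task("B", 2, log)])
print(log)  # ['A0', 'B0', 'A1', 'B1', 'A2']
```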

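0x2: Advantages of Asynchronous Programming

1. In the single-threaded synchronous model, tasks run one after another in order. If a task blocks on I/O, all the other tasks must wait until it finishes before they can run in turn. This explicit ordering and serialized behavior is easy to reason about. But if the tasks do not depend on each other and still have to wait for one another, the program runs slower than it needs to.

2. In the multithreaded version, each task runs in its own thread. The threads are managed by the operating system and may run in parallel on a multiprocessor system or be interleaved on a single processor. This lets other threads keep running while one thread is blocked on some resource. Compared with a synchronous program that does the same work, this is more efficient, but the programmer must write code to protect shared resources from being accessed by several threads at once. Multithreaded programs are harder to reason about, because they have to handle thread safety through synchronization mechanisms such as locks, reentrant functions, thread-local storage, or other techniques, and getting these wrong leads to subtle bugs.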

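Compared with the synchronous model, the asynchronous model performs best when:

1. There are many tasks, so that at any given moment at least one task is likely able to make progress
2. The tasks perform a lot of I/O, so a synchronous program would waste a great deal of time blocked on individual tasks
3. The tasks are largely independent of one another, so there is little interaction between them
//These conditions mostly arise on busy network servers in a client-server architecture (such as web servers)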
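To make the I/O-bound case concrete, here is a sketch using the standard library's asyncio event loop (not Twisted), with `asyncio.sleep` standing in for a blocking I/O wait; `fake_io` is a hypothetical placeholder for a real network call. Three independent 0.1 s waits take about 0.3 s when performed one after another, but about 0.1 s when interleaved on a single event loop.

```python
import asyncio
import time


async def fake_io(delay):
    """Hypothetical stand-in for a network request: waits without blocking the loop."""
    await asyncio.sleep(delay)
    return delay


async def sequential(n, delay):
    # Each wait starts only after the previous one has finished.
    return [await fake_io(delay) for _ in range(n)]


async def interleaved(n, delay):
    # All waits are in flight at once; the loop switches between them.
    return await asyncio.gather(*(fake_io(delay) for _ in range(n)))


def timed(coro):
    start = time.perf_counter()
    asyncio.run(coro)
    return time.perf_counter() - start


seq_time = timed(sequential(3, 0.1))
int_time = timed(interleaved(3, 0.1))
print(f"sequential: {seq_time:.2f}s, interleaved: {int_time:.2f}s")
```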

Relevant Link:

https://likebeta.gitbooks.io/twisted-intro-cn/content/zh/p01.html

 

2. Asynchronous Programming Patterns and the Reactor

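1. In asynchronous mode, the client connects to all of the servers at once, instead of one at a time as in synchronous mode, and once connected it waits for new events to arrive
2. The sockets used for communication are non-blocking, which is achieved by calling setblocking(0)
3. The select method of the select module identifies which of the monitored sockets have data ready to receive; while none are ready, it blocks
4. When reading from a server, the client reads as much data as it can from the socket until the read would block, then moves on to the next socket that has received data (if any). This means we must keep track of how much data has been received from each server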
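The four steps above can be sketched with the standard `socket` and `select` modules. To stay self-contained, this sketch assumes `socket.socketpair()` pairs in place of real server connections (the original client connects to remote servers); it reads each ready socket until the read would block and keeps a per-socket buffer.

```python
import select
import socket

# Hypothetical stand-ins for server connections: each pair is (our end, "server" end).
pairs = [socket.socketpair() for _ in range(3)]
ours = [a for a, _ in pairs]
for s in ours:
    s.setblocking(False)              # step 2: non-blocking sockets

# Each fake server sends some data and then closes its end.
for i, (_, server) in enumerate(pairs):
    server.sendall(f"poem {i}".encode())
    server.close()

received = {s: b"" for s in ours}     # step 4: track data per connection
open_socks = list(ours)
while open_socks:
    # Step 3: block until at least one socket is readable.
    readable, _, _ = select.select(open_socks, [], [])
    for s in readable:
        while True:                   # read as much as possible until it would block
            try:
                chunk = s.recv(1024)
            except BlockingIOError:
                break                 # nothing more right now; move to the next socket
            if not chunk:             # empty read: the peer closed the connection
                open_socks.remove(s)
                s.close()
                break
            received[s] += chunk

print(sorted(received.values()))  # [b'poem 0', b'poem 1', b'poem 2']
```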

The process above can be generalized into a pattern: the reactor pattern

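The reactor pattern

This loop is the "reactor" because it waits for events to happen and then reacts to them. For the same reason it is also called an event loop. Because interactive systems have to perform I/O, such a loop is sometimes called a select loop, since the select call is used to wait for I/O. In this program's select loop, then, an event means that data has arrived on a socket.
Note that select is not the only function that waits for I/O; it is simply one of the oldest. Newer APIs that do the same job with better performance have been implemented on various systems. Performance aside, they all do the same thing:

1. Monitor a set of sockets (file descriptors)
2. Block the program
3. Until at least one of them is ready for I/O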

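A real implementation of the reactor pattern factors the loop out into its own abstraction with the following capabilities:

1. Monitor a set of file descriptors associated with I/O operations
2. Repeatedly report which of those file descriptors are ready for I/O
3. Handle all the I/O events that can arise on different systems
4. Provide clean abstractions so that you need to think about the reactor as little as possible while using it
5. Provide implementations of common protocols that can be used outside the abstraction layer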
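A toy sketch of the first two capabilities (monitoring descriptors and dispatching the ready ones to handlers), built on the standard `selectors` module. `MiniReactor`, `register_reader`, and `run` are hypothetical names for illustration; Twisted's real reactor is far more complete.

```python
import selectors
import socket


class MiniReactor:
    """Toy reactor (hypothetical): waits on registered sockets and calls their handlers."""

    def __init__(self):
        self.selector = selectors.DefaultSelector()

    def register_reader(self, sock, callback):
        # Remember which callback handles "readable" events on this socket.
        self.selector.register(sock, selectors.EVENT_READ, callback)

    def unregister(self, sock):
        self.selector.unregister(sock)

    def run(self):
        # Loop until nothing is registered any more.
        while self.selector.get_map():
            # Block until at least one registered socket is ready...
            for key, _ in self.selector.select():
                key.data(key.fileobj)  # ...then dispatch it to its handler.


mini = MiniReactor()
left, right = socket.socketpair()
got = []


def on_readable(sock):
    data = sock.recv(1024)
    if data:
        got.append(data)
    else:                      # peer closed: stop watching this socket
        mini.unregister(sock)
        sock.close()


mini.register_reader(left, on_readable)
right.sendall(b"hello")
right.close()
mini.run()
print(got)  # [b'hello']
```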

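0x1: The Asynchronous Event Model in Twisted

Twisted implements the reactor design pattern, which dispatches events from multiple event sources to their respective event handlers in a single-threaded environment.
The core of Twisted is the reactor event loop. The reactor knows about network, file system, and timer events. It waits for these events and then handles them, abstracting away platform-specific behavior and providing a uniform interface that makes it simple to respond to events anywhere in the network stack.
Essentially, the reactor does the following: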

while True:
    timeout = time_until_next_timed_event()
    events = wait_for_events(timeout)
    events += timed_events_until(now())
    for event in events:
        event.process()
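The pseudocode above can be fleshed out into a runnable sketch. Only timed events are modeled here: a heap of (deadline, callback) entries, with `time.sleep` standing in for `wait_for_events(timeout)` since there are no descriptors to watch. `TimerLoop.call_later` borrows the idea of Twisted's `reactor.callLater`, but it is a hypothetical toy, not that method's implementation.

```python
import heapq
import itertools
import time


class TimerLoop:
    """Toy event loop (hypothetical) that only handles timed events."""

    def __init__(self):
        self._timers = []               # heap of (deadline, seq, callback, args)
        self._seq = itertools.count()   # tie-breaker so callbacks are never compared

    def call_later(self, delay, callback, *args):
        deadline = time.monotonic() + delay
        heapq.heappush(self._timers, (deadline, next(self._seq), callback, args))

    def run(self):
        while self._timers:
            # timeout = time_until_next_timed_event()
            timeout = max(0.0, self._timers[0][0] - time.monotonic())
            # events = wait_for_events(timeout): nothing else to wait for in this toy
            time.sleep(timeout)
            # events += timed_events_until(now())
            now = time.monotonic()
            due = []
            while self._timers and self._timers[0][0] <= now:
                due.append(heapq.heappop(self._timers))
            # for event in events: event.process()
            for _, _, callback, args in due:
                callback(*args)


loop = TimerLoop()
order = []
loop.call_later(0.02, order.append, "second")
loop.call_later(0.01, order.append, "first")
loop.call_later(0.03, order.append, "third")
loop.run()
print(order)  # ['first', 'second', 'third']
```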

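At the time the source material was written, Twisted's default reactor on all platforms was based on the poll API; current Twisted releases default to the epoll reactor on Linux. Twisted also supports several platform-specific, high-volume multiplexing APIs: the KQueue reactor based on FreeBSD's kqueue mechanism, the epoll reactor for systems that support the epoll interface (Linux 2.6 and later), and the IOCP reactor based on Windows I/O completion ports.
When implementing the polling details, Twisted has to take into account:

1. Network and file system limitations
2. Buffering behavior
3. How to detect a lost connection
4. The values returned when errors occur

Twisted's reactor implementations also take care to use the underlying non-blocking APIs correctly and to handle the various edge cases properly. Because Python does not expose the IOCP API, Twisted has to maintain its own implementation of it.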

0x2: Deferreds

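A Deferred object is an abstraction of the idea that a result does not exist yet. It also helps manage the chain of callbacks needed to produce that result. When returned from a function, a Deferred is a promise that the function will produce a result at some point. The returned Deferred holds references to all the callbacks registered for the event, so only this one object has to be passed between functions, which is much easier to keep track of than managing every callback separately.
A Deferred contains a pair of callback chains:

1. One for callbacks run when the operation succeeds (callbacks)
2. One for callbacks run when the operation fails (errbacks)

Initially both chains of a Deferred are empty. As the event is processed, each stage adds a success callback and a failure callback. When the asynchronous result arrives, the Deferred is "fired", and the success and failure callbacks are invoked as appropriate, in the order in which they were added.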
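A stripped-down, hypothetical `ToyDeferred` illustrating the two chains described above. It only sketches the idea; it is not Twisted's `twisted.internet.defer.Deferred`, which also handles `Failure` objects, nested Deferreds, pausing, and more. Its methods loosely play the roles of `addCallbacks`, `callback`, and `errback`: each stage is a (callback, errback) pair, a normal return continues on the success side, and an exception switches processing to the error side.

```python
class ToyDeferred:
    """Toy model of a Deferred's paired chains (not twisted.internet.defer.Deferred)."""

    def __init__(self):
        self._chain = []          # stages: (callback, errback) pairs, in order added
        self._fired = False
        self._result = None
        self._is_error = False

    @staticmethod
    def _passthrough(result):
        return result

    @staticmethod
    def _reraise(error):
        raise error

    def add_callbacks(self, callback=None, errback=None):
        # A missing side passes the result (or the error) through unchanged.
        self._chain.append((callback or self._passthrough, errback or self._reraise))
        if self._fired:
            self._run()           # already fired: run the new stage immediately
        return self

    def callback(self, result):
        self._fire(result, is_error=False)

    def errback(self, error):
        self._fire(error, is_error=True)

    def _fire(self, value, is_error):
        if self._fired:
            raise RuntimeError("already fired")
        self._fired, self._result, self._is_error = True, value, is_error
        self._run()

    def _run(self):
        while self._chain:
            callback, errback = self._chain.pop(0)
            handler = errback if self._is_error else callback
            try:
                self._result = handler(self._result)
                self._is_error = False    # returned normally: continue on the success side
            except Exception as exc:
                self._result = exc
                self._is_error = True     # raised: continue on the error side


log = []
d = ToyDeferred()
d.add_callbacks(lambda page: page.upper())
d.add_callbacks(lambda page: 1 / 0)                 # fails: switch to the error side
d.add_callbacks(lambda r: log.append("skipped"),    # not called: we are on the error side
                lambda err: f"recovered from {type(err).__name__}")
d.add_callbacks(log.append)                          # back on the success side
d.callback("hello")
print(log)  # ['recovered from ZeroDivisionError']
```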

0x3: Transports

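A transport represents the connection between two endpoints communicating over a network. Transports describe the details of the connection, such as whether it is stream- or datagram-oriented, flow control, and reliability. TCP, UDP, and Unix sockets are examples of transports. They are designed to be "minimally functional units that are maximally reusable" and are decoupled from protocol implementations, which lets many protocols use the same type of transport. Transports implement the ITransport interface, which has the following methods:

1. write: writes data to the physical connection, in sequence, in a non-blocking fashion
2. writeSequence: writes a list of strings to the physical connection
3. loseConnection: writes all pending data and then closes the connection
4. getPeer: gets the address of the remote end of the connection
5. getHost: gets the address of the local end of the connection

Decoupling transports from protocols also makes testing the two layers easier: a mock transport can simply write data to a string, which the test can then inspect.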
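A sketch of this testing idea, with hypothetical `FakeTransport` and `EchoProtocol` classes written in plain Python so the example does not depend on Twisted (the method names mirror the ITransport and IProtocol names listed in this section). Twisted ships its own helper for this purpose, `StringTransport`, in its testing utilities.

```python
class FakeTransport:
    """Hypothetical mock: records everything written instead of sending it anywhere."""

    def __init__(self):
        self.written = []
        self.disconnecting = False

    def write(self, data):
        self.written.append(data)

    def writeSequence(self, seq):
        self.written.extend(seq)

    def loseConnection(self):
        self.disconnecting = True

    def value(self):
        return b"".join(self.written)


class EchoProtocol:
    """Hypothetical protocol: pure logic, it never touches a socket directly."""

    def makeConnection(self, transport):
        self.transport = transport
        self.connectionMade()

    def connectionMade(self):
        self.transport.write(b"ready\n")

    def dataReceived(self, data):
        if data.strip() == b"quit":
            self.transport.loseConnection()
        else:
            self.transport.write(data)


# Drive the protocol by hand and inspect what it "sent".
transport = FakeTransport()
proto = EchoProtocol()
proto.makeConnection(transport)
proto.dataReceived(b"hi\n")
proto.dataReceived(b"quit\n")
print(transport.value(), transport.disconnecting)  # b'ready\nhi\n' True
```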

0x4: Protocols

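Protocols describe how to process network events asynchronously. HTTP, DNS, and IMAP are examples of application-layer protocols. Protocols implement the IProtocol interface, which contains the following methods:

1. makeConnection: makes a connection to a transport and a server
2. connectionMade: called when the connection is established
3. dataReceived: called when data is received
4. connectionLost: called when the connection is closed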
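One consequence of `dataReceived` is worth illustrating: on a stream transport such as TCP, data arrives in arbitrary chunks, so one call may carry half a message or several messages. The hypothetical `LineBuffer` below sketches the kind of buffering that Twisted's `LineReceiver` (the base class of the chat server below) performs before calling `lineReceived`; it is a simplified illustration, not that class's code.

```python
class LineBuffer:
    """Hypothetical helper: reassembles delimited lines from arbitrarily split chunks."""

    delimiter = b"\r\n"

    def __init__(self):
        self._buffer = b""
        self.lines = []

    def dataReceived(self, data):
        self._buffer += data
        # Emit every complete line; keep any trailing partial line for later.
        *complete, self._buffer = self._buffer.split(self.delimiter)
        for line in complete:
            self.lineReceived(line)

    def lineReceived(self, line):
        self.lines.append(line)


lb = LineBuffer()
for chunk in [b"Al", b"ice\r\nhel", b"lo\r\nbye\r", b"\n"]:
    lb.dataReceived(chunk)
print(lb.lines)  # [b'Alice', b'hello', b'bye']
```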

Relevant Link:

https://likebeta.gitbooks.io/twisted-intro-cn/content/zh/p02.html
https://likebeta.gitbooks.io/twisted-intro-cn/content/zh/p04.html
http://blog.csdn.net/hanhuili/article/details/9389433
http://blog.sina.com.cn/s/blog_704b6af70100py9n.html

 

3. Network Programming with Twisted

0x1: Writing Servers

from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver
from twisted.internet import reactor

class Chat(LineReceiver):

    def __init__(self, users):
        self.users = users
        self.name = None
        self.state = "GETNAME"

    def connectionMade(self):
        self.sendLine(b"What's your name?")

    def connectionLost(self, reason):
        if self.name in self.users:
            del self.users[self.name]

    def lineReceived(self, line):
        # Under Python 3, lines arrive as bytes, so names and messages are bytes too
        if self.state == "GETNAME":
            self.handle_GETNAME(line)
        else:
            self.handle_CHAT(line)

    def handle_GETNAME(self, name):
        if name in self.users:
            self.sendLine(b"Name taken, please choose another.")
            return
        self.sendLine(b"Welcome, %s!" % (name,))
        self.name = name
        self.users[name] = self
        self.state = "CHAT"

    def handle_CHAT(self, message):
        message = b"<%s> %s" % (self.name, message)
        for name, protocol in self.users.items():
            if protocol != self:
                protocol.sendLine(message)


class ChatFactory(Factory):

    def __init__(self):
        self.users = {} # maps user names to Chat instances

    def buildProtocol(self, addr):
        return Chat(self.users)


reactor.listenTCP(8123, ChatFactory())
reactor.run()

0x2: Writing Clients

Twisted is a framework designed to be very flexible, and let you write powerful clients. The cost of this flexibility is a few layers in the way to writing your client

1. single-use clients

In many cases, the protocol only needs to connect to the server once, and the code just wants to get a connected instance of the protocol. In those cases twisted.internet.endpoints provides the appropriate API, and in particular connectProtocol which takes a protocol instance rather than a factory.

from twisted.internet import reactor
from twisted.internet.protocol import Protocol
from twisted.internet.endpoints import TCP4ClientEndpoint, connectProtocol

class Greeter(Protocol):
    def sendMessage(self, msg):
        self.transport.write(("MESSAGE %s\n" % msg).encode("utf-8"))  # transports carry bytes

def gotProtocol(p):
    p.sendMessage("Hello")
    reactor.callLater(1, p.sendMessage, "This is sent in a second")
    reactor.callLater(2, p.transport.loseConnection)

point = TCP4ClientEndpoint(reactor, "localhost", 1234)
d = connectProtocol(point, Greeter())
d.addCallback(gotProtocol)
reactor.run()

2. ClientFactory

Still, there’s plenty of code out there that uses lower-level APIs, and a few features (such as automatic reconnection) have not been re-implemented with endpoints yet, so in some cases they may be more convenient to use.
To use the lower-level connection APIs, you will need to call one of the reactor.connect* methods directly. For these cases, you need a ClientFactory. The ClientFactory is in charge of creating the Protocol and also receives events relating to the connection state. This allows it to do things like reconnect in the event of a connection error.
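The reconnection idea can be sketched as an exponential backoff schedule. Twisted provides `ReconnectingClientFactory` for this; the function below is a hypothetical, simplified illustration of such a schedule (the names `initial_delay`, `factor`, and `max_delay` are assumptions of this sketch), not that class's code.

```python
def backoff_delays(attempts, initial_delay=1.0, factor=2.0, max_delay=60.0):
    """Hypothetical helper: delay before each reconnection attempt, growing then capped."""
    delays = []
    delay = initial_delay
    for _ in range(attempts):
        delays.append(min(delay, max_delay))
        delay *= factor
    return delays


print(backoff_delays(8))  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

In a real client, each delay would typically be passed to `reactor.callLater(delay, connector.connect)` from `clientConnectionLost` or `clientConnectionFailed`; production code usually also adds random jitter so that many clients do not reconnect in lockstep.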

import sys

from twisted.internet import reactor
from twisted.internet.protocol import Protocol, ClientFactory

class Echo(Protocol):
    def dataReceived(self, data):
        # data is bytes; write it to stdout's underlying binary buffer
        sys.stdout.buffer.write(data)
        sys.stdout.flush()

class EchoClientFactory(ClientFactory):
    def startedConnecting(self, connector):
        print('Started to connect.')

    def buildProtocol(self, addr):
        print('Connected.')
        return Echo()

    def clientConnectionLost(self, connector, reason):
        print('Lost connection.  Reason:', reason)

    def clientConnectionFailed(self, connector, reason):
        print('Connection failed. Reason:', reason)

# Example address (an assumption); point this at a running echo server
host, port = "localhost", 8000
reactor.connectTCP(host, port, EchoClientFactory())
reactor.run()

3. A Higher-Level Example: ircLogBot

# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.


"""
An example IRC log bot - logs a channel's events to a file.

If someone says the bot's name in the channel followed by a ':',
e.g.

    <foo> logbot: hello!

the bot will reply:

    <logbot> foo: I am a log bot

Run this script with two arguments, the channel name the bot should
connect to, and file to log to, e.g.:

    $ python ircLogBot.py test test.log

will log channel #test to the file 'test.log'.

To run the script:

    $ python ircLogBot.py <channel> <file>
"""


# twisted imports
from twisted.words.protocols import irc
from twisted.internet import reactor, protocol
from twisted.python import log

# system imports
import time, sys


class MessageLogger:
    """
    An independent logger class (because separation of application
    and protocol logic is a good thing).
    """
    def __init__(self, file):
        self.file = file

    def log(self, message):
        """Write a message to the file."""
        timestamp = time.strftime("[%H:%M:%S]", time.localtime(time.time()))
        self.file.write('%s %s\n' % (timestamp, message))
        self.file.flush()

    def close(self):
        self.file.close()


class LogBot(irc.IRCClient):
    """A logging IRC bot."""
    
    nickname = "twistedbot"
    
    def connectionMade(self):
        irc.IRCClient.connectionMade(self)
        self.logger = MessageLogger(open(self.factory.filename, "a"))
        self.logger.log("[connected at %s]" % 
                        time.asctime(time.localtime(time.time())))

    def connectionLost(self, reason):
        irc.IRCClient.connectionLost(self, reason)
        self.logger.log("[disconnected at %s]" % 
                        time.asctime(time.localtime(time.time())))
        self.logger.close()


    # callbacks for events

    def signedOn(self):
        """Called when bot has successfully signed on to server."""
        self.join(self.factory.channel)

    def joined(self, channel):
        """This will get called when the bot joins the channel."""
        self.logger.log("[I have joined %s]" % channel)

    def privmsg(self, user, channel, msg):
        """This will get called when the bot receives a message."""
        user = user.split('!', 1)[0]
        self.logger.log("<%s> %s" % (user, msg))
        
        # Check to see if they're sending me a private message
        if channel == self.nickname:
            msg = "It isn't nice to whisper!  Play nice with the group."
            self.msg(user, msg)
            return

        # Otherwise check to see if it is a message directed at me
        if msg.startswith(self.nickname + ":"):
            msg = "%s: I am a log bot" % user
            self.msg(channel, msg)
            self.logger.log("<%s> %s" % (self.nickname, msg))

    def action(self, user, channel, msg):
        """This will get called when the bot sees someone do an action."""
        user = user.split('!', 1)[0]
        self.logger.log("* %s %s" % (user, msg))

    # irc callbacks

    def irc_NICK(self, prefix, params):
        """Called when an IRC user changes their nickname."""
        old_nick = prefix.split('!')[0]
        new_nick = params[0]
        self.logger.log("%s is now known as %s" % (old_nick, new_nick))


    # For fun, override the method that determines how a nickname is changed on
    # collisions. The default method appends an underscore.
    def alterCollidedNick(self, nickname):
        """
        Generate an altered version of a nickname that caused a collision in an
        effort to create an unused related name for subsequent registration.
        """
        return nickname + '^'



class LogBotFactory(protocol.ClientFactory):
    """A factory for LogBots.

    A new protocol instance will be created each time we connect to the server.
    """

    def __init__(self, channel, filename):
        self.channel = channel
        self.filename = filename

    def buildProtocol(self, addr):
        p = LogBot()
        p.factory = self
        return p

    def clientConnectionLost(self, connector, reason):
        """If we get disconnected, reconnect to server."""
        connector.connect()

    def clientConnectionFailed(self, connector, reason):
        print("connection failed:", reason)
        reactor.stop()


if __name__ == '__main__':
    # initialize logging
    log.startLogging(sys.stdout)
    
    # create factory protocol and application
    f = LogBotFactory(sys.argv[1], sys.argv[2])

    # connect factory to this host and port
    reactor.connectTCP("irc.freenode.net", 6667, f)

    # run bot
    reactor.run()

4. Persistent Data in the Factory

When the protocol is created, it gets a reference to the factory as self.factory . It can then access attributes of the factory in its logic.
Factories have a default implementation of buildProtocol. It does the same thing the example above does using the protocol attribute of the factory to create the protocol instance. In the example above, the factory could be rewritten to look like this:

class LogBotFactory(protocol.ClientFactory):
    protocol = LogBot

    def __init__(self, channel, filename):
        self.channel = channel
        self.filename = filename

Relevant Link:

http://twisted.readthedocs.org/en/latest/core/howto/clients.html

 

4. Process Management with the reactor

Along with connection to servers across the internet, Twisted also connects to local processes with much the same API.
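Note that the reactor is a programming paradigm: Twisted is a network programming framework built on this asynchronous, event-driven model, and the same reactor-style model can also be used to manage process events.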

0x1: Example

#!/usr/bin/env python

# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

from twisted.internet import protocol
from twisted.internet import reactor

class MyPP(protocol.ProcessProtocol):
    def __init__(self, verses):
        self.verses = verses
        self.data = b""
    def connectionMade(self):
        print("connectionMade!")
        for i in range(self.verses):
            self.transport.write(b"Aleph-null bottles of beer on the wall,\n" +
                                 b"Aleph-null bottles of beer,\n" +
                                 b"Take one down and pass it around,\n" +
                                 b"Aleph-null bottles of beer on the wall.\n")
        self.transport.closeStdin() # tell them we're done
    def outReceived(self, data):
        print("outReceived! with %d bytes!" % len(data))
        self.data = self.data + data
    def errReceived(self, data):
        print("errReceived! with %d bytes!" % len(data))
    def inConnectionLost(self):
        print("inConnectionLost! stdin is closed! (we probably did it)")
    def outConnectionLost(self):
        print("outConnectionLost! The child closed their stdout!")
        # now is the time to examine what they wrote
        # print("I saw them write:", self.data)
        # wc prints "lines words chars"; split() ignores wc's leading padding
        lines, words, chars = self.data.split()[:3]
        print("I saw %s lines" % lines.decode())
    def errConnectionLost(self):
        print("errConnectionLost! The child closed their stderr.")
    def processExited(self, reason):
        print("processExited, status %d" % (reason.value.exitCode,))
    def processEnded(self, reason):
        print("processEnded, status %d" % (reason.value.exitCode,))
        print("quitting")
        reactor.stop()

pp = MyPP(10)
reactor.spawnProcess(pp, "wc", ["wc"], {})
reactor.run()
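For comparison, the same event-driven process pattern (write to the child's stdin, close it, collect stdout, react when the process exits) can be sketched with the standard library's asyncio event loop instead of Twisted. To avoid depending on `wc`, this sketch assumes a hypothetical line-counting child program run through `sys.executable`.

```python
import asyncio
import sys

VERSE = (b"Aleph-null bottles of beer on the wall,\n"
         b"Aleph-null bottles of beer,\n"
         b"Take one down and pass it around,\n"
         b"Aleph-null bottles of beer on the wall.\n")

# Hypothetical child program: counts the lines arriving on stdin (stand-in for wc).
CHILD = "import sys; print(sum(1 for _ in sys.stdin))"


async def count_lines(verses):
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", CHILD,
        stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
    # communicate() writes stdin, closes it, reads stdout to EOF, and waits for exit.
    out, _ = await proc.communicate(VERSE * verses)
    return int(out), proc.returncode


lines, status = asyncio.run(count_lines(10))
print(f"I saw {lines} lines, status {status}")  # I saw 40 lines, status 0
```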

0x2: Example

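from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.internet.protocol import ProcessProtocol

class GPGProtocol(ProcessProtocol):
    def __init__(self, crypttext, passphrase):
        self.crypttext = crypttext      # bytes to decrypt, sent on the child's stdin (fd 0)
        self.passphrase = passphrase    # bytes, sent to the child on fd 3
        self.plaintext = b""
        self.status = b""
    def connectionMade(self):
        self.transport.writeToChild(3, self.passphrase)   # read by --passphrase-fd 3
        self.transport.closeChildFD(3)
        self.transport.writeToChild(0, self.crypttext)
        self.transport.closeChildFD(0)
    def childDataReceived(self, childFD, data):
        if childFD == 1: self.plaintext += data   # decrypted output (stdout)
        if childFD == 4: self.status += data      # status lines (--status-fd 4)
    def processEnded(self, status):
        rc = status.value.exitCode
        if rc == 0:
            self.deferred.callback(self)
        else:
            self.deferred.errback(status)   # the Failure describing the exit

def decrypt(crypttext, passphrase):
    gp = GPGProtocol(crypttext, passphrase)
    gp.deferred = Deferred()
    cmd = ["gpg", "--decrypt", "--passphrase-fd", "3", "--status-fd", "4",
           "--batch"]
    # childFDs: "w" = parent writes to that child fd, "r" = parent reads from it,
    # 2:2 = the child's stderr is the parent's stderr
    reactor.spawnProcess(gp, cmd[0], cmd, env=None,
                         childFDs={0:"w", 1:"r", 2:2, 3:"w", 4:"r"})
    return gp.deferred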

Relevant Link:

http://twistedmatrix.com/documents/12.2.0/core/howto/process.html

 

5. Concurrent Connections in Twisted

Some time back I had to write a network server that needed to support ~50K concurrent clients on a single box. Server-client communication used a proprietary protocol on top of TCP, with a RawBinaryData struct as the message format. Clients exchanged periodic keep-alives, which the server used to check their health. As most of the operations were I/O-bound (socket/db), we decided to use Python/Twisted to implement the server.
On performing load tests, we found that the server could handle only 1024 clients, after which connections failed. We increased the per-process limit on open files from 1024 to 100000 (ulimit -n 100000), and connections still failed at 1024.
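The `ulimit -n` step can also be checked, and the soft limit raised up to the hard limit, from inside a Python process using the Unix-only `resource` module. This is a sketch under that assumption (`raise_nofile_limit` is a hypothetical helper): raising the hard limit itself needs privileges, the OS may refuse the request, and, as the story above shows, a higher limit does not help if the event loop is built on select().

```python
import resource


def raise_nofile_limit(target):
    """Hypothetical helper: raise the soft RLIMIT_NOFILE toward `target`, within the hard limit."""
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if hard != resource.RLIM_INFINITY:
        target = min(target, hard)
    # Never lower an unlimited soft limit, and only ever raise it.
    if soft != resource.RLIM_INFINITY and target > soft:
        try:
            resource.setrlimit(resource.RLIMIT_NOFILE, (target, hard))
        except (ValueError, OSError):
            pass  # the OS refused; keep the current limit
    return resource.getrlimit(resource.RLIMIT_NOFILE)


soft, hard = raise_nofile_limit(100000)
print(f"open-file limit: soft={soft}, hard={hard}")
```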

0x1: select limitation

select fails beyond 1024 fds because FD_SETSIZE is 1024. Twisted's default reactor seemed to be based on select. As a natural progression, poll was tried next to overcome the max open fd issue.

0x2: poll limitation

poll solves the max fd issue. But as the number of concurrent clients increased, performance dropped drastically: poll does O(n) work on every call, so performance degrades as the number of fds grows.

0x3: epoll

The epoll reactor solved both problems and gave excellent performance. libevent is another library that uses epoll (on Linux).
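In the Python standard library, the `selectors` module encodes the same progression: `selectors.DefaultSelector` is an alias for the most efficient implementation available on the platform (for example `EpollSelector` on Linux or `KqueueSelector` on BSD and macOS), falling back to poll or select elsewhere. A small sketch, using a `socketpair` as an assumed stand-in for a client connection:

```python
import selectors
import socket

sel = selectors.DefaultSelector()
print(type(sel).__name__)        # e.g. EpollSelector on Linux

a, b = socket.socketpair()       # stand-in for one client connection
sel.register(a, selectors.EVENT_READ, data="client-1")  # data: arbitrary per-fd context
b.sendall(b"ping")

# With epoll/kqueue the registered set lives in the kernel, so each wait
# does not have to hand every descriptor over again as select() does.
events = sel.select(timeout=1.0)
ready = [(key.data, key.fileobj.recv(16))
         for key, mask in events if mask & selectors.EVENT_READ]
print(ready)  # [('client-1', b'ping')]

sel.unregister(a)
sel.close()
a.close()
b.close()
```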

0x4: Async frameworks

Do not waste time with select/poll-based approaches if you expect more than about 1K concurrent connections. The following are some event-loop-based frameworks where this applies:

1. Eventlet (python)
2. Gevent (python), similar to Eventlet; early versions used libevent, which uses epoll on Linux (gevent 1.0 switched to libev)
3. C++ ACE 
4. Java Netty
5. Ruby Eventmachine

0x5: Choosing a Reactor and GUI Toolkit Integration (new Twisted)

Twisted provides a variety of implementations of the twisted.internet.reactor. The specialized implementations are suited for different purposes and are designed to integrate better with particular platforms.
The epoll()-based reactor is Twisted's default on Linux. Other platforms use poll(), or the most cross-platform reactor, select().
Platform-specific reactor implementations exist for:

Poll for Linux
Epoll for Linux 2.6
WaitForMultipleObjects (WFMO) for Win32
Input/Output Completion Port (IOCP) for Win32
KQueue for FreeBSD and Mac OS X
CoreFoundation for Mac OS X

1. Select()-based Reactor

The select reactor is the default on platforms that don't provide a better alternative that covers all use cases. If the select reactor is desired, it may be installed via:

from twisted.internet import selectreactor
selectreactor.install()

from twisted.internet import reactor

2. Poll-based Reactor

The PollReactor will work on any platform that provides select.poll. With larger numbers of connected sockets, it may provide for better performance than the SelectReactor.

from twisted.internet import pollreactor
pollreactor.install()

from twisted.internet import reactor

3. KQueue

The KQueue Reactor allows Twisted to use FreeBSD's kqueue mechanism for event scheduling.

from twisted.internet import kqreactor
kqreactor.install()

from twisted.internet import reactor

4. WaitForMultipleObjects (WFMO) for Win32

from twisted.internet import win32eventreactor
win32eventreactor.install()

from twisted.internet import reactor

5. Input/Output Completion Port (IOCP) for Win32

Windows provides a fast, scalable event notification system known as IO Completion Ports, or IOCP for short. Twisted includes a reactor based on IOCP which is nearly complete.

from twisted.internet import iocpreactor
iocpreactor.install()

from twisted.internet import reactor

6. Epoll-based Reactor

The EPollReactor will work on any platform that provides epoll, which today means Linux 2.6 and later. The implementation of the epoll reactor currently uses the level-triggered interface, which is basically like poll() but scales much better.
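The difference between level-triggered epoll (which, per the text above, the epoll reactor uses) and edge-triggered epoll can be observed directly with the standard `select.epoll` object. This Linux-only sketch (`times_reported` is a hypothetical helper) registers a socket either way and polls twice without ever reading the pending data.

```python
import select
import socket


def times_reported(edge_triggered, polls=2):
    """Hypothetical helper: count how many of `polls` waits report unread data as readable."""
    a, b = socket.socketpair()
    ep = select.epoll()
    mask = select.EPOLLIN | (select.EPOLLET if edge_triggered else 0)
    ep.register(a.fileno(), mask)
    b.sendall(b"data")              # `a` becomes readable; we never read it
    count = 0
    for _ in range(polls):
        if ep.poll(0.1):            # list of (fd, events); empty on timeout
            count += 1
    ep.close()
    a.close()
    b.close()
    return count


print("level-triggered:", times_reported(False))  # 2: reported on every poll
print("edge-triggered:", times_reported(True))    # 1: reported only on the transition
```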

from twisted.internet import epollreactor
epollreactor.install()

from twisted.internet import reactor

Relevant Link:

https://moythreads.com/wordpress/2009/12/22/select-system-call-limitation/
http://pipeit.blogspot.com/2011/07/select-poll-and-epoll-twisted-story.html
http://twistedmatrix.com/documents/13.2.0/core/howto/choosing-reactor.html#auto2

 

Copyright (c) 2016 LittleHann All rights reserved
