twisted(3)--再谈twisted

  上一章,咱们直接写了一个小例子来从总体讲述twisted运行的大体过程,今天咱们首先深刻一些概念,在逐渐明白这些概念之后,咱们会修改昨天写的例子。前端

  先看下面一张图:python

  这个系列的第一篇文章,咱们已经为你们展现了一张twisted的原理图,那张图,由于咱们没有捕获任何socket事件,因此只有一个圈。这张图上面的大圈表明捕获socket事件,这个也是twisted最主要的功能,它已经为咱们作了。而且提供了2个函数,transport.write写入事件,dataReceived读取事件。下面的小圈子,也就是咱们本身的代码,好比咱们昨天的验证、单聊、组聊等功能。你们必定要时时刻刻记住这张图,编写twisted代码的时候,脑子里印着这张图,这就跟咱们之前写c代码的时候,必定要记住内存模型同样。react

  回到这个大圈,transport.write和dataReceived实际上是通过不少层封装的函数,它们本质上仍是操做select模型中的写文件描述符(write_fd)、读文件描述符(read_fd),对应twisted的基础类就是IWriteDescriptor和IReadDescriptor,若是咱们比较熟悉select模型,咱们都知道,每次新来一个链接,都是创建write_fd、read_fd、error_fd,select不停的轮询这些fd,当其中任何一个知足条件时,触发相应的事件,这些全部的东西,twisted都已经帮咱们作好了,并且异步化了。咱们接受到事件,只管处理就行了。web

  再看下面一个图,
redis

  仔细看上面这个图,再对比以前的图,twisted在socket这块所有为咱们作好。json

  下面咱们再讲一下transport这个对象,这个对象在每一个Protocol里面都会产生一个,它表明一个链接,这个链接能够是socket,也能够是unix的pipe,twisted已经为咱们封装好,通常不会本身去新建它。一般咱们会用它来发送数据(write)、获取链接另外一方的信息(getPeer)。flask

  再看一下dataReceived这个函数,就是每次接到数据之后触发事件,上面说了,就是每次循环,select检查这些fd,fd被写入就触发。这时候你们想一想,若是循环被阻塞,在这个data里面会有不少数据,按照咱们昨天的程序,只会处理第一个数据,其余的可能被丢弃掉了。缓存

  咱们昨天的例子,把客户端运行代码稍微修改一下,在第10秒的时候,同时发送2个数据(粘包),看看服务器运行状况。服务器

if __name__ == '__main__':
    cf = EchoClientFactory()
    chat_from = sys.argv[1]
    all_phone_numbers = ['000001', '000002', '000003', '000004']
    all_phone_numbers.remove(chat_from)
    import random
    reactor.callLater(3, cf.p.send_verify, chat_from)
    reactor.callLater(10, cf.p.send_single_chat, chat_from, random.choice(all_phone_numbers), '你好,这是单聊')
    reactor.callLater(10, cf.p.send_single_chat, chat_from, random.choice(all_phone_numbers), '你好,这是单聊')
    # reactor.callLater(11, cf.p.send_group_chat, chat_from, [random.choice(all_phone_numbers), random.choice(all_phone_numbers)], '你好,这是组聊')
    # reactor.callLater(12, cf.p.send_broadcast_chat, chat_from, '你好,这是群聊')

    reactor.connectTCP('127.0.0.1', 8124, cf)

    reactor.run()

  客户端代码已经更改,运行一下,看看服务器结果。网络

/usr/bin/python2.7 /home/yudahai/PycharmProjects/blog01/tcpserver/frontTCP.py
2016-06-22 10:23:55+0800 [-] Log opened.
2016-06-22 10:23:55+0800 [-] ChatFactory starting on 8124
2016-06-22 10:23:55+0800 [-] Starting factory <__main__.ChatFactory instance at 0x7f382d908638>
2016-06-22 10:24:02+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '127.0.0.1', 47834)
2016-06-22 10:24:05+0800 [Chat,0,127.0.0.1] 欢迎, 000001!
2016-06-22 10:24:12+0800 [Chat,0,127.0.0.1] 你好,这是单聊
2016-06-22 10:24:12+0800 [Chat,0,127.0.0.1] Phone_number:000002 不在线,不能聊天.

  果真,只处理了一个数据,后面一个直接丢弃掉了。

  一般来讲,咱们都会为每一个Protocol申请一段内存,每次接受到数据之后,先存放到这段内存中,而后再集中处理,这样,即便循环被blocking住或者客户端粘包,咱们也能正确处理。新的代码以下:

  

# coding:utf-8
from twisted.internet.protocol import Factory, Protocol
from twisted.internet import reactor
import struct
import json
from twisted.python import log
import sys
log.startLogging(sys.stdout)


class Chat(Protocol):
    def __init__(self, users):
        self.users = users
        self.phone_number = None
        self.state = "VERIFY"
        self.version = 0
        self.command_func_dict = {
            1: self.handle_verify,
            2: self.handle_single_chat,
            3: self.handle_group_chat,
            4: self.handle_broadcast_chat
        }
        self._data_buffer = bytes()

    def connectionMade(self):
        log.msg("New connection, the info is:", self.transport.getPeer())

    def connectionLost(self, reason):
        if self.phone_number in self.users:
            del self.users[self.phone_number]

    def dataReceived(self, data):
        """
        接受到数据之后的操做
        """
        self._data_buffer += data

        while True:
            length, self.version, command_id = struct.unpack('!3I', self._data_buffer[:12])

            if length > len(self._data_buffer):
                break

            content = self._data_buffer[12:length]

            if command_id not in [1, 2, 3, 4]:
                return

            if self.state == "VERIFY" and command_id == 1:
                self.handle_verify(content)
            else:
                self.handle_data(command_id, content)

            self._data_buffer = self._data_buffer[length:]

            if len(self._data_buffer) < 12:
                break

    def handle_verify(self, content):
        """
        验证函数
        """
        content = json.loads(content)
        phone_number = content.get('phone_number')
        if phone_number in self.users:
            log.msg("电话号码<%s>存在老的链接." % phone_number.encode('utf-8'))
            self.users[phone_number].connectionLost("")
        log.msg("欢迎, %s!" % (phone_number.encode('utf-8'),))
        self.phone_number = phone_number
        self.users[phone_number] = self
        self.state = "DATA"

        send_content = json.dumps({'code': 1})

        self.send_content(send_content, 101, [phone_number])

    def handle_data(self, command_id, content):
        """
        根据command_id来分配函数
        """
        self.command_func_dict[command_id](content)

    def handle_single_chat(self, content):
        """
        单播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_to = content.get('chat_to')
        chat_content = content.get('chat_content')
        log.msg(chat_content.encode('utf-8'))
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        self.send_content(send_content, 102, [chat_to])

    def handle_group_chat(self, content):
        """
        组播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_to = content.get('chat_to')
        chat_content = content.get('chat_content')
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        phone_numbers = chat_to
        self.send_content(send_content, 103, phone_numbers)

    def handle_broadcast_chat(self, content):
        """
        广播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_content = content.get('chat_content')
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        phone_numbers = self.users.keys()
        self.send_content(send_content, 104, phone_numbers)

    def send_content(self, send_content, command_id, phone_numbers):
        """
        发送函数
        """
        length = 12 + len(send_content)
        version = self.version
        command_id = command_id
        header = [length, version, command_id]
        header_pack = struct.pack('!3I', *header)
        for phone_number in phone_numbers:
            if phone_number in self.users.keys():
                self.users[phone_number].transport.write(header_pack + send_content)
            else:
                log.msg("Phone_number:%s 不在线,不能聊天." % phone_number.encode('utf-8'))


class ChatFactory(Factory):
    def __init__(self):
        self.users = {}

    def buildProtocol(self, addr):
        return Chat(self.users)


reactor.listenTCP(8124, ChatFactory())
reactor.run()

  咱们在构造函数里面,加入了一个字段,这个字段就是self._data_buffer,在每次接受到数据之后,都循环处理这段内存。再看看运行结果,有什么不一样。

/usr/bin/python2.7 /home/yudahai/PycharmProjects/blog01/tcpserver/frontTCP.py
2016-06-22 10:40:42+0800 [-] Log opened.
2016-06-22 10:40:42+0800 [-] ChatFactory starting on 8124
2016-06-22 10:40:42+0800 [-] Starting factory <__main__.ChatFactory instance at 0x7f96860e0680>
2016-06-22 10:40:57+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '127.0.0.1', 48010)
2016-06-22 10:41:00+0800 [Chat,0,127.0.0.1] 欢迎, 000001!
2016-06-22 10:41:07+0800 [Chat,0,127.0.0.1] 你好,这是单聊
2016-06-22 10:41:07+0800 [Chat,0,127.0.0.1] Phone_number:000004 不在线,不能聊天.
2016-06-22 10:41:07+0800 [Chat,0,127.0.0.1] 你好,这是单聊
2016-06-22 10:41:07+0800 [Chat,0,127.0.0.1] Phone_number:000003 不在线,不能聊天.

  是否是正确了?接受数据,咱们先讲到这,下面咱们讲开发tcpserver必定要处理的问题,异常断线

异常断线

  异常断线的处理在tcpserver开发过程当中必不可少,不少时候,尤为是无线、3G、4G网络,信号很差的时候就断线,因为是网络问题,没有通过tcp结束的4次握手,服务器不可能及时检查到此事件,这时候就有可能出错。一般咱们会采起一种心跳包机制,即客户端每隔一段时间就向服务器端发送一个心跳包,服务器端每隔一段时间就检测一下,若是发现客户端连续2次或者屡次没有发送心跳包,就认为客户端已经掉线,再采起措施。

  好了,说了这么多,先要从新部署一下程序,我把一个客户端发在个人另一台笔记本上,先链接好,而后拔掉网线,再从服务器端发送一组数据过去,看看会发生什么。

  首先,咱们把000002放在笔记本上,000001在服务器端,在10秒和20秒的时候,分别发送一个单聊给000002,看看服务器端和000002的状况。

000001的运行代码修改以下:

if __name__ == '__main__':
    cf = EchoClientFactory()
    chat_from = sys.argv[1]
    all_phone_numbers = ['000001', '000002', '000003', '000004']
    all_phone_numbers.remove(chat_from)
    import random
    reactor.callLater(3, cf.p.send_verify, chat_from)
    reactor.callLater(10, cf.p.send_single_chat, chat_from, '000002', '你好,这是10秒的时候发送')
    reactor.callLater(20, cf.p.send_single_chat, chat_from, '000002', '你好,这是20秒的时候发送')

    reactor.connectTCP('127.0.0.1', 8124, cf)

    reactor.run()

  10秒和20秒,分别发送数据到服务器端,而000002端,在10秒和20秒的中间,拔掉网线,咱们看看发生了什么状况。

  首先,服务器端的运行结果以下:

/usr/bin/python2.7 /home/yudahai/PycharmProjects/blog01/tcpserver/frontTCP.py
2016-06-22 11:40:02+0800 [-] Log opened.
2016-06-22 11:40:02+0800 [-] ChatFactory starting on 8124
2016-06-22 11:40:02+0800 [-] Starting factory <__main__.ChatFactory instance at 0x7f9c39f89638>
2016-06-22 11:41:26+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '192.168.5.15', 57150)
2016-06-22 11:41:29+0800 [Chat,0,192.168.5.15] 欢迎, 000002!
2016-06-22 11:41:41+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '127.0.0.1', 49526)
2016-06-22 11:41:44+0800 [Chat,1,127.0.0.1] 欢迎, 000001!
2016-06-22 11:41:51+0800 [Chat,1,127.0.0.1] 你好,这是10秒的时候发送
2016-06-22 11:42:01+0800 [Chat,1,127.0.0.1] 你好,这是20秒的时候发送

  它在000002中断了之后,并无发现000002已经中断,仍是照样write下去,其实本质上,它仍是把数据发到了write_fd上,而后就是底层的事了。

  而000002客户端的结果比较有意思。

2016-06-22 11:41:26+0800 [-] Log opened.
2016-06-22 11:41:26+0800 [-] Starting factory <__main__.EchoClientFactory instance at 0x7f4e75db7680>
2016-06-22 11:41:26+0800 [-] Started to connect
2016-06-22 11:41:26+0800 [Uninitialized] Connected.
2016-06-22 11:41:26+0800 [Uninitialized] New connection IPv4Address(TCP, '192.168.5.60', 8124)
2016-06-22 11:41:29+0800 [EchoClient,client] 验证经过
2016-06-22 11:41:51+0800 [EchoClient,client] [单聊][000001]:你好,这是10秒的时候发送
2016-06-22 11:44:27+0800 [EchoClient,client] [单聊][000001]:你好,这是20秒的时候发送

  你们注意到没有,竟然仍是收到了,可是看时间,时间和原来的是不对的。我后来把网线从新插上去,而后就接受到了。twisted把write_fd的数据从新发送给了客户端,由于客户端没有任何改变,ip和端口都是原来的,网络状况没有改变,因此再次就链接上来。

  咱们再试一下另一种状况,也是移动端常常遇到的状况,就是切换网络,好比从4G切换到无线网,看看会发生什么。

yudahai@yu-sony:~/PycharmProjects/flask001$ python frontClient.py 000002
2016-06-22 13:09:34+0800 [-] Log opened.
2016-06-22 13:09:34+0800 [-] Starting factory <__main__.EchoClientFactory instance at 0x7fd8a0408680>
2016-06-22 13:09:34+0800 [-] Started to connect
2016-06-22 13:09:34+0800 [Uninitialized] Connected.
2016-06-22 13:09:34+0800 [Uninitialized] New connection IPv4Address(TCP, '192.168.5.60', 8124)
2016-06-22 13:09:37+0800 [EchoClient,client] 验证经过
2016-06-22 13:09:54+0800 [EchoClient,client] [单聊][000001]:你好,这是10秒的时候发送

  客户端再也收不到了,这也是真实状况。一般来讲,用户切换网络的时候,都会更改网络信息,这时候移动客户端再也收不到这个信息了,并且服务器端也不会报错(之后要为咱们作消息确认机制埋下伏笔。)

  既然收不到了,咱们就解决这个问题,上面说了,增长心跳包机制,客户端每隔一段时间发送一次心跳包,服务器端收到心跳包之后,记录最近一次接受到的时间。每隔一段时间,服务器总体轮询一次,若是发现某一个客户端很长时间没有接受到心跳包,就断定它为断线,这时候主动切断这个客户端。

  心跳包的command_id也要加上,直接为5吧,内容为空。只是心跳包,没有必要写内容了。

  新代码以下:
  frontTCP.py

# coding:utf-8
from twisted.internet.protocol import Factory, Protocol
from twisted.internet import reactor, task
import struct
import json
from twisted.python import log
import sys
import time
log.startLogging(sys.stdout)


class Chat(Protocol):
    def __init__(self, users):
        self.users = users
        self.phone_number = None
        self.state = "VERIFY"
        self.version = 0
        self.last_heartbeat_time = 0
        self.command_func_dict = {
            1: self.handle_verify,
            2: self.handle_single_chat,
            3: self.handle_group_chat,
            4: self.handle_broadcast_chat,
            5: self.handle_heartbeat
        }
        self._data_buffer = bytes()

    def connectionMade(self):
        log.msg("New connection, the info is:", self.transport.getPeer())

    def connectionLost(self, reason):
        log.msg("[%s]:断线" % self.phone_number.encode('utf-8'))
        if self.phone_number in self.users:
            del self.users[self.phone_number]

    def dataReceived(self, data):
        """
        接受到数据之后的操做
        """
        self._data_buffer += data

        while True:
            length, self.version, command_id = struct.unpack('!3I', self._data_buffer[:12])

            if length > len(self._data_buffer):
                break

            content = self._data_buffer[12:length]

            if command_id not in [1, 2, 3, 4, 5]:
                return

            if self.state == "VERIFY" and command_id == 1:
                self.handle_verify(content)
            else:
                self.handle_data(command_id, content)

            self._data_buffer = self._data_buffer[length:]

            if len(self._data_buffer) < 12:
                break

    def handle_heartbeat(self, content):
        """
        处理心跳包
        """
        self.last_heartbeat_time = int(time.time())

    def handle_verify(self, content):
        """
        验证函数
        """
        content = json.loads(content)
        phone_number = content.get('phone_number')
        if phone_number in self.users:
            log.msg("电话号码<%s>存在老的链接." % phone_number.encode('utf-8'))
            self.users[phone_number].connectionLost("")
        log.msg("欢迎, %s!" % (phone_number.encode('utf-8'),))
        self.phone_number = phone_number
        self.users[phone_number] = self
        self.state = "DATA"

        send_content = json.dumps({'code': 1})

        self.send_content(send_content, 101, [phone_number])

    def handle_data(self, command_id, content):
        """
        根据command_id来分配函数
        """
        self.command_func_dict[command_id](content)

    def handle_single_chat(self, content):
        """
        单播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_to = content.get('chat_to')
        chat_content = content.get('chat_content')
        log.msg(chat_content.encode('utf-8'))
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        self.send_content(send_content, 102, [chat_to])

    def handle_group_chat(self, content):
        """
        组播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_to = content.get('chat_to')
        chat_content = content.get('chat_content')
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        phone_numbers = chat_to
        self.send_content(send_content, 103, phone_numbers)

    def handle_broadcast_chat(self, content):
        """
        广播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_content = content.get('chat_content')
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        phone_numbers = self.users.keys()
        self.send_content(send_content, 104, phone_numbers)

    def send_content(self, send_content, command_id, phone_numbers):
        """
        发送函数
        """
        length = 12 + len(send_content)
        version = self.version
        command_id = command_id
        header = [length, version, command_id]
        header_pack = struct.pack('!3I', *header)
        for phone_number in phone_numbers:
            if phone_number in self.users.keys():
                self.users[phone_number].transport.write(header_pack + send_content)
            else:
                log.msg("Phone_number:%s 不在线,不能聊天." % phone_number.encode('utf-8'))


class ChatFactory(Factory):
    def __init__(self):
        self.users = {}

    def buildProtocol(self, addr):
        return Chat(self.users)

    def check_users_online(self):
        for key, value in self.users.items():
            if value.last_heartbeat_time != 0 and int(time.time()) - value.last_heartbeat_time > 4:
                log.msg("[%s]没有检测到心跳包,主动切断" % key.encode('utf-8'))
                value.transport.abortConnection()

cf = ChatFactory()

task1 = task.LoopingCall(cf.check_users_online)
task1.start(3, now=False)

reactor.listenTCP(8124, cf)
reactor.run()

  就像上面所说的,加了一个接受心跳包的检测的函数,handle_heartbeat,每次来一个心跳包,就把它相应的last_heartbeat_time变换一下,这样,总体轮询检测的时候,我只要判断最后一次链接时间和当前链接时间之差,就能够判断它是否是异常断线了。

  这里看我异常断线的处理,transport.abortConnection(),从字面意思上,直接丢弃这个链接,它会调用Protocol的connectionLost,并且它无论那个fd里面有没有数据,所有丢弃。这个咱们之后用netstat分析链接的时候,会进一步说明这个函数,如今只要记住,它会强行中断这个链接,删除任何缓存在里面的数据便可。

 

  frontClient.py

# coding:utf-8
from twisted.internet import reactor, task
from twisted.internet.protocol import Protocol, ClientFactory
import struct
from twisted.python import log
import sys
import json
log.startLogging(sys.stdout)


class EchoClient(Protocol):
    def __init__(self):
        self.command_func_dict = {
            101: self.handle_verify_s,
            102: self.handle_single_chat_s,
            103: self.handle_group_chat_s,
            104: self.handle_broadcast_chat_s
        }
        self.version = 0
        self.state = "VERIFY"
        self.phone_number = ""

    def connectionMade(self):
        log.msg("New connection", self.transport.getPeer())

    def dataReceived(self, data):
        length, self.version, command_id = struct.unpack('!3I', data[:12])
        content = data[12:length]
        if self.state == "VERIFY" and command_id == 101:
            self.handle_verify_s(content)
        else:
            self.handle_data(command_id, content)

    def handle_data(self, command_id, pack_data):
        self.command_func_dict[command_id](pack_data)

    def connectionLost(self, reason):
        log.msg("connection lost")

    def handle_verify_s(self, pack_data):
        """
        接受验证结果
        """
        content = json.loads(pack_data)
        code = content.get('code')
        if code == 1:
            log.msg('验证经过')
        self.state = "Data"

    def handle_single_chat_s(self, pack_data):
        """
        接受单聊
        """
        content = json.loads(pack_data)
        chat_from = content.get('chat_from')
        chat_content = content.get('chat_content')
        log.msg("[单聊][%s]:%s" % (chat_from.encode('utf-8'), chat_content.encode('utf-8')))

    def handle_group_chat_s(self, pack_data):
        """
        接受组聊
        """
        content = json.loads(pack_data)
        chat_from = content.get('chat_from')
        chat_content = content.get('chat_content')
        log.msg("[组聊][%s]:%s" % (chat_from.encode('utf-8'), chat_content.encode('utf-8')))

    def handle_broadcast_chat_s(self, pack_data):
        """
        接受广播
        """
        content = json.loads(pack_data)
        chat_from = content.get('chat_from')
        chat_content = content.get('chat_content')
        log.msg("[群聊][%s]:%s" % (chat_from.encode('utf-8'), chat_content.encode('utf-8')))

    def send_verify(self, phone_number):
        """
        发送验证
        """
        content = json.dumps(dict(phone_number=phone_number))
        self.send_data(content, 1)

    def send_single_chat(self, chat_from, chat_to, chat_content):
        """
        发送单聊内容
        """
        content = json.dumps(dict(chat_from=chat_from, chat_to=chat_to, chat_content=chat_content))
        self.send_data(content, 2)

    def send_group_chat(self, chat_from, chat_to, chat_content):
        """
        发送组聊内容
        """
        content = json.dumps(dict(chat_from=chat_from, chat_to=chat_to, chat_content=chat_content))
        self.send_data(content, 3)

    def send_broadcast_chat(self, chat_from, chat_content):
        """
        发送群聊内容
        """
        content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))
        self.send_data(content, 4)

    def send_data(self, send_content, command_id):
        """
        发送函数
        """
        length = 12 + len(send_content)
        version = self.version
        command_id = command_id
        header = [length, version, command_id]
        header_pack = struct.pack('!3I', *header)
        self.transport.write(header_pack + send_content)

    def send_heartbeat(self):
        """
        发送心跳包
        """
        length = 12
        version = self.version
        command_id = 5
        header = [length, version, command_id]
        header_pack = struct.pack('!3I', *header)
        self.transport.write(header_pack)


class EchoClientFactory(ClientFactory):
    def __init__(self):
        self.p = EchoClient()

    def startedConnecting(self, connector):
        log.msg("Started to connect")

    def buildProtocol(self, addr):
        log.msg("Connected.")
        return self.p

    def clientConnectionFailed(self, connector, reason):
        log.msg("Lost connection. Reason:", reason)

    def clientConnectionLost(self, connector, reason):
        log.msg("Connection failed. Reason:", reason)


if __name__ == '__main__':
    cf = EchoClientFactory()
    chat_from = sys.argv[1]
    all_phone_numbers = ['000001', '000002', '000003', '000004']
    all_phone_numbers.remove(chat_from)
    import random

    task_send_heartbeat = task.LoopingCall(cf.p.send_heartbeat)
    task_send_heartbeat.start(2, now=False)

    reactor.callLater(3, cf.p.send_verify, chat_from)
    reactor.callLater(10, cf.p.send_single_chat, chat_from, '000002', '你好,这是10秒的时候发送')
    reactor.callLater(20, cf.p.send_single_chat, chat_from, '000002', '你好,这是20秒的时候发送')

    reactor.connectTCP('192.168.5.60', 8124, cf)

    reactor.run()

  这边就添加了一个心跳包发送程序,每隔2秒发送一个心跳包。

  我在000002的客户端在10秒和20秒之间,拔掉了网线,看看调试效果,

  先看服务器端的调试结果。

/usr/bin/python2.7 /home/yudahai/PycharmProjects/blog01/tcpserver/frontTCP.py
2016-06-22 15:15:23+0800 [-] Log opened.
2016-06-22 15:15:23+0800 [-] ChatFactory starting on 8124
2016-06-22 15:15:23+0800 [-] Starting factory <__main__.ChatFactory instance at 0x7ff3c3615758>
2016-06-22 15:15:53+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '192.168.5.15', 39774)
2016-06-22 15:15:54+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '192.168.5.60', 36084)
2016-06-22 15:15:56+0800 [Chat,0,192.168.5.15] 欢迎, 000002!
2016-06-22 15:15:57+0800 [Chat,1,192.168.5.60] 欢迎, 000001!
2016-06-22 15:16:04+0800 [Chat,1,192.168.5.60] 你好,这是10秒的时候发送
2016-06-22 15:16:11+0800 [-] [000002]没有检测到心跳包,主动切断
2016-06-22 15:16:11+0800 [-] [000002]:断线
2016-06-22 15:16:14+0800 [Chat,1,192.168.5.60] 你好,这是20秒的时候发送
2016-06-22 15:16:14+0800 [Chat,1,192.168.5.60] Phone_number:000002 不在线,不能聊天.

  看见没有,已经能主动检测到了。

  再看一下客户端000002的调试结果

yudahai@yu-sony:~/PycharmProjects/flask001$ python frontClient.py 000002
2016-06-22 15:15:53+0800 [-] Log opened.
2016-06-22 15:15:53+0800 [-] Starting factory <__main__.EchoClientFactory instance at 0x7f4e3e3d56c8>
2016-06-22 15:15:53+0800 [-] Started to connect
2016-06-22 15:15:53+0800 [Uninitialized] Connected.
2016-06-22 15:15:53+0800 [Uninitialized] New connection IPv4Address(TCP, '192.168.5.60', 8124)
2016-06-22 15:15:56+0800 [EchoClient,client] 验证经过
2016-06-22 15:16:04+0800 [EchoClient,client] [单聊][000001]:你好,这是10秒的时候发送
2016-06-22 15:24:27+0800 [EchoClient,client] connection lost
2016-06-22 15:24:27+0800 [EchoClient,client] Connection failed. Reason: [Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionLost'>: Connection to the other side was lost in a non-clean fashion.
    ]
2016-06-22 15:24:27+0800 [EchoClient,client] Stopping factory <__main__.EchoClientFactory instance at 0x7f4e3e3d56c8>

  比较有意思,15:16我中断了链接,没有接受到,这时候服务器主动切断网络,再链接上来的时候,它已经接受到消息,本身被中断了,其实客户端应该有个断线重连机制,不过这是客户端的事,主要看你的业务需求。

  

  到这,利用心跳包来检测异常网络状况就完成了,若是你有更好的方案,欢迎你们跟我讨论,毕竟我不是专门作tcpserver的,不少东西可能没有研究到。

  下一章,咱们研究twisted链接redis,把一些很状态转移到redis中,这样,其余模块就能共享这个状态了,这在物联网中,用到尤为多,好比设备在线断线状态、报警状态等,前端web能够直接拿来使用了;之后咱们还会讲rabbitmq在twisted中的应用。

相关文章
相关标签/搜索