twisted(2)--聊天系统

  咱们今天要作一个聊天系统,这样能够和咱们以前flask api那系列文章结合起来;其次,聊天系统最能表明tcpserver,之后能够套用各类模型,好比咱们公司作的物联网,其实就是把聊天系统简化一下。python

  twisted官方网站已经为咱们提供了一个很是好的例子,咱们研究一下,而后在此基础上进行修改便可(这方面确实要比tornado作得好,不过tornado在阅读源码方面又有很大优点,之后咱们作一个tornado版的)react

from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver
from twisted.internet import reactor

class Chat(LineReceiver):
    def __init__(self, users):
        self.users = users
        self.name = None
        self.state = "GETNAME"

    def connectionMade(self):
        self.sendLine("What's your name?")

    def connectionLost(self, reason):
        if self.name in self.users:
            del self.users[self.name]

    def lineReceived(self, line):
        if self.state == "GETNAME":
            self.handle_GETNAME(line)
        else:
            self.handle_CHAT(line)

    def handle_GETNAME(self, name):
        if name in self.users:
            self.sendLine("Name taken, please choose another.")
            return
        self.sendLine("Welcome, %s!" % (name,))
        self.name = name
        self.users[name] = self
        self.state = "CHAT"

    def handle_CHAT(self, message):
        message = "<%s> %s" % (self.name, message)
        for name, protocol in self.users.iteritems():
            if protocol != self:
                protocol.sendLine(message)


class ChatFactory(Factory):
    def __init__(self):
        self.users = {} # maps user names to Chat instances

    def buildProtocol(self, addr):
        return Chat(self.users)

reactor.listenTCP(8123, ChatFactory())
reactor.run()

  代码很是简单,每一个用户链接上来的时候,都新建一个Chat对象,Chat类中,包含各类对单个链接的操做方法,其实看名字均可以看出来他们的做用,redis

  构造函数__init__中定义了3个变量,users是一个字典,包含全部当前链接的对象,key是它的name,value是Chat对象自己,表明本身这个链接;name标识这个链接名称,必定要明了,惟一,咱们之后会用客户的电话号码做为它的name;state有点意思,它表明一个状态,当这个链接没有经过验证的时候,是一个状态,验证过之后,又是一个状态。其实state之后还会继续扩展,好比说,在不少时候,会有不少垃圾链接进来,一般一个链接上来,在必定时间内尚未经过验证,就能够abort掉。数据库

  connectionMade看名字也知道,链接建立好之后,触发的函数。json

  connectionLost看名字意思,链接丢失之后,触发的函数,这个函数之后能够扩展到redis记录链接状态。flask

  lineReceived这个是一个链接用的最多的函数,就是数据接受到之后,触发的函数,下面2个函数就是在此基础上构建而成的。api

  handle_GETNAME和handle_CHAT的运用跟链接的state有关,当state在未验证状态时,调用handle_GETNAME函数;当已经验证过期,调用handle_CHAT。服务器

  再看看factory类,其中users就不用说了,记录每一个链接的变量。网络

  buildProtocol,新建一个链接之后,触发的函数,它调用了Chat的构造函数,新建一个Chat对象。dom

  其实Chat继承LineReceive,而LineReceive继承Protocol的。真实的链接是transport,因此咱们这个例子中没有展现出来transport,只有sendLine这样的函数,我下面本身写例子的时候,会加上去;Protocol其实就是整个链接连上来之后,加上一些这个链接当前的状态,再加上一些基本操做方法组成的;Factory就是全部Protocol组成的一个工厂类,每新加入或者减小一个Protocol对象时,都能在Factory里面表现出来。

  整个代码分析完毕,官方例子就能够直接运行了,看看运行结果吧。

  用telnet模拟一个客户端,就能够很好的操做了。

  

  以上全是官方的例子,咱们要引入本身的项目。

  首先,数据模型,官方例子很简单,直接把str格式的数据发送出去,在测试的时候没问题,但正式项目中绝对不可能。一般每一个数据,都会由2部分组成,一个header做为头,一个content做为内容。其实就是模拟http。header中,一般有数据长度、版本号、数据类型id等,这个都不是必须的,要根据你实际项目来。content做为真实数据内容,通常都用json数据格式,固然,若是你追求效率,也能够用google protor buf或者facebook的数据模式,均可以(不少公司都用的google protor buf模式,解析速度比较快,咱们这为了简单,就用json格式)。

  

  上面是咱们数据格式,绿色段就是header,蓝色段就是content。我上面就说了,这只是随便写的一个项目,在真实项目中,要根据你的需求来选择,极可能要保留字段。这边稍微解释一下command_id,其实这个就相似于http中的url,http根据url代表它的做用;咱们这一样根据command_id标示它的做用,由于在整个过程当中,不但有聊天,还有验证过程,之后还可能有广播,组播等各类功能。咱们就根据command_id来判断这个数据的做用(其实写到这,你们彻底能够看出来,咱们基本就是跟http学的,现实过程当中也这样,几乎都在模仿http),而响应之类的,就是服务器主动推送给客户端的command_id,这也是跟http不一样的地方,不少时候,咱们都是主动推送给客户端。

  好了,既然已经这样规定,咱们再详细规定一下command_id吧,就像http的url同样。

  

  咱们先比较简单的设定一下,之后要是有改动,再改变。

  咱们重写tcpserver,代码以下:

# coding:utf-8
from twisted.internet.protocol import Factory, Protocol
from twisted.internet import reactor
import struct
import json
from twisted.python import log
import sys
log.startLogging(sys.stdout)


class Chat(Protocol):
    def __init__(self, users):
        self.users = users
        self.phone_number = None
        self.state = "VERIFY"
        self.version = 0
        self.command_func_dict = {
            1: self.handle_verify,
            2: self.handle_single_chat,
            3: self.handle_group_chat,
            4: self.handle_broadcast_chat
        }

    def connectionMade(self):
        log.msg("New connection, the info is:", self.transport.getPeer())

    def connectionLost(self, reason):
        if self.phone_number in self.users:
            del self.users[self.phone_number]

    def dataReceived(self, data):
        """
        接受到数据之后的操做
        """
        length, self.version, command_id = struct.unpack('!3I', data[:12])
        content = data[12:length]

        if command_id not in [1, 2, 3, 4]:
            return

        if self.state == "VERIFY" and command_id == 1:
            self.handle_verify(content)
        else:
            self.handle_data(command_id, content)

    def handle_verify(self, content):
        """
        验证函数
        """
        content = json.loads(content)
        phone_number = content.get('phone_number')
        if phone_number in self.users:
            log.msg("电话号码<%s>存在老的链接." % phone_number.encode('utf-8'))
            self.users[phone_number].connectionLost("")
        log.msg("欢迎, %s!" % (phone_number.encode('utf-8'),))
        self.phone_number = phone_number
        self.users[phone_number] = self
        self.state = "DATA"

        send_content = json.dumps({'code': 1})

        self.send_content(send_content, 101, [phone_number])

    def handle_data(self, command_id, content):
        """
        根据command_id来分配函数
        """
        self.command_func_dict[command_id](content)

    def handle_single_chat(self, content):
        """
        单播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_to = content.get('chat_to')
        chat_content = content.get('chat_content')
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        self.send_content(send_content, 102, [chat_to])

    def handle_group_chat(self, content):
        """
        组播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_to = content.get('chat_to')
        chat_content = content.get('chat_content')
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        phone_numbers = chat_to
        self.send_content(send_content, 103, phone_numbers)

    def handle_broadcast_chat(self, content):
        """
        广播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_content = content.get('chat_content')
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        phone_numbers = self.users.keys()
        self.send_content(send_content, 104, phone_numbers)

    def send_content(self, send_content, command_id, phone_numbers):
        """
        发送函数
        """
        length = 12 + len(send_content)
        version = self.version
        command_id = command_id
        header = [length, version, command_id]
        header_pack = struct.pack('!3I', *header)
        for phone_number in phone_numbers:
            if phone_number in self.users.keys():
                self.users[phone_number].transport.write(header_pack + send_content)
            else:
                log.msg("Phone_number:%s 不在线,不能聊天." % phone_number.encode('utf-8'))


class ChatFactory(Factory):
    def __init__(self):
        self.users = {}

    def buildProtocol(self, addr):
        return Chat(self.users)


reactor.listenTCP(8124, ChatFactory())
reactor.run()

 

  代码修改的比较多,

  首先,直接从Protocol继承了,这样比从LineReceive继承更直观一点;command_func_dict表明command_id和其处理函数的一一对应字典;

  其次,dataReceived是主要的接受函数,接受到数据之后,先解析header,根据header里面的length截取数据,再根据command_id来把数据送个它的处理函数。若是command_id为1,就进入验证函数;若是为其余,就进入其余数据处理函数,不过要先验证经过,才能用其余函数处理。这就跟http同样。(这边之后要重写的,你们想象一下,若是我一个客户端链接,同时发送2个数据,按照上面代码,只能处理一个数据,另一个就丢弃了。)

  最后,send_content为总的发送函数,先把header头组建好,而后加上数据,就发送了。这边可能遇到发送的客户端不在线,要先检测一下(之后还会遇到各类意外断线状况,服务器端无法及时检测到,这个之后再讲。)

  服务器端是否是很简单?再写一个客户端代码,客户端若是用GUI方式写的话,篇幅太长了,咱们这就用最简单的方式,模拟客户端操做。下面是客户端代码。

# coding:utf-8
from twisted.internet import reactor, task
from twisted.internet.protocol import Protocol, ClientFactory
import struct
from twisted.python import log
import sys
import json
log.startLogging(sys.stdout)


class EchoClient(Protocol):
    def __init__(self):
        self.command_func_dict = {
            101: self.handle_verify_s,
            102: self.handle_single_chat_s,
            103: self.handle_group_chat_s,
            104: self.handle_broadcast_chat_s
        }
        self.version = 0
        self.state = "VERIFY"
        self.phone_number = ""

    def connectionMade(self):
        log.msg("New connection", self.transport.getPeer())

    def dataReceived(self, data):
        length, self.version, command_id = struct.unpack('!3I', data[:12])
        content = data[12:length]
        if self.state == "VERIFY" and command_id == 101:
            self.handle_verify_s(content)
        else:
            self.handle_data(command_id, content)

    def handle_data(self, command_id, pack_data):
        self.command_func_dict[command_id](pack_data)

    def connectionLost(self, reason):
        log.msg("connection lost")

    def handle_verify_s(self, pack_data):
        """
        接受验证结果
        """
        content = json.loads(pack_data)
        code = content.get('code')
        if code == 1:
            log.msg('验证经过')
        self.state = "Data"

    def handle_single_chat_s(self, pack_data):
        """
        接受单聊
        """
        content = json.loads(pack_data)
        chat_from = content.get('chat_from')
        chat_content = content.get('chat_content')
        log.msg("[单聊][%s]:%s" % (chat_from.encode('utf-8'), chat_content.encode('utf-8')))

    def handle_group_chat_s(self, pack_data):
        """
        接受组聊
        """
        content = json.loads(pack_data)
        chat_from = content.get('chat_from')
        chat_content = content.get('chat_content')
        log.msg("[组聊][%s]:%s" % (chat_from.encode('utf-8'), chat_content.encode('utf-8')))

    def handle_broadcast_chat_s(self, pack_data):
        """
        接受广播
        """
        content = json.loads(pack_data)
        chat_from = content.get('chat_from')
        chat_content = content.get('chat_content')
        log.msg("[群聊][%s]:%s" % (chat_from.encode('utf-8'), chat_content.encode('utf-8')))

    def send_verify(self, phone_number):
        """
        发送验证
        """
        content = json.dumps(dict(phone_number=phone_number))
        self.send_data(content, 1)

    def send_single_chat(self, chat_from, chat_to, chat_content):
        """
        发送单聊内容
        """
        content = json.dumps(dict(chat_from=chat_from, chat_to=chat_to, chat_content=chat_content))
        self.send_data(content, 2)

    def send_group_chat(self, chat_from, chat_to, chat_content):
        """
        发送组聊内容
        """
        content = json.dumps(dict(chat_from=chat_from, chat_to=chat_to, chat_content=chat_content))
        self.send_data(content, 3)

    def send_broadcast_chat(self, chat_from, chat_content):
        """
        发送群聊内容
        """
        content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))
        self.send_data(content, 4)

    def send_data(self, send_content, command_id):
        """
        发送函数
        """
        length = 12 + len(send_content)
        version = self.version
        command_id = command_id
        header = [length, version, command_id]
        header_pack = struct.pack('!3I', *header)
        self.transport.write(header_pack + send_content)


class EchoClientFactory(ClientFactory):
    def __init__(self):
        self.p = EchoClient()

    def startedConnecting(self, connector):
        log.msg("Started to connect")

    def buildProtocol(self, addr):
        log.msg("Connected.")
        return self.p

    def clientConnectionFailed(self, connector, reason):
        log.msg("Lost connection. Reason:", reason)

    def clientConnectionLost(self, connector, reason):
        log.msg("Connection failed. Reason:", reason)


if __name__ == '__main__':
    cf = EchoClientFactory()
    chat_from = sys.argv[1]
    all_phone_numbers = ['000001', '000002', '000003', '000004']
    all_phone_numbers.remove(chat_from)
    import random
    reactor.callLater(3, cf.p.send_verify, chat_from)
    reactor.callLater(10, cf.p.send_single_chat, chat_from, random.choice(all_phone_numbers), '你好,这是单聊')
    reactor.callLater(11, cf.p.send_group_chat, chat_from, [random.choice(all_phone_numbers), random.choice(all_phone_numbers)], '你好,这是组聊')
    reactor.callLater(12, cf.p.send_broadcast_chat, chat_from, '你好,这是群聊')

    reactor.connectTCP('127.0.0.1', 8124, cf)

    reactor.run()

  客户端比较简单,主要是几个发送函数,基本都是以send_开头,就是主动发送消息以及验证的;接受从服务器的处理函数,基本以handle_开头。跟服务器端同样,接受到数据之后,先解析header,根据header里面的length截取数据,再根据command_id来把数据送个它的处理函数。

  这边弄了个定时任务,第3秒开始验证;第10秒随机发送一个单聊;第11秒随机发送一个组聊;第12秒发送一个群聊。

  咱们开3个客户端,看看结果吧。

yudahai@yudahaiPC:tcpserver$ python frontClient.py 000001
2016-06-21 17:33:17+0800 [-] Log opened.
2016-06-21 17:33:17+0800 [-] Starting factory <__main__.EchoClientFactory instance at 0x7fa325b41680>
2016-06-21 17:33:17+0800 [-] Started to connect
2016-06-21 17:33:17+0800 [Uninitialized] Connected.
2016-06-21 17:33:17+0800 [Uninitialized] New connection IPv4Address(TCP, '127.0.0.1', 8124)
2016-06-21 17:33:20+0800 [EchoClient,client] 验证经过
2016-06-21 17:33:29+0800 [EchoClient,client] [群聊][000001]:你好,这是群聊
2016-06-21 17:33:29+0800 [EchoClient,client] [单聊][000002]:你好,这是单聊
2016-06-21 17:33:30+0800 [EchoClient,client] [组聊][000002]:你好,这是组聊
2016-06-21 17:33:31+0800 [EchoClient,client] [群聊][000002]:你好,这是群聊
2016-06-21 17:33:38+0800 [EchoClient,client] [单聊][000003]:你好,这是单聊
2016-06-21 17:33:39+0800 [EchoClient,client] [组聊][000003]:你好,这是组聊
2016-06-21 17:33:40+0800 [EchoClient,client] [群聊][000003]:你好,这是群聊
yudahai@yudahaiPC:tcpserver$ python frontClient.py 000002
2016-06-21 17:33:19+0800 [-] Log opened.
2016-06-21 17:33:19+0800 [-] Starting factory <__main__.EchoClientFactory instance at 0x7f23f9a48680>
2016-06-21 17:33:19+0800 [-] Started to connect
2016-06-21 17:33:19+0800 [Uninitialized] Connected.
2016-06-21 17:33:19+0800 [Uninitialized] New connection IPv4Address(TCP, '127.0.0.1', 8124)
2016-06-21 17:33:22+0800 [EchoClient,client] 验证经过
2016-06-21 17:33:27+0800 [EchoClient,client] [单聊][000001]:你好,这是单聊
2016-06-21 17:33:29+0800 [EchoClient,client] [群聊][000001]:你好,这是群聊
2016-06-21 17:33:31+0800 [EchoClient,client] [群聊][000002]:你好,这是群聊
2016-06-21 17:33:40+0800 [EchoClient,client] [群聊][000003]:你好,这是群聊
yudahai@yudahaiPC:tcpserver$ python frontClient.py 000003
2016-06-21 17:33:28+0800 [-] Log opened.
2016-06-21 17:33:28+0800 [-] Starting factory <__main__.EchoClientFactory instance at 0x7ff3067dc680>
2016-06-21 17:33:28+0800 [-] Started to connect
2016-06-21 17:33:28+0800 [Uninitialized] Connected.
2016-06-21 17:33:28+0800 [Uninitialized] New connection IPv4Address(TCP, '127.0.0.1', 8124)
2016-06-21 17:33:31+0800 [EchoClient,client] 验证经过
2016-06-21 17:33:40+0800 [EchoClient,client] [群聊][000003]:你好,这是群聊

  这就是3个客户端的结果,是否是你指望的值?

  再看看服务器端的调试结果。

/usr/bin/python2.7 /home/yudahai/PycharmProjects/blog01/tcpserver/frontTCP.py
2016-06-21 17:23:01+0800 [-] Log opened.
2016-06-21 17:23:01+0800 [-] ChatFactory starting on 8124
2016-06-21 17:23:01+0800 [-] Starting factory <__main__.ChatFactory instance at 0x7f08b0ec8638>
2016-06-21 17:23:26+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '127.0.0.1', 59802)
2016-06-21 17:23:29+0800 [Chat,0,127.0.0.1] 欢迎, 000001!
2016-06-21 17:23:36+0800 [Chat,0,127.0.0.1] Phone_number:000003 不在线,不能聊天.
2016-06-21 17:23:37+0800 [Chat,0,127.0.0.1] Phone_number:000003 不在线,不能聊天.
2016-06-21 17:23:37+0800 [Chat,0,127.0.0.1] Phone_number:000002 不在线,不能聊天.
2016-06-21 17:33:17+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '127.0.0.1', 59926)
2016-06-21 17:33:19+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '127.0.0.1', 59928)
2016-06-21 17:33:20+0800 [Chat,1,127.0.0.1] 欢迎, 000001!
2016-06-21 17:33:22+0800 [Chat,2,127.0.0.1] 欢迎, 000002!
2016-06-21 17:33:28+0800 [Chat,1,127.0.0.1] Phone_number:000004 不在线,不能聊天.
2016-06-21 17:33:28+0800 [Chat,1,127.0.0.1] Phone_number:000004 不在线,不能聊天.
2016-06-21 17:33:28+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '127.0.0.1', 59930)
2016-06-21 17:33:30+0800 [Chat,2,127.0.0.1] Phone_number:000003 不在线,不能聊天.
2016-06-21 17:33:31+0800 [Chat,3,127.0.0.1] 欢迎, 000003!

  不在线的时候,都打印出来了。

  其实整个例子仍是比较简单的,可是不少地方还很是不完善,这个要在咱们接下来的系列中,慢慢完善。

  好比:若是一个客户端同时发送2个数据,上面的代码就只处理了一个,另一个就丢弃掉了;还有,咱们的程序考虑的是正常的上线、离线,若是客户端由于网络问题,忽然断线,没有发生tcp结束的4次握手,服务器端是不知道的,这时候如何保证服务器端知道客户端在线不在线?还有,twisted如何异步访问数据库、redis、rabbitmq等,这个咱们之后都会慢慢讲。

相关文章
相关标签/搜索