基于python yield机制的异步操做同步化编程模型

     又一个milestone即将结束,有了些许的时间总结研发过程当中的点滴心得,今天总结下如何在编写python代码时对异步操做进行同步化模拟,从而提升代码的可读性和可扩展性。 python

     游戏引擎通常都采用分布式框架,经过必定的策略来均衡服务器集群的资源负载,从而保证服务器运算的高并发性和CPU高利用率,最终提升游戏的性能和负载。因为引擎的逻辑层调用是非抢占式的,服务器之间都是经过异步调用来进行通信,致使游戏逻辑没法同步执行,因此在代码层不得不人为地添加不少回调函数,使一个本来完整的功能碎片化地分布在各个回调函数中。 编程

异步逻辑 服务器

     以游戏中的副本评分逻辑为例,在副本结束时副本管理进程须要收集副本中每一个玩家的战斗信息,再结合管理进程内部的统计信息最终给出一个副本评分,发放相应奖励。由于每一个玩家实体都随机分布在不一样进程中,因此管理进程须要经过异步调用来获取玩家身上的战斗信息。 并发

实现代码以下所示: 框架

# -*- coding: gbk -*-
import random

# 玩家实体类
class Player(object):
    def __init__(self, entityId):
        super(Player, self).__init__()
        # 玩家标识
        self.entityId = entityId

    def onFubenEnd(self, mailBox):
        score = random.randint(1, 10)
        print "onFubenEnd player %d score %d"%(self.entityId, score)

        # 向副本管理进程发送本身的id和战斗信息
        mailBox.onEvalFubenScore(self.entityId, score)

# 副本管理类
class FubenStub(object):
    def __init__(self, players):
        super(FubenStub, self).__init__()
        self.players = players

    def evalFubenScore(self):
        self.playerRelayCnt = 0
        self.totalScore = 0

        # 通知每一个注册的玩家,副本已经结束,索取战斗信息
        for player in self.players:
            player.onFubenEnd(self)

    def onEvalFubenScore(self, entityId, score):
        # 收到其中一个玩家的战斗信息
        print "onEvalFubenScore player %d score %d"%(entityId, score)
        self.playerRelayCnt += 1
        self.totalScore += score

        # 当收集完全部玩家的信息后,打印评分
        if len(self.players) == self.playerRelayCnt:
            print 'The fuben totalScore is %d'%self.totalScore

if __name__ == '__main__':
    # 模拟建立玩家实体
    players = [Player(i) for i in xrange(3)]

    # 副本开始时,每一个玩家将本身的MailBox注册到副本管理进程
    fs = FubenStub(players)

    # 副本进行中
    # ....

    # 副本结束,开始评分
    fs.evalFubenScore()
代码简化了副本评分逻辑的实现,其中Player类表示游戏的玩家实体,在游戏运行时无缝地在不一样服务器中切换,FubenStub表示副本的管理进程,在副本刚开始的时候该副本内全部玩家会将本身的MailBox注册到管理进程中,其中MailBox表示各个实体的远程调用句柄。在副本结束时,FubenStub首先向各个玩家发送副本结束消息,同时请求玩家的战斗信息,玩家在获得消息后,将本身的战斗信息发送给FubenStub;而后当FubenStub收集完全部玩家的信息后,最终打印副本评分。


同步逻辑 dom

    若是Player和FubenStub在同一进程中的话,那全部的操做均可以同步完成,在FubenStub向玩家发送副本结束消息的同时能够立刻获得该玩家的战斗信息,实现代码以下所示: 异步

# -*- coding: gbk -*-

import random

class Player(object):
    def __init__(self, entityId):
        super(Player, self).__init__()
        self.entityId = entityId

    def onFubenEnd(self, mailBox):
        score = random.randint(1, 10)
        print "onFubenEnd player %d score %d"%(self.entityId, score)
        return self.entityId, score

class FubenStub(object):
    def __init__(self, players):
        super(FubenStub, self).__init__()
        self.players = players

    def evalFubenScore(self):
        totalScore = 0
        for player in self.players:
            entityId, score = player.onFubenEnd(self)
            print "onEvalFubenScore player %d score %d"%(entityId, score)
            totalScore += score

        print 'The fuben totalScore is %d'%totalScore

if __name__ == '__main__':
    players = [Player(i) for i in xrange(3)]

    fs = FubenStub(players)
    fs.evalFubenScore()

     从以上两份代码能够看到因为异步操做,FubenStub中的评分逻辑人为地分红两个功能点:1)向玩家发送副本结束消息;2)接受玩家的战斗信息;而且两个功能点分布在两个不一样的函数中。若是游戏逻辑一旦复杂,势必会形成功能点分散,出现过多onXXX异步回调函数,最终致使代码的开发成本和维护成本提升,可读性和可扩展性降低。 分布式

     若是有一种方法,可让函数在异步调用时暂时挂起,而且在回调函数获得返回值后恢复执行,那么就能够用同步化的编程模式开发异步逻辑。 ide

yield 关键字 函数

     yield  Python中的一个关键字,凡是函数体中出现了 yield 关键字, Python将改变整个函数的上下文,调用该函数再也不返回值, 而是一个生成器对象。只有调用这个生成器的迭代函数next才能开始执行生成器对象,当生成器对象执行到包含 yield 表达式时, 函数将暂时挂起,等待下一次next调用来恢复执行,具体机制以下:

         1)调用生成器对象的next方法,启动函数执行;

         2)当生成器对象执行到包含 yield 表达式时, 函数挂起;

         3)下一次 next 函数调用又会驱动该生成器对象继续执行此后的语句, 直到碰见下一个 yield 再次挂起;

         4)若是某次 next 调用驱动了生成器继续执行, 而此后函数正常结束,生成器会抛出 StopIteration 异常;

以下代码所示:


def f():
    print "Before first yield"
    yield 1
    print "Before second yield"
    yield 2
    print "After second yield"

g = f()
print "Before first next"
g.next()
print "Before second next"
g.next()
print "Before third yield"
g.next()

执行结果为:

Before first next

Before first yield

Before second next

Before second yield

Before third yield

After second yield

StopIteration

     哈,有了让函数暂时挂起的机制,最后就剩下如何传递异步调用的返回值问题了。其实生成器的next函数已经实现了将参数从生成器对象内部向外传递的机制,而且python还提供了一个send函数将参数从外向生成器对象内部传递的机制,具体机制以下:

         1 调用next 函数驱动生成器时, next会同时等待生成器中下一个 yield 挂起,并将该yield后面的参数返回给next

         2)往生成器中传递参数,须要将next函数替换成send,此时send的功能与next相同(驱动生成器执行,等待返回值),同时send将后面的参数传递给生成器内部以前挂起的yield

以下代码所示:

def f():
    msg = yield 'first yield msg'
    print "generator inner receive:", msg
    msg = yield 'second yield msg'
    print "generator inner receive:", msg

g = f()
msg = g.next()
print "generator outer receive:", msg
msg = g.send('first send msg')
print "generator outer receive:", msg
g.send('second send msg')

执行结果为:

generator outer receive: first yield msg

generator inner receive: first send msg

generator outer receive: second yield msg

generator inner receive: second send msg

StopIteration

同步化实现

     好了,万事俱备只欠东风,下面就是简单对yield机制进行工程上封装以方便以后开发。下面的代码提供了一个叫IFakeSyncCall的interface,全部包含异步操做的逻辑类均可以继承这个接口:

class IFakeSyncCall(object):
    def __init__(self):
        super(IFakeSyncCall, self).__init__()
        self.generators = {}

    @staticmethod
    def FAKE_SYNCALL():
        def fwrap(method):
            def fakeSyncCall(instance, *args, **kwargs):
                instance.generators[method.__name__] = method(instance, *args, **kwargs)
                func, args = instance.generators[method.__name__].next()
                func(*args)
            return fakeSyncCall
        return fwrap

    def onFakeSyncCall(self, identify, result):
        try:
            func, args  = self.generators[identify].send(result)
            func(*args)
        except StopIteration:
            self.generators.pop(identify)

    其中interface中属性generators用来保存类中已经开始执行的生成器对象;函数FAKE_SYNCALL是一个decorator,装饰类中包含有yield的函数,改变函数的调用上下文,在fakeSyncCall内部封装了对生成器对象的next调用;函数onFakeSyncCall封装了全部onXXX函数的逻辑,其余实体经过调用这个函数传递异步回调的返回值。

下面就是通过同步化改进后的异步副本评分逻辑代码:
# -*- coding: gbk -*-
import random

class Player(object):
    def __init__(self, entityId):
        super(Player, self).__init__()
        self.entityId = entityId

    def onFubenEnd(self, mailBox):
        score = random.randint(1, 10)
        print "onFubenEnd player %d score %d"%(self.entityId, score)
        mailBox.onFakeSyncCall('evalFubenScore', (self.entityId, score))

class FubenStub(IFakeSyncCall):
    def __init__(self, players):
        super(FubenStub, self).__init__()
        self.players = players

    @IFakeSyncCall.FAKE_SYNCALL()
    def evalFubenScore(self):
        totalScore = 0
        for player in self.players:
            entityId, score = yield (player.onFubenEnd, (self,))
            print "onEvalFubenScore player %d score %d"%(entityId, score)
            totalScore += score

        print 'the totalScore is %d'%totalScore

if __name__ == '__main__':
    players = [Player(i) for i in xrange(3)]

    fs = FubenStub(players)
    fs.evalFubenScore()

比较evalFubenScore函数,基本已经和本来的同步逻辑代码相差无几。

      利用yield机制实现同步化编程模型的另一个优势是能够保证全部异步调用的逻辑串行化,从而保证数据的一致性和有效性,特别是在各类异步初始化流程中能够摒弃传统的timer sleep机制,从源头上扼杀一些隐藏很深的因为数据不一致性所致使的bug

相关文章
相关标签/搜索