随着单体应用的拆分以及服务化的流行,如今分布式事务已经比较常见。分布式事务理论 ACID、CAP、BASE 等这里就不展开了,下面直接介绍一种常见的解决方案——TCC。TCC 其实采用的是补偿机制,其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。它分为三个阶段:Try(尝试执行业务并预留资源)、Confirm(确认执行业务)、Cancel(取消执行、进行补偿)。
优势:跟两阶段提交(2PC)比起来,实现以及流程相对简单一些,但数据的一致性比 2PC 也要差一些。
缺点:缺点还是比较明显的,在第 2、3 步中都有可能失败。TCC 属于应用层的一种补偿方式,所以需要程序员在实现的时候多写很多补偿的代码;在一些场景中,一些业务流程可能用 TCC 不太好定义及处理。
下面介绍一下我们应用的一种场景:有一个运维系统需要用到 zabbix,而运维系统拆分出了一个配置中心,下面是子系统依赖图。
在配置告警策略时需要调用 zabbix 接口。
这时就涉及到一个分布式事务。由于我们这里只涉及到两个事务,所以我这里就写了一个 zabbix 代理 client,来作为事务协调器。
class ZabbixClientProxy(object):
    """Transaction-coordinator proxy around the zabbix client (TCC style).

    Every mutating call (trigger create/update/delete, macro update) records
    enough prior state in an undo log so that rollback() can act as the TCC
    "Cancel" phase when the surrounding local DB transaction fails.
    """

    # NOTE(review): class-level client is created once at import time and
    # shared by every proxy instance -- confirm this sharing is intended.
    client = models.get_zbx_client()

    def __init__(self):
        # Undo logs, one per kind of remote mutation, in call order.
        self.create_triggers = list()
        self.update_triggers = list()
        self.delete_triggers = list()
        self.update_macros = list()

    def trigger_create(self, name, expression, uuid):
        """Create a zabbix trigger and record it so rollback() can delete it.

        Re-raises on failure so the caller can abort the whole transaction.
        """
        try:
            trigger = self.client.hosts.trigger_create(name, expression, 1)
            trigger["uuid"] = uuid
            self.create_triggers.append(trigger)
            logger.debug("trigger_create " + name)
            return trigger
        # `except Exception as e` (valid since 2.6) replaces the py2-only
        # comma form; str(e) replaces the deprecated/py2-only e.message.
        except Exception as e:
            logger.error("trigger_create fail,cause by " + str(e))
            raise

    def trigger_update(self, triggerid, name, expression, uuid):
        """Update a trigger, saving its previous state for rollback().

        BUGFIX: now re-raises on failure. Previously the exception was
        swallowed, so the coordinator never learned that this step failed
        and rollback() was never invoked.
        """
        try:
            logger.debug("trigger_update " + name)
            old_trigger = self.client.hosts.trigger_get(triggerid)
            update_result = self.client.hosts.trigger_update(
                triggerid, name=name, expression=expression,
                priority=1, enable=True)
            old_trigger["uuid"] = uuid
            logger.debug(old_trigger)
            self.update_triggers.append(old_trigger)
            return update_result
        except Exception as e:
            logger.error("trigger_update fail,cause by " + str(e))
            raise

    def trigger_delete(self, triggerid, uuid):
        """Delete a trigger, saving it so rollback() can re-create it.

        BUGFIX: now re-raises on failure (same rationale as trigger_update).
        """
        try:
            logger.debug("trigger_delete " + triggerid)
            old_trigger = self.client.hosts.trigger_get(triggerid)
            delete_result = self.client.hosts.trigger_delete(triggerid)
            old_trigger["uuid"] = uuid
            self.delete_triggers.append(old_trigger)
            return delete_result
        except Exception as e:
            logger.error("trigger_delete fail,cause by " + str(e))
            raise

    def update_trigger_macro(self, uuid, item_threshold, alert_duration):
        """Enable the disk-usage macros on a host, creating the full macro
        set when the host does not yet carry it (> 2 existing macros is
        used as the "already provisioned" heuristic)."""
        all_hmacros = self.get_macro_by_name(uuid)
        if all_hmacros and len(all_hmacros) > 2:
            self.update_macro(all_hmacros, "DISK_USER_MAX", item_threshold)
            self.update_macro(all_hmacros, "DISK_USER_TIMES",
                              str(alert_duration) + "m")
            self.update_macro(all_hmacros, "DISK_USER_ENABLE", 1)
        else:
            self.create_macro("DISK_USER_MAX", item_threshold, uuid)
            self.create_macro("DISK_USER_TIMES",
                              str(alert_duration) + "m", uuid)
            self.create_macro("DISK_USER_ENABLE", 1, uuid)

    def stop_trigger(self, assets):
        """Disable the disk-usage alert for every asset that has a host.

        Hosts without the full macro set get it created with defaults
        (threshold 80, duration 5m) but with the alert disabled.
        """
        if not assets:
            return
        for asset in assets:
            if asset.host is None:
                continue
            all_hmacros = self.get_macro_by_name(asset.host.uuid)
            if all_hmacros and len(all_hmacros) > 2:
                self.update_macro(all_hmacros, "DISK_USER_ENABLE", 0)
            else:
                self.create_macro("DISK_USER_MAX", 80, asset.host.uuid)
                self.create_macro("DISK_USER_TIMES", "5m", asset.host.uuid)
                self.create_macro("DISK_USER_ENABLE", 0, asset.host.uuid)

    def get_macro_by_name(self, uuid):
        """Return all host macros for the host identified by uuid."""
        return self.client.macros.list(uuid)

    def update_macro(self, all_hmacros, macro_name, value):
        """Update the macro named {$macro_name} in all_hmacros, recording
        its previous value for rollback().

        NOTE(review): failures are logged and swallowed (best-effort),
        unlike the trigger methods -- confirm this asymmetry is intended.
        """
        for macro in all_hmacros:
            if macro['macro'] == ('{$' + macro_name + '}'):
                try:
                    self.client.macros.update(macro['hostmacroid'],
                                              macro=macro_name, value=value)
                    macro['name'] = macro_name
                    self.update_macros.append(macro)
                    logger.debug('update_macro ' + macro_name +
                                 ' to ' + str(value))
                except Exception as e:
                    logger.error('update_macro ' + macro_name +
                                 ' fail,case by ' + str(e))

    def create_macro(self, macro_name, value, uuid):
        """Create a host macro on the host identified by uuid (best-effort).

        NOTE(review): created macros are NOT recorded in an undo log, so
        rollback() cannot remove them -- confirm whether that is acceptable.
        """
        try:
            hostid = self.client.macros._get_hostid(uuid)
            # BUGFIX: result was previously bound to an unused local.
            self.client.macros.create(macro_name, value, hostid)
            logger.debug("create_macro success,macro_name:" + macro_name +
                         ",value:" + str(value))
        except Exception as e:
            logger.error("create_macro fail,cause by " + str(e))

    def trigger_get(self, triggerid):
        """Pass-through: fetch one trigger."""
        return self.client.hosts.trigger_get(triggerid)

    def trigger_list(self, hostid):
        """Pass-through: list triggers of a host."""
        return self.client.hosts.trigger_list(hostid)

    def item_list(self, uuid):
        """Pass-through: list items of a host."""
        return self.client.hosts.item_list(uuid)

    def rollback(self):
        """TCC Cancel phase: undo every recorded remote mutation.

        Each undo is individually best-effort -- a failed undo is logged
        (for later log-scan reconciliation) and the remaining undos still
        run. Each undo log is cleared after its pass.
        """
        logger.debug("start rollback")
        # Undo creations: delete the triggers we created.
        for trigger in self.create_triggers:
            try:
                self.client.hosts.trigger_delete(trigger["triggerid"])
                logger.debug('rollback_create_trigger ' + trigger["name"])
            except Exception as e:
                logger.error('rollback_create_trigger ' +
                             trigger["triggerid"] + ' fail,case by ' + str(e))
        self.create_triggers = []
        # Undo updates: restore the saved pre-update state. The host uuid
        # in the saved expression is replaced by the {HOST.HOST} template
        # placeholder before resubmitting.
        for trigger in self.update_triggers:
            try:
                expression = trigger["expression"].replace(
                    trigger['uuid'] + ']', '{HOST.HOST}]')
                self.client.hosts.trigger_update(
                    trigger["triggerid"], name=trigger["name"],
                    expression=expression, priority=1, enable=True)
                logger.debug('rollback_update_trigger ' + trigger["name"])
            except Exception as e:
                logger.error('rollback_update_trigger ' +
                             trigger["triggerid"] + ' fail,case by ' + str(e))
        self.update_triggers = []
        # Undo deletions: re-create the saved trigger and point the local
        # DB row at the newly assigned zabbix trigger id.
        for trigger in self.delete_triggers:
            try:
                expression = trigger["expression"].replace(
                    trigger['uuid'] + ']', '{HOST.HOST}]')
                new_trigger = self.client.hosts.trigger_create(
                    trigger["name"], expression, 1)
                logger.debug(new_trigger)
                logger.debug('rollback_delete_trigger ' + trigger["name"])
                # Re-point the stored zabbix trigger id at the new trigger.
                alert_models.ConditionTrigger.objects.filter(
                    zabbix_trigger_id=trigger["triggerid"]).update(
                    zabbix_trigger_id=new_trigger["triggerid"])
            except Exception as e:
                logger.error('rollback_delete_trigger ' +
                             trigger["triggerid"] + ' fail,case by ' + str(e))
        self.delete_triggers = []
        # Undo macro updates: write back the saved previous values.
        for macro in self.update_macros:
            try:
                self.client.macros.update(macro['hostmacroid'],
                                          macro=macro['name'],
                                          value=macro['value'])
            except Exception as e:
                logger.error('rollback_update_macro ' + macro['name'] +
                             ' fail,case by ' + str(e))
        logger.debug("end rollback")
事务成功则提交本地事务,如果失败则调用 rollback 进行回滚。
def create(self, request, *args, **kwargs):
    """Policy-add endpoint (the TCC transaction entry point).

    Runs the local DB writes and the remote zabbix calls inside one
    atomic block; on validation failure Django rolls back the local
    transaction and the zabbix side is undone via the proxy's
    rollback() (the TCC Cancel phase), then the error is re-raised.
    """
    assets = request.data["data"]
    client = ZabbixClientProxy()
    try:
        with transaction.atomic():
            # save policy
            # pass `client` along so host / item / trigger create,
            # update and delete all go through the recording proxy
            pass  # BUGFIX: placeholder added -- the original elided body
                  # left the with-block empty, which is a syntax error
    # py2-only `except ..., e` replaced with the `as e` form (2.6+/3.x)
    except rest_framework_serializers.ValidationError as e:
        logger.exception(e)
        # Local DB changes are already rolled back by atomic();
        # now undo the remote zabbix mutations.
        client.rollback()
        raise
这样做还有一个问题:在回滚过程中如果网络突然断了,这时会回滚失败。这里我们记录了日志,后面会通过扫描日志来做到最终一致性;我们在后面做了补偿,下一次修改时会自动修正回滚失败的问题。