Activiti 并发处理

Activiti 版本 5.10 

使用activiti 有一段时间了,目前使用activiti 的大部分公司都是用来作相似于OA 等以用户任务为主的流程, 
这我没什么好说的,由于咱们的流程是以ServiceTask UserTask 结合来处理定时调度等数据处理任务。 
ServiceTask 以主,采用class 和 Spring bean 的方式。废话补多少,切入正题: 

Activiti 5.10 设计器是支持 delegateExpression 的注入参数的,但activiti 引擎的解析器却未能将参数注入从Spring 获得的bean,该问题在5.11 的版本上是获得解决了的,但 5.11版本目前尚未发布,有兴趣的同窗能够去下载正在开发中的代码进行研究,或者修改源码 

而后咱们来谈谈Activiti 对于并发的处理以及其中的问题(以ServiceTask 为例): 
当咱们将serviceTask 设置 async = "true" (关于 isExclusive 后续会提到) 的时候,流程引擎采用JobExecutor 来异步执行,执行顺序为引擎首先会将该任务实例化一条job记录,插act_ru_job表,而后JobExecutor 扫描该表并加锁执行该job,这里就涉及到定义ServiceTask 的另一个属性isExclusive,这个属性默认为true,即同一流程中当前存在于act_ru_job 且 is_exclusive 为true的会一块儿取出来,放入一个AcquireJobsCmd,而后放入一个线程执行,这样作用来保证该批任务时串行执行的,使用相同的context,这样作没有什么问题, 
但不能达到真正并行的目的。 
附上咱们的流程图: 



目标,处于parallelGateWay后面的任务并行执行,即任务完成的时间为单个任务完成的最大时间。 
每一个ServiceTask 的 acitiviti:async = "true" activiti:exclusive="false" 
运行,这时你会遇到一个 ActivitiOptimisticLockingException 异常 
   (toString(updatedObject) " was updated by another transaction concurrently"); 
   
为何会这样呢?由于每一个ServiceTask 对应一条act_ru_execution 表的记录,当该任务完后后,会去跟新其 
parent_id 对应的execution 将其版本 1 , 
  update ${prefix}ACT_RU_EXECUTION set 
      REV_ = #{revisionNext, jdbcType=INTEGER}, 
      PROC_DEF_ID_ = #{processDefinitionId, jdbcType=VARCHAR}, 
      ACT_ID_ = #{activityId, jdbcType=VARCHAR}, 
      IS_ACTIVE_ = #{isActive, jdbcType=BOOLEAN}, 
      IS_CONCURRENT_ = #{isConcurrent, jdbcType=BOOLEAN}, 
      IS_SCOPE_ = #{isScope, jdbcType=BOOLEAN}, 
      IS_EVENT_SCOPE_ = #{isEventScope, jdbcType=BOOLEAN}, 
      PARENT_ID_ = #{parentId, jdbcType=VARCHAR}, 
      SUPER_EXEC_ = #{superExecutionId, jdbcType=VARCHAR}, 
      SUSPENSION_STATE_ = #{suspensionState, jdbcType=INTEGER}, 
      CACHED_ENT_STATE_ = #{cachedEntityState, jdbcType=INTEGER} 
     where ID_ = #{id, jdbcType=VARCHAR} 
      and REV_ = #{revision, jdbcType=INTEGER} 

跟踪该异常的抛出缘由是 在serviceTask完成后,更新的ExecutionEntity 是同一条记录,而每个serviceTask 此时处于 
两个不一样的线程和事务当中,两个事务彼此不可见,任务开始时获取的ExecutionEntity完成相同,当一个事务成功更新后, 
另外一个事务就会失败。这样保证了流程的准确执行,当该任务失败后,会在下一个JobExecutor 扫描时从新执行。此时获取的 
execution 的版本已经加1,此时任务正常结束。Activiti 引擎如此作有必定的道理,但这不是我要的。为何这样说呢? 
假设我两个 serviceTask,每一个执行都须要30分钟,仅仅由于这样,我就须要花费1个小时的时间才能完成,天啊,饶了我吧! 

有木有办法,我想是有的,只要你够大胆。跟踪代码,更新父execution的时候,惟一变了的就是version,其余值都没变化, 
那么咱们是否能够将update语句更改一下,以下: 
update ${prefix}ACT_RU_EXECUTION set 
      REV_ = REV_ 1, 
      PROC_DEF_ID_ = #{processDefinitionId, jdbcType=VARCHAR}, 
      ACT_ID_ = #{activityId, jdbcType=VARCHAR}, 
      IS_ACTIVE_ = #{isActive, jdbcType=BOOLEAN}, 
      IS_CONCURRENT_ = #{isConcurrent, jdbcType=BOOLEAN}, 
      IS_SCOPE_ = #{isScope, jdbcType=BOOLEAN}, 
      IS_EVENT_SCOPE_ = #{isEventScope, jdbcType=BOOLEAN}, 
      PARENT_ID_ = #{parentId, jdbcType=VARCHAR}, 
      SUPER_EXEC_ = #{superExecutionId, jdbcType=VARCHAR}, 
      SUSPENSION_STATE_ = #{suspensionState, jdbcType=INTEGER}, 
      CACHED_ENT_STATE_ = #{cachedEntityState, jdbcType=INTEGER} 
     where ID_ = #{id, jdbcType=VARCHAR} 
     
注意到这里的变化,where 条件只剩id去掉了version ,而后REV_使用表里面的数据直接 1经测试完成可行,有兴趣的朋友能够本身试试。这里的关口过了,但后面仍然存在危险,因此修改源码是有风险的(%>_<%)。问题出在哪儿?有时候你会发现 两个serviceTask 运行完了,Execution表的记录也更新了,流程停滞不前了,这是神马缘由??? 

最后终于让我问题出现的地方,ParallelGatewayActivityBehavior,咱们前面说过,当每一个ServiceTask执行完成以后,事务并无到结束的地方,根据ServiceTask 的流程指向来到了第二个ParallelGateway,ParallelGatewayActivityBehavior 的做用就是判断前面的任务是否完成,是否继续执行,当每一个ServiceTask所在的事务到达此处时,他们都只能看见本身完成的部分,而不能看见与他并行的事务里面的状态。因此当到达是否执行下一步的判断条件时 
if (nbrOfExecutionsJoined==nbrOfExecutionsToJoin) {
      // Fork
      log.fine("parallel gateway '" activity.getId() "' activates: " nbrOfExecutionsJoined " of " nbrOfExecutionsToJoin " joined");
      execution.takeAll(outgoingTransitions, joinedExecutions);
      
  } else if (log.isLoggable(Level.FINE)){
     log.fine("parallel gateway '" activity.getId() "' does not activate: " nbrOfExecutionsJoined " of " nbrOfExecutionsToJoin " joined");
  }

都会告诉ParallelGatewayActivityBehavior 我已经完成了,其余的尚未完成。 
有人可能会说,为何个人程序没有遇到这种状况呢? 
第一 若是你不是ServiceTask 任务很难遇到这种状况 
第二 若是你的ServiceTask 没有设置为 async = "true" 和 exclusive="false" 也就不是真正的并发,固然也不会遇到 
     还有其余缘由我就不赘述了。 
     
这种问题也是能够解决的,由于针对同一个流程,每个事务都是经过同一个ParallelGatewayActivityBehavior实例来进行判断的, 
咱们只要记录每个经过该GateWay的事务的完成状况,而后汇总起来就OK 了,另外在完成的那一步须要将executio 的parent 的 
executions 对应的更新,不然execution 会有记录不能删除,但流程是能够完整的执行完成,给出个人完整处理方式: 
public class ParallelGatewayActivityBehavior extends GatewayActivityBehavior {
  
  private static Logger log = Logger.getLogger(ParallelGatewayActivityBehavior.class.getName());
  private Map<String,ActivityExecution> activityJoinedExecutions = new ConcurrentHashMap<String,ActivityExecution>();
  
  public void execute(ActivityExecution execution) throws Exception { 
    
    // Join
    PvmActivity activity = execution.getActivity();
    List<PvmTransition> outgoingTransitions = execution.getActivity().getOutgoingTransitions();
    
    execution.inactivate();
    lockConcurrentRoot(execution);
    
    List<ActivityExecution> joinedExecutions = execution.findInactiveConcurrentExecutions(activity);
    int nbrOfExecutionsToJoin = execution.getActivity().getIncomingTransitions().size();
    int nbrOfExecutionsJoined = joinedExecutions.size();
    if(nbrOfExecutionsToJoin!=nbrOfExecutionsJoined){
	for(ActivityExecution e:joinedExecutions){
	    activityJoinedExecutions.put(e.getId(), e);
	}
	nbrOfExecutionsJoined = activityJoinedExecutions.size();
	if(nbrOfExecutionsJoined == nbrOfExecutionsToJoin && execution.getParentId()!=null 
		&& execution instanceof ExecutionEntity){
	    ExecutionEntity et = (ExecutionEntity)execution;
	    while(joinedExecutions.size()!=nbrOfExecutionsToJoin ){
		Thread.sleep(10000);
		for(int i = 0 ; i < et.getParent().getExecutions().size(); i  ){
		    ExecutionEntity ct = et.getParent().getExecutions().get(i);
		    if(activityJoinedExecutions.containsKey(ct.getId()) ){
			et.getParent().getExecutions().set(i, (ExecutionEntity)activityJoinedExecutions.get(ct.getId()));
		    }
		    
		}
		joinedExecutions = execution.findInactiveConcurrentExecutions(activity);
	    }
	}
    }
    
    if (nbrOfExecutionsJoined==nbrOfExecutionsToJoin) {
	activityJoinedExecutions.clear();
      // Fork
      log.fine("parallel gateway '" activity.getId() "' activates: " nbrOfExecutionsJoined " of " nbrOfExecutionsToJoin " joined");
      execution.takeAll(outgoingTransitions, joinedExecutions);
      
    } else if (log.isLoggable(Level.FINE)){
      log.fine("parallel gateway '" activity.getId() "' does not activate: " nbrOfExecutionsJoined " of " nbrOfExecutionsToJoin " joined");
    }
  }

}
相关文章
相关标签/搜索