工做流引擎,应用于解决流程审批和流程编排方面等问题,有效的提供了扩展性的支撑。而目前来讲,工做流领域也有了相对通行化的标准规范,也就是BPMN2.0。支持这个规范的开源引擎主要有:Activiti,flowable,Jbpm4等。本文着重对Activiti的架构设计进行分析和梳理,同时对流程启动和原子操做的相关代码进行完整走读。java
本文的阅读对象须要对Activiti有必定的理解而且已经可以初步的使用Activiti进行流程流转方面开发。算法
欢迎加入技术交流群186233599讨论交流,也欢迎关注技术公众号:风火说。数据库
Activiti采用了一个分层架构完成自底向上的包装。架构图以下缓存
大体包括:架构
Activit总体上采用命令模式进行代码功能解耦。将流程引擎的大部分涉及到客户端的需求让外部以具体命令实现类的方式实现。并发
完成这个编码模式,有几个重点类须要关注app
Command
命令接口,全部的具体命令都须要实现该类,最终业务就是执行该类的execute方法。CommandContext
命令上下文,为具体命令的执行提供上下文支撑。该上下文的生成是依靠命令拦截器中的上下文拦截器org.activiti.engine.impl.interceptor.CommandContextInterceptor
来生成的。该拦截器会判断是复用当前的上下文仍是生成新的上下文。引擎内的大部分功能都是经过单独的命令完成。框架
Activiti的命令模式还须要搭配其对应的责任链来完成。具体来讲,Activiti中存在一个命令拦截器链条,该命令拦截器链条由几大块的拦截器实现组成,以下ide
其中重要的默认拦截器有2个:oop
事务拦截器是否提供取决于org.activiti.engine.impl.cfg.ProcessEngineConfigurationImpl
的子类对方法createTransactionInterceptor
的实现。独立使用时的org.activiti.engine.impl.cfg.StandaloneProcessEngineConfiguration
该方法返回为空。也就是不提供事务拦截器。此时,命令的运行就没法经过事务拦截器来提供事务环境了。
实现类:org.activiti.engine.impl.interceptor.CommandContextInterceptor。
该拦截器的功能很是重要,能够说是Activiti操做的核心之一。其做用是在后续拦截器执行前检查当前上下文环境,若是不存在CommandContext对象,则建立一个;在后续拦截器执行后,将CommandContext对象close。CommandContext包含了本次操做中涉及到全部的数据对象。
Activiti遵循BPMN2.0规范,所以框架中少不了对BPMN2.0规范的定义文件(XML形式)的解析类。Activiti采用的STAX的拉模型进行XML解析。这里先不分析其具体的解析类的内在联系,而是概念性的阐述下Activiti对解析的概念分层。
首先经过类org.activiti.bpmn.converter.BpmnXMLConverter
进行XML解析,解析为org.activiti.bpmn.model
包下面的与各个XML元素定义对应的POJO类。此时这些POJO类仅仅只是XML文件的一个Java表达。
在经过类org.activiti.engine.impl.bpmn.parser.BpmnParser
聚合不一样的解析类,将上面步骤解析出来的POJO类进一步解析为能够在框架中利用的org.activiti.engine.impl.pvm.process
包下面的类。典型的表明就是ActivityImpl类。
三者之间的关系简单用图表达就是
Activiti采起了领域中的充血模型做为本身的实现方式。大部分业务逻辑都直接关联在了org.activiti.engine.impl.persistence.entity.ExecutionEntity
中。因为Activiti采用了MyBatis而非Hibernate这样的O/R Mapping产品做为持久层,所以Activiti在具体的持久化操做上也有本身的独特方式。
Activiti的持久化机制简单说来就是数据集中提交。集中提交还产生了一个额外的做用:自动提交。换句话说,在内存中的实体,若是更新了属性可是没有显示的执行刷新动做,在一个调用的生命周期结束后也会被持久化其最新的状态到数据库。下面来看下详细解释下这个集中提交机制。
在Activiti全部运行期生成的对象都须要实现一个接口org.activiti.engine.impl.interceptor.Session
,其定义以下
public interface Session { void flush(); void close(); }
而Session对象则是由接口org.activiti.engine.impl.interceptor.SessionFactory
的方法进行生成,其定义以下
public interface SessionFactory { Class<?> getSessionType(); Session openSession(); }
流程引擎内部持有各类SessionFactory实现,用户也能够自定义注册本身的SessionFactory实现,若是用户但愿自定义的对象也能够被集中提交机制处理的话。
在CommandContext中存在一个Map<Class,Session>存储,存储着该CommandContext生命周期内新建的全部Session对象。当一个命令执行完毕后,最终命令上下文CommandContext的close方法会被调用。当执行CommandContext.close()方法时,其内部会按照顺序执行flushSessions
,closeSessions
方法。从名字能够看到,第一个方法内部就是执行全部Session对象的flush方法,第二个方法内部就是执行全部的Session对象的close方法。
流程引擎内部有一个Session实现是比较特别的。也就是org.activiti.engine.impl.db.DbSqlSession
实现。若是有须要更新,删除,插入等操做,该操做是须要经过DbSqlSession来实现的,而实际上该实现会将这些操做缓存在内部。只有在执行flush方法时才会真正的提交到数据库去执行。正是由于如此,全部的数据操做,实际上最终都是要等到CommandContext执行close方法时,才会真正提到到数据库。
理论上,充血模型是在聚合根这个层面上完成的持久化。可是因为Activiti没有采用O/R Mapping框架,因而本身完成了一个相似功能的模块。
要明白工做流中的数据插入机制,首先要看下实体类的接口org.activiti.engine.impl.db.PersistentObject
,以下
public interface PersistentObject { String getId(); void setId(String id); Object getPersistentState(); }
Activiti的数据库表都是单字符串主键,每个实体类都须要实现该接口。在对实体类进行保存的时候,DbSqlSession会调用getId方法判断是否存在ID,若是不存在,则使用ID生成器(该生成器根据策略有几种不一样实现,这里不表)生成一个ID而且设置。
而方法getPersistentState
是用来返回一个持久化状态对象。则方法的使用场合在下一个章节说明
该Session内部存在三个重要属性,以下
//该属性存储着全部使用insert方法放入的对象 protected Map<Class<? extends PersistentObject>, List<PersistentObject>> insertedObjects = new HashMap<Class<? extends PersistentObject>, List<PersistentObject>>(); //该Map结构内存储全部经过该DbSqlSession查询出来的结果,以及update方法放入的对象 protected Map<Class<?>, Map<String, CachedObject>> cachedObjects = new HashMap<Class<?>, Map<String,CachedObject>>(); //该属性内存储着全部将要执行的删除操做 protected List<DeleteOperation> deleteOperations = new ArrayList<DeleteOperation>();
删除和新增都比较容易理解,就是要此类操做缓存起来,一次性提交到数据库,上文曾提到的数据集中提交就体如今这个地方。而cachedObjects就有些不一样了。要解析这个Map结构,首先来看下类org.activiti.engine.impl.db.DbSqlSession.CachedObject
的结构属性,以下
public static class CachedObject { protected PersistentObject persistentObject; protected Object persistentObjectState; } public CachedObject(PersistentObject persistentObject, boolean storeState) { this.persistentObject = persistentObject; if (storeState) { this.persistentObjectState = persistentObject.getPersistentState(); } }
经过构造方法能够明白,在新建该对象的时候,经过storeState参数决定是否保存当时的持久化状态。
该Map的数据来源有2处
当DbSqlSession执行flush方法时,主要来讲是作了数据提交动做
方法getUpdatedObjects的逻辑就是遍历全部的CachedObject,同时知足如下条件者则放入要更新的实体集合中
经过上面能够得知,若是一个实体类在DbSqlSession的生命周期被查询出来,而且其数据内容有了修改,则DbSqlSession刷新时会自动刷新到数据库。
任何框架都是核心理念上发展细化而来。Activiti的核心理念就是流程虚拟机(Process Virtual Machine,如下简称PVM)。PVM试图提供一组API,经过API自己来描述工做流方面的各类可能性。没有了具体实现,也使得PVM自己能够较好的适应各类不一样的工做流领域语言,而Activiti自己也是在PVM上的一种实现。
首先来看下流程定义自己。在工做流中,流程定义能够图形化的表达为一组节点和链接构成的集合。好比下图
即便没有任何知识也能大概明白这张图表达的是一个流程以及执行顺序的意图。流程定义的表达方式不限,可使用图形的方式表达,可使用领域语言,也能够传统的XML(好比Activiti用的就是BPMN2.0 Schema下的XML)。特别的,当前已经有了标准化的BPMN2.0规范。
PVM将流程定义描述为流程元素的集合。再将流程元素细分为2个子类:流程节点和连线。
从类图的角度也能很好的看出这种关系,流程节点PvmActivity和连线PvmTransition都是流程元素PvmProcessElement。
从类图能够看到PvmActivity继承于PvmScope。这种继承关系代表流程节点自己有其归于的做用域(PvmScope),节点自己也多是另一些节点的做用域,这也符合节点可能拥有子节点的原则。关于做用域自己,后文还会再次详细讲解,这里先按下不表。
经过流程节点和连线,PVM完成了对流程定义的表达。流程定义是一个流程的静态表达,流程执行则是依照流程定义启动的一个运行期表达,每个流程执行都具有本身惟一的生命周期。流程执行须要具有如下要素:
针对要素1,Activiti提供了接口org.activiti.engine.impl.pvm.delegate.ActivityBehavior
。该接口内部仅有一个execute方法。该接口的实现即为不一样PvmActivity节点提供了具体动做。ActivityBehavior有丰富的不一样实现,对应了流程中丰富的不一样功能的节点。每个PvmActivity对象都会持有一个ActivityBehavior对象。
针对要素2,Activiti提供了接口org.activiti.engine.impl.pvm.PvmExecution
。该接口有一个方法PvmActivity getActivity()
。用以返回当前流程执行所处的流程节点。
针对要素3,Activiti提供了接口org.activiti.engine.impl.pvm.runtime.InterpretableExecution
。接口方法不少,这里取和流程执行运转最重要的2个方法展开,以下
public interface InterpretableExecution extends ActivityExecution, ExecutionListenerExecution, PvmProcessInstance { void take(PvmTransition transition); void take(PvmTransition transition, boolean fireActivityCompletedEvent);
执行方法take,以连线对象做为入参,这会使得流程执行该连线定义的路线。其实现逻辑应该为让流程执行定位于连线源头的活动节点,经由连线对象,到达连线目的地的活动节点。
针对要素4,实际上也是由接口org.activiti.engine.impl.pvm.runtime.AtomicOperation
来完成的。经过该接口的调用类,此种状况的实现者须要获取当前流程执行所处的活动节点的ActivityBehavior
对象,执行其execute
方法来执行节点动做。结合要素3和4,能够看出AtomicOperation
接口用于执行流程运转中的单一指令,例如根据连线移动,执行节点指令等。分解成单一指令的好处是易于编码和理解。这也契合接口命名中的原子一意。
从上面对PVM定义期和运行期的解释能够看出,整个概念体系并不复杂。涉及到的类也很少。正是由于PVM只对工做流中最基础的部分作了抽象和接口定义,使得PVM的实现上有了不少的可能性。
然而也正是因为定义的简单性,实际上这套PVM在转化为实际实现的时候须要额外附加不少的特性才能真正完成框架需求。
在解析完成后,一个流程定义中的全部节点都会被解析为ActivityImpl对象。ActivityImpl对象自己能够持有事件订阅(根据BPMN2.0规范,目前有定时,消息,信号三种事件订阅类型)。由于ActivityImpl自己能够嵌套而且能够持有订阅,所以引入做用域概念(Scope)。
一个ActivityImpl在如下两种状况下会被定义为做用域ActivityImpl。
做用域是一个很重要的概念,状况1中做用域定义的是复杂节点的生命周期,状况2中做用域定义的是事件的捕获范围。
ExecutionEntity的含义是一个流程定义被启动后的执行实例,表明着流程的运行期状态。在Activiti的设计中,事件订阅,流程变量等都是与一个具体的ExecutionEntity相关的。其自己有几个重要的属性:
上面对ExecutionEntity的解释仍然抽象。若是直观的看,能够认为ExecutionEntity是某一种生命周期的体现,其内部属性随着不一样的状况而变化。以下图所示:
随着流程的启动,会建立一个ExecutionEntity
对象。该ExecutionEntity
生命周期与整个流程相同,而其中的isScope
和isConcurrent
在建立之初固定,而且不会改变。而isActive
和activityId
随着流程的推动则会不断变化。
ExecutionEntity
是用来反映流程的推动状况的,实际上,每每一个ExecutionEntity
不足以支撑所有的BPMN功能。所以实现上,Activiti是经过一个树状结构的ExecutionEntity
结构来反映流程推动状况。建立之初的ExecutionEntity
对象随着流程的推动会不断的分裂和合并,ExecutionEntity
树也会不断的生长和修剪。在流程的推动过程当中会遇到4种基本状况
此种状况能够以下图所示
在流程前进的构成中,遇到单独的非做用域节点,ExecutionEntity
一直处于激活状态,只不过随着流程前进,其activityId指向会不断变化。如上图所示,会经历:开始节点、组员工做、领导审批、结束节点4个不一样的值。
实际上,这些节点都是在做用域<流程定义>之下,而ExecutionEntity
表明的正是该做用域,所以其isScope属性为true。
若是流程推动中遇到单独的做用域节点,则当前执行对象ExecutionEntity
应该建立一个做用域子执行(isScope为true的ExecutionEntity)。整个变化过程能够以下图所示
当准备进入节点<组员工做>时,ExecutionEntity1
冻结,而且建立出子执行ExecutionEntity2
。ExecutionEntity2
的isScope属性也为true。
ExecutionEntity1
的isScope为true,是由于该执行实例负责整个流程定义的事件订阅,ExecutionEntity2
的isScope为true,是由于该执行实例负责节点<组员工做>的事件订阅。
前面提到过,事件订阅与某一个具体的执行实例相关。当节点<组员工做>完成时,也就是事件订阅所在的做用域要被摧毁时,对应的事件订阅也要被删除。此时额外的ExecutionEntity
就特别方便,只要删除该ExecutionEntity
,顺便删除相关的事件订阅便可,在这里就是删除ExecutionEntity2
。
删除ExecutionEntity2
,而且激活父执行ExecutionEntity1
。随着流程推动,ExecutionEntity1
更换指向的activityId。
流程推动中节点存在多个外出连线,则能够根据须要建立多个并发的子执行,每个子执行对应一个连线。以下图所示
当流程节点A、B被激活时,ExecutionEntity1
会有2个并发的子执行ExecutionEntity2
和ExecutionEntity3
。这两个子执行的isConcurrent
属性均为true,由于节点A和B都是在相同的做用域(流程定义)下被并发的执行。
当节点A、B执行完毕后,并发的子执行被删除,父执行从新被激活,继续后面的节点。
流程推动中遇到并发节点,而且节点为做用域节点,状况就会以下所示
当流程运行至P1节点,P1节点有多个出线。根据出线数目建立2个子执行,此时2个子执行均为并发的,且从P1做为出线的源头节点,所以2个子执行的activityId均为P1。
当运行到A、B节点时,因为2个节点均为做用域节点,所以还会再建立2个子执行。此时ExecutionEntity3
和ExecutionEntity3
冻结。ExecutionEntity4
和ExecutionEntity5
所执行的节点在各自的做用域下均无并发操做,所以其isScope属性为true,isConcurrent属性为false。这5个执行实例构成的执行树以下
当A、B节点完成时,首先是各自的做用域被删除,所以ExecutionEntity4
和ExecutionEntity5
首先被删除,ExecutionEntity3
和ExecutionEntity4
激活。然后汇聚于P2节点,所以ExecutionEntity3
和ExecutionEntity4
删除,ExecutionEntity1
被激活,继续执行剩下的节点。
流程启动依靠的是命令类:org.activiti.engine.impl.cmd.StartProcessInstanceCmd
。
该命令的总体流程以下
部署管理器的查询和运行期关系不大,先忽略。两个流程着重展开:
流程以下
其中指定初始滑动节点建立实例自己展开后的流程以下
ExecutionEntity的初始化须要单独说下,流程以下
在建立流程的逻辑的尾部是一个循环流程。该循环的目的是为了建立正确的ExecutionImpl树(如下简称执行树)。本质上该方法是建立一个流程实例,而且将流程当前运行节点定位到指定的节点。而工做流的正确执行依赖于执行树的正确分裂和整合。所以就须要为指定的节点建立其上游的执行树实例。使得在效果上看起来就和流程自动执行到当前节点相似(执行树相似,节点运行历史则无类似,实际上也无历史节点)。
而若是指定的初始节点就是流程定义的初始节点,则循环就不存在乎义了。
流程实例的启动的内容,就是执行原子操做:org.activiti.engine.impl.pvm.runtime.AtomicOperationProcessStart
。关于原子操做单独阐述。
org.activiti.engine.impl.pvm.process.ActivityImpl
类中有一个属性parent,类型为org.activiti.engine.impl.pvm.process.ScopeImpl
。在解析的时候,该属性为当前节点的做用域节点。根据做用域节点的定义,该属性的取值有两种可能的类型,一种是org.activiti.engine.impl.pvm.process.ActivityImpl
,另一种是org.activiti.engine.impl.pvm.process.ProcessDefinitionImpl
。
第二种状况意味着该节点是直属于流程定义的节点了。
原子操做是一个接口org.activiti.engine.impl.pvm.runtime.AtomicOperation
。从名字也能够看出,该接口的做用就是执行流程实例中的一个单步操做。下面分阶段说明
该抽象类是众多实现类的基类。其代码以下
public abstract class AbstractEventAtomicOperation implements AtomicOperation { public boolean isAsync(InterpretableExecution execution) { return false; } public void execute(InterpretableExecution execution) { //获取当前执行对象的做用域对象。具体由子类提供。 ScopeImpl scope = getScope(execution); //从做用域对象中获取指定事件的监听器。事件名称由子类提供。 List<ExecutionListener> exectionListeners = scope.getExecutionListeners(getEventName()); int executionListenerIndex = execution.getExecutionListenerIndex(); if (exectionListeners.size()>executionListenerIndex) { execution.setEventName(getEventName()); execution.setEventSource(scope); ExecutionListener listener = exectionListeners.get(executionListenerIndex); try { listener.notify(execution); } catch (RuntimeException e) { throw e; } catch (Exception e) { throw new PvmException("couldn't execute event listener : "+e.getMessage(), e); } execution.setExecutionListenerIndex(executionListenerIndex+1); execution.performOperation(this); } else { execution.setExecutionListenerIndex(0); execution.setEventName(null); execution.setEventSource(null); eventNotificationsCompleted(execution); } } protected abstract ScopeImpl getScope(InterpretableExecution execution); protected abstract String getEventName(); protected abstract void eventNotificationsCompleted(InterpretableExecution execution); }
整个抽象类的逻辑归纳而言,就是将获取当前执行实例的做用域对象(具体由子类提供),执行其中特定事件(事件名由子类提供)的监听器。
在所有的监听器执行完毕后,执行子类的特定逻辑。
该操做用于流程启动。可是并不执行真正的启动动做。只是设置了当前执行实例的活动节点为org.activiti.engine.impl.pvm.runtime.StartingExecution
中存储的活动节点。而后执行原子操做org.activiti.engine.impl.pvm.runtime.AtomicOperationProcessStartInitial
。
本质上来讲,只是执行了一个设置的动做。
public class AtomicOperationProcessStart extends AbstractEventAtomicOperation { @Override protected ScopeImpl getScope(InterpretableExecution execution) { return execution.getProcessDefinition(); } @Override protected String getEventName() { return org.activiti.engine.impl.pvm.PvmEvent.EVENTNAME_START; } @Override protected void eventNotificationsCompleted(InterpretableExecution execution) { if (Context.getProcessEngineConfiguration() != null && Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) { Map<String, Object> variablesMap = null; try { variablesMap = execution.getVariables(); } catch (Throwable t) { // In some rare cases getting the execution variables can fail (JPA entity load failure for example) // We ignore the exception here, because it's only meant to include variables in the initialized event. } Context.getProcessEngineConfiguration().getEventDispatcher().dispatchEvent( ActivitiEventBuilder.createEntityWithVariablesEvent(ActivitiEventType.ENTITY_INITIALIZED, execution, variablesMap, false)); Context.getProcessEngineConfiguration().getEventDispatcher() .dispatchEvent(ActivitiEventBuilder.createProcessStartedEvent(execution, variablesMap, false)); } ProcessDefinitionImpl processDefinition = execution.getProcessDefinition(); StartingExecution startingExecution = execution.getStartingExecution(); List<ActivityImpl> initialActivityStack = processDefinition.getInitialActivityStack(startingExecution.getInitial()); execution.setActivity(initialActivityStack.get(0)); execution.performOperation(PROCESS_START_INITIAL); } }
代码以下
public class AtomicOperationProcessStartInitial extends AbstractEventAtomicOperation { @Override protected ScopeImpl getScope(InterpretableExecution execution) { return (ScopeImpl) execution.getActivity(); } @Override protected String getEventName() { return org.activiti.engine.impl.pvm.PvmEvent.EVENTNAME_START; } @Override protected void eventNotificationsCompleted(InterpretableExecution execution) { ActivityImpl activity = (ActivityImpl) execution.getActivity(); ProcessDefinitionImpl processDefinition = execution.getProcessDefinition(); StartingExecution startingExecution = execution.getStartingExecution(); //从开始节点开始的,该判断均为真。 if (activity==startingExecution.getInitial()) { execution.disposeStartingExecution(); execution.performOperation(ACTIVITY_EXECUTE); } else { List<ActivityImpl> initialActivityStack = processDefinition.getInitialActivityStack(startingExecution.getInitial()); int index = initialActivityStack.indexOf(activity); activity = initialActivityStack.get(index+1); InterpretableExecution executionToUse = null; if (activity.isScope()) { executionToUse = (InterpretableExecution) execution.getExecutions().get(0); } else { executionToUse = execution; } executionToUse.setActivity(activity); executionToUse.performOperation(PROCESS_START_INITIAL); } } }
该原子操做的目的仅是为了执行节点上的end
事件监听器。监听器执行完毕后,就执行下一个原子操做AtomicOperationTransitionDestroyScope
该原子操做的目的是为了执行节点上start事件监听器。在执行完毕后,会判断执行实例的当前节点是否能够执行。判断的依据该节点和链接线节点
该原子操做的做用实际上就是取出该执行实例当前的活动节点,而且执行该活动节点的行为定义。行为定义经过接口org.activiti.engine.impl.pvm.delegate.ActivityBehavior
定义。不一样的节点行为由不一样的子类完成
public class AtomicOperationActivityExecute implements AtomicOperation { private static Logger log = LoggerFactory.getLogger(AtomicOperationActivityExecute.class); public boolean isAsync(InterpretableExecution execution) { return false; } public void execute(InterpretableExecution execution) { ActivityImpl activity = (ActivityImpl) execution.getActivity(); ActivityBehavior activityBehavior = activity.getActivityBehavior(); if (activityBehavior==null) { throw new PvmException("no behavior specified in "+activity); } log.debug("{} executes {}: {}", execution, activity, activityBehavior.getClass().getName()); try { if(Context.getProcessEngineConfiguration() != null && Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) { Context.getProcessEngineConfiguration().getEventDispatcher().dispatchEvent( ActivitiEventBuilder.createActivityEvent(ActivitiEventType.ACTIVITY_STARTED, execution.getActivity().getId(), (String) execution.getActivity().getProperty("name"), execution.getId(), execution.getProcessInstanceId(), execution.getProcessDefinitionId(), (String) activity.getProperties().get("type"), activity.getActivityBehavior().getClass().getCanonicalName())); } activityBehavior.execute(execution); } catch (RuntimeException e) { throw e; } catch (Exception e) { LogMDC.putMDCExecution(execution); throw new PvmException("couldn't execute activity <"+activity.getProperty("type")+" id=\""+activity.getId()+"\" ...>: "+e.getMessage(), e); } } }
public class AtomicOperationTransitionDestroyScope implements AtomicOperation { private static Logger log = LoggerFactory.getLogger(AtomicOperationTransitionDestroyScope.class); public boolean isAsync(InterpretableExecution execution) { return false; } @SuppressWarnings("unchecked") public void execute(InterpretableExecution execution) { InterpretableExecution propagatingExecution = null; ActivityImpl activity = (ActivityImpl) execution.getActivity(); /** * 若是当前的活动节点具有做用域。这就意味着最初的时候,有一个处于激活状态的执行实例在执行该节点,如下称初始执行实例。 * 从这样的节点退出要考虑几种状况: * 1、单独的做用域节点。此时的执行树状况是:非激活的非做用域执行实例-激活的做用域执行实例(初始执行实例)。此时要离开做用域节点,首先是销毁激活的做用域执行实例(初始执行实例),激活初始执行实例的父实例,使用该父实例执行后续的出线动做。 * 2、并行的做用域节点。此时的执行树状况是:非激活的非做用域并发执行实例-非激活的 */ if (activity.isScope()) { InterpretableExecution parentScopeInstance = null; // if this is a concurrent execution crossing a scope boundary if (execution.isConcurrent() && !execution.isScope()) { // first remove the execution from the current root InterpretableExecution concurrentRoot = (InterpretableExecution) execution.getParent(); parentScopeInstance = (InterpretableExecution) execution.getParent().getParent(); log.debug("moving concurrent {} one scope up under {}", execution, parentScopeInstance); List<InterpretableExecution> parentScopeInstanceExecutions = (List<InterpretableExecution>) parentScopeInstance.getExecutions(); List<InterpretableExecution> concurrentRootExecutions = (List<InterpretableExecution>) concurrentRoot.getExecutions(); // if the parent scope had only one single scope child if (parentScopeInstanceExecutions.size()==1) { // it now becomes a concurrent execution parentScopeInstanceExecutions.get(0).setConcurrent(true); } concurrentRootExecutions.remove(execution); parentScopeInstanceExecutions.add(execution); execution.setParent(parentScopeInstance); execution.setActivity(activity); propagatingExecution = execution; // if there is only a single concurrent execution left // in the concurrent root, auto-prune it. meaning, the // last concurrent child execution data should be cloned into // the concurrent root. if (concurrentRootExecutions.size()==1) { InterpretableExecution lastConcurrent = concurrentRootExecutions.get(0); if (lastConcurrent.isScope()) { lastConcurrent.setConcurrent(false); } else { log.debug("merging last concurrent {} into concurrent root {}", lastConcurrent, concurrentRoot); // We can't just merge the data of the lastConcurrent into the concurrentRoot. // This is because the concurrent root might be in a takeAll-loop. So the // concurrent execution is the one that will be receiving the take concurrentRoot.setActivity((ActivityImpl) lastConcurrent.getActivity()); concurrentRoot.setActive(lastConcurrent.isActive()); lastConcurrent.setReplacedBy(concurrentRoot); lastConcurrent.remove(); } } } else if (execution.isConcurrent() && execution.isScope()) { /** * 根据算法,这种状况不会出现。源代码中,这部分也属于todo的内容。 */ } else { /** * 这个条件是执行实例的scope属性为真。此时销毁当前的执行实例,使用其父执行实例继续后面的流程 */ propagatingExecution = (InterpretableExecution) execution.getParent(); propagatingExecution.setActivity((ActivityImpl) execution.getActivity()); propagatingExecution.setTransition(execution.getTransition()); propagatingExecution.setActive(true); log.debug("destroy scope: scoped {} continues as parent scope {}", execution, propagatingExecution); execution.destroy(); //删除与该执行实例相关的一切,包括:定时工做,各类任务,事件订阅,用户流程关系,最后删除自身。 execution.remove(); } } else { //若是离开的是一个非做用域节点,则仍然使用当前的执行实例做为下一个节点的执行实例 propagatingExecution = execution; } // if there is another scope element that is ended ScopeImpl nextOuterScopeElement = activity.getParent(); TransitionImpl transition = propagatingExecution.getTransition(); ActivityImpl destination = transition.getDestination(); /** * 考虑当前的节点多是子流程或者活动调用中的节点。那么就须要离开当前的做用域范围,回到更上层的做用域下。所以须要判断目的地是否和源头节点处于同一个做用域。若是不是同一个做用域,则不断向上回溯 */ if (transitionLeavesNextOuterScope(nextOuterScopeElement, destination)) { propagatingExecution.setActivity((ActivityImpl) nextOuterScopeElement); propagatingExecution.performOperation(TRANSITION_NOTIFY_LISTENER_END); } else { propagatingExecution.performOperation(TRANSITION_NOTIFY_LISTENER_TAKE); } } public boolean transitionLeavesNextOuterScope(ScopeImpl nextScopeElement, ActivityImpl destination) { return !nextScopeElement.contains(destination); } }
该原子操做的目的就是为了执行在链接线上的监听器。在执行完毕后,就准备执行目标活动节点。这里关于目标节点还存在一个选择的问题。并非直接执行链接线上的目标活动节点。而是从目标活动节点出发,选择和执行实例当前活动节点同属同一个做用域的目标活动节点或其父节点。
public class AtomicOperationTransitionNotifyListenerTake implements AtomicOperation { private static Logger log = LoggerFactory.getLogger(AtomicOperationTransitionNotifyListenerTake.class); public boolean isAsync(InterpretableExecution execution) { return false; } public void execute(InterpretableExecution execution) { TransitionImpl transition = execution.getTransition(); List<ExecutionListener> executionListeners = transition.getExecutionListeners(); int executionListenerIndex = execution.getExecutionListenerIndex(); /** * 整个if的功能就是不断判断监听器是否被执行完毕。都执行完毕后走入到else的部分。 */ if (executionListeners.size()>executionListenerIndex) { execution.setEventName(org.activiti.engine.impl.pvm.PvmEvent.EVENTNAME_TAKE); execution.setEventSource(transition); ExecutionListener listener = executionListeners.get(executionListenerIndex); try { listener.notify(execution); } catch (RuntimeException e) { throw e; } catch (Exception e) { throw new PvmException("couldn't execute event listener : "+e.getMessage(), e); } execution.setExecutionListenerIndex(executionListenerIndex+1); execution.performOperation(this); } else { if (log.isDebugEnabled()) { log.debug("{} takes transition {}", execution, transition); } execution.setExecutionListenerIndex(0); execution.setEventName(null); execution.setEventSource(null); ActivityImpl activity = (ActivityImpl) execution.getActivity(); ActivityImpl nextScope = findNextScope(activity.getParent(), transition.getDestination()); execution.setActivity(nextScope); // Firing event that transition is being taken if(Context.getProcessEngineConfiguration() != null && Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) { Context.getProcessEngineConfiguration().getEventDispatcher().dispatchEvent( ActivitiEventBuilder.createSequenceFlowTakenEvent(ActivitiEventType.SEQUENCEFLOW_TAKEN, transition.getId(), activity.getId(), (String) activity.getProperties().get("name") ,(String) activity.getProperties().get("type"), activity.getActivityBehavior().getClass().getCanonicalName(), nextScope.getId(), (String) nextScope.getProperties().get("name"), (String) nextScope.getProperties().get("type"), nextScope.getActivityBehavior().getClass().getCanonicalName())); } execution.performOperation(TRANSITION_CREATE_SCOPE); } } /** finds the next scope to enter. the most outer scope is found first */ public static ActivityImpl findNextScope(ScopeImpl outerScopeElement, ActivityImpl destination) { ActivityImpl nextScope = destination; while( (nextScope.getParent() instanceof ActivityImpl) && (nextScope.getParent() != outerScopeElement) ) { nextScope = (ActivityImpl) nextScope.getParent(); } return nextScope; } }
该原子操做是为了确认进入的节点是否具有做用域。若是具有做用域,则将目前的执行实例冻结。而且建立出新的执行实例,用于执行做用域节点;若是不具有做用域,则无效果。
在确认完毕后,执行原子操做AtomicOperationTransitionNotifyListenerStart
public class AtomicOperationTransitionCreateScope implements AtomicOperation { private static Logger log = LoggerFactory.getLogger(AtomicOperationTransitionCreateScope.class); public boolean isAsync(InterpretableExecution execution) { ActivityImpl activity = (ActivityImpl) execution.getActivity(); return activity.isAsync(); } public void execute(InterpretableExecution execution) { InterpretableExecution propagatingExecution = null; ActivityImpl activity = (ActivityImpl) execution.getActivity(); if (activity.isScope()) { //为做用域活动建立一个新的执行实例,是该原子操做的主要目的 propagatingExecution = (InterpretableExecution) execution.createExecution(); propagatingExecution.setActivity(activity); propagatingExecution.setTransition(execution.getTransition()); execution.setTransition(null); execution.setActivity(null); execution.setActive(false); log.debug("create scope: parent {} continues as execution {}", execution, propagatingExecution); //这里是另一个重点。在一个流程实例初始化的时候,会对当前流程所处的做用域对象(多是流程定义或者是做用域活动进行处理,具体表现是为该做用域对象上的定时事件,消息事件,信号事件执行注册动做。分别是放入定时调度器,在数据库新增事件订阅) propagatingExecution.initialize(); } else { propagatingExecution = execution; } propagatingExecution.performOperation(AtomicOperation.TRANSITION_NOTIFY_LISTENER_START); } }