年初要作一个运维自动化平台,须要用到流程引擎,原本打算项目用golang写的,可是golang的流程引擎功能太简单实在是用不来,最后仍是选型java + activiti。到activiti官网一看,嘿出7.0告终果文档是刚写的还不全,咱们java仍是8的,7.0是匹配的java11,最终是问题太多只好放弃用activiti6.0了。java
虽然网上教程有很多,不过要真正跑起来着实不容易,有些内容好比参数的意义仍是看5.0的手册才弄明白的。mysql
搭建环境:java8 + springboot2.1.3 + activiti6.0 + mysqlgolang
pom依赖:spring
<dependency>
<groupId>org.activiti</groupId>
<artifactId>activiti-spring-boot-starter-basic</artifactId>
<version>${activiti.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.15</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
复制代码
配置数据库:sql
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/activiti?nullCatalogMeansCurrent=true&serverTimezone=Asia/Shanghai
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.username=root
spring.datasource.password=root
复制代码
而后启动,恭喜你,应用没起来,会出现以下报错:数据库
Caused by: java.io.FileNotFoundException: class path resource [org/springframework/security/config/annotation/authentication/configurers/GlobalAuthenticationConfigurerAdapter.class] cannot be opened because it does not exist:
express
这个缘由是activiti6.0开发的时候springboot2.0还没出来以至年久失修,文件路径不对了,解决方案:springboot
启动类前排除掉SecurityAutoConfigurationbash
@SpringBootApplication(exclude = SecurityAutoConfiguration.class)
并发
再次启动,若是你碰到建立数据库表却找不到表的问题,两种解决方法:
activiti-engine-6.0.0.jar/org/activiti/db/create
nullCatalogMeansCurrent=true
这是由于在org/activiti/engine/impl/db/DbSqlSession
中activiti使用databaseMetaData.getTables
寻找库表是否存在,而dbSqlSessionFactory
获取到的catalog
为空由于mysql使用schema
标识库名而不是catalog
,致使mysql扫描全部的库来找表,一旦其余库中有同名表activiti就觉得找到了其实表并不存在。nullCatalogMeansCurrent
的意义就在于让mysql默认当前库,在mysql-connector-java 5.x
该参数默认为true,但在6.x以上默认为false。
再次启动,仍然没起来:
Caused by: java.io.FileNotFoundException: class path resource [processes/] cannot be resolved to URL because it does not exist
这是由于activiti会到resource/processes下面寻找流程文件,建立该目录。
activiti提供了多种设计器来画流程图,我安装了IDEA的插件和eclipse的插件,比较了下仍是eclipse的好用,因此只介绍eclipse 插件的安装过程。
Names : Activiti BPMN 2.0 designer
Location : https://www.activiti.org/designer/update/
复制代码
<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:activiti="http://activiti.org/bpmn" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC" xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI" xmlns:tns="http://www.activiti.org/test" typeLanguage="http://www.w3.org/2001/XMLSchema" expressionLanguage="http://www.w3.org/1999/XPath" targetNamespace="http://www.activiti.org/test" id="m1551347612734" name="">
<process id="machine_expansion" name="Machine Expansion" isExecutable="true" isClosed="false" processType="None">
<startEvent id="start" name="开始"></startEvent>
...
复制代码
使用runtimeservice建立一个createProcessInstanceBuilder,设置processDefinitionKey为xml中的process id,而后start就能够了。
@Autowired
private RuntimeService runtimeService;
@Override
public String start(BaseParamDTO baseParamDTO) {
String currentUserName = CurrentUserUtil.getCurrentUserName();
identityService.setAuthenticatedUserId(currentUserName);
ExpansionParamDTO expansionParamDTO = (ExpansionParamDTO) baseParamDTO.getParams();
ProcessInstance machineExpansion = runtimeService.createProcessInstanceBuilder().processDefinitionKey(WorksheetTypeEnum.EXPANSION.getActivitiDefineName())
.businessKey(expansionParamDTO.getAppName()).name(WorksheetTypeEnum.EXPANSION.name()).start();
baseParamDTO.setId(machineExpansion.getProcessInstanceId());
submit(baseParamDTO);
return machineExpansion.getProcessInstanceId();
}
复制代码
为了设置流程的startuser,必需要使用identityService.setAuthenticatedUserId设置,由于process获取的是当前线程的用户。
要熟悉一个项目,最简单的方式就是看他的数据库表是怎么设计的,表里面都存了什么东西,activiti数据库中一共有28张表
spring.activiti.historyLevel
配置,对应HistoryService
表名 | 描述 |
---|---|
act_evt_log | 事件日志表 |
act_ge_bytearray | 二进制数据表,好比抛错 |
act_ge_property | activiti属性表 |
act_hi_actinst | 历史的流程活动实例 |
act_hi_attachment | 历史的流程附件 |
act_hi_comment | 历史的描述信息 |
act_hi_detail | 历史的流程运行中的细节信息 |
act_hi_identitylink | 历史的用户关系信息 |
act_hi_procinst | 历史的流程实例 |
act_hi_taskinst | 历史的usertask实例 |
act_hi_varinst | 历史的流程参数 |
act_id_group | 用户组 |
act_id_info | 用户详情信息 |
act_id_membership | 用户和用户组关联关系 |
act_id_user | 用户信息 |
act_procdef_info | 流程定义扩展表 |
act_re_deployment | 流程部署信息 |
act_re_model | 流程模型 |
act_re_procdef | 已部署过的流程 |
act_ru_deadletter_job | 运行时中死掉的job |
act_ru_event_subscr | 运行时的事件监听 |
act_ru_execution | 运行时的流程执行实例 |
act_ru_identitylink | 运行时的用户关系信息 |
act_ru_job | 运行时的job |
act_ru_suspended_job | 运行时暂停的job |
act_ru_task | 运行时的usertask实例 |
act_ru_timer_job | 运行时的定时器job |
act_ru_variable | 运行时的参数 |
在activiti中有几个基础定义:
eclipse设计器总共有8种task,目前我只用到其中的4种
用户任务用来设置须要人操做才能完成的任务如审批,执行到usertask时,流程会卡住,只有用户人工触发了流程才会继续。示例以下,这里没有在画bpmn时写死用户,而是在流程中经过taskService设置assigness,usertask完成只能是complete,后续的流程能够用参数和排他网关控制。
@Override
public void submit(BaseParamDTO baseParamDTO) {
String currentUserName = CurrentUserUtil.getCurrentUserName();
ExpansionParamDTO expansionParamDTO = (ExpansionParamDTO) baseParamDTO.getParams();
Task submit = taskService.createTaskQuery().processInstanceId(baseParamDTO.getId()).taskDefinitionKey(ExpansionDisplayEnum.SUBMIT.getFlow()).singleResult();
taskService.setAssignee(submit.getId(), currentUserName);
taskService.setVariable(submit.getId(), INPUT_PARAM, expansionParamDTO);
taskService.setVariable(submit.getId(), INPUT_PRIORITY, baseParamDTO.getPriority());
taskService.setPriority(submit.getId(), baseParamDTO.getPriority());
if (StringUtils.isNotBlank(baseParamDTO.getComment())){
taskService.addComment(submit.getId(), null, baseParamDTO.getComment());
}
taskService.complete(submit.getId());
}
复制代码
用户任务涉及到的几个概念:
写一段脚本,执行到该步骤时自动执行脚本。(没用过)
最经常使用的task,执行到该步骤时自动执行对应的java类
@Component
public class ApplyMachineTask implements JavaDelegate {
public static final Logger LOGGER = LoggerFactory.getLogger(ApplyMachineTask.class);
public static final String MACHINE_APPLY_ID = "applyId";
public static final String MACHINE_APPLY_ID_LOCAL = "localApplyId";
public static final String MACHINE_APPLY_CHECK = "applyCheckCount";
@Autowired
private MachineService machineService;
@Override
public void execute(DelegateExecution delegateExecution){
ExpansionParamDTO param = delegateExecution.getVariable(INPUT_PARAM, ExpansionParamDTO.class);
LOGGER.debug("start apply machine, param {}", param);
param.setWorksheetId(delegateExecution.getProcessInstanceId());
ApiResponseDTO<String> stringApiResponseDTO = machineService.applyMachine(param);
if (stringApiResponseDTO.isSuccess()){
delegateExecution.setVariable(MACHINE_APPLY_ID, stringApiResponseDTO.getBody());
delegateExecution.setVariableLocal(MACHINE_APPLY_ID_LOCAL, stringApiResponseDTO.getBody());
delegateExecution.setVariable(MACHINE_APPLY_CHECK, 0);
} else {
delegateExecution.setVariableLocal(ERROR_LOCAL, stringApiResponseDTO.getError());
throw new BpmnError(ERROR_RETRY, stringApiResponseDTO.getError());
}
}
}
复制代码
这里说下参数设置,setVariable
会设置流程全局变量,也就是流程中任意一个步骤都能获取到的参数,用来参数传递,而setVariableLocal
设置的是局部变量,只有当前步骤才能获取的参数,局部变量的用处在于能够记录流程执行到该步骤时的状态,参数涉及到覆盖问题,因此不要出现全局和局部参数同名的现象(子流程的除外)。
业务规则用户用来同步执行一个或多个规则。activiti使用drools规则引擎执行业务规则。(没用过)
自动邮件任务,它能够发送邮件给一个或多个参与者。(没用过)
手工任务定义了BPM引擎外部的任务。 用来表示工做须要某人完成,而引擎不须要知道,也没有对应的系统和UI接口。 对于引擎,手工任务是直接经过的活动, 流程到达它以后会自动向下执行。(没用过)
接收任务是一个简单任务,它会等待对应消息的到达。流程到达该步骤后会一直等待,直到消息触发。以下图所示,在一个接收任务上挂了一个定时边界事件,定时边界事件每隔一段时间触发服务任务检查状态,当状态经过时给接收任务发消息,接收任务完成,定时边界网关中止。
Execution receiveTask = runtimeService.createExecutionQuery().processInstanceId(execution.getProcessInstanceId())
.activityId(ExpansionDisplayEnum.WAIT_INIT.getFlow()).singleResult();
runtimeService.trigger(receiveTask.getId());
复制代码
activity有两种子流程,一种是CallActivity,一种是subprocess,区别在于CallActivity是调用主流程外部的一个流程,运行时会生成新的processId,做用域彻底独立,一般用来复用其余流程或者须要独立processId的状况;而subprocess是嵌套子流程,他并不会生成新的processId,可是定义了独立的事件域,一般用来捕捉同一类事件。
这里设置的条件是所有完成的时候主流程继续。
activiti有两种事件,一种是流程事件,放在流程中的,一种是边界事件,挂在某个步骤上因为流程运行间接触发的,而这两种事件又分为两类,一种是捕获一种是触发。
这里要说的就是边界错误捕获事件。
错误边界事件中有一个errorCode参数,errorCode用来匹配捕获的错误:
配置一个排他网关,在每条路径上写明条件
子流程每一个步骤中try catch住错误,并抛出
delegateExecution.setVariableLocal(ERROR_LOCAL,stringApiResponseDTO.getError());
throw new BpmnError(ERROR_RETRY, stringApiResponseDTO.getError());
复制代码
须要注意的是抛错中的错误文本好像获取不到,因此仍是要用局部变量来存一下。
由于我想记录全部的操做步骤,因此我把每一种操做都进行了分割,所以出现了下图所示的审批流程。
@Component
public class ApprovalTaskListener implements TaskListener {
private static final Logger LOGGER = LoggerFactory.getLogger(ApprovalTaskListener.class);
@Autowired
private RuntimeService runtimeService;
@Autowired
private ApprovalTask approvalTask;
@Autowired
private ApplicationService applicationService;
@Autowired
private TaskService taskService;
@Override
public void notify(DelegateTask delegateTask) {
String processInstanceId = delegateTask.getProcessInstanceId();
ProcessInstance processInstance = runtimeService.createProcessInstanceQuery().processInstanceId(processInstanceId).singleResult();
String appName = processInstance.getBusinessKey();
String owner = processInstance.getStartUserId();
Integer priority = delegateTask.getVariable(INPUT_PRIORITY, Integer.class);
taskService.setPriority(delegateTask.getId(), priority);
ApiResponseDTO<ApplicationDTO> application = applicationService.getApplication(appName);
if (application.isSuccess()){
List<String> bizMaintainer = application.getBody().getBizMaintainer();
delegateTask.addCandidateUsers(bizMaintainer);
//申请人是审批人时直接处理
if (bizMaintainer.contains(owner)){
taskService.setAssignee(delegateTask.getId(), owner);
}
} else {
LOGGER.error("get application info error, pId: {}, app: {}", processInstanceId, application);
throw new RuntimeException(application.getError());
}
}
}
复制代码
在审批usertask外层设置exection的监听器approvalSkipListener,前面说过exection是先于usertask执行的,每一个usertask外层都有一个exection,因此这个监听器能够控制usertask的行为。
@Component
public class ApprovalSkipListener implements ExecutionListener {
private static final Logger LOGGER = LoggerFactory.getLogger(ApprovalSkipListener.class);
@Autowired
private RuntimeService runtimeService;
@Autowired
private ApplicationService applicationService;
@Override
public void notify(DelegateExecution execution) {
ProcessInstance processInstance = runtimeService.createProcessInstanceQuery().processInstanceId(execution.getProcessInstanceId()).singleResult();
String appName = processInstance.getBusinessKey();
String owner = processInstance.getStartUserId();
ApiResponseDTO<ApplicationDTO> application = applicationService.getApplication(appName);
if (application.isSuccess()){
List<String> bizMaintainer = application.getBody().getBizMaintainer();
if (bizMaintainer.contains(owner)){
execution.setVariable(SKIP_ENABLE,true);
execution.setVariable(APPROVAL_PASS, true);
}
} else {
LOGGER.error("get application info error, pId: {}, app: {}", processInstance.getId(), application);
throw new RuntimeException(application.getError());
}
}
}
复制代码
设置跳过条件
activiti有个颇有意思的功能就是补偿,回滚就是用补偿触发事件和边界补偿捕获事件完成的。当补偿触发后,activiti会从后往前依次触发补偿捕获事件。
说到这个问题,就要说起异步任务:
借用官方的图来简要说明一下。activiti是经过事务的方式执行流程,当流程开始后,activit会一直推动流程直到遇到等待状态,而后把当前状态存到数据库,并等待下一次触发。以下图中usertask是第一个等待状态,定时器是第二个等待状态,而完成usertask和执行服务任务在同一个工做单元中,同一个工做单元中的成功和失败是原子的。若是服务任务成功了数据库中看到的是完成的usertask和完成的服务任务,若是服务任务出错了就会致使activti回滚事务,这个时候就会回到最初的状态,现象就是数据库中依然只有未完成的usertask的记录,而看不到服务任务的记录。
若是我想看到服务任务的执行状态呢,解法就是设置async为true,将服务任务交给后台执行,后台的JobExecutor会周期性的扫描数据库的job提交给工做线程池。以下图所示,服务任务设置了async为true,这个时候有三个等待状态,第一个是usertask,第二个是服务任务,第三个是定时器。如今流程开始,usertask完成后会建立一个等待状态的服务任务job并把他保存到数据库,而后由工做线程池执行,若是服务任务job执行失败了,这个job仍然在数据库中。而邮件任务若是失败了则会和以前同样回滚到服务任务初始状态。
这也是常常遇到的一个坑,目前尚未解决,这涉及到排他任务。
仍是借用官方的图,假设有以下场景,三个服务任务都是异步的,jobExecutor会把这3个job分给工做线程池来执行,若是他们同时到达并发汇聚网关那就会出现一致性问题,由于每一个分支都会检查其余分支是否到达就会出现一直等待的状况。
按道理只有async为true,exclusive为false时才会job锁住不跑了,可是事实并非这样,所以我怀疑是当多台机器同时获取job时致使的这个问题,还待后续排查。
洋洋洒洒终于把activiti的总结写完了,不过是点到为止,还有不少功能没有尝试,bug也没查出来。革命还没有成功,同志仍需努力。