说说 jBPM 工作流引擎的设计原理
1服务 API 设计
jBPM4 工作流引擎的核心 PVM 主要依靠 4 组服务 API :
- 流程定义服务 - Process Service。
- 流程执行服务- Execution Service。
- 流程管理服务 - Managerment Service。
- 指令服务 - Command Service。
应用通过这些服务与 PVM 进行数据交互,这些都是在支持事务的持久化模式下运行的。比如:
* ExecutionService.startProcessInstanceByKey - 发起流程实例。
* TaskService.completeTask - 完成任务。
客户端 API 是核心工作流模型对象对外暴露的公共方法,我们可以直接使用客户端 API 来执行一些流程操作,客户端 API 不会进行任何持久化操作,它操作的结果是通过调用相应服务的 API 后才会被持久化。比如:
* ProcessInstance.getName - 获取流程实例名称。
* Task.setAssignee - 设置任务分配者。
1.1 活动 API
活动 API 用于实现流程活动在运行时的行为。所有的活动类型都要实现 ActivityBehaviour 接口,它提供了控制流程执行的方法,接口定义如下:
public interface ActivityBehaviour extends Serializable {
/** invoked when an execution arrives in an activity.
*
* <p>An ActivityBehaviour can control the propagation
* of execution. ActivityBehaviour's can become external activities when they
* invoke {@link ActivityExecution#waitForSignal()}. That means the
* activity will become a wait state. In that case, {@link ExternalActivityBehaviour}
* should be implemented to also handle the external signals.
* </p> */
void execute(ActivityExecution execution) throws Exception;
}
执行对象的类型需要实现 ActivityExecution 接口,这个接口定义了控制流程推进的方法:
活动定义 | 说明 |
---|---|
String getActivityName() | 获取当前活动名称。 |
void waitForSignal() | 等待执行信号。 |
void takeDefaultTransition() | 选择一个默认的流出转移。 |
void take(String transitionName) | 选择一个指定名称的流出转移。 |
void execute(String activityName) | 执行子活动。 |
void end() | 结束当前流程(包括子流程)。 |
void end(String state) | 结束当前流程(包括子流程),并为子流程指定结束状态。 |
void setPriority(int priority) | 设置活动优先级。 |
1.2 事件监听 API
事件监听 API 用于自定义事件监听器,它可以用来处理被监听到的流程事件。
它与活动 API 的区别是:它不能控制流程的执行。假设一个活动通过 execution 已经确定了一个转移,这时就会触发它所对应的事件监听器,因为转移已经先被确定,所以事件监听器必然无法改变流程的推进路线。
自定义的事件监听器,需要实现 EventListener
接口,这个接口定义如下:
public interface EventListener extends Serializable {
/** is invoked when an execution crosses the event on which this listener is registered */
void notify(EventListenerExecution execution) throws Exception;
}
这里的 notify 方法需要一个 EventListenerExecution 类型的参数,它与 ActivityExecution 的相同之处是,它们都继承自 OpenExecution 接口,但它只定义了一个设置优先级的方法:
public interface EventListenerExecution extends OpenExecution {
/** setter for the priority. The default priority is 0, which means
* NORMAL. Other recognized named priorities are HIGHEST (2), HIGH (1),
* LOW (-1) and LOWEST (-2). For the rest, the user can set any other
* priority integer value, but then, the UI will have to display it as
* an integer and not the named value.*/
void setPriority(int priority);
}
再次强调:事件监听器无法改变流程的推进路径。
2 运行环境设计
为了让流程可以在不同的事务环境(Java EE 或 Spring )中运行,PVM 定义了运行环境对象,它会根据配置的环境,执行服务延迟加载与获取事务管理等操作。
运行环境是 EnvironmentFactory 对象,它有两个实现:
- ProcessEngineImpl - 默认的 Java EE 环境。
- SpringProcessEngine - 基于 Spring 框架的环境。
通过以下方式获取默认环境工厂对象,从而执行任意流程操作:
ConfigurationImpl cfg = new ConfigurationImpl();
cfg.setResource("jbpm.cfg.xml");//指定配置文件
//创建环境工厂对象
EnvironmentFactory factory=new ProcessEngineImpl(cfg);
//执行任意流程操作
Environment environment=factory.openEnvironment();
try {
RepositoryService repositoryService = environment.get(RepositoryService.class);
} finally {
factory.close();
}
注意:通过 Environment 对象获取的流程服务受到事务的控制。
也可以通过 Configuration 类加载默认的配置文件,获取各项流程服务,这种方式更方便:
ProcessEngine engine= Configuration.getProcessEngine();
RepositoryService repositoryService=engine.getRepositoryService();
3 命令设计模式
命令设计模式是 jBPM4 实现流程逻辑的核心思想。所有的命令都需要实现 Command 接口,并在 execute() 方法中实现逻辑:
public interface Command<T> extends Serializable {
T execute(Environment environment) throws Exception;
}
注意: 每个命令都是独立的事务操作,即每一个 execute() 方法的实现都被一个 Hibernate 事务所包含。
public class CustomCommand implements Command<Void> {
private String executionId;
@Override
public Void execute(Environment environment) throws Exception {
//从环境对象中获取执行服务
ExecutionService executionService = environment.get(ExecutionService.class);
//执行服务,完成流程逻辑
executionService.signalExecutionById(executionId);
return null;
}
}
命令定义后,可以通过流程引擎对象来执行自定义的命令:
ProcessEngine engine = Configuration.getProcessEngine();
engine.execute(new CustomCommand());
4 服务设计
外部应用程序(比如客户端)会调用服务 API 来作为操作工作流引擎,也可以通过它来持久化 PVM 的操作。
三个基本的服务接口:
服务类 | 说明 |
---|---|
RepositoryService | 流程定义及其相关资源的服务 |
ExecutionService | 流程实例及其执行的服务 |
ManagementService | Job 相关服务 |
所有的流程逻辑都被封装为命令,因此上述的三个服务类的方法实现执行的都是命令。比如 ManagementService 中的 createJobQuery 的实现:
public JobQuery createJobQuery() {
JobQueryImpl query = commandService.execute(new CreateJobQueryCmd());
query.setCommandService(commandService);
return query;
}
所有的 PVM 命令都统一委派给 CommandService,由它来执行这些命令:
public interface CommandService {
String NAME_TX_REQUIRED_COMMAND_SERVICE = "txRequiredCommandService";
String NAME_NEW_TX_REQUIRED_COMMAND_SERVICE = "newTxRequiredCommandService";
/**
* @throws JbpmException if command throws an exception.
*/
<T> T execute(Command<T> command);
}
CommandService 有两种工作模式:
* NAME_TX_REQUIRED_COMMAND_SERVICE :在同一线程中使用一个事务来执行所有的命令。
* NAME_NEW_TX_REQUIRED_COMMAND_SERVICE :一个命令执行一个事务。
CommandService 只定义了一个用于执行命令方法 execute()。
在默认的配置文件 jbpm.default.cfg.xml 中,预设了以下这些服务:
<repository-service />
<repository-cache />
<execution-service />
<history-service />
<management-service />
<identity-service />
<task-service />
CommandService 的设计采用了职责链的设计模式,它是环绕在命令周围的一群拦截器所组成的一条职责链。我们可以组合不同的拦截器,按照不同的顺序,在不同的环境下实现不同的持久化事务策略。
在 jbpm.tx.hibernate.cfg.xml 中,描述了 CommandService 的实现策略:
<command-service name="txRequiredCommandService">
<skip-interceptor />
<retry-interceptor />
<environment-interceptor />
<standard-transaction-interceptor />
</command-service>
<command-service name="newTxRequiredCommandService">
<retry-interceptor />
<environment-interceptor policy="requiresNew" />
<standard-transaction-interceptor />
</command-service>
这就是我们之前所说的 CommandService 存在的两种工作模式的配置方式。
各个服务会按照需要来选择合适的 CommandService 工作模式来执行命令。各个拦截器继承自 Interceptor 抽象类,而它实现的就是 CommandService 接口:
public abstract class Interceptor implements CommandService {
protected CommandService next;
public CommandService getNext() {
return next;
}
public void setNext(CommandService next) {
this.next = next;
}
}
多个 CommandService 被配置为一条职责链来拦截命令,这样各个服务就通过职责链来选择不同的策略,而无须改变命令本身啦O(∩_∩)O哈哈~
我们以 newTxRequiredCommandService 的 CommandService 实现为例,来说明这条职责链的作用,调用一条命令后,它会依次执行以下的拦截器——
- retry-interceptor:在数据库的乐观锁失败时,捕获 Hibernate 的 StaleObjectException,并尝试重新调用命令。
- environment-interceptor:为命令的调用提供一个环境对象。
- standard-transaction-interceptor:初始化标准事务对象(StandardTransaction)。
- 最后,由 DefaultCommandService 来调用命令。
也可以在此通过配置,使用其他的方式来调用命令——
- EjbLocalCommandService:把命令委派给一个本地的 EJB,这样可以启动一个 EJB 内容管理事务。
- EjbRemoteCommandService:把命令委派给一个远程的 EJB,这样命令可以在另一个 JVM 上被执行。
- AsyncCommandService:命令被包装为一个异步消息,这样命令就会在一个新的事务中被异步执行。
5 流程历史库
在整个流程实例执行过程的各个关键阶段,都设计了历史事件触发器,它会把流程实例数据存入历史库,实现了运行中的流程数据与历史流程数据的分离。
在流程实例的运行过程中,或触发历史流程事件,然后根据分类被分发到配置好的 HistorySession 中,HistorySession 的默认实现 HistorySessionImpl 会调用相应的历史事件对象 (HistoryEvent )的 process 方法来执行相应的历史事件处理逻辑:
public class HistorySessionImpl implements HistorySession {
public void process(HistoryEvent historyEvent) {
historyEvent.process();
}
}
抽象类 HistoryEvent 的事件本身不会被持久化,它的抽象方法 process() 在它的实现类中,创建了历史实体,比如 HistoryEvent 的一个实现类 ActivityStart:
public void process() {
DbSession dbSession = EnvironmentImpl.getFromCurrent(DbSession.class);
long processInstanceDbid = execution.getProcessInstance().getDbid();
HistoryProcessInstance historyProcessInstanceImpl = dbSession.get(HistoryProcessInstanceImpl.class, processInstanceDbid);
HistoryActivityInstanceImpl historyActivityInstance =
createHistoryActivityInstance(historyProcessInstanceImpl);
String activityType = execution.getActivity().getType();
historyActivityInstance.setType(activityType);
dbSession.save(historyActivityInstance);
execution.setHistoryActivityInstanceDbid(historyActivityInstance.getDbid());
}
这里创建了 HistoryActivityInstanceImpl ,并执行了持久化操作。
在 process() 中历史事件创建的实体与当前的流程实体是对应、归并的关系,比如 ProcessInstanceCreate 事件会创建与持久化 HistoryProcessInstance;而 ProcessInstanceEnd 事件会设置与持久化对应的 HistoryProcessInstance 对象的状态(结束)。
历史流程库维护着过往流程的归档信息。但流程实例或活动实例结束时,就会在历史流程库中写入数据,因为这些数据对于当前运行着的流程来说,是历史(过时)信息。
历史流程库使用 5 张表维护着 4 种实体历史信息:
实体 | 表名 |
---|---|
历史流程实例 | jbpm4_hist_procinst |
历史活动实例 | jbpm4_hist_actinst |
历史任务 | jbpm4_hist_task |
历史流程变量 | jbpm4_hist_var |
最后一张是 jbpm4_hist_detail,它记录着上述这些实体的历史明细表。
可以使用 HistoryService 的 createHistroyXxxQuery() 方法来获取上述实体的查询对象,来获取历史流程实体信息:
在 HistoryService 中还提供了一些用于数据分析的方法,比如:
方法 | 说明 |
---|---|
avgDurationPerActivity(String processDefinitionId) | 获取活动的平均执行时间。 |
choiceDistribution(String processDefinitionId, String activityName) | 获取流程转移的选择次数。 |
需要的话,也可以根据历史明细表 jbpm4_hist_detail,扩展出我们自己的流程数据分析方法哦O(∩_∩)O哈哈~
上一篇: Java的输入/输出——缓冲流
下一篇: 缓冲流、转换流