欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

说说 jBPM 工作流引擎的设计原理

程序员文章站 2022-04-03 19:35:35
...

1服务 API 设计

jBPM4 工作流引擎的核心 PVM 主要依靠 4 组服务 API :

  • 流程定义服务 - Process Service。
  • 流程执行服务- Execution Service。
  • 流程管理服务 - Managerment Service。
  • 指令服务 - Command Service。

说说 jBPM 工作流引擎的设计原理

应用通过这些服务与 PVM 进行数据交互,这些都是在支持事务的持久化模式下运行的。比如:
* ExecutionService.startProcessInstanceByKey - 发起流程实例。
* TaskService.completeTask - 完成任务。

客户端 API 是核心工作流模型对象对外暴露的公共方法,我们可以直接使用客户端 API 来执行一些流程操作,客户端 API 不会进行任何持久化操作,它操作的结果是通过调用相应服务的 API 后才会被持久化。比如:
* ProcessInstance.getName - 获取流程实例名称。
* Task.setAssignee - 设置任务分配者。

1.1 活动 API

活动 API 用于实现流程活动在运行时的行为。所有的活动类型都要实现 ActivityBehaviour 接口,它提供了控制流程执行的方法,接口定义如下:

public interface ActivityBehaviour extends Serializable {

  /** invoked when an execution arrives in an activity.
   * 
   * <p>An ActivityBehaviour can control the propagation 
   * of execution.  ActivityBehaviour's can become external activities when they 
   * invoke {@link ActivityExecution#waitForSignal()}.  That means the 
   * activity will become a wait state.  In that case, {@link ExternalActivityBehaviour} 
   * should be implemented to also handle the external signals. 
   * </p> */
  void execute(ActivityExecution execution) throws Exception;
}

执行对象的类型需要实现 ActivityExecution 接口,这个接口定义了控制流程推进的方法:

活动定义 说明
String getActivityName() 获取当前活动名称。
void waitForSignal() 等待执行信号。
void takeDefaultTransition() 选择一个默认的流出转移。
void take(String transitionName) 选择一个指定名称的流出转移。
void execute(String activityName) 执行子活动。
void end() 结束当前流程(包括子流程)。
void end(String state) 结束当前流程(包括子流程),并为子流程指定结束状态。
void setPriority(int priority) 设置活动优先级。

1.2 事件监听 API

事件监听 API 用于自定义事件监听器,它可以用来处理被监听到的流程事件。

它与活动 API 的区别是:它不能控制流程的执行。假设一个活动通过 execution 已经确定了一个转移,这时就会触发它所对应的事件监听器,因为转移已经先被确定,所以事件监听器必然无法改变流程的推进路线。

自定义的事件监听器,需要实现 EventListener 接口,这个接口定义如下:

public interface EventListener extends Serializable {

  /** is invoked when an execution crosses the event on which this listener is registered */
  void notify(EventListenerExecution execution) throws Exception;

}

这里的 notify 方法需要一个 EventListenerExecution 类型的参数,它与 ActivityExecution 的相同之处是,它们都继承自 OpenExecution 接口,但它只定义了一个设置优先级的方法:

public interface EventListenerExecution extends OpenExecution {

  /** setter for the priority.  The default priority is 0, which means 
   * NORMAL. Other recognized named priorities are HIGHEST (2), HIGH (1), 
   * LOW (-1) and LOWEST (-2). For the rest, the user can set any other 
   * priority integer value, but then, the UI will have to display it as 
   * an integer and not the named value.*/
  void setPriority(int priority);
}

再次强调:事件监听器无法改变流程的推进路径。

2 运行环境设计

为了让流程可以在不同的事务环境(Java EE 或 Spring )中运行,PVM 定义了运行环境对象,它会根据配置的环境,执行服务延迟加载与获取事务管理等操作。

运行环境是 EnvironmentFactory 对象,它有两个实现:

  • ProcessEngineImpl - 默认的 Java EE 环境。
  • SpringProcessEngine - 基于 Spring 框架的环境。

通过以下方式获取默认环境工厂对象,从而执行任意流程操作:

ConfigurationImpl cfg = new ConfigurationImpl();
cfg.setResource("jbpm.cfg.xml");//指定配置文件

//创建环境工厂对象
EnvironmentFactory factory=new ProcessEngineImpl(cfg);

//执行任意流程操作
Environment environment=factory.openEnvironment();
try {
     RepositoryService repositoryService = environment.get(RepositoryService.class);
} finally {
    factory.close();
}

注意:通过 Environment 对象获取的流程服务受到事务的控制。

也可以通过 Configuration 类加载默认的配置文件,获取各项流程服务,这种方式更方便:

ProcessEngine engine= Configuration.getProcessEngine();
RepositoryService repositoryService=engine.getRepositoryService();

3 命令设计模式

命令设计模式是 jBPM4 实现流程逻辑的核心思想。所有的命令都需要实现 Command 接口,并在 execute() 方法中实现逻辑:

public interface Command<T> extends Serializable {

  T execute(Environment environment) throws Exception;
}

注意: 每个命令都是独立的事务操作,即每一个 execute() 方法的实现都被一个 Hibernate 事务所包含。

public class CustomCommand implements Command<Void> {
    private String executionId;

    @Override
    public Void execute(Environment environment) throws Exception {
        //从环境对象中获取执行服务
        ExecutionService executionService = environment.get(ExecutionService.class);

        //执行服务,完成流程逻辑
        executionService.signalExecutionById(executionId);
        return null;
    }

}

命令定义后,可以通过流程引擎对象来执行自定义的命令:

ProcessEngine engine = Configuration.getProcessEngine();
engine.execute(new CustomCommand());

4 服务设计

外部应用程序(比如客户端)会调用服务 API 来作为操作工作流引擎,也可以通过它来持久化 PVM 的操作。

三个基本的服务接口:

服务类 说明
RepositoryService 流程定义及其相关资源的服务
ExecutionService 流程实例及其执行的服务
ManagementService Job 相关服务

所有的流程逻辑都被封装为命令,因此上述的三个服务类的方法实现执行的都是命令。比如 ManagementService 中的 createJobQuery 的实现:

public JobQuery createJobQuery() {
    JobQueryImpl query = commandService.execute(new CreateJobQueryCmd());
    query.setCommandService(commandService);
    return query;
  }

所有的 PVM 命令都统一委派给 CommandService,由它来执行这些命令:

public interface CommandService {

  String NAME_TX_REQUIRED_COMMAND_SERVICE = "txRequiredCommandService";
  String NAME_NEW_TX_REQUIRED_COMMAND_SERVICE = "newTxRequiredCommandService";

  /**
   * @throws JbpmException if command throws an exception.
   */
  <T> T execute(Command<T> command);
}

CommandService 有两种工作模式:
* NAME_TX_REQUIRED_COMMAND_SERVICE :在同一线程中使用一个事务来执行所有的命令。
* NAME_NEW_TX_REQUIRED_COMMAND_SERVICE :一个命令执行一个事务。

CommandService 只定义了一个用于执行命令方法 execute()。

在默认的配置文件 jbpm.default.cfg.xml 中,预设了以下这些服务:

<repository-service />
<repository-cache />
<execution-service />
<history-service />
<management-service />
<identity-service />
<task-service />

CommandService 的设计采用了职责链的设计模式,它是环绕在命令周围的一群拦截器所组成的一条职责链。我们可以组合不同的拦截器,按照不同的顺序,在不同的环境下实现不同的持久化事务策略。

在 jbpm.tx.hibernate.cfg.xml 中,描述了 CommandService 的实现策略:

<command-service name="txRequiredCommandService">
  <skip-interceptor />
  <retry-interceptor />
  <environment-interceptor />
  <standard-transaction-interceptor />
</command-service>

<command-service name="newTxRequiredCommandService">
  <retry-interceptor />
  <environment-interceptor policy="requiresNew" />
  <standard-transaction-interceptor />
</command-service>

这就是我们之前所说的 CommandService 存在的两种工作模式的配置方式。

各个服务会按照需要来选择合适的 CommandService 工作模式来执行命令。各个拦截器继承自 Interceptor 抽象类,而它实现的就是 CommandService 接口:

public abstract class Interceptor implements CommandService {

  protected CommandService next;

  public CommandService getNext() {
    return next;
  }
  public void setNext(CommandService next) {
    this.next = next;
  }
}

多个 CommandService 被配置为一条职责链来拦截命令,这样各个服务就通过职责链来选择不同的策略,而无须改变命令本身啦O(∩_∩)O哈哈~

我们以 newTxRequiredCommandService 的 CommandService 实现为例,来说明这条职责链的作用,调用一条命令后,它会依次执行以下的拦截器——

  1. retry-interceptor:在数据库的乐观锁失败时,捕获 Hibernate 的 StaleObjectException,并尝试重新调用命令。
  2. environment-interceptor:为命令的调用提供一个环境对象。
  3. standard-transaction-interceptor:初始化标准事务对象(StandardTransaction)。
  4. 最后,由 DefaultCommandService 来调用命令。

也可以在此通过配置,使用其他的方式来调用命令——

  • EjbLocalCommandService:把命令委派给一个本地的 EJB,这样可以启动一个 EJB 内容管理事务。
  • EjbRemoteCommandService:把命令委派给一个远程的 EJB,这样命令可以在另一个 JVM 上被执行。
  • AsyncCommandService:命令被包装为一个异步消息,这样命令就会在一个新的事务中被异步执行。

5 流程历史库

在整个流程实例执行过程的各个关键阶段,都设计了历史事件触发器,它会把流程实例数据存入历史库,实现了运行中的流程数据与历史流程数据的分离。

在流程实例的运行过程中,或触发历史流程事件,然后根据分类被分发到配置好的 HistorySession 中,HistorySession 的默认实现 HistorySessionImpl 会调用相应的历史事件对象 (HistoryEvent )的 process 方法来执行相应的历史事件处理逻辑:

public class HistorySessionImpl implements HistorySession {

  public void process(HistoryEvent historyEvent) {
    historyEvent.process();
  }
}

抽象类 HistoryEvent 的事件本身不会被持久化,它的抽象方法 process() 在它的实现类中,创建了历史实体,比如 HistoryEvent 的一个实现类 ActivityStart:

public void process() {
    DbSession dbSession = EnvironmentImpl.getFromCurrent(DbSession.class);

    long processInstanceDbid = execution.getProcessInstance().getDbid();

    HistoryProcessInstance historyProcessInstanceImpl = dbSession.get(HistoryProcessInstanceImpl.class, processInstanceDbid);

    HistoryActivityInstanceImpl historyActivityInstance = 
        createHistoryActivityInstance(historyProcessInstanceImpl);

    String activityType = execution.getActivity().getType();
    historyActivityInstance.setType(activityType);

    dbSession.save(historyActivityInstance);

    execution.setHistoryActivityInstanceDbid(historyActivityInstance.getDbid());
}

这里创建了 HistoryActivityInstanceImpl ,并执行了持久化操作。

在 process() 中历史事件创建的实体与当前的流程实体是对应、归并的关系,比如 ProcessInstanceCreate 事件会创建与持久化 HistoryProcessInstance;而 ProcessInstanceEnd 事件会设置与持久化对应的 HistoryProcessInstance 对象的状态(结束)。

历史流程库维护着过往流程的归档信息。但流程实例或活动实例结束时,就会在历史流程库中写入数据,因为这些数据对于当前运行着的流程来说,是历史(过时)信息。

历史流程库使用 5 张表维护着 4 种实体历史信息:

实体 表名
历史流程实例 jbpm4_hist_procinst
历史活动实例 jbpm4_hist_actinst
历史任务 jbpm4_hist_task
历史流程变量 jbpm4_hist_var

最后一张是 jbpm4_hist_detail,它记录着上述这些实体的历史明细表。

可以使用 HistoryService 的 createHistroyXxxQuery() 方法来获取上述实体的查询对象,来获取历史流程实体信息:
说说 jBPM 工作流引擎的设计原理

在 HistoryService 中还提供了一些用于数据分析的方法,比如:

方法 说明
avgDurationPerActivity(String processDefinitionId) 获取活动的平均执行时间。
choiceDistribution(String processDefinitionId, String activityName) 获取流程转移的选择次数。

需要的话,也可以根据历史明细表 jbpm4_hist_detail,扩展出我们自己的流程数据分析方法哦O(∩_∩)O哈哈~

相关标签: jBPM