小小研究一下工作流WorkFlow
修房子
在小牧老家农村,小时候总是看到村里有人在修房子。每次看到有人修房子的时候,他就会爬到房子面前的沙粒堆上去,翻找随着沙子一起被挖出来的贝壳。虽然也不知道拿来干嘛,不过总觉得收集贝壳很好玩。
小牧也喜欢看他们修房子。修房子的时候,专业的修房师傅会指导工人如何一步步修房子。第一步,首先要打地基,把地基打的牢牢的。第二步是浇筑地梁。在地基上面浇筑房子的核心骨架。第三步是盖造房子的主体结构,基本是修出了房子的框架外形了。第四步是封顶,考虑水电走向、防水等[1]。
那么,可以看到修房子是有一个标准流程的,工人们会按照流程一步步去修房子,这样才能保证房子能够修的结实可靠。
工作流
上面的例子只是简单地说明了流程。生活中处处存在各种流程,有简单的,有复杂的。比如办事流程,不仅仅需要严格遵守办事流程顺序,还需要保证办事员具有办事权限,特殊情况还需要保密、不可回退流程等。因此过程管理其实是非常重要且广泛存在的。
同样在软件领域,我们依然有很多流程。同样可举个栗子。小牧是搞云计算的,假如用户现在要准备创建一台虚拟机了,那么也存在一系列的流程,比如首先要进行权限验证,金额验证、参数验证,判断是否运行创建虚拟机;允许创建后需要进行存储资源申请、网络资源,还需要考虑物理集群等一系列问题,最后才是真正地创建虚拟机,返回给用户使用。在这个VM创建流程中,任何一步都不能出错,即使出错,每个步骤也有相应的处理方法,比如回滚或者退出报错。
在软件业务中,存在非常多复杂的流程,也叫工作流 。那么如果对于流程,每个都需要程序员去做流程控制、监控等工作,那就会造成冗余的工作。因此大佬们也做出了很多知名的流程控制框架,如JBPM和Activiti等工具。在这里呢,我们也设计一个简单我们自己的流程管理引擎,做一个小玩具。基本的设计思路参考ZStack的FlowChain结构[2],但是便于理解有一些改动。
基本介绍
图1 为小牧画的一个示意图。Task,也就是一个任务。一个流程里面多个任务,比如“修房子”对应打地基、修框架等等多个任务。而操作员,也就是具体执行流程的部分,操作员只有执行了当前手里的任务后,才会接收下一个Task。当然执行可以多线程多操作员一起执行,但是这里我们简化问题,设计为只有一个操作员,单条流水线线性执行Task。
图1 工作流示意图
从图中可以看出,这个流程中有两个重要的组成部分。
- 任务Task,包含任务的运行信息,具体该task的运行方法
- 传输带,用于存放需要执行的task序列
- 操作员,运行task,根据task中的信息进行处理
因此如果我们要设计实现工作流引擎的话,首先是需要对这三个基本的组件进行设计。接下来分别介绍。
任务Task
Task为具体的一个工作步骤。要描述这个步骤,我们有几个需要考虑的点。
最基本的是需要知道这个任务要执行什么。因此我们可以定义一个run()方法。
当任务运行失败怎么办?因此我们要定义一下运行失败的处理方式,是否回滚rollback,以及rollback的具体操作。
因此我们更希望控制Task的具体操作,因此设计一个Task接口。
public interface Task{
void run(Map data); //任务的运行操作
void rollback(Map data); //任务的回滚操作
}
run函数为任务运行的具体执行流程。rollback为任务回退的时候的运行流程。有了这两个操作接口,那么操作员在拿到任务之后,就能去操作了。
操作员
操作员具体执行任务,根据任务设计好的run和rollback方法来执行。除了直接执行任务的功能外,操作员还需要从流水线上取出任务。因此操作员的描述如下:
public interface TaskOperator {
void fail(ErrorCode errorCode); //任务执行失败后的操作
void next(); //用于获得下一个需要执行的任务
void setError(ErrorCode error); //设置任务错误的状态
}
流水线
流水线的作用,主要承担的是任务的缓存以及单个流程中多个子任务的连接。操作员会从流水线上按照任务顺序取任务。流水线需要支持任务的管理功能、存储功能。想象一下,在工厂里的流水线设备,是不是也有很多控制杆、按钮来控制流水线的运行的?所以在具体的实现中,我们按照如下方式来设计流水线。
public interface TaskChain {
List<Flow> getTasks(); //获得任务
TaskChain insert(Task task); //插入任务
TaskChain insert(int pos, Task task); //在子任务之间插入任务
TaskChain then(Task task); //任务流程描述,用于一步步添加任务
TaskChain done(FlowDoneHandler handler); //任务全部执行结束后的回调函数
TaskChain error(FlowErrorHandler handler); //任务执行出错的回调函数
TaskChain Finally(FlowFinallyHandler handler); //任务执行结束后不管正确与否都会执行的函数
TaskChain setData(Map data); //为任务传递数据
TaskChain setName(String name);
Map getData();
void start(); //启动任务执行流程
TaskChain noRollback(boolean no); //任务是否允许回滚
TaskChain allowEmptyFlow(); //是否允许任务流为空,没有子任务
}
那么一个简单的流水线就设计好了。
现在我们有了各个组件了,接下来我们要让他们按照流程配合工作。流程图如图所示。
其中的历史任务列表,用于支持任务执行失败后的回滚操作,简单起见现在我们暂时不关心。那么代码可以这样写了。
public class WorkFlowChain implements TaskOperator, TaskChain {
private String id;
private List<Task> tasks = new ArrayList<>(); //流水线存储
private Stack<Task> rollBackFlows = new Stack<>(); //历史任务存储
private Map data = new HashMap(); //任务需要用到的数据设置
private Iterator<Task> it;
private FlowErrorHandler errorHandler;
private FlowDoneHandler doneHandler;
private FlowFinallyHandler finallyHandler;
private String name;
public WorkFlowChain() {
id = randomCreateID();
}
//将任务插入流水线中
@Override
public WorkFlowChain insert(Task task) {
tasks.add(0, task);
return this;
}
@Override
public WorkFlowChain insert(int pos, Task task) {
tasks.add(pos, task);
return this;
}
public WorkFlowChain then(Task task) {
tasks.add(task);
return this;
}
//开始运行任务
@Override
public void start() {
if (tasks.isEmpty() && allowEmptyTask) { //假如已经没有任务了,则调用完成回调函数
callDoneHandler();
return;
}
if (tasks.isEmpty()) { //没有任务的任务流无法运行
throw new CloudRuntimeException("you must call then() to add flow before calling start() or allowEmptyFlow() to run empty flow chain on purpose");
}
if (data == null) { //data用于作为共享数据存储部分
data = new HashMap<String, Object>();
}
isStart = true;
if (name == null) {
name = "anonymous-chain";
}
it = tasks.iterator();
Task task = getFirstTask(); //开始执行第一个任务
if (task == null) {
callDoneHandler(); //假如开始执行第一个任务,第一个任务为空,则已经执行完任务了
} else {
runTask(task); //操作工拿到任务后运行该任务。
}
}
//运行任务
@Override
private void runTask(Task task) {
try {
Task toRun = task;
currentFlow = toRun;
String flowName = getFlowName(currentFlow);
String info = String.format("[TaskChain(%s): %s] start executing task[%s]", id, name, flowName);
logger.debug(info);
toRun.run(this, data);
} catch (OperationFailureException oe) {
String errInfo = oe.getErrorCode() != null ? oe.getErrorCode().toString() : "";
logger.warn(errInfo, oe);
fail(oe.getErrorCode());
} catch (FlowException fe) {
String errInfo = fe.getErrorCode() != null ? fe.getErrorCode().toString() : "";
logger.warn(errInfo, fe);
fail(fe.getErrorCode());
} catch (Throwable t) {
logger.warn(String.format("[FlowChain(%s): %s] unhandled exception when executing flow[%s], start to rollback",
id, name, flow.getClass().getName()), t);
fail(errf.throwableToInternalError(t));
}
}
//当前任务运行成功后,运行下一个任务
@Override
public void next() {
if (!isStart) {
throw new CloudRuntimeException(
String.format("[FlowChain(%s): %s] you must call start() first, and only call next() in Flow.run()",
id, name));
}
rollBackTasks.push(currentTask); //将执行了的任务放到已执行结束的历史列表中
Task task = getFirstTask();
if (task == null) {
if (errorCode == null) {
callDoneHandler(); //如果没有错误出现,则完成任务执行,调用结束回调函数
} else {
callErrorHandler(false); //调用错误处理函数
}
} else {
runTask(task); //运行任务
}
}
private Task getFirstTask() {
Task task = null;
while (it.hasNext()) {
task = it.next();
if (!isOKTask(task)) {
break;
}
}
return task;
}
这样我们只要调用start()方法,就能开始实行了。我们可以测试简单测试一下:
public void test() {
WorkFlowChain workChain = new WorkFlowChain();
WorkFlowChain.then(new Task(TaskOperator op){
//Task1;
op.next();
}).then(new Task(TaskOperator op){
//Task2
op.next();
}).then(new Task(TaskOperator op){
//Task3
op.next();
}).start();
}
这样,我们的简单工作流就基本设计好啦。当然在实际的工作中,还需要设计回滚、不同的任务有不同特性等,以及各种异常情况都需要考虑。
上一篇: OC runtime学习笔记之关联对象
下一篇: iOS使用核心的50行代码撸一个路由组件