计算过程中的故障和容灾处理
程序员文章站
2022-03-03 07:57:17
...
使用Fourinone可以完成大部分分布式并行计算需求, 但是计算过程中的故障和容灾处理是怎么进行的呢, 这里详细分析一下:
总的来说,Fourinone框架不会在设计中抛弃错误不处理或者容忍错误导致框架崩溃,框架通常会捕获所有的错误反馈给开发者去处理,但是框架本身不自作主张,替开发者考虑处理方案,只有这样框架才能从特定场景中抽象出来,给开发者更灵活的发挥和去满足各种更复杂业务容错情况。
那么框架究竟关注和不关注哪个层面的故障呢?
并行计算过程中,通常有两种类型的故障:一种是系统故障引起的计算中断(宕机和网络故障), 一种是业务逻辑意义上的错误数据。前者是框架关注的,后者是业务逻辑开发者关注的。
系统故障导致网络断掉或者宕机,框架会捕获故障信息并通告,工头在检验工人执行状态时会获知,并进行相应的业务上的故障处理,比如重发或者单独记录日志。业务逻辑意义上的错误数据,通常在工人的业务实现逻辑里去判断,比如计算结果的金额为负数是一个不符合业务要求的错误数据,这个是由开发者去控制,框架不做业务逻辑上的错误处理。
针对故障,框架又是怎样容灾的呢?
通常一个典型的分布式计算结构,由工头、工人、职介所组成,我们详细分析一下这几个角色在故障时各自如何容灾:
工头是嵌入式的,他不是一个服务程序,由嵌入他的系统new工头类并管理他的生命周期,工头不存在恢复或者容灾的概念,就好比我们写一个helloworld的main函数,很少考虑程序运行到hello, world没有输出时就宕机了。但是如果嵌入工头的系统是一个定时执行的计算任务时,也许要考虑容灾,因为涉及单点问题,可以让两个工头竞争一个分布式锁实现(详细参考分布式锁demo)。
工人和职介所是服务程序,如果工人节点故障,职介所会实时感知,工头分配计算时会获取到最新活跃工人数量,如果是职介所节点故障,Fourinone实现了领导者选举机制,会实时切换到备份职介所上(详细参考统一配置管理主备领导者切换)。
换句话说,如果一个工人节点在计算开始前发生故障不可用,工头通过getWaitingWorkers获取可用工人时不会包括该工人节点,因为职介所会感知每个工人的可用状态。
如果工人在计算过程中发生故障,框架会进行截获,然后提前返回计算结果,并设置结果的状态为异常。也就是正常完成计算时:result.getStatus()==WareHouse.READY
计算过程发生故障中断时:result.getStatus()==WareHouse.EXCEPTION
这样工头就可以根据检查结果的状态,来做故障时的容灾处理。
实际上也可以在工人的doTask实现方法内部捕捉业务异常,由开发者根据程序实现*决定。
以下demo演示了Fourinone计算过程中的故障容灾处理:
FaultCtor:是一个工头实现,它调用集群中一个工人doTask执行任务,然后轮询该结果,判断结果是否完成或者是否异常,如果结果状态为异常,则打印消息。实际上这里只是简单演示机制,现实场景中,可以将任务先记录,工人执行成功后再删除并跳转下一个任务,如果异常则继续重发其他工人执行该任务,或者采用其他故障策略,统一记录到错误日志,在其他时间再另行排查处理。
FaultWorker:是一个工人实现,它模拟了一个任务执行,睡眠了8秒钟,然后再制造一个空指针异常。该工人模拟了两种系统异常状况,计算过程中可以关闭它,或者等待它运行到空指针异常查看效果,注意这里doTask本身是不抛出和捕捉异常的,由框架去处理。
运行步骤:
编译demo的java类:Javac –classpath fourinone.jar; *.java
1、 启动ParkServerDemo(它的IP端口已经在配置文件的PARK部分的SERVERS指定)
Java –classpath fourinone.jar; ParkServerDemo
2、 运行FaultWorker(传入端口号参数)
Java –classpath fourinone.jar; FaultWorker 2008
3、 运行FaultCtor
Java –classpath fourinone.jar; FaultCtor
运行后工人进入8秒中“任务执行”,这时可以将该工人进程关闭,然后会查看到工头界面输出something wrong about wks[0] result,说明框架已经屏蔽系统故障并反馈到任务结果的异常状态中,如果8秒中内不关闭,会引发另外一个空指针异常,产生同样的异常状态。
完整demo源码如下:
// ParkServerDemo
// FaultWorker
// FaultCtor
总的来说,Fourinone框架不会在设计中抛弃错误不处理或者容忍错误导致框架崩溃,框架通常会捕获所有的错误反馈给开发者去处理,但是框架本身不自作主张,替开发者考虑处理方案,只有这样框架才能从特定场景中抽象出来,给开发者更灵活的发挥和去满足各种更复杂业务容错情况。
那么框架究竟关注和不关注哪个层面的故障呢?
并行计算过程中,通常有两种类型的故障:一种是系统故障引起的计算中断(宕机和网络故障), 一种是业务逻辑意义上的错误数据。前者是框架关注的,后者是业务逻辑开发者关注的。
系统故障导致网络断掉或者宕机,框架会捕获故障信息并通告,工头在检验工人执行状态时会获知,并进行相应的业务上的故障处理,比如重发或者单独记录日志。业务逻辑意义上的错误数据,通常在工人的业务实现逻辑里去判断,比如计算结果的金额为负数是一个不符合业务要求的错误数据,这个是由开发者去控制,框架不做业务逻辑上的错误处理。
针对故障,框架又是怎样容灾的呢?
通常一个典型的分布式计算结构,由工头、工人、职介所组成,我们详细分析一下这几个角色在故障时各自如何容灾:
工头是嵌入式的,他不是一个服务程序,由嵌入他的系统new工头类并管理他的生命周期,工头不存在恢复或者容灾的概念,就好比我们写一个helloworld的main函数,很少考虑程序运行到hello, world没有输出时就宕机了。但是如果嵌入工头的系统是一个定时执行的计算任务时,也许要考虑容灾,因为涉及单点问题,可以让两个工头竞争一个分布式锁实现(详细参考分布式锁demo)。
工人和职介所是服务程序,如果工人节点故障,职介所会实时感知,工头分配计算时会获取到最新活跃工人数量,如果是职介所节点故障,Fourinone实现了领导者选举机制,会实时切换到备份职介所上(详细参考统一配置管理主备领导者切换)。
换句话说,如果一个工人节点在计算开始前发生故障不可用,工头通过getWaitingWorkers获取可用工人时不会包括该工人节点,因为职介所会感知每个工人的可用状态。
如果工人在计算过程中发生故障,框架会进行截获,然后提前返回计算结果,并设置结果的状态为异常。也就是正常完成计算时:result.getStatus()==WareHouse.READY
计算过程发生故障中断时:result.getStatus()==WareHouse.EXCEPTION
这样工头就可以根据检查结果的状态,来做故障时的容灾处理。
实际上也可以在工人的doTask实现方法内部捕捉业务异常,由开发者根据程序实现*决定。
以下demo演示了Fourinone计算过程中的故障容灾处理:
FaultCtor:是一个工头实现,它调用集群中一个工人doTask执行任务,然后轮询该结果,判断结果是否完成或者是否异常,如果结果状态为异常,则打印消息。实际上这里只是简单演示机制,现实场景中,可以将任务先记录,工人执行成功后再删除并跳转下一个任务,如果异常则继续重发其他工人执行该任务,或者采用其他故障策略,统一记录到错误日志,在其他时间再另行排查处理。
FaultWorker:是一个工人实现,它模拟了一个任务执行,睡眠了8秒钟,然后再制造一个空指针异常。该工人模拟了两种系统异常状况,计算过程中可以关闭它,或者等待它运行到空指针异常查看效果,注意这里doTask本身是不抛出和捕捉异常的,由框架去处理。
运行步骤:
编译demo的java类:Javac –classpath fourinone.jar; *.java
1、 启动ParkServerDemo(它的IP端口已经在配置文件的PARK部分的SERVERS指定)
Java –classpath fourinone.jar; ParkServerDemo
2、 运行FaultWorker(传入端口号参数)
Java –classpath fourinone.jar; FaultWorker 2008
3、 运行FaultCtor
Java –classpath fourinone.jar; FaultCtor
运行后工人进入8秒中“任务执行”,这时可以将该工人进程关闭,然后会查看到工头界面输出something wrong about wks[0] result,说明框架已经屏蔽系统故障并反馈到任务结果的异常状态中,如果8秒中内不关闭,会引发另外一个空指针异常,产生同样的异常状态。
完整demo源码如下:
// ParkServerDemo
import com.fourinone.BeanContext; public class ParkServerDemo { public static void main(String[] args) { BeanContext.startPark(); } }
// FaultWorker
import com.fourinone.MigrantWorker; import com.fourinone.WareHouse; import com.fourinone.Workman; public class FaultWorker extends MigrantWorker { public WareHouse doTask(WareHouse inhouse) { System.out.println(inhouse.getString("word")); try{Thread.sleep(8000L);}catch(Exception ex){} String[] strs = null; System.out.println(strs.length); WareHouse wh = new WareHouse("word", "hello "); return wh; } public static void main(String[] args) { FaultWorker mw = new FaultWorker(); mw.waitWorking("localhost",Integer.parseInt(args[0]),"faultworker"); } }
// FaultCtor
import com.fourinone.Contractor; import com.fourinone.WareHouse; import com.fourinone.WorkerLocal; import java.util.ArrayList; public class FaultCtor extends Contractor { public WareHouse giveTask(WareHouse inhouse) { WorkerLocal[] wks = getWaitingWorkers("faultworker"); System.out.println("wks.length:"+wks.length); WareHouse wh = new WareHouse("word", "hello"); WareHouse result = wks[0].doTask(wh); System.out.println("result:"+result); while(true){ if(result.getStatus()==WareHouse.READY){ System.out.println("result:"+result); break; } else if(result.getStatus()==WareHouse.EXCEPTION){ System.out.println("something wrong about wks[0] result"); //doTask(wh) again or put wh into log break; } } return null; } public static void main(String[] args) { FaultCtor a = new FaultCtor(); a.giveTask(null); a.exit(); } }