欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

计算过程中的故障和容灾处理

程序员文章站 2022-05-21 16:51:54
...
使用Fourinone可以完成大部分分布式并行计算需求, 但是计算过程中的故障和容灾处理是怎么进行的呢, 这里详细分析一下:

总的来说,Fourinone框架不会在设计中抛弃错误不处理或者容忍错误导致框架崩溃,框架通常会捕获所有的错误反馈给开发者去处理,但是框架本身不自作主张,替开发者考虑处理方案,只有这样框架才能从特定场景中抽象出来,给开发者更灵活的发挥和去满足各种更复杂业务容错情况。

那么框架究竟关注和不关注哪个层面的故障呢?
并行计算过程中,通常有两种类型的故障:一种是系统故障引起的计算中断(宕机和网络故障), 一种是业务逻辑意义上的错误数据。前者是框架关注的,后者是业务逻辑开发者关注的。
系统故障导致网络断掉或者宕机,框架会捕获故障信息并通告,工头在检验工人执行状态时会获知,并进行相应的业务上的故障处理,比如重发或者单独记录日志。业务逻辑意义上的错误数据,通常在工人的业务实现逻辑里去判断,比如计算结果的金额为负数是一个不符合业务要求的错误数据,这个是由开发者去控制,框架不做业务逻辑上的错误处理。

针对故障,框架又是怎样容灾的呢?
通常一个典型的分布式计算结构,由工头、工人、职介所组成,我们详细分析一下这几个角色在故障时各自如何容灾:

工头是嵌入式的,他不是一个服务程序,由嵌入他的系统new工头类并管理他的生命周期,工头不存在恢复或者容灾的概念,就好比我们写一个helloworld的main函数,很少考虑程序运行到hello, world没有输出时就宕机了。但是如果嵌入工头的系统是一个定时执行的计算任务时,也许要考虑容灾,因为涉及单点问题,可以让两个工头竞争一个分布式锁实现(详细参考分布式锁demo)。

工人和职介所是服务程序,如果工人节点故障,职介所会实时感知,工头分配计算时会获取到最新活跃工人数量,如果是职介所节点故障,Fourinone实现了领导者选举机制,会实时切换到备份职介所上(详细参考统一配置管理主备领导者切换)。

换句话说,如果一个工人节点在计算开始前发生故障不可用,工头通过getWaitingWorkers获取可用工人时不会包括该工人节点,因为职介所会感知每个工人的可用状态。

如果工人在计算过程中发生故障,框架会进行截获,然后提前返回计算结果,并设置结果的状态为异常。也就是正常完成计算时:result.getStatus()==WareHouse.READY
计算过程发生故障中断时:result.getStatus()==WareHouse.EXCEPTION
这样工头就可以根据检查结果的状态,来做故障时的容灾处理。
实际上也可以在工人的doTask实现方法内部捕捉业务异常,由开发者根据程序实现*决定。

以下demo演示了Fourinone计算过程中的故障容灾处理:
FaultCtor:是一个工头实现,它调用集群中一个工人doTask执行任务,然后轮询该结果,判断结果是否完成或者是否异常,如果结果状态为异常,则打印消息。实际上这里只是简单演示机制,现实场景中,可以将任务先记录,工人执行成功后再删除并跳转下一个任务,如果异常则继续重发其他工人执行该任务,或者采用其他故障策略,统一记录到错误日志,在其他时间再另行排查处理。

FaultWorker:是一个工人实现,它模拟了一个任务执行,睡眠了8秒钟,然后再制造一个空指针异常。该工人模拟了两种系统异常状况,计算过程中可以关闭它,或者等待它运行到空指针异常查看效果,注意这里doTask本身是不抛出和捕捉异常的,由框架去处理。
运行步骤:
编译demo的java类:Javac –classpath fourinone.jar; *.java
1、 启动ParkServerDemo(它的IP端口已经在配置文件的PARK部分的SERVERS指定)
Java –classpath fourinone.jar; ParkServerDemo
2、 运行FaultWorker(传入端口号参数)
Java –classpath fourinone.jar; FaultWorker 2008
3、 运行FaultCtor
Java –classpath fourinone.jar; FaultCtor

运行后工人进入8秒中“任务执行”,这时可以将该工人进程关闭,然后会查看到工头界面输出something wrong about wks[0] result,说明框架已经屏蔽系统故障并反馈到任务结果的异常状态中,如果8秒中内不关闭,会引发另外一个空指针异常,产生同样的异常状态。

完整demo源码如下:
// ParkServerDemo
import com.fourinone.BeanContext;
public class ParkServerDemo
{
	public static void main(String[] args)
	{
		BeanContext.startPark();
	}
}


// FaultWorker
import com.fourinone.MigrantWorker;
import com.fourinone.WareHouse;
import com.fourinone.Workman;

public class FaultWorker extends MigrantWorker
{
	public WareHouse doTask(WareHouse inhouse)
	{	
		System.out.println(inhouse.getString("word"));
		try{Thread.sleep(8000L);}catch(Exception ex){}
		String[] strs = null;
		System.out.println(strs.length);
		WareHouse wh = new WareHouse("word", "hello ");
		return wh;
	}
	
	public static void main(String[] args)
	{
		FaultWorker mw = new FaultWorker();
		mw.waitWorking("localhost",Integer.parseInt(args[0]),"faultworker");
	}
}


// FaultCtor
import com.fourinone.Contractor;
import com.fourinone.WareHouse;
import com.fourinone.WorkerLocal;
import java.util.ArrayList;

public class FaultCtor extends Contractor
{
	public WareHouse giveTask(WareHouse inhouse)
	{
		WorkerLocal[] wks = getWaitingWorkers("faultworker");
		System.out.println("wks.length:"+wks.length);
		
		WareHouse wh = new WareHouse("word", "hello");
		WareHouse result = wks[0].doTask(wh);
		System.out.println("result:"+result);
		
		while(true){
			if(result.getStatus()==WareHouse.READY){
				System.out.println("result:"+result);
				break;
			}
			else if(result.getStatus()==WareHouse.EXCEPTION){
				System.out.println("something wrong about wks[0] result");
				//doTask(wh) again or put wh into log
				break;
			}
		}
		
		return null;
	}
	
	public static void main(String[] args)
	{
		FaultCtor a = new FaultCtor();
		a.giveTask(null);
		a.exit();
	}
}