NServiceBus+Saga开发分布式应用
前言
当你在处理异步消息时,每个单独的消息处理程序都是一个单独的handler,每个handler之间互不影响。这时如果一个消息依赖另一个消息的状态呢? 这时业务逻辑怎么处理?
借用我们上篇文章的业务场景,如果在ship项目里需要发送一个shiporder command。这个shiporder需要依赖sales.orderplaced和bill.orderbilled command的状态,目前我们的两个单独的message handler都没有保持任何的状态字段,所以这时如果我们需要完成这个业务模型,就需要跟踪他们的状态。
什么是saga
这个就是本篇文章要提的saga,定义在nservicebus框架里,他的本质是一个消息驱动模型里的状态机,或者也可以理解为一系列消息处理程序用来共享状态的业务模型。我理解在消息队列里如果我们要保证消息一致性通常会自己创建一张event表,这里saga维持状态的角色有点像我们这里的event表。
好的,回到正题上,如果我们需要在shipping service里发送一个shiporder,发送他之前需要确定orderplaced和orderbilled的状态,确保这两个消息都收到以后才能发送shiporder。
如何使用saga
当然,我暂且理解saga的目的是为了处理在长时间运行的任务里保证数据一致性这样的一个角色。
saga状态
saga状态主要是告诉nservicebus在处理数据一致性的判断逻辑,这里需要继承抽象类containsagadata,在我们这个业务场景中则主要是判断orderplaced和orderbilled消息是否已经接收到并处理。
public class shippingpolicydata:containsagadata { public string orderid { get; set; } public bool isorderplaced { get; set; } public bool isorderbilled { get; set; } }
saga如何工作
有了状态以后,我们还需要一个“handler”来告诉nservicebus,在这个handler里主要用来处理消息数据一致性,我看了官方文档后,他们建议我们这里的handler角色使用policy后缀命名,当然我觉的也可以用saga后缀命名,比如shippingpolicy或者shippingsaga。
到这里也就是我们的orderplaced和orderbilled消息都收到了,业务逻辑符合要求,可以发送shiporder消息了,也就是用户创建了订单,付了款,可以发货了。 新建shiporder类 新建shiporderhandler 运行shipping项目,看到下图,则说明程序运行成功,我们这个业务场景里orderplaced消息肯定先接受到,orderbilled消息后接受到。
同时这里我们这个handler觉色还要继承saga public class shippolicy:saga<shippingpolicydata>,
iamstartedbymessages<orderplaced>,
iamstartedbymessages<orderbilled> //都可以创建saga实例
{
private static ilog log = logmanager.getlogger<shippolicy>();
protected override void configurehowtofindsaga(sagapropertymapper<shippingpolicydata> mapper)
{
mapper.configuremapping<orderplaced>(t=>t.orderid).tosaga(sagadata=>sagadata.orderid);
mapper.configuremapping<orderbilled>(t=>t.orderid).tosaga(sagadata=>sagadata.orderid);
}
public task handle(orderplaced message, imessagehandlercontext context)
{
log.info("orderplaced message received ");
this.data.isorderplaced = true;
return processorder(context);
}
public task handle(orderbilled message, imessagehandlercontext context)
{
log.info("orderbilled message received");
this.data.isorderbilled = true;
return processorder(context);
}
private async task processorder(imessagehandlercontext context)
{
if (data.isorderbilled && data.isorderplaced)
{
await context.sendlocal(new shiporder()
{
orderid = data.orderid
});
markascomplete();
}
}
}
这个类里你会发现还实现了接口iamstartedbymessages发送shiporder command
public class shiporder:icommand
{
public string orderid { get; set; }
}
public class shiporderhandler:ihandlemessages<shiporder>
{
private static ilog log = logmanager.getlogger<shiporderhandler>();
public task handle(shiporder message, imessagehandlercontext context)
{
log.info($"order [{message.orderid}] - successfully shipped");
return task.completedtask;
}
}
参考链接
上一篇: C#运算符