您可能从先前的第1部分和第2 部分中记得,我们正在实现一个解决方案,该解决方案将同步客户端与异步系统集成在一起。 今天,我们将看到如何跟踪异步操作,以便我们可以向客户端提供同步响应。 让我们从架构图开始。
我们可以通过一个例子来了解系统。 警察向我们发送了删除非法物品的请求,并期望在10秒内做出回应。 本示例的相关状态为:
- 200:该项目已在发布它的每个容器中成功删除
- 404:该项目在我们的系统中不存在
- 504:尝试删除该项目的超时
执法服务使用Kafka异步与Items服务通信。 这意味着我们需要订阅一个名为item_deleted
的主题 。 为了增加系统的复杂性,我们需要处理一些复用,因为该项目可以作为个人时间线或不同的组发布在不同的容器中。 在进入解决方案的细节之前,让我们定义状态的含义。
定义状态
状态是跟踪我们系统中发生的情况的能力。 无状态应用程序将是一个纯函数,没有任何副作用。 它接收输入,按照一些规则对其进行转换,然后返回输出。 这样的无状态应用程序在业务环境中不是很有用。 企业和用户希望了解过去发生的事情,因此他们可以做出明智的决定。
但是,我们不需要将状态保留在我们的应用服务器中。 状态通常存储在数据存储区或客户端中。 一个典型的例子是基于http的应用程序中的会话管理。 Http是一种无状态协议,这意味着要在请求之间保持状态,我们需要自己做,而无需协议的帮助。
粘性会议是几年前的流行解决方案。 状态存储在服务器的内存中,因此客户端需要跟踪已为其分配了哪个服务器。 此解决方案有几个问题:
- 容错能力:如果服务器崩溃,会话将丢失。 在这种情况下,用户体验确实很差
- 可扩展性:如果某些服务器不堪重负,我们将不容易进行横向扩展,因为某些用户在会话结束之前一直与该特定服务器绑定。 在服务器之间复制会话非常复杂
另一种方法是将会话保留在客户端和/或某些数据存储区(例如Redis)中的cookie中。 因此,我们使服务器保持无状态,从而有助于负载平衡器有效地分配请求。
此示例使我们了解请求之间的状态,但是我们可以在单个请求中包含状态。 让我们看看OOP如何处理状态。
OOP中的状态和行为
对象和演员负责保持自己的状态。 这种封装迫使客户端通过公开的接口与该状态进行交互。 在本例中,状态会影响对象的行为:
class Account(var balance: Int, var overdraft: Int = 0) {
def deposit(value: Int) = {
balance = balance + value
}
def withdrawal(value: Int) = {
val remaining = balance - value
if (remaining < 0) {
balance = 0
overdraft = overdraft + remaining.abs
notifyAccountHolder(overdraft)
} else {
balance = remaining
}
}
}
我们将根据余额中包含的状态交换取款行为。 一旦此代码获得更多特殊情况并分支,可读性和可维护性就会恶化。 我们可以使用多态性,合成或简单地提取私有方法来使这种复杂性可以承受。 Akka提供了一种非常方便的DSL,称为有限状态机,以实现这一目标。
有限状态机
如Erlang文档所述 :
FSM可以描述为以下形式的一组关系:
状态(S)x事件(E)->动作(A),状态(S')
这些关系被解释为以下含义:
如果我们处于状态S且事件E发生,则应执行动作A并转换到状态S'。
在我们的示例中:
State(positive balance) x Event(significative withdrawal) -> Actions (update balance, withdrawal, and notify account holder), State(negative balance)
同时,在负余额状态下,我们可以定义不同的规则,因为我们允许透支操作多少次。
Akka的有限状态机
让我们从一个较低的角度看一下有关将要实现的体系结构的图:
执法服务将包含ItemCensor actor的多个实例。
class ItemCensor extends Actor with FSM[State, Data]
为了使示例易于理解,我们将仅对两种可能的状态建模。 简单的FSM使用“ 变得/变得不受欢迎”功能进行了精心设计。
重要的是要注意状态和数据之间的区别。 您可以将状态视为FSM视觉表示的标签。 数据在每个州都是本地的。
sealed trait State
case object Idle extends State
case object Active extends State
sealed trait Data
case object Uninitialized extends Data
final case class ItemsToBeDeleted(items: Seq[Item]) extends Data
case class Item(itemId: UUID, containerId: UUID, containerType: String) {
def partitionKey = s"${itemId.toString}-${containerId.toString}"
}
FSM的第一步
在以后的文章中,我们将介绍如何创建参与者并管理其生命周期。 到目前为止,只需知道我们的系统中有一个具有协调职责的参与者就可以负责创建,恢复和合并这些ItemCensor参与者。 当协调器创建实例时,将在FSM内部执行该实例:
startWith(Idle, Uninitialized)
该Akka方法设置初始状态,在我们的示例中为空状态数据。 现在我们准备接收消息:
when(Idle) {
case Event(ItemsReported(items), _) =>
items match {
case List() => finishWorkWith(CensorResult(Left(ItemNotFound)))
case items =>
setTimer("CensorTimer", CensorTimeout, 10 seconds)
items.foreach(item => {
pipe(itemReportedProducer.publish(item)) to self
itemDeletedBus.subscribe(self, item.partitionKey)
})
goto(Active) using ItemsToBeDeleted(items)
}
}
让我们解释一下此片段。 when
方法定义某些状态的范围。 当处于Idle
状态时,此ItemsReported
将收到ItemsReported
类型的ItemsReported
。 您定义为与消息匹配的部分功能具有某些特殊性。 FSM将收到的消息包装到Event
对象中,并包含当前状态数据。 我们对消息进行模式匹配,如果我们发现任何容器中都没有项目,我们甚至都不会开始工作(这最终将导致404响应)。 否则,我们将启动一个计时器,该计时器将在10秒后发送CensorTimeout
类型的消息。
之后,我们将每个项目发布到Kafka中。 请记住,单个物理物品可以生活在不同的容器中,所以这就是我们谈论items
的原因。 ItemReportedProducer
返回未来,我们可以pipe
入同一个演员。 因此,我们可以倾听未来的失败,并使流程尽早失败。
我们将项目订阅到Akka Event Bus中 ,因此此actor可以对其删除的项目做出唯一反应。 最后,我们将FSM移至Active状态,其中包括ItemsToBeDeleted
的状态数据。
在FSM中更新数据状态
ItemCensor
actor需要等到Items服务完成删除项目为止。 Items服务将在Kafka中发布一些事件,我们的事件总线将订阅该主题。 ItemCensor
仅订阅了它感兴趣的项目,事件总线将把ItemCensor
类型的消息发送给ItemDeleted
。
when(Active) {
case Event(ItemDeleted(item), aaa@qq.com(items)) =>
val newItemsToBeDeleted = items.filterNot(_ == item)
newItemsToBeDeleted.size match {
case 0 => finishWorkWith(CensorResult(Right()))
case _ => stay using currentItemsToBeDeleted.copy(items = newItemsToBeDeleted)
}
}
一旦从事件总线中收到ItemDeleted
消息,我们就更新状态数据ItemsToBeDeleted
。 如果我们用尽了所有物品,则可以成功显示“ 正确”消息。 否则,我们将与新状态数据stay
相同的状态,等待新消息到达。
处理失败
Akka中的FSM允许您捕获actor收到的消息,但没有部分功能与其正确匹配。
whenUnhandled {
case Event(CensorTimeout, _) =>
finishWorkWith(CensorResult(Left(CensorTimeout("Censor timed out"))))
case Event(failure: Failure, _) =>
finishWorkWith(CensorResult(Left(CensorException(failure.cause.getMessage))))
}
whenUnhandled
将尝试匹配所有未处理的消息。 如果10秒后,男主角依然是围绕CensorTimeout
信息就会发送CensorTimer
所以我们可以适当的错误的情况下完成这项工作。 如果itemReportedProducer
无法将项目发布到Kafka,则当我们将itemReportedProducer
管道传输到self
,此代码将收到Failure
消息。
完成FSM
FSM的生命周期将由外部参与者(称为协调者)控制。 每当我们要完成此FSM的工作时,就必须向协调员发送一条消息:
private def finishWorkWith(message: Any) = {
coordinator ! message
goto(Idle)
}
我们不需要进入Idle
状态,但这样做会使读者更清楚该演员不再值班。
摘要
FSM是我们解决方案的核心。 在接下来的文章中,我们将看到我们如何集成,协调和监督这些FSM,以便它们可以实现其将同步客户端与异步系统桥接的目的。 同时,我们将看到Kafka和Akka Event Bus如何实现自己的pub-sub哲学版本,因此它们可以对系统中的更改做出异步反应。
翻译自: https://www.javacodegeeks.com/2016/05/finite-state-machines-akka.html