Akka之消息路由
程序员文章站
2024-02-15 19:04:11
...
Akka提供了非常灵活的消息发送机制。有时候我们使用一组Actor来提供服务,这一组Actor中所有的Actor都是对等的,也就是说你可以找任何一个Actor来为你服务。这种情况下,如何才能快速有效地找到合适的Actor呢?或者说如何调度这些消息,才可以使负载更为均衡地分配在这一组Actor中呢?为了解决这个问题,Akka使用一个路由器组件(Router)来封装消息的调度。系统提供了几种实用的消息路由策略,比如,轮询选择Actor进行消息发送,随机消息发送,将消息发送给最为空闲的Actor,甚至是在组内广播消息。
实例
package com.bzb.java8.akka;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
/**
* @author bzb
* @Description: 当前示例中的唯一一个Actor
* @date 2018/9/14 14:16
*/
public class MyWorker extends UntypedActor {
private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
public static enum Msg {
WORKING, DONE, CLOSE;
}
@Override
public void onReceive(Object msg) throws Exception {
if (msg == Msg.WORKING) {
System.out.println("I am working");
}
if (msg == Msg.DONE) {
System.out.println("Stop working");
}
if (msg == Msg.CLOSE) {
System.out.println("I will shutdow");
getSender().tell(Msg.CLOSE, getSelf());
// 收到CLOSE消息时,关闭自己,结束运行。
getContext().stop(getSelf());
} else {
unhandled(msg);
}
}
}
package com.bzb.java8.akka;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.routing.ActorRefRoutee;
import akka.routing.RoundRobinRoutingLogic;
import akka.routing.Routee;
import akka.routing.Router;
import java.util.ArrayList;
import java.util.List;
/**
* @author bzb
* @Description: 监视者,如同劳动监工一样,一旦被监视者因为停止工作,则监视者就会收到一条消息
* @date 2018/9/14 14:27
*/
public class WatchActor extends UntypedActor {
private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
// 定义路由器组件Router
public Router router;
{
List<Routee> routees = new ArrayList<>();
for (int i = 0; i < 5; i++) {
ActorRef worker = getContext().actorOf(Props.create(MyWorker.class), "worker-" + i);
getContext().watch(worker);
routees.add(new ActorRefRoutee(worker));
}
// 构造Router时,需要指定路由策略和一组被路由的Actor(Routee)。这里使用了RoundRobinRoutingLogin,也就是对所有的Routee轮询发送消息。
/**
* RoundRobinRoutingLogic: 轮询
* BroadcastRoutingLogic: 广播
* RandomRoutingLogic: 随机
* SmallestMailboxRoutingLogic: 空闲
*/
router = new Router(new RoundRobinRoutingLogic(), routees);
}
@Override
public void onReceive(Object msg) throws Exception {
if (msg instanceof MyWorker.Msg) {
// 当需要投递消息给这5个worker时,只需要将消息投递给这个Router即可。
// Router给根据给定的路由策略进行消息投递。
router.route(msg, getSender());
} else if (msg instanceof Terminated) { // 终止消息
// 当一个worker停止工作时,可以简单的将其从工作组中移除
router = router.removeRoutee(((Terminated) msg).actor());
// 打印终止的Actor路径和剩余工作组的大小
System.out.println(((Terminated) msg).actor().path() + " is closed, routees = " + router.routees().size());
// 如果系统中没有可以的Actor,就会直接关闭系统
if (router.routees().size() == 0) {
System.out.println("Closed Systme");
RouteMain.flag.send(false);
getContext().system().shutdown();
}
} else {
unhandled(msg);
}
}
}
package com.bzb.java8.akka;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.agent.Agent;
import akka.dispatch.ExecutionContexts;
import com.typesafe.config.ConfigFactory;
import scala.concurrent.ExecutionContext;
/**
* @author bzb
* @Description:
* @date 2018/9/14 17:42
*/
public class RouteMain {
public static Agent<Boolean> flag = Agent.create(true, ExecutionContexts.global());
public static void main(String[] args) throws InterruptedException {
ActorSystem system = ActorSystem.create("route", ConfigFactory.load("samplehello.conf"));
ActorRef watcher = system.actorOf(Props.create(WatchActor.class), "watcher");
int i = 1;
// 不停发送消息,知道flag为false,即路由绑定的工作组中的Actor为0
while (flag.get()) {
watcher.tell(MyWorker.Msg.WORKING, ActorRef.noSender());
if (i % 10 == 0) {
watcher.tell(MyWorker.Msg.CLOSE, ActorRef.noSender()); // 每10次发送一条关闭消息
}
i++;
Thread.sleep(100);
}
}
}
参考资源
<实战java高并发程序设计>
上一篇: Linux 路由基础知识介绍
下一篇: Angular 路由 ( 二 )