欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Akka之消息路由

程序员文章站 2024-02-15 19:04:11
...

Akka提供了非常灵活的消息发送机制。有时候我们使用一组Actor来提供服务,这一组Actor中所有的Actor都是对等的,也就是说你可以找任何一个Actor来为你服务。这种情况下,如何才能快速有效地找到合适的Actor呢?或者说如何调度这些消息,才可以使负载更为均衡地分配在这一组Actor中呢?为了解决这个问题,Akka使用一个路由器组件(Router)来封装消息的调度。系统提供了几种实用的消息路由策略,比如,轮询选择Actor进行消息发送,随机消息发送,将消息发送给最为空闲的Actor,甚至是在组内广播消息。

实例

package com.bzb.java8.akka;

import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;

/**
 * @author bzb
 * @Description: 当前示例中的唯一一个Actor
 * @date 2018/9/14 14:16
 */
public class MyWorker extends UntypedActor {

    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    public static enum Msg {
        WORKING, DONE, CLOSE;
    }

    @Override
    public void onReceive(Object msg) throws Exception {
        if (msg == Msg.WORKING) {
            System.out.println("I am working");
        }
        if (msg == Msg.DONE) {
            System.out.println("Stop working");
        }
        if (msg == Msg.CLOSE) {
            System.out.println("I will shutdow");

            getSender().tell(Msg.CLOSE, getSelf());

            // 收到CLOSE消息时,关闭自己,结束运行。
            getContext().stop(getSelf());
        } else {
            unhandled(msg);
        }
    }
}
package com.bzb.java8.akka;

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.routing.ActorRefRoutee;
import akka.routing.RoundRobinRoutingLogic;
import akka.routing.Routee;
import akka.routing.Router;

import java.util.ArrayList;
import java.util.List;

/**
 * @author bzb
 * @Description: 监视者,如同劳动监工一样,一旦被监视者因为停止工作,则监视者就会收到一条消息
 * @date 2018/9/14 14:27
 */
public class WatchActor extends UntypedActor {

    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    // 定义路由器组件Router
    public Router router;

    {
        List<Routee> routees = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            ActorRef worker = getContext().actorOf(Props.create(MyWorker.class), "worker-" + i);
            getContext().watch(worker);
            routees.add(new ActorRefRoutee(worker));
        }

        // 构造Router时,需要指定路由策略和一组被路由的Actor(Routee)。这里使用了RoundRobinRoutingLogin,也就是对所有的Routee轮询发送消息。
        /**
         * RoundRobinRoutingLogic: 轮询
         * BroadcastRoutingLogic: 广播
         * RandomRoutingLogic: 随机
         * SmallestMailboxRoutingLogic: 空闲
         */
        router = new Router(new RoundRobinRoutingLogic(), routees);
    }

    @Override
    public void onReceive(Object msg) throws Exception {
        if (msg instanceof MyWorker.Msg) {
            // 当需要投递消息给这5个worker时,只需要将消息投递给这个Router即可。
            // Router给根据给定的路由策略进行消息投递。
            router.route(msg, getSender());
        } else if (msg instanceof Terminated) { // 终止消息

            // 当一个worker停止工作时,可以简单的将其从工作组中移除
            router = router.removeRoutee(((Terminated) msg).actor());

            // 打印终止的Actor路径和剩余工作组的大小
            System.out.println(((Terminated) msg).actor().path() + " is closed, routees = " + router.routees().size());

            // 如果系统中没有可以的Actor,就会直接关闭系统
            if (router.routees().size() == 0) {
                System.out.println("Closed Systme");
                RouteMain.flag.send(false);
                getContext().system().shutdown();
            }
         } else {
            unhandled(msg);
        }
    }
}
package com.bzb.java8.akka;


import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.agent.Agent;
import akka.dispatch.ExecutionContexts;
import com.typesafe.config.ConfigFactory;
import scala.concurrent.ExecutionContext;

/**
 * @author bzb
 * @Description:
 * @date 2018/9/14 17:42
 */
public class RouteMain {

    public static Agent<Boolean> flag = Agent.create(true, ExecutionContexts.global());

    public static void main(String[] args) throws InterruptedException {
        ActorSystem system = ActorSystem.create("route", ConfigFactory.load("samplehello.conf"));

        ActorRef watcher = system.actorOf(Props.create(WatchActor.class), "watcher");

        int i = 1;

        // 不停发送消息,知道flag为false,即路由绑定的工作组中的Actor为0
        while (flag.get()) {
            watcher.tell(MyWorker.Msg.WORKING, ActorRef.noSender());
            if (i % 10 == 0) {
                watcher.tell(MyWorker.Msg.CLOSE, ActorRef.noSender()); // 每10次发送一条关闭消息
            }
            i++;
            Thread.sleep(100);
        }
    }
}

参考资源

<实战java高并发程序设计>

相关标签: akka之消息路由