欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Netty + ZooKeeper 实现简单的服务注册与发现

程序员文章站 2024-02-14 15:05:10
一. 背景 最近的一个项目:我们的系统接收到上游系统的派单任务后,会推送到指定的门店的相关设备,并进行相应的业务处理。 二. netty 的使用 在接收到派单任务之后...

一. 背景

最近的一个项目:我们的系统接收到上游系统的派单任务后,会推送到指定的门店的相关设备,并进行相应的业务处理。

二. netty 的使用

在接收到派单任务之后,通过 netty 推送到指定门店相关的设备。在我们的系统中 netty 实现了消息推送、长连接以及心跳机制。

Netty + ZooKeeper 实现简单的服务注册与发现

2.1 netty server 端:

每个 netty 服务端通过 concurrenthashmap 保存了客户端的 clientid 以及它连接的 socketchannel。

服务器端向客户端发送消息时,只要获取 clientid 对应的 socketchannel,往 socketchannel 里写入相应的 message 即可。

eventloopgroup boss = new nioeventloopgroup(1);
  eventloopgroup worker = new nioeventloopgroup();
  serverbootstrap bootstrap = new serverbootstrap();
  bootstrap.group(boss, worker)
    .channel(nioserversocketchannel.class)
    .option(channeloption.so_backlog, 128)
    .option(channeloption.tcp_nodelay, true)
    .childoption(channeloption.so_keepalive, true)
    .childhandler(new channelinitializer() {
     @override
     protected void initchannel(channel channel) throws exception {
      channelpipeline p = channel.pipeline();
      p.addlast(new messageencoder());
      p.addlast(new messagedecoder());
      p.addlast(new pushserverhandler());
     }
    });
  channelfuture future = bootstrap.bind(host,port).sync();
  if (future.issuccess()) {
   logger.info("server start...");
  }

2.2 netty client 端:

客户端用于接收服务端的消息,随即进行业务处理。客户端还有心跳机制,它通过 idleevent 事件定时向服务端放送 ping 消息以此来检测 socketchannel 是否中断。

public pushclientbootstrap(string host, int port) throws interruptedexception {
  this.host = host;
  this.port = port;
  start(host,port);
 }
 private void start(string host, int port) throws interruptedexception {
  bootstrap = new bootstrap();
  bootstrap.channel(niosocketchannel.class)
    .option(channeloption.so_keepalive, true)
    .group(workgroup)
    .remoteaddress(host, port)
    .handler(new channelinitializer(){
     @override
     protected void initchannel(channel channel) throws exception {
      channelpipeline p = channel.pipeline();
      p.addlast(new idlestatehandler(20, 10, 0)); // idlestatehandler 用于检测心跳
      p.addlast(new messagedecoder());
      p.addlast(new messageencoder());
      p.addlast(new pushclienthandler());
     }
    });
  doconnect(port, host);
 }
 /**
  * 建立连接,并且可以实现自动重连.
  * @param port port.
  * @param host host.
  * @throws interruptedexception interruptedexception.
  */
 private void doconnect(int port, string host) throws interruptedexception {
  if (socketchannel != null && socketchannel.isactive()) {
   return;
  }
  final int portconnect = port;
  final string hostconnect = host;
  channelfuture future = bootstrap.connect(host, port);
  future.addlistener(new channelfuturelistener() {
   @override
   public void operationcomplete(channelfuture futurelistener) throws exception {
    if (futurelistener.issuccess()) {
     socketchannel = (socketchannel) futurelistener.channel();
     logger.info("connect to server successfully!");
    } else {
     logger.info("failed to connect to server, try connect after 10s");
     futurelistener.channel().eventloop().schedule(new runnable() {
      @override
      public void run() {
       try {
        doconnect(portconnect, hostconnect);
       } catch (interruptedexception e) {
        e.printstacktrace();
       }
      }
     }, 10, timeunit.seconds);
    }
   }
  }).sync();
 }

三. 借助 zookeeper 实现简单的服务注册与发现

3.1 服务注册

服务注册本质上是为了解耦服务提供者和服务消费者。服务注册是一个高可用强一致性的服务发现存储仓库,主要用来存储服务的api和地址对应关系。为了高可用,服务注册中心一般为一个集群,并且能够保证分布式一致性。目前常用的有 zookeeper、etcd 等等。

在我们项目中采用了 zookeeper 实现服务注册。

public class serviceregistry {
 private static final logger logger = loggerfactory.getlogger(serviceregistry.class);
 private countdownlatch latch = new countdownlatch(1);
 private string registryaddress;
 public serviceregistry(string registryaddress) {
  this.registryaddress = registryaddress;
 }
 public void register(string data) {
  if (data != null) {
   zookeeper zk = connectserver();
   if (zk != null) {
    createnode(zk, data);
   }
  }
 }
 /**
  * 连接 zookeeper 服务器
  * @return
  */
 private zookeeper connectserver() {
  zookeeper zk = null;
  try {
   zk = new zookeeper(registryaddress, constants.zk_session_timeout, new watcher() {
    @override
    public void process(watchedevent event) {
     if (event.getstate() == event.keeperstate.syncconnected) {
      latch.countdown();
     }
    }
   });
   latch.await();
  } catch (ioexception | interruptedexception e) {
   logger.error("", e);
  }
  return zk;
 }
 /**
  * 创建节点
  * @param zk
  * @param data
  */
 private void createnode(zookeeper zk, string data) {
  try {
   byte[] bytes = data.getbytes();
   string path = zk.create(constants.zk_data_path, bytes, zoodefs.ids.open_acl_unsafe, createmode.ephemeral_sequential);
   logger.debug("create zookeeper node ({} => {})", path, data);
  } catch (keeperexception | interruptedexception e) {
   logger.error("", e);
  }
 }
}

有了服务注册,在 netty 服务端启动之后,将 netty 服务端的 ip 和 port 注册到 zookeeper。

eventloopgroup boss = new nioeventloopgroup(1);
  eventloopgroup worker = new nioeventloopgroup();
  serverbootstrap bootstrap = new serverbootstrap();
  bootstrap.group(boss, worker)
    .channel(nioserversocketchannel.class)
    .option(channeloption.so_backlog, 128)
    .option(channeloption.tcp_nodelay, true)
    .childoption(channeloption.so_keepalive, true)
    .childhandler(new channelinitializer() {
     @override
     protected void initchannel(channel channel) throws exception {
      channelpipeline p = channel.pipeline();
      p.addlast(new messageencoder());
      p.addlast(new messagedecoder());
      p.addlast(new pushserverhandler());
     }
    });
  channelfuture future = bootstrap.bind(host,port).sync();
  if (future.issuccess()) {
   logger.info("server start...");
  }
  if (serviceregistry != null) {
   serviceregistry.register(host + ":" + port);
  }

3.2 服务发现

这里我们采用的是客户端的服务发现,即服务发现机制由客户端实现。

客户端在和服务端建立连接之前,通过查询注册中心的方式来获取服务端的地址。如果存在有多个 netty 服务端的话,可以做服务的负载均衡。在我们的项目中只采用了简单的随机法进行负载。

public class servicediscovery {
 private static final logger logger = loggerfactory.getlogger(servicediscovery.class);
 private countdownlatch latch = new countdownlatch(1);
 private volatile list<string> serviceaddresslist = new arraylist<>();
 private string registryaddress; // 注册中心的地址
 public servicediscovery(string registryaddress) {
  this.registryaddress = registryaddress;
  zookeeper zk = connectserver();
  if (zk != null) {
   watchnode(zk);
  }
 }
 /**
  * 通过服务发现,获取服务提供方的地址
  * @return
  */
 public string discover() {
  string data = null;
  int size = serviceaddresslist.size();
  if (size > 0) {
   if (size == 1) { //只有一个服务提供方
    data = serviceaddresslist.get(0);
    logger.info("unique service address : {}", data);
   } else {   //使用随机分配法。简单的负载均衡法
    data = serviceaddresslist.get(threadlocalrandom.current().nextint(size));
    logger.info("choose an address : {}", data);
   }
  }
  return data;
 }
 /**
  * 连接 zookeeper
  * @return
  */
 private zookeeper connectserver() {
  zookeeper zk = null;
  try {
   zk = new zookeeper(registryaddress, constants.zk_session_timeout, new watcher() {
    @override
    public void process(watchedevent event) {
     if (event.getstate() == watcher.event.keeperstate.syncconnected) {
      latch.countdown();
     }
    }
   });
   latch.await();
  } catch (ioexception | interruptedexception e) {
   logger.error("", e);
  }
  return zk;
 }
 /**
  * 获取服务地址列表
  * @param zk
  */
 private void watchnode(final zookeeper zk) {
  try {
   //获取子节点列表
   list<string> nodelist = zk.getchildren(constants.zk_registry_path, new watcher() {
    @override
    public void process(watchedevent event) {
     if (event.gettype() == event.eventtype.nodechildrenchanged) {
      //发生子节点变化时再次调用此方法更新服务地址
      watchnode(zk);
     }
    }
   });
   list<string> datalist = new arraylist<>();
   for (string node : nodelist) {
    byte[] bytes = zk.getdata(constants.zk_registry_path + "/" + node, false, null);
    datalist.add(new string(bytes));
   }
   logger.debug("node data: {}", datalist);
   this.serviceaddresslist = datalist;
  } catch (keeperexception | interruptedexception e) {
   logger.error("", e);
  }
 }
}

netty 客户端启动之后,通过服务发现获取 netty 服务端的 ip 和 port。

/**
  * 支持通过服务发现来获取 socket 服务端的 host、port
  * @param discoveryaddress
  * @throws interruptedexception
  */
 public pushclientbootstrap(string discoveryaddress) throws interruptedexception {

  servicediscovery = new servicediscovery(discoveryaddress);
  serveraddress = servicediscovery.discover();

  if (serveraddress!=null) {
   string[] array = serveraddress.split(":");
   if (array!=null && array.length==2) {

    string host = array[0];
    int port = integer.parseint(array[1]);

    start(host,port);
   }
  }
 }

四. 总结

服务注册和发现一直是分布式的核心组件。本文介绍了借助 zookeeper 做注册中心,如何实现一个简单的服务注册和发现。其实,注册中心的选择有很多,例如 etcd、eureka 等等。选择符合我们业务需求的才是最重要的。

以上所述是小编给大家介绍的netty + zookeeper 实现简单的服务注册与发现,希望对大家有所帮助