Netty + ZooKeeper 实现简单的服务注册与发现
一. 背景
最近的一个项目:我们的系统接收到上游系统的派单任务后,会推送到指定的门店的相关设备,并进行相应的业务处理。
二. netty 的使用
在接收到派单任务之后,通过 netty 推送到指定门店相关的设备。在我们的系统中 netty 实现了消息推送、长连接以及心跳机制。
2.1 netty server 端:
每个 netty 服务端通过 concurrenthashmap 保存了客户端的 clientid 以及它连接的 socketchannel。
服务器端向客户端发送消息时,只要获取 clientid 对应的 socketchannel,往 socketchannel 里写入相应的 message 即可。
eventloopgroup boss = new nioeventloopgroup(1); eventloopgroup worker = new nioeventloopgroup(); serverbootstrap bootstrap = new serverbootstrap(); bootstrap.group(boss, worker) .channel(nioserversocketchannel.class) .option(channeloption.so_backlog, 128) .option(channeloption.tcp_nodelay, true) .childoption(channeloption.so_keepalive, true) .childhandler(new channelinitializer() { @override protected void initchannel(channel channel) throws exception { channelpipeline p = channel.pipeline(); p.addlast(new messageencoder()); p.addlast(new messagedecoder()); p.addlast(new pushserverhandler()); } }); channelfuture future = bootstrap.bind(host,port).sync(); if (future.issuccess()) { logger.info("server start..."); }
2.2 netty client 端:
客户端用于接收服务端的消息,随即进行业务处理。客户端还有心跳机制,它通过 idleevent 事件定时向服务端放送 ping 消息以此来检测 socketchannel 是否中断。
public pushclientbootstrap(string host, int port) throws interruptedexception { this.host = host; this.port = port; start(host,port); } private void start(string host, int port) throws interruptedexception { bootstrap = new bootstrap(); bootstrap.channel(niosocketchannel.class) .option(channeloption.so_keepalive, true) .group(workgroup) .remoteaddress(host, port) .handler(new channelinitializer(){ @override protected void initchannel(channel channel) throws exception { channelpipeline p = channel.pipeline(); p.addlast(new idlestatehandler(20, 10, 0)); // idlestatehandler 用于检测心跳 p.addlast(new messagedecoder()); p.addlast(new messageencoder()); p.addlast(new pushclienthandler()); } }); doconnect(port, host); } /** * 建立连接,并且可以实现自动重连. * @param port port. * @param host host. * @throws interruptedexception interruptedexception. */ private void doconnect(int port, string host) throws interruptedexception { if (socketchannel != null && socketchannel.isactive()) { return; } final int portconnect = port; final string hostconnect = host; channelfuture future = bootstrap.connect(host, port); future.addlistener(new channelfuturelistener() { @override public void operationcomplete(channelfuture futurelistener) throws exception { if (futurelistener.issuccess()) { socketchannel = (socketchannel) futurelistener.channel(); logger.info("connect to server successfully!"); } else { logger.info("failed to connect to server, try connect after 10s"); futurelistener.channel().eventloop().schedule(new runnable() { @override public void run() { try { doconnect(portconnect, hostconnect); } catch (interruptedexception e) { e.printstacktrace(); } } }, 10, timeunit.seconds); } } }).sync(); }
三. 借助 zookeeper 实现简单的服务注册与发现
3.1 服务注册
服务注册本质上是为了解耦服务提供者和服务消费者。服务注册是一个高可用强一致性的服务发现存储仓库,主要用来存储服务的api和地址对应关系。为了高可用,服务注册中心一般为一个集群,并且能够保证分布式一致性。目前常用的有 zookeeper、etcd 等等。
在我们项目中采用了 zookeeper 实现服务注册。
public class serviceregistry { private static final logger logger = loggerfactory.getlogger(serviceregistry.class); private countdownlatch latch = new countdownlatch(1); private string registryaddress; public serviceregistry(string registryaddress) { this.registryaddress = registryaddress; } public void register(string data) { if (data != null) { zookeeper zk = connectserver(); if (zk != null) { createnode(zk, data); } } } /** * 连接 zookeeper 服务器 * @return */ private zookeeper connectserver() { zookeeper zk = null; try { zk = new zookeeper(registryaddress, constants.zk_session_timeout, new watcher() { @override public void process(watchedevent event) { if (event.getstate() == event.keeperstate.syncconnected) { latch.countdown(); } } }); latch.await(); } catch (ioexception | interruptedexception e) { logger.error("", e); } return zk; } /** * 创建节点 * @param zk * @param data */ private void createnode(zookeeper zk, string data) { try { byte[] bytes = data.getbytes(); string path = zk.create(constants.zk_data_path, bytes, zoodefs.ids.open_acl_unsafe, createmode.ephemeral_sequential); logger.debug("create zookeeper node ({} => {})", path, data); } catch (keeperexception | interruptedexception e) { logger.error("", e); } } }
有了服务注册,在 netty 服务端启动之后,将 netty 服务端的 ip 和 port 注册到 zookeeper。
eventloopgroup boss = new nioeventloopgroup(1); eventloopgroup worker = new nioeventloopgroup(); serverbootstrap bootstrap = new serverbootstrap(); bootstrap.group(boss, worker) .channel(nioserversocketchannel.class) .option(channeloption.so_backlog, 128) .option(channeloption.tcp_nodelay, true) .childoption(channeloption.so_keepalive, true) .childhandler(new channelinitializer() { @override protected void initchannel(channel channel) throws exception { channelpipeline p = channel.pipeline(); p.addlast(new messageencoder()); p.addlast(new messagedecoder()); p.addlast(new pushserverhandler()); } }); channelfuture future = bootstrap.bind(host,port).sync(); if (future.issuccess()) { logger.info("server start..."); } if (serviceregistry != null) { serviceregistry.register(host + ":" + port); }
3.2 服务发现
这里我们采用的是客户端的服务发现,即服务发现机制由客户端实现。
客户端在和服务端建立连接之前,通过查询注册中心的方式来获取服务端的地址。如果存在有多个 netty 服务端的话,可以做服务的负载均衡。在我们的项目中只采用了简单的随机法进行负载。
public class servicediscovery { private static final logger logger = loggerfactory.getlogger(servicediscovery.class); private countdownlatch latch = new countdownlatch(1); private volatile list<string> serviceaddresslist = new arraylist<>(); private string registryaddress; // 注册中心的地址 public servicediscovery(string registryaddress) { this.registryaddress = registryaddress; zookeeper zk = connectserver(); if (zk != null) { watchnode(zk); } } /** * 通过服务发现,获取服务提供方的地址 * @return */ public string discover() { string data = null; int size = serviceaddresslist.size(); if (size > 0) { if (size == 1) { //只有一个服务提供方 data = serviceaddresslist.get(0); logger.info("unique service address : {}", data); } else { //使用随机分配法。简单的负载均衡法 data = serviceaddresslist.get(threadlocalrandom.current().nextint(size)); logger.info("choose an address : {}", data); } } return data; } /** * 连接 zookeeper * @return */ private zookeeper connectserver() { zookeeper zk = null; try { zk = new zookeeper(registryaddress, constants.zk_session_timeout, new watcher() { @override public void process(watchedevent event) { if (event.getstate() == watcher.event.keeperstate.syncconnected) { latch.countdown(); } } }); latch.await(); } catch (ioexception | interruptedexception e) { logger.error("", e); } return zk; } /** * 获取服务地址列表 * @param zk */ private void watchnode(final zookeeper zk) { try { //获取子节点列表 list<string> nodelist = zk.getchildren(constants.zk_registry_path, new watcher() { @override public void process(watchedevent event) { if (event.gettype() == event.eventtype.nodechildrenchanged) { //发生子节点变化时再次调用此方法更新服务地址 watchnode(zk); } } }); list<string> datalist = new arraylist<>(); for (string node : nodelist) { byte[] bytes = zk.getdata(constants.zk_registry_path + "/" + node, false, null); datalist.add(new string(bytes)); } logger.debug("node data: {}", datalist); this.serviceaddresslist = datalist; } catch (keeperexception | interruptedexception e) { logger.error("", e); } } }
netty 客户端启动之后,通过服务发现获取 netty 服务端的 ip 和 port。
/** * 支持通过服务发现来获取 socket 服务端的 host、port * @param discoveryaddress * @throws interruptedexception */ public pushclientbootstrap(string discoveryaddress) throws interruptedexception { servicediscovery = new servicediscovery(discoveryaddress); serveraddress = servicediscovery.discover(); if (serveraddress!=null) { string[] array = serveraddress.split(":"); if (array!=null && array.length==2) { string host = array[0]; int port = integer.parseint(array[1]); start(host,port); } } }
四. 总结
服务注册和发现一直是分布式的核心组件。本文介绍了借助 zookeeper 做注册中心,如何实现一个简单的服务注册和发现。其实,注册中心的选择有很多,例如 etcd、eureka 等等。选择符合我们业务需求的才是最重要的。
以上所述是小编给大家介绍的netty + zookeeper 实现简单的服务注册与发现,希望对大家有所帮助
推荐阅读
-
详解使用Spring Cloud Consul实现服务的注册和发现
-
详解使用Spring Cloud Consul实现服务的注册和发现
-
C#使用Socket实现服务器与多个客户端通信(简单的聊天系统)
-
动手造*:实现一个简单的依赖注入(二) --- 服务注册优化
-
springcloud使用之服务的注册发现与消费
-
基于 Consul 实现 MagicOnion(GRpc) 服务注册与发现
-
Eureka(服务注册与发现)简单入门
-
SpringCloud-微服务的注册与发现Eureka
-
SpringCloud(二):服务的注册与发现(Eureka)
-
SpringCloud第二代实战系列:一文搞定Nacos实现服务注册与发现