手写MQ框架(四)-使用netty改造梳理
一、背景
书接上文手写mq框架(三)-客户端实现 ,前面通过web的形式实现了mq的服务端和客户端,现在计划使用netty来改造一下。前段时间学习了一下netty的使用(https://www.w3cschool.cn/netty4userguide/52ki1iey.html)。大概有一些想法。
netty封装了socket的使用,我们通过简单的调用即可构建高性能的网络应用。我计划采用以下例子来对gmq进行改造。
本文主要参考:https://www.w3cschool.cn/netty4userguide/、https://www.w3cschool.cn/essential_netty_in_action/
二、netty是什么
netty是由jboss提供的一个java开源框架。netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
--来自https://www.w3cschool.cn/netty4userguide/52ki1iey.html
netty是一个java框架,是网络编程框架,支持异步、事件驱动的特性,所以性能表现很好。
三、netty的简单实现
1、服务端
1)simpleserverhandler
handler是处理器,handler 是由 netty 生成用来处理 i/o 事件的。
package me.lovegao.netty.learnw3c.mqdemo; import io.netty.channel.channel; import io.netty.channel.channelhandlercontext; import io.netty.channel.simplechannelinboundhandler; import io.netty.channel.group.channelgroup; import io.netty.channel.group.defaultchannelgroup; import io.netty.util.concurrent.globaleventexecutor; public class simpleserverhandler extends simplechannelinboundhandler<string> { public static channelgroup channels = new defaultchannelgroup(globaleventexecutor.instance); @override public void handleradded(channelhandlercontext ctx) throws exception { channel incoming = ctx.channel(); system.out.println("[server] - " + incoming.remoteaddress() + " 加入\n"); channels.add(ctx.channel()); } @override public void handlerremoved(channelhandlercontext ctx) throws exception { channel incoming = ctx.channel(); system.out.println("[server] - " + incoming.remoteaddress() + " 离开\n"); channels.remove(ctx.channel()); } @override protected void channelread0(channelhandlercontext ctx, string s) throws exception { channel incoming = ctx.channel(); system.out.println("[" + incoming.remoteaddress() + "]" + s); if(s == null || s.length() == 0) { incoming.writeandflush("消息是空的呀!\n"); } else { // mqrouter<?> mqrouter = jsonobject.parseobject(s, mqrouter.class); // system.out.println(mqrouter.geturi()); string responsemsg = "收到了," + s + "\n"; incoming.writeandflush(responsemsg); } } @override public void channelactive(channelhandlercontext ctx) throws exception { channel incoming = ctx.channel(); system.out.println("simplechatclient:"+incoming.remoteaddress()+"在线"); } @override public void channelinactive(channelhandlercontext ctx) throws exception { channel incoming = ctx.channel(); system.out.println("simplechatclient:"+incoming.remoteaddress()+"掉线"); } @override public void exceptioncaught(channelhandlercontext ctx, throwable cause) throws exception { channel incoming = ctx.channel(); system.out.println("simplechatclient:"+incoming.remoteaddress()+"异常"); cause.printstacktrace(); ctx.close(); } }
2)simpleserverinitializer
simpleserverinitializer 用来增加多个的处理类到 channelpipeline 上,包括编码、解码、simpleserverhandler 等。
package me.lovegao.netty.learnw3c.mqdemo; import io.netty.channel.channelinitializer; import io.netty.channel.channelpipeline; import io.netty.channel.socket.socketchannel; import io.netty.handler.codec.delimiterbasedframedecoder; import io.netty.handler.codec.delimiters; import io.netty.handler.codec.string.stringdecoder; import io.netty.handler.codec.string.stringencoder; public class simpleserverinitializer extends channelinitializer<socketchannel> { @override protected void initchannel(socketchannel ch) throws exception { channelpipeline pipeline = ch.pipeline(); pipeline.addlast("framer", new delimiterbasedframedecoder(8192, delimiters.linedelimiter())); pipeline.addlast("decoder", new stringdecoder()); pipeline.addlast("encoder", new stringencoder()); pipeline.addlast("handler", new simpleserverhandler()); system.out.println("simplechatclient:" + ch.remoteaddress() + "连接上"); } }
3)simpleserver
package me.lovegao.netty.learnw3c.mqdemo; import io.netty.bootstrap.serverbootstrap; import io.netty.channel.channelfuture; import io.netty.channel.channeloption; import io.netty.channel.eventloopgroup; import io.netty.channel.nio.nioeventloopgroup; import io.netty.channel.socket.nio.nioserversocketchannel; public class simpleserver { private int port; public simpleserver(int port) { this.port = port; } public void run() throws exception { eventloopgroup bossgroup = new nioeventloopgroup(); eventloopgroup workergroup = new nioeventloopgroup(); try { serverbootstrap b = new serverbootstrap(); b.group(bossgroup, workergroup).channel(nioserversocketchannel.class) .childhandler(new simpleserverinitializer()).option(channeloption.so_backlog, 128) .childoption(channeloption.so_keepalive, true); system.out.println("simplechatserver 启动了"); channelfuture f = b.bind(port).sync(); f.channel().closefuture().sync(); } finally { workergroup.shutdowngracefully(); bossgroup.shutdowngracefully(); system.out.println("simplechatserver 关闭了"); } } public static void main(string[] args) throws exception { int port; if (args.length > 0) { port = integer.parseint(args[0]); } else { port = 8080; } new simpleserver(port).run(); } }
2、客户端
1)simpleclienthandler
package me.lovegao.netty.learnw3c.mqdemo; import io.netty.channel.channelhandlercontext; import io.netty.channel.simplechannelinboundhandler; public class simpleclienthandler extends simplechannelinboundhandler<string> { @override protected void channelread0(channelhandlercontext ctx, string s) throws exception { system.out.println("收到的信息:" + s); } }
2)simpleclientinitializer
package me.lovegao.netty.learnw3c.mqdemo; import io.netty.channel.channelinitializer; import io.netty.channel.channelpipeline; import io.netty.channel.socket.socketchannel; import io.netty.handler.codec.delimiterbasedframedecoder; import io.netty.handler.codec.delimiters; import io.netty.handler.codec.string.stringdecoder; import io.netty.handler.codec.string.stringencoder; public class simpleclientinitializer extends channelinitializer<socketchannel> { @override protected void initchannel(socketchannel ch) throws exception { channelpipeline pipeline = ch.pipeline(); pipeline.addlast("framer", new delimiterbasedframedecoder(8192, delimiters.linedelimiter())); pipeline.addlast("decoder", new stringdecoder()); pipeline.addlast("encoder", new stringencoder()); pipeline.addlast("handler", new simpleclienthandler()); } }
3)simpleclient
package me.lovegao.netty.learnw3c.mqdemo; import java.io.bufferedreader; import java.io.inputstreamreader; import io.netty.bootstrap.bootstrap; import io.netty.channel.channel; import io.netty.channel.eventloopgroup; import io.netty.channel.nio.nioeventloopgroup; import io.netty.channel.socket.nio.niosocketchannel; public class simpleclient { private final string host; private final int port; public simpleclient(string host, int port) { this.host = host; this.port = port; } public static void main(string[] args) throws exception { new simpleclient("localhost", 8080).run(); } public void run() throws exception { eventloopgroup group = new nioeventloopgroup(); try { bootstrap bootstrap = new bootstrap() .group(group) .channel(niosocketchannel.class) .handler(new simpleclientinitializer()); channel channel = bootstrap.connect(host, port).sync().channel(); bufferedreader in = new bufferedreader(new inputstreamreader(system.in)); while(true) { string line = in.readline(); if(line.equals("exit!")) { break; } channel.writeandflush(line + "\r\n"); } } catch(exception e) { e.printstacktrace(); } finally { group.shutdowngracefully(); } } }
3、学习中的一些事
在我把教程上的代码略微改了一下,测试时发现客户端能够发出消息,服务端能够收到消息,服务端也走到了回复客户端的流程,但是客户端却收不到消息。还原代码后是正常的,想了半天,最后才发现是改代码的的时候漏掉了“\n”这个标识,以此导致客户端始终不打印消息。
四、netty如何运用到gmq中
1、运用有什么问题
netty只封装了网络交互,gmq整体使用了gmvc框架,而gmvc框架目前还无法脱离servlet。而我又不太想把之前写的代码全部改为自己new的方式。
2、解决方式
1)改造gmvc框架
对gmvc框架进行重构,使得能够脱离servlet使用。也就是将ioc功能剥离开。
优点:一步到位,符合整体的规划。
缺点:gmq的迭代会延迟一段时间。
2)暂时抛弃gmvc框架
暂时将目前依赖的gmvc框架给去除掉,优先完成gmq的迭代。待后期gmvc框架改造完成后再进行改造。
优点:能够尽早的完成gmq的功能。
缺点:先移除框架,后期再套上框架,相当于做了两次多余的功。费时费力。
3、结论
写框架就是为了学习,写gmvc、写gmq目的都一样。时间宝贵,减少多余功,先对gmvc框架进行改造。
4、一些其他事
运用netty还有一个事,就是路由的问题。使用netty代替servlet,需要解决路由的问题。
五、准备改造gmvc
敬请期待……
上一篇: 夏侯渊死后,曹操帐下还有哪些大将呢?