欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

手写MQ框架(四)-使用netty改造梳理

程序员文章站 2022-07-01 08:06:25
新手应该怎样使用netty?如果将http服务(不含页面)改造为使用socket的服务? ......

一、背景

书接上文手写mq框架(三)-客户端实现 ,前面通过web的形式实现了mq的服务端和客户端,现在计划使用netty来改造一下。前段时间学习了一下netty的使用(https://www.w3cschool.cn/netty4userguide/52ki1iey.html)。大概有一些想法。

netty封装了socket的使用,我们通过简单的调用即可构建高性能的网络应用。我计划采用以下例子来对gmq进行改造。

本文主要参考:https://www.w3cschool.cn/netty4userguide/、https://www.w3cschool.cn/essential_netty_in_action/

二、netty是什么

netty是由jboss提供的一个java开源框架。netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

--来自https://www.w3cschool.cn/netty4userguide/52ki1iey.html

netty是一个java框架,是网络编程框架,支持异步、事件驱动的特性,所以性能表现很好。

 

三、netty的简单实现

1、服务端

1)simpleserverhandler

handler是处理器,handler 是由 netty 生成用来处理 i/o 事件的。

package me.lovegao.netty.learnw3c.mqdemo;

import io.netty.channel.channel;
import io.netty.channel.channelhandlercontext;
import io.netty.channel.simplechannelinboundhandler;
import io.netty.channel.group.channelgroup;
import io.netty.channel.group.defaultchannelgroup;
import io.netty.util.concurrent.globaleventexecutor;

public class simpleserverhandler extends simplechannelinboundhandler<string> {
    public static channelgroup channels = new defaultchannelgroup(globaleventexecutor.instance);

    @override
    public void handleradded(channelhandlercontext ctx) throws exception {
        channel incoming = ctx.channel();
        system.out.println("[server] - " + incoming.remoteaddress() + " 加入\n");
        channels.add(ctx.channel());
    }

    @override
    public void handlerremoved(channelhandlercontext ctx) throws exception {
        channel incoming = ctx.channel();
        system.out.println("[server] - " + incoming.remoteaddress() + " 离开\n");
        channels.remove(ctx.channel());
    }
    
    @override
    protected void channelread0(channelhandlercontext ctx, string s) throws exception {
        channel incoming = ctx.channel();
        system.out.println("[" + incoming.remoteaddress() + "]" + s);
        if(s == null || s.length() == 0) {
            incoming.writeandflush("消息是空的呀!\n");
        } else {
//            mqrouter<?> mqrouter = jsonobject.parseobject(s, mqrouter.class);
//            system.out.println(mqrouter.geturi());
            string responsemsg = "收到了," + s + "\n";
            incoming.writeandflush(responsemsg);
        }
    }

    @override
    public void channelactive(channelhandlercontext ctx) throws exception {
        channel incoming = ctx.channel();
        system.out.println("simplechatclient:"+incoming.remoteaddress()+"在线");
    }

    @override
    public void channelinactive(channelhandlercontext ctx) throws exception {
        channel incoming = ctx.channel();
        system.out.println("simplechatclient:"+incoming.remoteaddress()+"掉线");
    }

    @override
    public void exceptioncaught(channelhandlercontext ctx, throwable cause) throws exception {
        channel incoming = ctx.channel();
        system.out.println("simplechatclient:"+incoming.remoteaddress()+"异常");
        
        cause.printstacktrace();
        ctx.close();
    }

}

2)simpleserverinitializer

simpleserverinitializer 用来增加多个的处理类到 channelpipeline 上,包括编码、解码、simpleserverhandler 等。

package me.lovegao.netty.learnw3c.mqdemo;

import io.netty.channel.channelinitializer;
import io.netty.channel.channelpipeline;
import io.netty.channel.socket.socketchannel;
import io.netty.handler.codec.delimiterbasedframedecoder;
import io.netty.handler.codec.delimiters;
import io.netty.handler.codec.string.stringdecoder;
import io.netty.handler.codec.string.stringencoder;

public class simpleserverinitializer extends channelinitializer<socketchannel> {

    @override
    protected void initchannel(socketchannel ch) throws exception {
        channelpipeline pipeline = ch.pipeline();
        pipeline.addlast("framer", new delimiterbasedframedecoder(8192, delimiters.linedelimiter()));
        pipeline.addlast("decoder", new stringdecoder());
        pipeline.addlast("encoder", new stringencoder());
        pipeline.addlast("handler", new simpleserverhandler());
        
        system.out.println("simplechatclient:" + ch.remoteaddress() + "连接上");
    }

}

 

3)simpleserver

package me.lovegao.netty.learnw3c.mqdemo;

import io.netty.bootstrap.serverbootstrap;
import io.netty.channel.channelfuture;
import io.netty.channel.channeloption;
import io.netty.channel.eventloopgroup;
import io.netty.channel.nio.nioeventloopgroup;
import io.netty.channel.socket.nio.nioserversocketchannel;

public class simpleserver {
    private int port;

    public simpleserver(int port) {
        this.port = port;
    }

    public void run() throws exception {
        eventloopgroup bossgroup = new nioeventloopgroup();
        eventloopgroup workergroup = new nioeventloopgroup();
        try {
            serverbootstrap b = new serverbootstrap();
            b.group(bossgroup, workergroup).channel(nioserversocketchannel.class)
                    .childhandler(new simpleserverinitializer()).option(channeloption.so_backlog, 128)
                    .childoption(channeloption.so_keepalive, true);

            system.out.println("simplechatserver 启动了");

            channelfuture f = b.bind(port).sync();

            f.channel().closefuture().sync();
        } finally {
            workergroup.shutdowngracefully();
            bossgroup.shutdowngracefully();

            system.out.println("simplechatserver 关闭了");
        }
    }

    public static void main(string[] args) throws exception {
        int port;
        if (args.length > 0) {
            port = integer.parseint(args[0]);
        } else {
            port = 8080;
        }
        new simpleserver(port).run();
    }
}

 

 2、客户端

1)simpleclienthandler

package me.lovegao.netty.learnw3c.mqdemo;

import io.netty.channel.channelhandlercontext;
import io.netty.channel.simplechannelinboundhandler;

public class simpleclienthandler extends simplechannelinboundhandler<string> {

    @override
    protected void channelread0(channelhandlercontext ctx, string s) throws exception {
        system.out.println("收到的信息:" + s);
    }

}

 

2)simpleclientinitializer

package me.lovegao.netty.learnw3c.mqdemo;

import io.netty.channel.channelinitializer;
import io.netty.channel.channelpipeline;
import io.netty.channel.socket.socketchannel;
import io.netty.handler.codec.delimiterbasedframedecoder;
import io.netty.handler.codec.delimiters;
import io.netty.handler.codec.string.stringdecoder;
import io.netty.handler.codec.string.stringencoder;

public class simpleclientinitializer extends channelinitializer<socketchannel> {

    @override
    protected void initchannel(socketchannel ch) throws exception {
        channelpipeline pipeline = ch.pipeline();
        pipeline.addlast("framer", new delimiterbasedframedecoder(8192, delimiters.linedelimiter()));
        pipeline.addlast("decoder", new stringdecoder());
        pipeline.addlast("encoder", new stringencoder());
        pipeline.addlast("handler", new simpleclienthandler());
    }

}

 

3)simpleclient

package me.lovegao.netty.learnw3c.mqdemo;

import java.io.bufferedreader;
import java.io.inputstreamreader;

import io.netty.bootstrap.bootstrap;
import io.netty.channel.channel;
import io.netty.channel.eventloopgroup;
import io.netty.channel.nio.nioeventloopgroup;
import io.netty.channel.socket.nio.niosocketchannel;

public class simpleclient {
    private final string host;
    private final int port;
    
    public simpleclient(string host, int port) {
        this.host = host;
        this.port = port;
    }

    public static void main(string[] args) throws exception {
        new simpleclient("localhost", 8080).run();
    }
    
    public void run() throws exception {
        eventloopgroup group = new nioeventloopgroup();
        try {
            bootstrap bootstrap = new bootstrap()
                    .group(group)
                    .channel(niosocketchannel.class)
                    .handler(new simpleclientinitializer());
            channel channel = bootstrap.connect(host, port).sync().channel();
            bufferedreader in = new bufferedreader(new inputstreamreader(system.in));
            while(true) {
                string line = in.readline();
                if(line.equals("exit!")) {
                    break;
                }
                channel.writeandflush(line + "\r\n");
            }
        } catch(exception e) {
            e.printstacktrace();
        } finally {
            group.shutdowngracefully();
        }
    }



}

 

3、学习中的一些事

在我把教程上的代码略微改了一下,测试时发现客户端能够发出消息,服务端能够收到消息,服务端也走到了回复客户端的流程,但是客户端却收不到消息。还原代码后是正常的,想了半天,最后才发现是改代码的的时候漏掉了“\n”这个标识,以此导致客户端始终不打印消息。

四、netty如何运用到gmq中

1、运用有什么问题

netty只封装了网络交互,gmq整体使用了gmvc框架,而gmvc框架目前还无法脱离servlet。而我又不太想把之前写的代码全部改为自己new的方式。

2、解决方式

1)改造gmvc框架

对gmvc框架进行重构,使得能够脱离servlet使用。也就是将ioc功能剥离开。

优点:一步到位,符合整体的规划。

缺点:gmq的迭代会延迟一段时间。

2)暂时抛弃gmvc框架

暂时将目前依赖的gmvc框架给去除掉,优先完成gmq的迭代。待后期gmvc框架改造完成后再进行改造。

优点:能够尽早的完成gmq的功能。

缺点:先移除框架,后期再套上框架,相当于做了两次多余的功。费时费力。

3、结论

写框架就是为了学习,写gmvc、写gmq目的都一样。时间宝贵,减少多余功,先对gmvc框架进行改造。

4、一些其他事

运用netty还有一个事,就是路由的问题。使用netty代替servlet,需要解决路由的问题。

五、准备改造gmvc

敬请期待……