欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

81.分布式消息中间件-基于Netty简单手写消息中间件思路

程序员文章站 2022-05-20 14:57:32
...

一、基于Netty实现消息中间件效果演示

private void procucterService(String queueName, String msg) {
        Queue queue = queues.get(queueName);
        if (queue == null) {
            // 如果队列不存在的情况下,就创建
            queue = new LinkedList();
            queues.put(queueName, queue);
        }
        // 将消息缓存到队列中
        queue.offer(msg);

        // 主动将消息推送给消费者
        ChannelHandlerContext ctx = ctxs.get(queueName);
        if (ctx != null) {
            ctx.writeAndFlush(queue.poll());
        }

    }

二、Java语言创建队列实现的方式

1.点对点

81.分布式消息中间件-基于Netty简单手写消息中间件思路

public class QueueTest {

    public static void main(String[] args) {
        Queue queue = new LinkedList();
        queue.offer("mayikt");
        queue.offer("xiaowei");
        queue.offer("xiaojun");

        System.out.println(queue.size());//3
        System.out.println(queue.poll());//mayikt
        System.out.println(queue.poll());//xiaowei
        System.out.println(queue.poll());
        System.out.println(queue.size());//0

    }
}

三、手写消息中间件思路的分析

81.分布式消息中间件-基于Netty简单手写消息中间件思路

四、基于Netty实现MQ消息中间件服务端

 <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.42.Final</version>
        </dependency>
        <dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling</artifactId>
            <version>1.4.10.Final</version>
        </dependency>
        <dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling-serial</artifactId>
            <version>1.4.10.Final</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.9</version>
        </dependency>

五、基于NettyClient端实现生产者投递消息

六、基于NettyClient端实现手写消费者

七、生产者与消费者一直保持连接投递

@Data
public class DeliveryInfoEntity implements Serializable {

    /**
     * 发送消息内容
     */
    private String msg;

    /**
     * 队列名称
     */
    private String queueName;
    /**
     * true 生产者投递消息
     * false 消费者获取消息
     */
    private Boolean connType;

    public DeliveryInfoEntity(String msg, String queueName, Boolean connType) {
        this.msg = msg;
        this.queueName = queueName;
        this.connType = connType;
    }
}

handler

public class NettyMQConsumerHandler extends SimpleChannelInboundHandler {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("消费者获取生产者消息:" + msg);
    }
}

public class NettyMQServerHandler extends SimpleChannelInboundHandler<DeliveryInfoEntity> {
    /**
     * mq存放所有的队列
     */
    private static Map<String, Queue> queues = new HashMap<String, Queue>();

    /**
     * 存放我们消费者连接
     */
    private static Map<String, ChannelHandlerContext> ctxs =
            new HashMap<String, ChannelHandlerContext>();

    /**
     * Netty的服务器端接受 客户端消息  MQ服务器端
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DeliveryInfoEntity dInfo) throws Exception {
        String queueName = dInfo.getQueueName();
        if (StringUtils.isEmpty(queueName)) {
            return;
        }
        Boolean connType = dInfo.getConnType();
        // 如果是为true的情况下 为生产者角色
        if (connType) {
            // 处理生产者角色
            procucterService(queueName, dInfo.getMsg());
            return;
        }
        consumer(queueName, ctx);

    }

    private void procucterService(String queueName, String msg) {
        Queue queue = queues.get(queueName);
        if (queue == null) {
            // 如果队列不存在的情况下,就创建
            queue = new LinkedList();
            queues.put(queueName, queue);
        }
        // 将消息缓存到队列中
        queue.offer(msg);

        // 主动将消息推送给消费者
        ChannelHandlerContext ctx = ctxs.get(queueName);
        if (ctx != null) {
            ctx.writeAndFlush(queue.poll());
        }

    }

    /**
     * 消费和mq建立连接主动拉取消息
     */
    private void consumer(String queueName, ChannelHandlerContext ctx) {
        Queue queue = queues.get(queueName);
        if (queue == null) {
            return;
        }
        // 获取队列中消息
        Object poll = queue.poll();
        ctx.writeAndFlush(poll);

        // 将消费者连接存放到集合中
        ctxs.put(queueName, ctx);
    }

}

server端

public class NettyMQServer {


    public static void start(int port) {
        /**
         *  客户端创建两个线程池组分别为 boss线程组和工作线程组
         */
        // 用于接受客户端连接的请求 (并没有处理请求)
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        // 用于处理客户端连接的读写操作
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        // 用于创建我们的ServerBootstrap
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // 解决netty可以支持传输对象
                        ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                        ch.pipeline().addLast(new NettyMQServerHandler());

                    }
                });
        //  绑定我们的端口号码
        try {
            // 绑定端口号,同步等待成功
            ChannelFuture future = serverBootstrap.bind(port).sync();
            System.out.println("MQ服务器启动成功:" + port);
            // 等待服务器监听端口
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();

        } finally {
            // 优雅的关闭连接
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        start(5872);
    }
}

producer

public class NettyMQProducer {
    private static final String host = "127.0.0.1";
    private static final int port = 5872;

    private static String queueName = "mayikt";

    public static void sendMsg(String msg) {
        //创建nioEventLoopGroup
        NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group).channel(NioSocketChannel.class)
                .remoteAddress(new InetSocketAddress(host, port))
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                    }
                });
        try {
            // 发起同步连接
            ChannelFuture sync = bootstrap.connect().sync();
            DeliveryInfoEntity deliveryInfoEntity = new DeliveryInfoEntity(msg, queueName,
                    true);
            sync.channel().writeAndFlush(deliveryInfoEntity);
            sync.channel().closeFuture().sync();
        } catch (Exception e) {

        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        sendMsg("每特教育第六期平均突破3万月薪");
    }
}

consumer

public class NettyMQConsumer {
    private static final String host = "127.0.0.1";
    private static final int port = 5872;

    private static String queueName = "mayikt";

    public static void sendMsg(String msg) {
        //创建nioEventLoopGroup
        NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group).channel(NioSocketChannel.class)
                .remoteAddress(new InetSocketAddress(host, port))
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                        ch.pipeline().addLast(new NettyMQConsumerHandler());
                    }
                });
        try {
            // 发起同步连接
            ChannelFuture sync = bootstrap.connect().sync();
            DeliveryInfoEntity deliveryInfoEntity = new DeliveryInfoEntity(null, queueName,
                    false);
            sync.channel().writeAndFlush(deliveryInfoEntity);
            sync.channel().closeFuture().sync();
        } catch (Exception e) {

        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        sendMsg("每特教育第六期平均突破3万月薪");
    }
}

utils

public final class MarshallingCodeCFactory {

    /**
     * 创建Jboss Marshalling解码器MarshallingDecoder
     * @return MarshallingDecoder
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        //首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        //创建了MarshallingConfiguration对象,配置了版本号为5
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        //根据marshallerFactory和configuration创建provider
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
        //构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
        return decoder;
    }

    /**
     * 创建Jboss Marshalling编码器MarshallingEncoder
     * @return MarshallingEncoder
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
        //构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }

}
相关标签: 6期