81.分布式消息中间件-基于Netty简单手写消息中间件思路
程序员文章站
2022-05-20 14:57:32
...
一、基于Netty实现消息中间件效果演示
private void procucterService(String queueName, String msg) {
Queue queue = queues.get(queueName);
if (queue == null) {
// 如果队列不存在的情况下,就创建
queue = new LinkedList();
queues.put(queueName, queue);
}
// 将消息缓存到队列中
queue.offer(msg);
// 主动将消息推送给消费者
ChannelHandlerContext ctx = ctxs.get(queueName);
if (ctx != null) {
ctx.writeAndFlush(queue.poll());
}
}
二、Java语言创建队列实现的方式
1.点对点
public class QueueTest {
public static void main(String[] args) {
Queue queue = new LinkedList();
queue.offer("mayikt");
queue.offer("xiaowei");
queue.offer("xiaojun");
System.out.println(queue.size());//3
System.out.println(queue.poll());//mayikt
System.out.println(queue.poll());//xiaowei
System.out.println(queue.poll());
System.out.println(queue.size());//0
}
}
三、手写消息中间件思路的分析
四、基于Netty实现MQ消息中间件服务端
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.42.Final</version>
</dependency>
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling</artifactId>
<version>1.4.10.Final</version>
</dependency>
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-serial</artifactId>
<version>1.4.10.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.8</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
五、基于NettyClient端实现生产者投递消息
六、基于NettyClient端实现手写消费者
七、生产者与消费者一直保持连接投递
@Data
public class DeliveryInfoEntity implements Serializable {
/**
* 发送消息内容
*/
private String msg;
/**
* 队列名称
*/
private String queueName;
/**
* true 生产者投递消息
* false 消费者获取消息
*/
private Boolean connType;
public DeliveryInfoEntity(String msg, String queueName, Boolean connType) {
this.msg = msg;
this.queueName = queueName;
this.connType = connType;
}
}
handler
public class NettyMQConsumerHandler extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("消费者获取生产者消息:" + msg);
}
}
public class NettyMQServerHandler extends SimpleChannelInboundHandler<DeliveryInfoEntity> {
/**
* mq存放所有的队列
*/
private static Map<String, Queue> queues = new HashMap<String, Queue>();
/**
* 存放我们消费者连接
*/
private static Map<String, ChannelHandlerContext> ctxs =
new HashMap<String, ChannelHandlerContext>();
/**
* Netty的服务器端接受 客户端消息 MQ服务器端
*
* @param ctx
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, DeliveryInfoEntity dInfo) throws Exception {
String queueName = dInfo.getQueueName();
if (StringUtils.isEmpty(queueName)) {
return;
}
Boolean connType = dInfo.getConnType();
// 如果是为true的情况下 为生产者角色
if (connType) {
// 处理生产者角色
procucterService(queueName, dInfo.getMsg());
return;
}
consumer(queueName, ctx);
}
private void procucterService(String queueName, String msg) {
Queue queue = queues.get(queueName);
if (queue == null) {
// 如果队列不存在的情况下,就创建
queue = new LinkedList();
queues.put(queueName, queue);
}
// 将消息缓存到队列中
queue.offer(msg);
// 主动将消息推送给消费者
ChannelHandlerContext ctx = ctxs.get(queueName);
if (ctx != null) {
ctx.writeAndFlush(queue.poll());
}
}
/**
* 消费和mq建立连接主动拉取消息
*/
private void consumer(String queueName, ChannelHandlerContext ctx) {
Queue queue = queues.get(queueName);
if (queue == null) {
return;
}
// 获取队列中消息
Object poll = queue.poll();
ctx.writeAndFlush(poll);
// 将消费者连接存放到集合中
ctxs.put(queueName, ctx);
}
}
server端
public class NettyMQServer {
public static void start(int port) {
/**
* 客户端创建两个线程池组分别为 boss线程组和工作线程组
*/
// 用于接受客户端连接的请求 (并没有处理请求)
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
// 用于处理客户端连接的读写操作
NioEventLoopGroup workGroup = new NioEventLoopGroup();
// 用于创建我们的ServerBootstrap
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 解决netty可以支持传输对象
ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
ch.pipeline().addLast(new NettyMQServerHandler());
}
});
// 绑定我们的端口号码
try {
// 绑定端口号,同步等待成功
ChannelFuture future = serverBootstrap.bind(port).sync();
System.out.println("MQ服务器启动成功:" + port);
// 等待服务器监听端口
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 优雅的关闭连接
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
start(5872);
}
}
producer
public class NettyMQProducer {
private static final String host = "127.0.0.1";
private static final int port = 5872;
private static String queueName = "mayikt";
public static void sendMsg(String msg) {
//创建nioEventLoopGroup
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
}
});
try {
// 发起同步连接
ChannelFuture sync = bootstrap.connect().sync();
DeliveryInfoEntity deliveryInfoEntity = new DeliveryInfoEntity(msg, queueName,
true);
sync.channel().writeAndFlush(deliveryInfoEntity);
sync.channel().closeFuture().sync();
} catch (Exception e) {
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) {
sendMsg("每特教育第六期平均突破3万月薪");
}
}
consumer
public class NettyMQConsumer {
private static final String host = "127.0.0.1";
private static final int port = 5872;
private static String queueName = "mayikt";
public static void sendMsg(String msg) {
//创建nioEventLoopGroup
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
ch.pipeline().addLast(new NettyMQConsumerHandler());
}
});
try {
// 发起同步连接
ChannelFuture sync = bootstrap.connect().sync();
DeliveryInfoEntity deliveryInfoEntity = new DeliveryInfoEntity(null, queueName,
false);
sync.channel().writeAndFlush(deliveryInfoEntity);
sync.channel().closeFuture().sync();
} catch (Exception e) {
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) {
sendMsg("每特教育第六期平均突破3万月薪");
}
}
utils
public final class MarshallingCodeCFactory {
/**
* 创建Jboss Marshalling解码器MarshallingDecoder
* @return MarshallingDecoder
*/
public static MarshallingDecoder buildMarshallingDecoder() {
//首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
//创建了MarshallingConfiguration对象,配置了版本号为5
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
//根据marshallerFactory和configuration创建provider
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
//构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
return decoder;
}
/**
* 创建Jboss Marshalling编码器MarshallingEncoder
* @return MarshallingEncoder
*/
public static MarshallingEncoder buildMarshallingEncoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
//构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
MarshallingEncoder encoder = new MarshallingEncoder(provider);
return encoder;
}
}
上一篇: 87.分布式消息中间件-Kafka-高可用集群环境kafka环境搭建
下一篇: Redis单机版安装