欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Netty——自定义协议解决TCP粘包拆包问题

程序员文章站 2022-03-09 13:46:19
什么是TCP粘包拆包简单的来讲,就是多次TCP请求传递的msg,由于TCP的请求没有隔离,造成服务端接收到消息不知道哪些字节是属于同一个请求的。拆包:某一次TCP请求的msg被拆开读取粘包:某一次TCP请求的msg包含了其他请求的字节粘包拆包问题实例问题示范就不贴全部代码了客户端Handlerpackage com.leolee.netty.demo.unpackAdherepackage;import io.netty.buffer.ByteBuf;import io....

什么是TCP粘包拆包

简单的来讲,就是多次TCP请求传递的msg,由于TCP的请求没有隔离,造成服务端接收到消息不知道哪些字节是属于同一个请求的。

拆包:某一次TCP请求的msg被拆开读取

粘包:某一次TCP请求的msg包含了其他请求的字节

粘包拆包问题实例

问题示范就不贴全部代码了

客户端Handler

package com.leolee.netty.demo.unpackAdherepackage;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.nio.charset.Charset;

/**
 * @ClassName MyClientHandler
 * @Description: TODO
 * @Author LeoLee
 * @Date 2020/11/22
 * @Version V1.0
 **/
public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private int count;

    //连续发送10条信息,测试服务端接受的消息的拆包粘包情况
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        for (int i = 0; i < 10; i++) {
            ByteBuf byteBuf = Unpooled.copiedBuffer("hello server " + i + "|", Charset.forName("utf-8"));
            channel.writeAndFlush(byteBuf);
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        byte[] bytes = new byte[msg.readableBytes()];
        msg.readBytes(bytes);

        String message = new String(bytes, Charset.forName("utf-8"));
        System.out.println("客户端接收到的数据:");
        System.out.println(message);
        System.out.println("客户端接收到数据量:" + ++this.count);
    }
}

服务端Handler

package com.leolee.netty.demo.unpackAdherepackage;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.nio.charset.Charset;
import java.util.UUID;

/**
 * @ClassName MyServerHandler
 * @Description: TODO
 * @Author LeoLee
 * @Date 2020/11/22
 * @Version V1.0
 **/
public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private int count;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {

        byte[] bytes = new byte[msg.readableBytes()];
        msg.readBytes(bytes);

        String message = new String(bytes, Charset.forName("utf-8"));
        System.out.println("服务端接收到的数据:");
        System.out.println(message);
        System.out.println("服务端接收到数据量:" + ++this.count);


        ByteBuf byteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString() + "\n", Charset.forName("utf-8"));
        ctx.writeAndFlush(byteBuf);
    }
}

拆包粘包现象:

服务端接收到的数据:
hello server 0|
服务端接收到数据量:1
服务端接收到的数据:
hello server 1|
服务端接收到数据量:2
服务端接收到的数据:
hello server 2|hello server 3|hello server 4|hello server 5|
服务端接收到数据量:3
服务端接收到的数据:
hello server 6|hello server 7|
服务端接收到数据量:4
服务端接收到的数据:
hello server 8|hello server 9|
服务端接收到数据量:5

自定义协议解决拆包粘包问题

自定义协议:

package com.leolee.netty.demo.unpackAdherepackage.protocol;

/**
 * @ClassName MessageProtocol
 * @Description: TODO
 * @Author LeoLee
 * @Date 2020/11/22
 * @Version V1.0
 **/
public class MessageProtocol {

    private int length;//读取的长度

    private byte[] content;

    public int getLength() {
        return length;
    }

    public void setLength(int length) {
        this.length = length;
    }

    public byte[] getContent() {
        return content;
    }

    public void setContent(byte[] content) {
        this.content = content;
    }
}

自定义编码器:

package com.leolee.netty.demo.unpackAdherepackage.protocol;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * @ClassName MessageEncoder
 * @Description: TODO
 * @Author LeoLee
 * @Date 2020/11/22
 * @Version V1.0
 **/
public class MessageEncoder extends MessageToByteEncoder<MessageProtocol> {

    @Override
    protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
        System.out.println("自定义编码器被调用");
        out.writeInt(msg.getLength());
        out.writeBytes(msg.getContent());
    }
}

自定义解码器:

package com.leolee.netty.demo.unpackAdherepackage.protocol;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;

import java.util.List;

/**
 * @ClassName MessageDecoder
 * @Description: TODO
 * @Author LeoLee
 * @Date 2020/11/22
 * @Version V1.0
 **/
public class MessageDecoder extends ReplayingDecoder<Void> {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("自定义解码器被调用");
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes);

        MessageProtocol messageProtocol = new MessageProtocol();
        messageProtocol.setLength(length);
        messageProtocol.setContent(bytes);
        out.add(messageProtocol);
    }
}

客户端代码修改:增加编码器

package com.leolee.netty.demo.unpackAdherepackage;

import com.leolee.netty.demo.unpackAdherepackage.protocol.MessageEncoder;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

/**
 * @ClassName MyClientInitializer
 * @Description: TODO
 * @Author LeoLee
 * @Date 2020/11/22
 * @Version V1.0
 **/
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new MessageEncoder());//加入自定义编码器
        pipeline.addLast(new MyClientHandler());
    }
}

客户端handler

package com.leolee.netty.demo.unpackAdherepackage;

import com.leolee.netty.demo.unpackAdherepackage.protocol.MessageProtocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.nio.charset.Charset;

/**
 * @ClassName MyClientHandler
 * @Description: TODO
 * @Author LeoLee
 * @Date 2020/11/22
 * @Version V1.0
 **/
public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private int count;

    //连续发送10条信息,测试服务端接受的消息的拆包粘包情况
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        for (int i = 0; i < 10; i++) {
            String message = "hello server " + i + "|";
            byte[] bytes = message.getBytes(Charset.forName("utf-8"));
            int length = message.getBytes(Charset.forName("utf-8")).length;

            MessageProtocol messageProtocal = new MessageProtocol();
            messageProtocal.setLength(length);
            messageProtocal.setContent(bytes);
            channel.writeAndFlush(messageProtocal);
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        byte[] bytes = new byte[msg.readableBytes()];
        msg.readBytes(bytes);

        String message = new String(bytes, Charset.forName("utf-8"));
        System.out.println("客户端接收到的数据:");
        System.out.println(message);
        System.out.println("客户端接收到数据量:" + ++this.count);
    }
}

服务端代码修改:增加解码器

package com.leolee.netty.demo.unpackAdherepackage;

import com.leolee.netty.demo.unpackAdherepackage.protocol.MessageDecoder;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

/**
 * @ClassName MyServerInitializer
 * @Description: TODO
 * @Author LeoLee
 * @Date 2020/11/22
 * @Version V1.0
 **/
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {

        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new MessageDecoder());
        pipeline.addLast(new MyServerHandler());
    }
}

服务端handler:

package com.leolee.netty.demo.unpackAdherepackage;

import com.leolee.netty.demo.unpackAdherepackage.protocol.MessageProtocol;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.nio.charset.Charset;

/**
 * @ClassName MyServerHandler
 * @Description: TODO
 * @Author LeoLee
 * @Date 2020/11/22
 * @Version V1.0
 **/
public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {

    private int count;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {

        int length = msg.getLength();
        byte[] content = msg.getContent();
        System.out.println("服务端接收到的数据:");
        System.out.println(new String(content, Charset.forName("utf-8")));
        System.out.println("服务端接收到数据量:" + ++this.count);
    }
}

测试:

自定义解码器被调用
服务端接收到的数据:
hello server 0|
服务端接收到数据量:1
自定义解码器被调用
服务端接收到的数据:
hello server 1|
服务端接收到数据量:2
自定义解码器被调用
服务端接收到的数据:
hello server 2|
服务端接收到数据量:3
自定义解码器被调用
服务端接收到的数据:
hello server 3|
服务端接收到数据量:4
自定义解码器被调用
服务端接收到的数据:
hello server 4|
服务端接收到数据量:5
自定义解码器被调用
服务端接收到的数据:
hello server 5|
服务端接收到数据量:6
自定义解码器被调用
服务端接收到的数据:
hello server 6|
服务端接收到数据量:7
自定义解码器被调用
服务端接收到的数据:
hello server 7|
服务端接收到数据量:8
自定义解码器被调用
服务端接收到的数据:
hello server 8|
服务端接收到数据量:9
自定义解码器被调用
服务端接收到的数据:
hello server 9|
服务端接收到数据量:10

 

本文地址:https://blog.csdn.net/qq_25805331/article/details/109956294