欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

mina read方法出现BufferUnderflowException异常的解决办法

程序员文章站 2022-03-02 13:50:43
...

现象: 先连续发几十个很小很小的包(<10 byte) 再突然发一个大小64byte的包 这时你会发现mina就会出现以下错误

java.nio.BufferUnderflowException
 at java.nio.HeapByteBuffer.get(Unknown Source)
 at org.apache.mina.core.buffer.AbstractIoBuffer.get(AbstractIoBuffer.java:419)
 at org.apache.mina.core.buffer.AbstractIoBuffer.get(AbstractIoBuffer.java:827)
 at com.labox.common.net.ProtocolHandler.messageReceived(ProtocolHandler.java:81)
 at org.apache.mina.core.filterchain.DefaultIoFilterChain$TailFilter.messageReceived(DefaultIoFilterChain.java:752)
 at org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:414)
 at org.apache.mina.core.filterchain.DefaultIoFilterChain.access$5(DefaultIoFilterChain.java:411)
 at org.apache.mina.core.filterchain.DefaultIoFilterChain$EntryImpl$1.messageReceived(DefaultIoFilterChain.java:832)
 at org.apache.mina.core.filterchain.DefaultIoFilterChain$HeadFilter.messageReceived(DefaultIoFilterChain.java:616)
 at org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:414)
 at org.apache.mina.core.filterchain.DefaultIoFilterChain.fireMessageReceived(DefaultIoFilterChain.java:408)
 at org.apache.mina.core.polling.AbstractPollingIoProcessor.read(AbstractPollingIoProcessor.java:582)
 at org.apache.mina.core.polling.AbstractPollingIoProcessor.process(AbstractPollingIoProcessor.java:542)
 at org.apache.mina.core.polling.AbstractPollingIoProcessor.process(AbstractPollingIoProcessor.java:534)
 at org.apache.mina.core.polling.AbstractPollingIoProcessor.access$7(AbstractPollingIoProcessor.java:532)
 at org.apache.mina.core.polling.AbstractPollingIoProcessor$Worker.run(AbstractPollingIoProcessor.java:861)

经过对mina的分析,这是由对包长度不对做成的(即,我们发的包长是大于64byte的,但他的byteBuffer大小只有64byte,当我们尝试读取第65个byte就会出现这个错误)

mina怎会出现这种错误的呢??是不是有什么配置可以调整

找到mina读取byte的方法AbstractPollingIoProcessor类的read(T session)

此方法源代码如下

  

private void read(T session) {
        IoSessionConfig config = session.getConfig();

        System.out.println("cap buffer size"+config.getReadBufferSize());//这句我自己加的
        IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());

        final boolean hasFragmentation =
            session.getTransportMetadata().hasFragmentation();

        try {
            int readBytes = 0;
            int ret;

            try {
                if (hasFragmentation) {
                    while ((ret = read(session, buf)) > 0) {
                        readBytes += ret;
                        if (!buf.hasRemaining()) {
                            break;
                        }
                    }
                } else {
                    ret = read(session, buf);
                    if (ret > 0) {
                        readBytes = ret;
                    }
                }
            } finally {
                buf.flip();
            }

            if (readBytes > 0) {
                session.getFilterChain().fireMessageReceived(buf);
                buf = null;

                if (hasFragmentation) {
                    if (readBytes << 1 < config.getReadBufferSize()) {
                        session.decreaseReadBufferSize();
                    } else if (readBytes == config.getReadBufferSize()) {
                        session.increaseReadBufferSize();
                    }
                }
            }
            if (ret < 0) {
                scheduleRemove(session);
            }
        } catch (Throwable e) {
            if (e instanceof IOException) {
                scheduleRemove(session);
            }
            session.getFilterChain().fireExceptionCaught(e);
        }
    }


 

经过对这段代码的分析终于发现问题所在了

大家注意if (readBytes > 0) 这个块下的代码

你不难发现

  

if (hasFragmentation) {
                    if (readBytes << 1 < config.getReadBufferSize()) {
                        session.decreaseReadBufferSize();
                    } else if (readBytes == config.getReadBufferSize()) {
                        session.increaseReadBufferSize();
                    }
                }

 

意思是if hasFragmentation==true

if 当前配置初始化ByteBuffer大小 > 当前读取包的平方 为 true 就把配置中初始化byteBuffer大小减半

else if 当前已读取字节==配置包初始化大小  为true时 把配置中初始化byteBuffer大小加倍 

接下来结合我出错的现象看看

当我接连发几十个小于10byte的包时,这时配置中的初始化ByteBuffer大小就为取小,默认最小为64byte

当我再发一个大于64byte的包,但整个ByteBuffer只有64byte,那就出错了。

接下来我们来修正这个问题

方法一:不要改变默认初始化byteBuffer大小,要修改mina的源码

找到org.apache.mina.transport.socket.nio.NioSocketSession 这个类的METADATA变量

把 new DefaultTransportMetadata()的第四个参数改成false就ok了

方法二:自己写read()方法中得到byteBuffer实例的方法

从read()方法看出,他得到byteBuffer实例是每次去请求的,如果我们在这里做一个cache,每次从cache中得到,自然byteBuffer的大小也是固定的,只要按自己业务最大包大小去开就可以了。

每个线程用一个自己的ByteBuffer实例,这样就不会有同步问题.

找到org.apache.mina.core.polling.AbstractPollingIoProcessor类中的read(T session)方法改成

  

static ThreadLocal readCache=new ThreadLocal();//这个是放ByteBuffer实例的cache

private void read(T session) {

IoBuffer buf=readCache.get();
if(buf==null){
 buf=IoBuffer.allocate(512);//512为包默认大小
 readCache.set(buf);
}else{
 buf.clear();
}

try {
    int readBytes = 0;
    int ret;

    try {
            ret = read(session, buf);
            if (ret > 0) {
                readBytes = ret;
            }
    } finally {
        buf.flip();
    }

    if (readBytes > 0) {
        session.getFilterChain().fireMessageReceived(buf);
    }
    if (ret < 0) {
        scheduleRemove(session);
    }
} catch (Throwable e) {
    if (e instanceof IOException) {
        scheduleRemove(session);
    }
    session.getFilterChain().fireExceptionCaught(e);
}

}

 

另外发现一篇说mina处理断包和粘包处理的

一.  解码方法
mina中有个内置类CumulativeProtocolDecoder是专门用来处理断包和粘包的。该类的api文档中有个实现的例子。
类org.apache.mina.filter.codec.CumulativeProtocolDecoder
public abstract class CumulativeProtocolDecoder extends ProtocolDecoderAdapter {
    private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer");

    public void decode(IoSession session, IoBuffer in,
            ProtocolDecoderOutput out) throws Exception {
        if (!session.getTransportMetadata().hasFragmentation()) {       //用来判断是否还有分帧(断包)
            while (in.hasRemaining()) {
                if (!doDecode(session, in, out)) {
                    break;
                }
            }
            return;
        }
       
      ////处理断包,省略
      ............................................

    }

    //需要实现的方法
    protected abstract boolean doDecode(IoSession session, IoBuffer in,
            ProtocolDecoderOutput out) throws Exception;
}

CumulativeProtocolDecoder是一个抽象类,必须继承并实现其doDecode方法,用户自定义协议的拆分就应该写在doDecode方法中,下面的类MessageDecoder是一个实现的例子。MessageDecoder解码网络数据到一种有两字节长度头的自定义消息协议格式。
/**
 * 断包和粘包处理,处理后的消息为一个或多个完整的数据消息
 * @author blc
 */
public class MessageDecoder extends CumulativeProtocolDecoder {
    /*
     * (non-Javadoc)
     * 
     * @see
     * org.apache.mina.filter.codec.CumulativeProtocolDecoder#doDecode(org.apache
     * .mina.core.session.IoSession, org.apache.mina.core.buffer.IoBuffer,
     * org.apache.mina.filter.codec.ProtocolDecoderOutput)
     */
    @Override
    protected boolean doDecode(IoSession session, IoBuffer in,
            ProtocolDecoderOutput out) throws Exception {
        
        in.order(ServerConfig.ByteEndian);    //字节序, ServerConfig.ByteEndian = ByteOrder.LITTLE_ENDIAN
        
        //消息buf
        IoBuffer buf = IoBuffer.allocate(ServerConfig.MessageMaxByte);   //ServerConfig.MessageMaxByte 最大消息字节数
        buf.order(ServerConfig.ByteEndian);
        
        //考虑以下几种情况:
        //    1. 一个ip包中只包含一个完整消息
        //    2. 一个ip包中包含一个完整消息和另一个消息的一部分
        //    3. 一个ip包中包含一个消息的一部分
        //    4. 一个ip包中包含两个完整的数据消息或更多(循环处理在父类的decode中)

        if (in.remaining() > 1) {
            int length = in.getShort(in.position());
if (length < 4) {
                throw new ServerException("Error net message. (Message Length="+length+")");
            }
            if (length > ServerConfig.MessageMaxByte) {
                throw new ServerException("Error net message. Message Length("+length+") > MessageMaxByte("+ServerConfig.MessageMaxByte+")");
            }
            if (length > in.remaining()) return false;
            //复制一个完整消息
            byte[] bytes = new byte[length];
            in.get(bytes);
            buf.put(bytes);
            
            buf.flip();
            out.write(buf);
            return true;
        } else {
            return false;
        }
    }
}

二.  使用
将上面的解码器作为一个过滤器配置到mina中即可,在spring中的配置方法如下:
    <!-- 协议过滤器,包括解码和译码 -->
    <bean id="protocolCodecFilter" class="org.apache.mina.filter.codec.ProtocolCodecFilter">
        <constructor-arg>
            <bean id="factory" class="server.ClientConnServer.MessageCodecFactory"></bean>
        </constructor-arg>
    </bean>
    <!-- 将协议过滤器配置到mina的过滤链中 -->
    <bean id="filterChainBuilder" class="org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder">
        <property name="filters">
            <map>
                <entry key="protocolCodecFilter" value-ref="protocolCodecFilter" />
            </map>
        </property>
    </bean>
    <!-- 处理器 -->
    <bean id="clientConnHandler" class="server.ClientConnServer.ClientConnHandler" />
    <!-- socket接收器,接收客户端连接 -->
    <bean id="ioAcceptor" class="org.apache.mina.transport.socket.nio.NioSocketAcceptor" destroy-method="unbind">
        <!--        <property name="defaultLocalAddress" value=":161" />-->
        <property name="handler" ref="clientConnHandler" />
        <property name="reuseAddress" value="true" />
        <property name="filterChainBuilder" ref="filterChainBuilder" />
    </bean>


要配置协议过滤器,必须使用一个ProtocolCodecFactory ,下面是简单实现
public class MessageCodecFactory implements ProtocolCodecFactory {
    private final MessageEncoder encoder;
    private final MessageDecoder decoder;
    
    public MessageCodecFactory() {
        encoder = new MessageEncoder();
        decoder = new MessageDecoder();
    }

    /* (non-Javadoc)
     * @see org.apache.mina.filter.codec.ProtocolCodecFactory#getDecoder(org.apache.mina.core.session.IoSession)
     */
    @Override
    public ProtocolDecoder getDecoder(IoSession session) throws Exception {
        return decoder;
    }

    /* (non-Javadoc)
     * @see org.apache.mina.filter.codec.ProtocolCodecFactory#getEncoder(org.apache.mina.core.session.IoSession)
     */
    @Override
    public ProtocolEncoder getEncoder(IoSession session) throws Exception {
        return encoder;
    }
}

/**
 * 译码器,不做任何事情
 */
public class MessageEncoder extends ProtocolEncoderAdapter {

    /* (non-Javadoc)
     * @see org.apache.mina.filter.codec.ProtocolEncoder#encode(org.apache.mina.core.session.IoSession, java.lang.Object, org.apache.mina.filter.codec.ProtocolEncoderOutput)
     */
    @Override
    public void encode(IoSession session, Object message,
            ProtocolEncoderOutput out) throws Exception {
        //Do nothing
    }

}

转自http://blianchen.blog.163.com/blog/static/1310562992010101891522100/

搞定了

ps:不知这个是不是mina的bug,是不是还有别的方法配置的呢???

请教那位兄弟有更好的解决方法.

qq:85529766

转载于:https://my.oschina.net/javagg/blog/2