欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

MongoDB Java Driver 源码分析(9):com.mongodb.DBport

程序员文章站 2022-03-02 16:13:25
...
  DBPort 是表示数据库端口的类,分别用 call 和 say 方法实现读取和写入操作。
  这两个方法都调用了 go 方法。
    // 读取操作
    Response call( OutMessage msg , DBCollection coll )
        throws IOException {
        return go( msg , coll );
    }

    // 写入操作
    void say( OutMessage msg )
        throws IOException {
        go( msg , null );
    }

    // 执行操作
    private synchronized Response go( OutMessage msg , DBCollection coll )
        throws IOException {
        return go( msg , coll , false );
    }

    // 执行操作
    private synchronized Response go( OutMessage msg , DBCollection coll , boolean forceReponse )
        throws IOException {

        // 正在处理请求
        if ( _processingResponse ){
            if ( coll == null ){
                // this could be a pipeline and should be safe
            }
            else {
                // this could cause issues since we're reading data off the wire
                throw new IllegalStateException( "DBPort.go called and expecting a response while processing another response" );
            }
        }

        // 增加调用次数计数
        _calls++;
    
        // _sorket 为空,打开连接
        if ( _socket == null )
            _open();
        
        if ( _out == null )
            throw new IllegalStateException( "_out shouldn't be null" );

        try {
            // 准备消息
            msg.prepare();

            // 输出
            msg.pipe( _out );
            
            if ( _pool != null )
                _pool._everWorked = true;
            
            if ( coll == null && ! forceReponse )
                return null;
            
            _processingResponse = true;

            // 返回结果
            return new Response( _sa , coll , _in , _decoder);
        }
        catch ( IOException ioe ){
            close();
            throw ioe;
        }
        finally {
            _processingResponse = false;
        }
    }

  DBProt 的 go 方法调用了 OutMessage 的 prepare、pipe 等方法,实际上这些方法又是间接地通过 PoolOutputBuffer 实现的,这将在后面的文章中提到。

  另外 DBPort 的 open 方法用于打开数据连接:
    // 打开连接
    boolean _open()
        throws IOException {
        
        long sleepTime = 100;

        final long start = System.currentTimeMillis();
        while ( true ){
            
            IOException lastError = null;

            try {
                // 创建 socket 并连接
                _socket = new Socket();
                _socket.connect( _addr , _options.connectTimeout );
                
                // 设置 socket 参数
                _socket.setTcpNoDelay( ! USE_NAGLE );
                _socket.setKeepAlive( _options.socketKeepAlive );
                _socket.setSoTimeout( _options.socketTimeout );

                // 获取输入输出流
                _in = new BufferedInputStream( _socket.getInputStream() );
                _out = _socket.getOutputStream();
                return true;
            }
            catch ( IOException ioe ){
               // ...
            }
            
            if ( ! _options.autoConnectRetry || ( _pool != null && ! _pool._everWorked ) )
                throw lastError;


            // 超时处理
            
            long sleptSoFar = System.currentTimeMillis() - start;

            if ( sleptSoFar >= CONN_RETRY_TIME_MS )
                throw lastError;
            
            if ( sleepTime + sleptSoFar > CONN_RETRY_TIME_MS )
                sleepTime = CONN_RETRY_TIME_MS - sleptSoFar;

            //  等待重试
            _logger.severe( "going to sleep and retry.  total sleep time after = " + ( sleptSoFar + sleptSoFar ) + "ms  this time:" + sleepTime + "ms" );
            ThreadUtil.sleep( sleepTime );
            sleepTime *= 2;
        }
    }