欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

MongoDB Java Driver 源码分析(6):com.mongodb.DBTCPConnector

程序员文章站 2022-04-26 12:27:33
...
  DBTCPConnecror 是对 DBPort 类的封装,借助 DBPort 实现读写操作、获取服务器状态等。
say 方法和 call 方法

  DBTCPConnecror 类中比较值得分析的是 say 方法和 call 方法的实现:
// 执行写操作
WriteResult say( DB db , OutMessage m , WriteConcern concern , ServerAddress hostNeeded )
// 执行读操作
Response call( DB db , DBCollection coll , OutMessage m , ServerAddress hostNeeded , int retries )

  这两个方法的实现方式相似,实际上都是通过 DBPort 实现,同时增加了检查连接和错误处理的代码。
  这里仅以 say 方法的实现作为例子进行说明:
    public WriteResult say( DB db , OutMessage m , WriteConcern concern , ServerAddress hostNeeded )
        throws MongoException {

        // 检查数据库连接
        _checkClosed();
        checkMaster( false , true );

        // 获取数据连接端口
        MyPort mp = _myPort.get();
        DBPort port = mp.get( true , false , hostNeeded );

        try {
            // 检查权限
            port.checkAuth( db );

            // 通过 DBPort 实现写操作
            port.say( m );

            // 检查错误或返回正确的结果
            if ( concern.callGetLastError() ){
                return _checkWriteError( db , mp , port , concern );
            }
            else {
                return new WriteResult( db , port , concern );
            }
        }
        catch (...){
            // ...
        }
        finally {
            // 结束操作
            mp.done( port );
            m.doneWithMessage();
        }
    }

  say 和 call 方式是借助 DBPort  实现的,而 DBPort 对象是通过内部类 DBTCPConnector.MyPort 的 get 方法获取的:
        // 获取数据库端口
        DBPort get( boolean keep , boolean slaveOk , ServerAddress hostNeeded ){

            // 如果指定了服务器,就获取指定服务器的端口
            if ( hostNeeded != null ){
                // asked for a specific host
                return _portHolder.get(hostNeeded ).get();
            }

            // 在一个请求中,如果已经使用了一个端口,则继续使用它
            if ( _requestPort != null ){
                if ( _requestPort.getPool() == _masterPortPool || !keep ) {
                    return _requestPort;
                }

                _requestPort.getPool().done(_requestPort);
                _requestPort = null;
            }

            //  集群部署,可以从 Slave 获取端口
            if ( slaveOk && _rsStatus != null ){
                // if slaveOk, try to use a secondary
                ServerAddress slave = _rsStatus.getASecondary();
                if ( slave != null ){
                    return _portHolder.get( slave ).get();
                }
            }

            //  master 端口池为空,抛异常
            if (_masterPortPool == null) {
                throw new MongoException("Rare case where master=null, probably all servers are down");
            }

            // 通过 master 端口池获取端口
            DBPort p = _masterPortPool.get();
            if ( keep && _inRequest ) {
                _requestPort = p;
            }

            return p;
        }

其他方法

  getServerAddressList (获取服务器地址列表),checkMaster(检查 master 服务器)和 fetchMaxBsonObjectSize (获取 maxBsonObjectSize) 也有一定的分析价值:
    // 获取服务器地址列表
    public List<ServerAddress> getServerAddressList() {
        // 如果是集群方式,则通过 ReplicaSetStatus 返回地址列表
        if (_rsStatus != null) {
            return _rsStatus.getServerAddressList();
        }

        // 否则,返回 master 服务器的地址
        ServerAddress master = getAddress();
        if (master != null) {
            List<ServerAddress> list = new ArrayList<ServerAddress>();
            list.add(master);
            return list;
        }
        return null;
    }

    // 检查 master 服务器
    void checkMaster( boolean force , boolean failIfNoMaster )
        throws MongoException {
        
        // 检查是集群部署还是单机部署
        if ( _rsStatus != null ){
            if ( _masterPortPool == null || force ){

                // 集群部署
                // 通过 ReplicaSetStatus 获取 master 节点
                ReplicaSetStatus.Node n = _rsStatus.ensureMaster();
                if ( n == null ){
                    if ( failIfNoMaster )
                        throw new MongoException( "can't find a master" );
                }
                else {
                    // 根据 master 节点的信息设置 DBTCPConnector 的属性
                    _set( n._addr );
                    maxBsonObjectSize = _rsStatus.getMaxBsonObjectSize();
                }
            }
        } else {
            // 单机部署
            // 根据服务器信息设置 maxBsonObjectSize
            if (maxBsonObjectSize == 0)
                    maxBsonObjectSize = fetchMaxBsonObjectSize();
        }
    }

    // 获取并设置 maxBsonObjectSize
    int fetchMaxBsonObjectSize() {
        if (_masterPortPool == null)
            return 0;
        DBPort port = _masterPortPool.get();
        try {
            // 连接 admin 数据库,执行检查 master 服务器的命令
            CommandResult res = port.runCommand(_mongo.getDB("admin"), new BasicDBObject("isMaster", 1));
 
           // 1.8 之后的版本返回的结果中包含 maxBsonObjectSize
            if (res.containsField("maxBsonObjectSize")) {
                maxBsonObjectSize = ((Integer) res.get("maxBsonObjectSize")).intValue();
            } else {
                maxBsonObjectSize = Bytes.MAX_OBJECT_SIZE;
            }
        } catch (Exception e) {
            _logger.log(Level.WARNING, null, e);
        } finally {
            port.getPool().done(port);
        }
        return maxBsonObjectSize;
    }