MongoDB Java Driver 源码分析(9):com.mongodb.DBport
程序员文章站
2022-03-02 16:13:25
...
DBPort 是表示数据库端口的类,分别用 call 和 say 方法实现读取和写入操作。
这两个方法都调用了 go 方法。
DBProt 的 go 方法调用了 OutMessage 的 prepare、pipe 等方法,实际上这些方法又是间接地通过 PoolOutputBuffer 实现的,这将在后面的文章中提到。
另外 DBPort 的 open 方法用于打开数据连接:
这两个方法都调用了 go 方法。
// 读取操作 Response call( OutMessage msg , DBCollection coll ) throws IOException { return go( msg , coll ); } // 写入操作 void say( OutMessage msg ) throws IOException { go( msg , null ); } // 执行操作 private synchronized Response go( OutMessage msg , DBCollection coll ) throws IOException { return go( msg , coll , false ); } // 执行操作 private synchronized Response go( OutMessage msg , DBCollection coll , boolean forceReponse ) throws IOException { // 正在处理请求 if ( _processingResponse ){ if ( coll == null ){ // this could be a pipeline and should be safe } else { // this could cause issues since we're reading data off the wire throw new IllegalStateException( "DBPort.go called and expecting a response while processing another response" ); } } // 增加调用次数计数 _calls++; // _sorket 为空,打开连接 if ( _socket == null ) _open(); if ( _out == null ) throw new IllegalStateException( "_out shouldn't be null" ); try { // 准备消息 msg.prepare(); // 输出 msg.pipe( _out ); if ( _pool != null ) _pool._everWorked = true; if ( coll == null && ! forceReponse ) return null; _processingResponse = true; // 返回结果 return new Response( _sa , coll , _in , _decoder); } catch ( IOException ioe ){ close(); throw ioe; } finally { _processingResponse = false; } }
DBProt 的 go 方法调用了 OutMessage 的 prepare、pipe 等方法,实际上这些方法又是间接地通过 PoolOutputBuffer 实现的,这将在后面的文章中提到。
另外 DBPort 的 open 方法用于打开数据连接:
// 打开连接 boolean _open() throws IOException { long sleepTime = 100; final long start = System.currentTimeMillis(); while ( true ){ IOException lastError = null; try { // 创建 socket 并连接 _socket = new Socket(); _socket.connect( _addr , _options.connectTimeout ); // 设置 socket 参数 _socket.setTcpNoDelay( ! USE_NAGLE ); _socket.setKeepAlive( _options.socketKeepAlive ); _socket.setSoTimeout( _options.socketTimeout ); // 获取输入输出流 _in = new BufferedInputStream( _socket.getInputStream() ); _out = _socket.getOutputStream(); return true; } catch ( IOException ioe ){ // ... } if ( ! _options.autoConnectRetry || ( _pool != null && ! _pool._everWorked ) ) throw lastError; // 超时处理 long sleptSoFar = System.currentTimeMillis() - start; if ( sleptSoFar >= CONN_RETRY_TIME_MS ) throw lastError; if ( sleepTime + sleptSoFar > CONN_RETRY_TIME_MS ) sleepTime = CONN_RETRY_TIME_MS - sleptSoFar; // 等待重试 _logger.severe( "going to sleep and retry. total sleep time after = " + ( sleptSoFar + sleptSoFar ) + "ms this time:" + sleepTime + "ms" ); ThreadUtil.sleep( sleepTime ); sleepTime *= 2; } }
推荐阅读
-
Java并发系列[9]----ConcurrentHashMap源码分析
-
Java并发系列[9]----ConcurrentHashMap源码分析
-
MongoDB Java Driver 源码分析
-
MongoDB Java Driver 源码分析(9):com.mongodb.DBport
-
MongoDB Java Driver 源码分析(11):GridFS 类
-
MongoDB Java Driver 源码分析(13):OutputBuffer,BasicOutputBuffer 和 PoolOutputBuffer
-
MongoDB Java Driver 源码分析(1):Package 概述
-
MongoDB Java Driver 源码分析(12):GridFSFile、GridFSDBFile 和 GridFSInputFile
-
MongoDB Java Driver 源码分析(5):com.mongodb.DB
-
MongoDB Java Driver 源码分析(8):com.mongodb.RelicaSetStatus