欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

DataNode DataXceiverServer readBlock详解

程序员文章站 2022-07-11 23:46:28
...

       鉴于在hdfs客户端读取hdfs文件过程中,其在获取到数据块所在的DataNode之后,会构造blockReader对象用来从指定数据节点上读取数据块;其中RemoteBlockReader2是使用socket连接从datanode读取数据块的实现类,其reader.read()方法用于从socket stream中读取对应的数据包。接下来将分析一下该处对应的DataNode节点上是如何响应该请求的:

       在介绍如何响应之前,先简单介绍一下DataNode进程在启动中所开启的一些基本服务如下(其启动源码在DataNode.main()方法中,不过多赘述,后续会写一篇blog来介绍具体的启动过程):

  1. DataNode.startDataNode():
    1. 初始化DataStorage对象
    2. 初始化DataXceiverServer对象
    3. 启动HttpInfoServer对象
    4. 初始化DataNode的IPC Server对象
    5. 创建BlockPoolManager对象
  2. DataNode.runDatanodeDaemon():
    1. 启动BlockPoolManager所管理的所有线程
    2. 启动dataXceiverServer线程
    3. 启动DataNode的IPC Server

其中主要用于响应客户端流式接口请求的服务就是DataXceiverServer服务线程;其基本逻辑接口图如下:

DataNode DataXceiverServer readBlock详解

接下来一步步来看DataXceiverServer是如何启动并对外提供服务的:

1、DataXceiverServer的初始化

       在数据节点DataNode进程启动的startDataNode()方法中,会调用initDataXceiver()方法,完成DataXceiverServer的初始化;首先其会创建tcpPeerServer对象(对ServerSocket的封装),其能够通过accept()方法返回Peer对象(对Socket的封装)用于提供输入输出流。并针对短路读提供domainPeerServer对于本地短路读请求;其源码如下:

  private void initDataXceiver(Configuration conf) throws IOException {
    // find free port or use privileged port provided
    // 对ServerSocket的封装
    TcpPeerServer tcpPeerServer;
    if (secureResources != null) {
      tcpPeerServer = new TcpPeerServer(secureResources);
    } 
    // .........
    // 设置tcp接受缓冲区  并绑定对应InetSocketAddress
    tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
    streamingAddr = tcpPeerServer.getStreamingAddr();
    LOG.info("Opened streaming server at " + streamingAddr);
    this.threadGroup = new ThreadGroup("dataXceiverServer");

    // 构造DataXceiverServer对象
    xserver = new DataXceiverServer(tcpPeerServer, conf, this);
    this.dataXceiverServer = new Daemon(threadGroup, xserver);
    this.threadGroup.setDaemon(true); // auto destroy when empty

    // 针对短路读情况,构造localDataXceiverServer 用于本地进程通信
    if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
              DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT) ||
        conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
              DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
      DomainPeerServer domainPeerServer =
                getDomainPeerServer(conf, streamingAddr.getPort());
      if (domainPeerServer != null) {
        this.localDataXceiverServer = new Daemon(threadGroup,
            new DataXceiverServer(domainPeerServer, conf, this));
        LOG.info("Listening on UNIX domain socket: " +
            domainPeerServer.getBindPath());
      }
    }
    this.shortCircuitRegistry = new ShortCircuitRegistry(conf);
  }

之后其便会通过DataNode.runDatanodeDaemon();启动dataXceiverServer线程;其run()方法的基本逻辑和源码如下:

  1. while循环,等待阻塞TcpPeerServer(也就是ServerSocket)的accept()方法,直到接收到客户端或者其他DataNode的连接请求;
  2. 获得peer,即Socket的封装;
  3. 判断当前DataNode上DataXceiver线程数量是否超过阈值,如果超过的话,直接抛出IOException,利用IOUtils的cleanup()方法关闭peer后继续循环,否则继续4;
  4. 创建一个后台线程DataXceiver,并将其加入到datanode的线程组threadGroup中,并启动该线程,响应数据读写请求;

通过该方法可以知道dataXceiverServer只负责连接的建立以及构造并启动DataXceiver,流式接口的请求则是由DataXceiver响应的,所有的输入输出流的操作都是由DataXceiver来执行的;(这种连接建立和响应分离的设计方式在hadoop rpc处也出现过)

  @Override
  // 核心方法
  public void run() {
    Peer peer = null;
    // 如果标志位shouldRun为true,且没有为升级而执行shutdown
    while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
      try {
    	// 阻塞,直到接收到客户端或者其他DataNode的连接请求
        peer = peerServer.accept();
 
        // Make sure the xceiver count is not exceeded
        // 确保DataXceiver数目没有超过最大限制
        /**
         * DataNode的getXceiverCount方法计算得到,返回线程组的活跃线程数目
         * threadGroup == null ? 0 : threadGroup.activeCount();
         */
        int curXceiverCount = datanode.getXceiverCount();
        if (curXceiverCount > maxXceiverCount) {
          throw new IOException("Xceiver count " + curXceiverCount
              + " exceeds the limit of concurrent xcievers: "
              + maxXceiverCount);
        }
 
        // 创建一个后台线程,DataXceiver,并加入到线程组datanode.threadGroup
        new Daemon(datanode.threadGroup,
            DataXceiver.create(peer, datanode, this))
            .start();
      } catch (SocketTimeoutException ignored) {
        // wake up to see if should continue to run
    	// 等待唤醒看看是否能够继续运行
      } catch (AsynchronousCloseException ace) {// 异步的关闭异常
    	// 正如我们所预料的,只有在关机的过程中,通过其他线程关闭我们的侦听套接字,其他情况下则不会发生
        if (datanode.shouldRun && !datanode.shutdownForUpgrade) {
          LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace);
        }
      } catch (IOException ie) {
        IOUtils.cleanup(null, peer);
        LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ie);
      } catch (OutOfMemoryError ie) {
        IOUtils.cleanup(null, peer);
        // 数据节点可能由于存在太多的数据传输导致内存溢出,记录该事件,并等待30秒,其他的数据传输可能到时就完成了
        LOG.warn("DataNode is out of memory. Will retry in 30 seconds.", ie);
        try {
          Thread.sleep(30 * 1000);
        } catch (InterruptedException e) {
          // ignore
        }
      } catch (Throwable te) {
        LOG.error(datanode.getDisplayName()
            + ":DataXceiverServer: Exiting due to: ", te);
        datanode.shouldRun = false;
      }
    }
 
    // Close the server to stop reception of more requests.
    // 关闭服务器停止接收更多请求
    try {
      peerServer.close();
      closed = true;
    } catch (IOException ie) {
      LOG.warn(datanode.getDisplayName()
          + " :DataXceiverServer: close exception", ie);
    }
 
    // if in restart prep stage, notify peers before closing them.
    // 如果在重新启动前准备阶段,在关闭前通知peers
    if (datanode.shutdownForUpgrade) {
      restartNotifyPeers();
      LOG.info("Shutting down DataXceiverServer before restart");
      // Allow roughly up to 2 seconds.
      for (int i = 0; getNumPeers() > 0 && i < 10; i++) {
        try {
          Thread.sleep(200);
        } catch (InterruptedException e) {
          // ignore
        }
      }
    }
    // Close all peers.
    // 关闭所有的peers
    closeAllPeers();
  }

2、DataXceiver流式请求的读写

       DataXceiver是一个线程类,其运行的run方法的主要功能是读取请求的类型,根据类型执行相应的操作,我们主要分析读和写数据块相关的方法:

  /**
   * Read/write data from/to the DataXceiverServer.
   */
  @Override
  public void run() {
    int opsProcessed = 0;
    Op op = null;
	
    try {
	  // 先根据传入的peer对象获取对应的输入/输出流,并对流进行装饰 
      dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
      peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
      InputStream input = socketIn;
      try {
        IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
          socketIn, datanode.getXferAddress().getPort(),
          datanode.getDatanodeId());
        input = new BufferedInputStream(saslStreams.in,
          HdfsConstants.SMALL_BUFFER_SIZE);
        socketOut = saslStreams.out;
      } catch (InvalidMagicNumberException imne) {
        // ......
        return;
      }
      super.initialize(new DataInputStream(input));
      
      // We process requests in a loop, and stay around for a short timeout.
      // This optimistic behaviour allows the other end to reuse connections.
      // Setting keepalive timeout to 0 disable this behavior.
	  // DataXceiver主体循环
      do {
        updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));
        try {
          if (opsProcessed != 0) {
            assert dnConf.socketKeepaliveTimeout > 0;
            peer.setReadTimeout(dnConf.socketKeepaliveTimeout);
          } else {
            peer.setReadTimeout(dnConf.socketTimeout);
          }
		  // 调用Receiver.readOp()从输入流中解析操作符
          op = readOp();
        } catch (InterruptedIOException ignored) {
          // ......
        }
        // ......
		
        opStartTime = now();
        processOp(op); // 调用processOp()处理该流式请求操作
        ++opsProcessed;
      } while ((peer != null) &&
          (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));
    } catch (Throwable t) {
      // ......
    }
  }

其主要的工作流程是:从IO中读取流式接口请求并解析处对应的操作符,并调用processOp()处理该流式请求操作,该方法会根据Op调用DataXceiver对应的处理方法:

  /** Read an Op.  It also checks protocol version. */
  protected final Op readOp() throws IOException {
    final short version = in.readShort();
    if (version != DataTransferProtocol.DATA_TRANSFER_VERSION) {
      throw new IOException( "Version Mismatch (Expected: " +
          DataTransferProtocol.DATA_TRANSFER_VERSION  +
          ", Received: " +  version + " )");
    }
    return Op.read(in);
  }

  /** Process op by the corresponding method. */
  protected final void processOp(Op op) throws IOException {
    switch(op) {
    case READ_BLOCK:
      opReadBlock();
      break;
    case WRITE_BLOCK:
      opWriteBlock(in);
      break;
    case REPLACE_BLOCK:
      opReplaceBlock(in);
      break;
    case COPY_BLOCK:
      opCopyBlock(in);
      break;
    case BLOCK_CHECKSUM:
      opBlockChecksum(in);
      break;
    case TRANSFER_BLOCK:
      opTransferBlock(in);
      break;
    case REQUEST_SHORT_CIRCUIT_FDS:
      opRequestShortCircuitFds(in);
      break;
    case RELEASE_SHORT_CIRCUIT_FDS:
      opReleaseShortCircuitFds(in);
      break;
    case REQUEST_SHORT_CIRCUIT_SHM:
      opRequestShortCircuitShm(in);
      break;
    default:
      throw new IOException("Unknown op " + op + " in data stream");
    }
  }

3、数据块读取readBlock():

        我们知道在client读取hdfs文件数据块的时候,会在构造blockReader的时候调用

new Sender(out).readBlock(block, blockToken, clientName, startOffset, len, verifyChecksum, cachingStrategy);

向目标DataNode发送对应数据块的Op.READ_BLOCK操作码,DataReceiver在接收到来自客户端的Op.READ_BLOCK读数据块操作码后,会调用DataXceiver.readBlock()响应这个读请求。其调用流程如下:

DataNode DataXceiverServer readBlock详解

DataXceiver.readBlock()首先会向客户端回复一个BlockOpResponseProto响应,表示当前请求DataXceiver已经成功接收,并通过BlockOpResponseProto告知Client客户端当前DataNode所使用的校验方式。接下来便会将数据块block切分成若干个数据包packet,然后依次将数据包发送至客户端。客户端在接收到每个数据包packet时会进行校验,并将校验结果发送给DataNode;其基本的读取流程如下:

DataNode DataXceiverServer readBlock详解

DataXceiver.readBlock()方法的基本流程如下:

  1. 创建BlockSender对象,首先调用getOutputStream()获取DataNode连接到客户端的IO流,并创建构造BlockSender对象;
  2. 之后便调用writeSuccessWithChecksumInfo()向客户端发送BlockOpResponseProto响应,告知客户端读请求已经接收,并告知客户端当前节点的校验信息;
  3. 之后便调用blockSender.sendBlock()方法将数据块按照数据包packet的形式发送给客户端;
  4. 当blockSender完成发送数据块数据包后,客户端会响应一个ReadStatus状态码告知DataNode;

DataNode DataXceiverServer readBlock详解

blockSender.sendBlock()方法会将数据块按照一定的组织格式发送到接收方,其发送数据的格式如下:

DataNode DataXceiverServer readBlock详解

BlockSender发送的数据格式包括两部分:校验信息头(ChecksumHeader)和数据包序列(packets)

  1. ChecksumHeader:用于描述当前DataNode使用的校验方式等信息。一个校验头信息也包括2个部分:
    1. CHECKSUM_TYPE:数据校验类型:包括三种校验—空校验,CRC32以及CRC32C,在这里使用1 byte描述数据校验类型,空校验,CRC32以及CRC32C,分别对应着0,1,2
    2. BYTES_PER_CHECKSUM:校验块大小:也就是多少字节数据产生一个校验值。在这里CRC32为例,一把情况下是512字节数据产生一个4字节的checksum,我们把这512字节的数据叫做一个校验块(Chunk),chunk是HDFS读写数据块操作的最小单元
  2. 数据包序列(packets):BlockSender会将数据块切分成若干数据包对外发送,当数据发送完毕,会以一个空的数据包作为结束。每一个数据包包括一个变长的包头,校验数据和若干字节的实际数据
    1. 数据包头:用于描述当前数据包信息,是通过PtotoBuf序列化的包括4字节的全包长度,以及2字节的包头长度;其数据包信息如下:
      1. 当前数据包在整个数据块中的位置
      2. 数据包在管道中的***
      3. 当前数据包是不是数据块中的最后一个数据包
      4. 当前数据包数据部分的长度
      5. 是否需要DN同步
    2. 校验数据:校验数据是对实际数据做校验操作产生的,它将实际数据以校验块为单位,每一个校验块产生一个checksum,校验数据中包含了所有校验块的checksum.校验数据的大小=(实际数据长度+校验块大小)/ 校验块大小 *校验和长度
    3. 实际数据:数据包中的实际数据就是数据块文件中保存的数据,实际数据的传输是以校验块为单位的,一个校验块对应产生一个checksum的实际数据。在数据包中会将校验块和校验数据分开发送,首先将所有校验块的校验数据发送出去,然后再发所有的校验块

BlockSender中的数据块发送过程主要包括:1、发送准备;2、发送数据块;3、清理工作

1、发送准备:主要是根据参数进行 是否需要验证校验数据、是否开启transferTo模式、从Meta文件中获取当前数据块的校验算法、校验和长度,以及多少字节产生一个校验值、寻找正确的offset等判断;其源码主要在BlockSender的构造函数当中;

2、发送数据块:BlockSender.sendBlock()用读取数据以及校验和,并将它们发送到接收方。整体流程步骤如下:

  1. 在开始读文件时,会触发一次预读取,也就是将数据缓存到操作系统缓冲区中;
  2. 构造pktBuf缓冲区,也即是能容纳一个数据包的缓冲区;
  3. 循环调用sendPacket()发送数据包序列,直到整个数据块发送完毕;
  4. 发送一个空的数据包来标识数据块的结束;
  5. 完成数据包发送过程之后,调用close()方法关闭数据块以及校验文件;

sendBlock()以及sendPacket()源码如下:

  long sendBlock(DataOutputStream out, OutputStream baseStream, 
                 DataTransferThrottler throttler) throws IOException {
    // Trigger readahead of beginning of file if configured.
    // 数据预读取至缓存
    manageOsCache();

    final long startTime = ClientTraceLog.isDebugEnabled() ? System.nanoTime() : 0;
    // 构造存放数据包的缓冲区
    try {
      int maxChunksPerPacket;
      int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN;
      boolean transferTo = transferToAllowed && !verifyChecksum
          && baseStream instanceof SocketOutputStream
          && blockIn instanceof FileInputStream;
      if (transferTo) {
        FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
        blockInPosition = fileChannel.position();
        streamForSendChunks = baseStream;
        maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);
        
        // Smaller packet size to only hold checksum when doing transferTo
        pktBufSize += checksumSize * maxChunksPerPacket;
      } else {
        maxChunksPerPacket = Math.max(1,
            numberOfChunks(HdfsConstants.IO_FILE_BUFFER_SIZE));
        // Packet size includes both checksum and data
        // 缓冲区存放checksum 和 data
        pktBufSize += (chunkSize + checksumSize) * maxChunksPerPacket;
      }
      // 构造缓冲区pktBuf 
      ByteBuffer pktBuf = ByteBuffer.allocate(pktBufSize);
      
      // 循环调用sendPacket()发送数据包packet
      while (endOffset > offset && !Thread.currentThread().isInterrupted()) {
        manageOsCache();
        long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
            transferTo, throttler);
        offset += len;
        totalRead += len + (numberOfChunks(len) * checksumSize);
        seqno++;
      }
      // If this thread was interrupted, then it did not send the full block.
      if (!Thread.currentThread().isInterrupted()) {
        try {
          // send an empty packet to mark the end of the block
          // 发送一个空的数据包来标识数据块的结束
          sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks, transferTo,
              throttler);
          out.flush();
        } catch (IOException e) { //socket error
          throw ioeToSocketException(e);
        }

        sentEntireByteRange = true;
      }
    } finally {
      if ((clientTraceFmt != null) && ClientTraceLog.isDebugEnabled()) {
        final long endTime = System.nanoTime();
        ClientTraceLog.debug(String.format(clientTraceFmt, totalRead,
            initialOffset, endTime - startTime));
      }
      close();
    }
    return totalRead;
  }
private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
					   boolean transferTo, DataTransferThrottler throttler) throws IOException {
	int dataLen = (int) Math.min(endOffset - offset, (chunkSize * (long) maxChunks));

	// 数据包中包含多少校验块
	int numChunks = numberOfChunks(dataLen);
	// 校验数据长度
	int checksumDataLen = numChunks * checksumSize;
	// 数据包长度
	int packetLen = dataLen + checksumDataLen + 4;
	boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;
	// 将数据包头写入缓存
	int headerLen = writePacketHeader(pkt, dataLen, packetLen);
	// 数据包头在缓存中的位置
	int headerOff = pkt.position() - headerLen;
	// 校验数据在缓存中的位置
	int checksumOff = pkt.position();
	byte[] buf = pkt.array();
	if (checksumSize > 0 && checksumIn != null) {
		// 校验数据写入缓存
		readChecksum(buf, checksumOff, checksumDataLen);
		// write in progress that we need to use to get lastchecksu
		if (lastDataPacket && lastChunkChecksum != null) 
			int start = checksumOff + checksumDataLen - checksumSize;
			byte[] updatedChecksum = lastChunkChecksum.getChecksum();
			if (updatedChecksum != null) {
				System.arraycopy(updatedChecksum, 0, buf, start, checksumSize);
			}
		}
	}
	int dataOff = checksumOff + checksumDataLen;
	// 在普通模式下下将数据写入缓存
	if (!transferTo) { // normal transfer
		IOUtils.readFully(blockIn, buf, dataOff, dataLen);
		// 确认校验和数据
		if (verifyChecksum) {
			verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
		}
	}
	try {
		if (transferTo) {
			SocketOutputStream sockOut = (SocketOutputStream) out;
			// 首先将头和校验和数据写入缓存
			sockOut.write(buf, headerOff, dataOff - headerOff);
			// 使用transfer方式,将数据通过0拷贝的方式写入IO流
			FileChannel fileCh = ((FileInputStream) blockIn).getChannel();
			LongWritable waitTime = new LongWritable();
			LongWritable transferTime = new LongWritable();
			sockOut.transferToFully(fileCh, blockInPosition, dataLen, waitTime, transferTime);
			datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
			datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
			blockInPosition += dataLen;
		} else {
			// 普通模式下数据写入IO流
			out.write(buf, headerOff, dataOff + dataLen - headerOff);
		}
	} catch (IOException e) {
		if (e instanceof SocketTimeoutException) 
		} else {
			String ioem = e.getMessage();
			if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connectionreset")) {
				LOG.error("BlockSender.sendChunks()exception: ", e);
			}
			datanode.getBlockScanner().markSuspectBlock(
					volumeRef.getVolume().getStorageID(),
					block);
		}
		throw ioeToSocketException(e);
	}
	if (throttler != null) { // rebalancing so throttle
		throttler.throttle(packetLen);
	}
	return dataLen;
}

3、清理工作:主要是关闭对应的数据流:

   checksumIn.close(); // close checksum file
   blockIn.close(); // close data file