Hadoop HDFS NameNode Source Code Analysis (Part 1)
RandomAccessFile
Instances of this class support both reading and writing to a random access file. A random access file behaves like a large array of bytes stored in the file system. There is a kind of cursor, or index into the implied array, called the file pointer; input operations read bytes starting at the file pointer and advance it past the bytes read. If the random access file is created in read/write mode, then output operations are also available; output operations write bytes starting at the file pointer and advance it past the bytes written. Output operations that write past the current end of the implied array cause the array to be extended. The file pointer can be read by the getFilePointer method and set by the seek method.
It is generally true of all the reading routines in this class that if end of file is reached before the desired number of bytes has been read, an EOFException (which is a kind of IOException) is thrown. If any byte cannot be read for any reason other than end of file, an IOException other than EOFException is thrown. In particular, an IOException may be thrown if the stream has been closed.
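To make the file-pointer semantics concrete before looking at how Hadoop uses the class, here is a minimal standalone sketch (not Hadoop code; the file name is arbitrary):

import java.io.IOException;
import java.io.RandomAccessFile;

public class RandomAccessFileDemo {
  public static void main(String[] args) throws IOException {
    try (RandomAccessFile raf = new RandomAccessFile("demo.bin", "rw")) {
      raf.writeInt(42);                          // file pointer advances by 4 bytes
      System.out.println(raf.getFilePointer());  // prints 4
      raf.seek(0);                               // move the pointer back to the start
      System.out.println(raf.readInt());         // prints 42
      raf.seek(raf.length());                    // writing past the end extends the file
      raf.writeBytes("tail");
    }
  }
}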
protected void writeCorruptedData(RandomAccessFile file) throws IOException {
  final String messageForPreUpgradeVersion =
    "\nThis file is INTENTIONALLY CORRUPTED so that versions\n" +
    "of Hadoop prior to 0.13 (which are incompatible\n" +
    "with this directory layout) will fail to start.\n";

  file.seek(0);
  file.writeInt(FSConstants.LAYOUT_VERSION);
  org.apache.hadoop.hdfs.DeprecatedUTF8.writeString(file, "");
  file.writeBytes(messageForPreUpgradeVersion);
  file.getFD().sync();
}
NameNode Directory Structure
data
└── dfs
    ├── name1
    │   ├── current
    │   │   ├── fsimage
    │   │   ├── fstime
    │   │   └── VERSION
    │   ├── image
    │   │   └── fsimage
    │   └── in_use.lock
    └── name2
        └── ...
Formatting the NameNode (NameNode.format)
FSImage.format(Storage$StorageDirectory) line: 1431
FSImage.format() line: 1444
NameNode.format(Configuration, boolean) line: 1242
NameNode.format(Configuration) line: 186
Formatting steps
a) Delete data/dfs/name1/current, then recreate data/dfs/name1/current
b) Create the image file data/dfs/name1/current/fsimage
The file tree is saved to the image starting from the root. This is a recursive process: first all children of the current directory are saved, then the subdirectories inside those subdirectories, and so on.
/**
 * Save the contents of the FS image to the file.
 */
void saveFSImage(File newFile) throws IOException {
  FSNamesystem fsNamesys = getFSNamesystem();
  FSDirectory fsDir = fsNamesys.dir;
  long startTime = FSNamesystem.now();
  //
  // Write out data
  //
  DataOutputStream out = new DataOutputStream(
      new BufferedOutputStream(new FileOutputStream(newFile)));
  try {
    out.writeInt(FSConstants.LAYOUT_VERSION);
    out.writeInt(namespaceID);
    out.writeLong(fsDir.rootDir.numItemsInTree());
    out.writeLong(fsNamesys.getGenerationStamp());
    byte[] byteStore = new byte[4*FSConstants.MAX_PATH_LENGTH];
    ByteBuffer strbuf = ByteBuffer.wrap(byteStore);
    // save the root
    saveINode2Image(strbuf, fsDir.rootDir, out);
    // save the rest of the nodes
    saveImage(strbuf, 0, fsDir.rootDir, out);
    fsNamesys.saveFilesUnderConstruction(out);
    fsNamesys.saveSecretManagerState(out);
    strbuf = null;
  } finally {
    out.close();
  }
  LOG.info("Image file of size " + newFile.length() + " saved in "
      + (FSNamesystem.now() - startTime)/1000 + " seconds.");
}
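As a cross-check on the on-disk layout, here is a minimal sketch that reads back the header fields in the same order saveFSImage() wrote them. This assumes the layout shown above; the path and class name are illustrative:

import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;

public class FsImageHeaderReader {
  public static void main(String[] args) throws IOException {
    try (DataInputStream in = new DataInputStream(
        new FileInputStream("data/dfs/name1/current/fsimage"))) {
      int layoutVersion = in.readInt();   // FSConstants.LAYOUT_VERSION
      int namespaceID   = in.readInt();
      long numInodes    = in.readLong();  // fsDir.rootDir.numItemsInTree()
      long genStamp     = in.readLong();  // fsNamesys.getGenerationStamp()
      System.out.println("layoutVersion=" + layoutVersion
          + " namespaceID=" + namespaceID
          + " inodes=" + numInodes
          + " generationStamp=" + genStamp);
    }
  }
}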
Related algorithms
Algorithm for generating the namespaceID
FSImage.newNamespaceID
private int newNamespaceID() {
  Random r = new Random();
  r.setSeed(FSNamesystem.now());
  int newID = 0;
  while (newID == 0)
    newID = r.nextInt(0x7FFFFFFF);  // use 31 bits only
  return newID;
}
c) Create the edit log
data/dfs/name1/current/edits
EditLogFileOutputStream.EditLogFileOutputStream
/**
 * Creates output buffers and file object.
 */
EditLogFileOutputStream(File name, int size) throws IOException {
  super();
  file = name;
  bufCurrent = new DataOutputBuffer(size);
  bufReady = new DataOutputBuffer(size);
  RandomAccessFile rp = new RandomAccessFile(name, "rw");
  fp = new FileOutputStream(rp.getFD()); // open for append
  fc = rp.getChannel();
  fc.position(fc.size());
}
FSEditLog.createEditLogFile
synchronized void createEditLogFile(File name) throws IOException {
  waitForSyncToFinish();
  EditLogOutputStream eStream = new EditLogFileOutputStream(name, sizeOutputFlushBuffer);
  eStream.create();
  eStream.close();
}
EditLogFileOutputStream.create()
/**
 * Create empty edits logs file.
 */
@Override
void create() throws IOException {
  fc.truncate(0);
  fc.position(0);
  bufCurrent.writeInt(FSConstants.LAYOUT_VERSION);
  /**
   * All data that has been written to the stream so far will be flushed.
   * New data can still be written to the stream while flushing is performed.
   */
  setReadyToFlush();
  flush();
}
Call stack
EditLogFileOutputStream.flushAndSync() line: 141
EditLogFileOutputStream(EditLogOutputStream).flush() line: 83
EditLogFileOutputStream.create() line: 100
FSEditLog.createEditLogFile(File) line: 221
FSImage.saveCurrent(Storage$StorageDirectory) line: 1353
FSImage.format(Storage$StorageDirectory) line: 1428
EditLogFileOutputStream.flushAndSync
/**
 * Flush ready buffer to persistent store. currentBuffer is not flushed as it
 * accumulates new log records while readyBuffer will be flushed and synced.
 */
@Override
protected void flushAndSync() throws IOException {
  preallocate();          // preallocate file if necessary
  bufReady.writeTo(fp);   // write data to file
  bufReady.reset();       // erase all data in the buffer
  fc.force(false);        // metadata updates not needed because of preallocation
  fc.position(fc.position() - 1); // skip back the end-of-file marker
}
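The bufCurrent/bufReady pair is a classic double-buffering scheme: new edits accumulate in one buffer while the other is being flushed and synced. A simplified sketch of the idea (not the real class; DataOutputBuffer is replaced by ByteArrayOutputStream):

import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;

class DoubleBufferSketch {
  private ByteArrayOutputStream bufCurrent = new ByteArrayOutputStream(); // collects new edits
  private ByteArrayOutputStream bufReady = new ByteArrayOutputStream();   // being flushed

  synchronized void write(byte[] record) throws IOException {
    bufCurrent.write(record); // new log records always go to bufCurrent
  }

  synchronized void setReadyToFlush() {
    ByteArrayOutputStream tmp = bufReady; // swap the buffers: bufReady now
    bufReady = bufCurrent;                // holds the data to be synced and
    bufCurrent = tmp;                     // bufCurrent is free for new edits
  }

  void flushAndSync(FileOutputStream fp) throws IOException {
    synchronized (this) {
      bufReady.writeTo(fp);   // persist the ready buffer
      bufReady.reset();       // erase all data in the buffer
    }
    fp.getChannel().force(false); // fsync data (not metadata) to disk
  }
}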
d) Save the VERSION file
StorageDirectory.write
/**
 * Write version file.
 *
 * @throws IOException
 */
public void write() throws IOException {
  corruptPreUpgradeStorage(root);
  write(getVersionFile());
}
The corruptPreUpgradeStorage(root) method creates the directory data/dfs/name1/image and the file data/dfs/name1/image/fsimage (the intentionally corrupted pre-0.13 image shown in writeCorruptedData above).
Writing the version file
public void write(File to) throws IOException {
  Properties props = new Properties();
  setFields(props, this);
  RandomAccessFile file = new RandomAccessFile(to, "rws");
  FileOutputStream out = null;
  try {
    file.seek(0);
    out = new FileOutputStream(file.getFD());
    /*
     * If server is interrupted before this line,
     * the version file will remain unchanged.
     */
    props.store(out, null);
    /*
     * Now the new fields are flushed to the head of the file, but the file
     * length can still be larger than required and therefore the file can
     * contain whole or corrupted fields from its old contents at the end.
     * If the server is interrupted here and restarted later these extra fields
     * either should not affect server behavior or should be handled
     * by the server correctly.
     */
    file.setLength(out.getChannel().position());
  } finally {
    if (out != null) {
      out.close();
    }
    file.close();
  }
}
Contents of the version file
/**
 * Set common storage fields.
 * Should be overloaded if additional fields need to be set.
 *
 * @param props
 * @throws IOException
 */
protected void setFields(Properties props, StorageDirectory sd) throws IOException {
  props.setProperty("layoutVersion", String.valueOf(layoutVersion));
  props.setProperty("storageType", storageType.toString());
  props.setProperty("namespaceID", String.valueOf(namespaceID));
  props.setProperty("cTime", String.valueOf(cTime));
}
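Since the VERSION file is written with Properties.store(), it is a plain key=value file and can be read back the same way. A hypothetical reader (the path is assumed from the directory layout above):

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public class VersionFileReader {
  public static void main(String[] args) throws IOException {
    Properties props = new Properties();
    try (FileInputStream in = new FileInputStream("data/dfs/name1/current/VERSION")) {
      props.load(in); // keys match those set in setFields()
    }
    System.out.println("layoutVersion = " + props.getProperty("layoutVersion"));
    System.out.println("storageType   = " + props.getProperty("storageType"));
    System.out.println("namespaceID   = " + props.getProperty("namespaceID"));
    System.out.println("cTime         = " + props.getProperty("cTime"));
  }
}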
Notes:
/**
 * Lock storage to provide exclusive access.
 * (Not all file systems support exclusive locks; NFS, the Network File System,
 * is one example that does not do so consistently.)
 *
 * <p> Locking is not supported by all file systems.
 * E.g., NFS does not consistently support exclusive locks.
 *
 * <p> If locking is supported we guarantee exclusive access to the
 * storage directory. Otherwise, no guarantee is given.
 *
 * @throws IOException if locking fails
 */
public void lock() throws IOException {
  this.lock = tryLock();
  if (lock == null) {
    String msg = "Cannot lock storage " + this.root
        + ". The directory is already locked.";
    LOG.info(msg);
    throw new IOException(msg);
  }
}
/**
 * Unlock storage.
 *
 * @throws IOException
 */
public void unlock() throws IOException {
  if (this.lock == null)
    return;
  this.lock.release();
  lock.channel().close();
  lock = null;
}
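Under the hood, tryLock() boils down to a java.nio file lock on in_use.lock. A minimal sketch of the same idiom (not the Storage implementation; the path is illustrative):

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;

public class DirLockSketch {
  public static void main(String[] args) throws IOException {
    File lockFile = new File("data/dfs/name1/in_use.lock");
    RandomAccessFile raf = new RandomAccessFile(lockFile, "rws");
    FileLock lock = raf.getChannel().tryLock(); // returns null if another process holds it
    if (lock == null) {
      raf.close();
      throw new IOException("Cannot lock storage " + lockFile.getParent()
          + ". The directory is already locked.");
    }
    try {
      // ... exclusive access to the storage directory ...
    } finally {
      lock.release();         // unlock
      lock.channel().close(); // also closes the underlying file descriptor
    }
  }
}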
Starting the NameNode (NameNode.createNameNode)
URI:localhost/127.0.0.1:0
/**
 * Initialize name-node.
 *
 * @param conf the configuration
 */
protected void initialize(Configuration conf) throws IOException {
  InetSocketAddress socAddr = getRpcServerAddress(conf); // URI: localhost/127.0.0.1:0
  int handlerCount = conf.getInt("dfs.namenode.handler.count", 10);

  // set service-level authorization security policy
  if (serviceAuthEnabled = conf.getBoolean(
        ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
    ServiceAuthorizationManager.refresh(conf, new HDFSPolicyProvider());
  }

  NameNode.initMetrics(conf, this.getRole());
  loadNamesystem(conf);
  // create rpc server
  this.server = RPC.getServer(NamenodeProtocols.class, this,
      socAddr.getHostName(), socAddr.getPort(),
      handlerCount, false, conf,
      namesystem.getDelegationTokenSecretManager());

  // The rpc-server port can be ephemeral... ensure we have the correct info
  this.rpcAddress = this.server.getListenerAddress();
  setRpcServerAddress(conf);
  activate(conf);
  LOG.info(getRole() + " up at: " + rpcAddress);
}
a) loadNamesystem: load the namesystem
FSNamesystem.initialize
/**
 * Initialize FSNamesystem.
 */
private void initialize(Configuration conf, FSImage fsImage) throws IOException {
  this.systemStart = now();
  this.blockManager = new BlockManager(this, conf);
  setConfigurationParameters(conf);
  dtSecretManager = createDelegationTokenSecretManager(conf);
  this.registerMBean(conf); // register the MBean for the FSNamesystemStutus
  if (fsImage == null) {
    this.dir = new FSDirectory(this, conf);
    StartupOption startOpt = NameNode.getStartupOption(conf);
    this.dir.loadFSImage(getNamespaceDirs(conf),
        getNamespaceEditsDirs(conf), startOpt);
    long timeTakenToLoadFSImage = now() - systemStart;
    LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
    NameNode.getNameNodeMetrics().fsImageLoadTime.set(
        (int) timeTakenToLoadFSImage);
  } else {
    this.dir = new FSDirectory(fsImage, this, conf);
  }
  this.safeMode = new SafeModeInfo(conf);
  this.hostsReader = new HostsFileReader(conf.get("dfs.hosts", ""),
      conf.get("dfs.hosts.exclude", ""));
  if (isAccessTokenEnabled) {
    accessTokenHandler = new AccessTokenHandler(true,
        accessKeyUpdateInterval, accessTokenLifetime);
  }
}
BlockManager
Keeps information related to the blocks stored in the Hadoop cluster. This class is a helper class for FSNamesystem and requires several methods to be called with a lock held on FSNamesystem.
BlockManager(FSNamesystem fsn, Configuration conf, int capacity) throws IOException {
  namesystem = fsn;
  pendingReplications = new PendingReplicationBlocks(
      conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
                  DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);
  setConfigurationParameters(conf);
  blocksMap = new BlocksMap(capacity, DEFAULT_MAP_LOAD_FACTOR);
}
PendingReplicationBlocks
PendingReplicationBlocks does the bookkeeping of all blocks that are getting replicated. It does the following:
1) records blocks that are getting replicated at this instant;
2) maintains a coarse-grain timer to track the age of replication requests;
3) runs a thread that periodically identifies replication requests that never made it.
INFO namenode.FSNamesystem: defaultReplication = 1
INFO namenode.FSNamesystem: maxReplication = 512
INFO namenode.FSNamesystem: minReplication = 1
INFO namenode.FSNamesystem: maxReplicationStreams = 2
INFO namenode.FSNamesystem: shouldCheckForEnoughRacks = false
BlocksMap
This class maintains the map from a block to its metadata. A block's metadata currently includes the INode it belongs to and the datanodes that store the block.
INode
We keep an in-memory representation of the file/block hierarchy. This is a base INode class containing common fields for file and directory inodes.
FSDirectory.loadFSImage
void loadFSImage(Collection<URI> dataDirs, Collection<URI> editsDirs,
                 StartupOption startOpt) throws IOException {
  // format before starting up if requested
  if (startOpt == StartupOption.FORMAT) {
    fsImage.setStorageDirectories(dataDirs, editsDirs);
    fsImage.format();
    startOpt = StartupOption.REGULAR;
  }
  try {
    if (fsImage.recoverTransitionRead(dataDirs, editsDirs, startOpt)) {
      // analyze the storage directories
      fsImage.saveNamespace(true);
    }
    FSEditLog editLog = fsImage.getEditLog();
    assert editLog != null : "editLog must be initialized";
    fsImage.setCheckpointDirectories(null, null);
  } catch (IOException e) {
    fsImage.close();
    throw e;
  }
  synchronized (this) {
    this.ready = true;
    this.notifyAll();
  }
}
/*
 * Analyze storage directories. Recover from previous transitions if required.
 * Perform fs state transition if necessary depending on the namespace info.
 * Read storage info.
 */
FSImage.recoverTransitionRead
Time file (checkpoint time):
data/dfs/name1/current/fstime
b) Initialize the RPC server
// create rpc server
this.server = RPC.getServer(NamenodeProtocols.class, this,
    socAddr.getHostName(), socAddr.getPort(),
    handlerCount, false, conf,
    namesystem.getDelegationTokenSecretManager());
RPC.getProtocolEngine
// return the RpcEngine configured to handle a protocol
private static synchronized RpcEngine getProtocolEngine(Class protocol, Configuration conf) {
  RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
  if (engine == null) {
    Class<?> impl = conf.getClass(ENGINE_PROP + "." + protocol.getName(),
                                  WritableRpcEngine.class);
    engine = (RpcEngine) ReflectionUtils.newInstance(impl, conf);
    if (protocol.isInterface())
      PROXY_ENGINES.put(Proxy.getProxyClass(protocol.getClassLoader(), protocol),
                        engine);
    PROTOCOL_ENGINES.put(protocol, engine);
  }
  return engine;
}
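The pattern here (look up an implementation class from configuration, instantiate it reflectively, cache it per protocol) is generic and worth isolating. A stripped-down sketch, with class and key names that are illustrative rather than Hadoop's:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class EngineCacheSketch {
  private static final Map<Class<?>, Object> ENGINES = new ConcurrentHashMap<>();

  // Instantiate the configured implementation once per protocol and cache it.
  static Object getEngine(Class<?> protocol, String implClassName)
      throws ReflectiveOperationException {
    Object engine = ENGINES.get(protocol);
    if (engine == null) {
      Class<?> impl = Class.forName(implClassName); // e.g. a WritableRpcEngine analogue
      engine = impl.getDeclaredConstructor().newInstance();
      ENGINES.put(protocol, engine);
    }
    return engine;
  }
}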
org.apache.hadoop.ipc.Server.Server
/**
 * Constructs a server listening on the named port and address. Parameters passed
 * must be of the named class. The <code>handlerCount</code> determines
 * the number of handler threads that will be used to process calls.
 */
protected Server(String bindAddress, int port,
                 Class<? extends Writable> paramClass, int handlerCount,
                 Configuration conf, String serverName,
                 SecretManager<? extends TokenIdentifier> secretManager)
    throws IOException {
  this.bindAddress = bindAddress;
  this.conf = conf;
  this.port = port;
  this.paramClass = paramClass;
  this.handlerCount = handlerCount;
  this.socketSendBufferSize = 0;
  this.maxQueueSize = handlerCount * conf.getInt(
      CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
      CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
  this.maxRespSize = conf.getInt(
      CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
      CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);
  this.readThreads = conf.getInt(
      CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
      CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
  this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
  this.maxIdleTime = 2 * conf.getInt("ipc.client.connection.maxidletime", 1000);
  this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
  this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
  this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
  this.authorize = conf.getBoolean(
      CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false);
  this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();

  // Start the listener here and let it bind to the port
  listener = new Listener();
  this.port = listener.getAddress().getPort();
  this.rpcMetrics = new RpcMetrics(serverName, Integer.toString(this.port), this);
  this.rpcDetailedMetrics = new RpcDetailedMetrics(serverName,
      Integer.toString(this.port));
  this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);

  // Create the responder here
  responder = new Responder();
}
c) Activate the NameNode
NameNode.activate
/**
 * Activate name-node servers and threads.
 */
void activate(Configuration conf) throws IOException {
  if ((isRole(NamenodeRole.ACTIVE)) &&
      (UserGroupInformation.isSecurityEnabled())) {
    namesystem.activateSecretManager();
  }
  namesystem.activate(conf);
  startHttpServer(conf);
  server.start(); // start RPC server
  startTrashEmptier(conf);

  plugins = conf.getInstances("dfs.namenode.plugins", ServicePlugin.class);
  for (ServicePlugin p : plugins) {
    try {
      p.start(this);
    } catch (Throwable t) {
      LOG.warn("ServicePlugin " + p + " could not be started", t);
    }
  }
}
(A) FSNamesystem.activate
/**
 * Activate FSNamesystem daemons.
 */
void activate(Configuration conf) throws IOException {
  setBlockTotal();
  blockManager.activate();
  this.hbthread = new Daemon(new HeartbeatMonitor());
  this.lmthread = new Daemon(leaseManager.new Monitor());
  this.replthread = new Daemon(new ReplicationMonitor());
  hbthread.start();
  lmthread.start();
  replthread.start();

  this.dnthread = new Daemon(new DecommissionManager(this).new Monitor(
      conf.getInt("dfs.namenode.decommission.interval", 30),
      conf.getInt("dfs.namenode.decommission.nodes.per.interval", 5)));
  dnthread.start();

  this.dnsToSwitchMapping = ReflectionUtils.newInstance(
      conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
          ScriptBasedMapping.class, DNSToSwitchMapping.class), conf);

  /* If the dns to switch mapping supports cache, resolve network
   * locations of those hosts in the include list,
   * and store the mapping in the cache; so future calls to resolve
   * will be fast. */
  if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
    dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts()));
  }
}
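The monitor threads started here all follow the same idiom: a Runnable loop wrapped in Hadoop's Daemon, which is simply a Thread with setDaemon(true) so it never keeps the JVM alive. A minimal sketch of that idiom, with heartbeatCheck() as a hypothetical stand-in for the periodic work:

class MonitorDaemonSketch {
  public static void main(String[] args) throws InterruptedException {
    Runnable heartbeatMonitor = () -> {
      while (!Thread.currentThread().isInterrupted()) {
        // heartbeatCheck(); // hypothetical periodic work
        try {
          Thread.sleep(1000); // recheck interval
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // exit the loop
        }
      }
    };
    Thread hbthread = new Thread(heartbeatMonitor, "HeartbeatMonitor");
    hbthread.setDaemon(true); // what Hadoop's Daemon wrapper does
    hbthread.start();
    Thread.sleep(3000); // when main exits, the daemon dies with the JVM
  }
}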
/**
 * Periodically calls heartbeatCheck() and updateAccessKey().
 */
HeartbeatMonitor
/******************************************************
* Monitor checks for leases that have expired,
* and disposes of them.
******************************************************/
LeaseManager.Monitor
/**
 * Periodically calls computeReplicationWork().
 */
ReplicationMonitor
/************************************************************
* A Lease governs all the locks held by a single client.
* For each client there's a corresponding lease, whose
* timestamp is updated when the client periodically
* checks in. If the client dies and allows its lease to
* expire, all the corresponding locks can be released.
*************************************************************/
Lease
(B) Start the HTTP server
/**
* Create a status server on the given port.
* The jsp scripts are taken from src/webapps/<name>.
*/
public HttpServer(String name, String bindAddress, int port,
    boolean findPort, Configuration conf, AccessControlList adminsAcl)
    throws IOException {
  webServer = new Server();
  this.findPort = findPort;
  this.adminsAcl = adminsAcl;

  listener = createBaseListener(conf);
  listener.setHost(bindAddress);
  listener.setPort(port);
  webServer.addConnector(listener);

  int maxThreads = conf.getInt(HTTP_MAX_THREADS, -1);
  // If HTTP_MAX_THREADS is not configured, QueuedThreadPool() will use the
  // default value (currently 254).
  QueuedThreadPool threadPool = maxThreads == -1
      ? new QueuedThreadPool() : new QueuedThreadPool(maxThreads);
  webServer.setThreadPool(threadPool);

  final String appDir = getWebAppsPath(name);
  ContextHandlerCollection contexts = new ContextHandlerCollection();
  webServer.setHandler(contexts);

  webAppContext = new WebAppContext();
  webAppContext.setDisplayName("WepAppsContext");
  webAppContext.setContextPath("/");
  webAppContext.setWar(appDir + "/" + name);
  webAppContext.getServletContext().setAttribute(CONF_CONTEXT_ATTRIBUTE, conf);
  webAppContext.getServletContext().setAttribute(ADMINS_ACL, adminsAcl);
  webServer.addHandler(webAppContext);

  addDefaultApps(contexts, appDir, conf);

  addGlobalFilter("safety", QuotingInputFilter.class.getName(), null);
  final FilterInitializer[] initializers = getFilterInitializers(conf);
  if (initializers != null) {
    for (FilterInitializer c : initializers) {
      c.initFilter(this, conf);
    }
  }
  addDefaultServlets();
}
/**
 * Add default servlets.
 */
protected void addDefaultServlets() {
  // set up default servlets
  addServlet("stacks", "/stacks", StackServlet.class);
  addServlet("logLevel", "/logLevel", LogLevel.Servlet.class);
  addServlet("metrics", "/metrics", MetricsServlet.class);
  addServlet("conf", "/conf", ConfServlet.class);
}
/** * A very simple servlet to serve up a text representation of the current * stack traces. It both returns the stacks to the caller and logs them. * Currently the stack traces are done sequentially rather than exactly the * same data. */
StackServlet
/**
* Change log level in runtime.
*/
LogLevel.Servlet
/**
* A servlet to print out metrics data. By default, the servlet returns a
* textual representation (no promises are made for parseability), and
* users can use "?format=json" for parseable output.
*/
MetricsServlet
/**
* A servlet to print out the running configuration data.
*/
ConfServlet
How the httpServer is used
name.node:this
name.node.address:localhost/127.0.0.1:60914
name.system.image:org.apache.hadoop.hdfs.server.namenode.FSImage@244f74
name.conf: Configuration: core-default.xml, core-site.xml, hdfs-default.xml, hdfs-site.xml, mapred-default.xml, mapred-site.xml
addInternalServlet
/**
* Serve delegation tokens over http for use in hftp.
*/
getDelegationToken, /getDelegationToken, DelegationTokenServlet.class
/**
* This class is used in Namesystem's web server to do fsck on namenode.
*/
fsck:/fsck,FsckServlet.class
/**
* This class is used in Namesystem's jetty to retrieve a file.
* Typically used by the Secondary NameNode to retrieve image and
* edit file for periodic checkpointing.
*/
getimage, /getimage, GetImageServlet.class
/**
* Obtain meta-information about a filesystem.
* @see org.apache.hadoop.hdfs.HftpFileSystem
*/
listPaths, /listPaths/*, ListPathsServlet.class
/** Redirect queries about the hosted filesystem to an appropriate datanode.
* @see org.apache.hadoop.hdfs.HftpFileSystem
*/
data, /data/*, FileDataServlet.class
/** Redirect file checksum queries to an appropriate datanode. */
checksum, /fileChecksum/*,FileChecksumServlets.RedirectServlet.class
/** Servlets for file checksum */
contentSummary,/contentSummary/*, ContentSummaryServlet.class
this.httpServer.start();
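With the server started, these endpoints can be queried directly over HTTP; for example, /stacks dumps the current thread stacks, /conf returns the effective configuration, and /metrics?format=json returns metrics in a parseable form (paths as registered above; the host and port depend on the deployment).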
(C) Server.start
/** Starts the service. Must be called before any calls will be handled. */
public synchronized void start() throws IOException {
  responder.start();
  listener.start();
  handlers = new Handler[handlerCount];
  for (int i = 0; i < handlerCount; i++) {
    handlers[i] = new Handler(i);
    handlers[i].start();
  }
}
(D) The NameNode starts its service plugins
plugins = conf.getInstances("dfs.namenode.plugins", ServicePlugin.class);
for (ServicePlugin p : plugins) {
  try {
    p.start(this);
  } catch (Throwable t) {
    LOG.warn("ServicePlugin " + p + " could not be started", t);
  }
}
d) Start the datanodes
DefaultUri: hdfs://localhost:46620
// Set up the right ports for the datanodes
"dfs.datanode.address", "127.0.0.1:0"
"dfs.datanode.http.address", "127.0.0.1:0"
"dfs.datanode.ipc.address", "127.0.0.1:0"
Directories
/data/dfs/data/data1
/data/dfs/data/data2
for (int i = curDatanodesNum; i < curDatanodesNum + numDataNodes; i++) {
  Configuration dnConf = new HdfsConfiguration(conf);
  if (manageDfsDirs) {
    File dir1 = new File(data_dir, "data" + (2*i + 1));
    File dir2 = new File(data_dir, "data" + (2*i + 2));
    dir1.mkdirs();
    dir2.mkdirs();
  }
  Configuration newconf = new HdfsConfiguration(dnConf); // save config
  if (hosts != null) {
    NetUtils.addStaticResolution(hosts[i - curDatanodesNum], "localhost");
  }
  DataNode dn = DataNode.instantiateDataNode(dnArgs, dnConf);
  if (dn == null)
    throw new IOException("Cannot start DataNode in "
        + dnConf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
  // since the HDFS does things based on IP:port, we need to add the mapping
  // for IP:port to rackId
  String ipAddr = dn.getSelfAddr().getAddress().getHostAddress();
  if (racks != null) {
    int port = dn.getSelfAddr().getPort();
    System.out.println("Adding node with IP:port : " + ipAddr + ":" + port
        + " to rack " + racks[i - curDatanodesNum]);
    StaticMapping.addNodeToRack(ipAddr + ":" + port, racks[i - curDatanodesNum]);
  }
  DataNode.runDatanodeDaemon(dn);
  dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs));
}
DataNode.startDataNode: registering the datanode
/**
 * This method starts the data node with the specified conf.
 */
storage = new DataStorage(); // Data storage information file

// construct registration
// (this becomes the argument later passed to DatanodeProtocol.registerDatanode)
this.dnRegistration = new DatanodeRegistration(machineName + ":" + tmpPort);

// connect to name node
this.namenode = namenode;

// get version and id info from the name-node
NamespaceInfo nsInfo = handshake();
FSDatasetInterface
/**
* This is an interface for the underlying storage that stores blocks for
* a data node.
* Examples are the FSDataset (which stores blocks on dirs) and
* SimulatedFSDataset (which simulates data).
*/
class SimulatedFSDataset implements FSConstants, FSDatasetInterface, Configurable
/**
* This class implements a simulated FSDataset.
*
* Blocks that are created are recorded but their data (plus their CRCs) are
* discarded.
* Fixed data is returned when blocks are read; a null CRC meta file is
* created for such data.
*
* This FSDataset does not remember any block information across its
* restarts; it does however offer an operation to inject blocks
* (See the TestInectionForSImulatedStorage()
* for a usage example of injection.
*
* Note the synchronization is coarse grained - it is at each method.
 * (CRC: checksum stream)
*/
FSDataset.FSDataset
/**************************************************
* FSDataset manages a set of data blocks. Each block
* has a unique name and an extent on disk.
***************************************************/
DataXceiverServer
/**
* Server used for receiving/sending a block of data.
* This is created to listen for requests from clients or
* other DataNodes. This small server does not use the
* Hadoop IPC mechanism.
*/
DataXceiver
/**
* Thread for processing incoming/outgoing data stream.
*/
DataStorage
/**
* Data storage information file.
*/
Storage
/**
* Storage information file.
* <p>
* Local storage information is stored in a separate file VERSION.
* It contains type of the node,
* the storage layout version, the namespace id, and
* the fs state creation time.
* <p>
* Local storage can reside in multiple directories.
* Each directory should contain the same VERSION file as the others.
* During startup Hadoop servers (name-node and data-nodes) read their local
* storage information from them.
* <p>
* The servers hold a lock for each storage directory while they run so that
* other nodes were not able to startup sharing the same storage.
* The locks are released when the servers stop (normally or abnormally).
*
*/
/**
* This method starts the data node with the specified conf.
*/
DataNode.startDataNode
// construct registration
this.dnRegistration = new DatanodeRegistration(machineName + ":" + tmpPort);

// connect to name node
this.namenode = namenode;

// get version and id info from the name-node
NamespaceInfo nsInfo = handshake();

// initialize data node internal structure
this.data = new FSDataset(storage, conf);

// adjust machine name with the actual port
tmpPort = ss.getLocalPort();
selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(), tmpPort);
this.dnRegistration.setName(machineName + ":" + tmpPort);
LOG.info("Opened info server at " + tmpPort);
/**
 * Server used for receiving/sending a block of data.
 * This is created to listen for requests from clients or
 * other DataNodes. This small server does not use the
 * Hadoop IPC mechanism.
 */
this.threadGroup = new ThreadGroup("dataXceiverServer");
this.dataXceiverServer = new Daemon(threadGroup,
    new DataXceiverServer(ss, conf, this));
this.threadGroup.setDaemon(true); // auto destroy when empty

// initialize periodic block scanner
if (reason == null) {
  blockScanner = new DataBlockScanner(this, (FSDataset) data, conf);
} else {
  LOG.info("Periodic Block Verification is disabled because " + reason + ".");
}
DataBlockScanner
Performs two types of scanning:
1) Gets block files from the data directories and reconciles the difference between the blocks on the disk and in memory in FSDataset.
2) Scans the data directories for block files and verifies that the files are not corrupt.
It keeps track of blocks and their last verification times. Currently it does not modify the metadata for a block.
// create a servlet to serve full-file content
this.infoServer = new HttpServer("datanode", infoHost, tmpInfoPort,
    tmpInfoPort == 0, conf);
if (conf.getBoolean("dfs.https.enable", false)) {
  boolean needClientAuth = conf.getBoolean(
      DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
      DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT);
  InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(
      conf.get("dfs.datanode.https.address", infoHost + ":" + 0));
  Configuration sslConf = new HdfsConfiguration(false);
  sslConf.addResource(conf.get("dfs.https.server.keystore.resource",
      "ssl-server.xml"));
  this.infoServer.addSslListener(secInfoSocAddr, sslConf, needClientAuth);
}
this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class);
this.infoServer.addInternalServlet(null, "/getFileChecksum/*",
    FileChecksumServlets.GetServlet.class);
this.infoServer.setAttribute("datanode.blockScanner", blockScanner);
this.infoServer.setAttribute("datanode.conf", conf);
this.infoServer.addServlet(null, "/blockScannerReport",
    DataBlockScanner.Servlet.class);
this.infoServer.start();

// init ipc server
InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
    conf.get("dfs.datanode.ipc.address"));
ipcServer = RPC.getServer(DataNode.class, this, ipcAddr.getHostName(),
    ipcAddr.getPort(), conf.getInt("dfs.datanode.handler.count", 3),
    false, conf);
ipcServer.start();
dnRegistration.setIpcPort(ipcServer.getListenerAddress().getPort());
DataNode.runDatanodeDaemon
This corresponds to the NameNode's register method, which does two things: first a version check, then the main registration logic.
FSNamesystem.registerDatanode
/////////////////////////////////////////////////////////
//
// These methods are called by datanodes
//
/////////////////////////////////////////////////////////
Register Datanode.
The purpose of registration is to identify whether the new datanode serves a new data storage, and will report new data block copies, which the namenode was not aware of; or the datanode is a replacement node for the data storage that was previously served by a different or the same (in terms of host:port) datanode. The data storages are distinguished by their storageIDs. When a new data storage is reported the namenode issues a new unique storageID.
Finally, the namenode returns its namespaceID as the registrationID for the datanodes. namespaceID is a persistent attribute of the name space. The registrationID is checked every time the datanode is communicating with the namenode. Datanodes with inappropriate registrationID are rejected. If the namenode stops, and then restarts it can restore its namespaceID and will continue serving the datanodes that has previously registered with the namenode without restarting the whole cluster.
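A condensed, hypothetical sketch of the decision just described (this is not the real FSNamesystem code): a blank storageID means a brand-new data storage and gets a fresh unique ID, and every registration is checked against the namenode's namespaceID:

import java.io.IOException;
import java.util.Random;

class RegistrationSketch {
  private final int namespaceID;

  RegistrationSketch(int namespaceID) { this.namespaceID = namespaceID; }

  String registerDatanode(String storageID, String registrationID) throws IOException {
    // the registrationID must match the persistent namespaceID of this namespace
    if (!String.valueOf(namespaceID).equals(registrationID)) {
      throw new IOException("Datanode rejected: wrong registrationID " + registrationID);
    }
    if (storageID == null || storageID.isEmpty()) {
      // new data storage: issue a new unique storageID
      storageID = "DS-" + new Random().nextInt(Integer.MAX_VALUE)
          + "-" + System.currentTimeMillis();
    }
    // otherwise this registration replaces the previous datanode entry
    // (possibly a different host:port) that served the same storageID
    return storageID;
  }
}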
NetworkTopology
/** The class represents a cluster of computer with a tree hierarchical
* network topology.
* For example, a cluster may be consists of many data centers filled
* with racks of computers.
* In a network topology, leaves represent data nodes (computers) and inner
* nodes represent switches/routers that manage traffic in/out of data centers
* or racks.
*
*/
DatanodeDescriptor
/**************************************************
* DatanodeDescriptor tracks stats on a given DataNode,
* such as available storage capacity, last update time, etc.,
* and maintains a set of blocks stored on the datanode.
*
* This data structure is a data structure that is internal
* to the namenode. It is *not* sent over-the-wire to the Client
* or the Datnodes. Neither is it stored persistently in the
* fsImage.
**************************************************/
DNSToSwitchMapping
/**
* An interface that should be implemented to allow pluggable
* DNS-name/IP-address to RackID resolvers.
*
*/
(B) Report datanode status
MiniDFSCluster.waitActive
/**
 * Wait until the cluster is active and running.
 */
public void waitActive() throws IOException {
  if (nameNode == null) {
    return;
  }
  InetSocketAddress addr = new InetSocketAddress("localhost", getNameNodePort());
  DFSClient client = new DFSClient(addr, conf);

  // make sure all datanodes have registered and sent heartbeat
  while (shouldWait(client.datanodeReport(DatanodeReportType.LIVE))) {
    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
    }
  }
  client.close();
}
FSNamesystem.datanodeReport
HostsFileReader
// Keeps track of which datanodes/tasktrackers are allowed to connect to the
// namenode/jobtracker.
FSNamesystem.handleHeartbeat
/**
* The given node has reported in. This method should:
* 1) Record the heartbeat, so the datanode isn't timed out
* 2) Adjust usage stats for future block allocation
*
* If a substantial amount of time passed since the last datanode
* heartbeat then request an immediate block report.
*/
Created URI:
hdfs://localhost:38326
FileSystem.getDefaultUri
File creation call stack
FileSystem.createFileSystem(URI, Configuration) line: 1734
FileSystem.get(URI, Configuration) line: 231
FileSystem.get(Configuration) line: 131
MiniDFSCluster.getFileSystem() line: 851
TestDirectoryScanner.createFile(String, long) line: 64
TestDirectoryScanner.runTest(int) line: 224
TestDirectoryScanner.testDirectoryScanner() line: 210
NameNode.blockReceived
blockReceived() allows the DataNode to tell the NameNode about recently-received block data, with a hint for a preferred replica to be deleted when there are any excess blocks. For example, whenever client code writes a new Block here, or another DataNode copies a Block to this DataNode, it will call blockReceived().
BlockManager.addBlock
/**
* The given node is reporting that it received a certain block.
*/
void addBlock(DatanodeDescriptor node, Block block, String delHint) throws IOException {
DatanodeDescriptor.processReportedBlock
Process a block replica reported by the data-node:
1. If the block is not known to the system (not in blocksMap), then the data-node should be notified to invalidate this block.
2. If the reported replica is valid, that is, it has the same generation stamp and length as recorded on the name-node, then the replica location is added to the name-node.
3. If the reported replica is not valid, then it is marked as corrupt, which triggers replication of the existing valid replicas. Corrupt replicas are removed from the system when the block is fully replicated.
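The three cases can be summarized in a short hypothetical sketch (types are simplified to plain fields; this is not the real DatanodeDescriptor code):

// Simplified decision logic for a replica reported by a data-node.
class ReportedBlockSketch {
  static final int TO_INVALIDATE = 0, TO_ADD = 1, TO_CORRUPT = 2;

  static int process(boolean inBlocksMap,
                     long storedGenStamp, long storedLength,
                     long reportedGenStamp, long reportedLength) {
    if (!inBlocksMap) {
      return TO_INVALIDATE; // case 1: unknown block, tell the data-node to delete it
    }
    if (storedGenStamp == reportedGenStamp && storedLength == reportedLength) {
      return TO_ADD;        // case 2: valid replica, record its location
    }
    return TO_CORRUPT;      // case 3: mark corrupt, triggering re-replication
  }
}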
ReplicaState
/**
* Block replica states, which it can go through while being constructed.
*/
/** Replica is finalized. The state when replica is not modified. */
FINALIZED(0),
/** Replica is being written to. */
RBW(1),
/** Replica is waiting to be recovered. */
RWR(2),
/** Replica is under recovery. */
RUR(3),
/** Temporary replica: created for replication and relocation only. */
TEMPORARY(4);
BlockInfo
/**
* Internal class for block metadata.
*/
BlockInfo.triplets
/**
* This array contains triplets of references.
* For each i-th datanode the block belongs to
* triplets[3*i] is the reference to the DatanodeDescriptor
* and triplets[3*i+1] and triplets[3*i+2] are references
* to the previous and the next blocks, respectively, in the
* list of blocks belonging to this data-node.
*/
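A small sketch of this indexing scheme (element types erased to Object; the accessor names are illustrative):

class TripletsSketch {
  // length = 3 * (number of datanodes holding this block)
  Object[] triplets;

  Object getDatanode(int i) { return triplets[3 * i]; }     // DatanodeDescriptor
  Object getPrevious(int i) { return triplets[3 * i + 1]; } // previous BlockInfo on that node
  Object getNext(int i)     { return triplets[3 * i + 2]; } // next BlockInfo on that node

  void setNext(int i, Object next) { triplets[3 * i + 2] = next; }
}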