MapReduce: Job Submission Flow and Input Split Source Code Analysis (Hadoop 2.7.2)
- First, start from the `waitForCompletion()` call made by the driver:

```java
boolean result = job.waitForCompletion(true);
```
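For context, this call normally sits at the end of a MapReduce driver's main() method. The following is a minimal, self-contained driver sketch (the class name and the pass-through behavior are illustrative, not taken from the article); it relies on the default identity Mapper/Reducer so it compiles on its own:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PassThroughDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "pass-through");
    job.setJarByClass(PassThroughDriver.class);
    // No mapper/reducer is set, so the identity Mapper and Reducer are used and
    // the job simply copies <offset, line> pairs from input to output.
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));   // input path from CLI
    FileOutputFormat.setOutputPath(job, new Path(args[1])); // output path from CLI
    // This is the call the walkthrough starts from
    boolean result = job.waitForCompletion(true);
    System.exit(result ? 0 : 1);
  }
}
```

The source of `Job.waitForCompletion()` is: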
```java
/**
 * Submit the job to the cluster and wait for it to finish.
 * @param verbose print the progress to the user
 * @return true if the job succeeded
 * @throws IOException thrown if the communication with the
 *         <code>JobTracker</code> is lost
 */
public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
  // First check the state: the job can only be submitted while the state is
  // DEFINE; enter the submit() method
  if (state == JobState.DEFINE) {
    submit();
  }
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}
```
- Enter the `submit()` method:
```java
/**
 * Submit the job to the cluster and return immediately.
 * @throws IOException
 */
public void submit()
       throws IOException, InterruptedException, ClassNotFoundException {
  // Make sure the JobState is DEFINE (submittable); otherwise the job cannot be submitted
  ensureState(JobState.DEFINE);
  // Use the new API
  setUseNewAPI();
  // Enter connect(): when a MapReduce job is submitted, the connection to the
  // cluster is established by Job's connect() method, which actually constructs
  // the Cluster instance "cluster"
  connect();
  // Once connect() has finished, create the submitter
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
    ClassNotFoundException {
      // The core call is submitJobInternal(), the internal job submission method
      // that implements all of the submission logic; enter submitJobInternal()
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  // After submission the state changes
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
```
- Enter the `connect()` method.
- When a MapReduce job is submitted, the connection to the cluster is established through Job's connect() method, which actually constructs the Cluster instance `cluster`.
- Cluster is a tool for talking to the MapReduce cluster and provides methods for obtaining information about it.
- Inside Cluster there is a client communication protocol instance `client` of type ClientProtocol, constructed by ClientProtocolProvider's create() method.
- Inside create(), Hadoop 2.x offers two kinds of ClientProtocol: YARNRunner for YARN mode and LocalJobRunner for local mode. Cluster actually communicates with the cluster through one of them.
```java
private synchronized void connect()
        throws IOException, InterruptedException, ClassNotFoundException {
  if (cluster == null) { // Cluster provides the means to reach the remote MapReduce service
    cluster =
      ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                 public Cluster run()
                        throws IOException, InterruptedException,
                               ClassNotFoundException {
                   // The part to focus on is the Cluster() constructor,
                   // which builds the Cluster instance
                   return new Cluster(getConfiguration());
                 }
               });
  }
}
```
- Enter the `Cluster()` constructor:
```java
// The one-argument constructor delegates to the two-argument constructor
public Cluster(Configuration conf) throws IOException {
  this(null, conf);
}

public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
    throws IOException {
  this.conf = conf;
  this.ugi = UserGroupInformation.getCurrentUser();
  // The important call is initialize()
  initialize(jobTrackAddr, conf);
}

// The two members of Cluster to watch are the client protocol provider
// (clientProtocolProvider) and the client protocol instance (client)
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
    throws IOException {

  synchronized (frameworkLoader) {
    for (ClientProtocolProvider provider : frameworkLoader) {
      LOG.debug("Trying ClientProtocolProvider : "
          + provider.getClass().getName());
      ClientProtocol clientProtocol = null;
      try {
        // If no YARN settings are configured, a LocalJobRunner is built and the
        // MR job runs locally; if YARN is configured, a YARNRunner is built and
        // the MR job runs on the YARN cluster
        if (jobTrackAddr == null) {
          // The client protocol is created by ClientProtocolProvider.create()
          clientProtocol = provider.create(conf);
        } else {
          clientProtocol = provider.create(jobTrackAddr, conf);
        }

        if (clientProtocol != null) {
          clientProtocolProvider = provider;
          client = clientProtocol;
          LOG.debug("Picked " + provider.getClass().getName()
              + " as the ClientProtocolProvider");
          break;
        } else {
          LOG.debug("Cannot pick " + provider.getClass().getName()
              + " as the ClientProtocolProvider - returned null protocol");
        }
      } catch (Exception e) {
        LOG.info("Failed to use " + provider.getClass().getName()
            + " due to error: ", e);
      }
    }
  }

  if (null == clientProtocolProvider || null == client) {
    throw new IOException(
        "Cannot initialize Cluster. Please check your configuration for "
            + MRConfig.FRAMEWORK_NAME
            + " and the correspond server addresses.");
  }
}
```
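The provider loop above is driven by the `mapreduce.framework.name` setting (MRConfig.FRAMEWORK_NAME). A small sketch of how a client chooses the mode (the ResourceManager hostname is a placeholder, not from the article):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class FrameworkSelection {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // "local" (the default): LocalClientProtocolProvider.create() returns a
    // LocalJobRunner and the whole job runs inside the client JVM.
    // conf.set("mapreduce.framework.name", "local");

    // "yarn": YarnClientProtocolProvider.create() returns a YARNRunner and the
    // job is submitted to the cluster. The hostname below is a placeholder.
    conf.set("mapreduce.framework.name", "yarn");
    conf.set("yarn.resourcemanager.hostname", "rm.example.com");

    Job job = Job.getInstance(conf);
    System.out.println(job.getConfiguration().get("mapreduce.framework.name"));
  }
}
```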
- Enter `submitJobInternal()`, the internal submission method of JobSubmitter, which submits the job to the cluster:
```java
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {

  //validate the jobs output specs
  // Check that the output path does not already exist; if it does, an exception is thrown
  checkSpecs(job);

  // conf holds the cluster's XML configuration
  Configuration conf = job.getConfiguration();
  // Add the MR framework to the distributed cache
  addMRFrameworkToDistributedCache(conf);

  // Get the staging path for the resources used during submission.
  // When not configured it defaults to
  // /tmp/hadoop-yarn/staging/<submitting user>/.staging under the working filesystem
  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  //configure the command line options correctly on the submitting dfs
  InetAddress ip = InetAddress.getLocalHost();
  if (ip != null) { // Record the submitting host's IP and hostname into conf
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
  }
  // Get a new JobID
  JobID jobId = submitClient.getNewJobID();
  // Set the JobID
  job.setJobID(jobId);
  // The submit path: Path(Path parent, String child) joins the two arguments into one path
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  // Job status
  JobStatus status = null;
  try {
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName());
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");
    // get delegation token for the dir
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);

    populateTokenCache(conf, job.getCredentials());

    // generate a secret to authenticate shuffle transfers
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
      KeyGenerator keyGen;
      try {
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(SHUFFLE_KEY_LENGTH);
      } catch (NoSuchAlgorithmException e) {
        throw new IOException("Error generating shuffle secret key", e);
      }
      SecretKey shuffleKey = keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
          job.getCredentials());
    }
    if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
      conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
      LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
              "data spill is enabled");
    }

    // Copy the jar to the cluster.
    // This calls rUploader.uploadFiles(job, jobSubmitDir), which uploads the jar
    copyAndConfigureFiles(job, submitJobDir);

    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    // Compute the input splits and write the split files
    int maps = writeSplits(job, submitJobDir);
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    // write "queue admins of the queue to which job is being submitted"
    // to job file.
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf);

    if (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // Add HDFS tracking ids
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId());
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[trackingIds.size()]));
    }

    // Set reservation info if it exists
    ReservationId reservationId = job.getReservationId();
    if (reservationId != null) {
      conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
    }

    // Write job file to submit dir
    writeConf(conf, submitJobFile);

    //
    // Now, actually submit the job (using the submit name)
    //
    printTokens(jobId, job.getCredentials());
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);
    }
  }
}
```
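At this point the submit directory under the staging area (by default something like /tmp/hadoop-yarn/staging/&lt;user&gt;/.staging/job_xxx, as noted above) typically holds the resources written by copyAndConfigureFiles(), writeSplits() and writeConf(): the job jar (job.jar, when one is uploaded), the split files job.split and job.splitmetainfo, and the job configuration job.xml. submitClient.submitJob() then passes this directory to the LocalJobRunner or YARNRunner chosen during connect().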
- Enter `writeSplits(job, submitJobDir)`, which computes the input splits and writes the split files.
- Internally it calls the `writeNewSplits(job, jobSubmitDir)` method (a condensed sketch of it follows this list).
- `writeNewSplits(job, jobSubmitDir)` creates an instance `input` of type InputFormat.
- The main responsibilities of InputFormat are:
  - validate the input specification of the job;
  - split the input files into InputSplits, each of which is processed by one map task (MapTask);
  - supply the RecordReader that turns the data of a split into key/value pairs.
- `input` then calls its getSplits() method:

```java
List<InputSplit> splits = input.getSplits(job);
```
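A condensed sketch of JobSubmitter.writeNewSplits() (paraphrased from the 2.7.2 source, not a verbatim copy). Unless the job overrides it, job.getInputFormatClass() returns TextInputFormat, a subclass of FileInputFormat, which is why the next step looks at FileInputFormat.getSplits():

```java
@SuppressWarnings("unchecked")
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  Configuration conf = job.getConfiguration();
  // Instantiate the job's InputFormat (TextInputFormat unless overridden)
  InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

  // Ask the InputFormat for the list of splits
  List<InputSplit> splits = input.getSplits(job);
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

  // Sort the splits so that the biggest go first, then write the split files
  // into the submit directory
  Arrays.sort(array, new SplitComparator());
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
      jobSubmitDir.getFileSystem(conf), array);
  return array.length;
}
```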
- Enter the `getSplits(job)` method of the FileInputFormat class:
```java
/**
 * Generate the list of files and make them into FileSplits.
 * @param job the job context
 * @throws IOException
 */
public List<InputSplit> getSplits(JobContext job) throws IOException {
  StopWatch sw = new StopWatch().start();
  // getFormatMinSplitSize() always returns 1;
  // getMinSplitSize(job) returns the configured minimum split size (default 1)
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  // getMaxSplitSize(job) returns the configured maximum split size
  // (default Long.MAX_VALUE)
  long maxSize = getMaxSplitSize(job);

  // generate splits
  List<InputSplit> splits = new ArrayList<InputSplit>();
  List<FileStatus> files = listStatus(job);
  // Iterate over all input files of the job
  for (FileStatus file: files) {
    // File path
    Path path = file.getPath();
    // File length
    long length = file.getLen();
    if (length != 0) {
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }
      // Is the file splittable?
      if (isSplitable(job, path)) {
        // Block size: 32 MB by default on the local file system; on HDFS the
        // default is 128 MB in Hadoop 2.x (64 MB in older releases)
        long blockSize = file.getBlockSize();
        // Compute the logical split size, which defaults to the block size:
        // return Math.max(minSize, Math.min(maxSize, blockSize));
        // with minSize = 1 and maxSize = Long.MAX_VALUE, splitSize == blockSize
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        long bytesRemaining = length;
        // Before each cut, check whether the remaining bytes exceed SPLIT_SLOP
        // (1.1) times the split size; if not, the remainder is kept as a single
        // split instead of being cut again
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                      blkLocations[blkIndex].getHosts(),
                      blkLocations[blkIndex].getCachedHosts()));
          bytesRemaining -= splitSize;
        }

        if (bytesRemaining != 0) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                     blkLocations[blkIndex].getHosts(),
                     blkLocations[blkIndex].getCachedHosts()));
        }
      } else { // not splitable
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                    blkLocations[0].getCachedHosts()));
      }
    } else {
      //Create empty hosts array for zero length files
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }
  // Save the number of input files for metrics/loadgen
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()
        + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
  }
  return splits;
}
```
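To make the SPLIT_SLOP arithmetic concrete, here is a small standalone sketch (plain Java, not Hadoop code; the file and block sizes are made-up examples) that replays computeSplitSize() and the 1.1x rule:

```java
// Replays the split arithmetic of FileInputFormat.getSplits() on example sizes.
public class SplitMath {
  private static final double SPLIT_SLOP = 1.1;   // same constant as FileInputFormat

  // Mirrors FileInputFormat.computeSplitSize()
  static long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  static int countSplits(long length, long splitSize) {
    int splits = 0;
    long bytesRemaining = length;
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      splits++;                       // one full-sized split
      bytesRemaining -= splitSize;
    }
    if (bytesRemaining != 0) {
      splits++;                       // the tail becomes one last split
    }
    return splits;
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024L;
    long blockSize = 128 * mb;
    long splitSize = computeSplitSize(blockSize, 1, Long.MAX_VALUE); // == 128 MB

    // 300 MB file -> 128 MB + 128 MB + 44 MB = 3 splits
    System.out.println(countSplits(300 * mb, splitSize));  // prints 3
    // 129 MB file -> 129/128 is about 1.008 < 1.1, so it stays one 129 MB split
    System.out.println(countSplits(129 * mb, splitSize));  // prints 1
  }
}
```

If a different split size is needed, the minimum and maximum can be adjusted with FileInputFormat.setMinInputSplitSize(job, n) and FileInputFormat.setMaxInputSplitSize(job, n), which feed the minSize/maxSize values used above.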