根据wordcount进行分析:
import org.apache.hadoop.conf.Configuration; org.apache.hadoop.fs.Path; org.apache.hadoop.io.IntWritable; org.apache.hadoop.io.Text; org.apache.hadoop.mapreduce.Job; org.apache.hadoop.mapreduce.Mapper; org.apache.hadoop.mapreduce.Reducer; org.apache.hadoop.mapreduce.lib.input.FileInputFormat; org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; java.io.IOException; /** * @author: LUGH1 * @date: 2019-4-8 * @description: */ public class WordCount { static void main(String[] args) throws IOException,ClassNotFoundException,InterruptedException { Configuration conf = new Configuration(); conf.set("fs.defaultFS","hdfs://192.168.88.130:9000"); Job job = Job.getInstance(conf); job.setJarByClass(WordCount.); job.setMapperClass(WdMapper.); job.setReducerClass(WdReducer.); job.setMapOutputKeyClass(Text.); job.setMapOutputValueClass(IntWritable.); job.setOutputKeyClass(Text.); job.setOutputValueClass(IntWritable.); FileInputFormat.setInputPaths(job,new Path("/test/word.txt")); FileOutputFormat.setOutputPath(job,1)">new Path("/test/output")); boolean result = job.waitForCompletion(true); System.exit(result?0:1); System.out.println("good job"); } } class WdMapper extends Mapper<Object,Text,IntWritable> { @Override protected void map(Object key,Text value,Context context) value.toString(); String[] split = line.split(" "); for(String word : split){ context.write(new Text(word),new IntWritable(1)); } } } class WdReducer extends Reducer<Text,IntWritable,1)">void reduce(Text key,Iterable<IntWritable> values,InterruptedException { int count = 0; (IntWritable i : values){ count += i.get(); } context.write(key,1)"> IntWritable(count)); } }
这上面是个简单wordcount的代码,这里就不一一说明了,我们首先看main方法:获取一个job对象,然后经过一系列的设置,最后调用waitForCompletion方法
public static void main(String[] args) throws IOException,InterruptedException {
//....省略具体代码.....
boolean result = job.waitForCompletion(true); //调用由Job类提供的方法waitForCompletion()提交作业
System.exit(result?0:1);
}
接下来我们看下一调用waitForCompletion方法的这个类Job(由于类的内容很多,这里只展示我们需要的部分):
public class Job extends JobContextImpl implements JobContext {
private static final Log LOG = LogFactory.getLog(Job.class);
public static enum JobState {DEFINE,RUNNING}; //定义两种状态
private static final long MAX_JOBSTATUS_AGE = 1000 * 2; //表示最多2000毫秒刷新状态
public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
public static final String COMPLETION_POLL_INTERVAL_KEY = "mapreduce.client.completion.pollinterval";
static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY ="mapreduce.client.progressmonitor.pollinterval";
static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;
public static final String USED_GENERIC_PARSER = "mapreduce.client.genericoptionsparser.used";
public static final String SUBMIT_REPLICATION = "mapreduce.client.submit.file.replication";
public static final int DEFAULT_SUBMIT_REPLICATION = 10;
public static enum TaskStatusFilter { NONE,KILLED,Failed,SUCCEEDED,ALL }
static {
ConfigUtil.loadResources(); //加载配置
}
private JobState state = JobState.DEFINE; //加载类的时候默认设置状态为DEFINE状态
private JobStatus status;
private long statustime;
private Cluster cluster;
private ReservationId reservationId;
boolean waitForCompletion(booleanverbose)
submit()
setUseNewAPI()
connect()
getJobSubmitter(FileSystemfs,ClientProtocolsubmitClient)
isUber() //是否“拼车”模式(MapTask与ReduceTask在同一节点上)
setPartitionerClass()//Mapper的输出可能要由Partitioner按某种规则分发给多个Reducer
setMapSpeculativeExecution() //是否需要有Speculative的Mapper起预备队的作用
setReduceSpeculativeExecution() //是否需要有Speculative的Reducer起预备队的作用
setCacheFiles()
}
在Job类中有很多的静态变量,代码块等,我们知道在java中初始化会先加载静态的这些变量和代码块,所以我们在main方法中调用Job job = Job.getInstance(conf);方法的时候,就会对这些静态的变量和代码进行加载,这些静态的变量和代码块就是设置一些参数,比如设置job的默认状态的DEFINE状态,以及加载一些配置文件,加载配置文件的方法如下:
public static void loadResources() {
addDeprecatedKeys();
Configuration.addDefaultResource("mapred-default.xml");
Configuration.addDefaultResource("mapred-site.xml");
Configuration.addDefaultResource("yarn-default.xml");
Configuration.addDefaultResource("yarn-site.xml");
}
记载配置文件就是加载hadoop的一些配置文件,所以在我们调用waitForCompletion方法之前这些都是已经加载好了的,接下来我们看waitForCompletion方法:
//org.apache.hadoop.mapreduce中的Job类
public boolean waitForCompletion(boolean verbose) throws IOException,InterruptedException,ClassNotFoundException {
if (state == JobState.DEFINE) { //判断作业是否是DEFINE状态,防止重复提交作业
submit(); //提交作业
}
if (verbose) { //提交之后监控其运行,直到作业结束
monitorAndPrintJob(); //周期性报告作业进度情况
} else { //要不然就周期行询问作业是否文成
// get the completion poll interval from the client.
int completionPollIntervalMillis = Job.getCompletionPollInterval(cluster.getConf());
while (!isComplete()) {
try {
Thread.sleep(completionPollIntervalMillis);
} catch (InterruptedException ie) {
}
}
}
return isSuccessful();
}
从作业提交流程的角度看,这个方法的代码再简单不过了,实际就是对Job.submit()的调用,只是在调用之前要检查一下本作业是否处于 DEFINE 状态,以确保一个作业不会被提交多次。 如上所述,JobState的值只有 DEFINE 和 RUNNING 两种,具体Job对象创建之初在构造函数Job()中将其设置成 DEFINE,作业提交成功之后就将其改成 RUNNING,这就把门关上了。
在正常的情况下,Job.submit() 很快就会返回,因为这个方法的作用只是把作业提交上去,而无须等待作业的执行和完成。 但是,在Job.submit()返回之后,Job.waitForCompletion()则要等待作业执行完成了以后才会返回。 在等待期间,如果参数verbose为true,就要周期地报告作业执行的进展,或者就只是周期地检测作业是否已经完成。
所以我们的作业提交流程目前是:
[WordCount.main() -> Job.waitForCompletion() -> Job.submit() ]
那么,接下来,看一看这个submit方法:
void submit() final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(),cluster.getClient());//获取JobSubmitter的实例对象submitter
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() { //ugi.doAs用来控制权限 public JobStatus run() return submitter.submitJobInternal(Job.this,cluster); //真正用于提交作业 } }); state = JobState.RUNNING; //设置job的状态为RUNNING LOG.info("The url to track the job: " + getTrackingURL()); }
接下来我们先看connect方法:
private synchronized connect()
if (cluster == null) { //如果cluter为空,我们就创建一个cluster实例
cluster =
ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
public Cluster run()
return Cluster(getConfiguration()); //创建cluster
}
});
}
}
可见connect()的作用就是保证节点上有个Cluster类对象,如果还没有,就创建一个。 那我们就看一下Cluster这个类(列出一部分):
Cluster {
@InterfaceStability.Evolving enum JobTrackerStatus {INITIALIZING,RUNNING}; //作业跟踪状态
private ClientProtocolProvider clientProtocolProvider; 集群版为YarnClientProtocolProvider ,本地模式为LocalClientProtocolProvider
private ClientProtocol client; 在集群条件下,这是与外界通信的渠道和规则
private UserGroupInformation ugi; 用来控制权限
private Configuration conf; 配置信息
private FileSystem fs = null; 文件系统
private Path sysDir = 系统目录
private Path stagingAreaDir = ;
private Path jobHistoryDir = 历史作业目录
final Log LOG = LogFactory.getLog(Cluster.);
ServiceLoader<ClientProtocolProvider>,就是针对
ClientProtocolProvider类的ServiceLoader,而且这就是通过ServiceLoaderl.oad()装载的ServiceLoader实现了Iterable界面,
//提供一个iterator()函数,因而可以用在for循环中。
它还提供了一个load()方法,可以通过ClassLoader加载Class
static ServiceLoader<ClientProtocolProvider> frameworkLoader = ServiceLoader.load(ClientProtocolProvider.);
static {
ConfigUtil.loadResources(); 加载配置文件
}
构造器
public Cluster(Configuration conf) IOException {
this( Cluster(InetSocketAddress jobTrackAddr,Configuration conf)
this.conf = conf;
this.ugi = UserGroupInformation.getCurrentUser();
initialize(jobTrackAddr,conf); 调用initialize方法
}
目的是要创建ClientProtocolProvider和ClientProtocol
initialize(InetSocketAddress jobTrackAddr,Configuration conf)
synchronized (frameworkLoader) { 不允许多个线程同时进入此段代码,需要加锁
for (ClientProtocolProvider provider : frameworkLoader) { 遍历frameworkLoader获取provider
LOG.debug("Trying ClientProtocolProvider : " + provider.getClass().getName());
ClientProtocol clientProtocol = ;
try {
if (jobTrackAddr == null) { 通过ClientProtocolProvider的create方法创建clientProtocol
clientProtocol = provider.create(conf);
} else {
clientProtocol = provider.create(jobTrackAddr,conf);
}
if (clientProtocol != ) {
clientProtocolProvider = provider;
client = clientProtocol; 已经创建了ClientProtocol对象,YARNRunner或LocalJobRunner
LOG.debug("Picked " + provider.getClass().getName()
+ " as the ClientProtocolProvider");
break; 成功后结束循环
}
else { 失败,记录日志
LOG.debug("Cannot pick " + provider.getClass().getName()
+ " as the ClientProtocolProvider - returned null protocol");
}
}
catch (Exception e) {
LOG.info("Failed to use " + provider.getClass().getName()
+ " due to error: "if (null == clientProtocolProvider || null == client) { 判断是否创建了ClientProtocolProvider和ClientProtocol对象
throw IOException(
"Cannot initialize Cluster. Please check your configuration for "
+ MRConfig.FRAMEWORK_NAME
+ " and the correspond server addresses.");
}
}
那么知道job类的connect方法就是确保有实例cluster,如果没有就通过Cluster的构造函数进行创建,在创建之前需要加载一些配置信息ConfigUtil.loadResources()和对静态的变量frameworkLoader等赋值,然后在调用Cluster的构造方法,在Cluster的构造方法中必定调用Cluster.initialize()方法,其中ClientProtocolProvider和ClientProtocol:用户向RM节点提交作业,是要RM为其安排运行,所以RM起着服务提供者的作用,而用户则处于客户的位置。既然如此,双方就得有个协议,对于双方怎么交互,乃至服务怎么提供,都得有个规定。在Hadoop的代码中,这所谓Protocol甚至被“上纲上线”到了计算框架的高度,连是否采用YARN框架也被纳入了这个范畴。实际上ClientProtocol就起着这样的作用,而ClientProtocolProvider顾名思义是ClientProtocol的提供者,起着有点像是Factory的作用。
至于ServiceLoader<ClientProtocolProvider>,那是用来装载ClientProtocolProvider的。
我们首先看一下这个类ClientProtocolProvider,很明显是一个抽象类,这意味着只有继承和扩充了这个抽象类的具体类才能被实体化成对象:
abstract ClientProtocolProvider {
abstract ClientProtocol create(Configuration conf) IOException;
abstract ClientProtocol create(InetSocketAddress addr,Configuration conf) IOException;
void close(ClientProtocol clientProtocol) IOException;
}
接下来我们看看这个抽象类的两个子类YarnClientProtocolProvider和LocalClientProtocolProvider
package org.apache.hadoop.mapred;
class YarnClientProtocolProvider extends ClientProtocolProvider {
@Override
public ClientProtocol create(Configuration conf) IOException {
if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
new YARNRunner(conf); YARNRunner实现了ClientProtocol接口
}
;
}
@Override
return create(conf);
}
@Override
if (clientProtocol instanceof YARNRunner) {
((YARNRunner)clientProtocol).close();
}
}
class LocalClientProtocolProvider IOException {
String framework =
conf.get(MRConfig.FRAMEWORK_NAME,MRConfig.LOCAL_FRAMEWORK_NAME);
if (!MRConfig.LOCAL_FRAMEWORK_NAME.equals(framework)) {
;
}
conf.setInt(JobContext.NUM_MAPS,1); map数为1
new LocalJobRunner(conf); LocalJobRunner实现了ClientProtocol接口
}
@Override
LocalJobRunner doesn't use a socket
close(ClientProtocol clientProtocol) {
no clean up required
}
现在返回来在聊聊Cluster.initialize()方法:
其中ServiceLoader实现了Iterable界面,提供一个iterator()函数,因而可以用在for循环中。它还提供了一个load()方法,可以通过ClassLoader加载Class。此外,它还提供解析文件内容的功能装载了作为ServiceLoader对象的frameworkLoader,其LinkedHashMap中就有了上述的两个路径,这样就可以通过其iterator()函数依次引用这两个路径了
然后,在Cluster类的构造函数中就会调用其initialize(),目的是要创建ClientProtocolProvider和ClientProtocol。
但是ClientProtocolProvider是个抽象类,这意味着只有继承和扩充了这个抽象类的具体类才能被实体化成对象。Hadoop的源码中一共只有两个类扩充和落实了这个抽象类,那就是LocalClientProtocolProvider和YarnClientProtocolProvide
可想而知,由这两种ClientProtocolProvider提供的ClientProtocol也是不一样的。事实上ClientProtocol是个界面,实现了这个界面的类也有两个,分别为LocalJobRunner和YARNRunner。但是实际使用的只能是其中之一。
initialize的for循环,是基于前述ServiceLoader中iterator()的循环。实际上也就是对两个ClientProtocolProvider的循环,目的是要通过ClientProtocolProvider.create()创建用户所要求的ClientProtocol,也无非就是LocalJobRunner或YARNRunner。只要有一次创建成功,循环就没有必要继续了,因为只能有一种选择;但是,如果两次都失败,程序就无法继续了,因为不知道该怎样让RM提供计算服务。而能否成功创建,则取决于前述配置项的设置。不过ClientProtocolProvider是抽象类,实际上依次进行尝试的是LocalClientProtocolProvider和YarnClientProtocolProvider。假定第一轮循环时进行尝试的是前者,那么作业的流程就是:
[WordCount.main() -> Job.waitForCompletion() -> Job.submit() -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> LocalClientProtocolProvider.create()]
如果是后者,则作业的流程就是:
[WordCount.main() -> Job.waitForCompletion() -> Job.submit() -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create()]
这里我们假定以yarn方式提交,所以流程为第二种。
通过YarnClientProtocolProvider.create()方法,最终返回的是一个new YARNRunner(conf)对象。
好了,继续回到我们的Job.submit()方法,到这里connect方法就算执行完毕了,接下就是对getJobSubmitter()的调用。 这个函数创建一个JobSubmitter类对象,然后Jobs. ubmit()就调用它的submitJobInternal()方法,完成作业的提交。创建JobSubmitter对象时的两个参数就是调用getJobSubmitter()时的两个参数,就是cluster.getFileSystem()和cluster.getClient()。 其中cluster.getClient()返回的就是 YARNRunner或LocalJobRunner;而cluster.getFileSystem()的返回结果对于 YARNRunner是 RM 节点上文件系统的 URL,对于 LocalJobRunner则是本节点上的一个相对路径为“mapred/system”的目录。
接下来了解下JobSubmitter这个类(部分展示):
org.apache.hadoop.mapreduce; JobSubmitter { final Log LOG = LogFactory.getLog(JobSubmitter.final String SHUFFLE_KEYGEN_ALGORITHM = "HmacSHA1"; //shuffle算法 final int SHUFFLE_KEY_LENGTH = 64; private FileSystem jtFs; ClientProtocol submitClient; String submitHostName; String submitHostAddress; JobSubmitter(FileSystem submitFs,ClientProtocol submitClient) this.submitClient = submitClient; 在集群条件下是YARNRunner this.jtFs = submitFs; } compareFs(FileSystemsrcFs,FileSystemdestFs) 比较两个文件系统是否相同 getPathURI() checkSpecs() copyRemoteFiles() copyAndConfigureFiles() copyJar(PathoriginalJarPath,PathsubmitJarFile,shortreplication) addMRFrameworkToDistributedCache() submitJobInternal(Jobjob,Clustercluster) 将作业提交给集群 writeNewSplits(JobContextjob,PathjobSubmitDir)
getJobSubmitter(FileSystem fs,ClientProtocol submitClient)//底层调用的就是JobSubmitter的构造方法
}
接下来看看submitJobInternal方法
JobStatus submitJobInternal(Job job,Cluster cluster) ClassNotFoundException,IOException { validate the jobs output specs 验证输出格式等配置 checkSpecs(job); Configuration conf = job.getConfiguration(); 获取配置信息 addMRFrameworkToDistributedCache(conf); 添加到缓存 Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster,conf); 获取目录路径 configure the command line options correctly on the submitting dfs InetAddress ip = InetAddress.getLocalHost(); 获取本节点(该主机)的ip地址 if (ip != ) { submitHostAddress = ip.getHostAddress();本节点IP地址的字符串形式 submitHostName = ip.getHostName();本节点名称 conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName); 写入配置conf中 conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress); } JobID jobId = submitClient.getNewJobID(); 设置JOBId(作业ID唯一) job.setJobID(jobId); 设置job的id Path submitJobDir = new Path(jobStagingArea,jobId.toString());本作业的临时子目录名中包含着作业ID号码 JobStatus status = { conf.set(MRJobConfig.USER_NAME,UserGroupInformation.getCurrentUser().getShortUserName()); 这是用户名 conf.set("hadoop.http.filter.initializers"准备用于Http接口的过滤器初始化 conf.set(MRJobConfig.MAPREDUCE_JOB_DIR,submitJobDir.toString());设置提交job的路径 LOG.debug("Configuring job " + jobId + " with " + submitJobDir + " as the submit dir"); get delegation token for the dir /* 准备好与访问权限有关的证件(token) */ TokenCache.obtainTokensForNamenodes(job.getCredentials(),1)">new Path[] { submitJobDir },conf); 获取与NameNode打交道所需证件 populateTokenCache(conf,job.getCredentials()); generate a secret to authenticate shuffle transfers需要生成Mapper与Reducer之间的数据流动所用的密码 if (TokenCache.getShuffleSecretKey(job.getCredentials()) == ) { KeyGenerator keyGen; { keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM); keyGen.init(SHUFFLE_KEY_LENGTH); } (NoSuchAlgorithmException e) { new IOException("Error generating shuffle secret key" keyGen.generateKey(); TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),job.getCredentials()); } (CryptoUtils.isEncryptedSpillEnabled(conf)) { conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,1); LOG.warn("Max job attempts set to 1 since encrypted intermediate" + "data spill is enabled"); } copyAndConfigureFiles(job,submitJobDir);将可执行文件之类拷贝到HDFS中,默认的是保留10份,会存在不同的节点上 Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);配置文件路径 Create the splits for the job LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir)); int maps = writeSplits(job,submitJobDir); 设置map数,这里如何设置map的数量我会单独写一篇介绍, conf.setInt(MRJobConfig.NUM_MAPS,maps); LOG.info("number of splits:" + maps); write "queue admins of the queue to which job is being submitted" to job file. String queue = conf.get(MRJobConfig.QUEUE_NAME,JobConf.DEFAULT_QUEUE_NAME); 默认作业调度队列名为“default” AccessControlList acl = submitClient.getQueueAdmins(queue); conf.set(toFullPropertyName(queue,QueueACL.ADMINISTER_JOBS.getAclName()),acl.getAclString()); 设置acl权限 removing jobtoken referrals before copying the jobconf to HDFS as the tasks don't need this setting,actually they may break because of it if present as the referral will point to a different job. TokenCache.cleanUpTokenReferral(conf); 清楚Token引用的缓存 (conf.getBoolean( MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) { Add HDFS tracking ids 如果启用了跟踪机制的话 ArrayList<String> trackingIds = new ArrayList<String>(); for (Token<? extends TokenIdentifier> t : job.getCredentials().getAllTokens()) { trackingIds.add(t.decodeIdentifier().getTrackingId()); 获取所有相关跟踪机制 } conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,trackingIds.toArray(new String[trackingIds.size()])); 设置跟踪机制 } Set reservation info if it exists设置预设参数(如果有) ReservationId reservationId = job.getReservationId(); if (reservationId != ) { conf.set(MRJobConfig.RESERVATION_ID,reservationId.toString()); } Write job file to submit dir writeConf(conf,submitJobFile);将conf的内容写入一个.xml文件 // Now,actually submit the job (using the submit name) // printTokens(jobId,job.getCredentials()); 提交作业,通过YarnRunner.submitJob()或LocalJobRunner.submitJob() status = submitClient.submitJob( jobId,submitJobDir.toString(),job.getCredentials()); if (status != ) { return status; 返回状态 } { new IOException("Could not launch job"); } } finally { if (status == ) { LOG.info("Cleaning up the staging area " + submitJobDir); if (jtFs != null && submitJobDir != ) jtFs.delete(submitJobDir,1)">true); 删除临时目录 } } }
从submitJobInternal方法可以得知,需要随同作业单一起提交的资源和信息有两类:
一类是需要交到资源管理器RM手里,供RM在立项和调度时使用的;
一类则并非供RM直接使用,而是供具体进行计算的节点使用的。前者包括本节点即作业提交者的IP地址、节点名、用户名、作业ID号,以及有关MapReduce计算输入数据文件的信息,还有为提交作业而提供的“证章(Token)”等。这些信息将被打包提交给RM,这就是狭义的作业提交,是流程的主体。后者则有作业执行所需的jar可执行文件、外来对象库等。如果计算的输入文件在本地,则后者还应包括输入文件。这些资源并不需要提交给RM,因为RM本身并不需要用到这些资源,但是必须要把这些资源复制或转移到全局性的HDFS文件系统中,让具体承担计算任务的节点能够取用。
为了上传相关的资源和信息,需要在HDFS文件系统中为本作业创建一个目录。HDFS文件系统中有一个目录是专门用于作业提交的,称为“舞台目录(stagingdirectory)”。所以这里要通过JobSubmissionFiles.getStagingDir()从集群获取这个目录的路径。然后就以本作业的ID,即JobId为目录名在这个舞台目录中创建一个临时的子目录,这就是代码中的submitJobDir。以后凡是与本作业有关的资源和信息,就都上传到这个子目录中。
这个方法还包括设置map数,执行队列呀等最后执行connect()方法中创建的对象YARNRunner(或者是LocalJobRunner)的submitJob方法。这样我们的作业就提交给RM了,作业流程如下:
[WordCount.main() -> Job.waitForCompletion() -> Job.submit() -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create() -> JobSubmitter.sbumitJobInternal() -> YARNRunner.submitJob()]
可继续看(hadoop2.7之作业提交详解(下))
原文链接:/bigdata/997878.html