hadoop2.7作业提交详解之文件分片 hadoop2.7之作业提交详解(上)

前端之家收集整理的这篇文章主要介绍了hadoop2.7作业提交详解之文件分片 hadoop2.7之作业提交详解(上)前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

在前面一篇文章中(hadoop2.7之作业提交详解(上))中涉及到文件的分片。

JobSubmitter.submitJobInternal方法调用
int maps = writeSplits(job,submitJobDir); //设置map的数量,而map的数量是根据文件的大小和分片的大小,以及文件数量决定的

接下来我们看一下JobSubmitter.writeSplits方法

  1. private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,Path jobSubmitDir) throws IOException,InterruptedException,ClassNotFoundException {
  2. JobConf jConf = (JobConf)job.getConfiguration();
  3. maps;
  4. if (jConf.getUseNewMapper()) {
  5. maps = writeNewSplits(job,jobSubmitDir); //这里我们使用新的方式
  6. } else {
  7. maps = writeOldSplits(jConf,jobSubmitDir);
  8. }
  9. return maps;
  10. }

接下来继续看JobSubmitter.writeNewSplits方法:

  1. private <T extends InputSplit>
  2. int writeNewSplits(JobContext job,Path jobSubmitDir) job.getConfiguration();
  3. InputFormat<?,?> input =
  4. ReflectionUtils.newInstance(job.getInputFormatClass(),conf); 输入对象,InputFormat是个抽象类
  5. List<InputSplit> splits = input.getSplits(job); 调用InputFormat实现类的getSplits方法
  6. T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
  7. sort the splits into order based on size,so that the biggest
  8. go first
  9. Arrays.sort(array,new SplitComparator()); 对切片的大小进行排序,最大的放最前面
  10. JobSplitWriter.createSplitFiles(jobSubmitDir,conf,jobSubmitDir.getFileSystem(conf),array);创建Split文件
  11. array.length;
  12. }

接下来看一下InputFormat这个抽象类:

  1. public abstract class InputFormat<K,V> {
  2. 用来返回分片结果
  3. abstract
  4. List<InputSplit> getSplits(JobContext context
  5. ) RecordReader是用来从一个输入分片中读取一个一个的K-V对的抽象类,我们可以将其看作是在InputSplit上的迭代器。
  6. 最主要的方法就是nextKeyvalue()方法,由它获取分片上的下一个K-V 对。
  7. RecordReader<K,1)"> createRecordReader(InputSplit split,TaskAttemptContext context
  8. ) 方法都来自于FileInputFormat类,TextInputFormat类只重写了两个方法:如下:

  9. class TextInputFormat extends FileInputFormat<LongWritable,Text> {
  10.   @Override
  11.   public RecordReader<LongWritable,1)"> 
  12.     createRecordReader(InputSplit split,TaskAttemptContext context) {
  13.     String delimiter = context.getConfiguration().get(
  14.         "textinputformat.record.delimiter");
  15.     byte[] recordDelimiterBytes = null;
  16.     if (null != delimiter)
  17.       recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
  18.       LineRecordReader由一个FileSplit构造出来,start是这个FileSplit的起始位置,pos是当前读取分片的位置,
  19.       end是分片结束位置,in是打开的一个读取这个分片的输入流,它是使用这个FileSplit对应的文件名来打开的。
  20.       keyvalue则分别是每次读取的K-V对。然后我们还看到可以利用getProgress()来跟踪读取分片的进度,
  21.       这个函数就是根据已经读取的K-V对占总K-V对的比例来显示进度的
  22.     return  LineRecordReader(recordDelimiterBytes);
  23.   }
  24.   @Override
  25.   protected boolean isSplitable(JobContext context,Path file) {
  26.  如果是压缩文件就不切分,非压缩文件就切分。
  27.     final CompressionCodec codec =
  28.        CompressionCodecFactory(context.getConfiguration()).getCodec(file);
  29.     null == codec) {
  30.       true;
  31.     }
  32.     return codec instanceof SplittableCompressionCodec;
  33.   }
  34. }
  35. 我们在返回到JobSubmitter.writeNewSplits方法中,有List<InputSplit> splits = input.getSplits(job);主要是调用TextInputFormat.getSplits()方法,而TextInputFormat继承了FileInputFormat类,所以调用的就是FileInputFormat.getSplits()方法

  36. public List<InputSplit> getSplits(JobContext job)  IOException {
  37.   StopWatch sw = new StopWatch().start();用来计算纳秒级别的时间
  38.   long minSize = Math.max(getFormatMinSplitSize(),getMinSplitSize(job)); 最小值默认为1
  39.   long maxSize = getMaxSplitSize(job); 最大值为long的最大值,默认为0x7fffffffffffffffL
  40.    generate splits
  41.   List<InputSplit> splits = new ArrayList<InputSplit>();
  42.   List<FileStatus> files = listStatus(job); 获得所有的输入文件
  43.   for (FileStatus file: files) {
  44.     Path path = file.getPath(); 文件路径
  45.     long length = file.getLen(); 文件大小
  46.     if (length != 0) {
  47.       BlockLocation[] blkLocations;
  48.       if (file instanceof LocatedFileStatus) {如果是个含有数据块位置信息的文件 
  49.         blkLocations = ((LocatedFileStatus) file).getBlockLocations();
  50.       } else { 一般文件 
  51.         FileSystem fs = path.getFileSystem(job.getConfiguration());
  52.         blkLocations = fs.getFileBlockLocations(file,0,length);
  53.       }
  54.       if (isSplitable(job,path)) { 判断是否可以分片
  55.         long blockSize = file.getBlockSize(); 128M
  56.         long splitSize = computeSplitSize(blockSize,minSize,maxSize); 计算分片的大小,默认为128M 
  57.         long bytesRemaining = length;
  58.         while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { 判断剩余文件大小是否大于128M*1.1 
  59.           int blkIndex = getBlockIndex(blkLocations,length-bytesRemaining);f返回每个分片起始位置
  60.           splits.add(makeSplit(path,length-bytesRemaining,splitSize,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));
  61.           bytesRemaining -= splitSize;  依次减去分片的大小,对剩余长度再次分片
  62.         }
  63.  多次分片后,最后的数据长度仍不为0但又不足一个分片大小
  64.         if (bytesRemaining != 0) {
  65.           bytesRemaining);
  66.           splits.add(makeSplit(path,length-不可分,则把整个文件作为一个分片
  67.       }  not splitable
  68.         splits.add(makeSplit(path,length,blkLocations[0].getHosts(),blkLocations[0].getCachedHosts()));
  69.       }
  70.     }  { 
  71. 创建空的分片
  72.       Create empty hosts array for zero length files
  73.       splits.add(makeSplit(path,1)">new String[0]));
  74.     }
  75.   }
  76.    Save the number of input files for metrics/loadgen
  77.   job.getConfiguration().setLong(NUM_INPUT_FILES,files.size()); 设置参数NUM_INPUT_FILES
  78.   sw.stop();
  79.    (LOG.isDebugEnabled()) {
  80.     LOG.debug("Total # of splits generated by getSplits: " + splits.size()
  81.         + ",TiMetaken: " + sw.now(TimeUnit.MILLISECONDS));
  82.   }
  83.    splits;
  84. }
  85. public class FileSplit extends InputSplit implements Writable {
  86.   private Path file;输入文件路径 
  87.   private long start;分片在文件中的位置(起点)
  88.   private long length;分片长度
  89.   private String[] hosts;这个分片所在数据块的多个复份所在节点
  90.   private SplitLocationInfo[] hostInfos;每个数据块复份所在节点,以及是否缓存 
  91. }
  92. makeSplit方法存放的分片格式
  93. protected FileSplit makeSplit(Path file,1)">long start,1)">long length,String[] hosts,String[] inMemoryHosts) {
  94.    FileSplit(file,start,hosts,inMemoryHosts);
  95. }
  96. 计算分片的大小
  97. long computeSplitSize(long blockSize,1)"> minSize, maxSize) {
  98.    Math.max(minSize,Math.min(maxSize,blockSize));
  99. }
  100. 通过FileInputFormat.getSplits(),可以返回一个存放分片的ArraryList,接下继续回到JobSubmitter.writeNewSplits方法中:

  101. 接下来将ArrayList转换为数组,并根据分片的大小排序。然后调用JobSplitWriter.createSplitFiles()方法创建split文件。最后返回数组的长度,也就是map的个数。

猜你在找的大数据相关文章