Hadoop Principles (3): MapReduce Framework Internals

These are study notes for the 尚硅谷 Big Data Hadoop video course (Hadoop 3.x, from installation and setup to cluster tuning).

Splits and How MapTask Parallelism Is Determined

Motivating questions

  1. For a large file, Hadoop splits the work across several MapTasks that run in parallel. What exactly decides how many tasks are created?
  2. HDFS blocks are cut strictly by byte count. How does MapReduce then guarantee that each record (for example, a full line of text) is read logically intact?

Terminology

Block: a block is how HDFS physically divides data into chunks; it is the unit in which HDFS stores data.

Split: a split is a purely logical partition of the input; the data is not re-cut or re-stored on disk. A split is the unit of input that a MapReduce program computes over, and each split is handled by exactly one MapTask. For example, with a 128 MB block size and the default split size (equal to the block size), a 300 MB file occupies three blocks and is processed by three MapTasks.

Job submission flow (source code)

/**
 * Submit the job to the cluster and wait for it to finish.
 * @param verbose print the progress to the user
 * @return true if the job succeeded
 * @throws IOException thrown if the communication with the
 *         <code>JobTracker</code> is lost
 */
public boolean waitForCompletion(boolean verbose)
    throws IOException, InterruptedException, ClassNotFoundException {
  if (state == JobState.DEFINE) {
    submit();
  }
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}
  1. The job is submitted only when the JobState is DEFINE; otherwise the method skips straight to the waiting logic;
  2. The verbose flag decides whether detailed progress is printed while the job runs; if it is false, the method simply polls isComplete() in a loop and then returns the result.
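For context, waitForCompletion(true) is typically the last call in a driver program. Below is a minimal WordCount-style driver sketch (not part of the original notes; the mapper, reducer, and path arguments follow the standard illustrative example) showing where this entry point sits:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private final Text word = new Text();
    @Override
    protected void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);          // emit (word, 1) for every token
      }
    }
  }

  public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();                  // add up the 1s for this word
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "word count");
    job.setJarByClass(WordCountDriver.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));    // e.g. an input directory
    FileOutputFormat.setOutputPath(job, new Path(args[1]));  // e.g. an output directory
    // The entry point analyzed above: submit() and then poll until the job finishes.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Next, the submit() method invoked on the DEFINE branch above: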
/**
 * Submit the job to the cluster and return immediately.
 * @throws IOException
 */
public void submit()
    throws IOException, InterruptedException, ClassNotFoundException {
  // Check the current state
  ensureState(JobState.DEFINE);
  // Bridge between the old and new APIs
  setUseNewAPI();
  // Establish the connection (creates the Cluster)
  connect();
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
        ClassNotFoundException {
      // Submit the job
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
  1. ensureState(JobState.DEFINE) verifies that the job is in the correct state before submission;

  2. setUseNewAPI() provides compatibility between the old and new MapReduce APIs;

  3. connect() creates a Cluster. The Cluster's initialize method tries the available ClientProtocolProviders to build a client; when YarnClientProtocolProvider cannot create one, LocalClientProtocolProvider is used instead (see the sketch after this list);

  4. submitter.submitJobInternal(Job.this, cluster) performs the actual submission of the job.
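As a side note (not covered in the notes themselves), which provider wins is governed by the mapreduce.framework.name property: YarnClientProtocolProvider only builds a client when it is set to yarn, while the default value local makes LocalClientProtocolProvider return a LocalJobRunner that runs the job inside the client JVM. A minimal sketch, with an assumed ResourceManager host name:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class FrameworkSelection {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // "yarn"  -> YarnClientProtocolProvider creates the cluster client
    // "local" -> LocalClientProtocolProvider falls back to a LocalJobRunner in the client JVM
    conf.set("mapreduce.framework.name", "yarn");
    // Hypothetical ResourceManager host, only relevant in the yarn case.
    conf.set("yarn.resourcemanager.hostname", "hadoop103");
    Job job = Job.getInstance(conf);
    System.out.println("framework = " + job.getConfiguration().get("mapreduce.framework.name"));
  }
}

The submitJobInternal method itself: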

JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {
  ...

  // 1. Initialize the staging directory and return its path; it holds
  //    the files that describe the submitted job.
  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  ...

  // 2. Obtain a job ID
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());

  try {
    ...
    // 3. configure the jobconf of the user with the command line options
    //    of -libjars, -files, -archives.
    copyAndConfigureFiles(job, submitJobDir);

    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    // 4. Compute the splits and write the split plan files
    int maps = writeSplits(job, submitJobDir);
    ...

    // 5. Write job file to submit dir
    writeConf(conf, submitJobFile);
    ...

    // 6. Submit the job
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    ...
  } finally {
    ...
  }
}
  1. After JobSubmissionFiles.getStagingDir(cluster, conf) runs, a staging directory is created in the file system to hold the basic information of the submitted job.

  2. submitClient.getNewJobID() generates the JobID; every job gets a distinct JobID, and the job's files are stored under jobStagingArea + '/' + jobID.

  3. copyAndConfigureFiles(job, submitJobDir) uploads the files the job depends on, such as libjars, jobjars, and archives.

  4. writeSplits(job, submitJobDir) computes the splits and writes the split plan files; the details are analyzed below.

  5. writeConf(conf, submitJobFile) merges the contents of configuration files such as yarn-default.xml, mapred-default.xml, and core-default.xml into job.xml and uploads it.

  6. submitClient.submitJob(…) finally submits the job.

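As a rough sketch of what ends up in the submit directory (the path handling is illustrative; the actual location depends on yarn.app.mapreduce.am.staging-dir and the job ID), listing it typically shows job.jar, job.split, job.splitmetainfo, and job.xml:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListStagingDir {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Example argument: <staging-dir>/<user>/.staging/<jobId>
    Path submitJobDir = new Path(args[0]);
    for (FileStatus status : fs.listStatus(submitJobDir)) {
      // Typically prints job.jar, job.split, job.splitmetainfo, job.xml
      System.out.println(status.getPath().getName() + "\t" + status.getLen());
    }
  }
}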

Split computation source code analysis

private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
    Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  JobConf jConf = (JobConf)job.getConfiguration();
  int maps;
  if (jConf.getUseNewMapper()) {
    maps = writeNewSplits(job, jobSubmitDir);
  } else {
    maps = writeOldSplits(jConf, jobSubmitDir);
  }
  return maps;
}
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  Configuration conf = job.getConfiguration();
  InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

  // Splitting is ultimately delegated to the InputFormat's getSplits method
  List<InputSplit> splits = input.getSplits(job);
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(array, new SplitComparator());
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
      jobSubmitDir.getFileSystem(conf), array);
  return array.length;
}

Looking at the code above, we can see that split computation is ultimately delegated to the InputFormat's getSplits method, so different InputFormat implementations apply different splitting rules. Below we study FileInputFormat's implementation.
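For reference, the InputFormat is chosen in the driver; a hedged sketch (the alternative class and size value are examples, not from the notes):

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class InputFormatChoice {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance();
    // Default behaviour: TextInputFormat, which splits per file via FileInputFormat.getSplits below.
    job.setInputFormatClass(TextInputFormat.class);
    // Alternative for many small files: pack them into fewer splits.
    // job.setInputFormatClass(CombineTextInputFormat.class);
    // CombineTextInputFormat.setMaxInputSplitSize(job, 4L * 1024 * 1024); // 4 MB, example value
  }
}

FileInputFormat's getSplits implementation is shown next.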

public List<InputSplit> getSplits(JobContext job) throws IOException {
  StopWatch sw = new StopWatch().start();
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  long maxSize = getMaxSplitSize(job);

  // generate splits
  List<InputSplit> splits = new ArrayList<InputSplit>();
  // List the input files to process, including their locations
  List<FileStatus> files = listStatus(job);

  boolean ignoreDirs = !getInputDirRecursive(job)
      && job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
  // Iterate over the files: splitting is done per file, not over the data set as a whole
  for (FileStatus file: files) {
    if (ignoreDirs && file.isDirectory()) {
      continue;
    }
    Path path = file.getPath();
    // File length
    long length = file.getLen();
    if (length != 0) {
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }
      // Check whether the file can be split; some compressed formats cannot
      if (isSplitable(job, path)) {
        long blockSize = file.getBlockSize();
        // Compute the split size; by default it equals blockSize
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        long bytesRemaining = length;
        // SPLIT_SLOP is 1.1: once the remaining bytes are at most 1.1x the split size,
        // stop splitting so that no tiny split is produced
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          // Record the split in the splits list
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
              blkLocations[blkIndex].getHosts(),
              blkLocations[blkIndex].getCachedHosts()));
          bytesRemaining -= splitSize;
        }

        if (bytesRemaining != 0) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
              blkLocations[blkIndex].getHosts(),
              blkLocations[blkIndex].getCachedHosts()));
        }
      } else { // not splitable
        if (LOG.isDebugEnabled()) {
          // Log only if the file is big enough to be splitted
          if (length > Math.min(file.getBlockSize(), minSize)) {
            LOG.debug("File is not splittable so no parallelization "
                + "is possible: " + file.getPath());
          }
        }
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
            blkLocations[0].getCachedHosts()));
      }
    } else {
      //Create empty hosts array for zero length files
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }
  // Save the number of input files for metrics/loadgen
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()
        + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
  }
  return splits;
}
  1. maxsize (maximum split size): if this is set smaller than blockSize, the splits become smaller and are exactly the configured value;
  2. minsize (minimum split size): if this is set larger than blockSize, the splits become larger than blockSize;
  3. when the remaining bytes of a file are at most 1.1 times the split size, no further split is made, which avoids producing a tiny last split (see the worked example below).
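To make the three rules concrete, here is a small self-contained sketch (the 260 MB file size and 128 MB block size are assumed numbers). computeSplitSize in FileInputFormat boils down to Math.max(minSize, Math.min(maxSize, blockSize)):

public class SplitSizeExample {
  public static void main(String[] args) {
    long blockSize = 128L * 1024 * 1024;  // assumed HDFS block size (128 MB)
    long minSize = 1L;                    // effective default minimum split size
    long maxSize = Long.MAX_VALUE;        // default maximum split size

    // Same formula as FileInputFormat.computeSplitSize(blockSize, minSize, maxSize)
    long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));
    System.out.println(splitSize);        // 134217728, i.e. splits default to the block size

    // Assumed 260 MB input file:
    //   260 / 128 = 2.03 > 1.1  -> emit split [0 MB, 128 MB)
    //   132 / 128 = 1.03 <= 1.1 -> loop stops; the last 132 MB stays in ONE split
    // => 2 splits instead of 3, so SPLIT_SLOP avoids a tiny 4 MB split.

    // Tuning knobs (example values; static helpers on o.a.h.mapreduce.lib.input.FileInputFormat):
    //   FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);   // splits shrink to 64 MB
    //   FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);  // splits grow to 256 MB
  }
}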

How MapReduce handles a record that spans two splits

FileInputFormat cuts a file strictly by byte offset, so a long record (line) may end up straddling two InputSplits. This does not affect the Map phase, because the RecordReader associated with FileInputFormat is robust enough: when a line crosses an InputSplit boundary, it keeps reading into the next InputSplit until the line is complete. For line-oriented text, Hadoop normally uses the default TextInputFormat, whose RecordReader is LineRecordReader. Let us look at the line-reading code that LineRecordReader's nextKeyValue method ends up calling (readDefaultLine, reached through in.readLine).

private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
    throws IOException {
  /* We're reading data from in, but the head of the stream may be
   * already buffered in buffer, so we have several cases:
   * 1. No newline characters are in the buffer, so we need to copy
   *    everything and read another buffer from the stream.
   * 2. An unambiguously terminated line is in buffer, so we just
   *    copy to str.
   * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
   *    in CR. In this case we copy everything up to CR to str, but
   *    we also need to see what follows CR: if it's LF, then we
   *    need consume LF as well, so next call to readLine will read
   *    from after that.
   * We use a flag prevCharCR to signal if previous character was CR
   * and, if it happens to be at the end of the buffer, delay
   * consuming it until we have a chance to look at the char that
   * follows.
   */
  str.clear();
  int txtLength = 0; //tracks str.getLength(), as an optimization
  int newlineLength = 0; //length of terminating newline
  boolean prevCharCR = false; //true of prev char was CR
  long bytesConsumed = 0;
  do {
    int startPosn = bufferPosn; //starting from where we left off the last time
    if (bufferPosn >= bufferLength) {
      startPosn = bufferPosn = 0;
      if (prevCharCR) {
        ++bytesConsumed; //account for CR from previous read
      }
      bufferLength = fillBuffer(in, buffer, prevCharCR);
      if (bufferLength <= 0) {
        break; // EOF
      }
    }
    for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
      if (buffer[bufferPosn] == LF) {
        newlineLength = (prevCharCR) ? 2 : 1;
        ++bufferPosn; // at next invocation proceed from following byte
        break;
      }
      if (prevCharCR) { //CR + notLF, we are at notLF
        newlineLength = 1;
        break;
      }
      prevCharCR = (buffer[bufferPosn] == CR);
    }
    int readLength = bufferPosn - startPosn;
    if (prevCharCR && newlineLength == 0) {
      --readLength; //CR at the end of the buffer
    }
    bytesConsumed += readLength;
    int appendLength = readLength - newlineLength;
    if (appendLength > maxLineLength - txtLength) {
      appendLength = maxLineLength - txtLength;
    }
    if (appendLength > 0) {
      str.append(buffer, startPosn, appendLength);
      txtLength += appendLength;
    }
  } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);

  if (bytesConsumed > Integer.MAX_VALUE) {
    throw new IOException("Too many bytes before newline: " + bytesConsumed);
  }
  return (int)bytesConsumed;
}

newlineLength == 0 means that a pass of the do-while loop did not encounter a newline character. Because maxBytesToConsume defaults to Integer.MAX_VALUE, the reader keeps reading until it finds a newline or until the bytes consumed exceed maxBytesToConsume. This is what makes it possible to read a record that crosses an InputSplit boundary, but it also raises two questions:

  1. Since the LineReader code above never checks the InputSplit's end, will reading an InputSplit simply continue without bound?

  2. If a record L spans two InputSplits A and B, and L was already read in full while processing A, how do we avoid re-reading the part of L that lies in B when processing B?

Hadoop answers both questions in LineRecordReader's nextKeyValue method, shown below.

public boolean nextKeyValue() throws IOException {
  if (key == null) {
    key = new LongWritable();
  }
  key.set(pos);
  if (value == null) {
    value = new Text();
  }
  int newSize = 0;
  // We always read one extra line, which lies outside the upper
  // split limit i.e. (end - 1)
  while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
    if (pos == 0) {
      newSize = skipUtfByteOrderMark();
    } else {
      newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
      pos += newSize;
    }

    if ((newSize == 0) || (newSize < maxLineLength)) {
      break;
    }

    // line too long. try again
    LOG.info("Skipped line of size " + newSize + " at pos " +
             (pos - newSize));
  }
  if (newSize == 0) {
    key = null;
    value = null;
    return false;
  } else {
    return true;
  }
}

The condition getFilePosition() <= end, together with the comment "We always read one extra line, which lies outside the upper split limit", answers question 1: a reader stops as soon as its position has moved past the end of its split, reading at most one extra line beyond it. Question 2 is answered in LineRecordReader's initialize method: if this is not the first InputSplit, the reader discards everything up to and including the first newline before it starts emitting records, so nothing is read twice.

// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
  start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;
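Putting the two rules together (skip the partial first line when start != 0, and read at most one line past end), the following simplified in-memory simulation, which is not Hadoop code, shows why every record is emitted exactly once:

import java.util.ArrayList;
import java.util.List;

/** Simplified, in-memory simulation of how lines are assigned to splits. */
public class SplitLineDemo {

  /** Lines a reader owning bytes [start, end) would emit. */
  static List<String> readSplit(String data, int start, int end) {
    List<String> lines = new ArrayList<>();
    int pos = start;
    // Rule 1 (initialize): if this is not the first split, throw away the first
    // (possibly partial) line; the previous split is responsible for it.
    if (start != 0) {
      int nl = data.indexOf('\n', pos);
      pos = (nl == -1) ? data.length() : nl + 1;
    }
    // Rule 2 (nextKeyValue): keep reading while the position is still <= end,
    // so at most one line is read past the split boundary.
    while (pos <= end && pos < data.length()) {
      int nl = data.indexOf('\n', pos);
      int lineEnd = (nl == -1) ? data.length() : nl;
      lines.add(data.substring(pos, lineEnd));
      pos = (nl == -1) ? data.length() : nl + 1;
    }
    return lines;
  }

  public static void main(String[] args) {
    String data = "aaaa\nbbbbbbbb\ncc\n";          // "bbbbbbbb" crosses byte offset 8
    System.out.println(readSplit(data, 0, 8));     // [aaaa, bbbbbbbb] -> reads past its end
    System.out.println(readSplit(data, 8, 17));    // [cc]             -> skips the tail of "bbbbbbbb"
  }
}

Split [0, 8) emits the record that crosses the boundary, and split [8, 17) skips the remainder of that record, so nothing is lost and nothing is duplicated.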