MapReduce 运行原理

1. InputFormat数据输入阶段

1.1. 数据切片与MapTask并行度决定机制

问题引出:MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度。思考:1G的数据,启动8个MapTask,可以提高集群的并发处理能力。那么1K的数据,也启动8个MapTask,会提高集群性能吗?MapTask并行任务是否越多越好呢?哪些因素影响了MapTask并行度?

概念区分:

  • 数据块:Block是HDFS物理上把数据分成一块一块。
  • 数据切片:只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储

1.2. Job提交流程源码详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
waitForCompletion()

submit();

// 1建立连接
connect();
// 1)创建提交Job的代理
new Cluster(getConfiguration());
// (1)判断是本地yarn还是远程
initialize(jobTrackAddr, conf);

// 2 提交job
submitter.submitJobInternal(Job.this, cluster)
// 1)创建给集群提交数据的Stag路径
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

// 2)获取jobid ,并创建Job路径
JobID jobId = submitClient.getNewJobID();

// 3)拷贝jar包到集群
copyAndConfigureFiles(job, submitJobDir);
rUploader.uploadFiles(job, jobSubmitDir);

// 4)计算切片,生成切片规划文件
writeSplits(job, submitJobDir);
maps = writeNewSplits(job, jobSubmitDir);
input.getSplits(job);

// 5)向Stag路径写XML配置文件
writeConf(conf, submitJobFile);
conf.writeXml(out);

// 6)提交Job,返回提交状态
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

1.3. FileInputFormat切片源码解析

  1. 找到你数据存储的目录
  2. 开始遍历处理(规划切片)目录下的每一个文件
  3. 遍历第一个文件ss.txt

    • 获取文件大小fs.sizeOf(ss.txt)
    • 计算切片大小 computeSliteSize(Math.max(minSize,Math.max(maxSize,blocksize))) = blocksize = 128M(集群模式下是128M,本地模式是32M)。本地模式运行时,HDFS认为一台机器性能不怎么好,所以就设置为32M。默认情况下,切片大小=blocksize,集群就是128M。
    • 开始切,形成第1个切片:ss.txt—0:128M 第2个切片ss.txt—128:256M 第3个切片ss.txt—256M:300M(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片)
    • 将切片信息写到一个切片规划文件中
    • 整个切片的核心过程在getSplit()方法中完成
    • InputSplit只记录了切片的元数据信息,比如起始位置、长度以及所在的节点列表等
  4. 提交切片规划文件到yarn上,yarn上的MrAppMaster就可以根据切片规划文件计算开启MapTask个数

1.4. FileInputFormat切片机制

  • 简单地按照文件的内容长度进行切片
  • 切片大小,默认等于block大小。MapReduce本地模式32M,运行在集群上128M
  • 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片

假设有两个文件

1
2
file1.txt    320M
file2.txt 10M

要切成4片

1
2
3
4
file1.txt.split1 -- 0~128
file1.txt.split2 -- 128~256
file1.txt.split3 -- 256~320
file2.txt.split1 -- 0~10M

1.4.1. FileInputFormat切片大小的参数配置

(1)通过分析源码,在FileInputFormat中,计算切片大小的逻辑:Math.max(minSize, Math.min(maxSize, blockSize));

切片主要由这几个值来运算决定

  • mapreduce.input.fileinputformat.split.minsize=1 默认值为1
  • mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默认值Long.MAXValue

因此,默认情况下,切片大小=blocksize

(2)切片大小设置

maxsize(切片最大值):参数如果调得比blocksize小,则会让切片变小,而且就等于配置的这个参数的值。

minsize (切片最小值):参数调的比blockSize大,则可以让切片变得比blocksize还大。

(3)获取切片信息API

1
2
3
4
// 根据文件类型获取切片信息
FileSplit inputSplit = (FileSplit) context.getInputSplit();
// 获取切片的文件名称
String name = inputSplit.getPath().getName();

1.5. CombineTextInputFormat切片机制

框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下

1.5.1. 应用场景

CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理

1.5.2. 虚拟存储切片最大值设置

1
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4m

注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。

1.5.3. 切片机制

生成切片过程包括:虚拟存储过程和切片过程二部分。

(1)虚拟存储过程:

将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。

例如setMaxInputSplitSize值为4M,输入文件大小为8.02M,则先逻辑上分成一个4M。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的小的虚拟存储文件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个文件。

(2)切片过程:

  • 判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片。
  • 如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。
  • 测试举例:有4个小文件大小分别为1.7M、5.1M、3.4M以及6.8M这四个小文件,则虚拟存储之后形成6个文件块,大小分别为:

1.7M,(2.55M、2.55M),3.4M以及(3.4M、3.4M)

最终会形成3个切片,大小分别为:

(1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M

1.6. TextlnputFormat

TextlnputFormat是默认的FilelnputFomiat实现类。按行读取每条记录。键是存储该行在整个文件中的 起始字节偏移量,LongWritable类型。值是这行的内容,不包括任何行终止符(换行符和回车符), Text类型。

以下是一个示例,比如,一个分片包含了如下4条文本记录。

1
2
3
4
Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the enterprise

每条记录表示为以下键/值对:

1
2
3
4
(0,Rich learning form)
(19,Intelligent learning engine)
(47,Learning more convenient)
(72,From the real demand for more close to the enterprise)

1.7. KeyValueTextInputFormat

每一行均为一条记录,被分隔符分割为key,value。可以通过在驱动类中设置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");来设定分隔符。默认分隔符是tab

以下是一个示例,输入是一个包含4条记录的分片。其中——>表示一个(水平方向的)制表符

1
2
3
4
line1——>Rich learning form
line2——>Intelligent learning engine
line3——>Learning more convenient
line4——>From the real demand for more close to the enterprise

每条记录表示为以下键/值对:此时的键是每行排在制表符之前的Text序列

1
2
3
4
(line1,Rich learning form)
(line2,Intelligent learning engine)
(line3,Learning more convenient)
(line4,From the real demand for more close to the enterprise)

2. MapTask和ReduceTask数量

  • MapTask数量:等于切片数
  • ReduceTask数量:等于分区数

一般来説,ReduceTask数量要少于MapTask数量,所以计算能在map中完成,就尽量不要拖到reduce再做。

3. Shuffle机制

Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle

3.1. Partition分区

问题引出:要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)

3.1.1. 默认partition分区

默认的分区策略取决于key.hashCode()numReduceTasks

1
2
3
4
5
6
7
public class HashPartitioner<K, V> extends Partitioner<K, V> {
public int getPartition(K key, V value, int numReduceTasks) {
// & Integer.MAX_VALUE 是取低32位数据
// 默认numReduceTasks=1,所以取余后返回0,即默认是partition0
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}

分区数量就是ReduceTask的数量,即源代码中的numReduceTasks

结果要对numReduceTasks取余,所以各个分区的编号分别为0、1、2、……、numReduceTasks-1

在没有修改numReduceTasks的情况下,numReduceTasks默认值为1,所以getPartition()返回值对1取余,结果恒定为0,所以就只有1个分区

3.2. 排序

排序是MapReduce框架中最重要的操作之一。MapTask和ReduceTask均会对数据(按照key)进行排序。

㧋操作属于Hadoop的默认行为。任何MapReduce应用程序中的数据均会被排序,而不管逻辑上是否需要。

默认排序是按照字典顺序排序,且实现该排序的方法是快速排序 + 归并排序。生成每个有序的小文件使用快速排序,将有序小文件合并为有序大文件使用归并排序

对于MapTask,它会将处理的结果暂时放到一个缓冲区中,当缓冲区使用率达到一定阈值后(默认环形缓冲区大小是100M,阈值是80%),再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,生成一个有序的文件。而当数据处理完毕后,它会通过归并排序对磁盘上所有文件进行一次合并,以将这些文件合并成一个大的有序文件。

对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件(注意是ReduceTask主动到MapTask拉取数据,而不是MapTask主动将数据送往ReduceTask),如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件(阶段性的归并排序);如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序

3.2.1. 排序分类

1)部分排序:
MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部排序。文件数由numReduceTasks决定

2)全排序:
全排序:最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。

如何用Hadoop产生一个全局排序的文件?最简单的方法是使用一个分区,即设置numReduceTask=1。但该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了MapReduce所提供的并行架构。

替代方案:首先创建一系列排好序的文件;其次,串联这些文件;最后,生成一个全局排序的文件。主要思路是使用一个分区来描述输出的全局排序。例如:可以为上述文件创建3个分区,在第一分区中,记录的单词首字母a-g,第二分区记录单词首字母h-n, 第三分区记录单词首字母o-z。

3)辅助排序:(GroupingComparator分组)
在Reduce端对key进行排序。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部 字段比较不相同)的key进入到同一个reduce方法时,可以采用分组序。

Mapreduce框架在记录到达Reducer之前按key对记录排序,但key所对应的value并没有被排序。甚至在不同的执行轮次中,这些值的排序也不固定,因为它们来自不同的map任务且这些map任务在不同轮次中完成时间各不相同。一般来说,大多数MapReduce程序会避免让reduce函数依赖于值的排序。但是,有时也需要通过特定的方法对键进行排序和分组等以实现对值的排序。

4)二次排序:
在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。相同,如果判断条件为3个,即三次排序; 如果判断条件为N个,即N次排序

panchaoxin wechat
关注我的公众号
支持一下