1. 官方WordCount源码分析
1.1. 添加依赖
1 | <!-- 3个基本依赖 --> |
1.2. 查看WordCount源码
1 | package org.apache.hadoop.examples; |
2. FlowSum总流量统计
目的:学习自定义序列化Bean
2.1. 输入数据
输入数据 phone_data.txt 的内容如下:
1 | 1 13736230513 192.196.100.1 www.atguigu.com 2481 24681 200 |
输入数据格式如下,每个字段之间以 制表符分隔
| id | 手机号码 | ip地址 | 上行流量 | 下行流量 | 网络状态码 |
|---|---|---|---|---|---|
| 7 | 13560436666 | 120.196.100.99 | 1116 | 954 | 200 |
注意同一手机号可能有多条记录,要记得累加起来
1 | 21 13568436656 192.168.100.18 www.alibaba.com 2481 24681 200 |
2.2. 需求说明
统计每一个手机号耗费的总上行流量、下行流量、总流量。期望输出数据格式如下:
| 手机号码 | 上行流量 | 下行流量 | 总流量(上行+下行) |
|---|---|---|---|
| 13560436666 | 1116 | 120.196.100.99 | 2070 |
2.3. 需求分析
程序实现的功能是:
1 | SELECT SUM(上行流量), SUM(下行流量)、SUM(上行流量+下行流量) GROUP BY 手机号 |
Input阶段:使用默认的TextInputFormat,按行读取
Map阶段:
读取一行数据,切分字段
提取出手机号、上行流量、下行流量
- 输出 <手机号,bean(上行流量,下行流量,总流量)>
Reduce阶段:
- 累加所有的上行流量、下行流量、得到总流量
- 输出 <手机号,bean(上行流量,下行流量,总流量)>
2.4. 编写MapReduce程序
2.4.1. 编写流量统计的Bean
1 | package cn.pancx.mr.flowsum; |
2.4.2. 编写Mapper/Reducer/Driver
1 | package cn.pancx.mr.flowsum; |
2.5. 上传到服务器运行
本地打包,再重命名为flowsum.jar
1 | mvn package |
上传数据到HDFS
1 | hdfs dfs -mkdir /input |
运行MapReduce
1 | hadoop jar flowsum.jar cn.pancx.mr.flowsum.FlowSumDriver /input /output |
查看结果
1 | $ hdfs dfs -cat /output/part* |
3. FlowSum扩展:手机号按前3位分区
目的:学习自定义分区,以及分区数的设置
3.1. 需求说明
统计每一个手机号耗费的总上行流量、下行流量、总流量。期望输出数据格式如下:
| 手机号码 | 上行流量 | 下行流量 | 总流量(上行+下行) |
|---|---|---|---|
| 13560436666 | 1116 | 954 | 2070 |
分区要求:手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中
3.2. 需求分析
Map和Reduce的业务逻辑都没有变,唯一要做的就是对结果进行分区
增加一个自定义分区类CustomPartitioner,分出以下5个区:
1 | 136 分区0 |
再在Driver中添加相应设置
1 | // 指定自定义分区 |
3.3. 编写MapReduce程序
1 | package cn.pancx.mr.flowsum; |
3.4. 上传到服务器运行
1 | hadoop jar flowsum.jar cn.pancx.mr.flowsum.FlowSumDriver /input /output |
查看分区结果
1 | $ hdfs dfs -cat /output/part*0 |
3.5. 测试调整numReduceTasks对结果的影响
设置为1
1 | job.setNumReduceTasks(1); |
运行,结果相当于只有1个分区
1 | $ hdfs dfs -ls -R /output |
设置为2,小于实际分区数
1 | job.setNumReduceTasks(2); |
运行,出现异常
1 | java.io.IOException: Illegal partition for 13846544121 (2) |
设置为6,大于实际分区数
1 | job.setNumReduceTasks(6); |
运行,结果生成6个文件,最后1个为空,前5个有结果
1 | $ hdfs dfs -ls -R /output |
总结:
- 如果ReduceTask的数量 > getPartition的结果数,则会多产生几个空的输出文件
- 如果1 < ReduceTask的数量 < getPartition的结果数,则有一部分分区数据无处安放,抛出异常
- 如果ReduceTask的数量=1 ,则不管MapTask端输出多少个分区文件,最终结 ReduceTask都交给这一个ReduceTask,最终也就只会产生一个结果文件part-r-00000
- 分区号必须从零开始,逐一累加
4. FlowSum总流量统计结果全排序
4.1. 输入数据
输入数据就是FlowSum总流量的统计结果
1 | 13470253144 180 180 360 |
输入数据格式如下,每个字段之间以 制表符分隔
| 手机号码 | 上行流量 | 下行流量 | 总流量(上行+下行) |
|---|---|---|---|
| 13560436666 | 1116 | 954 | 2070 |
4.2. 需求说明
输出与输入格式相同,但是要求按照总流量降序排序
4.3. 需求分析
FlowBean实现WritableComparable接口,重写compareTo方法,实现总流量降序排序
在Mapper中,输出<FlowBean,手机号>,这样Shuffle就能对key进行排序了
在Reducer中,接收到的根据FlowBean排序过的。直接输出<手机号,FlowBean>即可。注意在排序过程中,可能有两条记录的总流量是一样大的,所以valueIn可能对应多个手机号,统一循环输出所有的<手机号,FlowBean>即可。
4.4. 编写程序
4.4.1. FlowBean
1 | package cn.pancx.mr.flowsum; |
4.4.2. 编写MapReduce
1 | package cn.pancx.mr.flowsum; |
4.5. 运行
输出结果
1 | $ hdfs dfs -cat /output/part* |
5. FlowSum总流量统计结果分区排序
5.1. 输入数据
输入数据就是FlowSum总流量的统计结果
1 | 13470253144 180 180 360 |
输入数据格式如下,每个字段之间以 制表符分隔
| 手机号码 | 上行流量 | 下行流量 | 总流量(上行+下行) |
|---|---|---|---|
| 13560436666 | 1116 | 954 | 2070 |
5.2. 需求说明
输出与输入格式相同,但是要求按照总流量降序排序,并且根据手机号前3位分区:手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中
5.3. 需求分析
与”FlowSum总流量统计结果全排序”的思路一致,加上自定义分区即可
在Mapper中,输出<FlowBean,手机号>,这样Shuffle就能对key进行排序了
在Reducer中,接收到的根据FlowBean排序过的。直接输出<手机号,FlowBean>即可。注意在排序过程中,可能有两条记录的总流量是一样大的,所以valueIn可能对应多个手机号,统一循环输出所有的<手机号,FlowBean>即可。
5.4. 编写程序
5.4.1. FlowBean
实现WritableComparable
1 | package cn.pancx.mr.flowsum; |
5.4.2. 编写MapReduce
1 | package cn.pancx.mr.flowsum; |
5.5. 运行
1 | $ hdfs dfs -cat /output/part*0 |
6. FlowSum总流量统计结果TopN
目的:练习TopN
6.1. 输入数据
输入数据就是FlowSum总流量的统计结果
1 | 13470253144 180 180 360 |
6.2. 需求说明
根据总流量进行降序排序,取TopN条记录
6.3. 需求分析
要根据总流量降序排序,所以在Mapper中,输出
取TopN思路1:在Map阶段取TopN。在执行Mapper的map()时,用TreeMap收集TopN个
取TopN思路2:在Map阶段直接输出。在Reduce阶段也是直接输出。但是要设置一个变量,统计输出了
两种思路对比:一般来説,ReduceTask数量要少于MapTask数量,所以计算能在map中完成,就尽量不要拖到reduce再做。所以思路1更好
6.4. 编写程序
6.4.1. FlowBean
1 | package mr.flowsum; |
6.4.2. 编写MapReduce(思路1:在Map阶段处理TopN)
1 | package mr.flowsum; |
6.4.3. 编写MapReduce(思路2:在Reduce阶段处理TopN)
1 | package mr.flowsum; |