MapReduce FlowSum编程案例

1. 官方WordCount源码分析

1.1. 添加依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<!-- 3个基本依赖 -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-examples</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>

1.2. 查看WordCount源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable> {
/**
* The value emitted for every token: the constant 1 (one occurrence).
*/
private final static IntWritable one = new IntWritable(1);

// Reused output key holding the current token (avoids per-record allocation).
private Text word = new Text();

// Splits one input line on whitespace and emits <token, 1> per token.
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}

public static class IntSumReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
// Reused output value holding the per-word total.
private IntWritable result = new IntWritable();

// Sums all counts for one word and emits <word, total>.
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// GenericOptionsParser strips Hadoop-generic flags; what remains are paths.
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
// Deprecated form: Job job = new Job(conf, "word count");
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
// The reducer is associative/commutative, so it doubles as a combiner.
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// All args except the last are input paths; the last is the output path.
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

2. FlowSum总流量统计

目的:学习自定义序列化Bean

2.1. 输入数据

输入数据 phone_data.txt 的内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
1	13736230513	192.196.100.1	www.atguigu.com	2481	24681	200
2 13846544121 192.196.100.2 264 0 200
3 13956435636 192.196.100.3 132 1512 200
4 13966251146 192.168.100.1 240 0 404
5 18271575951 192.168.100.2 www.atguigu.com 1527 2106 200
6 84188413 192.168.100.3 www.atguigu.com 4116 1432 200
7 13590439668 192.168.100.4 1116 954 200
8 15910133277 192.168.100.5 www.hao123.com 3156 2936 200
9 13729199489 192.168.100.6 240 0 200
10 13630577991 192.168.100.7 www.shouhu.com 6960 690 200
11 15043685818 192.168.100.8 www.baidu.com 3659 3538 200
12 15959002129 192.168.100.9 www.atguigu.com 1938 180 500
13 13560439638 192.168.100.10 918 4938 200
14 13470253144 192.168.100.11 180 180 200
15 13682846555 192.168.100.12 www.qq.com 1938 2910 200
16 13992314666 192.168.100.13 www.gaga.com 3008 3720 200
17 13509468723 192.168.100.14 www.qinghua.com 7335 110349 404
18 18390173782 192.168.100.15 www.sogou.com 9531 2412 200
19 13975057813 192.168.100.16 www.baidu.com 11058 48243 200
20 13768778790 192.168.100.17 120 120 200
21 13568436656 192.168.100.18 www.alibaba.com 2481 24681 200
22 13568436656 192.168.100.19 1116 954 200

输入数据格式如下,每个字段之间以 制表符分隔

id 手机号码 ip地址 [访问网址,可选] 上行流量 下行流量 网络状态码
7 13560436666 120.196.100.99 1116 954 200

注意同一手机号可能有多条记录,要记得累加起来

1
2
21 	13568436656	192.168.100.18	www.alibaba.com	2481	24681	200
22 13568436656 192.168.100.19 1116 954 200

2.2. 需求说明

统计每一个手机号耗费的总上行流量、下行流量、总流量。期望输出数据格式如下:

手机号码 上行流量 下行流量 总流量(上行+下行)
13560436666 1116 954 2070

2.3. 需求分析

程序实现的功能是:

1
SELECT SUM(上行流量), SUM(下行流量)、SUM(上行流量+下行流量) GROUP BY 手机号

Input阶段:使用默认的TextInputFormat,按行读取

Map阶段:

  • 读取一行数据,切分字段

  • 提取出手机号、上行流量、下行流量

  • 输出 <手机号,bean(上行流量,下行流量,总流量)>

Reduce阶段:

  • 累加所有的上行流量、下行流量、得到总流量
  • 输出 <手机号,bean(上行流量,下行流量,总流量)>

2.4. 编写MapReduce程序

2.4.1. 编写流量统计的Bean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package cn.pancx.mr.flowsum;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Custom Hadoop serialization bean carrying the upstream, downstream and
 * total traffic of one phone number. Implements {@code Writable} so it can
 * travel through the MapReduce shuffle as a value.
 */
public class FlowBean implements Writable {

    /** Upstream traffic. */
    private long upFlow;
    /** Downstream traffic. */
    private long downFlow;
    /** Total traffic (upstream + downstream). */
    private long sumFlow;

    /**
     * No-arg constructor required by Hadoop: deserialization instantiates
     * the bean reflectively before calling {@link #readFields(DataInput)}.
     */
    public FlowBean() {
        super();
    }

    /**
     * Builds a bean from the two directional counters; the total is derived.
     */
    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    /** Serialization — the field order written here defines the wire format. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /** Deserialization — must read fields in exactly the order written. */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    /** Tab-separated rendering, used directly as the job's output value. */
    @Override
    public String toString() {
        return new StringBuilder()
                .append(upFlow).append('\t')
                .append(downFlow).append('\t')
                .append(sumFlow)
                .toString();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
}

2.4.2. 编写Mapper/Reducer/Driver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package cn.pancx.mr.flowsum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


public class FlowSumDriver {
public static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
FlowBean v = new FlowBean();
Text k = new Text();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取一行
String line = value.toString();
// 2 切割字段
String[] fields = line.split("\t");

// 3 封装对象
// 取出手机号码
String phoneNum = fields[1];
// 取出上行流量和下行流量
long upFlow = Long.parseLong(fields[fields.length - 3]);
long downFlow = Long.parseLong(fields[fields.length - 2]);
k.set(phoneNum);
v.setDownFlow(downFlow);
v.setUpFlow(upFlow);
v.setSumFlow(upFlow + downFlow);

// 4 写出
context.write(k, v);
}
}

public static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
long sumUpFlow = 0;
long sumDownFlow = 0;

// 1 遍历所用bean,将其中的上行流量,下行流量分别累加
for (FlowBean flowBean : values) {
sumUpFlow += flowBean.getUpFlow();
sumDownFlow += flowBean.getDownFlow();
}

// 2 封装对象
FlowBean resultBean = new FlowBean(sumUpFlow, sumDownFlow);

// 3 写出
context.write(key, resultBean);
}
}


public static void main(String[] args) throws Exception {
// 1 获取配置信息,或者job对象实例
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);

// 6 指定本程序的jar包所在的本地路径
job.setJarByClass(FlowSumDriver.class);

// 2 指定本业务job要使用的mapper/Reducer业务类
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);

// 3 指定mapper输出数据的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);

// 4 指定最终输出的数据的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);

// 5 指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}

2.5. 上传到服务器运行

本地打包,再重命名为flowsum.jar

1
mvn package

上传数据到HDFS

1
2
hdfs dfs -mkdir /input
hdfs dfs -put phone_data.txt /input/

运行MapReduce

1
hadoop jar flowsum.jar cn.pancx.mr.flowsum.FlowSumDriver /input /output

查看结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
$ hdfs dfs -cat /output/part*
13470253144 180 180 360
13509468723 7335 110349 117684
13560439638 918 4938 5856
13568436656 3597 25635 29232
13590439668 1116 954 2070
13630577991 6960 690 7650
13682846555 1938 2910 4848
13729199489 240 0 240
13736230513 2481 24681 27162
13768778790 120 120 240
13846544121 264 0 264
13956435636 132 1512 1644
13966251146 240 0 240
13975057813 11058 48243 59301
13992314666 3008 3720 6728
15043685818 3659 3538 7197
15910133277 3156 2936 6092
15959002129 1938 180 2118
18271575951 1527 2106 3633
18390173782 9531 2412 11943
84188413 4116 1432 5548

3. FlowSum扩展:手机号按前3位分区

目的:学习自定义分区,以及分区数的设置

3.1. 需求说明

统计每一个手机号耗费的总上行流量、下行流量、总流量。期望输出数据格式如下:

手机号码 上行流量 下行流量 总流量(上行+下行)
13560436666 1116 954 2070

分区要求:手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中

3.2. 需求分析

Map和Reduce的业务逻辑都没有变,唯一要做的就是对结果进行分区

增加一个自定义分区类CustomPartitioner,分出以下5个区:

1
2
3
4
5
136       分区0
137 分区1
138 分区2
139 分区3
other 分区4

再在Driver中添加相应设置

1
2
3
4
// 指定自定义分区
job.setPartitionerClass(CustomPartitioner.class);
// 同时指定相应数量的reduce task
job.setNumReduceTasks(5);

3.3. 编写MapReduce程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package cn.pancx.mr.flowsum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


/**
 * FlowSum driver with a custom partitioner: sums per-phone traffic and
 * routes the output into five partitions by phone-number prefix
 * (136/137/138/139/other).
 *
 * Usage: hadoop jar flowsum.jar cn.pancx.mr.flowsum.FlowSumDriver &lt;in&gt; &lt;out&gt;
 */
public class FlowSumDriver {
    /**
     * Routes records by phone-number prefix: 136 -> 0, 137 -> 1, 138 -> 2,
     * 139 -> 3, everything else -> 4. Partition ids must start at 0 and be
     * consecutive; the job sets numReduceTasks to 5 accordingly.
     */
    public static class CustomPartitioner extends Partitioner<Text, FlowBean> {
        @Override
        public int getPartition(Text key, FlowBean flowBean, int numPartitions) {
            // startsWith() is equivalent to comparing substring(0, 3) for
            // keys of length >= 3, but does not throw
            // StringIndexOutOfBoundsException on shorter keys.
            String phone = key.toString();

            int partition = 4;
            if (phone.startsWith("136")) {
                partition = 0;
            } else if (phone.startsWith("137")) {
                partition = 1;
            } else if (phone.startsWith("138")) {
                partition = 2;
            } else if (phone.startsWith("139")) {
                partition = 3;
            }
            return partition;
        }
    }


    /**
     * Mapper: parses one tab-separated log line and emits
     * &lt;phone number, FlowBean(upFlow, downFlow, sumFlow)&gt;.
     */
    public static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        // Output key/value objects are reused across map() calls.
        FlowBean v = new FlowBean();
        Text k = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 1. Read one line.
            String line = value.toString();
            // 2. Split on tabs.
            String[] fields = line.split("\t");

            // 3. Populate the output key/value.
            // The phone number is always the second column.
            String phoneNum = fields[1];
            // The URL column is optional, so the traffic counters are
            // addressed from the END of the record.
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long downFlow = Long.parseLong(fields[fields.length - 2]);
            k.set(phoneNum);
            v.setDownFlow(downFlow);
            v.setUpFlow(upFlow);
            v.setSumFlow(upFlow + downFlow);

            // 4. Emit.
            context.write(k, v);
        }
    }

    /**
     * Reducer: accumulates the per-record counters of one phone number and
     * emits a single &lt;phone number, FlowBean&gt; with the totals.
     */
    public static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
            long sumUpFlow = 0;
            long sumDownFlow = 0;

            // 1. Accumulate up/down traffic over all beans of this phone.
            for (FlowBean flowBean : values) {
                sumUpFlow += flowBean.getUpFlow();
                sumDownFlow += flowBean.getDownFlow();
            }

            // 2. The two-arg constructor derives the total internally.
            FlowBean resultBean = new FlowBean(sumUpFlow, sumDownFlow);

            // 3. Emit.
            context.write(key, resultBean);
        }
    }


    public static void main(String[] args) throws Exception {
        // Fail fast with a usage message instead of an
        // ArrayIndexOutOfBoundsException when the paths are missing.
        if (args.length < 2) {
            System.err.println("Usage: FlowSumDriver <in> <out>");
            System.exit(2);
        }

        // 1. Create the job from the configuration.
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // Locate the jar containing this driver class.
        job.setJarByClass(FlowSumDriver.class);

        // Wire up the Mapper and Reducer implementations.
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // Map-output key/value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // Final (reducer) output key/value types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Input and output paths come from the command line.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Install the custom partitioner ...
        job.setPartitionerClass(CustomPartitioner.class);
        // ... and match the reducer count to its five partitions.
        job.setNumReduceTasks(5);

        // Submit the job and block until it finishes.
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

3.4. 上传到服务器运行

1
hadoop jar flowsum.jar cn.pancx.mr.flowsum.FlowSumDriver /input /output

查看分区结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
$ hdfs dfs -cat /output/part*0
13630577991 6960 690 7650
13682846555 1938 2910 4848

$ hdfs dfs -cat /output/part*1
13729199489 240 0 240
13736230513 2481 24681 27162
13768778790 120 120 240

$ hdfs dfs -cat /output/part*2
13846544121 264 0 264

$ hdfs dfs -cat /output/part*3
13956435636 132 1512 1644
13966251146 240 0 240
13975057813 11058 48243 59301
13992314666 3008 3720 6728

$ hdfs dfs -cat /output/part*4
13470253144 180 180 360
13509468723 7335 110349 117684
13560439638 918 4938 5856
13568436656 3597 25635 29232
13590439668 1116 954 2070
15043685818 3659 3538 7197
15910133277 3156 2936 6092
15959002129 1938 180 2118
18271575951 1527 2106 3633
18390173782 9531 2412 11943
84188413 4116 1432 5548

3.5. 测试调整numReduceTasks对结果的影响

设置为1

1
job.setNumReduceTasks(1);

运行,结果相当于只有1个分区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
$ hdfs dfs -ls -R /output
-rw-r--r-- 1 root supergroup 0 2019-02-26 20:29 /output/_SUCCESS
-rw-r--r-- 1 root supergroup 550 2019-02-26 20:29 /output/part-r-00000
$ hdfs dfs -cat /output/part*
13470253144 180 180 360
13509468723 7335 110349 117684
13560439638 918 4938 5856
13568436656 3597 25635 29232
13590439668 1116 954 2070
13630577991 6960 690 7650
13682846555 1938 2910 4848
13729199489 240 0 240
13736230513 2481 24681 27162
13768778790 120 120 240
13846544121 264 0 264
13956435636 132 1512 1644
13966251146 240 0 240
13975057813 11058 48243 59301
13992314666 3008 3720 6728
15043685818 3659 3538 7197
15910133277 3156 2936 6092
15959002129 1938 180 2118
18271575951 1527 2106 3633
18390173782 9531 2412 11943
84188413 4116 1432 5548

设置为2,小于实际分区数

1
job.setNumReduceTasks(2);

运行,出现异常

1
java.io.IOException: Illegal partition for 13846544121 (2)

设置为6,大于实际分区数

1
job.setNumReduceTasks(6);

运行,结果生成6个文件,最后1个为空,前5个有结果

1
2
3
4
5
6
7
8
$ hdfs dfs -ls -R /output
-rw-r--r-- 1 root supergroup 0 2019-02-26 20:33 /output/_SUCCESS
-rw-r--r-- 1 root supergroup 53 2019-02-26 20:33 /output/part-r-00000
-rw-r--r-- 1 root supergroup 75 2019-02-26 20:33 /output/part-r-00001
-rw-r--r-- 1 root supergroup 22 2019-02-26 20:33 /output/part-r-00002
-rw-r--r-- 1 root supergroup 105 2019-02-26 20:33 /output/part-r-00003
-rw-r--r-- 1 root supergroup 295 2019-02-26 20:33 /output/part-r-00004
-rw-r--r-- 1 root supergroup 0 2019-02-26 20:33 /output/part-r-00005 # 空结果

总结:

  • 如果ReduceTask的数量 > getPartition的结果数,则会多产生几个空的输出文件
  • 如果1 < ReduceTask的数量 < getPartition的结果数,则有一部分分区数据无处安放,抛出异常
  • 如果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也就只会产生一个结果文件part-r-00000
  • 分区号必须从零开始,逐一累加

4. FlowSum总流量统计结果全排序

4.1. 输入数据

输入数据就是FlowSum总流量的统计结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
13470253144     180     180     360
13509468723 7335 110349 117684
13560439638 918 4938 5856
13568436656 3597 25635 29232
13590439668 1116 954 2070
13630577991 6960 690 7650
13682846555 1938 2910 4848
13729199489 240 0 240
13736230513 2481 24681 27162
13768778790 120 120 240
13846544121 264 0 264
13956435636 132 1512 1644
13966251146 240 0 240
13975057813 11058 48243 59301
13992314666 3008 3720 6728
15043685818 3659 3538 7197
15910133277 3156 2936 6092
15959002129 1938 180 2118
18271575951 1527 2106 3633
18390173782 9531 2412 11943
84188413 4116 1432 5548

输入数据格式如下,每个字段之间以 制表符分隔

手机号码 上行流量 下行流量 总流量(上行+下行)
13560436666 1116 954 2070

4.2. 需求说明

输出与输入格式相同,但是要求按照总流量降序排序

4.3. 需求分析

FlowBean实现WritableComparable接口,重写compareTo方法,实现总流量降序排序

在Mapper中,输出<FlowBean,手机号>,这样Shuffle就能对key进行排序了

在Reducer中,接收到的记录是根据FlowBean排序过的。直接输出<手机号,FlowBean>即可。注意在排序过程中,可能有两条记录的总流量是一样大的,所以valueIn可能对应多个手机号,统一循环输出所有的<手机号,FlowBean>即可。

4.4. 编写程序

4.4.1. FlowBean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package cn.pancx.mr.flowsum;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Traffic bean used as a MapReduce KEY: carries upstream, downstream and
 * total traffic and orders instances by DESCENDING total traffic.
 * Implements {@code WritableComparable} so the shuffle can both serialize
 * and sort it.
 */
public class FlowBean implements WritableComparable<FlowBean> {

    /** Upstream traffic. */
    private long upFlow;
    /** Downstream traffic. */
    private long downFlow;
    /** Total traffic (upstream + downstream). */
    private long sumFlow;

    /**
     * No-arg constructor required by Hadoop: deserialization instantiates
     * the bean reflectively before calling {@link #readFields(DataInput)}.
     */
    public FlowBean() {
        super();
    }

    /**
     * Builds a bean from the two directional counters; the total is derived.
     */
    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    /** Serialization — the field order written here defines the wire format. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /** Deserialization — must read fields in exactly the order written. */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    /** Tab-separated rendering, used directly as the job's output value. */
    @Override
    public String toString() {
        return new StringBuilder()
                .append(upFlow).append('\t')
                .append(downFlow).append('\t')
                .append(sumFlow)
                .toString();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /**
     * Orders beans by total traffic, largest first, so the shuffle sort
     * yields a descending ranking. Returns 0 for equal totals, which lets
     * the framework group such records into one reduce() call.
     */
    @Override
    public int compareTo(FlowBean o) {
        // Reversed-argument Long.compare == descending order by sumFlow.
        return Long.compare(o.getSumFlow(), sumFlow);
    }
}

4.4.2. 编写MapReduce

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package cn.pancx.mr.flowsum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


/**
 * Sorting driver: re-reads the FlowSum output and emits it ordered by total
 * traffic, descending. The FlowBean is used as the map-output KEY so that
 * the shuffle sorts records via FlowBean.compareTo.
 *
 * Usage: hadoop jar flowsum.jar cn.pancx.mr.flowsum.FlowSumDriver &lt;in&gt; &lt;out&gt;
 */
public class FlowSumDriver {
    /**
     * Mapper: parses "phone \t up \t down \t sum" and emits
     * &lt;FlowBean, phone&gt; (key and value deliberately swapped for sorting).
     */
    public static class FlowCountMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
        // Output key/value objects are reused across map() calls.
        FlowBean k = new FlowBean();
        Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 1. Read one line.
            String line = value.toString();
            // 2. Split on tabs: phone, upFlow, downFlow, sumFlow.
            String[] fields = line.split("\t");

            // 3. Populate the output key/value.
            String phoneNum = fields[0];
            long upFlow = Long.parseLong(fields[1]);
            long downFlow = Long.parseLong(fields[2]);
            long sumFlow = Long.parseLong(fields[3]);
            k.setUpFlow(upFlow);
            k.setDownFlow(downFlow);
            k.setSumFlow(sumFlow);
            v.set(phoneNum);
            // 4. Emit with the bean as the key so the shuffle sorts on it.
            context.write(k, v);
        }
    }

    /**
     * Reducer: keys arrive already sorted by total traffic (descending);
     * swap key and value back to &lt;phone, FlowBean&gt;. Several phones may share
     * one total, so every value of the key must be written out.
     */
    public static class FlowCountReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
        @Override
        protected void reduce(FlowBean bean, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text phone : values) {
                context.write(phone, bean);
            }
        }
    }


    public static void main(String[] args) throws Exception {
        // Fail fast with a usage message instead of an
        // ArrayIndexOutOfBoundsException when the paths are missing.
        if (args.length < 2) {
            System.err.println("Usage: FlowSumDriver <in> <out>");
            System.exit(2);
        }

        // 1. Create the job from the configuration.
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // Locate the jar containing this driver class.
        job.setJarByClass(FlowSumDriver.class);

        // Wire up the Mapper and Reducer implementations.
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // Map output: <FlowBean, Text> — the bean must be the key to be sorted.
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // Final output: <Text, FlowBean>.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Input and output paths come from the command line.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job and block until it finishes.
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

4.5. 运行

输出结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
$ hdfs dfs -cat /output/part*
13509468723 7335 110349 117684
13975057813 11058 48243 59301
13568436656 3597 25635 29232
13736230513 2481 24681 27162
18390173782 9531 2412 11943
13630577991 6960 690 7650
15043685818 3659 3538 7197
13992314666 3008 3720 6728
15910133277 3156 2936 6092
13560439638 918 4938 5856
84188413 4116 1432 5548
13682846555 1938 2910 4848
18271575951 1527 2106 3633
15959002129 1938 180 2118
13590439668 1116 954 2070
13956435636 132 1512 1644
13470253144 180 180 360
13846544121 264 0 264
13966251146 240 0 240
13768778790 120 120 240
13729199489 240 0 240

5. FlowSum总流量统计结果分区排序

5.1. 输入数据

输入数据就是FlowSum总流量的统计结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
13470253144     180     180     360
13509468723 7335 110349 117684
13560439638 918 4938 5856
13568436656 3597 25635 29232
13590439668 1116 954 2070
13630577991 6960 690 7650
13682846555 1938 2910 4848
13729199489 240 0 240
13736230513 2481 24681 27162
13768778790 120 120 240
13846544121 264 0 264
13956435636 132 1512 1644
13966251146 240 0 240
13975057813 11058 48243 59301
13992314666 3008 3720 6728
15043685818 3659 3538 7197
15910133277 3156 2936 6092
15959002129 1938 180 2118
18271575951 1527 2106 3633
18390173782 9531 2412 11943
84188413 4116 1432 5548

输入数据格式如下,每个字段之间以 制表符分隔

手机号码 上行流量 下行流量 总流量(上行+下行)
13560436666 1116 954 2070

5.2. 需求说明

输出与输入格式相同,但是要求按照总流量降序排序,并且根据手机号前3位分区:手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中

5.3. 需求分析

与”FlowSum总流量统计结果全排序”的思路一致,加上自定义分区即可

在Mapper中,输出<FlowBean,手机号>,这样Shuffle就能对key进行排序了

在Reducer中,接收到的根据FlowBean排序过的。直接输出<手机号,FlowBean>即可。注意在排序过程中,可能有两条记录的总流量是一样大的,所以valueIn可能对应多个手机号,统一循环输出所有的<手机号,FlowBean>即可。

5.4. 编写程序

5.4.1. FlowBean

实现WritableComparable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package cn.pancx.mr.flowsum;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Traffic bean used as a MapReduce KEY: carries upstream, downstream and
 * total traffic and orders instances by DESCENDING total traffic.
 * Implements {@code WritableComparable} so the shuffle can both serialize
 * and sort it.
 */
public class FlowBean implements WritableComparable<FlowBean> {

    /** Upstream traffic. */
    private long upFlow;
    /** Downstream traffic. */
    private long downFlow;
    /** Total traffic (upstream + downstream). */
    private long sumFlow;

    /**
     * No-arg constructor required by Hadoop: deserialization instantiates
     * the bean reflectively before calling {@link #readFields(DataInput)}.
     */
    public FlowBean() {
        super();
    }

    /**
     * Builds a bean from the two directional counters; the total is derived.
     */
    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    /** Serialization — the field order written here defines the wire format. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /** Deserialization — must read fields in exactly the order written. */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    /** Tab-separated rendering, used directly as the job's output value. */
    @Override
    public String toString() {
        return new StringBuilder()
                .append(upFlow).append('\t')
                .append(downFlow).append('\t')
                .append(sumFlow)
                .toString();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /**
     * Orders beans by total traffic, largest first, so the shuffle sort
     * yields a descending ranking. Returns 0 for equal totals, which lets
     * the framework group such records into one reduce() call.
     */
    @Override
    public int compareTo(FlowBean o) {
        // Reversed-argument Long.compare == descending order by sumFlow.
        return Long.compare(o.getSumFlow(), sumFlow);
    }
}

5.4.2. 编写MapReduce

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package cn.pancx.mr.flowsum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


/**
 * Sorting + partitioning driver: emits the FlowSum result ordered by total
 * traffic (descending, via FlowBean.compareTo as the map-output key) and
 * partitioned by phone-number prefix (136/137/138/139/other).
 *
 * Usage: hadoop jar flowsum.jar cn.pancx.mr.flowsum.FlowSumDriver &lt;in&gt; &lt;out&gt;
 */
public class FlowSumDriver {
    /**
     * Routes records by phone-number prefix: 136 -> 0, 137 -> 1, 138 -> 2,
     * 139 -> 3, everything else -> 4. Note the phone number is the VALUE
     * here, because the FlowBean occupies the key slot for sorting.
     */
    public static class CustomPartitioner extends Partitioner<FlowBean, Text> {
        @Override
        public int getPartition(FlowBean flowBean, Text phone, int numPartitions) {
            // startsWith() is equivalent to comparing substring(0, 3) for
            // values of length >= 3, but does not throw
            // StringIndexOutOfBoundsException on shorter values.
            String num = phone.toString();

            int partition = 4;
            if (num.startsWith("136")) {
                partition = 0;
            } else if (num.startsWith("137")) {
                partition = 1;
            } else if (num.startsWith("138")) {
                partition = 2;
            } else if (num.startsWith("139")) {
                partition = 3;
            }
            return partition;
        }
    }

    /**
     * Mapper: parses "phone \t up \t down \t sum" and emits
     * &lt;FlowBean, phone&gt; (key and value deliberately swapped for sorting).
     */
    public static class FlowCountMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
        // Output key/value objects are reused across map() calls.
        FlowBean k = new FlowBean();
        Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 1. Read one line.
            String line = value.toString();
            // 2. Split on tabs: phone, upFlow, downFlow, sumFlow.
            String[] fields = line.split("\t");

            // 3. Populate the output key/value.
            String phoneNum = fields[0];
            long upFlow = Long.parseLong(fields[1]);
            long downFlow = Long.parseLong(fields[2]);
            long sumFlow = Long.parseLong(fields[3]);
            k.setUpFlow(upFlow);
            k.setDownFlow(downFlow);
            k.setSumFlow(sumFlow);
            v.set(phoneNum);
            // 4. Emit with the bean as the key so the shuffle sorts on it.
            context.write(k, v);
        }
    }

    /**
     * Reducer: keys arrive already sorted by total traffic (descending);
     * swap key and value back to &lt;phone, FlowBean&gt;. Several phones may share
     * one total, so every value of the key must be written out.
     */
    public static class FlowCountReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
        @Override
        protected void reduce(FlowBean bean, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text phone : values) {
                context.write(phone, bean);
            }
        }
    }


    public static void main(String[] args) throws Exception {
        // Fail fast with a usage message instead of an
        // ArrayIndexOutOfBoundsException when the paths are missing.
        if (args.length < 2) {
            System.err.println("Usage: FlowSumDriver <in> <out>");
            System.exit(2);
        }

        // 1. Create the job from the configuration.
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // Locate the jar containing this driver class.
        job.setJarByClass(FlowSumDriver.class);

        // Wire up the Mapper and Reducer implementations.
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // Map output: <FlowBean, Text> — the bean must be the key to be sorted.
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // Final output: <Text, FlowBean>.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Input and output paths come from the command line.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Install the custom partitioner ...
        job.setPartitionerClass(CustomPartitioner.class);
        // ... and match the reducer count to its five partitions.
        job.setNumReduceTasks(5);

        // Submit the job and block until it finishes.
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

5.5. 运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
$ hdfs dfs -cat /output/part*0
13630577991 6960 690 7650
13682846555 1938 2910 4848

$ hdfs dfs -cat /output/part*1
13736230513 2481 24681 27162
13729199489 240 0 240
13768778790 120 120 240

$ hdfs dfs -cat /output/part*2
13846544121 264 0 264

$ hdfs dfs -cat /output/part*3
13975057813 11058 48243 59301
13992314666 3008 3720 6728
13956435636 132 1512 1644
13966251146 240 0 240

$ hdfs dfs -cat /output/part*4
13509468723 7335 110349 117684
13568436656 3597 25635 29232
18390173782 9531 2412 11943
15043685818 3659 3538 7197
15910133277 3156 2936 6092
13560439638 918 4938 5856
84188413 4116 1432 5548
18271575951 1527 2106 3633
15959002129 1938 180 2118
13590439668 1116 954 2070
13470253144 180 180 360

6. FlowSum总流量统计结果TopN

目的:练习TopN

6.1. 输入数据

输入数据就是FlowSum总流量的统计结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
13470253144	180	180	360
13509468723 7335 110349 117684
13560439638 918 4938 5856
13568436656 3597 25635 29232
13590439668 1116 954 2070
13630577991 6960 690 7650
13682846555 1938 2910 4848
13729199489 240 0 240
13736230513 2481 24681 27162
13768778790 120 120 240
13846544121 264 0 264
13956435636 132 1512 1644
13966251146 240 0 240
13975057813 11058 48243 59301
13992314666 3008 3720 6728
15043685818 3659 3538 7197
15910133277 3156 2936 6092
15959002129 1938 180 2118
18271575951 1527 2106 3633
18390173782 9531 2412 11943
84188413 4116 1432 5548

6.2. 需求说明

根据总流量进行降序排序,取TopN条记录

6.3. 需求分析

要根据总流量降序排序,所以在Mapper中,输出<FlowBean,手机号>。其中FlowBean实现WritableComparable接口,重写compareTo方法,根据总流量降序排序

取TopN思路1:在Map阶段取TopN。在执行Mapper的map()时,用TreeMap收集TopN个记录。在执行Mapper的cleanup()时,将TreeMap中所有的记录输出

取TopN思路2:在Map阶段直接输出。在Reduce阶段也是直接输出。但是要设置一个变量,统计输出了,达到TopN个就不再输出

两种思路对比:一般来说,ReduceTask数量要少于MapTask数量,所以计算能在map中完成,就尽量不要拖到reduce再做。所以思路1更好

6.4. 编写程序

6.4.1. FlowBean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package mr.flowsum;

import lombok.Data;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Lombok's @Data generates the getters/setters plus equals/hashCode over ALL
// fields, while compareTo below orders by sumFlow only — so ordering is not
// consistent with equals. That is fine for shuffle sorting, but NOTE(review):
// beware ordered collections such as TreeMap, which treat beans with equal
// sumFlow as duplicate keys.
@Data
public class FlowBean implements WritableComparable<FlowBean> {

    /** Upstream traffic. */
    private long upFlow;

    /** Downstream traffic. */
    private long downFlow;

    /** Total traffic (upstream + downstream). */
    private long sumFlow;


    /** Descending order by total traffic (largest first). */
    @Override
    public int compareTo(FlowBean o) {
        // Reversed-argument Long.compare == descending order by sumFlow.
        return Long.compare(o.sumFlow, sumFlow);
    }

    /** Serialization — the field order written here defines the wire format. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /** Deserialization — must read fields in exactly the order written. */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    /** Tab-separated rendering, used directly as the job's output value. */
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}

6.4.2. 编写MapReduce(思路1:在Map阶段处理TopN)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package mr.flowsum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.TreeMap;

/**
 * TopN by total flow — variant 1: the TopN filtering happens inside the
 * map phase (buffer in a TreeMap, emit in cleanup()).
 *
 * Input lines: "phone \t upFlow \t downFlow \t sumFlow".
 */
public class FlowSumDriver {

    public static class FlowSumMapper extends Mapper<LongWritable, Text,
            FlowBean, Text> {

        /** Number of top records each map task keeps. */
        public static final Integer TOPN = 10;

        /** Buffers the current top-N; FlowBean orders by sumFlow descending. */
        private final TreeMap<FlowBean, Text> treeMap = new TreeMap<>();

        // NOTE: neither the key nor the value object may be reused across
        // map() calls here. TreeMap stores references, so a single reused key
        // would make every put() overwrite one entry, and a single reused
        // value would make every retained entry display the phone number of
        // the LAST line read. (The original code reused one Text value —
        // that was a bug; a fresh Text is created per record below.)

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();

            String[] split = line.split("\t");

            String phoneNum = split[0];

            // Fresh key AND value objects for every record (see note above).
            FlowBean k = new FlowBean();
            k.setUpFlow(Long.valueOf(split[1]));
            k.setDownFlow(Long.valueOf(split[2]));
            k.setSumFlow(Long.valueOf(split[3]));
            Text v = new Text(phoneNum);

            // Buffer in the TreeMap instead of writing to the context here.
            // NOTE(review): FlowBean.compareTo() returns 0 for equal sumFlow,
            // so records with the same total flow overwrite each other in the
            // TreeMap — confirm ties are acceptable, or add a tie-breaker.
            treeMap.put(k, v);

            System.out.println("DEBUG Mapper map: " + k + "----" + v);
            // Keep only the TOPN largest keys. FlowBean sorts by total flow
            // descending, so lastKey() is the smallest total seen so far.
            if (treeMap.size() > TOPN) {
                treeMap.remove(treeMap.lastKey());
            }
        }

        @Override
        protected void cleanup(Context context) {
            // Emit the buffered top-N once this task has mapped all its input.
            treeMap.forEach((k, v) -> {
                try {
                    System.out.println("DEBUG Mapper cleanup: " + k + "----" + v);
                    context.write(k, v);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }

    /**
     * Swaps key and value so the final output line is "phone \t up down sum".
     * Phones whose beans compare equal (same sumFlow) arrive in one group.
     */
    public static class FlowSumReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
        @Override
        protected void reduce(FlowBean flowBean, Iterable<Text> phoneNums, Context context) throws IOException, InterruptedException {
            for (Text phone : phoneNums) {
                context.write(phone, flowBean);
                System.out.println("DEBUG Reducer reduce: " + phone + "----" + flowBean);
            }
        }
    }

    /**
     * Job wiring. args[0] = input path, args[1] = output path (must not exist).
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReducer.class);

        // Map emits <FlowBean, phone> so the framework sorts by total flow.
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // Reduce swaps them back to <phone, FlowBean> for the output file.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

6.4.3. 编写MapReduce(思路2:在Reduce阶段处理TopN)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package mr.flowsum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

// Removed the unused JDK-internal import com.sun.tools.javac.comp.Flow —
// it was never referenced and fails to compile on JDK 9+ (module-encapsulated).

/**
 * TopN by total flow — variant 2: the map phase emits everything and the
 * reduce phase stops writing after TOPN records.
 *
 * Input lines: "phone \t upFlow \t downFlow \t sumFlow".
 */
public class FlowSumDriver {

    public static class FlowSumMapper extends Mapper<LongWritable, Text,
            FlowBean, Text> {

        // Key/value reuse is safe HERE (unlike the TreeMap variant) because
        // context.write() serializes the pair immediately.
        private FlowBean k = new FlowBean();
        private Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] split = line.split("\t");

            String phoneNum = split[0];
            k.setUpFlow(Long.valueOf(split[1]));
            k.setDownFlow(Long.valueOf(split[2]));
            k.setSumFlow(Long.valueOf(split[3]));
            v.set(phoneNum);
            // Emit <FlowBean, phone>: the framework sorts keys by total flow
            // (descending, per FlowBean.compareTo) before reduce runs.
            context.write(k, v);

            System.out.println("DEBUG Mapper map: " + k + "---" + v);
        }
    }

    public static class FlowSumReducer extends Reducer<FlowBean, Text,
            Text, FlowBean> {

        /** Number of records to emit before the reducer goes silent. */
        public static final int TOPN = 10;

        // Running count of records already written by THIS reduce task.
        // NOTE(review): the count is per-ReduceTask; the result is a global
        // TopN only with a single reducer (the job never calls
        // setNumReduceTasks, so the default of 1 applies — verify).
        private int count = 0;

        @Override
        protected void reduce(FlowBean flowBean, Iterable<Text> phoneNums, Context context) throws IOException, InterruptedException {
            for (Text phoneNum : phoneNums) {
                // Once TOPN records are out, ignore everything that follows.
                if (count >= TOPN) {
                    break;
                }
                context.write(phoneNum, flowBean);
                ++count;
                System.out.println("DEBUG Reducer reduce: " + phoneNum + "---" + flowBean);
            }
        }
    }

    /**
     * Job wiring. args[0] = input path, args[1] = output path (must not exist).
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReducer.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
panchaoxin wechat
关注我的公众号
支持一下