MapReduce Join操作

1. Reduce Join

1.1. 工作原理

Map端的主要工作:为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后用连接字段(两张表中相同的列)作为key,其余部分和新加的标志作为value,最后进行输出。

Reduce端的主要工作:在reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,最后进行合并就ok了

1.2. Reduce Join 案例

1.2.1. 数据说明

订单数据表 order.txt

订单oid 产品pid 产品数量amount
1001 01 1
1
2
3
4
5
6
1001	01	1
1002 02 2
1003 03 3
1004 01 4
1005 02 5
1006 03 6

商品信息表 product.txt

产品pid 产品名称pname
01 小米
1
2
3
01	小米
02 华为
03 格力

输出要求:将商品信息表中数据根据商品pid合并到订单数据表中

订单oid 产品名称pname 数量amount
1001 小米 1
1004 小米 4
1002 华为 2
1005 华为 5
1003 格力 3
1006 格力 6

1.2.2. 编程程序

1.2.2.1. TableBean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package mr.join;

import lombok.Data;
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

@Data
public class TableBean implements Cloneable, Writable {
/**
* 订单ID
*/
private String oid;

/**
* 产品ID
*/
private String pid;

/**
* 产品数量
*/
private Integer amount;

/**
* 产品名称
*/
private String pname;

/**
* 标记数据来源于哪个表
*/
private String flag;

@Override
protected TableBean clone() {
TableBean tableBean = null;
try {
tableBean = (TableBean) super.clone();
} catch (CloneNotSupportedException e) {
e.printStackTrace();
}
return tableBean;
}

@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(oid);
out.writeUTF(pid);
out.writeInt(amount);
out.writeUTF(pname);
out.writeUTF(flag);
}

@Override
public void readFields(DataInput in) throws IOException {
this.oid = in.readUTF();
this.pid = in.readUTF();
this.amount = in.readInt();
this.pname = in.readUTF();
this.flag = in.readUTF();
}

/**
* 返回join之后要输出的表结构
* @return
*/
@Override
public String toString() {
return oid + "\t" + pname + "\t" + amount;
}
}

1.2.2.2. 编写MapReduce

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
package mr.join;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class JoinDriver {
public static final String ORDER_FLAG = "order";
public static final String PRODUCT_FLAG = "product";

public static class JoinMapper extends Mapper<LongWritable, Text,
Text, TableBean> {


// 记录当前切片所属的文件名
private String filename;

// <key,value>
private Text k = new Text();
private TableBean tableBean = new TableBean();

/**
* 在setup()阶段先获取切片所属的文件名
*/
@Override
protected void setup(Context context) throws IOException, InterruptedException {
// 获取文件切片信息
FileSplit inputSplit = (FileSplit) context.getInputSplit();
// 获取切片所属的文件名
filename = inputSplit.getPath().getName();

System.out.println("DEBUG Mapper setup: filename=" + filename);
}

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();


// 根据数据来源,在map阶段做不同的处理
if (filename.startsWith("order")) {
/*
当前切片数据来自 订单表
| 订单oid | 产品pid | 产品数量amount |
| ---- | ---- | ------ |
| 1001 | 01 | 1 |
*/

String[] split = line.split("\t");
tableBean.setOid(split[0]);
tableBean.setPid(split[1]);
tableBean.setAmount(Integer.valueOf(split[2]));
// 注意:Writable通过out.writeUTF(field)等方法进行序列化时,要求field不能为NULL,否则会报空指针异常。所以一些没有数据的字段,要填充一些无意义的值
tableBean.setPname("");
// 标记当前数据来自order表
tableBean.setFlag(ORDER_FLAG);

// key是两表相同的列(join的列),即pid
k.set(split[1]);

} else {
/*
当前切片数据来自 产品表
| 产品pid | 产品名称pname |
| ---- | ----- |
| 01 | 小米 |
*/
String[] split = line.split("\t");
tableBean.setPid(split[0]);
tableBean.setPname(split[1]);
// 填充无意义值
tableBean.setOid("");
tableBean.setAmount(0);
tableBean.setFlag(PRODUCT_FLAG);

// key是两表相同的列(join的列),即pid
k.set(split[0]);
}

// 输出
context.write(k, tableBean);

System.out.println(String.format("DEBUG Mapper map: k=%s, v=(%s,%s,%s,%d,%s)",
k.toString(), tableBean.getPid(), tableBean.getPname(),
tableBean.getOid(), tableBean.getAmount(), tableBean.getFlag()));
}
}

public static class JoinReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {
@Override
protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
/*
产品表 join 订单表,是一对多的关系

MapReduce阶段的key取的是公共列pid,所以此时数据是以下情况
key: 产品pid,例如pid=01
values: {
TableBean1: pid=01 pname=小米 flag=product
TableBean2: pid=01 oid=10001 amount=1 flag=order
TableBean3: pid=01 oid=10001 amount=1 flag=order
}

现在要根据TableBean的flag字段,将数据分离:
如果flag=product,则用一个TableBean保留
如果flag=order,因为order有多个数据,所以要添加到List<TableBean>
*/

// 存放产品记录
TableBean product = new TableBean();
// 存放订单集合
List<TableBean> orderList = new ArrayList<>();

// 注意values存在Writable对象复用现象,所以保存数据时,要到tableBean进行clone
for (TableBean tableBean : values) {
if (ORDER_FLAG.equals(tableBean.getFlag())) {
// 订单表数据
orderList.add(tableBean.clone());
} else {
// 产品表数据
product = tableBean.clone();
}
}

for (TableBean tableBean : orderList) {
// join合并属性
tableBean.setPname(product.getPname());
// 输出
context.write(tableBean, NullWritable.get());

System.out.println(String.format("DEBUG Reducer reduce: k=(%s,%s,%s,%d,%s)",
tableBean.getPid(), tableBean.getPname(),
tableBean.getOid(), tableBean.getAmount(), tableBean.getFlag()));
}
}
}

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

job.setMapperClass(JoinMapper.class);
job.setReducerClass(JoinReducer.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(TableBean.class);

job.setOutputKeyClass(TableBean.class);
job.setOutputValueClass(NullWritable.class);

FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

1.2.3. 运行

将两张表数据上传到同一目录下

1
hdfs dfs -put order.txt product.txt /input

运行

1
hadoop jar join.jar mr.join.JoinDriver /input /output

查看结果

1
2
3
4
5
6
7
$ hdfs dfs -cat /output/part*
1004 小米 4
1001 小米 1
1005 华为 5
1002 华为 2
1006 格力 6
1003 格力 3

2. Map Join

2.1. Reduce Join的缺点

Reduce Join合并操作是在Reduce阶段完成。在Reduce端处理过多的表,非常容易产生数据倾斜。

一般来説,ReduceTask数量要少于MapTask数量,Reduce端计算压力大,计算能尽量在Map阶段完成,就不在Reduce阶段做。

Reduce Join无疑给Reduce带来更大的工作量,所以推荐在Map端实现数据的合并

2.2. 适用场景

Map Join适用于一张表十分小(能直接将整张表读入内存),另一张表十分大(内存放不下)的场景

2.3. 编写程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package mr.join;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class JoinDriver {
public static class JoinMapper extends Mapper<LongWritable, Text,
Text, NullWritable> {
private Map<String, String> productMap = new HashMap<>();
private Text k = new Text();

@Override
protected void setup(Context context) throws IOException, InterruptedException {
// 读取文件,保存到HashMap中
URI[] cacheFiles = context.getCacheFiles();
String path = cacheFiles[0].getPath().toString();

BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(path), "UTF-8"));

String line = null;
while ((StringUtils.isNotBlank(line = br.readLine()))) {
String[] split = line.split("\t");
productMap.put(split[0], split[1]);
}

IOUtils.closeStream(br);
}

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
/*
map处理大表: oid | pid | amount
*/
String line = value.toString();
String[] split = line.split("\t");

String oid = split[0];
String pid = split[1];
String amount = split[2];
// 取出pname
String pname = productMap.get(pid);

// 拼接得到结果
String result = oid + "\t" + pname + "\t" + amount;
k.set(result);
context.write(k, NullWritable.get());
}
}

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

job.setJarByClass(JoinDriver.class);

job.setMapperClass(JoinMapper.class);
/// job.setReducerClass(JoinReducer.class); 不用Reduce

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);

FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// DistributedCache,在job执行前,缓存普通文件到task运行节点的工作目录
job.addCacheFile(new URI("file:///root/product.txt"));

// No reduce task
job.setNumReduceTasks(0);

System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

2.4. 运行

1
2
3
mv product.txt /root
hdfs dfs -put order.txt /
hadoop jar join.jar mr.join.JoinDriver /order.txt /output

查看结果

1
2
3
4
5
6
7
$ hdfs dfs -cat /output/part*
1001 小米 1
1002 华为 2
1003 格力 3
1004 小米 4
1005 华为 5
1006 格力 6

panchaoxin wechat
关注我的公众号
支持一下