MapReduce Writable对象复用

1. Map和Reduce端 key/value Writable对象复用

以WordCount为例,我一开始写MapReduce时,会犯这样的毛病,即所有的key/value都单独创建了一个Writable对象

1
2
3
4
5
6
public void map(...) {
for (String word : words) {
// 问题所在:每次创建新的Writable对象,再写出
context.write(new Text(word), new IntWritable(1));
}
}

但是查看官方WordCount源码时,发现key/value都是复用的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable> {
// 创建 key/value Writable对象,用于复用
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
// 复用key/value对象,直接输出
context.write(word, one);
}
}
}

我一开始担心重用Writable对象,会导致数据的覆盖。实际上在执行context.write()之后,Writable对象就已经被序列化了。所以下一次Writable用新的数据覆盖时,不用担心旧数据已经丢失。

所以Writable对象能复用就复用,这样少new一些对象,能给JVM节省垃圾回收的开销。

2. Reduce端 Iterable values中的每个Writable对象的复用

平时Reduce端编写程序时,我一般都是这么处理的,就是对values进行迭代,在迭代中完成业务处理,然后直接context.write()写出。这样确实没有发现什么奇怪的问题,程序一直也是正常运行的

1
2
3
4
5
6
7
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException {
// Do something ....
for (VALUEIN value : values) {
// Do something ....
context.write(k, v)
}
}

但是当我将每个value放到List中,再输出时,发现奇怪的现象

1
2
3
4
5
6
7
8
9
10
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException {
// Do something ....
for (VALUEIN value : values) {
list.add(value);
// 输出value,看到每个value的数据是不同的
System.out.println(value);
}
// 输出list中的每个value,看到所有value都是相同的
list.forEach(System.out::println);
}

遍历list时,输出的value都是相同的,而且都是values中的最后一个元素。结合开发经验,容易得知list中的每个引用指向同一个对象。反推回去,说明values中的每个引用指向同一对象。

如果每values中的每个引用指向同一对象,那么在values的for循环中,输出的value应该是相同的,但是事实上输出的数据是不同的。

这让我很诧异。查看values的类型,实际上是org.apache.hadoop.mapreduce.task.ReduceContextImpl$ValueIterable的一个实例。

查看ReduceContextImpl的源码,就会明白,原来每次执行迭代,都会反序列化得到新的value,但是value对象引用始终没有发生变化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class ReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
extends TaskInputOutputContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
implements ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
private KEYIN key; // current key
private VALUEIN value; // current value

@Override
public VALUEIN next() {
// ......

// 反序列化得到新的value
value = valueDeserializer.deserialize(value);
// ......
return value;
}
}

所以如果想让list保存每一个value的数据,那么在每一次迭代时,就要保存value的对象副本

1
2
3
4
for (VALUEIN value : values) {
// 保存value的副本
list.add(value.clone());
}

panchaoxin wechat
关注我的公众号
支持一下