官方文档中,OutputFormat 的功能被这样描述:
OutputFormat 描述了 MR job 的输出规范。

  1. 验证作业的输出规范,例如检查输出目录是否已经存在。
  2. 提供 RecordWriter 的实现以用于写入作业的输出文件,输出文件存储在 FileSystem 中。

OutputFormat 类中只有两个方法:checkOutputSpecsgetRecordWriter。分别用来检查输出和获取需要使用的 RecordWriter
RecordWriter 从 reducer (或者 mapper)以 <Key, Value> 的形式获取输出数据,之后将这些数据写入输出文件。通过编写自己的 RecordWriter ,我们可以控制其写入的内容或者路径。RecordWriter 类也只有两个方法。close负责关闭它自己打开的文件流,write负责向输出文件中写入键值对。

TextOutputFormat

在自定义我们自己的 OutputFormat 之前,先来看看 hadoop 提供的 TextOutputFormat,它一般用来写入纯文本文件,也是 MR 中使用的默认 OutputFormat。TextOutputFormat 其实继承于 FileOutputFormat,FileOutputFormat 类提供了关于输出路径的一系列方法,接下来我们会用到其中的几个。
TextOutputFormat 并没有重写 checkOutputSpecs 方法,只定义了 getRecordWriter方法,还有一个自己使用的内部类 LineRecordWriter。

首先我们来看 getRecordWriter方法是如何实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public RecordWriter<K, V> getRecordWriter(FileSystem ignored,
JobConf job,
String name,
Progressable progress)
throws IOException {
boolean isCompressed = getCompressOutput(job);
String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator",
"\t"); // 获取key和value的分隔符
if (!isCompressed) {
Path file = FileOutputFormat.getTaskOutputPath(job, name); // 创建任务的临时输出目录并返回
FileSystem fs = file.getFileSystem(job); // 返回拥着这个路径的FileSystem
FSDataOutputStream fileOut = fs.create(file, progress); // 在指定的路径创建一个FSDataOutputStream
return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
} else {
......
}
}

getRecordWriter首先得到了isCompressed 用以判断输出是否需要压缩,我们只需要关注更加简单的不需要压缩的逻辑即可。它先是获取了 keyValueSeparator,这个变量之后还要传给 LineRecordWriter 用来在输出文件中分隔 key 和 value。

之后它调用了 getTaskOutputPath 方法获取了任务的临时输出目录,这个目录一般在输出目录的 _temporary 下。
在 MR 任务的执行过程中,输出文件并不是直接写在提交者设置的输出目录中,而是先写入到临时目录下,在写入完成后再移动到输出目录。此外,一个 map 任务可能会有多个 mapper 在跑,他们的输出写入到临时目录下的不同目录中。当一个 mapper 成功后,其他的 mapper 将会被 kill,输出的文件也会被清理,这个过程是由 OutputCommitter 控制的。

在网上的很多博客中,需要控制写入文件时,比如写入到不同的文件,都是调用了getOutputPath获取任务的输出路径后,直接写入。而如果一个任务有多个 mapper 同时在跑的时候,就会带来冲突。所以更正确的方法是先写入到临时路径,在写入成功后,MR 框架会帮我们自动移动到目标输出路径。(如果不对 OutputCommitter 做另外设置的话,最终的路径会跟想要设置的有所不同,下文中会有示例说明)

然后它调用了 getFileSystem 方法获取了拥着这个路径的 FileSystem,又调用了create在指定的路径上创建了一个 FSDataOutputStream 用来写入数。最后用 FSDataOutputStream 和 分隔符 新建了一个 LineRecordWriter 并返回。
接下来来看看 LineRecordWriter 的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
protected static class LineRecordWriter<K, V>
implements RecordWriter<K, V> {
private static final byte[] NEWLINE =
"\n".getBytes(StandardCharsets.UTF_8);

protected DataOutputStream out;
private final byte[] keyValueSeparator;

public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
this.out = out;
this.keyValueSeparator =
keyValueSeparator.getBytes(StandardCharsets.UTF_8);
}

public LineRecordWriter(DataOutputStream out) {
this(out, "\t");
}

/**
* Write the object to the byte stream, handling Text as a special
* case.
* @param o the object to print
* @throws IOException if the write throws, we pass it on
*/
private void writeObject(Object o) throws IOException {
if (o instanceof Text) {
Text to = (Text) o;
out.write(to.getBytes(), 0, to.getLength());
} else {
out.write(o.toString().getBytes(StandardCharsets.UTF_8));
}
}

public synchronized void write(K key, V value)
throws IOException {

boolean nullKey = key == null || key instanceof NullWritable;
boolean nullValue = value == null || value instanceof NullWritable;
if (nullKey && nullValue) {
return;
}
if (!nullKey) {
writeObject(key);
}
if (!(nullKey || nullValue)) {
out.write(keyValueSeparator);
}
if (!nullValue) {
writeObject(value);
}
out.write(NEWLINE);
}

public synchronized void close(Reporter reporter) throws IOException {
out.close();
}
}

可以看到它的write方法其实非常简单,把 key、分隔符、value 和 \n 顺序写入。close方法则是把传入的 DataOutputStream 关闭。在 MR 框架的运行过程中,每个 mapper 会对应一个 RecordWriter,它会对 mapper 输出的每个 key-value 执行 write 操作。

举个栗子

例如我们需要处理的文件长这样,每行的 key 是一个 md5,value 是该 md5 对应的 base64 编码后的数据。

1
2
3
35f1c5a1f891e055900bc8a37c56e6d6    MTIzNDU2
0137d92b3d5df2dbb23b8ab6470b7a96 MTIzNDU2Nzg5MA==
28da0a34aa2c76a84eb0bfdfa492af89 YWJjZGVmZw==

我们想让每个 md5 对应的 base64 decode 后的数据都输出到一个单独的文件中,并且这个文件名中要有 md5 的值。

TestOutputFormat

1
2
3
4
5
6
7
8
9
10
11
12
13
public class TestOutputFormat extends FileOutputFormat<Text, Text> {
@Override
public RecordWriter<Text, Text> getRecordWriter(FileSystem ignored,
JobConf job, String name,
Progressable progress)
throws IOException {
Path file = FileOutputFormat.getTaskOutputPath(job, name);
String uniqueName = FileOutputFormat.getUniqueName(job, "part");

return new TestRecordWriter(file, job, uniqueName);
}

}

首先通过getTaskOutput获取临时输出文件的路径,需要注意的是这里得到的是类似于 xxxxxx/xxxxxx/part-00000的一个路径,如果用这个路径直接与想要输出的文件名进行拼接,会在 output 目录下生成part-00000的目录,所以在 RecordWriter 中需要将这个路径名进行一些修改,去掉最后的part-00000
getUniqueName方法返回的 uniqueName 类似于part-m-00001,我们用它来区分不同 map 任务的输出文件。

TestRecordWriter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class TestRecordWriter implements RecordWriter<Text, Text> {

FileSystem fs;
String outputPath;
String uniqueName;
ArrayList<FSDataOutputStream> fileOuts = new ArrayList<FSDataOutputStream>();
HashMap<String, Integer> hashCounter = new HashMap<String, Integer>();

public TestRecordWriter(Path file, JobConf job, String uniqueName) {
try {
this.fs = file.getFileSystem(job);
String tempOutputPath = file.toString();
this.outputPath = tempOutputPath.substring(0, tempOutputPath.length() - 11);
this.uniqueName = uniqueName;
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void write(Text key, Text value) throws IOException {
String md5 = key.toString();
String base64Value = value.toString();
MessageDigest md = null;

try {
md = MessageDigest.getInstance("SHA-256");
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
}

md.update(md5.getBytes());
md.update(base64Value.getBytes());
byte[] result = md.digest();
BigInteger hash = new BigInteger(1, result);
String hashStr = hash.toString(16);

if (hashCounter.containsKey(hashStr)) {
return;
} else {
hashCounter.put(hashStr, 1);
}

Path testPath = new Path(outputPath, md5 + "_" + hashStr + "_" + uniqueName);
FSDataOutputStream fileOut = null;
fileOut = fs.create(testPath);
fileOuts.add(fileOut);
fileOut.write(Base64.decodeBase64(base64Value));
}

@Override
public void close(Reporter reporter) throws IOException {
for (FSDataOutputStream fileOut: fileOuts) {
if (fileOut != null) {
try {
fileOut.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}

首先对 tempOutputPath 进行了一些分割,目的是去掉其最后边的/part-00000。这里我们以 hash(md5+value) 对数据做了一个去重操作。可以看到输出的文件路径是 Path(outputPath, md5 + "_" + hashStr + "_" + uniqueName),最后输出的文件名类似于fffa244c21c0f21f62b9dc8b6d8e382a_de2a0e997648fdd240fe4012b518d07246e6b560114516c468f886fa6f010ba5_part-m-00001。给文件名加上part-m-00001是为了防止不同的 mapper 输出同名文件。由于我们前边已经做了去重工作,而文件名又是包含 md5 和 hash 值的,所以同一个 mapper 输出的文件名是不会重复的,而如果可能出现重名的情况时,需要注意默认的create方法是不会覆盖的,FileSystem 提供了existscreate等方法让我们使用,可以根据实际情况定制化开发。