在官方文档 中,OutputFormat 的功能被这样描述: OutputFormat 描述了 MR job 的输出规范。
验证作业的输出规范,例如检查输出目录是否已经存在。
提供 RecordWriter 的实现以用于写入作业的输出文件,输出文件存储在 FileSystem 中。
OutputFormat 类中只有两个方法:checkOutputSpecs
和 getRecordWriter
。分别用来检查输出和获取需要使用的 RecordWriter 。 RecordWriter 从 reducer (或者 mapper)以 <Key, Value> 的形式获取输出数据,之后将这些数据写入输出文件。通过编写自己的 RecordWriter ,我们可以控制其写入的内容或者路径。RecordWriter 类也只有两个方法。close
负责关闭它自己打开的文件流,write
负责向输出文件中写入键值对。
TextOutputFormat 在自定义我们自己的 OutputFormat 之前,先来看看 hadoop 提供的 TextOutputFormat ,它一般用来写入纯文本文件,也是 MR 中使用的默认 OutputFormat。TextOutputFormat 其实继承于 FileOutputFormat ,FileOutputFormat 类提供了关于输出路径的一系列方法,接下来我们会用到其中的几个。 TextOutputFormat 并没有重写 checkOutputSpecs
方法,只定义了 getRecordWriter
方法,还有一个自己使用的内部类 LineRecordWriter。
首先我们来看 getRecordWriter
方法是如何实现的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public RecordWriter<K, V> getRecordWriter (FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { boolean isCompressed = getCompressOutput(job); String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator" , "\t" ); if (!isCompressed) { Path file = FileOutputFormat.getTaskOutputPath(job, name); FileSystem fs = file.getFileSystem(job); FSDataOutputStream fileOut = fs.create(file, progress); return new LineRecordWriter<K, V>(fileOut, keyValueSeparator); } else { ...... } }
getRecordWriter
首先得到了isCompressed
用以判断输出是否需要压缩,我们只需要关注更加简单的不需要压缩的逻辑即可。它先是获取了 keyValueSeparator
,这个变量之后还要传给 LineRecordWriter 用来在输出文件中分隔 key 和 value。
之后它调用了 getTaskOutputPath
方法获取了任务的临时输出目录,这个目录一般在输出目录的 _temporary 下。 在 MR 任务的执行过程中,输出文件并不是直接写在提交者设置的输出目录中,而是先写入到临时目录下,在写入完成后再移动到输出目录。此外,一个 map 任务可能会有多个 mapper 在跑,他们的输出写入到临时目录下的不同目录中。当一个 mapper 成功后,其他的 mapper 将会被 kill,输出的文件也会被清理,这个过程是由 OutputCommitter 控制的。
在网上的很多博客中,需要控制写入文件时,比如写入到不同的文件,都是调用了getOutputPath
获取任务的输出路径后,直接写入。而如果一个任务有多个 mapper 同时在跑的时候,就会带来冲突。所以更正确的方法是先写入到临时路径,在写入成功后,MR 框架会帮我们自动移动到目标输出路径。(如果不对 OutputCommitter 做另外设置的话,最终的路径会跟想要设置的有所不同,下文中会有示例说明)
然后它调用了 getFileSystem
方法获取了拥着这个路径的 FileSystem,又调用了create
在指定的路径上创建了一个 FSDataOutputStream 用来写入数。最后用 FSDataOutputStream 和 分隔符 新建了一个 LineRecordWriter 并返回。 接下来来看看 LineRecordWriter 的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 protected static class LineRecordWriter<K, V> implements RecordWriter<K, V> { private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); protected DataOutputStream out; private final byte[] keyValueSeparator; public LineRecordWriter(DataOutputStream out, String keyValueSeparator) { this.out = out; this.keyValueSeparator = keyValueSeparator.getBytes(StandardCharsets.UTF_8); } public LineRecordWriter(DataOutputStream out) { this(out, "\t"); } /** * Write the object to the byte stream, handling Text as a special * case. * @param o the object to print * @throws IOException if the write throws, we pass it on */ private void writeObject(Object o) throws IOException { if (o instanceof Text) { Text to = (Text) o; out.write(to.getBytes(), 0, to.getLength()); } else { out.write(o.toString().getBytes(StandardCharsets.UTF_8)); } } public synchronized void write(K key, V value) throws IOException { boolean nullKey = key == null || key instanceof NullWritable; boolean nullValue = value == null || value instanceof NullWritable; if (nullKey && nullValue) { return; } if (!nullKey) { writeObject(key); } if (!(nullKey || nullValue)) { out.write(keyValueSeparator); } if (!nullValue) { writeObject(value); } out.write(NEWLINE); } public synchronized void close(Reporter reporter) throws IOException { out.close(); } }
可以看到它的write
方法其实非常简单,把 key、分隔符、value 和 \n 顺序写入。close
方法则是把传入的 DataOutputStream 关闭。在 MR 框架的运行过程中,每个 mapper 会对应一个 RecordWriter,它会对 mapper 输出的每个 key-value 执行 write
操作。
举个栗子 例如我们需要处理的文件长这样,每行的 key 是一个 md5,value 是该 md5 对应的 base64 编码后的数据。
1 2 3 35f1c5a1f891e055900bc8a37c56e6d6 MTIzNDU2 0137d92b3d5df2dbb23b8ab6470b7a96 MTIzNDU2Nzg5MA== 28da0a34aa2c76a84eb0bfdfa492af89 YWJjZGVmZw==
我们想让每个 md5 对应的 base64 decode 后的数据都输出到一个单独的文件中,并且这个文件名中要有 md5 的值。
1 2 3 4 5 6 7 8 9 10 11 12 13 public class TestOutputFormat extends FileOutputFormat <Text , Text > { @Override public RecordWriter<Text, Text> getRecordWriter (FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { Path file = FileOutputFormat.getTaskOutputPath(job, name); String uniqueName = FileOutputFormat.getUniqueName(job, "part" ); return new TestRecordWriter(file, job, uniqueName); } }
首先通过getTaskOutput
获取临时输出文件的路径,需要注意的是这里得到的是类似于 xxxxxx/xxxxxx/part-00000
的一个路径,如果用这个路径直接与想要输出的文件名进行拼接,会在 output 目录下生成part-00000
的目录,所以在 RecordWriter 中需要将这个路径名进行一些修改,去掉最后的part-00000
。getUniqueName
方法返回的 uniqueName 类似于part-m-00001
,我们用它来区分不同 map 任务的输出文件。
TestRecordWriter 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 public class TestRecordWriter implements RecordWriter <Text , Text > { FileSystem fs; String outputPath; String uniqueName; ArrayList<FSDataOutputStream> fileOuts = new ArrayList<FSDataOutputStream>(); HashMap<String, Integer> hashCounter = new HashMap<String, Integer>(); public TestRecordWriter (Path file, JobConf job, String uniqueName) { try { this .fs = file.getFileSystem(job); String tempOutputPath = file.toString(); this .outputPath = tempOutputPath.substring(0 , tempOutputPath.length() - 11 ); this .uniqueName = uniqueName; } catch (IOException e) { e.printStackTrace(); } } @Override public void write (Text key, Text value) throws IOException { String md5 = key.toString(); String base64Value = value.toString(); MessageDigest md = null ; try { md = MessageDigest.getInstance("SHA-256" ); } catch (NoSuchAlgorithmException e) { e.printStackTrace(); } md.update(md5.getBytes()); md.update(base64Value.getBytes()); byte [] result = md.digest(); BigInteger hash = new BigInteger(1 , result); String hashStr = hash.toString(16 ); if (hashCounter.containsKey(hashStr)) { return ; } else { hashCounter.put(hashStr, 1 ); } Path testPath = new Path(outputPath, md5 + "_" + hashStr + "_" + uniqueName); FSDataOutputStream fileOut = null ; fileOut = fs.create(testPath); fileOuts.add(fileOut); fileOut.write(Base64.decodeBase64(base64Value)); } @Override public void close (Reporter reporter) throws IOException { for (FSDataOutputStream fileOut: fileOuts) { if (fileOut != null ) { try { fileOut.close(); } catch (Exception e) { e.printStackTrace(); } } } } }
首先对 tempOutputPath 进行了一些分割,目的是去掉其最后边的/part-00000
。这里我们以 hash(md5+value) 对数据做了一个去重操作。可以看到输出的文件路径是 Path(outputPath, md5 + "_" + hashStr + "_" + uniqueName)
,最后输出的文件名类似于fffa244c21c0f21f62b9dc8b6d8e382a_de2a0e997648fdd240fe4012b518d07246e6b560114516c468f886fa6f010ba5_part-m-00001
。给文件名加上part-m-00001是为了防止不同的 mapper 输出同名文件。由于我们前边已经做了去重工作,而文件名又是包含 md5 和 hash 值的,所以同一个 mapper 输出的文件名是不会重复的,而如果可能出现重名的情况时,需要注意默认的create
方法是不会覆盖的,FileSystem 提供了exists
,create
等方法让我们使用,可以根据实际情况定制化开发。