在官方文档 中,OutputFormat 的功能被这样描述: OutputFormat 描述了 MR job 的输出规范。
提供 RecordWriter 的实现以用于写入作业的输出文件,输出文件存储在 FileSystem 中。
OutputFormat 类中只有两个方法:checkOutputSpecs
和 getRecordWriter
。分别用来检查输出和获取需要使用的 RecordWriter 。 RecordWriter 从 reducer (或者 mapper)以 <Key, Value> 的形式获取输出数据,之后将这些数据写入输出文件。通过编写自己的 RecordWriter ,我们可以控制其写入的内容或者路径。RecordWriter 类也只有两个方法。close
TextOutputFormat 在自定义我们自己的 OutputFormat 之前,先来看看 hadoop 提供的 TextOutputFormat ,它一般用来写入纯文本文件,也是 MR 中使用的默认 OutputFormat。TextOutputFormat 其实继承于 FileOutputFormat ,FileOutputFormat 类提供了关于输出路径的一系列方法,接下来我们会用到其中的几个。 TextOutputFormat 并没有重写 checkOutputSpecs
方法,只定义了 getRecordWriter
方法,还有一个自己使用的内部类 LineRecordWriter。
首先我们来看 getRecordWriter
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public RecordWriter<K, V> getRecordWriter (FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { boolean isCompressed = getCompressOutput(job); String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator" , "\t" ); if (!isCompressed) { Path file = FileOutputFormat.getTaskOutputPath(job, name); FileSystem fs = file.getFileSystem(job); FSDataOutputStream fileOut = fs.create(file, progress); return new LineRecordWriter<K, V>(fileOut, keyValueSeparator); } else { ...... } }
用以判断输出是否需要压缩,我们只需要关注更加简单的不需要压缩的逻辑即可。它先是获取了 keyValueSeparator
,这个变量之后还要传给 LineRecordWriter 用来在输出文件中分隔 key 和 value。
之后它调用了 getTaskOutputPath
方法获取了任务的临时输出目录,这个目录一般在输出目录的 _temporary 下。 在 MR 任务的执行过程中,输出文件并不是直接写在提交者设置的输出目录中,而是先写入到临时目录下,在写入完成后再移动到输出目录。此外,一个 map 任务可能会有多个 mapper 在跑,他们的输出写入到临时目录下的不同目录中。当一个 mapper 成功后,其他的 mapper 将会被 kill,输出的文件也会被清理,这个过程是由 OutputCommitter 控制的。
获取任务的输出路径后,直接写入。而如果一个任务有多个 mapper 同时在跑的时候,就会带来冲突。所以更正确的方法是先写入到临时路径,在写入成功后,MR 框架会帮我们自动移动到目标输出路径。(如果不对 OutputCommitter 做另外设置的话,最终的路径会跟想要设置的有所不同,下文中会有示例说明)
然后它调用了 getFileSystem
方法获取了拥着这个路径的 FileSystem,又调用了create
在指定的路径上创建了一个 FSDataOutputStream 用来写入数。最后用 FSDataOutputStream 和 分隔符 新建了一个 LineRecordWriter 并返回。 接下来来看看 LineRecordWriter 的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 protected static class LineRecordWriter<K, V> implements RecordWriter<K, V> { private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); protected DataOutputStream out; private final byte[] keyValueSeparator; public LineRecordWriter(DataOutputStream out, String keyValueSeparator) { this.out = out; this.keyValueSeparator = keyValueSeparator.getBytes(StandardCharsets.UTF_8); } public LineRecordWriter(DataOutputStream out) { this(out, "\t"); } /** * Write the object to the byte stream, handling Text as a special * case. * @param o the object to print * @throws IOException if the write throws, we pass it on */ private void writeObject(Object o) throws IOException { if (o instanceof Text) { Text to = (Text) o; out.write(to.getBytes(), 0, to.getLength()); } else { out.write(o.toString().getBytes(StandardCharsets.UTF_8)); } } public synchronized void write(K key, V value) throws IOException { boolean nullKey = key == null || key instanceof NullWritable; boolean nullValue = value == null || value instanceof NullWritable; if (nullKey && nullValue) { return; } if (!nullKey) { writeObject(key); } if (!(nullKey || nullValue)) { out.write(keyValueSeparator); } if (!nullValue) { writeObject(value); } out.write(NEWLINE); } public synchronized void close(Reporter reporter) throws IOException { out.close(); } }
方法其实非常简单,把 key、分隔符、value 和 \n 顺序写入。close
方法则是把传入的 DataOutputStream 关闭。在 MR 框架的运行过程中,每个 mapper 会对应一个 RecordWriter,它会对 mapper 输出的每个 key-value 执行 write
举个栗子 例如我们需要处理的文件长这样,每行的 key 是一个 md5,value 是该 md5 对应的 base64 编码后的数据。
1 2 3 35f1c5a1f891e055900bc8a37c56e6d6 MTIzNDU2 0137d92b3d5df2dbb23b8ab6470b7a96 MTIzNDU2Nzg5MA== 28da0a34aa2c76a84eb0bfdfa492af89 YWJjZGVmZw==
我们想让每个 md5 对应的 base64 decode 后的数据都输出到一个单独的文件中,并且这个文件名中要有 md5 的值。
1 2 3 4 5 6 7 8 9 10 11 12 13 public class TestOutputFormat extends FileOutputFormat <Text , Text > { @Override public RecordWriter<Text, Text> getRecordWriter (FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { Path file = FileOutputFormat.getTaskOutputPath(job, name); String uniqueName = FileOutputFormat.getUniqueName(job, "part" ); return new TestRecordWriter(file, job, uniqueName); } }
获取临时输出文件的路径,需要注意的是这里得到的是类似于 xxxxxx/xxxxxx/part-00000
的一个路径,如果用这个路径直接与想要输出的文件名进行拼接,会在 output 目录下生成part-00000
的目录,所以在 RecordWriter 中需要将这个路径名进行一些修改,去掉最后的part-00000
方法返回的 uniqueName 类似于part-m-00001
,我们用它来区分不同 map 任务的输出文件。
TestRecordWriter 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 public class TestRecordWriter implements RecordWriter <Text , Text > { FileSystem fs; String outputPath; String uniqueName; ArrayList<FSDataOutputStream> fileOuts = new ArrayList<FSDataOutputStream>(); HashMap<String, Integer> hashCounter = new HashMap<String, Integer>(); public TestRecordWriter (Path file, JobConf job, String uniqueName) { try { this .fs = file.getFileSystem(job); String tempOutputPath = file.toString(); this .outputPath = tempOutputPath.substring(0 , tempOutputPath.length() - 11 ); this .uniqueName = uniqueName; } catch (IOException e) { e.printStackTrace(); } } @Override public void write (Text key, Text value) throws IOException { String md5 = key.toString(); String base64Value = value.toString(); MessageDigest md = null ; try { md = MessageDigest.getInstance("SHA-256" ); } catch (NoSuchAlgorithmException e) { e.printStackTrace(); } md.update(md5.getBytes()); md.update(base64Value.getBytes()); byte [] result = md.digest(); BigInteger hash = new BigInteger(1 , result); String hashStr = hash.toString(16 ); if (hashCounter.containsKey(hashStr)) { return ; } else { hashCounter.put(hashStr, 1 ); } Path testPath = new Path(outputPath, md5 + "_" + hashStr + "_" + uniqueName); FSDataOutputStream fileOut = null ; fileOut = fs.create(testPath); fileOuts.add(fileOut); fileOut.write(Base64.decodeBase64(base64Value)); } @Override public void close (Reporter reporter) throws IOException { for (FSDataOutputStream fileOut: fileOuts) { if (fileOut != null ) { try { fileOut.close(); } catch (Exception e) { e.printStackTrace(); } } } } }
首先对 tempOutputPath 进行了一些分割,目的是去掉其最后边的/part-00000
。这里我们以 hash(md5+value) 对数据做了一个去重操作。可以看到输出的文件路径是 Path(outputPath, md5 + "_" + hashStr + "_" + uniqueName)
。给文件名加上part-m-00001是为了防止不同的 mapper 输出同名文件。由于我们前边已经做了去重工作,而文件名又是包含 md5 和 hash 值的,所以同一个 mapper 输出的文件名是不会重复的,而如果可能出现重名的情况时,需要注意默认的create
方法是不会覆盖的,FileSystem 提供了exists