在最近的工作中遇到需要解析大量 pcap 文件的需求,部门里原来的做法是将储存在 HDFS 上的 pcap 文件下载到本地再进行处理,这样无疑会带来处理数量上的瓶颈,并且整个处理逻辑也比较复杂。通过为 Hadoop 编写解析 pcap 插件的方式使得可以直接使用 Hadoop 处理。
hadoop-pcap github 上有一个 RIPE-NCC 的开源项目 hadoop-pcap ,项目中的 hadoop-pcap-lib 可以在 MapReduce 作业中使用来读取 pcap 文件。hadoop-pcap-lib 中还提供了各种协议的解析功能,如 dns、http 等,当然也可以根据自己的需求定制。在我们的需求中只需要将每个 pcap 文件中 packet 的 payload 按照顺序输出给同一个 mapper。下边我们来看一下项目中用到的关键类。
hadoop steaming 中有一个 -inputformat
参数,来为 Hadoop 指定 InputFormatClass,这个类描述了 MR 的输入规范。 InputFormat 的作用:
验证作业的输入格式
将输入文件拆分成 InputSplits,并将每个 InputSplit 分配给一个 mapper
提供 RecordReader 实现,用于从 InputSplit 读取输入(key-value)并提供给 mapper 处理
用一句话来总结就是:InputFormat 定义如何将数据切割成分片和如何读取分片中的数据。这两个功能分别由 getSplits()
和 RecordReader
完成。
hadoop 中默认的 InputFormat 是 TextInputFormat,用于纯文本文件,也是我们一般所处理的文件,其输出的 value 是文件中的每一行, key 是每一行在文件中的位置。hadoop 中也提供了用于读取普通文件的 FileInputFormat、用于读取数据库的 DBInputFormat 等。 我们所要实现的 PcapInputFormat 继承自 FileInputFormat。FileInputFormat 是所有基于文件的 InputFormat 的基类,其提供了 getSplits 的通用实现,但一个 pcap 文件作为一个整体是不能拆分的,FileInputFormat 同样也提供了 isSplitable
方法防止文件被拆分,在 hadoop-pcap-lib 的 PcapInputFormat.java 中也可以看到重写了 isSplitable
方法让其 return false。这也正好符合我们让一个 mapper 处理一个 pcap 文件的需求。 PcapInputFormat.java 中还重写了 createRecordReader 方法,其返回一个自定义的 RecordReader
:PcapRecordReader
PcapRecordReader PcapRecordReader 其实并没有太关键的实现,也没有太复杂的逻辑。比较关键的几个方法是
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 PcapReader pcapReader; Iterator<Packet> pcapReaderIterator; long packetCount = 0 ; @Override public boolean nextKeyValue () throws IOException { if (!pcapReaderIterator.hasNext()) return false ; key.set(++packetCount); value.set(pcapReaderIterator.next()); context.setStatus("Read " + getPos() + " of " + end + " bytes" ); context.progress(); return true ; } @Override public ObjectWritable getCurrentValue () { return value; } @Override public LongWritable getCurrentKey () { return key; }
nextKeyValue:读取下一个 key-value 键值对,可以看到 key 被设置成当前读取的 pcap 文件中 packetcount,value 被设置成 pcapReaderIterator.next(),pcapReaderIterator 是一个 packet 的迭代器,他来自 PcapRecordReader
的第一个参数 PcapReader
类,其返回的其实是一个 Packet
。
getCurrentValue:获取当前的 value
getCurrentKey:获取当前的 key
mapper 在运行的过程中,首先会判断是否有下一个 key-value,如果有就传入当前的 key 和 value 到 map。
另外需要注意的一点是不同版本的 Hadoop API 是有区别的,比如说在 RecordReader (Apache Hadoop Main 3.2.2 API) 和 RecordReader (Apache Hadoop Main 3.3.0 API) 中,RecordReader 类的方法就有所不同。目前的 hadoop-pcap-lib 跟 3.3.0 版本是一致的,但在实际环境中需要跟 hadoop 的版本保持一致。
Packet 在 PcapReader 之前先来说一下 Packet,它是一个 HashMap,它的 key 是 pcap 文件格式中 packet 的一些字段和 TCP/IP 协议中数据包的一些字段。其字段的值是在 PcapReader 中解析并设置的。在上面的 PcapRecordReader 中我们可以看到其输出的 key 就是一个 Packet,map 最终得到的值就是 Packet 中 toString 方法的输出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class Packet extends HashMap <String , Object > { private static final long serialVersionUID = 8723206921174160146L ; public static final String TIMESTAMP = "ts" ; public static final String TIMESTAMP_USEC = "ts_usec" ; public static final String TIMESTAMP_MICROS = "ts_micros" ; public static final String TTL = "ttl" ; ...... ...... @Override public String toString () { StringBuffer sb = new StringBuffer(); for (Map.Entry<String, Object> entry : entrySet()) { sb.append(entry.getKey()); sb.append('=' ); sb.append(entry.getValue()); sb.append(',' ); } if (sb.length() > 0 ) return sb.substring(0 , sb.length() - 1 ); return null ; } }
如果需要自定义参数或者自定义输出,就改动 Packet 类的实现,比如在我们的需求中,就加入了 payload 字段。
PcapReader PcapReader 是 PcapInputFormat 中的 initPcapReader 方法创建的,其只有一个参数 is,类型是 DataInputStream,其实就是当前 PcapRecordReader 所处理的 pcap 文件流。
1 2 3 4 5 6 7 8 9 10 11 12 public static PcapRecordReader initPcapRecordReader (Path path, long start, long length, Reporter reporter, Configuration conf) throws IOException { FileSystem fs = path.getFileSystem(conf); FSDataInputStream baseStream = fs.open(path); DataInputStream stream = baseStream; CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf); final CompressionCodec codec = compressionCodecs.getCodec(path); if (codec != null ) stream = new DataInputStream(codec.createInputStream(stream)); PcapReader reader = initPcapReader(stream, conf); return new PcapRecordReader(reader, start, length, baseStream, stream, reporter); }
PcapReader 首先会使用 readBytes 方法读取 pcap 头,关于 pcap 文件格式这里就不过多介绍了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public PcapReader(DataInputStream is) throws IOException { this.is = is; iterator = new PacketIterator(); pcapHeader = new byte[HEADER_SIZE]; if (!readBytes(pcapHeader)) { // // This special check for EOF is because we don't want // PcapReader to barf on an empty file. This is the only // place we check caughtEOF. // if (caughtEOF) { LOG.warn("Skipping empty file"); return; } throw new IOException("Couldn't read PCAP header"); } if (!validateMagicNumber(pcapHeader)) throw new IOException("Not a PCAP file (Couldn't find magic number)"); snapLen = PcapReaderUtil.convertInt(pcapHeader, PCAP_HEADER_SNAPLEN_OFFSET, reverseHeaderByteOrder); long linkTypeVal = PcapReaderUtil.convertInt(pcapHeader, PCAP_HEADER_LINKTYPE_OFFSET, reverseHeaderByteOrder); if ((linkType = getLinkType(linkTypeVal)) == null) throw new IOException("Unsupported link type: " + linkTypeVal); }
这部分就是 PcapRecordReader 直接调用的部分,PcapReader 本身是一个 Iterable 接口,它的 iterator 方法返回一个 Iterator 对象,可以使用for each 循环进行遍历,而 fetchNext 方法调用的是 PcapReader 的 nextPacket 方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 private class PacketIterator implements Iterator <Packet > { private Packet next; private void fetchNext () { if (next == null ) next = nextPacket(); } @Override public boolean hasNext () { fetchNext(); if (next != null ) return true ; int remainingFlows = flows.size(); if (remainingFlows > 0 ) LOG.warn("Still " + remainingFlows + " flows queued. Missing packets to finish assembly?" ); return false ; } @Override public Packet next () { fetchNext(); try { return next; } finally { next = null ; } } @Override public void remove () { } }
nextPacket 方法首先尝试读取一个 pcapPacketHeader ,即 pcap 中的 Packet Header 部分,接着创建一个 createPacket 对象,然后向其中写入 Packet Header 的一些字段,之后根据 Packet Header 中的 CAP_LEN 读取 Packet Data 部分,接着读取数据包的一些参数。 如果需要自定义输出,在这里也需要做出修改,在我们的需求中,写入 Packet 的 Payload 就可以直接返回了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private Packet nextPacket () { pcapPacketHeader = new byte [PACKET_HEADER_SIZE]; if (!readBytes(pcapPacketHeader)) return null ; Packet packet = createPacket(); long packetTimestamp = PcapReaderUtil.convertInt(pcapPacketHeader, TIMESTAMP_OFFSET, reverseHeaderByteOrder); packet.put(Packet.TIMESTAMP, packetTimestamp); long packetTimestampMicros = PcapReaderUtil.convertInt(pcapPacketHeader, TIMESTAMP_MICROS_OFFSET, reverseHeaderByteOrder); packet.put(Packet.TIMESTAMP_MICROS, packetTimestampMicros); BigDecimal packetTimestampUsec = new BigDecimal(packetTimestamp + packetTimestampMicros / 1000000.0 , tsUsecMc); packet.put(Packet.TIMESTAMP_USEC, packetTimestampUsec.doubleValue()); long packetSize = PcapReaderUtil.convertInt(pcapPacketHeader, CAP_LEN_OFFSET, reverseHeaderByteOrder); packetData = new byte [(int )packetSize]; if (!readBytes(packetData)) return packet; int ipStart = findIPStart(packetData); ...... ......
使用 hadoop-pcap 在了解了 hadoop-pcap 的基本原理后,我们可以定制自己的 pcap 解析程序,当然也可以使用其提供的一些功能。下面介绍一下如何使用已经定制好的 hadoop-pcap。
编译&打包 为了简便,我把改写过的几个 java 文件放在了同一个文件夹里,项目结构如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 . ├── lib │ ├── apache-httpcomponents-httpclient.jar │ ├── commons-codec-1.9.jar │ ├── commons-lang3-3.3.2.jar │ ├── commons-logging-1.1.1.jar │ ├── commons-net-3.0.1.jar │ ├── dnsjava-2.1.1.jar │ ├── guava-11.0.jar │ ├── hadoop-0.20.2.1U11-core.jar │ ├── hadoop-0.20.2-cdh3u4-core.jar │ └── httpcore-4.2.1.jar ├── src │ └── com │ └── xxx │ └── xxx │ ├── CombinePcapInputFormat.java │ ├── CombinePcapRecordReader.java │ ├── Flow.java │ ├── Packet.java │ ├── PcapInputFormat.java │ ├── PcapReader.java │ ├── PcapReaderUtil.java │ └── PcapRecordReader.java └── target
使用 javac 编译。因为公司的 hadoop 版本不是最新的,使用的是 1.7 的 jdk,需要在编译时使用 -source 和 -target 参数指定版本。-cp 参数指定 lib 目录里程序所依赖的 jar 包。-d 指定输出目录,编译的结果会存放到 target 目录里。
1 javac -source 1.7 -target 1.7 -cp "./lib/*" -d target ./src/com/xxx/xxx/*.java
在 target 目录里新建 META-INF/MENIFEST.MF 并写入 Main-Class: PcapInputFormat
,之后执行:
1 jar -cvfm pcapinputformat.jar META-INF/MENIFEST.MF com/xxx/xxx/*.class
运行 Hadoop Straming 1 2 3 4 5 6 7 streaming_path=/usr/bin/hadoop/software/hadoop/contrib/streaming/hadoop-streaming.jar HADOOP=/usr/bin/hadoop/software/hadoop/bin/hadoop $HADOOP jar $streaming_path \ -libjars "path of pcapinputformat.jar" \ -inputformat "com.xxx.xxx.PcapInputFormat" \ ......
-libjars 参数是打包好的 jar 的位置,-inputformat 参数是 PcapInputFormat 的 package 名。