在最近的工作中遇到需要解析大量 pcap 文件的需求,部门里原来的做法是将储存在 HDFS 上的 pcap 文件下载到本地再进行处理,这样无疑会带来处理数量上的瓶颈,并且整个处理逻辑也比较复杂。通过为 Hadoop 编写解析 pcap 插件的方式使得可以直接使用 Hadoop 处理。

hadoop-pcap

github 上有一个 RIPE-NCC 的开源项目 hadoop-pcap,项目中的 hadoop-pcap-lib 可以在 MapReduce 作业中使用来读取 pcap 文件。hadoop-pcap-lib 中还提供了各种协议的解析功能,如 dns、http 等,当然也可以根据自己的需求定制。在我们的需求中只需要将每个 pcap 文件中 packet 的 payload 按照顺序输出给同一个 mapper。下边我们来看一下项目中用到的关键类。

PcapInputFormat

hadoop steaming 中有一个 -inputformat 参数,来为 Hadoop 指定 InputFormatClass,这个类描述了 MR 的输入规范。
InputFormat 的作用:

  1. 验证作业的输入格式
  2. 将输入文件拆分成 InputSplits,并将每个 InputSplit 分配给一个 mapper
  3. 提供 RecordReader 实现,用于从 InputSplit 读取输入(key-value)并提供给 mapper 处理

用一句话来总结就是:InputFormat 定义如何将数据切割成分片和如何读取分片中的数据。这两个功能分别由 getSplits()RecordReader 完成。

hadoop 中默认的 InputFormat 是 TextInputFormat,用于纯文本文件,也是我们一般所处理的文件,其输出的 value 是文件中的每一行, key 是每一行在文件中的位置。hadoop 中也提供了用于读取普通文件的 FileInputFormat、用于读取数据库的 DBInputFormat 等。
我们所要实现的 PcapInputFormat 继承自 FileInputFormat。FileInputFormat 是所有基于文件的 InputFormat 的基类,其提供了 getSplits 的通用实现,但一个 pcap 文件作为一个整体是不能拆分的,FileInputFormat 同样也提供了 isSplitable 方法防止文件被拆分,在 hadoop-pcap-lib 的 PcapInputFormat.java 中也可以看到重写了 isSplitable 方法让其 return false。这也正好符合我们让一个 mapper 处理一个 pcap 文件的需求。
PcapInputFormat.java 中还重写了 createRecordReader 方法,其返回一个自定义的 RecordReader:PcapRecordReader

PcapRecordReader

PcapRecordReader 其实并没有太关键的实现,也没有太复杂的逻辑。比较关键的几个方法是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
PcapReader pcapReader;
Iterator<Packet> pcapReaderIterator;
long packetCount = 0;

@Override
public boolean nextKeyValue() throws IOException {
if (!pcapReaderIterator.hasNext())
return false;

key.set(++packetCount);
value.set(pcapReaderIterator.next());

context.setStatus("Read " + getPos() + " of " + end + " bytes");
context.progress();

return true;
}

@Override
public ObjectWritable getCurrentValue() {
return value;
}

@Override
public LongWritable getCurrentKey() {
return key;
}
  • nextKeyValue:读取下一个 key-value 键值对,可以看到 key 被设置成当前读取的 pcap 文件中 packetcount,value 被设置成 pcapReaderIterator.next(),pcapReaderIterator 是一个 packet 的迭代器,他来自 PcapRecordReader 的第一个参数 PcapReader类,其返回的其实是一个 Packet
  • getCurrentValue:获取当前的 value
  • getCurrentKey:获取当前的 key

mapper 在运行的过程中,首先会判断是否有下一个 key-value,如果有就传入当前的 key 和 value 到 map。

另外需要注意的一点是不同版本的 Hadoop API 是有区别的,比如说在 RecordReader (Apache Hadoop Main 3.2.2 API)RecordReader (Apache Hadoop Main 3.3.0 API) 中,RecordReader 类的方法就有所不同。目前的 hadoop-pcap-lib 跟 3.3.0 版本是一致的,但在实际环境中需要跟 hadoop 的版本保持一致。

Packet

在 PcapReader 之前先来说一下 Packet,它是一个 HashMap,它的 key 是 pcap 文件格式中 packet 的一些字段和 TCP/IP 协议中数据包的一些字段。其字段的值是在 PcapReader 中解析并设置的。在上面的 PcapRecordReader 中我们可以看到其输出的 key 就是一个 Packet,map 最终得到的值就是 Packet 中 toString 方法的输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Packet extends HashMap<String, Object> {
private static final long serialVersionUID = 8723206921174160146L;

public static final String TIMESTAMP = "ts";
public static final String TIMESTAMP_USEC = "ts_usec";
public static final String TIMESTAMP_MICROS = "ts_micros";
public static final String TTL = "ttl";
......
......

@Override
public String toString() {
StringBuffer sb = new StringBuffer();
for (Map.Entry<String, Object> entry : entrySet()) {
sb.append(entry.getKey());
sb.append('=');
sb.append(entry.getValue());
sb.append(',');
}
if (sb.length() > 0)
return sb.substring(0, sb.length() - 1);
return null;
}
}

如果需要自定义参数或者自定义输出,就改动 Packet 类的实现,比如在我们的需求中,就加入了 payload 字段。

PcapReader

PcapReader 是 PcapInputFormat 中的 initPcapReader 方法创建的,其只有一个参数 is,类型是 DataInputStream,其实就是当前 PcapRecordReader 所处理的 pcap 文件流。

1
2
3
4
5
6
7
8
9
10
11
12
public static PcapRecordReader initPcapRecordReader(Path path, long start, long length, Reporter reporter, Configuration conf) throws IOException {
FileSystem fs = path.getFileSystem(conf);
FSDataInputStream baseStream = fs.open(path);
DataInputStream stream = baseStream;
CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf);
final CompressionCodec codec = compressionCodecs.getCodec(path);
if (codec != null)
stream = new DataInputStream(codec.createInputStream(stream));

PcapReader reader = initPcapReader(stream, conf);
return new PcapRecordReader(reader, start, length, baseStream, stream, reporter);
}

PcapReader 首先会使用 readBytes 方法读取 pcap 头,关于 pcap 文件格式这里就不过多介绍了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public PcapReader(DataInputStream is) throws IOException {
this.is = is;
iterator = new PacketIterator();

pcapHeader = new byte[HEADER_SIZE];
if (!readBytes(pcapHeader)) {
//
// This special check for EOF is because we don't want
// PcapReader to barf on an empty file. This is the only
// place we check caughtEOF.
//
if (caughtEOF) {
LOG.warn("Skipping empty file");
return;
}
throw new IOException("Couldn't read PCAP header");
}

if (!validateMagicNumber(pcapHeader))
throw new IOException("Not a PCAP file (Couldn't find magic number)");

snapLen = PcapReaderUtil.convertInt(pcapHeader, PCAP_HEADER_SNAPLEN_OFFSET, reverseHeaderByteOrder);

long linkTypeVal = PcapReaderUtil.convertInt(pcapHeader, PCAP_HEADER_LINKTYPE_OFFSET, reverseHeaderByteOrder);
if ((linkType = getLinkType(linkTypeVal)) == null)
throw new IOException("Unsupported link type: " + linkTypeVal);
}

这部分就是 PcapRecordReader 直接调用的部分,PcapReader 本身是一个 Iterable 接口,它的 iterator 方法返回一个 Iterator 对象,可以使用for each 循环进行遍历,而 fetchNext 方法调用的是 PcapReader 的 nextPacket 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private class PacketIterator implements Iterator<Packet> {
private Packet next;

private void fetchNext() {
if (next == null)
next = nextPacket();
}

@Override
public boolean hasNext() {
fetchNext();
if (next != null)
return true;
int remainingFlows = flows.size();
if (remainingFlows > 0)
LOG.warn("Still " + remainingFlows + " flows queued. Missing packets to finish assembly?");
return false;
}

@Override
public Packet next() {
fetchNext();
try {
return next;
} finally {
next = null;
}
}

@Override
public void remove() {
// Not supported
}
}

nextPacket 方法首先尝试读取一个 pcapPacketHeader ,即 pcap 中的 Packet Header 部分,接着创建一个 createPacket 对象,然后向其中写入 Packet Header 的一些字段,之后根据 Packet Header 中的 CAP_LEN 读取 Packet Data 部分,接着读取数据包的一些参数。
如果需要自定义输出,在这里也需要做出修改,在我们的需求中,写入 Packet 的 Payload 就可以直接返回了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private Packet nextPacket() {
pcapPacketHeader = new byte[PACKET_HEADER_SIZE];
if (!readBytes(pcapPacketHeader))
return null;

Packet packet = createPacket();

long packetTimestamp = PcapReaderUtil.convertInt(pcapPacketHeader, TIMESTAMP_OFFSET, reverseHeaderByteOrder);
packet.put(Packet.TIMESTAMP, packetTimestamp);

long packetTimestampMicros = PcapReaderUtil.convertInt(pcapPacketHeader, TIMESTAMP_MICROS_OFFSET, reverseHeaderByteOrder);
packet.put(Packet.TIMESTAMP_MICROS, packetTimestampMicros);

BigDecimal packetTimestampUsec = new BigDecimal(packetTimestamp + packetTimestampMicros / 1000000.0, tsUsecMc);
packet.put(Packet.TIMESTAMP_USEC, packetTimestampUsec.doubleValue());

long packetSize = PcapReaderUtil.convertInt(pcapPacketHeader, CAP_LEN_OFFSET, reverseHeaderByteOrder);
packetData = new byte[(int)packetSize];
if (!readBytes(packetData))
return packet;

int ipStart = findIPStart(packetData);
......
......

使用 hadoop-pcap

在了解了 hadoop-pcap 的基本原理后,我们可以定制自己的 pcap 解析程序,当然也可以使用其提供的一些功能。下面介绍一下如何使用已经定制好的 hadoop-pcap。

编译&打包

为了简便,我把改写过的几个 java 文件放在了同一个文件夹里,项目结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
.
├── lib
│ ├── apache-httpcomponents-httpclient.jar
│ ├── commons-codec-1.9.jar
│ ├── commons-lang3-3.3.2.jar
│ ├── commons-logging-1.1.1.jar
│ ├── commons-net-3.0.1.jar
│ ├── dnsjava-2.1.1.jar
│ ├── guava-11.0.jar
│ ├── hadoop-0.20.2.1U11-core.jar
│ ├── hadoop-0.20.2-cdh3u4-core.jar
│ └── httpcore-4.2.1.jar
├── src
│ └── com
│ └── xxx
│ └── xxx
│ ├── CombinePcapInputFormat.java
│ ├── CombinePcapRecordReader.java
│ ├── Flow.java
│ ├── Packet.java
│ ├── PcapInputFormat.java
│ ├── PcapReader.java
│ ├── PcapReaderUtil.java
│ └── PcapRecordReader.java
└── target

使用 javac 编译。因为公司的 hadoop 版本不是最新的,使用的是 1.7 的 jdk,需要在编译时使用 -source 和 -target 参数指定版本。-cp 参数指定 lib 目录里程序所依赖的 jar 包。-d 指定输出目录,编译的结果会存放到 target 目录里。

1
javac -source 1.7 -target 1.7 -cp "./lib/*" -d target ./src/com/xxx/xxx/*.java

在 target 目录里新建 META-INF/MENIFEST.MF 并写入 Main-Class: PcapInputFormat,之后执行:

1
jar -cvfm pcapinputformat.jar META-INF/MENIFEST.MF com/xxx/xxx/*.class

运行 Hadoop Straming

1
2
3
4
5
6
7
streaming_path=/usr/bin/hadoop/software/hadoop/contrib/streaming/hadoop-streaming.jar
HADOOP=/usr/bin/hadoop/software/hadoop/bin/hadoop

$HADOOP jar $streaming_path \
-libjars "path of pcapinputformat.jar" \
-inputformat "com.xxx.xxx.PcapInputFormat" \
......

-libjars 参数是打包好的 jar 的位置,-inputformat 参数是 PcapInputFormat 的 package 名。