配置参数

  • mapreduce.job.name:job name。
  • mapreduce.job.priority:优先级调度,只是在计算任务向集群服务申请资源的时候会起作用。
  • mapreduce.job.running.map.limit:每个作业最多可以同时跑多少个 map,默认是0,当值为0或负数时无限制。
  • mapreduce.job.reduces:reduce任务个数,默认是1。
  • mapred.input.dir.error.pass:跳过输入路径错误。
  • mapred.split.zero.file.skip:如果文件长度为0则划分 split。

容错

  • mapreduce.map.maxattempts:每个 map 任务最大重试次数,重试次数超过该值则认为任务运行失败。默认是4。
  • mapreduce.reduce.maxattempts:每个 reduce 任务最大重试次数,同上。
  • mapreduce.task.timeout:Task 超时时间。如果一个任务在一定时间内不读取新的数据,也没有输出数据,则认为其处于 block 状态。为了防止用户程序永远 block 住不退出,强制设置了一个超时时间(单位毫秒),默认是300000。

压缩

  • mapreduce.map.output.compress:在通过网络发送之前map的输出是否压缩。默认 false。
  • mapreduce.map.output.compress.codec:指定 map 输出所用的压缩 codec。

对 map 输出进行数据压缩可以加速网络传输,对 reduce 输出进行数据压缩可以减少磁盘空间,如果输入文件是压缩的,那么在根据文件扩展名推断出相应的 codec 后,MapReduce 会在读取文件是自动解压缩文件。

效率和稳定性

  • mapreduce.map.speculative:map 开启推测执行。默认 true。
  • mapreduce.reduce.speculative:reduce 开启推测执行。默认 true。

在作业执行时,由于一些原因,比如硬件老化、软件层面的不恰当配置等。某些机器上的任务实例执行的很慢,拖慢了整个作业的进度。Hadoop 不会尝试去诊断或者修复这些慢任务,相反它会在集群的其他节点上去启动这些慢任务的多个示例作为备份,这就是hadoop的推测执行(speculative execution)。

map数量如何确定

map 的个数等于 split 的个数。
其由三个因素决定:

  1. 输入文件数目
  2. 输入文件大小
  3. 配置参数

一般来说,对于每一个输入的文件会有一个 map split。如果输入文件太大,会把大文件划分成多个 split 进行处理,每个 map 处理一个 split。
涉及的参数:

  • mapreduce.input.fileinputformat.split.minsize:启动 map 最小的 split size 大小,默认0
  • mapreduce.input.fileinputformat.split.maxsize:启动 map 最大的 split size 大小,默认256M
  • dfs.block.size:block 块大小。

计算公式:splitSize = Math.max(minSize, Math.min(maxSize, blockSize))

FileInputFormat class 的 getSplits() 的伪代码:

1
2
3
4
5
6
7
8
9
num_splits = 0
for each input file f:
remaining = f.length
while remaining / split_size > split_slope:
num_splits += 1
remaining -= split_size
where:
split_slope = 1.1 分割斜率
split_size =~ dfs.blocksize 分割大小约等于hdfs块大小

会有一个比例进行运算来进行切片,为了减少资源的浪费
例如一个文件大小为260M,在进行 MapReduce 运算时,会首先使用260M/128M(blocksize),得出的结果和1.1进行比较,大于则切分出一个128M作为一个分片,剩余132M,再次除以128(blocksize),得到结果为1.03,小于1.1则将132作为一个切片,即最终260M被切分为两个切片进行处理,而非3个切片。

传参

sys.argv

mapper:

1
2
3
4
import sys

arg1 = sys.argv[1]
arg2 = sys.argv[2]

hadoop straming:

1
2
3
$HADOOP jar $streaming_path \
-mapper "mapper.py arg1 arg2" \
-file "mapper.py"

环境变量

mapper:

1
2
3
import os  

date_start = os.environ.get("date_start")

hadoop straming:

1
2
3
$HADOOP jar $streaming_path \
-mapper "mapper.py" \
-cmdenv "date_start=$date"

设置多个输入目录

使用多个-input选项 or 逗号分割

多个 -input:

1
2
3
$HADOOP jar $streaming_path \
-input "/user/foo/dir1" \
-input "/user/foo/dir2"

逗号分割:

1
2
$HADOOP jar $streaming_path \
-input "/user/foo/dir1,/user/foo/dir2"

通配符

1
2
3
4
5
$HADOOP jar $streaming_path \
-input "/user/foo/dir[1-2]"

$HADOOP jar $streaming_path \
-input "/user/foo/{dir1,dir2}"

数组

1
2
3
4
input_file=("/user/foo/dir1" "/user/foo/dir2")

$HADOOP jar $streaming_path \
-input ${input_file[@]}