基本流程:
一个图片太大了,只好分割成为两部分。根据流程图来说一下具体的一个任务执行的情况。
1. 分布式环境中客户端创建任务并提交。
2. InputFormat做Map前的预处理,主要负责以下工作:
a) 验证输入的格式是否符合JobConfig的输入定义,这个在实现Map和构建Conf的时候就会知道,不定义可以是Writable的任意子类。
b) 将input的文件split为逻辑上的输入InputSplit,其实这就是在上面提到的在分布式文件系统中blocksize是有大小限制的,因此大文件会被划分为多个block。
c) 通过RecordReader来再次处理inputsplit为一组records,输出给Map。(inputsplit只是逻辑切分的第一步,但是如何根据文件中的信息来切分还需要RecordReader来实现,例如最简单的默认方式就是回车换行的切分)
3. RecordReader处理后的结果作为Map的输入,Map执行定义的Map逻辑,输出处理后的key,value对到临时中间文件。
4. Combiner可选择配置,主要作用是在每一个Map执行完分析以后,在本地优先作Reduce的工作,减少在Reduce过程中的数据传输量。
5. Partitioner可选择配置,主要作用是在多个Reduce的情况下,指定Map的结果由某一个Reduce处理,每一个Reduce都会有单独的输出文件。(后面的代码实例中有介绍使用场景)
6. Reduce执行具体的业务逻辑,并且将处理结果输出给OutputFormat。
7. OutputFormat的职责是,验证输出目录是否已经存在,同时验证输出结果类型是否如Config中配置,最后输出Reduce汇总后的结果。
代码范例:
业务场景描述:
可设定输入和输出路径(操作系统的路径非HDFS路径),根据访问日志分析某一个应用访问某一个API的总次数和总流量,统计后分别输出到两个文件中。
仅仅为了测试,因此没有去细分很多类,将所有的类都归并于一个类便于说明问题。
图4 测试代码类图
LogAnalysiser就是主类,主要负责创建,提交任务,并且输出部分信息。内部的几个子类用途可以参看流程中提到的角色职责。具体的看看几个类和方法的代码片断:
LogAnalysiser::MapClass
public static class MapClass extends MapReduceBase
implements Mapper<LongWritable, Text, Text, LongWritable>
{
public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter)
throws IOException
&