你好,游客 登录
背景:
阅读新闻

Hadoop基础之MapReduce原理、序列化和源码分析

[日期:2014-11-19] 来源:CSDN博客  作者:Fortyone41 [字体: ]

1. MapReduce原理

1.1.          MapReduce概述

(1)MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题.

(2)MapReduce由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单。这两个函数的形参是key、value对,表示函数的输入信息。

(3)在hadoop 中,map 函数 位 于 内 置 类 org.apache.hadoop.mapreduce.Mapper<KEYIN,VALUEIN,KEYOUT, VALUEOUT>中,reduce 函数位于内置类 org.apache.hadoop.mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT,VALUEOUT>中。 我们要做的就是覆盖 map 函数和 reduce 函数。

对于 Hadoop 的 map 函数和reduce 函数,处理的数据是键值对,也就是说 map 函数接收的数据是键值对,两个参数;输出的也是键值对,两个参数;reduce 函数接收的参数和输出的结果也是键值对。

在Mapper 类,有四个泛型,分别是 KEYIN、VALUEIN、KEYOUT、VALUEOUT,前面两个 KEYIN、 VALUEIN 指的是map 函数输入的参数 key、 value 的类型;后面两个 KEYOUT、VALUEOUT 指的是 map 函数输出的 key、value的类型。

输入参数 key、value 的类型就是KEYIN、VALUEIN,每一个键值对都会调用一次 map 函数。在这里,map 函数没有处理输入的 key、value,直接通过 context.write(…)方法输出了,输出的 key、value 的类型就是KEYOUT、VALUEOUT。这是默认实现,通常是需要根据业务逻辑覆盖的。

查看 Reducer 类,也有四个泛型,同理,分别指的是 reduce 函数输入的 key、value类型,和输出的 key、value 类型。看一下reduce 函数定义,如下图所示:

reduce 函数的形参 key、value 的类型是KEYIN、VALUEIN。要注意这里的value是存在于java.lang.Iterable<VALUEIN>中的,这是一 个迭代器,用于集合遍历的,意味着values 是一个集合。reduce 函数默认实现是把每个value 和对应的 key,通过调用context.write(…)输出了,这里输出的类型是 KEYOUT、VALUEOUT。通常会根据业务逻辑覆盖 reduce 函数的实现。

1.2.          MapReduce执行过程

MapReduce 运行的时候,会通过 Mapper 运行的任务读取HDFS 中的数据文件,然后调用自己的方法,处理数据,最后输出。Reducer任务会接收 Mapper 任务输出的数据,作为自己的输入数据,调用自己的方法,最后输出到 HDFS 的文件中。

1.3.          MapReduce原理及执行步骤

MapReduce原理图如下所示:

简单理解如下所示:

(1)Map任务处理

l  读取输入文件内容,解析成key、value对,对输入文件的每一行,解析成key、value对。每一个键值调用一次map函数;

l  覆盖map函数,对输入的key、value处理,转换成新的key、value输出;

l  对输出的key、value进行分区;

l  对不同分区的数据,按照key进行排序、分组;相同key的value放到一个集合中;

l  对分组后的数据进行规约。

l  Mapper执行1-6个步骤得到最终的结果

(2)Reduce任务处理

l  对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点;对多个map任务的输出进行合并、排序;

l  覆盖reduce函数,对输入的key、value处理,转换成新的key、value输出;

l  把reduce的输出保存到文件中。

l  Reduce执行4-6个步骤对Mapper最终的结果执行4-6个步骤进一步处理

将上述步骤融入到原理图中后如下所示:

(3)键值对编号

对于 Mapper 任务输入的键值对,定义为key1 和 value1。在 map 方法中处理后,输出的键值对,定义为 key2 和 value2。reduce 方法接收 key2 和 value2,处理后,输出 key3 和 value3。在下文讨论键值对时,可能把 key1 和 value1 简写为<k1,v1>,key2 和value2 简写为<k2,v2>,key3 和 value3 简写为<k3,v3>。

1.4.          单词计数

package mavshuang.mapreduce;
 
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 
public class WordCountDemo {
private static final String INPUT_PATH = "hdfs://hadoop0:9000/hello";
private static final String OUTPUT_PATH = "hdfs://hadoop0:9000/out";
 
public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
final Configuration configuration = new Configuration();
FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), configuration);
// 判断文件是否存在,如果存在则删除
if (fileSystem.exists(new Path(OUTPUT_PATH))) {
fileSystem.delete(new Path(OUTPUT_PATH), true);
}
String jobName = WordCountDemo.class.getSimpleName();
Job job = new Job(configuration, jobName);
// 1.1 读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。
// 设置job执行作业时输入文件的路径
FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
// 设置把输入文件处理成键值对的类
job.setInputFormatClass(TextInputFormat.class);
// 1.2 覆盖map函数,对输入的key、value处理,转换成新的key、value输出。
// 省略的条件是map输出的<k,v>与reduce输出的<k,v>格式相同
// 设置自定义的MyMapper类
job.setMapperClass(MyMapper.class);
// 设置map方法输出的k2,v2值得类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
 
// 1.3 对输出的key、value进行分区。
// 设置对k2分区的类
job.setPartitionerClass(HashPartitioner.class);
// 设置运行的Reducer任务的数量
job.setNumReduceTasks(1);
 
// 1.4 对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。
 
// 1.5 (可选)分组后的数据进行归约。
 
// 2.1 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。对多个map任务的输出进行合并、排序。
// 设置自定义的MyReducer类
job.setReducerClass(MyReducer.class);
 
// 2.2 覆盖reduce函数,对输入的key、value处理,转换成新的key、value输出。
// 设置reduce方法输出的k3,v3值得类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
 
// 2.3 把reduce的输出保存到文件中。
// 设置job执行作业时的输出路径
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
// 设置把输出文件处理成键值对的类
job.setOutputFormatClass(TextOutputFormat.class);
 
// 把job提交给JobTracker执行,等待执行结果返回
job.waitForCompletion(true);
 
}
 
// KEYIN:表示每一行的偏移量
// VALUEIN:表示每一行的内容
// KEYOUT:表示每一行中的每个单词
// VALUEOUT:表示每一行中每个单词的出现次数,常量为1
// 继承Mapper类实现map方法
static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
// 源文件会被解析成2个键值对,分别为<0,hello you >,<10,hello mavs>
// 每个<k,v>都调用一次函数
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
final String[] splited = value.toString().split("\t");
for (String word : splited) {
final Text k2 = new Text(word);
final LongWritable v2 = new LongWritable(1);
context.write(k2, v2);
}
}
}
 
// KEYIN:表示整个文件中的不同单词
// VALUEIN:表示整个文件中的不同单词出现的次数
// KEYOUT:表示整个文件中的不同单词
// VALUEOUT:表示整个文件中的不同单词出现的总次数
// 继承Reducer类实现reduce方法
static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
// reduce会被调用3次,分别是<hello,{1,1}>、<mavs,{1}>、<you,{1}>
protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {
Long count = 0L;// 定义成Java类型,便于操作
for (LongWritable times : v2s) {
count += times.get();
}
final LongWritable v3 = new LongWritable(count);
context.write(k2, v3);
}
}
}

上述结果错误原因在于: hello 文件,单词与单词之间应该使用 “Tab” 键来区分,而不是空格键;修改后的结果如下所示:

1.5.          分析上述代码执行过程

(1)JobTracker

负责接收用户提交的作业,负责启动、跟踪任务的执行。

JobSubmissionProtocol是JobClient与JobTracker通信的接口;

InterTrackerProtocol是TaskTracker与JobTracker通信的接口;

(2)TaskTracker

负责执行任务。

(3)JobClient

是用户作业与JobTracker交互的主要接口;

负责提交作业的,负责启动、跟踪任务执行、访问任务状态和日志等。

(4)MapReduce驱动默认的设置

以上代码是从上述代码中截取出来的,其中一些设置未按照MapReduce中的默认设置,MapReduce中的默认设置如下所示:

(5)上述代码分析执行过程

(6)总结图解

2. 序列化

2.1.序列化概念

(1)序列化(Serialization)是指把结构化对象转换为字节流;

(2)反序列化(Deserialization)是序列化的逆过程,即把字节流转回结构化对象;

(3)Java序列化是指java.io.Serializable接口。

2.2.Hadoop序列化特点

(1)紧凑

高效使用存储空间。

(2)快速

读写数据的额外开销小。

(3)可扩展

可透明地读取老格式的数据。

(5)互操作

支持多语言的交互。

2.3.Hadoop序列化的作用

主要有两大作用:进程间通信和永久存储;

Hadoop节点间通信如下图所示:

原文链接:http://blog.csdn.net/mavs41/article/details/41179673





收藏 推荐 打印 | 录入: | 阅读:
本文评论   查看全部评论 (0)
表情: 表情 姓名: 字数
点评:
       
评论声明
  • 尊重网上道德,遵守中华人民共和国的各项有关法律法规
  • 承担一切因您的行为而直接或间接导致的民事或刑事法律责任
  • 本站管理人员有权保留或删除其管辖留言中的任意内容
  • 本站有权在网站内转载或引用您的评论
  • 参与本评论即表明您已经阅读并接受上述条款