MapReduce
- 概述
- 分布式并行编程
- MapReduce模型简介
- Map和Reduce函数
- 序列化
- 大规模数据的难点
- MapReduce体系结构
- MapReduce1.0体系结构
- JobTracker
- TaskTracker
- 体系结构详解
- MapReduce on Yarn
- MapReduce工作流程
- 工作流程概述
- MapReduce各个执行阶段
- Map任务的数量
- Reduce任务的数量
- Shuffle过程详解
- Map端的Shuffle过程
- Reduce端的Shuffle过程
- 小结
- MapReduce应用程序执行过程
- 常见MapReduce应用场景
- 实例分析:WordCount
- MapReduce的具体应用(了解)
- MapReduce在关系代数运算中的应用
- 分组与聚合运算
- 矩阵-向量乘法
- 矩阵乘法
概述
分布式并行编程
- 在MapReduce中,一个存储在分布式文件系统中的大规模数据集会被切分成许多独立的小数据块,这些小数据块可以被多个Map任务并行处理。MapReduce框架会为每个Map任务输入一个数据子集,Map任务生成的结果会继续作为Reduce任务的输入,最终由Reduce任务输出最后结果,并写入分布式文件系统。特别需要注意的是:适合用MapReduce来处理的数据集需要满足一个前提条件,即待处理的数据集可以分解为许多小的数据集,而且每一个小数据集都可以完全并行地进行处理。
问题:MapReduce相较于传统的并行计算框架有什么优势?
MapReduce模型简介
MapReduce将复杂的、运行于大规模集群上的并行计算过程高度地抽象到了两个函数:Map和Reduce。map和reduce(fold)函数都是属于在函数式编程语言中的高阶函数。
- map函数的功能是接受一个列表list以及一个函数,将这个函数作用于这个列表中的所有成员,并返回所得结果。
- reduce(fold)函数的功能则是接收一个列表、一个初始值以及一个函数,将该函数作为特定的组合方式,将其递归地应用于列表的所有成员,并返回最终结果。
- 编程容易,不需要掌握分布式并行编程细节,也可以很容易把自己的程序运行在分布式系统上,完成海量数据的计算。
问题1:针对大规模数据进行分布式计算,可能会面临数据太大,内存超出范围的情况,如何解决这种情况?
- MapReduce采用**“分而治之”**策略,一个存储在分布式文件系统中的大规模数据集,会被切分成许多独立的分片(split),这些分片可以被多个Map任务并行处理。
- 并行计算的第一个重要问题是如何划分计算任务或者计算数据以便对划分的子任务或数据块同时进行计算。不可分拆的计算任务或相互间有依赖关系的数据无法进行并行计算!
问题2:如何获取这些数据?要把数据集中到一起主机上进行计算吗?
- MapReduce设计的一个理念就是“计算向数据靠拢”,而不是“数据向计算靠拢”,因为移动数据需要大量的网络传输开销,尤其是在大规模数据环境下,所以移动计算比移动数据更加经济。
- MapReduce框架采用了Master/Slave架构,包括一个Master和若干个Slave。Master上运行JobTracker,Slave上运行TaskTracker。
Hadoop框架是用Java实现的,但是MapReduce应用程序则不一定要用Java来写。
优点:
- 易于编程
- 良好的扩展性
- 高容错性
- 适合海量数据的离线处理
局限性:
- 实时计算性能差
- 不能进行流式计算(流式计算特点是数据是源源不断得计算,并且数据是动态的;而MapReduce作为一个离线计算框架,主要是针对静态数据集得,数据是不能动态变化的)
Map和Reduce函数
统计单词个数MapReduce分析图
序列化
序列化(Serialization)是将结构化对象转换成字节流以便于进行网络传输或写入持久存储的过程。
反序列化(Deserialization)是将字节流转换为一系列结构化对象的过程,重新创建该对象。
Java的序列化机制
- Java对象序列化的机制,把对象表示成二进制的字节数组,包含了对象的数据,对象的类型信息,对象内部的数据的类型信息等。通过保存或者转移二进制数组达到持久化、传递的目的。
- 序列化是通过实现java.io.Serializable接口实现。
- 反序列化是和序列化相反的过程,就是把二进制数组转化为对象的过程。
Hadoop序列化机制
Hadoop的序列化没有采用java的序列化机制,而是实现了自己的序列化机制。
原因在于java的序列化机制比较臃肿,重量级,是不断的创建对象的机制,并且会额外附带很多信息(校验、继承关系等)。但在Hadoop的序列化机制中,用户可以复用对象,减少java对象的分配和回收,提高应用效率。
Hadoop通过Writable接口实现序列化机制。
Hadoop中的数据类型
注:如果需要将自定义的类放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序。
大规模数据的难点
- 大量的数据,例如,PB级别的数据,需要使用大量的设备进行工作。
- 设备调度问题。
- 数据流程会更加复杂。
MapReduce体系结构
MapReduce1.0体系结构
Hadoop MapReduce采用Master/Slave结构。
- Master:是整个集群的唯一的全局管理者,功能包括:作业管理、状态监控和任务调度等,即MapReduce中的JobTracker。
- Slave:负责任务的执行和任务状态的回报,即MapReduce中的TaskTracker。
JobTracker
概述:
JobTracker是一个后台服务进程,启动之后,会一直监听并接收来自各个TaskTracker发送的心跳信息,包括资源使用情况和任务运行情况等信息。
功能:
- 作业控制:在hadoop中每个应用程序被表示成一个作业,每个作业又被分成多个任务,JobTracker的作业控制模块则负责作业的分解和状态监控。
- 状态监控:主要包括TaskTracker状态监控、作业状态监控和任务状态监控。主要作用是容错和为任务调度提供决策依据。
- 资源管理。
TaskTracker
概述:
- TaskTracker是JobTracker和Task之间的桥梁;
- 从JobTracker接收并执行各种命令:运行任务、提交任务、杀死任务等;
- 将本地节点上各个任务的状态通过心跳周期性汇报给JobTracker。TaskTracker与JobTracker和Task之间采用了RPC协议进行通信。
功能:
-
汇报心跳:Tracker周期性将所有节点上各种信息通过心跳机制汇报给JobTracker。这些信息包括两部分:
1)机器级别信息:节点健康情况、资源使用情况等。
2)任务级别信息:任务执行进度、任务运行状态等。 - 执行命令:JobTracker会给TaskTracker下达各种命令,主要包括:启动任务(LaunchTaskAction)、提交任务(CommitTaskAction)、杀死任务(KillTaskAction)、杀死作业(KillJobAction)和重新初始化(TaskTrackerReinitAction)。
体系结构详解
MapReduce体系结构主要由四个部分组成,分别是:Client、JobTracker、TaskTracker以及Task。
Client
用户编写的MapReduce程序通过Client提交到JobTracker端。
用户可通过Client提供的一些接口查看作业运行状态。
JobTracker
JobTracker负责资源监控和作业调度;
JobTracker 监控所有TaskTracker与Job的健康状况,一旦发现失败,就将相应的任务转移到其他节点;
JobTracker 会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器(TaskScheduler),而调度器会在资源出现空闲时,选择合适的任务去使用这些资源。
TaskTracker
TaskTracker 会周期性地通过“心跳”将本节点上资源的使用情况和任务的运行进度汇报给JobTracker,同时接收JobTracker 发送过来的命令并执行相应的操作(如启动新任务、杀死任务等);
TaskTracker 使用“slot”等量划分本节点上的资源量(CPU、内存等)。一个Task 获取到一个slot 后才有机会运行,而Hadoop调度器的作用就是将各个TaskTracker上的空闲slot分配给Task使用。slot 分为Map slot 和Reduce slot 两种,分别供MapTask 和Reduce Task 使用。
Task
Task 分为Map Task 和Reduce Task 两种,均由TaskTracker 启动。
MapReduce on Yarn
MapReduce工作流程
Map阶段对应的是MapTask并发实例,完全并行运行。
Reduce阶段对应的是ReduceTask并发实例,数据依赖于上一个阶段所有MapTask并发实例的数据输出结果。
MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。
工作流程概述
- MapReduce计算模型主要由三个阶段构成:Map、shuffle、Reduce。
- Map是映射,负责数据的过滤分法,将原始数据转化为键值对;
- Reduce是合并,将具有相同key值的value进行处理后再输出新的键值对作为最终结果。
- 为了让Reduce可以并行处理Map的结果,必须对Map的输出进行一定的排序与分割,然后再交给对应的Reduce,而这个将Map输出进行进一步整理并交给Reduce的过程就是Shuffle。
注:
- 不同的Map任务之间不会进行通信
- 不同的Reduce任务之间也不会发生任何信息交换
- 用户不能显式地从一台机器向另一台机器发送消息
- 所有的数据交换都是通过MapReduce框架自身去实现的
Map和Reduce操作需要我们自己定义相应Map类和Reduce类,以完成我们所需要的化简、合并操作,而shuffle则是系统自动帮我们实现的,了解shuffle的具体流程能帮助我们编写出更加高效的Mapreduce程序。
Shuffle过程包含在Map和Reduce两端,即Map shuffle和Reduce shuffle。
注:
1)为了让Reduce可以并行处理Map的结果,需要对Map的输出进行一定的分区(Portition)、排序(Sort)、合并(Combine)、归并(Merge)等操作,得到<key,value-list>形式的中间结果,再交给对应的Reduce进行处理,这个过程称为Shuffle。从无序的<key,value>到有序的<key,value-list>,这个过程用Shuffle(洗牌)来称呼是非常形象的。
2)所谓“归并”,是指对于相同key的键值对会被归并成一个新的键值对。具体而言,对于若干个具有key的键值对< k 1 k_1 k1, v 1 v_1 v1>、< k 2 k_2 k2, v 2 v_2 v2>…< k n k_n kn, v n v_n vn>会被归并成一个新的键值对< k 1 k_1 k1,< v 1 v_1 v1, v 2 v_2 v2,…, v n v_n vn>>
3)合并(Combine)和归并(Merge)的区别:两个键值对<“a”,1>和<“a”,1>,如果合并,会得到<“a”,2>,如果归并,会得到<“a”,<1,1>>
MapReduce各个执行阶段
HDFS 以固定大小的block 为基本单位存储数据,而对于MapReduce 而言,其处理单位是Split。
Split (分片)是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。
Map任务的数量
Hadoop为每个split创建一个Map任务,split 的多少决定了Map任务的数目。大多数情况下,理想的分片大小是一个HDFS块
Reduce任务的数量
最优的Reduce任务个数取决于集群中可用的reduce任务槽(slot)的数目
通常设置比reduce任务槽数目稍微小一些的Reduce任务个数(这样可以预留一些系统资源处理可能发生的错误)
Shuffle过程详解
Map端的Shuffle过程
1)输入数据和执行Map任务:Map任务的输入数据一般保存在分布式文件系统(如GFS或HDFS中)。
2)写入缓存:每个Map任务都会被分配一个缓存,Map的输出结果不是立即写入磁盘,而是首先写入缓存。在缓存中积累一定数量的Map输出结果以后,再一次性批量写入磁盘,这样可以大大减少对磁盘I/O的影响。
3)溢写(分区、排序和合并):
溢写(Spill)就是把缓存中的内容一次性写入磁盘,为了保证Map结果能够不停地持续写入缓存,不受溢写过程的影响,就必须让缓存中一直有可用的空间,不能等到全部占满才启动溢写过程,所以一般设置一个溢写比例,如0.8,也就是说当100MB大小的缓存被填满80MB数据时,就启动溢写过程,把已经写入的80MB数据写入磁盘,剩余20MB空间供Map结果继续写入。
在溢写到磁盘前,缓存中的数据首先会被分区(Partition),MapReduce通过Partitioner接口对键值对进行分区,默认采用的分区方式是采用Hash函数对key进行哈希然后用Reduce任务的数量进行取模。
对于每个分区内的所有键值对,后台线程会根据key对它们进行内存排序,排序是MapReduce的默认操作。
所谓合并就是将那些具有相同key的<key,value>的value加起来。不过,并非所有场合都可以使用Combiner,因为Combiner的输出是Reduce任务的输入,Combiner绝不能改变Reduce任务的最终计算结果,一般而言,累加、最大值等场景可以使用合并操作
4)文件归并:每次溢写操作都会在磁盘中生成一个新的溢写文件,最终,在Map任务全部结束之前,系统会对所有的溢写文件中的数据进行归并(Merge),生成一个大的溢写文件,这个大的溢写文件中的所有键值对也是经过分区和排序的。如果磁盘中已经生成的溢写文件的数量超过参数min.num.spills.for.combine的值时,就可以再次运行Combiner,对数据进行合并,从而减少写入磁盘的数据量。但是,如果磁盘中只有一两个溢写文件时,执行合并操作就会得不偿失,执行合并操作本身也需要代价,因此不会运行Combiner。注:
1.combiner
每一个map都可能会产生大量的本地输出,Combiner的作用就是对map端的输出先做一次合并,以减少在map和reduce节点之间的数据传输量,以提高网络IO性能,是MapReduce的一种优化手段之一。
combiner中文叫做数据规约。数据归约是指在尽可能保持数据原貌的前提下,最大限度地精简数据量。
combiner是MR程序中Mapper和Reducer之外的一种组件,默认情况下不启用。
2.partitioner
partitioner决定了Map Task输出的每条数据交给哪个Reduce Task处理
概括:
每个Map任务分配缓存,默认100MB缓存
设置溢写比例0.8
分区默认采用哈希函数
排序是默认的操作
排序后可以合并(Combine)
合并不能改变最终结果
在Map任务全部结束之前进行归并
归并得到一个大的文件,放在本地磁盘
文件归并时,如果溢写文件数量大于预定值(默认是3)则可以再次启动Combiner,少于3不需要
JobTracker会一直监测Map任务的执行,并通知Reduce任务来领取数据
Reduce端的Shuffle过程
1)领取数据:Reduce任务通过RPC向JobTracker询问Map任务是否已经完成,JobTracker检测到一个Map任务完成后,就会通知相关的Reduce任务来领取数据
2)归并数据:Reduce领取数据先放入缓存,从多个Map机器领回属于自己处理的那些分区的数据,因此缓存中的数据是来自不同Map机器的,先归并,再合并(如果用户定义了Combiner),写入磁盘
3)把数据输入给Reduce任务:多个溢写文件归并成一个或多个大文件,文件中的键值对是排序的;当数据很少时,不需要溢写到磁盘,直接在缓存中归并,然后输出给Reduce
小结
MapReduce应用程序执行过程
在Map端的shuffle过程是对Map的结果进行分区、排序、分割,然后将属于同一划分(分区)的输出合并在一起并写在磁盘上,最终得到一个分区有序的文件,分区有序的含义是map输出的键值对按分区进行排列,具有相同partition值的键值对存储在一起,每个分区里面的键值对又按key值进行升序排列(默认)。
问题:所有的partition对应的数据虽然是顺序存放的,但是怎么直接知道某个partition在这个文件中存放的起始位置呢?
有一个三元组记录某个partition对应的数据在这个文件中的索引:起始位置、原始数据长度、压缩之后的数据长度,一个partition对应一个三元组。
索引文件与数据文件的对应关系:
常见MapReduce应用场景
- 简单的数据统计
- 搜索引擎建索引
- 海量数据查找
- 复杂数据分析算法实现
实例分析:WordCount
WordCount程序任务:
一个WordCount的输入和输出实例:
WordCount设计思路:
首先,需要检查WordCount程序任务是否可以采用MapReduce来实现**(待处理的数据集可以分解为许多小的数据集,而且每一个小数据集都可以完全并行地进行处理)**
其次,确定MapReduce程序的设计思路
最后,确定MapReduce程序的执行过程
Map过程示意图:
用户没有定义Combiner时的Reduce过程示意图:
用户没有定义Combiner时的Reduce过程示意图
MapReduce的具体应用(了解)
MapReduce可以很好地应用于各种计算问题:
- 关系代数运算(选择、投影、并、交、差、连接)
- 分组与聚合运算
- 矩阵-向量乘法
- 矩阵乘法
MapReduce在关系代数运算中的应用
1)关系的选择运算
2)关系的投影运算
3)关系的并、交、差运算
4)关系的自然连接运算
- 假设有关系R(A, B)和S(B, C),对二者进行自然连接操作。
- 使用Map过程,把来自R的每个元组<a,b>转换成一个键值对<b, <R,a>>,其中的键就是属性B的值。把关系R包含到值中,这样做使得我们可以在Reduce阶段,只把那些来自R的元组和来自S的元组进行匹配。
- 类似地,使用Map过程,把来自S的每个元组<b,c>,转换成一个键值对<b,<S,c>>。
- 所有具有相同B值的元组被发送到同一个Reduce进程中,Reduce进程的任务是,把来自关系R和S的、具有相同属性B值的元组进行合并。
- Reduce进程的输出则是连接后的元组<a,b,c>,输出被写到一个单独的输出文件中。
分组与聚合运算
矩阵-向量乘法
文章来源:https://uudwc.com/A/ZP05
矩阵乘法
文章来源地址https://uudwc.com/A/ZP05