3.实现
MapReduce模型有多种实现方式,一种是小型共享内存式,一种是基于NUMA架构的大型多处理器。
Google的实现基于:
(1)x86架构、Linux操作系统、双核处理器,4G内存;
(2)普通网络设备,100M或1000M带宽;
(3)成千上百台普通PC机器,故障是常态(GFS里也常提到这个,故障是常态);
(4)存储为廉价的IDE硬盘;
(5)用户提交的作业(job)由系统统一调度:每个工作包含一系列任务(task),调度系统会完成任务的分发、分配;
3.1执行流程
通过将Map的输入数据集分割为M个Map片段集合,可将这些Map片段集合分配到多台机器上执行Map,以实现并行处理;
Map产生的结果数据集又被分配为R个数据分区,该项工作由 分区函数 完成,分区数量R和分区函数都由用户指定,如:
hash(key) mod R,下图展示了MapReduce实现中操作的全部流程:
图一:MapReduce执行流程
当用户调用MapReduce时,将发生以下一系列动作:
(1)用户调用MapReduce,将输入数据分成M个数据片段集合,然后在集群中创建大量程序副本(fork);
(2)这些副本中有一个主程序(master),其他均为工作程序(worker),任务的分配由master完成,它将M个map任务和R个reduce任务分给不同的worker;
(3)被分配到map任务的worker,从数据片段集合中读取kv对,交给用户的Map函数处理,生成的结果kv对放在缓存中;
(4)缓存中的kv对通过分区函数,分成R个区域,周期性的写回本地磁盘,由master再把它们传给负责reduce的worker进行处理;
(5)负责reduce工作的worker从远程读取中间kv数据,对key进行排序,使得相同的key的数据聚合在一起(如果数据量太大,可外部排序);
(6)相同key的中间数据,其value集合交给用户的reduce函数处理,处理结果写入对应分区的输出文件;
(7)所以map和reduce的worker都结束工作后,master唤醒用户程序,MapReduce调用返回,结果被输出到了R个文件中。
3.2master
master会存储一些元信息(GFS的master也用来存元信息),包括每个map和reduce的状态,是“空闲”、“工作”还是“完成”,它当然也保存各个worker机器的标识;
每个map完成后,master会知道R个中间kv数据集合的位置,并把这些数据推给reduce处理;
master就像一个管道,R个中间文件从这个管道从map传递给reduce。
3.3容错
(1)worker失效
master会周期性的ping每个worker,约定时间内仍未收到worker返回的信息,master将标记这个worker失效,这个任务将会分配给其他worker;
失效的worker的状态会重置为空闲,等待其他任务调度;
一个map任务如果先被worker A执行,失效后调度给worker B执行,master会将“重新执行”的命令通知给所有reduce的worker,数据将从worker B中读取;
(2)master失效
一个简单的方法是,将元数据写入磁盘,即加如检查点(checkpoint)。master任务失败了,由另一个master读取检查点,继续执行。
现实的实现是,只设置一个master进程,master失效就终止MapReduce计算,并通知用户,可以让其选择重新执行。
(3)原子性提交
在用户提供的map、reduce函数一定,且无出错的情况下,MapR的产出一定是一样的,使用“原子性提交”来保证这个特性。
每个map完成时,会产生R个私有临时文件,该任务完成的通知,以及R个文件的元信息会传递给master(如果该map任务有多个worker执行,后续的完成通知将忽略);
每个reduce完成时,会产生1个最终的输出文件,其文件名唯一(如果该reduce任务有多个worker执行,GFS提供的文件唯一性,或者叫文件重命名原子性将保证数据只有一份);
3.4中间数据的存储
中间数据的存储都由GFS(Google File System)管理,GFS保证每个文件按照64M一个block分隔,每个block的副本保存在多台机器上。
为了减少数据传输成本,map的任务调度会尽量放到副本机器上执行(实在不行,则调度到较近的机器,实施拷贝);
这样是为了保证大部分数据都是从本地读取,减少网络带宽,提高运行效率。
3.5任务粒度
map分成M个片段执行,reduce分成了R个片段执行,理论上M和R比worker机器数量多得多,任务分配需要具备负载均衡能力;
worker机器的故障需要有快速恢复能力,故障机器上的任务又能快速分配到其他worker机器上去;
实际上,具体实现中M和R都有一定限制,master必须执行O(M+R)次调度,并保存O(M*R)个状态(这个很好理解,每个M要产出R份数据);
进一步,R值通常由用户指定,M根据经验,一个任务大约处理64M的输入数据(本地数据存储优化最有效),通常M=200000,R=5000,worker机器数量=2000。
3.6备用任务
影响一个MapReduce总执行时间,最主要的因素往往是“落伍者”,即短板,常常被称作“长尾效应”:一台机器花了很长的时间才处理完最后几个map或reducer(原因也是多种多样的,例如硬盘坏了,数据读取速度奇慢无比等),导致总执行时间超时。
有一个“备用任务”机制解决“长尾效应”,master启用备用任务完成短板任务。
评论关闭。