MapReduce阅读-上

MapReduce是一个应对实际任务重处理大规模数据集的编程模型与相关的实现,用户只需要指定map与reduce方面的代码,基础实时系统就可以为了将计算下发到集群中的不同主机,并会处理机器失效问题,负载均衡各主机的资源——内存、硬盘、网络带宽等。MapReduce的易用性使其在Google中得到广泛的应用。

Posted on April 20, 2017 in DistributedSystem, ReadPaper






1 Introduction

Google拥有很多的raw data,如爬虫文档、web请求日志,往往需要从中导出一些简单的数据。虽然应用的计算模型比较简单,但涉及到大量数据,需要成百上千的机器来协作处理计算。如何并行计算分布数据集处理故障呢?MapReduce的出现即为了解决这个问题。通过提供的编程库,用户只需要设计计算逻辑,而内部的细节,如并行化计算、容错、数据分布和负载均衡等,由MapReduce来处理,大大简化了用户的编程逻辑。

MapReduce受到lisp等函数式编程语言的启发,发现大部分的计算任务包括两个处理流程:

1.map操作:输入原始数据,对每条逻辑记录计算中间级的Key/Value对。

2.reduce操作:对Key/Value按照Key进行归纳聚合

通过上面的两个流程可以方便实施大型的并行计算操作以及通过再执行实现容错

2 Programming Model

MapReduce的计算以一组Key/Value对为输入,然后输出一组Key/Value对,用户通过编写Map和Reduce函数来控制处理逻辑。

Map函数把输入转换成一组中间的Key/Value对,MapReduce library会聚合相同key的键值对,并把所有Key的中间结果传递给Reduce函数处理。

Reduce函数接收相同的Key和其对应的一组Value,它的作用就是聚合这些Value,产生最终的结果。Reduce的输入是以迭代器的方式输入,使得MapReduce可以处理数据量比内存大的情况。

2.1 Example

以单词统计为例对MapReduce模型进行介绍,伪代码如下:

map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
    EmitIntermediate(w, "1");  //输出过渡级的键值对
reduce(String key, Iterator values):
// key: a word
// values: a list of counts 一个key有多个计算结果
int result = 0;
for each v in values:
    result += ParseInt(v);
Emit(AsString(result));  //输出结果

Map函数吐出(word, count)的K/V对,Reduce把某个单词的所有的count加起来,最终每个单词吐出一个值。

除了Map和Reduce函数之外,用户还需要指定输入和输出文件名,以及一些可选的调节的参数。

Google利用该编程模型已经实现了很多业务需求,例如大型图形计算、文本处理、数据挖掘、机器学习、统计等。

2.2 Types

Map和Reduce函数的操作可以抽象的表示为: 
map    (k1,v2)          ======>list(k2,v2)
reduce (k2, list(v2))   ======>list(v2)
 
如上所示,map函数生成一系列的K/V中间结果,然后reduce对每个key,聚合其value。

2.3 More Example

Distributed Grep

1.对于map,如果输入的行匹配到相应的pattern,则吐出这行

2.对于reduce,仅仅是把map吐出的行拷贝到输出中

Count of URL Access Frequency

1.对于map,处理web日志,生成(URL, 1)中间结果

2.对于reduce,聚合相同URL的值,生成(URL, total count)结果

Reverse Web-Link Graph(查询一个URL被什么资源引用)

1.对于map,吐出(target, source)中间结果,其中target是被source引用的URL

2.对于reduce,聚合相同target的source,吐出(target, list(source))

Term-Vector per Host(Term Vector指的是一篇文档中的(word, frequency)K/V对)

1.对于map,吐出(hostname, term vector)中间结果

2.对于reduce,聚合相同hostname的term vector,吐出最终(hostname, term vector)


Inverted Index

1.对于map,吐出一系列的(word, document ID)

2.对于reduce,对相同word,按照document ID排序进行聚合,吐出(word, list(document ID))

Distributed Sort

1.对于map,吐出(key, record)中间结果(key是序号,record是那个被排序的目标)

2.对于reduce,把map的中间结果写入到结果文件中,这里不需要显式地排序,因为MapReduce会自动地按照key来排序,方便在reduce的时候进行聚合。

3 Implementation

根据不同的环境,MapReduce的实现可以多种多样,例如,基于共享内存的,基于NUMA多核环境的,以及基于多台机器组成的集群环境的。 
而MapReduce的编程环境是多台千兆以太网相连的PC组成的大型计算机集群

Google的环境如下

1.双核X86系统,运行linux系统,2-4GB内存。

2.100M或1000M带宽网卡

3.集群由大量机器组成,故障是常态

4.每台机器使用廉价的IDE磁盘,采用GFS作为底层存储

5.使用一个调度系统来处理用户的任务

文件系统采用GFS提供可靠与可用的服务,用户只需提交jobs到调度系统上,每一个jobs包含一系列的tasks,任务被布置到集群中可用的机器上。

3.1 Execution Overview

Map会自动地把输入数据划分成M份,这些数据划分可以并行地被不同机器处理。Reduce按照划分函数划分数据,例如hash(key) mod R,其中R是由用户指定的。下图描述了MapReduce的整个流程,如下 

image.png


1.MapReduce library会把输入文件划分成多个16到64MB大小的分片(大小可以通过参数调节),然后在一组机器上启动程序。 
2.其中比较特殊的程序是master,剩下的由master分配任务的程序叫worker。总共有M个map任务和R个reduce任务需要分配,master会选取空闲的worker,然后分配一个map任务或者reduce任务。 
3.处理map任务的worker会从输入分片读入数据,解析出输入数据的K/V对,然后传递给Map函数,生成的K/V中间结果会缓存在内存中。 
4.map任务的中间结果会被周期性地写入到磁盘中,以partition函数来分成R个部分。R个部分的磁盘地址会推送到master,然后由它转发给响应的reduce worker。 
5.当reduce worker接收到master发送的地址信息时,它会通过RPC来向map worker读取对应的数据。当reduce worker读取到了所有的数据,它先按照key来排序,方便聚合操作。 
6.reduce worker遍历排序好的中间结果,对于相同的key,把其所有数据传入到Reduce函数进行处理,生成最终的结果会被追加到结果文件中。 
7.当所有的map和reduce任务都完成时,master会唤醒用户程序,然后返回到用户程序空间执行用户代码。

成功执行后,输出结果在R个文件中,通常,用户不需要合并这R个文件,因为,可以把它们作为新的MapReduce处理逻辑的输入数据,或者其它分布式应用的输入数据。

3.2 Master Data Structures

master维护了以下信息:

1.对每个map和reduce任务,记录了任务状态,包括idle,in-progress或completed,并且对于非idle状态的任务还记录了worker机器的信息

2.记录了map任务生成R个部分的文件位置信息,方便推给reduce worker

3.3 Fault Tolerance

由于MapReduce是的出现是为了处理大量数据以及在大量主机上运行的,由此需要优雅地容错。

3.3.1 Handling Worker Failures

master采用ping的方式(心跳)检测故障,如果一台worker机器在一定时间内没有响应,则认为这台机器故障。

1.对于map任务机器故障,完成了的map任务也需要完全重新执行,因为计算结果是存储在map任务所在机器的本地磁盘上的。而且即使在同一台主机上,不同时间的map操作的结果存放点不同,因此可以重新再执行map操作。 
当一个map任务开始由A来执行,而后挂掉后由B来执行,所有的为接收改任务数据的reduce任务的机器都会收到新的通知。一个已经完成任务或者已经fail的map worker会被重置会idle状态,等待下次被调用。

2.对于完成了的reduce任务则不需要重新执行,因为结果已经输出到GFS中。

3.3.2 Handling Master Failures

可以通过定期的checkpoint来保存状态,master挂掉后,可以回到最近checkpoint所在的状态。

但google没有采用这种方案,因为任务master挂掉概率极小,只需要让应用重试这次操作。

3.3.3 Semantics in the Presence of Failure

当用户提供的Map和Reduce函数的执行结果是确定的,那么最终的执行结果就是确定的。

当用户提供的执行结果不是确定的,那么最终结果也是不确定的,但是每个reduce任务产生的结果都是不确定的某次串行执行的结果。

当reduce任务被完成,就会原子地重命名其永久的输出文件。若相同的reduce task被不同的worker执行,其输出文件将会由重命名程序进行统一的命名。以此确保一个输出文件是由一个进程所制造的。

3.4 Locality

由于输入数据是存储在GFS上的,所以,MapReduce为了减少网络通信,采取了以下优化策略: 
1.因为GFS是按照64MB的chunk来存储数据的,这样可以把worker按照这个信息调度,尽量是每个worker都起到相应的GFS副本上,这样输入基本上是走本地磁盘 
2.如果上面的条件无法满足,那么尽量找一台和GFS副本机器在同一个交换机的主机,即做副本邻近主机生成worker

3.5 Task Granularity

MapReduce将map任务分成M份,reduce任务分成R份,理想状态M和R的值应该比worker机器大很多,这样有助于负载均衡以及故障恢复。因为当一台机器挂掉后,它的map任务可以分配给很多其他的机器执行。

实际应用中,因为master需要O(M+R)的空间来做调度决策,需要存储O(M*R)的任务产生的结果位置信息,对于每个任务产生的结果位置信息大约每个任务需要一个字节。

通常R的数量是由用户执行的,实际应用中对M的划分是要保证一个分片的数据量大小大约是16-64M,R的期望值是一个比较小的数。典型的M和R的值为 M = 200000,R = 5000,使用2000台worker机器。

3.6 Backup Tasks

通常,在执行过程中,会有少数几台机器的执行特别慢,可能是由于磁盘故障等原因引起的,这些机器会大大地增加任务的执行时间,MapReduce采用的方案是:

当一个MapReduce操作快执行完成的时候,master会生成正在进行的任务的备份任务。备份任务和源任务做的是同样的事情,只要其中一个任务执行完成,就认为该任务执行完成。 
该机制在占有很少的计算资源的情况下,大大缩短了任务的执行时间。