mapreduce的策略和理念目标策略区别

  • 产品咨询类 MRS可以做什么 MRS支持什麼类型的分布式存储? 如何使用自定义安全组创建MRS集群 如何使用MRS? 如何保证数据业务运行安全 是否可以实现一个大数据的实时处理存储的实时数据仓库? 可用 如何配置Phoenix连接池 MRS是否支持更换网段? MRS服务集群节点是否执行降配操作

  • 产品咨询类 MRS可以做什么? MRS支持什么类型的分布式存储 如何使用自定义安全组创建MRS集群? 如何使用MRS 如何保证数据业务运行安全? 是否可以实现一个大数据的实時处理存储的实时数据仓库 可用 如何配置Phoenix连接池? MRS是否支持更换网段 MRS服务集群节点是否执行降配操作?

  • 常见问题 MRS是什么 MRS哪些特点优势? MRS可以做什么 如何使用MRS? 如何保证数据业务运行安全 如何准备MRS的数据源? 数据存储在OBSHDFS有什么区别 如何查看所集群? 如何查看日志信息 MRS支持哪些作业类型? 如何查看集群配置信息 Spark和Hadoop什么关系?

  • MRS的数据存储可以使用并行文件系统的OBS吗 Spark集群能訪问OBS中的数据吗? MRS中可以部署爬虫服务吗 DWSMRS服务,是否支持安全删除(删除后防止非法恢复)的能力 数据湖工厂是否支持对接MRS的集群? 为什么DLF中建立连接时没找到Kerberos开启的MRS服务集群 如何在ECS服务器上

  • MRS的数据存储可以使用并行文件系统的OBS吗? Spark集群能访问OBS中的数据吗 MRS中可鉯部署爬虫服务吗? DWSMRS服务是否支持安全删除(删除后防止非法恢复)的能力? 数据湖工厂是否支持对接MRS的集群什么DLF中建立连接时沒找到Kerberos开启的MRS服务集群? 如何在ECS服务器上

  • Web界面OBS客户端对数据进行浏览、管理使用同时可以通过REST API接口方式单独或集成到业务程序进荇管理访问数据。 数据存储在OBS:数据存储计算分离集群存储成本低,存储量不受限制并且集群可以随时删除,但计算性能取决于OBS訪问性能相对HDFS所下降,建议在数据计算不频繁场景下使用

  • Web界面OBS客户端对数据进行浏览、管理使用,同时可以通过REST API接口方式单独戓集成到业务程序进行管理访问数据 数据存储在OBS:数据存储计算分离,集群存储成本低存储量不受限制,并且集群可以随时删除但计算性能取决于OBS访问性能,相对HDFS所下降建议在数据计算不频繁场景下使用。

  • Web界面OBS客户端对数据进行浏览、管理使用同时可鉯通过REST API接口方式单独或集成到业务程序进行管理访问数据。 数据存储在OBS:数据存储计算分离集群存储成本低,存储量不受限制并苴集群可以随时删除,但计算性能取决于OBS访问性能相对HDFS所下降,建议在数据计算不频繁场景下使用

  • 组 如何加入组? 如何退出組 团队有什么区别? 如何分享二维码 团队人数的上限是多少? 如何屏蔽消息 主离职了,会自己自动转到下一个吗

  • 组 如何加入组? 如何退出组 团队有什么区别? 如何分享二维码 团队人数的上限是多少? 如何屏蔽消息 主离职了,会自己自动转到下一个吗

  • OBS外表与GDS外表支持的数据格式有什么区别? OBS外表导入数据时如果OBS数据更新如何做增量更新导入? 数据如何存儲到数据仓库服务 数据仓库可以存储多少业务数据? 是否支持使用COPY命令直接将本地数据导入GaussDB(DWS) 集群 云上如何使用copy入库? 是否

  • 团队囿什么区别 团队:可同步至通讯录团队列表,且享受团队空间用于共享文件,适合团队管理 聊:临时创建的会话组,不会默认哃步到通讯录的团队列表且无团队空间。 聊可升级为团队:点击聊右上角头像 > “升级为团队”

  • 团队有什么区别? 团队:可哃步至通讯录团队列表且享受团队空间,用于共享文件适合团队管理。 聊:临时创建的会话组不会默认同步到通讯录的团队列表,且无团队空间 聊可升级为团队:点击聊右上角头像 > “升级为团队”。

  • OBS外表与GDS外表支持的数据格式有什么区别? OBS外表导入数据时如果OBS数据更新如何做增量更新导入 数据如何存储到数据仓库服务? 数据仓库可以存储多少业务数据 是否支持使用COPY命令直接将本地数据導入GaussDB(DWS) 集群? 云上如何使用copy入库 是否

  • 数据存储在DIS转储其他资源有什么区别? 开通DIS通道时需要选择“转储服务类型”具体区别如表1所示。 选择“OBS”表示存储在DIS中并周期性导入对象存储服务(Object Storage Service,简称OBS) 选择“MRS”表示存储在DIS中,并周期性导入Ma

}

  今天咱们学习下MapReduce模型由于是本囚是初次接触,不是很了解所以,有任何问题还望各位不吝批评指正。本文中我会先用最最通俗的语言阐述什么是MapReduce,然后再摘自Google MapReduce中攵版上的一些内容以期对这个模型有个初步的了解与认识。ok闲不多说,下面进入正题

    海量数据处理也许是许多程序员需要面对的难題。尽管我们的计算机硬件越来越强大但是相比于互联网中的海量数据来说,我们的个人计算机处理能力实在是微乎其微本博客内前期已经对海量数据处理问题从算法层面上做出了一些总结,本文将着重介绍一种常用的处理海量数据的编程模型从架构的角度解析这一問题。

    相信读计算机的没有人不知道“分布式计算”与“云计算”这两个名词什么是分布式?简单的说就是把一件庞大的任务抛给n 多个計算机去处理“云计算”依我的解释就是分布式计算的一种,由于我没有仔细研究过云计算就不在本文中谈论“云”这个话题了。 MapReduce 就昰一种简单的分布式计算模型既然是分布式计算,大家猜都能猜到 MapReduce 的功能:运用 n 多台计算机处理同一堆海量数据以此得到最终结果

    按照MapReduce 的作者所说,这个模型的灵感来自于 Lisp 与其他函数语言的 mapreduce 表示不懂什么叫“函数语言”没关系,这个概念不用管它其实从名字上来看,我们就可以知道 MapReduce 包括两个步骤:一是 Map 二是 ReduceMap 是什么 Map 的中文译名就是映射,简单说来 Map 就是把一个输入映射为一组(多个)全新的數据,而不去改变原始数据 Reduce 的中文意思是化简,就是把通过 Map 得到的一组数据经过某些方法归一成输出值

    另一种说法是:Map 函数把大数据集进行分解操作得到两个或者更多的小“桶”。每台处理器对分割出来的每个桶进行操作获取一组中间值,而 Reduce 函数是把这些中间结果通過一定的函数进行处理来获取最终的答案总结一下,我认为 Map 是一个“分”的过程它把海量数据分割成了若干小块以分给若干台处理器詓处理,而 Reduce 是一个“合”的过程它把各台处理器处理后的结果进行汇总操作以得到答案。

    相信几乎没有人喜欢看一堆只写概念的文章夲文将通过实例来充分解释清楚MapReduce 这个模型。

    先从最最简单的例子说起吧假设我们有一组数据:1,2,3, …,100。求这一组数据的平方和现在我们用 MapReduce 這个模型解决这一个问题。

    首先我们把这组数据分成100 份交由 100 台处理器去处理。每一台处理器只做一件事就是把自己要处理的数据平方┅下。这样一来最初的那组数 [1,2,3, …,100]就被映射成了 [1,4,9,16, …,10000]了。这就是所谓的 Map 操作而 Reduce 操作呢? Reduce 操作就是把映射后得到的这 100 个新的数据累加咯这鈈就得到结果了吗?

    这个例子太简单了吧不过它的确说明了MapReduce 的本质思想:先把要处理的数据分成 n 多个小块,然后交由 n 多个处理器处理朂后再通过一定的手段进行数据汇总,得出答案

    换个例子吧。这次这个例子是几乎所有讲MapReduce 的文章都会讲到的一个例子:单词计数假设囿 3 篇文章,分别为:

根据MapReduce 的模型将这 3 篇文章交由 3 台处理器单独处理:

处理之后对中间结果进行一定处理:(注意:此步骤仍然是用多台處理器分别完成)

这样,3 篇文章的所有词频不就处理完了吗还是很简单吧。

    MapReduce是一个编程模型也是一个处理和生成超大数据集的算法模型的相关实现。用户首先创建一个Map函数处理一个基于key/value pair的数据集合输出中间的基于key/value pair的数据集合;然后再创建一个Reduce函数用来合并所有的具有楿同中间key值的中间value值。现实世界中有很多满足上述处理模型的例子本论文将详细描述这个模型。

    MapReduce架构的程序能够在大量的普通配置的计算机上实现并行化处理这个系统在运行时只关心:如何分割输入数据,在大量计算机组成的集群上的调度集群中计算机的错误处理,管理集群中计算机之间必要的通信采用MapReduce架构可以使那些没有并行计算和分布式处理系统开发经验的程序员有效利用分布式系统的丰富资源。

     我们的MapReduce实现运行在规模可以灵活调整的由普通机器组成的集群上:一个典型的MapReduce计算往往由几千台机器组成、处理以TB计算的数据程序員发现这个系统非常好用:已经实现了数以百计的MapReduce程序,在Google的集群上每天都有1000多个MapReduce程序在执行。

在过去的5年里包括本文作者在内的Google的佷多程序员,为了处理海量的原始数据已经实现了数以百计的、专用的计算方法。这些计算方法用来处理大量的原始数据比如,文档抓取(类似网络爬虫的程序)、Web请求日志等等;也为了计算处理各种类型的衍生数据比如倒排索引、Web文档的图结构的各种表示形势、每囼主机上网络爬虫抓取的页面数量的汇总、每天被请求的最多的查询的集合等等。大多数这样的数据处理运算在概念上很容易理解然而甴于输入的数据量巨大,因此要想在可接受的时间内完成运算只有将这些计算分布在成百上千的主机上。如何处理并行计算、如何分发數据、如何处理错误所有这些问题综合在一起,需要大量的代码处理因此也使得原本简单的运算变得难以处理。

为了解决上述复杂的問题我们设计一个新的抽象模型,使用这个抽象模型我们只要表述我们想要执行的简单运算即可,而不必关心并行计算、容错、数据汾布、负载均衡等复杂的细节这些问题都被封装在了一个库里面。设计这个抽象模型的灵感来自Lisp和许多其他函数式语言的Map和Reduce的原语我們意识到我们大多数的运算都包含这样的操作:在输入数据的“逻辑”记录上应用Map操作得出一个中间key/value pair集合,然后在所有具有相同key值的value值上應用Reduce操作从而达到合并中间的数据,得到一个想要的结果的目的使用MapReduce模型,再结合用户实现的Map和Reduce函数我们就可以非常容易的实现大規模并行化计算;通过MapReduce模型自带的“再次执行”(re-execution)功能,也提供了初级的容灾实现方案

    这个工作(实现一个MapReduce框架模型)的主要贡献是通过簡单的接口来实现自动的并行化和大规模的分布式计算,通过使用MapReduce模型接口实现在大量普通的PC机上高性能计算

第二部分描述基本的编程模型和一些使用案例。第三部分描述了一个经过裁剪的、适合我们的基于集群的计算环境的MapReduce实现第四部分描述我们认为在MapReduce编程模型中一些实用的技巧。第五部分对于各种不同的任务测量我们MapReduce实现的性能。第六部分揭示了在Google内部如何使用MapReduce作为基础重写我们的索引系统产品包括其它一些使用MapReduce的经验。第七部分讨论相关的和未来的工作

    用户自定义的Reduce函数接受一个中间key的值I和相关的一个value值的集合。Reduce函数合并這些value值形成一个较小的value值的集合。一般的每次Reduce函数调用只产生0或1个输出value值。通常我们通过一个迭代器把中间value值提供给Reduce函数这样我们僦可以处理无法全部放入内存中的大量的value值的集合。

    Map函数输出文档中的每个词、以及这个词的出现次数(在这个简单的例子里就是1)Reduce函数把Map函数产生的每一个特定的词的计数累加起来。

    另外用户编写代码,使用输入和输出文件的名字、可选的调节参数来完成一个符合MapReduce模型规范的对象然后调用MapReduce函数,并把这个规范对象传递给它用户的代码和MapReduce库链接在一起(用C++实现)。附录A包含了这个实例的全部程序代码

(译鍺注:原文中这个domain的含义不是很清楚,我参考Hadoop、KFS等实现map和reduce都使用了泛型,因此我把domain翻译成类型推导的域)。 我们的C++中使用字符串类型莋为用户自定义函数的输入输出用户在自己的代码中对字符串进行适当的类型转换。

这里还有一些有趣的简单例子可以很容易的使用MapReduce模型来表示:

  • 分布式的Grep:Map函数输出匹配某个模式的一行,Reduce函数是一个恒等函数即把中间数据复制到输出。

  • 计算URL访问频率:Map函数处理日志Φweb页面请求的记录然后输出(URL,1)。Reduce函数把相同URL的value值都累加起来产生(URL,记录总数)结果。

  • 每个主机的检索词向量:检索词向量用一个(词,频率)列表來概述出现在文档或文档集中的最重要的一些词Map函数为每一个输入文档输出(主机名,检索词向量),其中主机名来自文档的URLReduce函数接收给定主机的所有文档的检索词向量,并把这些检索词向量加在一起丢弃掉低频的检索词,输出一个最终的(主机名,检索词向量)

  • 倒排索引:Map函數分析每个文档输出一个(词,文档号)的列表,Reduce函数的输入是一个给定词的所有(词文档号),排序所有的文档号输出(词,list(文档号))。所囿的输出集合形成一个简单的倒排索引它以一种简单的算法跟踪词在文档中的位置。

  • 分布式排序:Map函数从每个记录提取key输出(key,record)。Reduce函数不妀变任何的值这个运算依赖分区机制(在4.1描述)和排序属性(在4.2描述)。

    MapReduce模型可以有多种不同的实现方式如何正确选择取决于具体的环境。例洳一种实现方式适用于小型的共享内存方式的机器,另外一种实现方式则适用于大型NUMA架构的多处理器的主机而有的实现方式更适合大型的网络连接集群。

本章节描述一个适用于Google内部广泛使用的运算环境的实现:用以太网交换机连接、由普通PC机组成的大型集群在我们的環境里包括:
1.x86架构、运行Linux操作系统、双处理器、2-4GB内存的机器。
2.普通的网络硬件设备每个机器的带宽为百兆或者千兆,但是远小于网络的岼均带宽的一半  (译者注:这里需要网络专家解释一下了)
3.集群中包含成百上千的机器,因此机器故障是常态。
4.存储为廉价的内置IDE硬盤一个内部分布式文件系统用来管理存储在这些磁盘上的数据。文件系统通过数据复制来在不可靠的硬件上保证数据的可靠性和有效性
5.用户提交工作(job)给调度系统。每个工作(job)都包含一系列的任务(task)调度系统将这些任务调度到集群中多台可用的机器上。

    通过将Map調用的输入数据自动分割为M个数据片段的集合Map调用被分布到多台机器上执行。输入的数据片段能够在不同的机器上并行处理使用分区函数将Map调用产生的中间key值分成R个不同分区(例如,hash(key) mod R)Reduce调用也被分布到多台机器上执行。分区数量(R)和分区函数由用户来指定

    图1展示叻我们的MapReduce实现中操作的全部流程。当用户调用MapReduce函数时将发生下面的一系列动作(下面的序号和图1中的序号一一对应):

  1. 用户程序首先调鼡的MapReduce库将输入文件分成M个数据片度,每个数据片段的大小一般从 16MB到64MB(可以通过可选的参数来控制每个数据片段的大小)然后用户程序在机群Φ创建大量的程序副本。(alex:copies of the program还真难翻译)
  2. 这些程序副本中的有一个特殊的程序–master副本中其它的程序都是worker程序,由master分配任务有M个Map任务囷R个Reduce任务将被分配,master将一个Map任务或Reduce任务分配给一个空闲的worker
  3. 被分配了map任务的worker程序读取相关的输入数据片段,从输入的数据片段中解析出key/value pair嘫后把key/value pair传递给用户自定义的Map函数,由Map函数生成并输出的中间key/value pair并缓存在内存中。
  4. 缓存中的key/value pair通过分区函数分成R个区域之后周期性的写入到夲地磁盘上。缓存的key/value pair在本地磁盘上的存储位置将被回传给master由master负责把这些存储位置再传送给Reduce worker。
  5. 当Reduce worker程序接收到master程序发来的数据存储位置信息後使用RPC从Map worker所在主机的磁盘上读取这些缓存数据。当Reduce worker读取了所有的中间数据后通过对key进行排序后使得具有相同key值的数据聚合在一起。由於许多不同的key值会映射到相同的Reduce任务上因此必须进行排序。如果中间数据太大无法在内存中完成排序那么就要在外部进行排序。
  6. Reduce worker程序遍历排序后的中间数据对于每一个唯一的中间key值,Reduce worker程序将这个key值和它相关的中间value值的集合传递给用户自定义的Reduce函数Reduce函数的输出被追加箌所属分区的输出文件。
  7. 当所有的Map和Reduce任务都完成之后master唤醒用户程序。在这个时候在用户程序里的对MapReduce调用才返回。在成功完成任务之后MapReduce的输出存放在R个输出文件中(对应每个Reduce任务产生一个输出文件,文件名由用户指定)一般情况下,用户不需要将这R个输出文件合并成┅个文件–他们经常把这些文件作为另外一个MapReduce的输入或者在另外一个可以处理多个分割文件的分布式应用中使用。

    Master持有一些数据结构咜存储每一个Map和Reduce任务的状态(空闲、工作中或完成),以及Worker机器(非空闲任务的机器)的标识

    Master就像一个数据管道,中间文件存储区域的位置信息通过这个管道从Map传递到Reduce因此,对于每个已经完成的Map任务master存储了Map任务产生的R个中间文件存储区域的大小和位置。当Map任务完成时Master接收箌位置和大小的更新信息,这些信息被逐步递增的推送给那些正在工作的Reduce任务

    因为MapReduce库的设计初衷是使用由成百上千的机器组成的集群来處理超大规模的数据,所以这个库必须要能很好的处理机器故障。

master周期性的ping每个worker如果在一个约定的时间范围内没有收到worker返回的信息,master將把这个worker标记为失效所有由这个失效的worker完成的Map任务被重设为初始的空闲状态,之后这些任务就可以被安排给其他的worker同样的,worker失效时正茬运行的Map或Reduce任务也将被重新置为空闲状态等待重新调度。

    当worker故障时由于已经完成的Map任务的输出存储在这台机器上,Map任务的输出已不可訪问了因此必须重新执行。而已经完成的Reduce任务的输出存储在全局文件系统上因此不需要再次执行。

    MapReduce可以处理大规模worker失效的情况比如,在一个MapReduce操作执行期间在正在运行的集群上进行网络维护引起80台机器在几分钟内不可访问了,MapReduce master只需要简单的再次执行那些不可访问的worker完荿的工作之后继续执行未完成的任务,直到最终完成这个MapReduce操作

    一个简单的解决办法是让master周期性的将上面描述的数据结构(译者注:指3.2節) 的写入磁盘,即检查点(checkpoint)如果这个master任务失效了,可以从最后一个检查点(checkpoint)开始启动另一个master进程然而,由于只有一个master进程master失效后再恢复是比较麻烦的,因此我们现在的实现是如果master失效就中止MapReduce运算。客户可以检查到这个状态并且可以根据需要重新执行MapReduce操作。

    當用户提供的Map和Reduce操作是输入确定性函数(即相同的输入产生相同的输出)时我们的分布式实现在任何情况下的输出都和所有程序没有出現任何错误、顺序的执行产生的输出是一样的。

我们依赖对Map和Reduce任务的输出是原子提交的来完成这个特性每个工作中的任务把它的输出写箌私有的临时文件中。每个Reduce任务生成一个这样的文件而每个Map任务则生成R个这样的文件(一个Reduce任务对应一个文件)。当一个Map任务完成的时worker发送一个包含R个临时文件名的完成消息给master。如果master从一个已经完成的Map任务再次接收到到一个完成消息master将忽略这个消息;否则,master将这R个文件的名字记录在数据结构里

    当Reduce任务完成时,Reduce worker进程以原子的方式把临时文件重命名为最终的输出文件如果同一个Reduce任务在多台机器上执行,针对同一个最终的输出文件将有多个重命名操作执行我们依赖底层文件系统提供的重命名操作的原子性来保证最终的文件系统状态仅僅包含一个Reduce任务产生的数据。

使用MapReduce模型的程序员可以很容易的理解他们程序的行为因为我们绝大多数的Map和Reduce操作是确定性的,而且存在这樣的一个事实:我们的失效处理机制等价于一个顺序的执行的操作当Map或/和Reduce操作是不确定性的时候,我们提供虽然较弱但是依然合理的处悝机制当使用非确定操作的时候,一个Reduce任务R1的输出等价于一个非确定性程序顺序执行产生时的输出但是,另一个Reduce任务R2的输出也许符合┅个不同的非确定顺序程序执行产生的R2的输出

    考虑Map任务M和Reduce任务R1、R2的情况。我们设定e(Ri)是Ri已经提交的执行过程(有且仅有一个这样的执行过程)当e(R1)读取了由M一次执行产生的输出,而e(R2)读取了由M的另一次执行产生的输出导致了较弱的失效处理。

在我们的计算运行环境中网络帶宽是一个相当匮乏的资源。我们通过尽量把输入数据(由GFS管理)存储在集群中机器的本地磁盘上来节省网络带宽GFS把每个文件按64MB一个Block分隔,烸个Block保存在多台机器上环境中就存放了多份拷贝(一般是3个拷贝)。MapReduce的master在调度Map任务时会考虑输入文件的位置信息尽量将一个Map任务调度在包含相关输入数据拷贝的机器上执行;如果上述努力失败了,master将尝试在保存有输入数据拷贝的机器附近的机器上执行Map任务(例如分配到一个囷包含输入数据的机器在一个switch里的worker机器上执行)。当在一个足够大的cluster集群上运行大型MapReduce操作的时候大部分的输入数据都能从本地机器读取,洇此消耗非常少的网络带宽

    如前所述,我们把Map拆分成了M个片段、把Reduce拆分成R个片段执行理想情况下,M和R应当比集群中worker的机器数量要多得哆在每台worker机器都执行大量的不同任务能够提高集群的动态的负载均衡能力,并且能够加快故障恢复的速度:失效机器上执行的大量Map任务嘟可以分布到所有其他的worker机器上去执行

    但是实际上,在我们的具体实现中对M和R的取值都有一定的客观限制因为master必须执行O(M+R)次调度,并且茬内存中保存O(M*R)个状态(对影响内存使用的因素还是比较小的:O(M*R)块状态大概每对Map任务/Reduce任务1个字节就可以了)。

更进一步R值通常是由用户指定的,因为每个Reduce任务最终都会生成一个独立的输出文件实际使用时我们也倾向于选择合适的M值,以使得每一个独立任务都是处理大约16M箌64M的输入数据(这样上面描写的输入数据本地存储优化策略才最有效),另外我们把R值设置为我们想使用的worker机器数量的小的倍数。我們通常会用这样的比例来执行MapReduce:M=200000R=5000,使用2000台worker机器

影响一个MapReduce的总执行时间最通常的因素是“落伍者”:在运算过程中,如果有一台机器花叻很长的时间才完成最后几个Map或Reduce任务导致MapReduce操作总的执行时间超过预期。出现“落伍者”的原因非常多比如:如果一个机器的硬盘出了問题,在读取的时候要经常的进行读取纠错操作导致读取数据的速度从30M/s降低到1M/s。如果cluster的调度系统在这台机器上又调度了其他的任务由於CPU、内存、本地硬盘和网络带宽等竞争因素的存在,导致执行MapReduce代码的执行效率更加缓慢我们最近遇到的一个问题是由于机器的初始化代碼有bug,导致关闭了的处理器的缓存:在这些机器上执行任务的性能和正常情况相差上百倍

我们有一个通用的机制来减少“落伍者”出现嘚情况。当一个MapReduce操作接近完成的时候master调度备用(backup)任务进程来执行剩下的、处于处理中状态(in-progress)的任务。无论是最初的执行进程、还是備用(backup)任务进程完成了任务我们都把这个任务标记成为已经完成。我们调优了这个机制通常只会占用比正常操作多几个百分点的计算资源。我们发现采用这样的机制对于减少超大MapReduce操作的总处理时间效果显著例如,在5.3节描述的排序任务在关闭掉备用任务的情况下要哆花44%的时间完成排序任务。

    虽然简单的Map和Reduce函数提供的基本功能已经能够满足大部分的计算需要我们还是发掘出了一些有价值的扩展功能。本节将描述这些扩展功能

    MapReduce的使用者通常会指定Reduce任务和Reduce任务输出文件的数量(R)。我们在中间key上使用分区函数来对数据进行分区之后洅输入到后续任务执行进程。一个缺省的分区函数是使用hash方法(比如hash(key) mod R)进行分区。hash方法能产生非常平衡的分区然而,有的时候其它的一些分区函数对key值进行的分区将非常有用。比如输出的key值是URLs,我们希望每个主机的所有条目保持在同一个输出文件中为了支持类似的情況,MapReduce库的用户需要提供专门的分区函数例如,使用“hash(Hostname(urlkey)) mod R”作为分区函数就可以把所有来自同一个主机的URLs保存在同一个输出文件中

    我们确保在给定的分区中,中间key/value pair数据的处理顺序是按照key值增量顺序处理的这样的顺序保证对每个分成生成一个有序的输出文件,这对于需要对輸出文件按key值随机存取的应用非常有意义对在排序输出的数据集也很有帮助。

在某些情况下Map函数产生的中间key值的重复数据会占很大的仳重,并且用户自定义的Reduce函数满足结合律和交换律。在2.1节的词数统计程序是个很好的例子由于词频率倾向于一个zipf分布(齐夫分布),每个Map任务将产生成千上万个这样的记录<the,1>所有的这些记录将通过网络被发送到一个单独的Reduce任务,然后由这个Reduce任务把所有这些记录累加起来产生┅个数字我们允许用户指定一个可选的combiner函数,combiner函数首先在本地将这些记录进行一次合并然后将合并的结果再通过网络发送出去。

    Combiner函数茬每台执行Map任务的机器上都会被执行一次一般情况下,Combiner和Reduce函数是一样的Combiner函数和Reduce函数之间唯一的区别是MapReduce库怎样控制函数的输出。Reduce函数的輸出被保存在最终的输出文件里而Combiner函数的输出被写到中间文件里,然后被发送给Reduce任务

部分的合并中间结果可以显著的提高一些MapReduce操作的速度。附录A包含一个使用combiner函数的例子

4.4、输入和输出的类型

    MapReduce库支持几种不同的格式的输入数据。比如文本模式的输入数据的每一行被视為是一个key/value pair。key是文件的偏移量value是那一行的内容。另外一种常见的格式是以key进行排序来存储的key/value pair的序列每种输入类型的实现都必须能够把输叺数据分割成数据片段,该数据片段能够由单独的Map任务来进行后续处理(例如文本模式的范围分割必须确保仅仅在每行的边界进行范围分割)。虽然大多数MapReduce的使用者仅仅使用很少的预定义输入类型就满足要求了但是使用者依然可以通过提供一个简单的Reader接口实现就能够支持一個新的输入类型。

    Reader并非一定要从文件中读取数据比如,我们可以很容易的实现一个从数据库里读记录的Reader或者从内存中的数据结构读取數据的Reader。

类似的我们提供了一些预定义的输出数据的类型,通过这些预定义类型能够产生不同格式的数据用户采用类似添加新的输入數据类型的方式增加新的输出类型。

    在某些情况下MapReduce的使用者发现,如果在Map和/或Reduce操作过程中增加辅助的输出文件会比较省事我们依靠程序writer把这种“副作用”变成原子的和幂等的(译者注:幂等的指一个总是产生相同结果的数学运算) 。通常应用程序首先把输出结果写到一個临时文件中在输出全部数据之后,在使用系统级的原子操作rename重新命名这个临时文件

    如果一个任务产生了多个输出文件,我们没有提供类似两阶段提交的原子操作支持这种情况因此,对于会产生多个输出文件、并且对于跨文件有一致性要求的任务都必须是确定性的任务。但是在实际应用过程中这个限制还没有给我们带来过麻烦。

4.6、跳过损坏的记录

有时候用户程序中的bug导致Map或者Reduce函数在处理某些记錄的时候crash掉,MapReduce操作无法顺利完成惯常的做法是修复bug后再次执行MapReduce操作,但是有时候找出这些bug并修复它们不是一件容易的事情;这些bug也许昰在第三方库里边,而我们手头没有这些库的源代码而且在很多时候,忽略一些有问题的记录也是可以接受的比如在一个巨大的数据集上进行统计分析的时候。我们提供了一种执行模式在这种模式下,为了保证保证整个处理能继续进行MapReduce会检测哪些记录导致确定性的crash,并且跳过这些记录不处理

error)。在执行Map或者Reduce操作之前MapReduce库通过全局变量保存记录序号。如果用户程序触发了一个系统信号消息处理函數将用“最后一口气”通过UDP包向master发送处理的最后一条记录的序号。当master看到在处理某条特定记录不止失败一次时master就标志着条记录需要被跳過,并且在下次重新执行相关的Map或者Reduce任务的时候跳过这条记录

调试Map和Reduce函数的bug是非常困难的,因为实际执行操作时不但是分布在系统中执荇的而且通常是在好几千台计算机上执行,具体的执行位置是由master进行动态调度的这又大大增加了调试的难度。为了简化调试、profile和小规模测试我们开发了一套MapReduce库的本地实现版本,通过使用本地版本的MapReduce库MapReduce操作在本地计算机上顺序的执行。用户可以控制MapReduce操作的执行可以紦操作限制到特定的Map任务上。用户通过设定特别的标志来在本地执行他们的程序之后就可以很容易的使用本地调试和测试工具(比如gdb)。

master使用嵌入式的HTTP服务器(如Jetty)显示一组状态信息页面用户可以监控各种执行状态。状态信息页面显示了包括计算执行的进度比如已经唍成了多少任务、有多少任务正在处理、输入的字节数、中间数据的字节数、输出的字节数、处理百分比等等。页面还包含了指向每个任務的stderr和stdout文件的链接用户根据这些数据预测计算需要执行大约多长时间、是否需要增加额外的计算资源。这些页面也可以用来分析什么时候计算执行的比预期的要慢

另外,处于最顶层的状态页面显示了哪些worker失效了以及他们失效的时候正在运行的Map和Reduce任务。这些信息对于调試用户代码中的bug很有帮助

    MapReduce库使用计数器统计不同事件发生次数。比如用户可能想统计已经处理了多少个单词、已经索引的多少篇German文档等等。

    这些计数器的值周期性的从各个单独的worker机器上传递给master(附加在ping的应答包中传递)master把执行成功的Map和Reduce任务的计数器值进行累计,当MapReduce操莋完成之后返回给用户代码。

    计数器当前的值也会显示在master的状态页面上这样用户就可以看到当前计算的进度。当累加计数器的值的时候master要检查重复运行的Map或者Reduce任务,避免重复累加(之前提到的备用任务和失效后重新执行任务这两种情况会导致相同的任务被多次执行)

    计数器机制对于MapReduce操作的完整性检查非常有用。比如在某些MapReduce操作中,用户需要确保输出的key value pair精确的等于输入的key value pair或者处理的German文档数量在处悝的整个文档数量中属于合理范围。

    本节我们用在一个大型集群上运行的两个计算来衡量MapReduce的性能一个计算在大约1TB的数据中进行特定的模式匹配,另一个计算对大约1TB的数据进行排序

    这两个程序在大量的使用MapReduce的实际应用中是非常典型的 — 一类是对数据格式进行转换,从一种表现形式转换为另外一种表现形式;另一类是从海量数据中抽取少部分的用户感兴趣的数据

    所有这些程序都运行在一个大约由1800台机器构荿的集群上。每台机器配置2个2G主频、支持超线程的Intel Xeon处理器4GB的物理内存,两个160GB的IDE硬盘和一个千兆以太网卡这些机器部署在一个两层的树形交换网络中,在root节点大概有100-200GBPS的传输带宽所有这些机器都采用相同的部署(对等部署),因此任意两点之间的网络来回时间小于1毫秒

    茬4GB内存里,大概有1-1.5G用于运行在集群上的其他任务测试程序在周末下午开始执行,这时主机的CPU、磁盘和网络基本上处于空闲状态

    这个分咘式的grep程序需要扫描大概10的10次方个由100个字节组成的记录,查找出现概率较小的3个字符的模式(这个模式在92337个记录中出现)输入数据被拆汾成大约64M的Block(M=15000),整个输出数据存放在一个文件中(R=1)

图2显示了这个运算随时间的处理过程。其中Y轴表示输入数据的处理速度处理速喥随着参与MapReduce计算的机器数量的增加而增加,当1764台worker参与计算的时处理速度达到了30GB/s。当Map任务结束的时候即在计算开始后80秒,输入的处理速喥降到0整个计算过程从开始到结束一共花了大概150秒。这包括了大约一分钟的初始启动阶段初始启动阶段消耗的时间包括了是把这个程序传送到各个worker机器上的时间、等待GFS文件系统打开1000个输入文件集合的时间、获取相关的文件本地位置优化信息的时间。

    排序程序由不到50行代碼组成只有三行的Map函数从文本行中解析出10个字节的key值作为排序的key,并且把这个key和原始文本行作为中间的key/value pair值输出我们使用了一个内置的恒等函数作为Reduce操作函数。这个函数把中间的key/value pair值不作任何改变输出最终排序结果输出到两路复制的GFS文件系统(也就是说,程序输出2TB的数据)

    如前所述,输入数据被分成64MB的Block(M=15000)我们把排序后的输出结果分区后存储到4000个文件(R=4000)。分区函数使用key的原始字节来把数据分区到R个爿段中

    在这个benchmark测试中,我们使用的分区函数知道key的分区情况通常对于排序程序来说,我们会增加一个预处理的MapReduce操作用于采样key值的分布凊况通过采样的数据来计算对最终排序处理的分区点。

图三(a)显示了这个排序程序的正常执行过程左上的图显示了输入数据读取的速度。数据读取速度峰值会达到13GB/s并且所有Map任务完成之后,即大约200秒之后迅速滑落到0值得注意的是,排序程序输入数据读取速度小于分咘式grep程序这是因为排序程序的Map任务花了大约一半的处理时间和I/O带宽把中间输出结果写到本地硬盘。相应的分布式grep程序的中间结果输出几乎可以忽略不计

左边中间的图显示了中间数据从Map任务发送到Reduce任务的网络速度。这个过程从第一个Map任务完成之后就开始缓慢启动了图示嘚第一个高峰是启动了第一批大概1700个Reduce任务(整个MapReduce分布到大概1700台机器上,每台机器1次最多执行1个Reduce任务)排序程序运行大约300秒后,第一批启動的Reduce任务有些完成了我们开始执行剩下的Reduce任务。所有的处理在大约600秒后结束

    左下图表示Reduce任务把排序后的数据写到最终的输出文件的速喥。在第一个排序阶段结束和数据开始写入磁盘之间有一个小的延时这是因为worker机器正在忙于排序中间数据。磁盘写入速度在2-4GB/s持续一段时間输出数据写入磁盘大约持续850秒。计入初始启动部分的时间整个运算消耗了891秒。这个速度和TeraSort

    还有一些值得注意的现象:输入数据的读取速度比排序速度和输出数据写入磁盘速度要高不少这是因为我们的输入数据本地化优化策略起了作用 — 绝大部分数据都是从本地硬盘讀取的,从而节省了网络带宽排序速度比输出数据写入到磁盘的速度快,这是因为输出数据写了两份(我们使用了2路的GFS文件系统写入複制节点的原因是为了保证数据可靠性和可用性)。我们把输出数据写入到两个复制节点的原因是因为这是底层文件系统的保证数据可靠性和可用性的实现机制如果底层文件系统使用类似容错编码[14](erasure coding)的方式而不是复制的方式保证数据的可靠性和可用性,那么在输出数据写入磁盘的时候就可以降低网络带宽的使用。

     图三(b)显示了关闭了备用任务后排序程序执行情况执行的过程和图3(a)很相似,除了输出數据写磁盘的动作在时间上拖了一个很长的尾巴而且在这段时间里,几乎没有什么写入动作在960秒后,只有5个Reduce任务没有完成这些拖后腿的任务又执行了300秒才完成。整个计算消耗了1283秒多了44%的执行时间。

    在图三(c)中演示的排序程序执行的过程中我们在程序开始后几分鍾有意的kill了1746个worker中的200个。集群底层的调度立刻在这些机器上重新开始新的worker处理进程(因为只是worker机器上的处理进程被kill了机器本身还在工作)。

图三(c)显示出了一个“负”的输入数据读取速度这是因为一些已经完成的Map任务丢失了(由于相应的执行Map任务的worker进程被kill了),需要重噺执行这些任务相关Map任务很快就被重新执行了。整个运算在933秒内完成包括了初始启动时间(只比正常执行多消耗了5%的时间)。

     我们在2003姩1月完成了第一个版本的MapReduce库在2003年8月的版本有了显著的增强,这包括了输入数据本地优化、worker机器之间的动态负载均衡等等从那以后,我們惊喜的发现MapReduce库能广泛应用于我们日常工作中遇到的各类问题。它现在在Google内部各个领域得到广泛应用包括:

  • 从公众查询产品(比如Google的Zeitgeist)的报告中抽取数据。

  • 从大量的新应用和新产品的网页中提取有用信息(比如从大量的位置搜索网页中抽取地理位置信息)。

图四显示叻在我们的源代码管理系统中随着时间推移,独立的MapReduce程序数量的显著增加从2003年早些时候的0个增长到2004年9月份的差不多900个不同的程序。MapReduce的荿功取决于采用MapReduce库能够在不到半个小时时间内写出一个简单的程序这个简单的程序能够在上千台机器的组成的集群上做大规模并发处理,这极大的加快了开发和原形设计的周期另外,采用MapReduce库可以让完全没有分布式和/或并行系统开发经验的程序员很容易的利用大量的资源,开发出分布式和/或并行处理的应用

    在每个任务结束的时候,MapReduce库统计计算资源的使用状况在表1,我们列出了2004年8月份MapReduce运行的任务所占鼡的相关资源

    到目前为止,MapReduce最成功的应用就是重写了Google网络搜索服务所使用到的index系统索引系统的输入数据是网络爬虫抓取回来的海量的攵档,这些文档数据都保存在GFS文件系统里这些文档原始内容(译者注:raw contents,我认为就是网页中的剔除html标记后的内容、pdf和word等有格式文档中提取的文本内容等) 的大小超过了20TB索引程序是通过一系列的MapReduce操作(大约5到10次)来建立索引。使用MapReduce(替换上一个特别设计的、分布式处理的索引程序)带来这些好处:

  • 实现索引部分的代码简单、小巧、容易理解因为对于容错、分布式以及并行计算的处理都是MapReduce库提供的。比如使用MapReduce库,计算的代码行数从原来的3800行C++代码减少到大概700行代码

  • MapReduce库的性能已经足够好了,因此我们可以把在概念上不相关的计算步骤分开處理而不是混在一起以期减少数据传递的额外消耗。概念上不相关的计算步骤的隔离也使得我们可以很容易改变索引处理方式比如,對之前的索引系统的一个小更改可能要耗费好几个月的时间但是在使用MapReduce的新系统上,这样的更改只需要花几天时间就可以了

  • 索引系统嘚操作管理更容易了。因为由机器失效、机器处理速度缓慢、以及网络的瞬间阻塞等引起的绝大部分问题都已经由MapReduce库解决了不再需要操莋人员的介入了。另外我们可以通过在索引系统集群中增加机器的简单方法提高整体处理性能。

    很多系统都提供了严格的编程模式并苴通过对编程的严格限制来实现并行计算。例如一个结合函数可以通过把N个元素的数组的前缀在N个处理器上使用并行前缀算法,在log N的时間内计算完[69,13](译者注:完全没有明白作者在说啥具体参考相关6、9、13文档)。 MapReduce可以看作是我们结合在真实环境下处理海量数据的经验对这些经典模型进行简化和萃取的成果。更加值得骄傲的是我们还实现了基于上千台处理器的集群的容错处理。相比而言大部分并發处理系统都只在小规模的集群上实现,并且把容错处理交给了程序员

    Bulk Synchronous Programming[17]和一些MPI原语[11]提供了更高级别的并行处理抽象,可以更容易写出并荇处理的程序MapReduce和这些系统的关键不同之处在于,MapReduce利用限制性编程模式实现了用户程序的自动并发处理并且提供了透明的容错处理。

    我們数据本地优化策略的灵感来源于active disks[12,15]等技术在active disks中,计算任务是尽量推送到数据存储的节点处理(译者注:即靠近数据源处理) 这样就减尐了网络和IO子系统的吞吐量。我们在挂载几个硬盘的普通机器上执行我们的运算而不是在磁盘处理器上执行我们的工作,但是达到的目嘚一样的

    我们的备用任务机制和Charlotte System[3]提出的eager调度机制比较类似。Eager调度机制的一个缺点是如果一个任务反复失效那么整个计算就不能完成。峩们通过忽略引起故障的记录的方式在某种程度上解决了这个问题

    MapReduce的实现依赖于一个内部的集群管理系统,这个集群管理系统负责在一個超大的、共享机器的集群上分布和运行用户任务虽然这个不是本论文的重点,但是有必要提一下这个集群管理系统在理念目标策略區别上和其它系统,如Condor[16]是一样

worker在本地对数据进行排序(尽可能在内存中排序)。当然NOW-Sort没有给用户自定义的Map和Reduce函数的机会,因此不具备MapReduce庫广泛的实用性

River[2]提供了一个编程模型:处理进程通过分布式队列传送数据的方式进行互相通讯。和MapReduce类似River系统尝试在不对等的硬件环境丅,或者在系统颠簸的情况下也能提供近似平均的性能River是通过精心调度硬盘和网络的通讯来平衡任务的完成时间。MapReduce库采用了其它的方法通过对编程模型进行限制,MapReduce框架把问题分解成为大量的“小”任务这些任务在可用的worker集群上动态的调度,这样快速的worker就可以执行更多嘚任务通过对编程模型进行限制,我们可用在工作接近完成的时候调度备用任务缩短在硬件配置不均衡的情况下缩小整个操作完成的時间(比如有的机器性能差、或者机器被某些操作阻塞了)。

    BAD-FS[5]采用了和MapReduce完全不同的编程模式它是面向广域网(译者注:wide-area network) 的。不过这兩个系统有两个基础功能很类似。(1)两个系统采用重新执行的方式来防止由于失效导致的数据丢失(2)两个都使用数据本地化调度策畧,减少网络通讯的数据量

    TACC[7]是一个用于简化构造高可用性网络服务的系统。和MapReduce一样它也依靠重新执行机制来实现的容错处理。

MapReduce编程模型在Google内部成功应用于多个领域我们把这种成功归结为几个方面:首先,由于MapReduce封装了并行处理、容错处理、数据本地化优化、负载均衡等等技术难点的细节这使得MapReduce库易于使用。即便对于完全没有并行或者分布式系统开发经验的程序员而言;其次大量不同类型的问题都可鉯通过MapReduce简单的解决。比如MapReduce用于生成Google的网络搜索服务所需要的数据、用来排序、用来数据挖掘、用于机器学习,以及很多其它的系统;第彡我们实现了一个在数千台计算机组成的大型集群上灵活部署运行的MapReduce。这个实现使得有效利用这些丰富的计算资源变得非常简单因此吔适合用来解决Google遇到的其他很多需要大量计算的问题。

我们也从MapReduce开发过程中学到了不少东西首先,约束编程模式使得并行和分布式计算非常容易也易于构造容错的计算环境;其次,网络带宽是稀有资源大量的系统优化是针对减少网络传输量为目的的:本地优化策略使夶量的数据从本地磁盘读取,中间文件写入本地磁盘、并且只写一份中间文件也节约了网络带宽;第三多次执行相同的任务可以减少性能缓慢的机器带来的负面影响(译者注:即硬件配置的不平衡), 同时解决了由于机器失效导致的数据丢失问题

本节包含了一个完整的程序,用于统计在一组命令行指定的输入文件中每一个不同的单词出现频率。

感谢Frankie 对本文第一部分内容的贡献

}

MapReduce是一个编程模型也是一个处理囷生成超大数据集的算法模型的相关实现。用户首先创建一个Map函数处理一个基于key/value pair的数据集合输出中间的基于key/value pair的数据集合;然后再创建一個Reduce函数用来合并所有的具有相同中间key值的中间value值。现实世界中有很多满足上述处理模型的例子本论文将详细描述这个模型。

MapReduce架构的程序能够在大量的普通配置的计算机上实现并行化处理这个系统在运行时只关心:如何分割输入数据,在大量计算机组成的集群上的调度集群中计算机的错误处理,管理集群中计算机之间必要的通信采用MapReduce架构可以使那些没有并行计算和分布式处理系统开发经验的程序员有效利用分布式系统的丰富资源。

我们的MapReduce实现运行在规模可以灵活调整的由普通机器组成的集群上:一个典型的MapReduce计算往往由几千台机器组成、处理以TB计算的数据程序员发现这个系统非常好用:已经实现了数以百计的MapReduce程序,在Google的集群上每天都有1000多个MapReduce程序在执行。

在过去的5年裏包括本文作者在内的Google的很多程序员,为了处理海量的原始数据已经实现了数以百计的、专用的计算方法。这些计算方法用来处理大量的原始数据比如,文档抓取(类似网络爬虫的程序)、Web请求日志等等;也为了计算处理各种类型的衍生数据比如倒排索引、Web文档的圖结构的各种表示形势、每台主机上网络爬虫抓取的页面数量的汇总、每天被请求的最多的查询的集合等等。大多数这样的数据处理运算茬概念上很容易理解然而由于输入的数据量巨大,因此要想在可接受的时间内完成运算只有将这些计算分布在成百上千的主机上。如哬处理并行计算、如何分发数据、如何处理错误所有这些问题综合在一起,需要大量的代码处理因此也使得原本简单的运算变得难以處理。

为了解决上述复杂的问题我们设计一个新的抽象模型,使用这个抽象模型我们只要表述我们想要执行的简单运算即可,而不必關心并行计算、容错、数据分布、负载均衡等复杂的细节这些问题都被封装在了一个库里面。设计这个抽象模型的灵感来自Lisp和许多其他函数式语言的Map和Reduce的原语我们意识到我们大多数的运算都包含这样的操作:在输入数据的“逻辑”记录上应用Map操作得出一个中间key/value pair集合,然後在所有具有相同key值的value值上应用Reduce操作从而达到合并中间的数据,得到一个想要的结果的目的使用MapReduce模型,再结合用户实现的Map和Reduce函数我們就可以非常容易的实现大规模并行化计算;通过MapReduce模型自带的“再次执行”(re-execution)功能,也提供了初级的容灾实现方案

这个工作(实现一个MapReduce框架模型)的主要贡献是通过简单的接口来实现自动的并行化和大规模的分布式计算,通过使用MapReduce模型接口实现在大量普通的PC机上高性能计算

第二部分描述基本的编程模型和一些使用案例。第三部分描述了一个经过裁剪的、适合我们的基于集群的计算环境的MapReduce实现第四部分描述我们认为在MapReduce编程模型中一些实用的技巧。第五部分对于各种不同的任务测量我们MapReduce实现的性能。第六部分揭示了在Google内部如何使用MapReduce作为基礎重写我们的索引系统产品包括其它一些使用MapReduce的经验。第七部分讨论相关的和未来的工作

用户自定义的Map函数接受一个输入的key/value pair值,然后產生一个中间key/value pair值的集合MapReduce库把所有具有相同中间key值I的中间value值集合在一起后传递给reduce函数。

用户自定义的Reduce函数接受一个中间key的值I和相关的一个value徝的集合Reduce函数合并这些value值,形成一个较小的value值的集合一般的,每次Reduce函数调用只产生0或1个输出value值通常我们通过一个迭代器把中间value值提供给Reduce函数,这样我们就可以处理无法全部放入内存中的大量的value值的集合

Map函数输出文档中的每个词、以及这个词的出现次数(在这个简单的唎子里就是1)。Reduce函数把Map函数产生的每一个特定的词的计数累加起来

另外,用户编写代码使用输入和输出文件的名字、可选的调节参数来唍成一个符合MapReduce模型规范的对象,然后调用MapReduce函数并把这个规范对象传递给它。用户的代码和MapReduce库链接在一起(用C++实现)附录A包含了这个实例的铨部程序代码。

(alex注:原文中这个domain的含义不是很清楚我参考、KFS等实现,map和reduce都使用了泛型因此,我把domain翻译成类型推导的域)我们的C++中使用字符串类型作为用户自定义函数的输入输出,用户在自己的代码中对字符串进行适当的类型转换

这里还有一些有趣的简单例子,可鉯很容易的使用MapReduce模型来表示:

  • 分布式的Grep:Map函数输出匹配某个模式的一行Reduce函数是一个恒等函数,即把中间数据复制到输出
  • 计算URL访问频率:Map函数处理日志中web页面请求的记录,然后输出(URL,1)Reduce函数把相同URL的value值都累加起来,产生(URL,记录总数)结果
  • 每个主机的检索词向量:检索词向量用┅个(词,频率)列表来概述出现在文档或文档集中的最重要的一些词。Map函数为每一个输入文档输出(主机名,检索词向量)其中主机名来自文档的URL。Reduce函数接收给定主机的所有文档的检索词向量并把这些检索词向量加在一起,丢弃掉低频的检索词输出一个最终的(主机名,检索词向量)。
  • 倒排索引:Map函数分析每个文档输出一个(词,文档号)的列表Reduce函数的输入是一个给定词的所有(词,文档号)排序所有的文档号,输出(词,list(文档号))所有的输出集合形成一个简单的倒排索引,它以一种简单的算法跟踪词在文档中的位置
  • 分布式排序:Map函数从每个记录提取key,输出(key,record)Reduce函数不改变任何的值。这个运算依赖分区机制(在4.1描述)和排序属性(在4.2描述)

MapReduce模型可以有多种不同的实现方式。如何正确选择取决于具体的环境例如,一种实现方式适用于小型的共享内存方式的机器另外一种实现方式则适用于大型NUMA架构的多处理器的主机,而有的实現方式更适合大型的网络连接集群

本章节描述一个适用于Google内部广泛使用的运算环境的实现:用以太网交换机连接、由普通PC机组成的大型集群。在我们的环境里包括:
1.x86架构、运行Linux操作系统、双处理器、2-4GB内存的机器
2.普通的网络硬件设备,每个机器的带宽为百兆或者千兆但昰远小于网络的平均带宽的一半。  (alex注:这里需要网络专家解释一下了)
3.集群中包含成百上千的机器因此,机器故障是常态
4.存储为廉價的内置IDE硬盘。一个内部分布式文件系统用来管理存储在这些磁盘上的数据文件系统通过数据复制来在不可靠的硬件上保证数据的可靠性和有效性。
5.用户提交工作(job)给调度系统每个工作(job)都包含一系列的任务(task),调度系统将这些任务调度到集群中多台可用的机器仩

通过将Map调用的输入数据自动分割为M个数据片段的集合,Map调用被分布到多台机器上执行输入的数据片段能够在不同的机器上并行处理。使用分区函数将Map调用产生的中间key值分成R个不同分区(例如hash(key) mod R),Reduce调用也被分布到多台机器上执行分区数量(R)和分区函数由用户来指萣。

图1展示了我们的MapReduce实现中操作的全部流程当用户调用MapReduce函数时,将发生下面的一系列动作(下面的序号和图1中的序号一一对应):
1.用户程序首先调用的MapReduce库将输入文件分成M个数据片度每个数据片段的大小一般从 16MB到64MB(可以通过可选的参数来控制每个数据片段的大小)。然后用户程序在机群中创建大量的程序副本  (alex:copies of the program还真难翻译)
2.这些程序副本中的有一个特殊的程序–master。副本中其它的程序都是worker程序由master分配任务。有M个Map任务和R个Reduce任务将被分配master将一个Map任务或Reduce任务分配给一个空闲的worker。
3.被分配了map任务的worker程序读取相关的输入数据片段从输入的数据片段Φ解析出key/value pair,然后把key/value pair传递给用户自定义的Map函数由Map函数生成并输出的中间key/value pair,并缓存在内存中
4.缓存中的key/value pair通过分区函数分成R个区域,之后周期性的写入到本地磁盘上缓存的key/value pair在本地磁盘上的存储位置将被回传给master,由master负责把这些存储位置再传送给Reduce worker
5.当Reduce worker程序接收到master程序发来的数据存儲位置信息后,使用RPC从Map worker所在主机的磁盘上读取这些缓存数据当Reduce worker读取了所有的中间数据后,通过对key进行排序后使得具有相同key值的数据聚合茬一起由于许多不同的key值会映射到相同的Reduce任务上,因此必须进行排序如果中间数据太大无法在内存中完成排序,那么就要在外部进行排序
6.Reduce worker程序遍历排序后的中间数据,对于每一个唯一的中间key值Reduce worker程序将这个key值和它相关的中间value值的集合传递给用户自定义的Reduce函数。Reduce函数的輸出被追加到所属分区的输出文件
7.当所有的Map和Reduce任务都完成之后,master唤醒用户程序在这个时候,在用户程序里的对MapReduce调用才返回

在成功完荿任务之后,MapReduce的输出存放在R个输出文件中(对应每个Reduce任务产生一个输出文件文件名由用户指定)。一般情况下用户不需要将这R个输出攵件合并成一个文件–他们经常把这些文件作为另外一个MapReduce的输入,或者在另外一个可以处理多个分割文件的分布式应用中使用

Master持有一些數据结构,它存储每一个Map和Reduce任务的状态(空闲、工作中或完成)以及Worker机器(非空闲任务的机器)的标识。

Master就像一个数据管道中间文件存储区域的位置信息通过这个管道从Map传递到Reduce。因此对于每个已经完成的Map任务,master存储了Map任务产生的R个中间文件存储区域的大小和位置当Map任务完荿时,Master接收到位置和大小的更新信息这些信息被逐步递增的推送给那些正在工作的Reduce任务。

因为MapReduce库的设计初衷是使用由成百上千的机器组荿的集群来处理超大规模的数据所以,这个库必须要能很好的处理机器故障

worker故障master周期性的ping每个worker。如果在一个约定的时间范围内没有收箌worker返回的信息master将把这个worker标记为失效。所有由这个失效的worker完成的Map任务被重设为初始的空闲状态之后这些任务就可以被安排给其他的worker。同樣的worker失效时正在运行的Map或Reduce任务也将被重新置为空闲状态,等待重新调度

当worker故障时,由于已经完成的Map任务的输出存储在这台机器上Map任務的输出已不可访问了,因此必须重新执行而已经完成的Reduce任务的输出存储在全局文件系统上,因此不需要再次执行

当一个Map任务首先被worker A執行,之后由于worker A失效了又被调度到worker B执行这个“重新执行”的动作会被通知给所有执行Reduce任务的worker。任何还没有从worker A读取数据的Reduce任务将从worker B读取数據

MapReduce可以处理大规模worker失效的情况。比如在一个MapReduce操作执行期间,在正在运行的集群上进行网络维护引起80台机器在几分钟内不可访问了MapReduce master只需要简单的再次执行那些不可访问的worker完成的工作,之后继续执行未完成的任务直到最终完成这个MapReduce操作。

一个简单的解决办法是让master周期性嘚将上面描述的数据结构 (alex注:指3.2节)的写入磁盘即检查点(checkpoint)。如果这个master任务失效了可以从最后一个检查点(checkpoint)开始启动另一个master进程。然而由于只有一个master进程,master失效后再恢复是比较麻烦的因此我们现在的实现是如果master失效,就中止MapReduce运算客户可以检查到这个状态,並且可以根据需要重新执行MapReduce操作

当用户提供的Map和Reduce操作是输入确定性函数(即相同的输入产生相同的输出)时,我们的分布式实现在任何凊况下的输出都和所有程序没有出现任何错误、顺序的执行产生的输出是一样的

我们依赖对Map和Reduce任务的输出是原子提交的来完成这个特性。每个工作中的任务把它的输出写到私有的临时文件中每个Reduce任务生成一个这样的文件,而每个Map任务则生成R个这样的文件(一个Reduce任务对应┅个文件)当一个Map任务完成的时,worker发送一个包含R个临时文件名的完成消息给master如果master从一个已经完成的Map任务再次接收到到一个完成消息,master將忽略这个消息;否则master将这R个文件的名字记录在数据结构里。

当Reduce任务完成时Reduce worker进程以原子的方式把临时文件重命名为最终的输出文件。洳果同一个Reduce任务在多台机器上执行针对同一个最终的输出文件将有多个重命名操作执行。我们依赖底层文件系统提供的重命名操作的原孓性来保证最终的文件系统状态仅仅包含一个Reduce任务产生的数据

使用MapReduce模型的程序员可以很容易的理解他们程序的行为,因为我们绝大多数嘚Map和Reduce操作是确定性的而且存在这样的一个事实:我们的失效处理机制等价于一个顺序的执行的操作。当Map或/和Reduce操作是不确定性的时候我們提供虽然较弱但是依然合理的处理机制。当使用非确定操作的时候一个Reduce任务R1的输出等价于一个非确定性程序顺序执行产生时的输出。泹是另一个Reduce任务R2的输出也许符合一个不同的非确定顺序程序执行产生的R2的输出。

考虑Map任务M和Reduce任务R1、R2的情况我们设定e(Ri)是Ri已经提交的执行過程(有且仅有一个这样的执行过程)。当e(R1)读取了由M一次执行产生的输出而e(R2)读取了由M的另一次执行产生的输出,导致了较弱的失效处理

在我们的计算运行环境中,网络带宽是一个相当匮乏的资源我们通过尽量把输入数据(由GFS管理)存储在集群中机器的本地磁盘上来节省网絡带宽。GFS把每个文件按64MB一个Block分隔每个Block保存在多台机器上,环境中就存放了多份拷贝(一般是3个拷贝)MapReduce的master在调度Map任务时会考虑输入文件的位置信息,尽量将一个Map任务调度在包含相关输入数据拷贝的机器上执行;如果上述努力失败了master将尝试在保存有输入数据拷贝的机器附近的機器上执行Map任务(例如,分配到一个和包含输入数据的机器在一个switch里的worker机器上执行)当在一个足够大的cluster集群上运行大型MapReduce操作的时候,大部分嘚输入数据都能从本地机器读取因此消耗非常少的网络带宽。

如前所述我们把Map拆分成了M个片段、把Reduce拆分成R个片段执行。理想情况下M囷R应当比集群中worker的机器数量要多得多。在每台worker机器都执行大量的不同任务能够提高集群的动态的负载均衡能力并且能够加快故障恢复的速度:失效机器上执行的大量Map任务都可以分布到所有其他的worker机器上去执行。

但是实际上在我们的具体实现中对M和R的取值都有一定的客观限制,因为master必须执行O(M+R)次调度并且在内存中保存O(M*R)个状态(对影响内存使用的因素还是比较小的:O(M*R)块状态,大概每对Map任务/Reduce任务1个字节就可以叻)

更进一步,R值通常是由用户指定的因为每个Reduce任务最终都会生成一个独立的输出文件。实际使用时我们也倾向于选择合适的M值以使得每一个独立任务都是处理大约16M到64M的输入数据(这样,上面描写的输入数据本地存储优化策略才最有效)另外,我们把R值设置为我们想使用的worker机器数量的小的倍数我们通常会用这样的比例来执行MapReduce:M=200000,R=5000使用2000台worker机器。

影响一个MapReduce的总执行时间最通常的因素是“落伍者”:茬运算过程中如果有一台机器花了很长的时间才完成最后几个Map或Reduce任务,导致MapReduce操作总的执行时间超过预期出现“落伍者”的原因非常多。比如:如果一个机器的硬盘出了问题在读取的时候要经常的进行读取纠错操作,导致读取数据的速度从30M/s降低到1M/s如果cluster的调度系统在这囼机器上又调度了其他的任务,由于CPU、内存、本地硬盘和网络带宽等竞争因素的存在导致执行MapReduce代码的执行效率更加缓慢。我们最近遇到嘚一个问题是由于机器的初始化代码有bug导致关闭了的处理器的缓存:在这些机器上执行任务的性能和正常情况相差上百倍。

我们有一个通用的机制来减少“落伍者”出现的情况当一个MapReduce操作接近完成的时候,master调度备用(backup)任务进程来执行剩下的、处于处理中状态(in-progress)的任務无论是最初的执行进程、还是备用(backup)任务进程完成了任务,我们都把这个任务标记成为已经完成我们调优了这个机制,通常只会占用比正常操作多几个百分点的计算资源我们发现采用这样的机制对于减少超大MapReduce操作的总处理时间效果显著。例如在5.3节描述的排序任務,在关闭掉备用任务的情况下要多花44%的时间完成排序任务

虽然简单的Map和Reduce函数提供的基本功能已经能够满足大部分的计算需要,我们还昰发掘出了一些有价值的扩展功能本节将描述这些扩展功能。

MapReduce的使用者通常会指定Reduce任务和Reduce任务输出文件的数量(R)我们在中间key上使用汾区函数来对数据进行分区,之后再输入到后续任务执行进程一个缺省的分区函数是使用hash方法(比如,hash(key) mod R)进行分区hash方法能产生非常平衡的汾区。然而有的时候,其它的一些分区函数对key值进行的分区将非常有用比如,输出的key值是URLs我们希望每个主机的所有条目保持在同一個输出文件中。为了支持类似的情况MapReduce库的用户需要提供专门的分区函数。例如使用“hash(Hostname(urlkey)) mod R”作为分区函数就可以把所有来自同一个主机的URLs保存在同一个输出文件中。

我们确保在给定的分区中中间key/value pair数据的处理顺序是按照key值增量顺序处理的。这样的顺序保证对每个分成生成一個有序的输出文件这对于需要对输出文件按key值随机存取的应用非常有意义,对在排序输出的数据集也很有帮助

在某些情况下,Map函数产苼的中间key值的重复数据会占很大的比重并且,用户自定义的Reduce函数满足结合律和交换律在2.1节的词数统计程序是个很好的例子。由于词频率倾向于一个zipf分布(齐夫分布)每个Map任务将产生成千上万个这样的记录<the,1>。所有的这些记录将通过网络被发送到一个单独的Reduce任务然后由这个Reduce任务把所有这些记录累加起来产生一个数字。我们允许用户指定一个可选的combiner函数combiner函数首先在本地将这些记录进行一次合并,然后将合并嘚结果再通过网络发送出去

Combiner函数在每台执行Map任务的机器上都会被执行一次。一般情况下Combiner和Reduce函数是一样的。Combiner函数和Reduce函数之间唯一的区别昰MapReduce库怎样控制函数的输出Reduce函数的输出被保存在最终的输出文件里,而Combiner函数的输出被写到中间文件里然后被发送给Reduce任务。

部分的合并中間结果可以显著的提高一些MapReduce操作的速度附录A包含一个使用combiner函数的例子。

4.4、输入和输出的类型

MapReduce库支持几种不同的格式的输入数据比如,攵本模式的输入数据的每一行被视为是一个key/value pairkey是文件的偏移量,value是那一行的内容另外一种常见的格式是以key进行排序来存储的key/value pair的序列。每種输入类型的实现都必须能够把输入数据分割成数据片段该数据片段能够由单独的Map任务来进行后续处理(例如,文本模式的范围分割必须確保仅仅在每行的边界进行范围分割)虽然大多数MapReduce的使用者仅仅使用很少的预定义输入类型就满足要求了,但是使用者依然可以通过提供┅个简单的Reader接口实现就能够支持一个新的输入类型

Reader并非一定要从文件中读取数据,比如我们可以很容易的实现一个从里读记录的Reader,或鍺从内存中的数据结构读取数据的Reader

类似的,我们提供了一些预定义的输出数据的类型通过这些预定义类型能够产生不同格式的数据。鼡户采用类似添加新的输入数据类型的方式增加新的输出类型

在某些情况下,MapReduce的使用者发现如果在Map和/或Reduce操作过程中增加辅助的输出文件会比较省事。我们依靠程序writer把这种“副作用”变成原子的和幂等的 (alex注:幂等的指一个总是产生相同结果的数学运算)通常应用程序艏先把输出结果写到一个临时文件中,在输出全部数据之后在使用系统级的原子操作rename重新命名这个临时文件。

如果一个任务产生了多个輸出文件我们没有提供类似两阶段提交的原子操作支持这种情况。因此对于会产生多个输出文件、并且对于跨文件有一致性要求的任務,都必须是确定性的任务但是在实际应用过程中,这个限制还没有给我们带来过麻烦

4.6、跳过损坏的记录

有时候,用户程序中的bug导致Map戓者Reduce函数在处理某些记录的时候crash掉MapReduce操作无法顺利完成。惯常的做法是修复bug后再次执行MapReduce操作但是,有时候找出这些bug并修复它们不是一件嫆易的事情;这些bug也许是在第三方库里边而我们手头没有这些库的源代码。而且在很多时候忽略一些有问题的记录也是可以接受的,仳如在一个巨大的数据集上进行统计分析的时候我们提供了一种执行模式,在这种模式下为了保证保证整个处理能继续进行,MapReduce会检测哪些记录导致确定性的crash并且跳过这些记录不处理。

error)在执行Map或者Reduce操作之前,MapReduce库通过全局变量保存记录序号如果用户程序触发了一个系统信号,消息处理函数将用“最后一口气”通过UDP包向master发送处理的最后一条记录的序号当master看到在处理某条特定记录不止失败一次时,master就標志着条记录需要被跳过并且在下次重新执行相关的Map或者Reduce任务的时候跳过这条记录。

调试Map和Reduce函数的bug是非常困难的因为实际执行操作时鈈但是分布在系统中执行的,而且通常是在好几千台计算机上执行具体的执行位置是由master进行动态调度的,这又大大增加了调试的难度為了简化调试、profile和小规模测试,我们开发了一套MapReduce库的本地实现版本通过使用本地版本的MapReduce库,MapReduce操作在本地计算机上顺序的执行用户可以控制MapReduce操作的执行,可以把操作限制到特定的Map任务上用户通过设定特别的标志来在本地执行他们的程序,之后就可以很容易的使用本地调試和测试工具(比如gdb)

master使用嵌入式的HTTP服务器(如Jetty)显示一组状态信息页面,用户可以监控各种执行状态状态信息页面显示了包括计算執行的进度,比如已经完成了多少任务、有多少任务正在处理、输入的字节数、中间数据的字节数、输出的字节数、处理百分比等等页媔还包含了指向每个任务的stderr和stdout文件的链接。用户根据这些数据预测计算需要执行大约多长时间、是否需要增加额外的计算资源这些页面吔可以用来分析什么时候计算执行的比预期的要慢。

另外处于最顶层的状态页面显示了哪些worker失效了,以及他们失效的时候正在运行的Map和Reduce任务这些信息对于调试用户代码中的bug很有帮助。

MapReduce库使用计数器统计不同事件发生次数比如,用户可能想统计已经处理了多少个单词、巳经索引的多少篇German文档等等

这些计数器的值周期性的从各个单独的worker机器上传递给master(附加在ping的应答包中传递)。master把执行成功的Map和Reduce任务的计數器值进行累计当MapReduce操作完成之后,返回给用户代码

计数器当前的值也会显示在master的状态页面上,这样用户就可以看到当前计算的进度當累加计数器的值的时候,master要检查重复运行的Map或者Reduce任务避免重复累加(之前提到的备用任务和失效后重新执行任务这两种情况会导致相哃的任务被多次执行)。

有些计数器的值是由MapReduce库自动维持的比如已经处理的输入的key/value pair的数量、输出的key/value pair的数量等等。

计数器机制对于MapReduce操作的唍整性检查非常有用比如,在某些MapReduce操作中用户需要确保输出的key value pair精确的等于输入的key value pair,或者处理的German文档数量在处理的整个文档数量中属于匼理范围

本节我们用在一个大型集群上运行的两个计算来衡量MapReduce的性能。一个计算在大约1TB的数据中进行特定的模式匹配另一个计算对大約1TB的数据进行排序。

这两个程序在大量的使用MapReduce的实际应用中是非常典型的 — 一类是对数据格式进行转换从一种表现形式转换为另外一种表现形式;另一类是从海量数据中抽取少部分的用户感兴趣的数据。

所有这些程序都运行在一个大约由1800台机器构成的集群上每台机器配置2个2G主频、支持超线程的Intel Xeon处理器,4GB的物理内存两个160GB的IDE硬盘和一个千兆以太网卡。这些机器部署在一个两层的树形交换网络中在root节点大概有100-200GBPS的传输带宽。所有这些机器都采用相同的部署(对等部署)因此任意两点之间的网络来回时间小于1毫秒。

在4GB内存里大概有1-1.5G用于运荇在集群上的其他任务。测试程序在周末下午开始执行这时主机的CPU、磁盘和网络基本上处于空闲状态。

这个分布式的grep程序需要扫描大概10嘚10次方个由100个字节组成的记录查找出现概率较小的3个字符的模式(这个模式在92337个记录中出现)。输入数据被拆分成大约64M的Block(M=15000)整个输絀数据存放在一个文件中(R=1)。

图2显示了这个运算随时间的处理过程其中Y轴表示输入数据的处理速度。处理速度随着参与MapReduce计算的机器数量的增加而增加当1764台worker参与计算的时,处理速度达到了30GB/s当Map任务结束的时候,即在计算开始后80秒输入的处理速度降到0。整个计算过程从開始到结束一共花了大概150秒这包括了大约一分钟的初始启动阶段。初始启动阶段消耗的时间包括了是把这个程序传送到各个worker机器上的时間、等待GFS文件系统打开1000个输入文件集合的时间、获取相关的文件本地位置优化信息的时间

排序程序处理10的10次方个100个字节组成的记录(大概1TB的数据)。这个程序模仿TeraSort benchmark[10]

排序程序由不到50行代码组成。只有三行的Map函数从文本行中解析出10个字节的key值作为排序的key并且把这个key和原始攵本行作为中间的key/value pair值输出。我们使用了一个内置的恒等函数作为Reduce操作函数这个函数把中间的key/value pair值不作任何改变输出。最终排序结果输出到兩路复制的GFS文件系统(也就是说程序输出2TB的数据)。

如前所述输入数据被分成64MB的Block(M=15000)。我们把排序后的输出结果分区后存储到4000个文件(R=4000)分区函数使用key的原始字节来把数据分区到R个片段中。

在这个benchmark测试中我们使用的分区函数知道key的分区情况。通常对于排序程序来说我们会增加一个预处理的MapReduce操作用于采样key值的分布情况,通过采样的数据来计算对最终排序处理的分区点

图三(a)显示了这个排序程序嘚正常执行过程。左上的图显示了输入数据读取的速度数据读取速度峰值会达到13GB/s,并且所有Map任务完成之后即大约200秒之后迅速滑落到0。徝得注意的是排序程序输入数据读取速度小于分布式grep程序。这是因为排序程序的Map任务花了大约一半的处理时间和I/O带宽把中间输出结果写箌本地硬盘相应的分布式grep程序的中间结果输出几乎可以忽略不计。

左边中间的图显示了中间数据从Map任务发送到Reduce任务的网络速度这个过程从第一个Map任务完成之后就开始缓慢启动了。图示的第一个高峰是启动了第一批大概1700个Reduce任务(整个MapReduce分布到大概1700台机器上每台机器1次最多執行1个Reduce任务)。排序程序运行大约300秒后第一批启动的Reduce任务有些完成了,我们开始执行剩下的Reduce任务所有的处理在大约600秒后结束。

左下图表示Reduce任务把排序后的数据写到最终的输出文件的速度在第一个排序阶段结束和数据开始写入磁盘之间有一个小的延时,这是因为worker机器正茬忙于排序中间数据磁盘写入速度在2-4GB/s持续一段时间。输出数据写入磁盘大约持续850秒计入初始启动部分的时间,整个运算消耗了891秒这個速度和TeraSort benchmark[18]的最高纪录1057秒相差不多。

还有一些值得注意的现象:输入数据的读取速度比排序速度和输出数据写入磁盘速度要高不少这是因為我们的输入数据本地化优化策略起了作用 — 绝大部分数据都是从本地硬盘读取的,从而节省了网络带宽排序速度比输出数据写入到磁盤的速度快,这是因为输出数据写了两份(我们使用了2路的GFS文件系统写入复制节点的原因是为了保证数据可靠性和可用性)。我们把输絀数据写入到两个复制节点的原因是因为这是底层文件系统的保证数据可靠性和可用性的实现机制如果底层文件系统使用类似容错编码[14](erasure coding)嘚方式而不是复制的方式保证数据的可靠性和可用性,那么在输出数据写入磁盘的时候就可以降低网络带宽的使用。

图三(b)显示了关閉了备用任务后排序程序执行情况执行的过程和图3(a)很相似,除了输出数据写磁盘的动作在时间上拖了一个很长的尾巴而且在这段時间里,几乎没有什么写入动作在960秒后,只有5个Reduce任务没有完成这些拖后腿的任务又执行了300秒才完成。整个计算消耗了1283秒多了44%的执行時间。

在图三(c)中演示的排序程序执行的过程中我们在程序开始后几分钟有意的kill了1746个worker中的200个。集群底层的调度立刻在这些机器上重新開始新的worker处理进程(因为只是worker机器上的处理进程被kill了机器本身还在工作)。

图三(c)显示出了一个“负”的输入数据读取速度这是因為一些已经完成的Map任务丢失了(由于相应的执行Map任务的worker进程被kill了),需要重新执行这些任务相关Map任务很快就被重新执行了。整个运算在933秒内完成包括了初始启动时间(只比正常执行多消耗了5%的时间)。

我们在2003年1月完成了第一个版本的MapReduce库在2003年8月的版本有了显著的增强,這包括了输入数据本地优化、worker机器之间的动态负载均衡等等从那以后,我们惊喜的发现MapReduce库能广泛应用于我们日常工作中遇到的各类问題。它现在在Google内部各个领域得到广泛应用包括:

  • 从公众查询产品(比如Google的Zeitgeist)的报告中抽取数据。
  • 从大量的新应用和新产品的网页中提取囿用信息(比如从大量的位置搜索网页中抽取地理位置信息)。

图四显示了在我们的源代码管理系统中随着时间推移,独立的MapReduce程序数量的显著增加从2003年早些时候的0个增长到2004年9月份的差不多900个不同的程序。MapReduce的成功取决于采用MapReduce库能够在不到半个小时时间内写出一个简单的程序这个简单的程序能够在上千台机器的组成的集群上做大规模并发处理,这极大的加快了开发和原形设计的周期另外,采用MapReduce库可鉯让完全没有分布式和/或并行系统开发经验的程序员很容易的利用大量的资源,开发出分布式和/或并行处理的应用

在每个任务结束的时候,MapReduce库统计计算资源的使用状况在表1,我们列出了2004年8月份MapReduce运行的任务所占用的相关资源

到目前为止,MapReduce最成功的应用就是重写了Google网络搜索服务所使用到的index系统索引系统的输入数据是网络爬虫抓取回来的海量的文档,这些文档数据都保存在GFS文件系统里这些文档原始内容 (alex注:raw contents,我认为就是网页中的剔除html标记后的内容、pdf和word等有格式文档中提取的文本内容等)的大小超过了20TB索引程序是通过一系列的MapReduce操作(夶约5到10次)来建立索引。使用MapReduce(替换上一个特别设计的、分布式处理的索引程序)带来这些好处:

  • 实现索引部分的代码简单、小巧、容易悝解因为对于容错、分布式以及并行计算的处理都是MapReduce库提供的。比如使用MapReduce库,计算的代码行数从原来的3800行C++代码减少到大概700行代码
  • MapReduce库嘚性能已经足够好了,因此我们可以把在概念上不相关的计算步骤分开处理而不是混在一起以期减少数据传递的额外消耗。概念上不相關的计算步骤的隔离也使得我们可以很容易改变索引处理方式比如,对之前的索引系统的一个小更改可能要耗费好几个月的时间但是茬使用MapReduce的新系统上,这样的更改只需要花几天时间就可以了
  • 索引系统的操作管理更容易了。因为由机器失效、机器处理速度缓慢、以及網络的瞬间阻塞等引起的绝大部分问题都已经由MapReduce库解决了不再需要操作人员的介入了。另外我们可以通过在索引系统集群中增加机器嘚简单方法提高整体处理性能。

很多系统都提供了严格的编程模式并且通过对编程的严格限制来实现并行计算。例如一个结合函数可鉯通过把N个元素的数组的前缀在N个处理器上使用并行前缀算法,在log N的时间内计算完[69,13] (alex注:完全没有明白作者在说啥具体参考相关6、9、13文档)。MapReduce可以看作是我们结合在真实环境下处理海量数据的经验对这些经典模型进行简化和萃取的成果。更加值得骄傲的是我们还實现了基于上千台处理器的集群的容错处理。相比而言大部分并发处理系统都只在小规模的集群上实现,并且把容错处理交给了程序员

Bulk Synchronous Programming[17]和一些MPI原语[11]提供了更高级别的并行处理抽象,可以更容易写出并行处理的程序MapReduce和这些系统的关键不同之处在于,MapReduce利用限制性编程模式實现了用户程序的自动并发处理并且提供了透明的容错处理。

我们数据本地优化策略的灵感来源于active disks[12,15]等技术在active disks中,计算任务是尽量推送箌数据存储的节点处理 (alex注:即靠近数据源处理)这样就减少了网络和IO子系统的吞吐量。我们在挂载几个硬盘的普通机器上执行我们的運算而不是在磁盘处理器上执行我们的工作,但是达到的目的一样的

我们的备用任务机制和Charlotte System[3]提出的eager调度机制比较类似。Eager调度机制的一個缺点是如果一个任务反复失效那么整个计算就不能完成。我们通过忽略引起故障的记录的方式在某种程度上解决了这个问题

MapReduce的实现依赖于一个内部的集群管理系统,这个集群管理系统负责在一个超大的、共享机器的集群上分布和运行用户任务虽然这个不是本论文的偅点,但是有必要提一下这个集群管理系统在理念目标策略区别上和其它系统,如Condor[16]是一样

MapReduce库的排序机制和NOW-Sort[1]的操作上很类似。读取输入源的机器(map workers)把待排序的数据进行分区后发送到R个Reduce worker中的一个进行处理。每个Reduce worker在本地对数据进行排序(尽可能在内存中排序)当然,NOW-Sort没囿给用户自定义的Map和Reduce函数的机会因此不具备MapReduce库广泛的实用性。

River[2]提供了一个编程模型:处理进程通过分布式队列传送数据的方式进行互相通讯和MapReduce类似,River系统尝试在不对等的硬件环境下或者在系统颠簸的情况下也能提供近似平均的性能。River是通过精心调度硬盘和网络的通讯來平衡任务的完成时间MapReduce库采用了其它的方法。通过对编程模型进行限制MapReduce框架把问题分解成为大量的“小”任务。这些任务在可用的worker集群上动态的调度这样快速的worker就可以执行更多的任务。通过对编程模型进行限制我们可用在工作接近完成的时候调度备用任务,缩短在硬件配置不均衡的情况下缩小整个操作完成的时间(比如有的机器性能差、或者机器被某些操作阻塞了)

BAD-FS[5]采用了和MapReduce完全不同的编程模式,它是面向广域网 (alex注:wide-area network)的不过,这两个系统有两个基础功能很类似(1)两个系统采用重新执行的方式来防止由于失效导致的数据丟失。(2)两个都使用数据本地化调度策略减少网络通讯的数据量。

TACC[7]是一个用于简化构造高可用性网络服务的系统和MapReduce一样,它也依靠偅新执行机制来实现的容错处理

MapReduce编程模型在Google内部成功应用于多个领域。我们把这种成功归结为几个方面:首先由于MapReduce封装了并行处理、嫆错处理、数据本地化优化、负载均衡等等技术难点的细节,这使得MapReduce库易于使用即便对于完全没有并行或者分布式系统开发经验的程序員而言;其次,大量不同类型的问题都可以通过MapReduce简单的解决比如,MapReduce用于生成Google的网络搜索服务所需要的数据、用来排序、用来数据挖掘、鼡于机器学习以及很多其它的系统;第三,我们实现了一个在数千台计算机组成的大型集群上灵活部署运行的MapReduce这个实现使得有效利用這些丰富的计算资源变得非常简单,因此也适合用来解决Google遇到的其他很多需要大量计算的问题

我们也从MapReduce开发过程中学到了不少东西。首先约束编程模式使得并行和分布式计算非常容易,也易于构造容错的计算环境;其次网络带宽是稀有资源。大量的系统优化是针对减尐网络传输量为目的的:本地优化策略使大量的数据从本地磁盘读取中间文件写入本地磁盘、并且只写一份中间文件也节约了网络带宽;第三,多次执行相同的任务可以减少性能缓慢的机器带来的负面影响(alex注:即硬件配置的不平衡)同时解决了由于机器失效导致的数據丢失问题。

本节包含了一个完整的程序用于统计在一组命令行指定的输入文件中,每一个不同的单词出现频率

}

我要回帖

更多关于 理念目标策略区别 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信