MapReduce:Simplified Data Processing on Large Clusters

Posted on Nov 10, 2023

Jeffrey Dean@Google and Sanjay Ghemawat@Google

需求

Google的业务量决定了需要计算的数据通常很大,必须把它们分布到成百上千的机器上计算,才能在合理的时间内完成。如何并行计算,分布数据,处理差错成为问题。

为了解决这些问题,作者抽象出了一种简单的计算,同时隐藏了并行的细节,容错机制,数据分配和负载均衡。

作者设计的语言模型受到了Lisp中map和reduce原语的启发。

编程模型

一组键值对作为计算的输入,产生出一组键值对作为输出。

MapReduce库通过Map函数和Reduce函数来表达。

Map函数

Map函数,由用户来编写,以键值对作为输入,产生一组键值对中间量。MapReduce库聚集所有由同一个键产生的中间值,并把它们传给Reduce函数

Reduce函数

Reduce函数,同样由用户编写,接受一个中间键和一组那个键的值。它把这些值聚合成可能会更小的一组值。典型地,每次Reduce函数调用输出0或1作为输出值。中间值通过迭代器(iterator)提供给reduce函数。这允许我们处理一系列太大而不能放入内存中的值。 mr

具体实现

Map函数调用被分配在很多不同的机器上,自动将输入数据分割为M块分割,输入的分割可以被不同机器并行处理。中间键被分割为R块后调用Reduce函数。R的大小(分割的数量)由用户决定。

  • MapReduce库将输入文件首先分割为M块,典型地每块16MB或64MB。然后它启用很多复制的原程序在一组机器上。
  • 其中一个复制的程序是特殊的—主程序(the master)。剩下的是工作程序(workers),由主程序分配任务给它们。有M个map任务和R个reduce任务来分配。主程序挑选闲置程序,给它们分配一个map任务或一个reduce任务。
  • 被分配map任务的工作程序读取对应的输入分片的内容,它将键值对解析出输入数据,并把键值对传给由用户定义的map函数,map函数产生的中间键值对被缓存入内存中。
  • 定期地,缓存的键值对被写入本地磁盘中,通过分区功能划分为 R 个区域。这些本地磁盘的缓冲键值的地址对被传回主程序,由主程序来转发这些地址给运行reduce函数的工作程序。
  • 当一个执行reduce函数的工作程序被主程序告知这些地址时,它会使用远程过程调用从map工作程序的本地磁盘读取缓冲数据。当reduce工作程序读取所有中间数据时,它会按中间键对其进行排序,以便将同一键的所有匹配项分组在一起。排序是必需的,因为通常许多不同的键映射到同一个归约任务。如果中间数据量太大而无法放入内存,则使用外部排序。
  • reduce工作程序遍历排序的中间数据,对于遇到的每个唯一中间键,它将键和相应的中间值集传递给用户的Reduce函数。Reduce函数的输出将追加到此reduce分区的最终输出文件中。
  • 当所有map任务和reduce任务完成,主程序(the master)唤醒用户程序。此时,用户程序中的MapReduce调用返回到用户代码。

成功完成后,mapreduce执行的输出将在R输出文件中可用(每个reduce任务一个,文件名由用户指定)。通常,用户不需要将这些R输出文件合并到一个文件中 - 他们通常将这些文件作为输入传递给另一个MapReduce调用,或者从另一个能够处理分区为多个文件的输入的分布式应用程序中使用它们。

主数据结构

主节点保留多个数据结构。对于每个map任务和reduce任务,它存储状态(空闲、进行中或已完成)和工作计算机的标识(对于非空闲任务)。

主节点是从map任务传播中间文件区域的位置到reduce任务的管道。因此,对于每个已完成的map任务,master 存储map任务生成的 R个中间文件区域的位置和大小。map任务完成后,将收到对此位置和大小信息的更新。信息以增量方式推送给正在进行reduce任务的工作程序。

容错

由于MapReduce库旨在帮助使用数百或数千台机器处理大量数据,因此该库必须优雅地容忍机器故障。

工作程序故障

Master 定期对每个 Worker 执行 ping 操作。如果在一定时间内没有收到 Worker 的响应,Master 会将该 Worker 标记为失败。工作人员完成的任何映射任务都会重置回其初始空闲状态,因此有资格在其他工作人员上进行调度。同样,发生故障的工作线程上正在进行的任何映射任务或减少任务也会重置为空闲状态,并有资格重新安排。

已完成的映射任务会在发生故障时重新执行,因为它们的输出存储在故障计算机的本地磁盘上,因此无法访问。已完成的reduce任务不需要重新执行,因为它们的输出存储在全局文件系统中。

当一个map任务先由worker A执行,然后由worker B执行(因为A失败)时,所有执行reduce任务的worker会收到重新执行的通知。任何尚未从worker A读取数据的reduce任务将从worker B读取数据。

MapReduce 对大规模工作故障具有弹性。例如,在一次 MapReduce 操作期间,正在运行的集群上的网络维护导致一次 80 台计算机组在几分钟内无法访问。 MapReduce master只是重新执行了无法访问的worker机器完成的工作,并继续向前推进,最终完成MapReduce操作。

主程序故障

很容易让主设备写入上述主数据结构的周期性检查点。如果主任务终止,可以从最后一个检查点状态开始一个新的副本。然而,考虑到只有一个master,它发生故障的可能性不大;因此,如果主服务器发生故障,我们当前的实现将中止 MapReduce 计算。客户端可以检查此情况并根据需要重试 MapReduce 操作。

出现故障时的语义

当用户提供的map和reduce运算符是其输入值的确定性函数时,我们的分布式实现产生的输出与整个程序的无故障顺序执行产生的输出相同。

我们依靠map和reduce任务输出的原子提交来实现此属性。每个正在进行的任务将其输出写入私有临时文件。一个reduce 任务生成一个这样的文件,一个map 任务生成R 个这样的文件(每个reduce任务一个)。当map任务完成时,worker会向master发送一条消息,并在消息中包含R个临时文件的名称。如果master收到已经完成的map任务的完成消息,它会忽略该消息。否则,它将 R 文件的名称记录在主数据结构中。

当reduce任务完成时,reduce工作线程自动将其临时输出文件重命名为最终输出文件。如果在多台机器上执行相同的reduce任务,则会对同一个最终输出文件执行多个重命名调用。我们依靠底层文件系统提供的原子重命名操作来保证最终的文件系统状态仅包含一次执行reduce任务所产生的数据。

我们绝大多数的map和reduce运算符都是确定性的。

事实上,在这种情况下,我们的语义相当于顺序执行,这使得程序员很容易推理他们的程序的行为。当map和/或reduce运算符不确定时,我们提供较弱但仍然合理的语义。在存在非确定性运算符的情况下,特定化简任务的输出R1相当于由非确定性程序的顺序执行产生的R的输出。然而,不同reduce任务R的输出可以对应于由非确定性程序的不同顺序执行产生的R的输出。

地点

网络带宽是我们的计算环境中相对稀缺的资源。我们利用输入数据(由 GFS [8] 管理)存储在组成集群的计算机的本地磁盘上这一事实来节省网络带宽。 GFS 将每个文件划分为64MB的块,并将每个块的多个副本(通常为 3 个副本)存储在不同的机器上。MapReduce master考虑输入文件的位置信息,并尝试在包含相应输入数据副本的计算机上安排映射任务。如果失败,它会尝试在该任务输入数据的副本附近安排映射任务(例如,在与包含数据的机器位于同一网络交换机上的工作机器上)。当对集群中很大一部分工作线程运行大型 MapReduce 操作时,大多数输入数据都是在本地读取的,并且不消耗网络带宽。

任务粒度

M和R的大小存在限制。我们经常使用 2,000 台工作机器执行 M = 200, 000 和 R = 5, 000 的 MapReduce 计算。

备份任务

延长MapReduce操作总时间的常见原因之一是“落后者”:机器花费异常长的时间来完成计算中最后几个map或reduce任务之一。掉队者的出现可能有多种原因。例如,磁盘损坏的机器可能会频繁遇到可纠正错误,从而将其读取性能从30 MB/s降低到1 MB/s。集群调度系统可能在机器上调度了其他任务,导致其由于CPU、内存、本地磁盘或网络带宽的竞争而导致MapReduce代码执行速度变慢。我们最近遇到的一个问题是机器初始化代码中的一个错误,该错误导致处理器缓存被禁用:受影响的机器上的计算速度减慢了一百多倍。

我们有一个总体机制来缓解掉队问题。当MapReduce操作接近完成时,主节点会安排剩余正在进行的任务的备份执行。只要主执行或备份执行完成,任务就会标记为已完成。我们已经调整了这种机制,因此它通常会增加操作使用的计算资源不超过几个百分点。我们发现这显着减少了完成大型 MapReduce 操作的时间。

改进

分区功能

MapReduce的用户指定他们想要的reduce任务/输出文件的数量(R)使用中间键上的分区函数对数据进行跨这些任务的分区。提供了使用散列的默认分区函数(例如“hash(key) mod R”)。这往往会产生相当平衡的分区。然而,在某些情况下,通过键的某些其他功能对数据进行分区很有用。例如,有时输出键是 URL,我们希望单个主机的所有条目最终都在同一个输出文件中。为了支持这种情况,MapReduce 库的用户可以提供特殊的分区函数。例如,使用“hash(Hostname(urlkey)) mod R”作为分区函数会导致来自同一主机的所有 URL 最终出现在同一输出文件中。

订购保证

我们保证在给定的分区内,中间键/值对按递增的键顺序进行处理。这种顺序保证可以轻松地为每个分区生成排序的输出文件,当输出文件格式需要支持按键进行高效的随机访问查找,或者输出的用户发现对数据进行排序很方便时,这非常有用。

合路功能

在某些情况下,每个map任务产生的中间键存在显着的重复,并且用户指定的Reduce函数是可交换的和关联的。由于词频往往遵循 Zipf 分布,因此每个映射任务将生成数百或数千条形式的记录。所有这些计数都将通过网络发送到单个reduce 任务,然后由Reduce函数加在一起以生成一个数字。我们允许用户指定一个可选的组合器函数,该函数在通过网络发送数据之前对该数据进行部分合并。

Combiner函数在每台执行map任务的机器上执行。通常,相同的代码用于实现组合器和化简函数。 reduce 函数和组合器函数之间的唯一区别是 MapReduce 库如何处理函数的输出。化简函数的输出被写入最终的输出文件。组合器函数的输出被写入中间文件,该中间文件将被发送到reduce 任务。

部分组合显着加速了某些类别的MapReduce操作。

输入和输出类型

MapReduce库支持读取多种不同格式的输入数据。

副作用

在某些情况下,MapReduce 用户发现生成辅助文件作为其map和/或reduce运算符的附加输出很方便。我们依靠应用程序编写者来使此类副作用原子化和幂等。通常,应用程序会写入临时文件,并在文件完全生成后自动重命名该文件。

我们不支持单个任务生成的多个输出文件的原子两阶段提交。因此,生成具有跨文件一致性要求的多个输出文件的任务应该是确定性的。这种限制在实践中从来都不是问题。

跳过不良记录

有时,用户代码中存在错误,导致 Map 或 Reduce 函数在某些记录上确定性崩溃。此类错误会阻止 MapReduce 操作完成。通常的做法是修复错误,但有时这是不可行的;也许该错误存在于第三方库中,而该库的源代码不可用。此外,有时忽略一些记录是可以接受的,例如在对大型数据集进行统计分析时。我们提供了一种可选的执行模式,其中 MapReduce 库检测哪些记录导致确定性崩溃并跳过这些记录以取得进展。

每个工作进程都安装一个信号处理程序来捕获分段违规和总线错误。在调用用户Map或Reduce操作之前,MapReduce库将参数的序列号存储在全局变量中。如果用户代码产生信号,信号处理程序向 MapReduce 主节点发送包含序列号的“最后一口气”UDP 数据包。当Master在某一特定记录上发现多个故障时,表明在下一次重新执行相应的Map或Reduce任务时应跳过该记录。

本地执行

Map 或Reduce 函数中的调试问题可能很棘手,因为实际计算发生在分布式系统中,通常在数千台机器上,工作分配决策由主机动态做出。为了帮助促进调试、分析和小规模测试,我们开发了 MapReduce 库的替代实现,它可以在本地计算机上按顺序执行 MapReduce 操作的所有工作。向用户提供控件,以便将计算限制于特定的地图任务。用户使用特殊标志调用他们的程序,然后可以轻松使用他们认为有用的任何调试或测试工具(例如 gdb)。

状态信息

主站运行一个内部 HTTP 服务器并导出一组状态页面供人类使用。状态页面显示计算的进度,例如已完成多少任务、正在进行多少任务、输入字节、中间数据字节、输出字节、处理速率等。这些页面还包含指向每个任务生成的标准错误和标准输出文件。用户可以使用此数据来预测计算将花费多长时间,以及是否应将更多资源添加到计算中。这些页面还可用于确定计算何时比预期慢得多。

此外,顶级状态页面还显示哪些工作线程失败了,以及失败时他们正在处理哪些map和reduce任务。当尝试诊断用户代码中的错误时,此信息非常有用。