软件学报  2018, Vol. 29 Issue (6): 1770-1791   PDF    
MapReduce与Spark用于大数据分析之比较
吴信东1,2, 嵇圣硙1     
1. 合肥工业大学 计算机与信息学院, 安徽 合肥 230009;
2. School of Computing and Informatics, University of Louisiana at Lafayette, Lafayette 70504, USA
摘要: 评述了MapReduce与Spark两种大数据计算算法和架构,从背景、原理以及应用场景进行分析和比较,并对两种算法各自优点以及相应的限制做出了总结.当处理非迭代问题时,MapReduce凭借其自身的任务调度策略和shuffle机制,在中间数据传输数量以及文件数目方面的性能要优于Spark;而在处理迭代问题和一些低延迟问题时,Spark可以根据数据之间的依赖关系对任务进行更合理的划分,相较于MapReduce,有效地减少了中间数据传输数量与同步次数,提高了系统的运行效率.
关键词: 大数据     MapReduce     Spark     迭代问题     非迭代问题    
Comparative Study on MapReduce and Spark for Big Data Analytics
WU Xin-Dong1,2, JI Sheng-Wei1     
1. School of Computer Science and Information Engineering, Hefei University of Technology, Hefei 230009, China;
2. School of Computing and Informatics, University of Louisiana at Lafayette, Lafayette 70504, USA
Foundation item: National Key Researh and Development Program of China (2016YFB1000901); National Natural Science Foundation of China (91746209); Program for Changjiang Scholars and Innovative Research Team in University (PCSIRT) of the Ministry of Education (IRT17R3)
Abstract: This paper reviews two state-of-the-art algorithmic architectures, MapReduce and Spark, and compares them from their backgrounds, principles and application scenarios. The advantages and their corresponding limitations of these two algorithms are summarized. When dealing with non-iterative problems, MapReduce, by virtue of its task scheduling strategy and shuffle mechanisms, performs better than Spark in terms of intermediate data transfers and number of files. Spark can be used to deal with iterative problems and low latency issues, as it divides a computing task according to the dependencies between the data and the task. Compared with MapReduce, Spark can effectively reduce the number of intermediate data transmissions and the number of synchronizations, and improve the running efficiency of computing systems.
Key words: big data     MapReduce     Spark     iterative problems     non-iterative problems    

随着互联网的持续发展, 我们可收集获取的数据规模在不断增长.尽管数据的收集存储技术还在进步和日趋成熟[1], 但是如何处理如此庞大的数据成为了新的研究问题.在分布式系统出现之前, 只有通过不断增加单个处理机的频率和性能来缩短数据的处理时间.而分布式的提出则打破了这个传统的约束.所谓分布式, 就是将一个复杂的问题切割成很多子任务, 分布到多台机器上并行处理.在保证系统稳定性的同时, 最大限度地提高系统的运行速度.

Google在2004年提出了最原始的分布式架构模型MapReduce[2], 用于大规模的数据并行处理.MapReduce模型借鉴了函数式程序设计语言中的内置函数Map和Reduce, 主要思想为:将大规模数据处理作业拆分成多个可独立运行的Map任务, 分布到多个处理机上运行, 产生一定量的中间结果, 再通过Reduce任务混洗合并产生最终的输出文件.作为第一代分布式架构, MapReduce已经比较好地考虑了数据存储、调度、通信、容错管理、负载均衡等很多问题, 一定程度上降低了并行程序开发难度, 也为后来的分布式系统的开发打下了很好的基础[3].然而它也存在很多不足:首先, 为了保证较好的可扩展性, MapReduce的任务之间相互独立, 互不干扰, 所造成的后果便是大量的中间结果需要通过网络进行传输, 占用了网络资源, 并且为了保证容错, 所有的中间结果都要存储到磁盘中, 效率不高; 同时, 在MapReduce中只有等待所有的Map任务结束后, Reduce任务才能进行计算, 异步性差, 导致资源利用率低.

近年来, Zaharia等人主导开发了新一代的大数据分布式处理框架Spark[4].Spark以其先进的设计理念, 迅速成为热门课题, 围绕着Spark推出了Spark SQL, Spark Streaming, MLLib和GraphX等组件, 这些组件逐渐形成了大数据处理一站式解决平台.在处理迭代问题以及一些低延迟问题上, Spark性能要高于MapReduce.Spark在MapReduce的基础上做了很多的改进与优化, 使得在处理如机器学习以及图算法等迭代问题时, Spark性能要优于MapReduce.但是作为最经典的分布式架构, MapReduce同样也有自身的可取之处, 在特定的问题环境下, MapReduce还是优于Spark.

本文首先分析与介绍了MapReduce和Spark两种分布式架构, 并在大数据评估指标和不同应用情景两方面对二者进行了比较和总结; 最后, 基于WordCount与PageRank问题从原理上对二者在算法执行时间、文件数目及同步次数等3方面进行了分析和对比, 最后做出总结和展望.

本文第1节介绍MapReduce的背景、具体的模型结构以及作业的调度策略.第2节介绍Spark模型的具体思想以及与MapReduce的区别.第3节基于现有的研究对MapReduce和Spark在大数据评估指标和不同应用情景两方面进行比较和总结.第4节通过WordCount问题比较MapReduce与Spark在处理非迭代问题时各自的优缺点.第5节以PageRank问题为例, 比较MapReduce与Spark在处理迭代问题时性能上的差异.第6节对本文内容进行总结并对大数据分析技术的发展趋势和前景做出展望.

1 MapReduce 1.1 MapReduce背景

在Google, 每天在需要处理大量的原始数据, 比如网页爬取文件、网络日志文件等等.庞大的数据输入量和计算量是一台或者几台机器难以承受的, 只有将任务分配到成百上千台处理机上去并行处理, 才能在合理时间内完成计算.MapReduce思想来源于Lisp函数式语言中的设计思想, 提供了Map和Reduce两种函数来实现并行化操作.MapReduce将分布式系统中如何分布、调度、监控以及容错等逻辑从复杂的细节中抽象出来, 使得程序员不需要太多并发处理或者分布系统的经验, 就可以处理超大的分布式系统的资源.MapReduce是一种简化的分布式编程模型[5], 它可以自动解决输入数据的分布细节, 跨越机器集群的程序执行调度, 处理机器的时效, 并且管理机器之间的通信请求.

1.2 MapReduce模型

MapReduce中的操作只有两种:Map和Reduce, 而这两种操作都是由用户定义的.Map和Reduce的输入与输出数据都是键值对〈key/value〉, 可以用两个公式对它们进行简单的描述.

●   Map (k1, v1)→list(k2, v2);

●   Reduce (k2, list(v2))→list(v2).

在一个问题的计算过程中, Map操作将数据自动地进行分区, 并分布到多台处理机上进行并行处理.Reduce操作会根据中间数据的键值key通过分区函数(如Hash函数)处理并分布到不同处理机上进行相同的计算, 完成一个MapReduce计算过程.图 1为完整的MapReduce计算流程图.

Fig. 1 MapReduce flow chart[2] 图 1 MapReduce流程图[2]

考虑在大量文档集合中统计单词“XindongWu”出现的次数问题, 伪代码如下.

Map(String key, String value)

  //key:文档名

  //value:文档内容

  for each word “XindongWu” in value:

    EmitIntermediate(XindongWu, 1);

Reduce(String key, Iterator values)

  //key:一个单词

  //value:值列表

  int result=0;

  for each v in values:

    result+=ParseInt(v);

  Emit(AsString(result));

在此问题中, Map函数将所有文档进行分割, 分布到多台处理机上进行处理.在并行处理时, 只要遇到单词“XindongWu”便形成一个键值对(XindongWu, 1).Reduce函数将所有处理机上的键值对进行聚合并相加, 最终得到单词“XindongWu”出现的次数.

除词频统计问题外, 还有一些可以用MapReduce解决的问题.

●  分布式Grep(destributed grep):如果特定的行匹配到给定的模式, Map函数会将其传递给Reduce函数. Reduce函数负责将中间数据作为结果输出;

●   URL访问频率统计(count of URL):Map函数首先处理网页请求日志并输出键值对〈URL, 1〉.Reduce函数对相同URL的键值对进行累加并输出为〈URL, total_count〉对;

●  主机词组向量(term-vector per host):在以〈word, frequency〉键值对列表表示的一个或一组文档中, 词组向量是出现文档中最重要的词, 以〈word, frequency〉对列表的形式表示.Map函数处理输入文档并输出〈hostname, term vector〉键值对.Reduce函数对具有相同的host键值对进行规约并去除非常用词, 最终输出〈hostname, term vector〉键值对.

MapReduce编程模型使得任务执行和任务通信之间变得简单且规范, 任务并行化得以实现, 扩展性良好; 且每次Map操作的中间结果会被定期刷写到磁盘上, 不会永久保留在内存中, 这样做一是为了减少内存的消耗, 避免内存溢出带来数据丢失; 二是为了更好地容错, 磁盘的稳定性和易恢复性让MapReduce的容错变得更加可行.但是, 这样的设计也带来了一些缺点.

●  首先, 一些问题并不适合用Map和Reduce操作来完成, 如处理数据结构是图或者网络的问题时, 因为图这样的数据结构中包含着各种隐性的关系:图的边、子树、节点之间的父子关系、权重等, 而这些关系很难被分解为键值对在一次计算中完全表示.同时, 任务依赖关系复杂时, 会因为需要将任务分解以适应该模型而效率下降;

●  其次, 在一个作业中, Reduce任务必须要等待所有的Map任务完成后才能进行计算, 异步性低, 系统资源可能没有得到充分的利用.

1.3 MapReduce作业调度

有些问题可以通过一次Map和Reduce过程解决, 比如词频统计(WordCount)问题.由于单个MapReduce过程能够完成的操作毕竟有限, 有些复杂的问题就需要分解成多个MapReduce过程[6, 7].每个Reduce操作的结果作为下一个Map操作的输入数据, 以此过程来完成对复杂问题的处理[8].

在有多个MapReduce过程的问题中会涉及到作业的依赖关系问题, 一个问题中, 作业的依赖关系有可能为线性; 但在更复杂的情况下, 依赖关系为非线性.在MapReduce提出之初, 考虑到多作业的情况, 采用的是FIFO (fist in fist out)方法.所有的作业被提交到一个队列中, 按照优先级和提交时间的先后顺序扫描整个队列选择一个作业执行.允许执行的作业完全占用集群资源, 等到当前作业执行完毕后再从队列中选择下一个执行的作业. FIFO算法如今已经成为MapReduce默认的调度算法, 但是它忽略了大作业和小作业之间的差异, 通常会导致小作业的长时间等待影响到系统的吞吐率.公平调度算法(fair scheduler)[9]是Facebook公司开发的一种调度算法, 该算法可以在一定时间内让所有的作业平等地共享集群资源.当集群只有一个作业在运行时, 该作业独享整个集群的资源; 当其他作业被提交到集群时, 系统会将空余的任务槽分配给新提交的作业, 使每个作业能够分到近似等量的系统资源.但是这种方法会减少每个作业能够分到的资源, 增加作业的运行时间.文献[10]提出了一种计算能力感知的调度算法(capacity scheduler), 该算法将系统资源分成多个队列, 每个队列中的作业共享该队列的资源.2008年, 美国加州大学伯克利分校的Andy Konwinski等人提出了LATE调度算法(longest approximate time to end)[11].MapReduce应用中一个作业会被划分成若干个任务并行运行, 作业完成时间即最慢任务的完成时间, 为提高作业完成速度, 系统会将速度执行慢的任务在其他计算节点重新并行执行, 该算法充分考虑了系统的异构性并借鉴了其他分布式系统备份执行机制[15].以上几种算法从根本上都是根据作业的提交先后顺序对作业进行调度, 在作业之间的依赖关系为线性时, 这些调度策略不用消耗太多资源在算法调度上, 方便快捷; 如在PageRank问题中, 两次迭代过程被划分为4个作业, 根据作业提交时间依次执行作业1~作业4.如图 2所示.

Fig. 2 Job division of PageRank in two iterations 图 2 PageRank两次迭代作业划分

但是以上调度策略没有考虑数据之间的依赖关系, 仅根据作业的提交时间对作业进行调度, 作业之间完全独立且中间结果需存入磁盘或通过网络进行节点传输, 耗费了大量的系统资源.文献[12]提出了一种基于有向无环图的分布式通用执行引擎, 能够较好地显示每步操作的依赖关系并且映射到物理资源上去.但是这些具体的依赖关系需要用户自行定义, 系统无法自动判断.在复杂问题中也会带来很多麻烦, 并且没有从根本上提高系统的执行速度.

1.4 MapReduce应用与发展

由于MapReduce易操作、可扩展及支持容错等特点, 目前有越来越多的企业和应用都将MapReduce作为大数据处理平台的核心思想, 其中最流行的为Apache研发的HadoopMapReduce平台.Hadoop[13]作为开源的分布式编程计算框架, 其核心由HDFS[14], MapReduce和YARN[15]组成, 其中, HDFS(hadoop distributed file system)是适合搭建在廉价集群上可靠的分布式文件存储与传输系统.HDFS采用master/slave架构, 将集群中所有服务器存储空间连接到一起, 构成了统一的、可以分布式存储海量非结构化数据的空间, 具有成本低廉、可靠性强、吞吐量高等特点.YARN是自Hadoop 2.0后提供的新型资源管理器.YARN将JobTracker的系统资源管理调度和单个应用监控跟踪功能分离成两个独立的进程ResourceManager和ApplicationMaster, 更好地支持分布式编程计算框架.

随着以MapReduce为编程模型的应用越来越多, 关于MapReduce模型的研究也逐渐增多, 其中, 主要研究方向集中在以下几方面.

●   MapReduce作业数据放置问题:在MapReduce中, 数据块放置的方式都是采取随机放置, 这种放置方式实现简单, 但是可能会因为有关联的数据块放置不均匀, 导致MapReduce执行效率不高的问题.文献[16]通过构建历史数据访问图得出最优数据放置策略; 文献[17]根据被访问次数越多则被访问概率越大的原理提出了一种基于文件分组放置的方法;

●  异构环境下提高MapReduce性能问题:在大规模异构环境下, 不同计算节点的性能差异会影响整个系统的计算效率.文献[18]提出了自适应的任务调度策略SAMR(self-adaptive MapReduce), 自动寻找执行较慢的节点并进行备份; 文献[19]通过拆分较慢节点中的任务并分配给快节点执行的动态负载均衡策略SkewTune解决异构问题; 文献[20]提出了多种优化方法, 保证为MapReduce任务分得适当的资源;

●   MapReduce处理迭代问题性能不高问题:由于MapReduce每个作业中只包含一对Map-Reduce任务及中间数据必须存入磁盘的特性, MapReduce在计算迭代问题时性能会显著降低.为此, 在MapReduce基础上产生了很多针对迭代算法改进的类MapReduce框架, 其中代表性的有Twister[21], Haloop[22]和iMapReduce[23]等.

2 Spark 2.1 Spark背景

Spark是一种基于内存的开源计算框架, 2009年诞生于美国加州大学伯克利分校AMPLab, 在2010年正式开源, 并于2013年成为了Apache基金项目, 到2014年便成为Apache基金的顶级项目.自发布以来, Spark已经被Yahoo, eBay和Netflix等多家公司在8 000多个节点的集群上处理了PB级的数据.

在Spark中, 核心抽象概念就是弹性分布式数据集RDD(resilient distributed datasets)[24], 该抽象是分布在集群上的只读型可恢复数据集.用户可以利用Spark中的转换(transformation)和动作(action)对其进行操作, 也可以长期保存在内存中不被回收, 这样, 再次使用这部分内容时, 不需要再次创建此RDD.这也是Spark在迭代问题中的性能表现要高于MapReduce的原因.RDD通过一种血统(lineage)关系来完成容错:如果一个RDD丢失, 那么这个丢失的RDD有充足的信息知道自己是如何从其他RDD转换而来的, 这样便可以通过再次计算得到丢失的RDD.Spark是由Scala语言实现的, 而Scala是一种基于JVM的函数式编程语言, 提供了类似DryadLINQ[25]的编程接口.同时, Spark还通过修改版的Scala解释器提供交互式编程, 用户可以自由定义集群中的RDD、函数、变量以及类.

Spark在处理迭代和低延迟问题时, 能够在保证稳定性和容错性的同时提高系统的运行速度[26], 适用场景包括:

●  迭代式算法:很多迭代式算法都会对相同的数据进行重复计算从而得到最优解.MapReduce计算框架把每次迭代划分成一个或多个MapReduce作业(job), 而每次迭代都要从磁盘重新加载数据, 导致系统效率不高; 而Spark可以把需要重复计算的数据缓存到内存中加快计算效率;

●  交互式数据分析:用户经常会用SQL对大数据集合做临时查询(ad-hoc query).Hive把每次查询都当作一个独立的MapReduce作业, 并且从磁盘加载数据, 有很大的延迟; 而Spark可以把数据加载到内存中, 然后重复的查询;

●  流应用:即需要实时处理的应用, 这类应用往往需要低延迟、高效率.

2.2 弹性分布式数据集(resilient distributed datasets, 简称RDD)

RDD是一种分布在集群中的只读型对象集合, Spark将创建RDD的一系列转换记录下来, 如果RDD的某个分区或者部分数据丢失, 可以根据其父辈RDD重建来进行容错, 这种策略称为血统(lineage).RDD的优点有:

●   RDD只能从外部存储或转换(transformation)操作产生, 相比于分布式共享内存(DSM), 可以更高效地实现容错[27], 对于丢失数据, 只需根据血统就可重新计算出来, 而不需要设置特定的检查点(checkpoint)[28];

●   RDD的只读不变性可以实现类MapReduce的预测式执行[3];

●   RDD基于数据的本地性的任务划分调度策略提高了系统性能;

●   RDD是可序列化的, 当内存不足时, 可自动把RDD存储于磁盘上.

创建一个新的RDD只能通过以下4种方法.

●  从内存集或者外部存储中读取数据创建一个初始RDD;

●  通过并行化Scala集合, 即, 把一个集合切分成很多份并发送到各节点;

●  通过把RDD持久化.默认状态下, RDD是惰性的, 只有在遇到行动(action)操作时, RDD才被物化, 执行完后即被释放.用户可以通过缓存(cache)或保存(save)操作使RDD持久化;

●  对其他RDD进行转换(transformation), 得到一个新的RDD.在Spark中, 不同于MapReduce只定义了Map和Reduce两种操作, Spark定义了大量的操作供用户选择.而这些操作又分为两类:转换和行动, 表 1展示了部分转换和行动操作及其含义.

Table 1 Some transformations and actions of RDD in Spark[24] 表 1 Spark中RDD的部分转换和行动操作[24]

转换和行动都是针对RDD的一系列操作.转换操作是将一个RDD转换成一个新的RDD; 而行动操作则是对RDD进行计算, 然后将结果返回驱动程序.RDD并不总需要被物化或者进行实际的操作, 这是RDD最为重要的一个特性之一.RDD中只包含一些必须的信息:其对应的数据存放位置、该RDD的父辈RDD以及该RDD是如何从其父辈RDD转换而来的.在实际运算中, 所有的转换操作都是惰性的, 不会立刻执行, Spark会记录下对RDD的全部转换操作, 直到有行动操作要求返回结果时才会执行记录的全部操作.这种策略被称为懒惰(lazy).也就是说Spark中的转换操作并不是实时的, 它需要一个触发因子, 这个触发因子就是行动.不同于MapReduce, Spark中会最大化地利用集群中有限的资源.例如, Map操作的结果往往是Reduce的输入.事实上, 我们并不关心Map的结果, 而是关心Reduce的返回值.懒惰策略减少了不必要的磁盘I/O以及返回.

以统计单词“XindongWu”出现的次数问题来解释RDD的概念, 伪代码如下.

val WordCountResult=

  sc.textFile(“Introduction”, 4)

  //从“Introduction”文档读取内容并分割

    .flatMap(lineline.split(“·”))

    //拆分数据, 以空格为拆分条件

      .map(“XindongWu”⇒(“XindongWu”, 1))

      //将单个单词转化成键值对形式

        .reduceByKey(_+_)

        //统计“XindongWu”出现次数

WordCountResult.saveAsTextFile(“wordcount_result”)

//输出结果

首先, 由外部文件读取数据创建一个初始RDD——MappedRDD; 然后, 由第1个转换操作flatMap得到FlatMappedRDD; 再由转换操作map形成MappedRDD; 最后, 通过转换操作reduceByKey得到ShuffledRDD.至此, 所有转换操作不会被实例化, 每一个RDD只会记录下其父RDD及相应的转换操作, 当出现行动操作saveAsTextFile时, 之前记录下的所有转换操作才会被真实地执行.在此例中, 共有3次转换操作和1次行动操作并产生了4个RDD.具体流程图如图 3所示.

Fig. 3 Counting process for word 'XindongWu' 图 3 统计单词“XindongWu”流程图

2.3 RDD之间的依赖关系

在MapReduce中, 不同作业中数据块的依赖关系是不同的, 可能是一对一的依赖关系, 也有可能是一对多或者其他更复杂的情况.在Spark当中也存在这个问题, 为了让系统运行更加高效, Spark将RDD的依赖关系做了具体的分类, 即, 窄依赖(narrow dependency)和宽依赖(wide dependency).窄依赖是指每一个RDD分区能且只能被一个子RDD分区所使用; 相反地, 宽依赖则指一个RDD分区可以被至少两个子RDD分区所使用.

图 4所示, 每一个阴影矩形代表一个RDD分区, 由多个RDD分区组成一个完整的RDD.左边3类依赖关系属于窄依赖, 右边两种属于宽依赖.这样划分的原因有两个.

Fig. 4 RDD dependency[24] 图 4 RDD依赖关系[24]

(1) 在窄依赖中, 因为每一个父RDD分区只能被一个子RDD分区所使用, 所以父辈RDD分区中的数据全部是与对应的子RDD分区相关的, 不存在冗余数据; 相反, 在宽依赖中, 因为一个父RDD分区会被多个子RDD分区所使用, 所以父RDD分区中所存储的数据除了和丢失的子RDD分区相关外, 还会有部分数据是用来计算出其他的子RDD分区.所以在宽依赖中, 为了更好进行容错, 中间结果一般需要存入磁盘;

(2) 从定义和图 4中可以看出:在窄依赖中每一个父RDD分区计算完成后可以直接将计算结果传给其对应的子RDD分区, 无需等待其他RDD分区的计算完成; 而在宽依赖关系中, 因为一个子RDD分区需要用到多个父RDD分区的计算结果, 所以必须等待所有的RDD分区计算完成后才能进行子RDD的计算.这种无需等待的运算过程叫做管道化操作(pipeline).

2.4 Spark调度策略

MapReduce中, 将一个Map操作和Reduce操作称做一个作业(job), 且每次作业的中间结果必须进行磁盘存取操作.在Spark中, 对作业的定义则是:通过特定的行动触发所生成的步骤的集合称为一个job.只有在遇到行动时才会触发之前所有转换的实际操作, 而一个作业是指这一系列转换操作和最后的行动操作所组成的集合.

在一个job中, Spark会根据宽窄依赖关系划分出不同的阶段(stage), 然后再根据RDD自身保存的依赖关系形成一个有向无环图(DAG)进行调度.概括来说, 阶段内部的RDD全都为窄依赖关系, 阶段之间为宽依赖关系.在进行调度时, Spark规定每一个阶段如果没有所依赖的父阶段, 可以直接进行计算; 否则必须等待父阶段计算完成后再进行计算.

2.5 Spark应用与发展

为了让Spark能够适应更多的应用场合, 开发者以Spark为基础设计了不同的组件, 包括Spark SQL[29], Spark Streaming[30], Machine Learning(MLlib)[31]以及GraphX[32, 33], 其中:Spark SQL是结构化数据查询模型, 即, 支持分布式类SQL语句的数据管理平台; Spark Streaming通过将输入数据进行切分处理方式, 利用离散流数据(discretized steams)模型实现增量流数据计算.Mllib是Spark的机器学习库, 可以支持超过50种常见机器学习算法的分布式训练模型, 如决策树模型、LDA文档主题生成模型(latent dirichlet allocation)和交替最小二乘矩阵分解模型(alternating least squares matrix factorization)的常见分布式算法都可以利用Mllib完成; 而GraphX提供类似Pregel[34]和GraphLab[35]的图计算接口, 通过将图数据切分成RDD进行分布式计算.正是因为拥有众多的组件和完善的生态圈, Spark得以在如大规模垃圾邮件检测(large-scale spam detection)[36]、图像处理(image processing)[37]和基因数据处理(genomic data processing)[38]等众多研究问题和领域中得到广泛应用.

3 相关工作

目前, 在大数据领域已经有很多关于MapReduce与Spark比较的研究工作, 现有比较研究工作主要集中在针对不同的大数据评估指标进行对比以及在各种应用情景中的适用性比较.笔者将现有的比较研究内容做出分析与总结, 并针对不同情形下两种分布式模型的选择给出合理的建议.

3.1 大数据评估指标下的对比

(1) 运行时间

运行时间是在系统中运行相应的算法所耗费的总时长.在MapReduce与Spark架构上运行不同的算法, 其运行时间差异也较大.因为Spark是轻量级的、基于内存计算的开源的集群计算平台, 在默认情况下, Spark中的数据都会基于内存进行存储; 而MapReduce每一步Map或Reduce操作的结果都需要存入磁盘.所以相对而言, 在一般情形中, 基于内存存储的Spark运行速度较快.

文献[39, 40]在MapReduce与Spark平台上运行了PageRank, K-means和Grep等算法并做了相应的实验对比, 得出Spark在基于内存计算时效率要高于MapReduce.但是过多的内存需求也会带来相应的问题, 如在内存资源紧缺或者数据规模较大的情况下, Spark的性能会出现大幅下降甚至无法完成计算[41].

(2) 资源消耗

在大数据领域中, 不同的算法对计算机的性能要求不尽相同, 如数据的排序与查询需要多次访问数据进行I/O密集型任务, 而K-means, PageRank等算法需要大量的迭代计算, 属于计算密集型任务.在不同问题中, 磁盘读写速度、CPU速度和网络带宽都可能是限制算法性能的瓶颈[42].当解决同一问题时, MapReduce在内存、网络以及磁盘的占用率上都要低于Spark.更低的资源消耗不仅可以降低系统对硬件的要求, 也保证MapReduce可以在同一集群中与其他应用协作计算互不干扰[43].

(3) 容错性能

容错技术会占用计算机资源, 在一定程度上影响算法性能; 但另一方面, 算法在具有良好容错机制的平台上执行时, 能够快速从故障中恢复而不会导致失败.大数据处理平台的容错机制与算法性能息息相关, 合理的容错机制是算法执行的保证[44].在MapReduce中, 每一步操作的结果都会被存入磁盘, 虽然需要大量的磁盘I/O操作, 但是在计算出现错误时可以很好地从磁盘进行恢复; 而Spark所有计算都基于内存存储, 因为内存中的数据会不定期进行清理, 所以当某一步计算出现数据丢失时, Spark需要根据RDD中的信息进行数据的重新计算, 会耗费一定的资源.文献[45]比较了在MapReduce与Spark上运行Sort和K-means算法时的容错情况, 无论是在单节点还是多节点情况下, MapReduce的容错率都要优于Spark.

3.2 不同应用情景下的对比

在大数据领域中, 当遇到不同问题时, 用户应该根据自身问题的需要选择合适的分布式模型.以下我们将通过对不同情况进行具体分析并给出相应的建议.

(1) 问题类型

从问题角度出发, 根据MapReduce/Spark的特性, 我们将常见的问题做以下划分.

●  批处理问题与交互式问题

批处理问题通常指针对静态离线的数据进行处理, 在处理过程中, 用户需求及数据不会产生变化的问题, 如WordCount、倒排索引和URL访问频率统计等问题都属于这一范畴; 而交互式问题则要求在处理问题过程中用户可以和系统进行交互式访问并及时响应.针对批处理问题, 通常可以选择MapReduce模型进行处理, Spark则更多地应用在交互式问题上.因为MapReduce作业划分策略以及中间结果必须存入磁盘等特性, 无法在很短的时间内对交互访问进行响应; 而Spark的任务调度策略减少了大量不必要的磁盘操作, 可以较好地进行交互式操作.

●  迭代问题与非迭代问题

迭代问题与非迭代问题的主要区别在于迭代问题需要对某部分数据进行重复的操作以得到最终的结果, 如在PageRank问题中每次迭代都需要对Link和Rank数据进行相同的操作.由于MapReduce处理迭代问题时每次迭代过程都需要磁盘读取操作, 效率较低; 而Spark每次迭代结果无需存入磁盘, 并且允许用户将常用数据存入内存, 使得Spark在处理迭代问题时效率要高于MapReduce.

(2) 数据类型

从数据角度出发, 我们选取了以下几种数据类型进行说明.

●  键值对数据

在MapReduce模型中, 所有的数据都被转化为键值对类型进行分布式处理, 如WordCount算法中, 每个单词都被转化为〈key, value〉形式, 所有的Map和Reduce操作都是建立在键值对形式的基础上.如果待处理数据能很好地以键值对形式进行表现和处理, 那么MapReduce可以较好地胜任这类问题.

●  图数据

在社交网络和商品推荐场景中, 数据通常是以图的形式表现出来, 数据内部关联度可能较高, 如社交网络不同用户之间的相关性.在对这样的数据进行处理时, 如果转化成键值对形式会引入大量的聚集(aggregation)和连接(join)操作, 带来大量的计算和数据迁移, 导致效率不高.针对此类问题, Gonzalez等人于2013年构建了基于Spark的高效图计算模型GraphX.GraphX利用Spark框架提供的内存缓存RDD、任务调度策略以及基于依赖关系的容错等特性, 实现了高效健壮的图计算框架.Spark GraphX定义了图的数据模型以及一系列针对图的并行计算操作, 使得图数据可以方便快速地进行存储与计算.

●  流数据

在一些特定的场景中(如实时用户推荐、用户行为分析)对实时性要求很高, 要求系统能够快速地处理实时的数据流, 通常每次数据处理的时间间隔在数百毫秒到数秒之间.显然, MapReduce基于磁盘的计算模型不能满足这种实时的需求.而基于Spark的流式框架Spark Streaming则是专门用来处理此类问题.其原理是:将流数据分割成数据片段封装到RDD中, 然后以类似批处理的方式对这些数据片段进行操作.Spark Streaming在本质上仍然是一种批处理方式, 但由于Spark可以基于内存达到较快的速度从而获得准实时的特性.

(3) 数据规模

在MapReduce中, 一个完整的问题会被划分为一个或多个作业, 每个作业运行时只会占用少量内存, 且运行完成后会将结果写入磁盘并释放内存中的数据.而Spark在运行过程中会产生大量的RDD, 所有RDD都会默认先存入内存中.由于RDD只读不可修改的特性, 随着计算的进行不断会有新的RDD生成并存入内存, 所以在处理相同问题时, Spark内存的需求量会远远大于MapReduce.并且, Spark的内存需求会随着问题的数据规模增大而增加, 甚至在数据规模过大时, Spark会无法正常完成运行过程, 但是MapReduce可以很好地应对这种问题[41].所以在数据规模较大时, 考虑到系统的稳定性以及内存消耗问题, 应该选择对内存需求更小的MapReduce框架; 而在处理轻量级数据时, Spark可以根据其内存运算的优势达到更好的效果.MapReduce与Spark在不同情境下的对比与选择见表 2.

Table 2 A comparison of MapReduce with Spark in different cases 表 2 不同情况下MapReduce与Spark比较

以上针对MapReduce与Spark的比较研究主要集中在实验性能对比方面, 但是具体的实验对比结果只适用于其特定的参数配置, 不具有普遍性, 并且缺少对实验结果进行相应的原理分析.后文我们将以WordCount和PageRank算法为例对MapReduce与Spark进行原理分析和比较.

4 WordCount问题的分布式处理 4.1 问题描述

WordCount问题是分布式算法中最为经典的问题之一, 其基本要求是, 从给定文档中统计出每个单词出现的次数.在MapReduce[2]与Spark[4]的官方文献中也都提到了处理WordCount问题的基本思路.我们将通过WordCount问题分为以下3点比较MapReduce与Spark各自的优缺点.

●  算法执行时间:算法执行时间是算法最主要的影响因素之一, 直接关系到算法的性能和效率;

●  文件数目:在分布式系统中, 中间数据都是通过文件的形式进行传输, 在总数据量相同的情况下, 文件数目越多, 文件系统的负担越大, 并且随机磁盘IO也会严重影响程序的执行效率;

●  同步次数:同步模型要求所有节点完成当前阶段后才可以开始下一阶段, 这严重限制了计算性能.但同步是数据规约和汇总的前提, 也是下一个阶段的开始时机.异步模型则可以使各个节点之间独立运行, 加快了算法的执行速度.即:在算法执行过程中, 同步次数越少, 越有利于提高算法的性能和效率.

假设给定文档为“CABDCABCDABCCACDBA”, 为了方便表示, 其中“A, B, C, D”分别代表不同的单词, 要求从该文档中统计出4个单词出现的次数.

4.2 MapReduce处理WordCount问题

在第2.2节中, 我们已经简单描述了MapReduce处理问题的大致步骤, 但是具体如何进行任务的切分与重组、数据如何在各处理机之间计算与传递是我们所关心的.

图 5为官方描述的一个MapReduce作业的流程图, 通过分析该流程图, 可以找出影响MapReduce作业完成的主要因素.

Fig. 5 Implementation process of MapReduce 图 5 MapReduce的执行过程

MapReduce作业的执行过程主要分为Map阶段、中间结果排序与传递阶段和Reduce阶段.即, MapReduce作业执行时间受读取输入文件时间Tread、中间数据排序时间Tsort、中间数据传输时间Ttrans和写输出文件到HDFS时间Twrite的影响.通过以上分析, MapReduce的算法执行时间TMR可以表示为

TMR=Tread+Tsort+Ttrans+Twrite.

假设输入数据大小表示为|fin|, 输出数据大小表示为|fout|, 远程文件读写速度为Cr, Tread可以表示为

Tread=|finCr.

同理, Twrite可以表示为

Twrite=|foutCr.

在MapReduce与Spark针对WordCount问题的比较过程中, 其输入/输出数据是相同的, 也就是说, 两者的|fin|与|fout|相同; 且我们比较的是MapReduce与Spark运行机制以及调度策略不同所导致的时间快慢, 所以不考虑网络传输速度以及文件读写速度所带来的误差, 即:默认为在MapReduce与Spark中, Cr的值相同.所以TreadTwrite在两种算法中的值近似相同, 对两者算法执行时间的差异不会产生影响, 我们主要关注TsortTtrans的比较.

在此例中, MapReduce的执行过程为:

(1) 首先将数据分块, 并转换为键值对形式;

(2) 每个Map任务根据key值对数据进行分区, 即, 同一个分区的数据将会被发送到同一个Reduce节点上;

(3) 对每一个分区进行排序, 同时将可以合并的数据进行合并(如两个(A, 1)将会被合并成(A, 2))

(4) 每一个Reduce从各个Map节点通过网络传输复制相应的数据;

(5) 将不同Map节点的数据进行归并.

具体流程如图 6所示.

Fig. 6 MapReduce flow chart for WordCount 图 6 MapReduce处理WordCount问题流程图

4.3 Spark处理WordCount问题

Spark处理WordCount问题的具体流程见图 7, 过程如下.

Fig. 7 Spark flow chart for WordCount 图 7 Spark处理WordCount问题流程图

(1) 第1次遍历:生成RDD及记录依赖关系

●  从外部文件中读取数据生成第1个RDD(ParallelCollectionRDD), 分区数为3;

●  将每一个分区再进行哈希分块, 分块数对应阶段2中的任务(task)数目, 每一个分块单独形成一个文件进行数据传输;

●  将中间数据文件进行shuffle形成ShuffledRDD, 并在ShuffledRDD中记录下其父RDD以及依赖关系;

●  通过对ShuffledRDD进行转换操作生成MapPartitionsRDD, 并在MapPartitionsRDD中记录下其父RDD以及依赖关系;

●  因为在MapPartitionsRDD后为行动操作, 则本作业中所有RDD生成完毕, 第1次遍历结束.

(2) 第2次遍历:划分阶段与任务

●  根据第1次遍历每一个RDD中记录的父RDD信息及依赖关系, 从MapPartitionsRDD开始向前进行第2次遍历;

●  首先进行阶段划分, 若向前遍历时遇到窄依赖关系, 则将依赖的RDD加入到当前阶段; 若遇到宽依赖关系, 则形成一个新的阶段继续向前遍历.在本例中, ParallelCollectionRDD与ShuffledRDD为宽依赖关系, ShuffledRDD与MapPartitionsRDD为窄依赖关系, 所以会生成两个阶段, 阶段1中包括Parallel CollectionRDD, 阶段2中包括ShuffledRDD和MapPartitionsRDD;

●  在每一个阶段内部进行任务划分, 任务数目为阶段内部RDD的分区数, 在阶段1中有3个任务, 阶段2中有2个任务;

(3) 第3次遍历:执行任务

根据第2次遍历划分的阶段与任务进行作业的具体执行, 首先执行阶段1中的任务, 执行完毕后再执行阶段2中的任务, 输出数据.

4.4 WordCount问题分布式处理的比较与总结

在分析完MapReduce与Spark处理WordCount问题的流程后, 我们从算法执行时间、同步次数和文件数目这3个方面进行MapReduce与Spark的比较, 并进行总结.

●  算法执行时间

为了分析算法执行时间, 我们需要讨论两者的中间数据排序时间Tsort与中间数据传输时间Ttrans.在分析两者的工作流程时可以看到:排序操作只有在MapReduce中存在, 在Spark中并没有要求对中间数据进行排序操作.而MapReduce中规定每次shuffle必须对中间结果进行排序, 主要是为了可以将中间结果进行初步的归并操作, 使得需要传输的数据量减少, 降低网络传输压力; 并且可以保证每一个Map任务只输出一个有序的中间数据文件, 减少文件数目.

在MapReduce中, 在Map阶段对每一分区的数据进行排序, 在Reduce阶段对不同Map任务的输出数据进行归并.共有m个Map任务, 平均每个Map任务有M条数据, 平均每个Reduce任务有R条数据.可以得到Tsort-MR:

Tsort-MR=M×logM+R=O(MlogM).

针对此例, 在3个Map任务中, 第3个Map任务进行比较次数最多, 共做了4次比较(快速排序).而在Spark中因为没有中间数据排序过程, 所以,

Tsort-Spark=0.

再来讨论两者中间数据传输时间Ttrans, 中间数据传输是指由Map任务的执行节点发送到Reduce任务的执行节点的数据, 所以Ttrans由Map任务输出的中间数据大小|D|和网络文件传输速度Ct决定, 即:

Ttrans=Ct×|D|.

在不考虑网络传输速度带来性能差异的情况下, 默认在MapReduce与Spark中Ct大小相等, 则Ttrans与|D|成正比.在此例中, MapReduce中间数据共有12条键值对数据, 而Spark则有18条.Spark中的|D|要大于MapReduce, 则相应的Spark的Ttrans也要大于MapReduce.

●  文件数目

在分布式系统中, 中间数据是以文件的形式进行存储的.文件数目过多, 会严重占用内存并影响磁盘的IO性能, 所以在分布式系统中, 应当尽量减少中间数据文件的数量.

对于MapReduce来说, 每一个Map只会产生一个中间数据文件, 不同分区的数据都会存在一个文件中, 之所以可以做到这样, 是因为MapReduce的排序操作使得分区内数据有序, 不同的分区数据只需要通过增加一个偏移量便可以区分.所以在MapReduce中, 文件数目等于Map任务数量m; 而在Spark中, 因为没有对数据进行预排序, 所以只能将不同分区的数据放在不同的文件中, 则每一个Map任务都会生成r个文件, 其中, m为Reduce任务数量.则Spark总文件数目等于m×r.

此例中, MapReduce的文件数量为3, 而spark的文件数量为6.

●  同步次数

同步模型要求所有节点完成当前阶段后才可以进行下一阶段, 这严重限制了计算性能.在MapReduce中, 所有的步骤都严格遵守同步模型, 即, Reduce操作要在所有的Map操作结束后进行;

异步模型则可以使多个节点形成单独的流水线独立运行, 大大加速了算法的执行.Spark考虑到了这一点, 它规定在无需进行网络传输的时候, 有窄依赖关系的节点形成流水线独立运行; 在需要网络传输的时候, 遵守同步模型.但是这种策略所带来的缺点是额外的遍历次数.

在此例中, 因为只存在一次Map与Reduce操作, 只有一次shuffle, 所以MapReduce与Spark在同步次数上相同, 皆为一次.但是Spark因为要建立操作之间的依赖关系, 所以需要付出额外的代价, 即, 比MapReduce要多遍历两次.

通过表 3可以看到:在处理WordCount问题时, MapReduce在排序上消耗了更多的时间, 而在中间数据传输、文件数目以及同步次数上效率都要优于Spark.

Table 3 Performance comparison between MapReduce and Spark with WordCount 表 3 MapReduce与Spark处理WordCount问题之性能比较

5 PageRank问题的分布式处理 5.1 问题描述

迭代问题一直是大数据领域中最为重要的问题之一, 如常见的PageRank算法以及k-Means算法, 通常是将初始数据经过很多次的重复操作逼近所需的目标结果.如何在迭代问题中通过有效的数据存储优化达到加快系统效率的目的, 是大数据问题中不可忽略的一项内容.PageRank是由谷歌的创始人Larry Page发明, 用来衡量网站页面的重要性[46].其基本思想来源于参考文献的引用网络, 被引用越多的文献往往更加重要和权威. PageRank基本思想基于两个前提.

(1) 一个网页在两种情况下可能是重要的网页:被很多网页所引用或者被一个或几个重要网页所引用.每一个网页的重要性被平均传递给其指向的网页, 每个网页的重要性会被具体量化成一个权重(Rank);

(2) 假定每次开始时都会随机地访问页面, 往后会根据网页的链接访问其他网页, 访问下一个网站页面的概率是该网站页面的Rank值.而这也基本符合人们在日常生活中浏览网页的习惯.

基本的PageRank算法需要两个重要的数据结构:Rank(每一个网页的权重, 初始值唯一)和Link(每个页面之间的指向关系).PageRank具体的算法可以描述如下.

(1) 将每一个网页的Rank值设为1;

(2) 对于每一次迭代, 将每一个网页的权重贡献(Rank/指向的网页数量)发送给指向的网页;

(3) 对于每一个网页, 将收到的权重贡献相加得到contribs, 重新计算每一个网页的Rank=(1-p)+ p*contribs(其中, p为跳跃因子, 可以根据具体情况设为[0, 1]中的任意值);

(4) 循环操作第2步与第3步, 直到每个网页的Rank值趋于收敛, 结束.

为了可以有更直观的理解, 图 8举出了一个具体的例子, 并且动态描述了10次迭代过程.在此例中, 跳跃因子p的值取0.8, 所有结果保留3位有效数字.

Fig. 8 Ten iterations of PageRank 图 8 PageRank的10次迭代

通过计算, 此例中网页的权值是收敛的, 经过10次迭代后的权值误差可以缩小到0.1.第10次迭代计算后, 网页P1~P5的权值依次为(0.52, 1.54, 0.78, 1.39, 0.78).

与WordCount问题类似, 我们同样通过算法执行时间、文件数目和同步次数这3个方面来比较MapReduce与Spark在处理PageRank问题时性能的差异.因为PageRank是迭代问题, 为了更好地分析迭代次数的变化给两种算法性能带来的影响, 将分别讨论一次迭代、两次迭代和10次迭代情况下, MapReduce与Spark在执行时间、文件数目和同步次数这3个方面的性能差异.

5.2 MapReduce处理PageRank问题

PageRank问题在MapReduce中的操作步骤要比图 8复杂很多, 如何根据每个网页的Rank和Link数据计算出每个网页的最终Rank并且分布到多台处理机上去执行, 是MapReduce所关心的.在图 9中, 我们展示了PageRank在MapReduce中的一次迭代过程, 在此例中, 我们每一步操作都假设分布到3台处理机上去操作, 其他情况下可以根据具体需要进行变更.具体算法如下[41].

●   Job1:通过一次Map和Reduce操作, 将每个网页的Rank和Link数据进行合并, 因为是第1次迭代, 所以网页的初始Rank值设为1;

●   Job2:根据Job1的合并结果进行一次Map和Reduce操作, 计算出每一个网页接收的权重贡献, 并加入跳跃因子进行计算, 得到每个网页一次迭代后的新Rank值.

图 9可以看到, 在MapReduce一次迭代过程中, 需要注意以下几点.

Fig. 9 MapReduce for PageRank with one iteration 图 9 MapReduce处理PageRank问题一次迭代

●  与WordCount类似, MapReduce中每个Map任务都需要在输出数据前对中间数据进行排序, 使其分区内有序;

●  每一个Map节点与Reduce节点单独地作为一个任务, 不同任务之间独立计算互不干扰, 每一个任务的计算结果都要存入磁盘进行数据传输(图 9中所有阴影部分表示需要存入磁盘的数据);

●  为了保证系统的可靠性和可扩展性, MapReduce要求所有的Reduce需要在所有Map任务结束后才可以进行计算; 并且只有在作业1所有任务完成后, 作业2才能开始运行任务.

MapReduce中, PageRank的两次迭代过程被划分为4个作业进行计算, 共有23个任务、4次shuffle操作.图 10为MapReduce处理PageRank问题两次迭代的流程图.

Fig. 10 MapReduce for PageRank with two iterations 图 10 MapReduce处理PageRank问题两次迭代

5.3 Spark处理PageRank问题

Spark在处理迭代问题时, 相较于MapReduce做了很多改进:首先, 在内存足够的情况下, Spark允许用户将常用的数据缓存到内存中, 加快了系统的运行速度; 其次, Spark对数据之间的依赖关系有了明确的划分, 根据宽依赖与窄依赖关系进行任务的调度, 可以实现管道化操作, 使系统的灵活性得以提高.

图 11图 12分别是Spark处理PageRank问题的一次迭代与两次迭代的流程图.

Fig. 11 Spark for PageRank with one iteration 图 11 Spark处理PageRank问题一次迭代

Fig. 12 Spark for PageRank with two iterations 图 12 Spark处理PageRank问题两次迭代

通过图 11可以看到:在一次迭代过程中, Spark需要5次转换(transformation)操作, 共产生5个RDD, 与MapReduce一样需要进行两次shuffle操作, 被划分为3个阶段.

与MapReduce不同的是, 在图 12的两次迭代中, Spark可以将具有窄依赖关系的RDD分区分配到一个任务中进行管道化操作, 任务内部数据无需通过网络传输且任务之间互不干扰.如在图 12的阶段3中, 所有分区节点根据依赖关系被分为3个任务(分别标注为红色、绿色和蓝色).在PageRank问题的两次迭代计算过程中, 作业被划分为4个阶段, 共有11个任务、3次shuffle操作, Spark相较于MapReduce有效地减少了任务之间的等待时间以及中间数据传输数量.

5.4 PageRank问题分布式处理的比较与总结

与WordCount问题类似, 我们将分别在一次迭代、两次迭代和10次迭代情况下, 通过算法执行时间、文件数目和同步次数对MapReduce与Spark进行比较.

●  算法执行时间

首先讨论中间数据排序时间Tsort, 从图 9中可以看出:MapReduce一次迭代需要两次Map-Reduce操作, 第1次Map-Reduce操作中, 每个Map都需要进行两次比较; 而第2次Map-Reduce操作中, 第1个Map任务进行的比较次数最多, 为两次.则在一次迭代中, MapReduce在中间数据排序上要进行4次比较, 在两次迭代中, MapReduce, 需要进行8次比较, 在10次迭代中, MapReduce需要进行40次比较.而Spark在shuffle时不要求中间数据有序, 所以其比较次数为0.

再讨论两者中间数据传输时间Ttrans, 在WordCount问题中, MapReduce需要传输的中间数据量要小于Spark; 但在迭代问题中, MapReduce的优势将不再明显.其原因是:Spark根据依赖关系采用的任务调度策略, 使得shuffle次数相较于MapReduce有了显著降低.

在MapReduce一次迭代过程中, 需要进行两次数据传输, 数据量分别为:|D1|=10, |D2|=7, Ttrans-MR=Ct×17;在两次迭代中, MapReduce需要进行4次数据传输, |D1|=10, |D2|=7, |D3|=10, |D4|=7, Ttrans-MR=Ct×34;在10次迭代中, Ttrans-MR=Ct×170.Spark一次迭代过程中与MapReduce相同, 需要进行两次传输:|D1|=10, |D2|=7, Ttrans-Spark=Ct×17;在两次迭代中, 需要进行3次数据传输, |D1|=10, |D2|=7, |D3|=7, Ttrans-Spark=Ct×24;而在10次迭代过程中, 共进行了11次数据传输, Ttrans-Spark=Ct×80.

●  文件数目

同WordCount类似, 在需要进行网络传输的时候, MapReduce的文件数量是m, 而Spark的文件数量是m×r, 即, 单次shuffle过程的文件数量MapReduce要少于Spark.但从整个作业角度来看, Spark的shuffle次数要少于MapReduce, 所以宏观上说, 在迭代问题中, MapReduce文件数量上的优势不如WordCount中那么明显.就此例而言, 一次迭代过程中, MapReduce总文件数为2+3=5, Spark总文件数为6+9=15;两次迭代过程中, MapReduce总文件数为2+3+3+3=11, Spark总文件数为6+9+9=24;10次迭代过程中, MapReduce总文件数为5+6×9=59, Spark总文件数为15+9×9=96.

●  同步次数

之前已经描述了同步模型与异步模型之间的区别, 在算法执行过程中, 同步次数越少、所占比例越小, 越有利于算法的局部性能.虽然伴随着算法整体步骤的增加(如Spark中更多的遍历次数), 但是总的来说会使算法性能得到有效的提升.在一次迭代过程中, MapReduce与Spark同步次数皆为2次; 在两次迭代过程中, MapReduce中4次Map-Reduce操作全部遵守同步模型, 同步次数为4次(即, 每次都要等待上一步所有任务都执行完毕才会执行下一步操作), Spark将两次迭代过程划分成4个阶段, 每个阶段内遵守异步模型, 阶段间为同步操作, 所以同步次数为3次; 在10次迭代过程中, MapReduce共有20次同步操作, 而Spark只有11次.

通过表 4可以看到:在一次迭代过程中, MapReduce与Spark在性能上并没有很大的差别; 但是随着迭代次数的增加, 两者的差距逐渐显现出来.

Table 4 Performance comparison between MapReduce and Spark with PageRank 表 4 MapReduce与Spark处理PageRank问题之性能比较

●  在算法执行时间方面, Spark性能要好于MapReduce.与WordCount问题不同, Spark在PageRank问题中的中间数据传输数量要低于MapReduce;

●  在同步次数方面, Spark同步次数同样低于MapReduce.

这些优势主要是因为Spark根据数据间的依赖关系对任务进行了更为合理的划分, 有效地减少了问题中shuffle操作的次数.而因为MapReduce与Spark在shuffle操作中的差别, 导致Spark文件数目要高于MapReduce.而Spark也在不断地进行自我完善:在Spark0.8.1版本中引入了shuffle consolidation[47]机制, 有效地减少了Spark中间数据文件数量; 从Spark1.1开始, Spark开始将MapReduce的Sort-BasedShuffle机制作为一套备选shuffle机制供用户选择, Sort-BasedShuffle机制能够保证每个Map任务节点只输出一个文件, 且数据为分区内有序.

6 总结与展望

本文首先分别阐述了MapReduce和Spark两种大数据分布式架构的背景、原理以及调度等方面, 并从大数据评估指标和应用情景两个方面分别对二者进行对比和总结, 针对不同问题需求给出相应的分析和选择.为了更好地理解MapReduce与Spark在原理上的区别, 我们通过WordCount和PageRank算法分别分析MapReduce和Spark在处理非迭代和迭代问题时在算法执行时间、文件数目和同步次数上的差异.通过分析结果可以看出:在处理不同问题时, MapReduce与Spark各有优点与不足.在处理WordCount问题时, MapReduce在排序上消耗了更多的时间, 而在中间数据传输、文件数目以及同步次数上效率都要优于Spark; 在处理PageRank问题时, Spark通过更合理的任务调度机制, 在算法执行时间和同步次数方面要优于MapReduce.但是因为不同的shuffle机制, 导致Spark的文件数目要高于MapReduce, 而Spark也在后期的版本中对此缺点进行了补充和完善.

MapReduce作为第一代大数据分布式架构, 让传统的大数据问题可以并行地在多台处理机上进行计算.而MapReduce之所以能够迅速成为大数据处理的主流计算平台, 得力于其自动并行、自然伸缩、实现简单和容错性强等特性[48].但是MapReduce并不适合处理迭代问题以及低延时问题, 而Spark作为轻量、基于内存计算的分布式计算平台, 采用了与MapReduce类似的编程模型, 使用RDD抽象对作业调度、数据操作和存取进行修改, 并增加了更丰富的算子, 使得Spark在处理迭代问题、交互问题以及低延时问题时能有更高的效率.同样, Spark也有其不足:如数据规模过大或内存不足时, 会出现性能降低、数据丢失需要进行重复计算等缺点.总而言之, 随着大数据领域的不断发展和完善, 现有的大数据分析技术仍然有大量具有挑战性的问题需要深入研究, 而作为大数据领域重要的两种分布式处理架构, MapReduce与Spark都有着不可替代的地位和作用.

参考文献
[1]
Gordon K. What is big data?. Itnow, 2013, 55(3): 12–13. [doi:10.1093/itnow/bwt037]
[2]
Dean J, Ghemawat S. MapReduce: Simplified data processing on large clusters. In: Proc. of the 6th Conf. on Symp. on Opearting Systems Design and Implementation. San Francisco: USENIX Association Berkeley, 2004. 137-149.http://portal.acm.org/citation.cfm?doid=1327452.1327492
[3]
Rao BT, Sridevi N, Reddy VK, Reddy L. Performance issues of heterogeneous hadoop clusters in cloud computing. Global Journal of Computer Science and Technology, 2011, 6(8): 1–6. https://arxiv.org/pdf/1207.0894
[4]
Zaharia M, Chowdhury M, Franklin MJ, Shenker S, Stoica I. Spark:Cluster computing with working sets. HotCloud, 2010, 10(10-10): 1–7. https://amplab.cs.berkeley.edu/publication/spark-cluster-computing-with-working-sets-paper/
[5]
Dean J. Experiences with MapReduce, an abstraction for large-scale computation. In: Proc. of the 15th Int'l Conf. on Parallel Architectures and Compilation Techniques. Washington: ACM Press, 2006. 1-1.http://dl.acm.org/citation.cfm?id=1152155
[6]
Srirama SN, Jakovits P, Vainikko E. Adapting scientific computing problems to clouds using MapReduce. Future Generation Computer Systems, 2012, 28(1): 184–192. [doi:10.1016/j.future.2011.05.025]
[7]
Elghandour I, Aboulnaga A. ReStore: Reusing results of MapReduce jobs in pig. In: Proc. of the 2012 ACM SIGMOD Int'l Conf. on Management of Data. Scottsdale: ACM Press, 2012. 701-704. [doi:10.1145/2213836.2213937]
[8]
Pansare N, Borkar VR, Jermaine C, Condie T. Online aggregation for large MapReduce jobs. Proc. of the Vldb Endowment, 2012, 4(11): 1135-1145.https://www.researchgate.net/publication/220538853_Online_Aggregation_for_Large_MapReduce_Jobs
[9]
Zaharia M, Borthakur D, Sarma JS, Elmeleegy K, Shenker S, Stoica I. Job scheduling for multi-user MapReduce clusters. Technical Report UCB/EECS-2009-55. Berkeley: EECS Department, University of California, 2009. 1-18.http://www.researchgate.net/publication/228862665_Job_scheduling_for_multi-user_MapReduce_clusters
[10]
Zaharia M. Job scheduling with the fair and capacity schedulers. In: Proc. of the Hadoop Summit. 2009. 9.
[11]
Nightingale EB, Chen PM, Flinn J. Speculative execution in a distributed file system. ACM SIGOPS Operating Systems Review, 2005, 39(5): 191–205. [doi:10.1145/1095809.1095829]
[12]
Isard M, Budiu M, Yu Y, Birrell A, Fetterly D. Dryad: Distributed data-parallel programs from sequential building blocks. In: Proc. of the 2nd ACM SIGOPS/EuroSys European Conf. on Computer Systems 2007. Lisbon: ACM Press, 2007. 59-72. [doi:10.1145/1272998.1273005]
[13]
Venner J. Pro Hadoop. Berkeley: Apress, 2009. 1-440. [doi:10.1007/978-1-4302-1943-9]
[14]
Shvachko K, Kuang H, Radia S, Chansler R. The hadoop distributed file system. In: Proc. of the 2010 IEEE 26th Symp. on Mass Storage Systems and Technologies (MSST). Nevada: IEEE, 2010. 1-10. [doi:10.1109/MSST.2010.5496972]
[15]
Vavilapalli VK, Murthy AC, Douglas C, Agarwal S, Konar M, Evans R, Graves T, Lowe J, Shah H, Seth S. Apache hadoop yarn: Yet another resource negotiator. In: Proc. of the 4th Annual Symp. on Cloud Computing. ACM Press, 2013. 1-16. [doi:10.1145/2523616.2523633]
[16]
Wang J, Shang P, Yin J. DRAW: A New Data-gRouping-AWare Data Placement Scheme for Data Intensive Applications with Interest Locality. New York: Springer, 2014. 149-174. [doi:10.1007/978-1-4939-1905-5]
[17]
Amer A, Long DD, Burns RC. Group-Based management of distributed file caches. In: Proc. of the 200222nd Int'l Conf. on Distributed Computing Systems. Vienna: IEEE, 2002. 525-534. [doi:10.1109/ICDCS.2002.1022302]
[18]
Chen Q, Zhang D, Guo M, Deng Q, Guo S. Samr: A self-adaptive MapReduce scheduling algorithm in heterogeneous environment. In: Proc. of the 2010 IEEE 10th Int'l Conf. on Computer and Information Technology (CIT). Bradford: IEEE, 2010. 2736-2743. [doi:10.1109/CIT.2010.458]
[19]
Kwon Y, Balazinska M, Howe B, Rolia J. Skewtune: Mitigating skew in MapReduce applications. In: Proc. of the 2012 ACM SIGMOD Int'l Conf. on Management of Data. Arizona: ACM Press, 2012. 25-36. [doi:10.1145/2213836.2213840]
[20]
Cherniak A, Zaidi H, Zadorozhny V. Optimization strategies for A/B testing on HADOOP. Proc. of the VLDB Endowment, 2013, 6(11): 973–984. [doi:10.14778/2536222.2536224]
[21]
Ekanayake J, Li H, Zhang B, Gunarathne T, Bae SH, Qiu J, Fox G. Twister: A runtime for iterative MapReduce. In: Proc. of the 19th ACM Int'l Symp. on High Performance Distributed Computing. Chicago: ACM Press, 2010. 810-818. [doi:10.1145/1851476.1851593]
[22]
Bu Y, Howe B, Balazinska M, Ernst MD. HaLoop:Efficient iterative data processing on large clusters. Proc. of the VLDB Endowment, 2010, 3(1-2): 285–296. [doi:10.14778/1920841.1920881]
[23]
Zhang Y, Gao Q, Gao L, Wang C. iMapReduce:A distributed computing framework for iterative computation. Journal of Grid Computing, 2012, 10(1): 47–68. [doi:10.1007/s10723-012-9204-9]
[24]
Zaharia M, Chowdhury M, Das T, Dave A, Ma J, McCauley M, Franklin MJ, Shenker S, Stoica I. Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing. In: Proc. of the 9th USENIX Conf. on Networked Systems Design and Implementation. San Jose: USENIX Association Berkeley, 2012. 1-14.http://dl.acm.org/citation.cfm?id=2228301
[25]
Yu Y, Isard M, Fetterly D, Budiu M, Erlingsson Ú, Gunda PK, Currey J. DryadLINQ: A system for general-purpose distributed data-parallel computing using a high-level language. In: Proc. of the 8th USENIX Conf. on Operating Systems Design and Implementation. USENIX Association Berkeley, 2008. 1-14.http://dl.acm.org/citation.cfm?id=1855742
[26]
Lhoták O, Hendren L. Scaling Java points-to analysis using Spark. In: Proc. of the 12th Int'l Conf. on Compiler Construction. Berlin: Springer-Verlag, 2003. 153-169.http://www.springerlink.com/content/3g87gdq467dyjxrx
[27]
Ananthanarayanan G, Ghodsi A, Shenker S, Stoica I. Disk-Locality in datacenter computing considered irrelevant. In: Proc. of the 13th USENIX Conf. on Hot topics in Operating Systems. California USENIX Association Berkeley, 2011. 12-17.http://dl.acm.org/citation.cfm?id=1991613
[28]
Bhatotia P, Wieder A, Rodrigues R, Acar UA, Pasquin R. Incoop: MapReduce for incremental computations. In: Proc. of ACM Symp. on Cloud Computing. Cascais: ACM Press, 2011. 1-14. [doi:10.1145/2038916.2038923]
[29]
Armbrust M, Xin RS, Lian C, Huai Y, Liu D, Bradley JK, Meng X, Kaftan T, Franklin MJ, Ghodsi A. Spark sql: Relational data processing in Spark. In: Proc. of the 2015 ACM SIGMOD Int'l Conf. on Management of Data. Victoria: ACM Press, 2015. 1383-1394. [doi:10.1145/2723372.2742797]
[30]
Zaharia M, Das T, Li H, Hunter T, Shenker S, Stoica I. Discretized streams: Fault-tolerant streaming computation at scale. In: Proc. of the 24th ACM Symp. on Operating Systems Principles. Farmington: ACM Press, 2013. 423-438. [doi:10.1145/2517349.2522737]
[31]
Meng X, Bradley J, Yavuz B, Sparks E, Venkataraman S, Liu D, Freeman J, Tsai D, Amde M, Owen S, Xin D, Reynold X, Franklin MJ, Zadeh R, Zaharia M, Talwalkar A. Mllib:Machine learning in apache Spark. The Journal of Machine Learning Research, 2016, 17(1): 1235–1241. https://www.researchgate.net/publication/277334549_MLlib_Machine_Learning_in_Apache_Spark
[32]
Xin RS, Gonzalez JE, Franklin MJ, Stoica I. Graphx: A resilient distributed graph system on Spark. In: Proc. of the 1st Int'l Workshop on Graph Data Management Experiences and Systems. New York: ACM Press, 2013. 1-6. [doi:10.1145/2484425.2484427]
[33]
Gonzalez JE, Xin RS, Dave A, Crankshaw D, Franklin MJ, Stoica I. GraphX: Graph processing in a distributed dataflow framework. In: Proc. of the 11th USENIX Conf. on Operating Systems Design and Implementation. USENIX Association Berkeley, 2014. 599-613.http://dl.acm.org/citation.cfm?id=2685096
[34]
Malewicz G, Austern MH, Bik AJ, Dehnert JC, Horn I, Leiser N, Czajkowski G. Pregel: A system for large-scale graph processing. In: Proc. of the 2010 ACM SIGMOD Int'l Conf. on Management of Data. ACM Press, 2010. 135-146. [doi:10.1145/1807167.1807184]
[35]
Low Y, Bickson D, Gonzalez J, Guestrin C, Kyrola A, Hellerstein JM. Distributed GraphLab:A framework for machine learning and data mining in the cloud. Proc. of the VLDB Endowment, 2012, 5(8): 716–727. [doi:10.14778/2212351.2212354]
[36]
Thomas K, Grier C, Ma J, Paxson V, Song D. Design and evaluation of a real-time URL spam filtering service. In: Proc. of the 2011 IEEE Symp. on Security and Privacy (SP). Berkeley: IEEE, 2011. 447-462. [doi:10.1109/SP.2011.25]
[37]
Zhang Z, Barbary K, Nothaft FA, Sparks E, Zahn O, Franklin MJ, Patterson DA, Perlmutter S. Scientific computing meets big data technology: An astronomy use case. In: Proc. of the 2015 IEEE Int'l Conf. on Big Data. Santa Clara: IEEE, 2015. 918-927. [doi:10.1109/BigData.2015.7363840]
[38]
Nothaft FA, Massie M, Danford T, Zhang Z, Laserson U, Yeksigian C, Kottalam J, Ahuja A, Hammerbacher J, Linderman M. Rethinking data-intensive science using scalable analytics systems. In: Proc. of the 2015 ACM SIGMOD Int'l Conf. on Management of Data. Victoria: ACM Press, 2015. 631-646. [doi:10.1145/2723372.2742787]
[39]
Veiga J, Expósito RR, Pardo XC, Taboada GL, Tourifio J. Performance evaluation of big data frameworks for large-scale data analytics. In: Proc. of the 2016 IEEE Int'l Conf. on Big Data. Washington: IEEE, 2016. 424-431. [doi:10.1109/BigData.2016.7840633]
[40]
Lee H, Kang M, Youn SB, Lee JG, Kwon Y. An experimental comparison of iterative MapReduce frameworks. In: Proc. of the 25th ACM Int'l on Conf. on Information and Knowledge Management. Indiana: ACM Press, 2016. 2089-2094. [doi:10.1145/2983323.2983647]
[41]
Gu L, Li H. Memory or time: Performance evaluation for iterative operation on hadoop and Spark. In: Proc. of the 2013 IEEE 10th Int'l Conf. on High Performance Computing and Communications & 2013 IEEE Int'l Conf. on Embedded and Ubiquitous Computing (HPCC_EUC). Zhangjiajie: IEEE, 2013. 721-727. [doi:10.1109/HPCC.and.EUC.2013.106]
[42]
Kang M, Lee JG. An experimental analysis of limitations of MapReduce for iterative algorithms on Spark. Cluster Computing, 2017, 20(4): 3593–3604. [doi:10.1007/s10586-017-1167-y]
[43]
Mavridis I, Karatza H. Performance evaluation of cloud-based log file analysis with apache hadoop and apache Spark. Journal of Systems and Software, 2017, 125: 133–151. [doi:10.1016/j.jss.2016.11.037]
[44]
Song J, Sun ZZ, Mao KM, Bao YB, Yu G. Research advance on MapReduce based big data processing platforms and algorithms. Ruan Jian Xue Bao/Journal of Software, 2017, 28(3): 514–543(in Chinese with English abstract). http://www.jos.org.cn/1000-9825/5169.htm [doi:10.13328/j.cnki.jos.005169]
[45]
Shi J, Qiu Y, Minhas UF, Jiao L, Wang C, Reinwald B, Özcan F. Clash of the titans:Mapreduce vs. Spark for large scale data analytics. Proc. of the VLDB Endowment, 2015, 8(13): 2110–2121. [doi:10.14778/2831360.2831365]
[46]
Page L. The PageRank citation ranking:Bringing order to the Web. Stanford Digital Libraries Working Paper, 1998, 9(1): 1–17. http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.31.1768
[47]
Davidson A, Or A. Optimizing shuffle performance in Spark. Technical Report, University of California, Berkeley-Department of Electrical Engineering and Computer Sciences, 2013.
[48]
Wolf J, Balmin A, Rajan D, Hildrum K, Khandekar R, Parekh S, Wu KL, Vernica R. On the optimization of schedules for MapReduce workloads in the presence of shared scans. The Int'l Journal on Very Large Data Bases, 2012, 21(5): 589–609. [doi:10.1007/s00778-012-0279-5]
[44]
宋杰, 孙宗哲, 毛克明, 鲍玉斌, 于戈. MapReduce大数据处理平台与算法研究进展. 软件学报, 2017, 28(3): 514–543. http://www.jos.org.cn/1000-9825/5169.htm [doi:10.13328/j.cnki.jos.005169]