数据的指数级增长给数据管理和分析带来了严峻的挑战.连接查询是数据分析中一种常用运算,而MapReduce是一种用于大规模数据集并行处理的编程模型,研究基于MapReduce的连接查询代价评估和查询优化,有着学术意义和应用价值.MapReduce连接查询算法的性能主要取决于I/O代价(包括本地和网络I/O),而I/O代价与数据集以及连接运算的特征参数相关,通过对二元连接的I/O代价评估可以优化多元连接执行计划.基于此,首先提出了二元连接查询的I/O代价模型;随后,对现有二元连接算法进行形式化定义和简单扩展,归纳出6种基于MapReduce连接查询算法,并通过算法白盒分析定义它们的I/O代价函数;最后,提出一种多元连接最优执行计划的选择算法.通过实验表明I/O代价模型的正确性且能够准确地反映算法的性能优劣.
The exponential growth of data has posed serious challenges to the data management and analysis. Join query is a common data analysis operation, and MapReduce is a programming model implemented for parallel processing on large-scale datasets. Therefore the research on MapReduce based join algorithms and its cost model has a certain academic significance and application value. This study believes that the I/O (including the network and the local I/O) cost is the main factor affecting the performance of MapReduce based join algorithm. Furthermore, as the I/O cost is determined by the feature of both datasets and join operation, the executed plan of multi-ways join could be optimized by evaluating the I/O cost of two-ways join. In the study, an I/O cost model of two-ways join is proposed and then formally defined as a simple extension to the existing MapReduce based join algorithms, resulting in six join algorithms and their I/O cost functions through write-box analysis. In addition, an selection algorithm to find the best executed plan of multi-ways join is presented. The correctness and accuracy of the I/O cost model are validated through a series of experiments. The experiment results suggest that the I/O cost can accurately reflect the algorithm performance.
近年来,随着计算机技术在互联网、传感器以及科学数据分析等领域的应用,数据呈指数级地增长,海量数据给传统的数据管理和分析带来了新的挑战.海量数据具有数据量大(TB-PB级别)、半结构化或非结构化、异构、实时等特点.海量数据的分析和处理包含了数据高可用性、复杂和时间苛刻的运算等诸多新内涵.《自然》杂志在2008年9月专门出版了一期Big Data(大数据)的专刊[
与传统数据分析类似,海量数据分析中最常见的操作为连接查询.现有海量数据基于MapReduce的连接研究均从算法角度考虑二元连接(two-ways join)运算的优化,因此提出了很多连接的实现算法,如Map端连接、Reduce端连接、半连接等[6, 7].本文从另外一个角度优化连接.我们注意到以下事实:① 尽管现有很多MapReduce连接算法,但每一种算法都有其适用范围,某算法在特定环境下性能会最优;② 实际应用环境很少出现简单的二元连接,而更多的是复杂的多元连接(multi-mays join),最常见的为分析型数据仓库中维数据集和事实数据集之间的星型连接,以及社会网络中多个数据集的链式连接,
本文研究连接运算的I/O代价,我们假设多元连接均转化成二元连接执行,基于此,我们提出以下问题:
(1) 有多少种本质不同的MapReduce算法可以实现二元连接查询,一个多元连接有多少种可行的执行计划;
(2) 对于多元连接,是否可以根据上下文计算每一种执行计划的I/O代价,以此选择I/O代价最小的执行计划;
(3) 连接查询的I/O代价与哪些因素有关,如何定量分析这种关系,是否可以对现有算法的I/O代价进行优化.
MapReduce作业的执行是一个复杂的过程,涉及到多节点之间的协作和复杂的数据交互,I/O代价又取决于很多因素,如数据量、数据连接率、属性选择率、并行度、Map任务个数、Reduce任务个数、网络带宽等,因此,对连接查询的I/O代价进行定量分析存在一定挑战.就我们目前所知,尚未发现基于MapReduce的连接算法I/O代价评估的研究报道,上述问题亟待解决.
本文首先在MapReduce连接现有的两种主流算法Map-Join和Reduce-Join的基础上,定义MapReduce连接查询和其关键算法步骤;接着,定义基于MapReduce的连接查询的I/O代价模型;随后,对现有Map-Join和Reduce-Join加以整理和扩展,给出6种二元连接算法的形式化表达以及I/O代价与各影响因素的函数表达,称为I/O代价函数;随后,将I/O代价模型和函数扩展到星型连接、链式连接等多元连接上,提出一种最优执行计划的选择算法;最后,通过实验证明I/O代价函数对I/O代价估算的准确性、连接算法的I/O代价规律与理论模型的一致性,并证明了I/O代价是影响连接算法性能的主要因素.本文工作将对海量数据连接算法的评价和优化,以及基于MapReduce的数据管理系统和分析系统的研发起指导作用.
本文第1节介绍研究背景和相关工作.第2节描述连接查询的定义和已知MapReduce实现算法.第3节给出MapReduce连接查询算法主要步骤的I/O代价模型.第4节阐述6种二元连接算法和其I/O代价函数.第5节基于二元连接的I/O代价函数提出多元连接执行计划的优化选择算法.第6节验证I/O代价模型的正确性,并分析算法I/O代价与性能之间的关系.第7节总结全文并提出下一步工作.
MapReduce[
第1类是对现有MapReduce连接算法的总结和评价.
文献[
第2类是对现有MapReduce连接算法的改进.
文献[
第3类是对特定连接算法的研究及应用.
文献[
与本文最为相似的是文献[
综上,本文提出的连接查询算法和I/O代价模型借鉴了现有基于MapReduce的连接运算研究工作,但与之有明显区别.本从MapReduce算法角度讨论连接查询的多种实现;提出的代价模型基于I/O代价而非执行时间,综合考虑各种影响因素定量分析I/O代价,指导调度程序选择I/O代价小的执行算法.
查询是在数据集中检索满足特定条件的数据并生成结果集的过程,连接运算则把两个或多个数据集中的记录按条件组合为一个结果数据集,我们将包含连接运算的查询统称为连接查询.连接查询在数据分析中非常常见,TPC-H提供的22个查询用例中有16个涉及到此类查询.连接运算有多种方式,内连接的连接查询结果集中仅包含满足条件的行,是大部分数据库系统默认的连接方式.根据所使用的比较方式不同,内连接又分为等值连接、自然连接和不等连接等3种;交叉连接的连接查询结果集包含两个数据集所有行的组合,又称笛卡尔连接;外连接的连接查询结果集既包含那些满足条件的行,还包含其中某个数据集的全部行,有3种形式的外连接:左外连接、右外连接、全外连接.在上述连接种类中,最为常用的是等值连接和自然连接,也是本文研究的重点.
当一个连接操作涉及多个数据集时,我们称该连接为多元连接(multi-ways join).连接运算可采用如
Examples of two-ways join, star join and chain join
二元连接、星型连接和链式连接示意图
一般的,
$\prod\nolimits_P {({\sigma _{x.a = y.a \wedge {C_x} \wedge {C_y}}}(x \times y))} $ (1)
基于此,我们讨论MapReduce连接查询的实现算法.Map-Join和Reduce-Join是连接操作基于MapReduce的两种实现:前者首先需要将从表分发到每个Map节点或各个节点的分布式缓存中,然后在Map任务中完成连接操作;后者的Map任务负责同时读取主表和从表,并为每条数据打上标签以区别数据来源,Reduce任务进行连接操作.Reduce-Join是最常用的Join方式,对于给定的两数据集
分发任务
${D_1}:mem(y) \to {M_2}:\prod\nolimits_{y.b,x.c} {({\sigma _{x.a = y.a \wedge {C_x} \wedge {C_y}}}(x' \times y))} $ (2)
公式(2)中,
${M_1}:\left\{ {\begin{array}{*{20}{l}} {{\Theta _{1x}} = \prod\nolimits_{x.a,x.c} {({\sigma _{{C_x}}}(x))} }\\ {{\Theta _{1y}} = \prod\nolimits_{y.a,y.b} {({\sigma _{Cy}}(y))} } \end{array}} \right. \to {R_2}:{\sigma _{x.a = y.a}}({\Theta _{1x}} \times {\Theta _{1y}})$ (3)
基于定义1,第3节将研究每个阶段的I/O代价特征.
衡量算法的效率通常采用时间复杂度和空间复杂度两种指标,即算法执行需要的运算和存储.本文讨论的连接查询是数据密集型计算的一种,该类型计算有以下特点:当数据量超过一个限度时,存储系统难以满足海量数据处理的读写需要,数据传输带宽成为计算的一个瓶颈[
设|
${\alpha _x} = \frac{{|{\sigma _{x.a{\rm{ in }}y.a}}(x)|}}{{|x|}},{\alpha _y} = \frac{{|{\sigma _{y.a{\rm{ in }}x.a}}(y)|}}{{|y|}}$ (4)
${\beta _s} = \frac{{|{\sigma _{{C_s}}}(s)|}}{{|s|}}$ (5)
若近似认为每个属性大小相等,则
${\gamma _s} = \frac{{Attr(P)}}{{Attr(s)}}$ (6)
为进一步描述I/O代价模型,
Description of notations
相关符号描述
符号 | 含义描述 |
| Map节点数 |
| 主表(较大的数据集) |
| 从表(较小的数据集) |
| 连接查询后的记录条数 |
| |
| 连接查询后数据量(KB) |
| 连接查询后的单条记录大小(KB) |
| |
| 第 |
| 单个连接属性大小(KB) |
| 参加连接的不同连接属性的个数 |
| |
| |
| |
| |
| Reduce过程中sort阶段磁盘数据的百分比 |
$C_i^L$ | 第 |
$C_i^N$ | 第 |
| 连接查询的磁盘I/O总量 |
| 连接查询的网络I/O总量 |
我们把连接查询作为一个整体计算结果数据集大小,连接查询结果的数据量
第1步:对
第2步:对于数据集
第3步:对于任意数据集
${\varepsilon _s} = \left\{ {\begin{array}{*{20}{l}} {{\omega _s} \cdot \min ({\alpha _s},{\beta _s}),{\rm{ }}{\alpha _s} + {\beta _s} \le 1}\\ {{\omega _s}[\min ({\alpha _s},{\beta _s}) - ({\alpha _s} + {\beta _s} - 1)] + ({\alpha _s} + {\beta _s} - 1),{\rm{ }}{\alpha _s} + {\beta _s} > 1} \end{array}} \right.$ (7)
当
Sketch for explain Eq.(7)
公式(7)的示意图
通过公式(7)可以计算获得
${\theta '_x} = {\theta _x} \cdot {\varepsilon _x},{\theta '_v} = {\theta _v} \cdot {\varepsilon _v}$.
第4步:设
• 当
• 当
同理获得$p_y^t$.那么,可按公式(8)计算连接查询后的数据集记录条数
$\theta = \sum\limits_{t \in X \cup Y} {(p_x^t \cdot {{\theta '}_x}) \cdot (p_y^t \cdot {{\theta '}_y})} = ({\theta _x} \cdot {\varepsilon _x} \cdot {\theta _y} \cdot {\varepsilon _y})\sum\limits_{t \in X \cup Y} {p_x^t \cdot p_y^t} $ (8)
第5步:计算
$\phi = \left\{ {\begin{array}{*{20}{l}} {{\gamma _x}\frac{{{\phi _x}}}{{{\theta _x}}} + {\gamma _y}\frac{{{\phi _y}}}{{{\theta _y}}} - \tau ,{\rm{ }}a \in P}\\ {{\gamma _x}\frac{{{\phi _x}}}{{{\theta _x}}} + {\gamma _y}\frac{{{\phi _y}}}{{{\theta _y}}} - 2\tau ,{\rm{ }}a \notin P} \end{array}} \right.$ (9)
第6步:参照公式(8)和公式(10),计算连接查询后的数据量
$\Phi = \theta \cdot \phi = \left( {({\theta _x} \cdot {\varepsilon _x} \cdot {\theta _y} \cdot {\varepsilon _y})\sum\limits_{t \in X \cup Y} {p_x^t \cdot p_y^t} } \right) \cdot \left( {{\gamma _x}\frac{{{\phi _x}}}{{{\theta _x}}} + {\gamma _y}\frac{{{\phi _y}}}{{{\theta _y}}} - {2^n} \cdot \tau } \right),n = \left\{ {\begin{array}{*{20}{l}} {0,{\rm{ }}a \in P}\\ {1,{\rm{ }}a \notin P} \end{array}} \right.$ (10)
按上述步骤,可以估算连接查询后数据集的大小.举例说明:如果经查询条件过滤后的数据集上,连接属性的每个值出现的频率服从常见的Zipf分布[
$\theta = ({\theta _x} \cdot {\varepsilon _x} \cdot {\theta _y} \cdot {\varepsilon _y})\sum\limits_{i = 1}^k {p_x^i \cdot p_y^j} = \frac{{{\theta _x} \cdot {\varepsilon _x} \cdot {\theta _y} \cdot {\varepsilon _y}}}{{{{(\gamma + \ln k)}^2}}}\sum\limits_{i = 1}^k {\frac{1}{{i \cdot j}}} $ (11)
其中,
$\theta = \frac{{{\theta _x} \cdot {\varepsilon _x} \cdot {\theta _y} \cdot {\varepsilon _y}}}{{{{(\gamma + \ln k)}^2}}}\sum\limits_{i = 1}^k {\frac{1}{{{i^2}}}} = \frac{{{\theta _x} \cdot {\varepsilon _x} \cdot {\theta _y} \cdot {\varepsilon _y} \cdot {\pi ^2}}}{{6{{(\gamma + \ln k)}^2}}}$ (12)
如果数据集的连接属性服从的是其他分布,也可用类似的方法来求解
本节首先给出连接查询算法各个阶段的I/O代价模型,第4节则根据具体算法对每个阶段的I/O代价进行汇总.I/O代价的计算基于MapReduce框架的数据读写情况,如
Data reading and wirting in MapReduce process
MapReduce过程数据读写
对于分发阶段的数据分发任务
$C_D^L = {I_D},C_D^N = ({N_m} - 1) \cdot {I_D}$ (13)
对于Map任务
$C_M^L = {I_M} + 3{O_M},C_M^N = 0$ (14)
若Map任务后没有Reduce任务,则只需本地读取数据一次,对其进行相应的操作后,将生成的结果数据集
写入分布式文件系统,由此计算出该阶段的本地I/O量(CM'L)和网络I/O量(CM'N),见公式(15):
CM'L=IM+OM, CM'N=0 (15)
对于Reduce任务
$C_R^L = (1 + \lambda ) \cdot {I_R} + {O_R},C_R^N = {I_R}$ (16)
连接查询算法的I/O分本地I/O和网络I/O两种,二者代价不同.因此,本文为它们分别设置了权重,将二者的加权和作为算法整体的I/O代价(见公式(17)),并以此作为算法选择的依据.因此,假设MapReduce模型有
$Cost = {\omega _1} \cdot {C^L} + {\omega _2} \cdot {C^N} = \sum\limits_{i = 1}^n {({\omega _1} \cdot C_i^L + {\omega _2} \cdot C_i^N)} $ (17)
公式(17)中,
根据第2节描述的二元连接的Map-Join,Reduce-Join和Semi-Join算法,本节设计了6种基于MapReduce的连接查询算法:M,M2,M3R,RM,RM2和R2M2,并逐一分析它们的I/O代价.算法命名时,采用M表示Map任务,R表示Reduce任务,指数表示Map和Reduce任务的次数,M在前的为Map-Join,R在前的为Reduce-Join.按第2节公式(1)设计的查询用例如下述SQL所示,数据集
本节余下部分将具体介绍上述6种算法,参照定义1,按阶段描述算法内容,并根据第3节描述,推导它们的I/O代价函数.连接查询的实现算法与用例相关,对于有些算法,如M3R,若投影或选择条件满足一定条件,算法会随之简化.此外,通过Map-Reduce-Merge[
M算法即为Map-Join算法,需要一次分发任务、一次Map任务.M算法要求连接操作中较小的数据集
${D_1}:mem(y) \to {M_2}:\prod\nolimits_{y.b,x.c} {({\sigma _{x.a = x.a \wedge {C_x} \wedge {C_y}}}(x' \times y))} $ (18)
在
$\left\{ {\begin{array}{*{20}{l}} {C_1^L = {\phi _y}}\\ {C_1^N = ({N_m} - 1) \cdot {\phi _y}} \end{array}} \right.,\left\{ {\begin{array}{*{20}{l}} {C_2^L = {\phi _x} + \Phi }\\ {C_2^N = 0} \end{array}} \right.$ (19)
汇总以上各个阶段的I/O代价,可以得到整个过程的I/O代价函数,见公式(20):
$\left\{ {\begin{array}{*{20}{l}} {{C^L} = \sum\limits_{i = 1}^2 {C_i^L} = {\phi _x} + {\phi _y} + \Phi }\\ {{C^N} = \sum\limits_{i = 1}^2 {C_i^N} = ({N_m} - 1) \cdot {\phi _y}} \end{array}} \right.$ (20)
M3R算法是Semi-Join和Map-Join的结合,需要2次分发任务、3次Map任务和1次Reduce任务.M算法最明显的局限是
$\begin{array}{l} {M_1}:\left\{ {\begin{array}{*{20}{c}} {{\Theta _{1x}} = \prod\nolimits_{x.a} {(x)} }\\ {{\Theta _{1y}} = \prod\nolimits_{y.a} {(y)} } \end{array}} \right. \to {R_2}:\prod\nolimits_{y.a} {{\sigma _{x.a = y.a}}({\Theta _{1x}} \times {\Theta _{1y}})} \to {D_3}:mem({\Theta _2}) \to \\ {M_4}:\prod\nolimits_{y.a,y.b} {({\sigma _{y.a{\rm{ in }}{\Theta _2} \wedge {C_y}}}(y))} \to {D_5}:mem({\Theta _4}) \to {M_6}:\prod\nolimits_{y.b,x.c} {({\sigma _{x.a = y.a \wedge {C_x}}}({{\Theta '}_x} \times {\Theta _4}))} \end{array}$ (21)
$\begin{array}{l} \left\{ {\begin{array}{*{20}{l}} {C_1^L = {\phi _x} + {\phi _y} + 3({\theta _x} + {\theta _y}) \cdot \tau }\\ {C_1^N = 0} \end{array}} \right.,\left\{ {\begin{array}{*{20}{l}} {C_2^L = (1 + \lambda ) \cdot ({\theta _x} + {\theta _x}) \cdot \tau + k \cdot \tau }\\ {C_2^N = ({\theta _x} + {\theta _y}) \cdot \tau } \end{array}} \right.,\left\{ {\begin{array}{*{20}{l}} {C_3^L = k \cdot \tau }\\ {C_3^N = ({N_m} - 1) \cdot k \cdot \tau } \end{array}} \right.,\\ \left\{ {\begin{array}{*{20}{l}} {C_4^L = {\phi _y} + {\phi _y} \cdot {\varepsilon _y} \cdot {\gamma _y}}\\ {C_4^N = 0} \end{array}} \right.,{\rm{ }}\left\{ {\begin{array}{*{20}{l}} {C_5^L = {\phi _y} \cdot {\varepsilon _y} \cdot {\gamma _y}}\\ {C_5^N = ({{N'}_M} - 1) \cdot {\phi _y} \cdot {\varepsilon _y} \cdot {\gamma _y}} \end{array}} \right.,{\rm{ }}\left\{ {\begin{array}{*{20}{l}} {C_6^L = {\phi _x} + \Phi }\\ {C_6^N = 0} \end{array}} \right. \end{array}$ (22)
汇总以上各个阶段的I/O代价,可以得到整个过程的I/O代价函数,见公式(23):
$\left\{ {\begin{array}{*{20}{l}} {{C^L} = \sum\limits_{i = 1}^6 {C_i^L} = 2({\phi _x} + {\phi _y}) + (4 + \lambda ) \cdot ({\theta _x} + {\theta _y}) \cdot \tau + 2k \cdot \tau + 2{\phi _y} \cdot {\varepsilon _y} \cdot {\gamma _y} + \Phi }\\ {{C^N} = \sum\limits_{i = 1}^6 {C_i^N} = ({\theta _x} + {\theta _y}) \cdot \ell + ({N_m} - 1) \cdot k \cdot \tau + ({{N'}_M} - 1) \cdot {\phi _y} \cdot {\varepsilon _y} \cdot {\gamma _y}} \end{array}} \right.$ (23)
M2算法是本文对Map-Join算法做的简单改进,需要2次Map任务和1次分发任务.M2算法采用查询的方式过滤
${M_1}:\prod\nolimits_{y.a,y.b} {({\sigma _{{C_y}}}(y))} \to {D_2}:mem({\Theta _1}) \to {M_3}:\prod\nolimits_{y.b,x.c} {({\sigma _{x.a = y.a \wedge {C_x}}}(x' \times {\Theta _2}))} $ (24)
$\left\{ {\begin{array}{*{20}{l}} {C_1^L = {\phi _y} + {\phi _y} \cdot {\beta _y} \cdot {\gamma _y}}\\ {C_1^N = 0} \end{array}} \right.,\left\{ {\begin{array}{*{20}{l}} {C_2^L = {\phi _y} \cdot {\beta _y} \cdot {\gamma _y}}\\ {C_2^N = ({N_m} - 1) \cdot {\phi _y} \cdot {\beta _y} \cdot {\gamma _y}} \end{array}} \right.,\left\{ {\begin{array}{*{20}{l}} {C_3^L = {\phi _x} + \Phi }\\ {C_3^N = 0} \end{array}} \right.$ (25)
汇总以上各个阶段的I/O代价,可以得到整个过程的I/O代价函数,见公式(26):
$\left\{ {\begin{array}{*{20}{l}} {{C^L} = \sum\limits_{i = 1}^3 {C_i^L} = {\phi _x} + {\phi _y} + 2{\phi _y} \cdot {\beta _y} \cdot {\gamma _y} + \Phi }\\ {{C^N} = \sum\limits_{i = 1}^3 {C_i^N} = ({N_m} - 1) \cdot {\phi _y} \cdot {\beta _y} \cdot {\gamma _y}} \end{array}} \right.$ (26)
RM算法即为Reduce-Join算法[
${M_1}:\left\{ {\begin{array}{*{20}{l}} {{\Theta _{1x}} = \prod\nolimits_{x.a,x.c} {({\sigma _{{C_x}}}(x))} }\\ {{\Theta _{1y}} = \prod\nolimits_{y.a,y.b} {({\sigma _{{C_y}}}(y))} } \end{array}} \right. \to {R_2}:{\sigma _{x.a = y.a}}({\Theta _{1x}} \times {\Theta _{1y}})$ (27)
$\left\{ {\begin{array}{*{20}{l}} {C_1^L = {\phi _x} + {\phi _y} + 3{\rm{(}}{\phi _x} \cdot {\beta _x} \cdot {\gamma _x} + {\phi _y} \cdot {\beta _y} \cdot {\gamma _y}{\rm{)}}}\\ {C_1^N = 0} \end{array}} \right.,\left\{ {\begin{array}{*{20}{l}} {C_2^L = {\rm{(1}} + \lambda {\rm{)}} \cdot {\rm{(}}{\phi _x} \cdot {\beta _x} \cdot {\gamma _x} + {\phi _y} \cdot {\beta _y} \cdot {\gamma _y}{\rm{)}} + \Phi }\\ {C_2^N = {\phi _x} \cdot {\beta _x} \cdot {\gamma _x} + {\phi _y} \cdot {\beta _y} \cdot {\gamma _y}} \end{array}} \right.$ (28)
汇总以上各个阶段的I/O代价,可以得到整个过程的I/O代价函数,见公式(29):
$\left\{ {\begin{array}{*{20}{l}} {{C^L} = \sum\limits_{i = 1}^2 {C_i^L} = {\phi _x} + {\phi _y} + (4 + \lambda ) \cdot ({\phi _x} \cdot {\beta _x} \cdot {\gamma _x} + {\phi _y} \cdot {\beta _y} \cdot {\gamma _y}) + \Phi }\\ {{C^N} = \sum\limits_{i = 1}^2 {C_i^N} = {\phi _x} \cdot {\beta _x} \cdot {\gamma _x} + {\phi _y} \cdot {\beta _y} \cdot {\gamma _y}} \end{array}} \right.$ (29)
R2M2算法是Semi-Join和Reduce-Join的结合,需要1次分发任务、2次Map任务和2次Reduce任务,类似M3R算法.R2M2算法的形式化描述为
$\begin{array}{l} {M_1}:\left\{ {\begin{array}{*{20}{l}} {{\Theta _{1x}} = \prod\nolimits_{x.a} {(x)} }\\ {{\Theta _{1y}} = \prod\nolimits_{y.a} {(y)} } \end{array}} \right. \to {R_2}:\prod\nolimits_{y.a} {({\sigma _{x.a = y.a}}({\Theta _{1x}} \times {\Theta _{1y}}))} \to {D_3}:mem({\Theta _2}) \to \\ {M_4}:\left\{ {\begin{array}{*{20}{l}} {{\Theta _{4x}} = \prod\nolimits_{x.a,x.c} {({\sigma _{x.a{\rm{ in }}{\Theta _2} \wedge {C_x}}}(x))} }\\ {{\Theta _{4y}} = \prod\nolimits_{y.a,y.b} {({\sigma _{y.a{\rm{ in }}{\Theta _2} \wedge {C_y}}}(y))} } \end{array}} \right. \to {R_5}:{\sigma _{x.a = y.a}}({\Theta _{4x}} \times {\Theta _{4y}}) \end{array}$ (30)
$\begin{array}{l} \left\{ {\begin{array}{*{20}{l}} {C_1^L = {\phi _x} + {\phi _y} + 3({\theta _x} + {\theta _y}) \cdot \tau }\\ {C_1^N = 0} \end{array}} \right.,\left\{ {\begin{array}{*{20}{l}} {C_2^L = (1 + \lambda ) \cdot ({\theta _x} + {\theta _y}) \cdot \tau + k \cdot \tau }\\ {C_2^N = ({\theta _x} + {\theta _y}) \cdot \tau } \end{array}} \right.,\left\{ {\begin{array}{*{20}{l}} {C_3^L = k \cdot \tau }\\ {C_3^N = ({N_m} - 1) \cdot k \cdot \tau } \end{array}} \right.,\\ \left\{ {\begin{array}{*{20}{l}} {C_4^L = {\phi _x} + {\phi _y} + 3({\phi _x} \cdot {\varepsilon _x} \cdot {\gamma _x} + {\phi _y} \cdot {\varepsilon _y} \cdot {\gamma _y})}\\ {C_4^N = 0} \end{array}} \right.,\left\{ {\begin{array}{*{20}{l}} {C_5^L = (1 + \lambda ) \cdot ({\phi _x} \cdot {\varepsilon _x} \cdot {\gamma _x} + {\phi _y} \cdot {\varepsilon _y} \cdot {\gamma _y}) + \Phi }\\ {C_5^N = {\phi _x} \cdot {\varepsilon _x} \cdot {\gamma _y} + {\phi _y} \cdot {\varepsilon _y} \cdot {\gamma _y}} \end{array}} \right. \end{array}$ (31)
汇总以上各个阶段的I/O代价,可以得到整个过程的I/O代价函数,见公式(32):
$\left\{ {\begin{array}{*{20}{l}} {{C^L} = \sum\limits_{i = 1}^5 {C_i^L} = 2({\phi _x} + {\phi _y}) + (4 + \lambda ) \cdot ({\theta _x} + {\theta _y}) \cdot \tau + 2k \cdot \tau + (4 + \lambda ) \cdot ({\phi _x} \cdot {\varepsilon _x} \cdot {\gamma _x} + {\phi _y} \cdot {\varepsilon _y} \cdot {\gamma _y}) + \Phi }\\ {{C^N} = \sum\limits_{i = 1}^5 {C_i^N} = ({\theta _x} + {\theta _x}) \cdot \tau + ({N_m} - 1) \cdot k \cdot \tau + ({\phi _x} \cdot {\varepsilon _x} \cdot {\gamma _x} + {\phi _Y} \cdot {\varepsilon _y} \cdot {\gamma _y})} \end{array}} \right.$ (32)
RM2算法是本文对Reduce-Join算法做的简单改进,算法需要1次分发任务、2次Map任务和1次Reduce任务.首先,通过Map任务抽取较小的数据集
${M_1}:\prod\nolimits_{y.a} {(y)} \to {D_2}:mem({\Theta _1}) \to {M_3}:\left\{ {\begin{array}{*{20}{l}} {{\Theta _{3x}} = \prod\nolimits_{x.a,x.c} {({\sigma _{x.a{\rm{ in }}{\Theta _1} \wedge {C_x}}}(x))} }\\ {{\Theta _{3y}} = \prod\nolimits_{y.a,y.b} {({\sigma _{{C_y}}}(y))} } \end{array}} \right. \to {R_4}:{\sigma _{x.a = y.a}}({\Theta _{3x}} \times {\Theta _{3y}})$ (33)
若
$\begin{array}{l} \left\{ {\begin{array}{*{20}{l}} {C_1^L = {\phi _y} + {\theta _y} \cdot \tau }\\ {C_1^N = 0} \end{array}} \right.,{\rm{ }}\left\{ {\begin{array}{*{20}{l}} {C_2^L = {\theta _y} \cdot \tau }\\ {C_2^N = ({N_m} - 1) \cdot {\theta _y} \cdot \tau } \end{array}} \right.,\\ \left\{ {\begin{array}{*{20}{l}} {C_3^L = {\phi _x} + {\phi _y} + 3({\phi _y} \cdot {\beta _y} \cdot {\gamma _y} + {\phi _x} \cdot {\varepsilon _x} \cdot {\gamma _x})}\\ {C_3^N = 0} \end{array}} \right.,\left\{ {\begin{array}{*{20}{l}} {C_4^L = (1 + \lambda ) \cdot ({\phi _y} \cdot {\beta _y} \cdot {\gamma _y} + {\phi _x} \cdot {\varepsilon _x} \cdot {\gamma _x}) + \Phi }\\ {C_4^N = {\phi _x} \cdot {\varepsilon _x} \cdot {\gamma _x} + {\phi _y} \cdot {\beta _y} \cdot {\gamma _y}} \end{array}} \right. \end{array}$ (34)
汇总以上各个阶段的I/O代价,可以得到整个过程的I/O代价函数,见公式(35):
$\left\{ {\begin{array}{*{20}{l}} {{C^L} = \sum\limits_{i = 1}^4 {C_i^L} = ({\phi _x} + 2{\phi _y}) + 2{\theta _y} \cdot \tau + (4 + \lambda ) \cdot ({\phi _y} \cdot {\beta _y} \cdot {\gamma _y} + {\phi _x} \cdot {\varepsilon _x} \cdot {\gamma _x}) + \Phi }\\ {{C^N} = \sum\limits_{i = 1}^4 {C_i^N} = ({N_m} - 1) \cdot {\theta _y} \cdot \tau + ({\phi _x} \cdot {\varepsilon _x} \cdot {\gamma _x} + {\phi _y} \cdot {\beta _y} \cdot {\gamma _y})} \end{array}} \right.$ (35)
查询优化是数据管理系统的核心问题.对于传统数据库,若数据表按照连接属性进行分区,能够最大程度地优化连接查询的性能.但是在海量数据环境下,维护良好数据分区的代价很高;而且针对各种查询,我们难以找到一种最合适的分区方法.因此,本文认为,数据尚未良好分区.此外,由于数据是海量的,因此我们近似认为:数据量均匀分布在每个节点上,并且属性的值域也均匀分布.在此基础上,我们研究基于代价的执行计划选择,对于多元连接,执行计划的选择即为连接顺序的选择,而查询代价即为前文得出的I/O代价模型和代价函数,我们研究多元连接查询的I/O代价最小化的执行计划.
等值连接与自然连接是应用最广泛的连接操作,自然连接与等值连接无本质区别,可以通过修改属性名把等值连接操作转化为自然连接操作,所以研究自然连接具有普遍意义.本节以自然连接(⋈)为例,设
$\prod\nolimits_{attr(R) \cup attr(S)} {({\sigma _{r.{a_1} = s.{a_1} \wedge r.{a_2} = s.{a_2} \wedge ... \wedge r.{a_n} = s.{a_n}}}(R \times S))} ,attr(R) \cap attr(S) = \{ {a_1},{a_2},...,{a_n}\} $.
特殊地,当
((
[
然而,可证⋈满足以下性质:幂等律
(
由于连接运算结果是确定的,因此无论采用何种顺序执行多元连接,结果集大小是一致的.因此,我们需找到这些执行计划中I/O代价最小的作为最优执行计划.较直接的算法为逐一计算每个执行计划的I/O代价,搜索I/O代价最小的执行计划.首先,对于任意一个执行计划,需要逐一计算
Input:
Return:
Globe
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
在获得最佳执行计划以后,最后讨论计划的执行方式问题.
Parallel or sequential execution plan of joined query
并行或顺序执行的查询执行计划
我们通过前期实验发现,在一个MapReduce集群中同时执行多个相同类型的作业(如数据密集型作业)的时间代价大于顺序执行他们的时间代价.这是因为:① MapReduce环境每个作业本身就是并行执行的,因此,保证多作业的并行性同时会降低每个作业的并行性,多作业并行并没有带来额外的性能提升;② 若I/O密集型和CPU密集型作业并行执行,则有助于提高资源利用率,提高性能,若多个I/O密集型作业并行执行,每个作业需要的资源不能互补,而且会争抢I/O资源;③ 作业调度会损失部分性能;④ 多元连接并行执行仍需要同步,某一个同步步骤的性能取决于最慢的作业分支(如
为测试本文提出的连接查询算法的有效性,并验证I/O代价模型的正确性,我们选用13台同构PC机,构建了基于Hadoop HDFS的云计算环境.节点为清华同方超翔Z900计算机,CPU为Inter Core i5-2300 2.80GHz,8GB内存,1TB硬盘,操作系统采用CentOS 5.6,Linux 2.6.18内核,Hadoop采用1.0.4版,千兆网络环境.测试数据为TPC-H中的两个数据集
SQL of joined query in test cases
查询用例中连接查询的SQL表达
SELECT |
测试用例对应的数据集设计见
Data set oftest cases
测试用例数据集
连接率( | 选择率( | ||
| | ||
| No.1, No.7 | No.2, No.8 | |
| No.3, No.9 | No.4, No.10 | |
| No.5, No.11 | No.6, No.12 |
I/O代价模型是一个计算模型,其中涉及较多参数.在实际应用中,可将这些参数分为两类:一类可以根据数据特征和查询作业特征直接获取,如主从表数据量(
本文通过实验主要验证以下3个方面:首先是验证I/O代价模型和代价函数的准确性;其次,简单分析了6种二元连接算法的I/O代价特点;最后,验证了I/O代价和连接查询性能的关系.实验没有涉及多元连接的最优执行计划的I/O代价验证,因为在I/O代价函数正确的前提下,多元连接最优执行计划的I/O代价是可以计算得到的并从推导上保证其正确性.
Analysis of theoretic and actual I/O costs of six join algorithms
6种算法I/O代价理论值与实际值的误差(%)分析
Actual I/O costs of six join algorithms on 12 data sets
6种算法在12个数据集上的实际I/O代价
(1) 总体上,M算法的I/O代价最高.这是因为M算法中,从表被反复分发到各个节点,这将增加算法的I/O代价;
(2) M2算法的I/O代价较低.这是因为与M算法相比,M2分发的是从表经选择投影后的数据,数据量远小于从表本身;而与其他算法相比,M2算法对主表和从表的读取次数较少,且选择与连接操作同时进行,网络I/O代价也少;
(3) 同样是Map端Join,M3R的I/O代价高于M2.这是因为M3R算法中主表读了两遍,而M2只读取一遍,由此造成的I/O代价远大于经连接表过滤后减少的网络I/O代价;
(4) 同样是Reduce端Join,R2M2的I/O代价高于RM2.其原因同样是因为读取主表的I/O代价远大于经属性数据集过滤后减少的网络I/O代价;
(5) 6种算法在同一数据集上的I/O代价主要取决于算法本身.
此外,若横向比较每种算法在不同数据集上的I/O代价可以发现:两表数据量大小是决定I/O代价的主要因素,7~12组实验的I/O代价是1~6组实验的近10倍;此外,I/O也受连接率、选择率等参数的影响,他们之间的关系可以从I/O代价计算公式推出.
本文设计的12组实验数据集未能穷举所有情况,因此并非每种算法在I/O代价上的优势都能得以体现.例如:两表的连接率极低时,通过半连接获取连接属性数据集从而对两表先进行过滤的M3R2和R3M3算法将会凸显优势;而在数据仓库中,当从表为维表、主表为事实表时(此时,连接属性一般为从表主键),从表的连接属性集包含所有连接属性值,此时半连接失效,只提取从表连接属性的RM2算法将会凸显优势.
本研究认为,连接查询的性能取决于I/O代价.连接查询的运算简单,I/O代价明显高于运算代价,这一点在分布式环境下尤为明显,因此,选择I/O代价较小的算法执行将会提高查询性能.为验证这一结论,我们以查询的I/O代价为横轴,对应的执行时间为纵轴,对72组连接查询做散点图分析,如
Relationship between I/O costs and time consumptions of six join algorithms on 12 data sets
6种算法在12个数据集上的I/O代价与执行时间的关系图
本文研究了基于MapReduce的连接查询算法的I/O代价模型.首先基于MapReduce连接现有的两种主流算法Map-Join和Reduce-Join,定义MapReduce连接查询和其关键算法步骤;接着提出基于MapReduce的连接查询的I/O代价模型;随后对现有Map-Join和Reduce-Join加以整理和扩展,给出6种二元连接算法的形式化表达,并推导它们的I/O代价函数;随后将I/O代价模型和函数扩展到星型连接、链式连接等多元连接上,提出一种最优执行计划的选择算法.实验结果表明:6种算法能够有效完成查询任务,提出的I/O代价模型也能够在可接受的误差范围内准确地估算各个算法的I/O代价,且I/O代价能够准确地反映算法的性能优劣.除上述研究成果外,本研究还得出以下结论:基于MapReduce的连接查询的性能取决于实现算法的I/O代价,两者成正比关系,其I/O代价主要受数据量、连接率、查询率、投影率等的影响,它们之间的关系可以通过I/O代价函数量化地表达,选择I/O代价较小的执行计划将会提高连接查询性能;此外,研究算法的I/O代价与执行时间的关系时,应该充分考虑并行度的影响,相同I/O代价的前提下,并行性高的算法性能更好.
基于MapReduce的连接查询是海量数据分析中常用的数据操作,本研究提出的I/O代价模型能够清晰定义I/O代价与其影响因素之间的关系,能够提前准确预测算法I/O代价,从而为算法选择、执行计划的选择和I/O代价优化提供理论依据,对分析MapReduce数据密集型任务的I/O代价起指导作用,有助于基于MapReduce的数据管理系统和分析系统的研发.本研究还处于初步阶段,首先,定义I/O代价模型的目的是为了更好地优化I/O,因此我们将依据本文得出的结论进一步研究I/O代价优化方法.此外,除I/O代价以外,算法的并行度对数据密集型计算的性能影响也很大,而并行度又受数据倾斜等因素的影响,在性能优化时应该充分考虑并行度因素.
国家自然科学基金(61433008, 61202088, 61402090); 教育部高等学校博士学科点专项科研基金(20130042120006);中国博士后科学基金面上项目(2013M540232); 中央高校基本科研业务费重大科技创新项目(N120817001); 辽宁省博士启动基金(201403314)