软件学报  2018, Vol. 29 Issue (3): 869-882   PDF    
应对倾斜数据流在线连接方法
王春凯, 孟小峰     
中国人民大学 信息学院, 北京 100872
摘要: 并行环境下的分布式连接处理要求制定划分策略以减少状态迁移和通信开销.相对于数据库管理系统而言,分布式数据流管理系统中的在线θ连接操作需要更高的计算成本和内存资源.基于完全二部图的连接模型可支持分布式数据流的连接操作.因为连接操作的每个关系仅存放于二部图模型的一侧处理单元,无需复制数据,且处理单元相互独立,因此该模型具有内存高效、易伸缩和可扩展等特性.然而,由于数据流速的不稳定性和属性值分布的不均衡性,导致倾斜数据流的连接操作易出现集群负载不均衡的现象.针对倾斜数据流的连接操作,模型无法动态分配查询节点,并需要人工干预数据分组的参数设置.尤其是应对全部历史数据的连接查询,模型效率更低.基于上述问题,提出了管理倾斜数据流连接的框架,使用基于键值和元组混合的划分样式,有效应对二部图模型的各侧倾斜数据.设计了重新动态分配查询节点的策略和状态迁移算法,以支持全历史数据的连接查询和自适应的资源管理.针对合成数据和真实数据的实验结果表明,该方案可有效应对倾斜数据的连接操作,并进一步提升分布式数据流管理系统的吞吐率,特别是降低云环境中的计算成本.
关键词: 分布式数据流管理系统     在线连接     数据倾斜     状态迁移     二部图连接模型    
Online Join Method for Skewed Data Streams
WANG Chun-Kai, MENG Xiao-Feng     
School of Information, Renmin University of China, Beijing 100872, China
Foundation item: National Natural Science Foundation of China (61532016, 61379050, 61532010, 91646203, 61762082); The National Key Research and Development Program of China (2016YFB1000602, 2016YFB1000603); The Research Funds of Renmin University (11XNL010); the Science and Technology Opening up Cooperation project of He'nan Province (172106000077)
Abstract: Scalable distributed join processing in a parallel environment requires a partitioning policy to transfer data while minimizing the size of migrated statement and the number of communicated messages. Online theta-joins over data streams are more computationally expensive and impose higher memory requirement in distributed data stream management systems (DDSMS) than standalone database management systems (DBMS). The complete bipartite graph-based model can support distributed stream joins, and has the characteristics of memory-efficiency, elasticity and scalability. This is because each relation is stored in its corresponding processing units without data replicas and the units are independent of each other. However, due to the instability of data stream rate and the imbalance of attribute value distribution, the online theta-joins over skewed data streams can lead to the load imbalance of cluster. In this case, the bipartite graph-based model is unable to allocate the query nodes dynamically, and requires to set parameters about the grouping manually. The more serious issue is that the effect of the full-history join is worse. In this paper, a framework for handling skewed stream join is presented for enhancing the adaptability of the join model and minimizing the system cost based on the varying workloads. The proposal includes a mixed key-based and tuple-based partitioning scheme to handle skewed data in each side of the bipartite graph-based model, a strategy for redistribution of query nodes in two sides of this model, and a migration algorithm about state consistency to support full-history joins and adaptive resource management. Experiments with synthetic data and real data show that the presented method can effectively handle skewed data streams and improve the throughput of DDSMS, and it also effective especially on reducing the operational cost in the cloud environment.
Key words: distributed data stream management system     online join     data skew     state migration     bipartite graph-based join model    

近年来, 随着数据类型的增多和数据密集型应用的不断涌现, 数据数量和流速在快速增加, 这使得数据流的实时分析和流式处理成为当今热点研究领域之一[1-3].因此, 分布式数据流管理系统(distributed data stream management system, 简称DDSMS)被广泛应用于大规模数据流的实时处理和查询分析.分布式数据流管理系统往往由上层的关系查询系统(relational query system, 简称RQS)和下层的流处理系统(stream processing system, 简称SPS)组成.目前, 有许多开源的流处理系统, 如S4[4]、Storm[5]等.为提高其易用性和处理能力, 提供查询语言的关系查询系统也相继推出, 如Squall[6]、Calcite[7]等.当用户向关系查询系统提交查询请求时, 查询任务被转换成由多个子任务构成的有向无环图(directed acyclic graph, 简称DAG), 并运行于流处理系统.

在社交网络的微博分析、金融领域的高频交易监控和电商领域的实时推荐服务等应用中, 往往涉及多个数据流的连接查询和分析.连接操作需要维护大量的状态信息, 并依赖于全部历史(full-history)数据[8, 9].在这些应用中, 数据流速往往发生波动且属性值的分布也较不均衡.这使得倾斜数据流的连接操作易出现集群负载不均衡的现象, 导致了连接查询的效率降低和云环境下计算成本的升高.由于数据流速和分布的不均衡性引起了属性值倾斜(attribute value skew, 简称AVS)[10]和由数据划分带来的元组放置倾斜(tuple placement skew, 简称TPS)[10]等问题.因此, 如何应对倾斜数据流的高效连接和集群的负载均衡, 是本文重点关注的问题.

为支持任意连接谓词和应对数据倾斜的问题, 基于矩阵的连接模型[11]和基于完全二部图的连接模型[12]是最具代表性的两种模型.矩阵连接模型利用划分规则将数据流随机分裂成不重叠的子流.如图 1(a)所示, 数据流R(或S)被随机划分为R1R2(或S1S2), 由于各子流需要复制到多个不同的处理单元, 该模型在每个矩阵单元格内实现了全历史数据任意谓词的θ连接.作为替代矩阵模型的代表, 二部图连接模型将处理单元组织成完全二部图的形式, 每侧对应一个数据流.如图 1(b)所示, 流RS被划分至不同的两侧, 并根据基于键值的划分方法(如哈希函数), 各元组被分配至同一侧的不同节点进行存储, 如R1R2(或S1S2).与此同时, 元组通过相同的哈希函数被发送至对面一侧并完成连接操作, 待完成连接操作后, 丢弃该元组.图 1(b)展示了该连接的处理过程:从R发出的元组存储于R1, 并同时发送至S1完成连接操作; 从S发出的元组存储于S2, 并发送至R2完成连接操作.

Fig. 1 Representative join models 图 1 代表性的连接模型

矩阵连接模型和二部图连接模型可有效处理分布式数据流的在线连接操作, 但是面临如下问题和挑战.

1) 矩阵连接模型需要使用较多额外内存用于整行和整列数据的复制与存储; 二部图连接模型尽管节省了内存空间, 但无法根据数据流的倾斜情况动态调整处理单元的分布;

2) 由于数据流分布的不一致性和数据操作的非对称性, 导致集群的负载不均衡, 严重影响了数据流管理系统的性能;

3) 具有状态信息的连接操作需要数据流管理系统具有较好的可扩展性.当某一节点的压力过大(或过小)时, 集群规模可根据应用负载动态增加(或减少).

本文基于二部图连接模型提出了应对倾斜数据流在线连接的数据划分策略, 以实现集群的负载均衡并保证分布式数据流管理系统的高吞吐率.本文的主要贡献如下:

1) 关于二部图连接模型中单侧节点的数据倾斜问题, 提出了基于键值和元组混合的数据划分模式.并结合动态迁移策略中涉及的不同代价类型, 提出归一化的优化目标.通过周期性地监控各处理单元的负载情况, 动态制定迁移策略, 以实现最小化的总体代价, 从而实现单侧节点间的负载均衡;

2) 根据数据流速的倾斜情况, 设计了二部图连接模型两侧处理节点之间的重分布策略.通过逻辑重划分查询节点, 动态实现连接算法的负载均衡.并在开源流处理系统Storm上实现该处理流程, 利用合成数据和真实数据的不同查询任务, 证实了算法的有效性和可行性;

3) 为支持全历史数据的连接查询和自适应的系统资源管理, 给出了确保状态一致性的迁移算法.使用算子状态管理器动态迁移不同节点之间的处理单元, 以保证连接算子的状态一致性和可扩展性.

本文引言部分给出倾斜数据流在线连接的问题描述.第1节介绍相关工作.第2节给出一些预备知识.第3节提出应对倾斜数据流的处理框架.第4节重点描述算法的设计过程和实例说明.第5节给出实验验证及分析结果.第6节进行总结.

1 相关工作

针对分布式环境下数据流在线连接的相关研究工作, 可从以下4个方面归纳概括.

1) 连接类型

为处理不同查询请求, 在线连接类型分为等值连接和θ连接两种.由谷歌公司开发的Photon[13]是针对操作网络查询数据流和用户广告点击数据流的等值连接算子.D-Stream[14]是Sparking Streaming[15]定义的操作对象, 可支持多数据流的θ连接.利用Spark[16]提供的RDD[17]机制, 确保查询处理的正确性和容错性.DYNAMIC[11]算子利用矩阵连接模型支持多数据流的θ连接.通过设计重组器(reshuffler)动态定义划分样式, 确保最小化的数据输入装载因子(input-load factor).JB算子[12]利用二部图连接模型也支持多数据流的θ连接, 并可根据数据的加载程度扩展处理单元的数量.

2) 连接模型

针对不同类型的连接算子, 其连接模型各不相同.Photon[13]利用中心协调器(central coordinator)模型实现多数据中心的容错和扩展连接.通过向中心协调器注册查询事件的方法和多数据中心的分布式架构, 确保数据的完备性.D-Stream[14]利用RDD转换(transformation)模型, 将数据流切分成一系列的微批次(mini-batch)进行处理. DYNAMIC[11]利用矩阵连接模型构造(n, m)映射模式, 自动将处理节点划分成J(J=nxm)个矩阵区域.JB算子利用二部图连接模型将集群划分成两个部分, 降低了数据备份的冗余度并提高了资源利用率.

3) 处理方式

数据流的处理方式一般分为非阻塞的元组处理和阻塞的批处理.Photon[13]、DYNAMIC[11]和JB算子均属于非阻塞的元组处理, 可确保实时获取数据流的连接查询结果.D-Stream[14]使用数据阻塞的批处理方式, 需要将数据流切分成微批次的方式在Spark Streaming[15]上处理.

4) 倾斜数据流的负载均衡

为确保集群的负载均衡, 有效应对数据流倾斜的问题, 各种连接模型均有应对措施.DYNAMIC[11]根据数据倾斜变化, 提出自适应的重划分机制.Aleksandar等人引入等重(equi-weight)直方图的概念[18], 推出了多级负载均衡算法.然而, 文献[11, 18]要求处理单元的划分个数必须为2的幂次方.因此, 矩阵模型的伸缩性较差, 资源浪费严重.为降低计算成本, 文献[19]利用构建不规则的矩阵模式, 提出了提高成本效益的数据流连接算法.然而, 其基本思想仍是基于矩阵模型, 存在一定的数据冗余问题.JB算子可显著节省资源使用率, 并通过引入混合路由策略ContRand, 在一定情况下解决了负载不均衡的问题.但是该策略需要人工干预数据分组的参数设置, 并且不能根据数据流速动态分配处理单元.此外, 在查询任务倾斜和变化的场景下, 需要对处理单元中的数据做重新划分的迁移操作.文献[20]在设定相同键值元组未超过同一处理单元存储上限的情况下, 将各处理单元的数据倾斜问题规约至装箱问题[21]进行处理.但数据倾斜程度较高时, 会存在相同键值元组超过同一处理单元存储上限的情况.此时, 需要对同一处理单元的相同键值元组分裂至不同的处理单元.该问题无法用文献[20]提出的方法解决.

2 预备知识

本节给出全历史数据在线连接算子的相关预备知识.

2.1 基本定义

全历史数据在线连接操作是数据流RS的各个元组对满足连接谓词的操作, 表示为RS.在线连接是一种需要额外内存空间记录中间结果和迁移信息的状态算子(stateful operator).为便于形式化定义和明确优化目标, 本文涉及到的相关符号说明见表 1.

Table 1 Table of notations 表 1 符号表

定义1.时刻t, 处理单元pu的负载均衡因子可定义为

$ {\theta _t}\left( {\mathit{pu}} \right) = \left| {{L_t}\left( {pu} \right)-{{\overline L }_t}} \right|/{\overline L _t} $ (1)

其中, Lt为处理单元集合(PU)总负载的平均值.Lt定义为

$ {\overline L _t}{\rm{ = }}\sum\limits_{pu = 1}^{{N_{\mathit{pu}}}} {\left( {{L_t}\left( {pu} \right)} \right)} /{N_{pu}} $ (2)

那么, 如果θt(pu)≤θmax, 我们可认为处理单元pu在时刻t的负载是相对平衡的.

定义2.时刻t, 当存在pu(puPU), 使得θt(pu) > θmax时, 我们认为处理单元pu在时刻t负载不均衡, 需要给出3种数据迁移的机制:(1)数据迁入, 不同处理单元具有相同Ktup的元组迁入至起始处理单元; (2)数据迁出, 具有相同Ktup的全部元组从当前处理单元迁出至其他处理单元; (3)数据分裂, 具有相同Ktup的部分元组迁移至其他处理单元, 剩余元组保持在当前处理单元.

定义3.根据数据流的分布和倾斜程度, 我们需要动态制定迁移策略.根据不同策略, 涉及到3种类型的代价:(1)路由代价Crouting, 数据迁移后, 为记录Ktup和处理单元的映射关系而维护迁移路由的代价; (2)复制代价Cduplication, 数据分裂后, 执行连接操作时复制相同Ktup元组带来的代价; (3)迁移代价Cmigration, 元组从某一处理单元迁移至其他处理单元的代价.

由定义2和定义3可知:数据迁入仅涉及到迁移代价; 数据迁出涉及到路由代价和迁移代价; 数据分裂涉及到路由代价、复制代价和迁移代价.

2.2 优化目标

时刻t, 所有迁移函数的集合F={f1, f2, f3, …}.根据定义3, 每个迁移函数fi的代价可表示为

$ \begin{array}{l} {C_t}\left( {{f_i}} \right) = \alpha \times {C_{routing}}({f_i}) \times \beta \times {C_{duplication}}({f_i}) + \gamma \times {C_{migration}}({f_i}), \alpha + \beta + \gamma = 1 \end{array} $ (3)

其中, α, βγ是3种代价的权重.数据迁移的优化目标可表示为

$ \mathop {\min }\limits_{{f_i} \in F} {C_t}\left( {{f_i}} \right)\;\;\;\;{\rm{s}}{\rm{.t}}{\rm{.}}\;\;\;\;\;{\theta _\mathit{t}}\left( {pu} \right) \le {\theta _{\max }}, \forall \left( {pu} \right) \in PU $ (4)

当满足负载均衡的条件下, 该优化目标是最小化迁移函数的总体代价.这涉及到Ktup的范围、全部处理单元的数目和最大非平衡因子的边界值θmax.由于具有相同Ktup的数据在同一处理单元内部也有可能被分裂, 这比文献[20]中的装箱问题更为复杂.因此, 本文的优化目标可规约为NP问题.接下来, 本文将给出解决该问题的系统架构和一系列的启发式规则.

3 系统架构

本节详细介绍应对倾斜数据流的动态负载均衡机制.基于二部图连接模型, 在流处理系统上构建控制器(controller), 用于周期性地监控各处理单元的负载情况.系统整体架构与工作流程如图 2所示.

Fig. 2 Architecture of workflow 图 2 工作流程架构图

●首先, 数据流R(或S)使用基于键值的哈希函数进行划分, 分别存储在n(或m)个处理单元中, 并同步将数据元组发送至另一侧处理单元以完成在线连接的操作;

●然后, 以固定时间间隔(本文的实验设置为5s), 周期性地监控二部图模型每侧节点的负载统计信息, 并搜集发送至控制器:若控制器监控到某些处理单元超过负载均衡因子的临界值, 我们则根据本文提出的启发式规则(见第4.1节)动态制定迁移策略, 以实现最小化总体代价的优化目标;

●接下来, 在数据迁移之前, 将新产生的数据流暂存在Kafka[22]中, 暂缓新数据的连接操作.此时, 我们按照迁移策略进行数据流和连接状态信息的迁移, 并同步更新路由表(routing table);

●最后, 继续发送Kafka中暂存的和新到来的数据, 完成后续的在线连接操作.

4 数据迁移算法

在处理单元内部, 数据出现负载过重的情况下, 需要对内部数据进行划分和迁移.这比将处理单元当作整体进行数据迁移的情况更为复杂.因此本节需要制定一系列的启发式规则优化该目标函数, 并提出两种优化策略:一种是针对二部图中单侧内部节点的数据迁移(internal-side-migration, 简称ISM); 另一种是针对二部图两侧节点的逻辑迁移(side-to-side migration, 简称S2SM).

4.1 启发式规则

时刻t, 假设存在某一处理单元的负载超过非平衡因子的临界值上限, 我们将其标记为pumax, 即Lt(pumax) > (1+θmaxLt, 或者存在某一处理单元负载低于非平衡因子的临界值下限, 我们将其标记为pumin, 即Lt(pumin) < (1-θmaxLt.为满足二部图连接模型各处理单元的平衡性, 并尽量减少数据迁移的情况, 我们设定的启发式规则如下.

1) H1.数据需要迁出的处理单元, 如果迁出负载键值的元组后可直接满足非平衡因子阈值的要求, 则直接进行迁出操作, 并在路由表中记录迁移键值;

2) H2.数据需要迁出的处理单元, 如果迁出某些键值的元组后仍不满足非平衡因子阈值的要求, 则需要切分具有较高元组数的键值, 将切分后的部分数据进行迁出操作, 并在路由表中记录迁移键值;

3) H3.数据需要迁入的处理单元, 如果存在键值在路由表中, 则优先将该键值的元组合并至哈希函数映射的处理单元, 并清空路由表中的记录.

根据上述3种规则, 我们分别给出迁出元组和迁入元组的基本算法.其中, 迁入元组的具体流程见算法1.算法首先判断迁出集合中迁出元组的键值范围(第1行~第3行), 并确定待迁入元组的处理单元(第4行~第6行), 然后针对各个迁出键值按照启发式规则H1和H2完成数据迁出, 并更新路由表(第7行~第10行), 最终确定迁移计划.

算法1. MoveOut算法.

输入:迁出处理单元集合PUout, 迁入处理单元集合PUin, 路由表RT;

输出:迁移计划MP.

1.   for (i=1; i < number_of_PUout; i++)

2.    计算迁出元组的键值范围Rk;

3.   end for

4.   for (j=1; j < number_of_PUin; j++)

5.    确定待迁入元组的处理单元puin;

6.    end for

7.    for (k=1; k < Rk; k++)

8.    利用H1和H2迁出元组;

9.    更新路由表RT;

10.    end for

迁入元组的具体流程见算法2描述.算法首先判断迁入集合中迁入元组的键值范围(第1行~第3行), 并确定待迁出元组的处理单元(第4行~第6行), 然后针对各个迁入键值, 按照启发式规则H3完成数据迁入, 并更新路由表(第7行~第10行), 最终确定迁移计划.

算法2. MoveIn算法.

输入:迁入处理单元集合PUin, 迁出处理单元集合PUout, 路由表RT;

输出:迁移计划MP.

1.   for (i=1; i < number_of_PUin; i++)

2.    计算迁入元组的键值范围Rk;

3.   end for

4.    for (j=1; j < number_of_PUout; j++)

5.    确定待迁出元组的处理单元puout;

6.   end for

7.    for (k=1; k < Rk; k++)

8.    利用H3迁入元组;

9.    更新路由表RT;

10.    end for

4.2 ISM算法

时刻t, 为满足二部图连接模型单侧处理单元的平衡性, 并尽量减少数据迁移的情况, 单侧的数据迁移的ISM算法见算法3描述.算法首先统计时刻t每个计算单元的负载Lt(pu), 并计算出平均负载Lt(第1行~第4行); 然后, 对于需要迁出数据的处理单元, 调用MoveOut算法(第6行~第8行); 最后, 对于需要迁入数据的处理单元, 调用MoveIn算法(第9行~第11行).

算法3. ISM算法.

输入:二部图单侧处理节点PU, 路由表RT, 非平衡因子的临界值θmax;

输出:迁移计划MP.

1.    for (i=1; i < number_of_PU; i++)

2.    计算Lt(pui);

3.    end for

4.  计算Lt;

5.      for (i=1; i < number_of_PU; i++)

6.    if (Lt(pui) > (1+θmaxLt) then

7.       MoveOut(PU, PU, RT);

8.  end if

9.  if (Lt(pui) < (1-θmaxLt) then

10.        MoveIn(PU, PU, RT);

11.   end if

12.   end for

我们用实例说明ISM算法的处理过程.假设每个处理单元的负载要求完全一致, 即θmax=0.如图 3(a)所示, 时刻t1:处理单元pu1包含键值k1k2, 元组数分别为25和5;pu2包含键值k3k4, 元组数分别为5和5.根据启发式规则H1和H2, 需要将k1元组切分成15和10, 并将其中10个元组迁移至pu2, 同步更新路由表.如图 3(b)所示:在下一时刻t2, 由于数据的倾斜原因, 导致k4的元组增加至45个, 其他不变.根据启发式规则H3, 我们首先需要将pu2中的k1元组移回pu1, 并清空路由表; 然后, 再将k4元组切分成35和10, 并将其中10个元组迁移至pu1, 同步更新路由表.

Fig. 3 Example of ISM 图 3 ISM算法举例

4.3 S2SM算法

由于数据流速的动态改变, 导致二部图模型中两侧数据流的数量会有较大间隙, 这严重影响到分布式数据流管理系统的吞吐率.为此, 我们给出两侧节点逻辑迁移的S2SM算法(见算法4).该算法首先统计每个单元的负载Lt(pu), 并分别统计各侧和整个集群的平均负载Ltm, Ltm, Lt(第1行~第4行); 然后, 根据临界值判定迁出元组的一侧和迁入元组的一侧(第5行~第14行); 最后, 针对迁出侧, 判断需要迁出的处理单元并调用迁出算法(第15行~第19行), 针对迁入侧, 判断需要迁入的处理单元并调用迁入算法(第20行~第24行).

算法4. S2SM算法.

输入:二部图两侧处理节点PUmPUn, 路由表RT, 非平衡因子的临界值θmax;

输出:迁移计划MP.

1.   for (i=1; i < number_of_(PUm+PUn); i++)

2.    计算Lt(pui);

3.   end for

4.  计算Ltm, Ltm, Lt;

5.    if (Ltm>(1+θmaxLt) then

6.     PUout=PUm;

7.    else if (Ltm < (1-θmaxLt) then

8.     PUin=PUm;

9.   end if

10.   if (Ltn>(1+θmaxLt) then

11.     PUout=PUn;

12.  else if (Ltn < (1-θmaxLt) then

13.    PUin=PUn;

14.  end if

15.  for (j=1; j < number_of_PUout; j++)

16.    if (Lt(puj) > (1+θmaxLt) then

17.      MoveOut(PUout, PUin, RT);

18.    end if

19.  end for

20.  for (k=1; k < number_of_PUin; k++)

21.    if (Lt(puk) < (1-θmaxLt) then

22.       MoveIn(PUm, PUout, RT);

23.    end if

24.  end for

我们用实例说明S2SM算法的处理过程.假设每个处理单元的负载要求完全一致, 即θmax=0.如图 4所示, 时刻t, 二部图连接模型共包括4个处理单元pu1~pu4, 其中, pu1pu2用于存储数据流R, pu3pu4用于存储数据流S.pu1pu2包含键值Rk1~Rk4, 元组数分别为5, 5, 5和5;pu3pu4包含键值Sk1~Sk4, 元组数分别为25, 5, 25和5, 集群和两侧负载分别为20, 10和30.因此, 我们需要将数据流S的部分元组迁移至pu1pu2.按照迁出规则H1和H2, 需要将Sk1元组数切分成15和10, 其中10个元组迁移至pu1, 同步更新路由表; 并将Sk4元组数切分成15和10, 其中10个元组迁移至pu2, 同步更新路由表.

Fig. 4 Example of S2SM 图 4 S2SM算法举例

4.4 状态一致性迁移

基于二部图连接模型设计的BiStream系统[12]实现了动态扩展(或缩减)查询节点的资源管理策略, 但其仅支持基于窗口的连接模型, 无法对全历史数据的在线连接提供状态一致性的保证.本节引入内存文件系统Tachyon[23]存储算子的状态信息, 并使用算子状态管理器(operator states manager, 简称OSM)[24]动态迁移不同节点间的处理单元, 以保证连接算子的状态一致性和可扩展性.我们利用该管理器实现了全历史数据在线连接操作的动态资源管理.状态管理器的架构图如图 5所示.节点中的每个处理进程(storm中称作worker)管理自身的处理单元(pu1~pun).状态管理器负责维护处理单元的数据不被当作垃圾回收, 并将处理单元按照维持、迁出和迁入这3种状态进行管理.维持状态的处理单元仍运行在本进程中; 迁出状态的处理单元将被迁移至其他节点, 相关状态信息保存至文件系统(file server); 迁入状态的处理单元由其他节点迁入该节点, 并将其状态信息通过文件系统加载至恢复线程(retriever).该管理器确保了全历史数据在线连接操作的动态资源管理.

Fig. 5 Architecture of OSM[24] 图 5 OSM架构[24]

5 实验与结果分析 5.1 实验准备

1) 实验环境

本文实验平台用1GB网络连通14个物理节点, 其中5个是使用kafka的数据发送节点, 1个是Storm的nimbus节点, 其余8个是Storm的supervisor节点.数据发送与nimbus各节点配置是:CPU:Intel E5-2620 2.00GHz, Memory:4GB; supervisor各节点配置是:CPU:两个Intel E5-2620 2.00GHz, Memory:64GB; 操作系统: Ubuntu-14.04.3;Storm版本0.9.5.

2) 数据集

我们使用合成数据和真实数据进行实验对比.首先, 以TPC-H[25]作为基准测试, 并利用dbgen产生数据集.所有数据在发送至Storm之前, 均存放于数据发送节点的Kafka中.为测试数据集的不同倾斜程度, 在连接属性上使用不同倾斜度z的Zipf分布.默认情况下, 我们令z=1, 并产生10GB数据.其次, 第2个数据集是Linear Road Benchmark(LRB)[26], 该基准测试模拟高速公路的收费系统, 统计不同路段、不同方向的车速信息.我们产生了1 200万条车辆数据信息, 如图 6所示, 车速分布具有明显的倾斜性.

Fig. 6 Speed distribution of LRB 图 6 LRB车速分布图

3) 查询任务

本文共选取4个查询任务, 其中两个是TPC-H提供的等值连接Q3和Q5, 一个是文献[11, 12]中使用的范围查询(band).Band查询描述为:

● SELECT *, FROM LINEITEM L1, LINEITEM L2

WHERE ABS(L1.orderkey-L2.orderkey)≤1

AND (L1.shipmode=‘TRUCK’ AND L2.shipinstruct=‘NONE’) AND L1.Quantity > 48

另一个是LRB场景下, 通过自连接操作统计同一车道下车速相同的车辆信息, 其查询描述为:

● SELECT *, FROM LRB L1, LRB L2

WHERE L1.Spd=L2.Spd AND L1.Lane=L2.Lane AND L1.Dir=L2.Dir AND L1.VID!=L2.VID

4) 对比模型

本文使用3种算法对比分析查询性能:D-JB, JB算子和JB6.D-JB是本文提出的算法, 表示动态二部图连接模型.JB算子表示平均分配集群节点至二部图的各侧.JB6表示平均分配节点后, 二部图各侧内部的处理单元分成6个子组做随机路由.

5.2 吞吐率和延迟

我们使用TPC-H中z=1的10GB数据, 针对3个不同的查询任务, 对比了3个模型的吞吐率和延迟.如图 7(a)所示, 由于D-JB动态调整了处理单元的负载情况, 其吞吐率最高.而JB算子需要做单侧的全网广播操作, 通信量较大, 故其吞吐率最低.针对不同查询任务的topology处理延迟而言, 图 7(b)说明, D-JB的延迟均低于JB算子和JB6.

Fig. 7 Throughput and latency 图 7 吞吐率和延迟

5.3 系统扩展性

当集群节点动态改变时, 我们进一步分析D-JB, JB算子和JB6的可扩展性.由于JB算子和JB6不支持全历史数据连接的动态扩展, 在增加处理节点时, 我们将TPC-H数据暂存于Kafka的消息队列, 待重启Storm的topology后继续发送数据(需减去重启topology的时间, 以获得计算连接操作的实际运行时间).如图 8(a)~图 8(c)所示:D-JB的运行时间是最短的, 其扩展性最好; 此外, 由于Q5涉及到的连接操作最复杂, 涉及的数据流最多, 其运行时间高于其他两个查询任务.

Fig. 8 Run time 图 8 运行时间

5.4 通信代价

为验证D-JB模型应对不同倾斜度数据流的处理情况, 我们通过改变Zipf的分布情况, 测试3种模型的平均网络通信代价和数据迁移总量.对比图 9(a)~图 9(c)我们发现:

Fig. 9 Network communication cost 图 9 网络通信代价

● JB算子需要做全网广播操作, 其平均网络通信代价最高;

● JB6仅做子网广播操作, 其通信代价最低;

● D-JB由于做了数据迁移操作, 其通信代价略高于JB6;并且, 由于Q5涉及到的数据流最多, 其通信代价高于Q3和Band.

此外, 根据不同倾斜度, 图 10给出了执行D-JB算法的数据迁移总量.可以发现:随数据倾斜度的增加, 由数据迁移引起的网络通信总量也随之增加, 但迁移总量相对于全部数据流量来讲相对较低.

Fig. 10 Total migration 图 10 迁移总量

5.5 归一化优化目标的选参设置

本文为设计归一化模型, 为3个代价设置相关权重参数.首先, 路由表仅在中心节点更新, 维护代价最低; 其次, 在连接操作时, 由于数据分裂导致的复制相同Ktup数据至多个节点, 网络开销较高; 最后, 数据分裂导致多条数据在不同节点间迁移, 网络开销最高.如图 11所示, 我们分别将α, βγ设置为0.2, 0.3和0.5(conf_1);0.1, 0.3, 0.6(conf_2)和0.1, 0.4, 0.5(conf_3).以Q3查询为例, 我们可以发现, conf_1的查询吞吐率最高.

Fig. 11 Throughput 图 11 吞吐率

5.6 真实数据流分析

除使用合成数据TPC-H外, 我们还使用LRB的真实数据测试算法的有效性.执行第5.1节的查询任务, 根据速度分布的倾斜性和数据量的不断增加, 记录其执行时间.如图 12所示, D-JB算法的执行时间最短.

Fig. 12 LRB run time 图 12 LRB运行时间

6 总结

本文提出了一种应对倾斜数据流的在线连接方法.基于二部图连接模型, 在单侧节点内, 我们设计了基于键值和元组的混合划分样式; 在双侧节点间, 我们制定了重新分布处理单元的策略; 并通过引入状态管理器, 实现了可扩展的全历史数据连接操作.实验结果表明, 本文提出的方法在系统性能、可扩展性和应对倾斜数据的能力等方面是可行且有效的.文中提到的数据迁移问题, 文本通过制定归一化的优化目标并利用启发式规则以获取较优解.接下来, 根据集群环境的不同, 我们需要解决自动选取归一化优化目标的最优配置问题, 并寻找更佳的优化方法使连接操作更加快速, 系统更加稳定.

参考文献
[1]
Sun DW, Zhang GY, Zheng WM. Big data stream computing:Technologies and instances. Ruan Jian Xue Bao/Journal of Software, 2014, 25(4): 839–862(in Chinese with English abstract). http://www.jos.org.cn/jos/ch/reader/view_abstract.aspx?file_no=4558&flag=1 [doi:10.13328/j.cnki.jos.004558]
[2]
Cui XC, Yu XH, Liu Y, Lü ZY. Distributed stream processing:A survey. Journal of Computer Research and Development, 2015, 52(2): 318–332(in Chinese with English abstract). [doi:10.7544/issn1000-1239.2015.20140268]
[3]
Wang CK, Meng XF. Relational query techniques for distributed data stream:A survey. Computer Journal of Computers, 2016, 39(1): 80–96(in Chinese with English abstract). [doi:10.11897/SP.J.1016.2016.00080]
[4]
Neumeyer L, Robbins B, Nair A, Kesari A. S4: Distributed stream computing platform. In: Proc. of the 10th EEE Int'l Conf. on Data Mining Workshops. Sydney: IEEE, 2010. 170-177. [doi: 10.1109/ICDMW.2010.172]
[5]
Toshniwal A, Taneja S, Shukla A, Ramasamy K, Pater JM. Storm@Twitter. In: Proc. of the 2014 ACM Int'l Conf. on Management of Data. Snowbird: ACM Press, 2014. 147-156. [doi: 10.1145/2588555.2595641]
[6]
[7]
[8]
Hwang JH, Balazinska M, Rasin A, Cetintemel U, Stonebraker M. High-Availability algorithms for distributed stream processing. In: Proc. of the 21st Int'l Conf. on Data Engineering. Tokyo: IEEE, 2005. 779-790. [doi: 10.1109/ICDE.2005.72]
[9]
Fernandez RC, Migliavacca M, Kalyvianaki E, Pietzuch P. Integrating scale out and fault tolerance in stream processing using operator state management. In: Proc. of the 2013 ACM SIGMOD Int'l Conf. on Management of Data. New York: ACM Press, 2013. 725-736. [doi: 10.1145/2463676.2465282]
[10]
Walton CB, Dale AG, Jenevein RM. A taxonomy and performance model of data skew effects in parallel joins. In: Proc. of the 17th Int'l Conf. on Very Large Data Bases. Barcelona: Morgan Kaufmann Publishers, 1991. 537-548.
[11]
Elseidy M, Elguindy A, Vitorovic A, Koch C. Scalable and adaptive online joins. Proc. of the VLDB Endowment, 2014, 7(6): 441–452. [doi:10.14778/2732279.2732281]
[12]
Lin Q, Ooi BC, Wang Z, Yu C. Scalable distributed stream join processing. In: Proc. of the 2015 ACM SIGMOD Int'l Conf. on Management of Data. Melbourne: ACM Press, 2015. 811-825. [doi: 10.1145/2723372.2746485]
[13]
Ananthanarayanan R, Basker V, Das S, Gupta A, Jiang H. Photon: Fault-tolerant and scalable joining of continuous data streams. In: Proc. of the ACM 2013 Int'l Conf. on Management of Data. New York: ACM Press, 2013. 577-588. [doi: 10.1145/2463676.2465272]
[14]
Zaharia M, Das T, Li H, Hunter T, Shenker S. Discretized streams: Fault-tolerant streaming computation at scale. In: Proc. of the ACM SIGOPS 24th Symp. on Operating Systems Principles. Farmington: ACM Press, 2013. 423-438. [doi: 10.1145/2517349.2522737]
[15]
[16]
[17]
Zaharia M, Chowdhury M, Das T, Dave A, Ma J. Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing. In: Proc. of the 9th USENIX Symp. on Networked Systems Design and Implementation. San Jose: USENIX, 2012. 15-28.
[18]
Vitorovic A, Elseidy M, Koch C. Load balancing and skew resilience for parallel joins. In: Proc. of the 32nd IEEE Int'l Conf. on Data Engineering. Helsinki: IEEE, 2016. 313-324. [doi: 10.1109/ICDE.2016.7498250]
[19]
Fang JH, Zhang R, Wang XT, Fu TZJ, Zhang ZJ, Zhou AY. Cost-Effective stream join algorithm on cloud system. In: Proc. of the 25th ACM Int'l Conf. on Information and Knowledge Management. Indianapolis: ACM Press, 2016. 1773-1782. [doi: 10.1145/2983323.2983773]
[20]
Fang JH, Zhang R, Fu TZJ, Zhang ZJ, Zhou AY, Zhu JH. Parallel stream processing against workload skewness and variance. In: Proc. of the 26th Int'l Symp. on High-Performance Parallel and Distributed Computing. Washington: ACM Press, 2017. 15-26. [doi: 10.1145/3078597.3078613]
[21]
Karmarkar N, Karp RM. An efficient approximation scheme for the one-dimensional bin-packing problem. In: Proc. of the 23rd Annual Symp. on Foundations of Computer Science. Chicago: IEEE, 1982. 312-320. [doi: 10.1109/SFCS.1982.61]
[22]
[23]
Li HY, Ghodsi A, Zaharia M, Shenker S, Stoica I. Tachyon: Reliable, memory speed storage for cluster computing frameworks. In: Proc. of the 2014 ACM Symp. on Cloud Computing. Seattle: ACM Press, 2014. 1-15. [doi: 10.1145/2670979.2670985]
[24]
Ding JB, Fu TZJ, Ma RTB, Winslett M, Yang Y, Zhang ZJ, Chao HY. Optimal operator state migration for elastic data stream processing. HAL-INRIA, 2015, 22(3): 1–8. https://arxiv.org/abs/1501.03619
[25]
[26]
Arasu A, Cheniack M, Maier D, Maskey AS, Ryvkina E, Stonebraker M, Tibbetts R. Linear road: A stream data management benchmark. In: Proc. of the 30th Int'l Conf. on Very Large Data Bases. Toronto: Morgan Kaufmann Publishers, 2004. 480-491. [doi: 10.1016/B978-012088469-8.50044-9]
[1]
孙大为, 张广艳, 郑纬民. 大数据流式计算:关键技术及系统实例. 软件学报, 2014, 25(4): 839–862. http://www.jos.org.cn/jos/ch/reader/view_abstract.aspx?file_no=4558&flag=1 [doi:10.13328/j.cnki.jos.004558]
[2]
崔星灿, 禹晓辉, 刘洋, 吕朝阳. 分布式流处理技术综. 计算机研究与发展, 2015, 52(2): 318–332. [doi:10.7544/issn1000-1239.2015.20140268]
[3]
王春凯, 孟小峰. 分布式数据流关系查询技术研究. 计算机学报, 2016, 39(1): 80–96. [doi:10.11897/SP.J.1016.2016.00080]