2.2 批处理基础:定义及位置
我们开始吧,第一步——批处理。
2.2.1 定义:转换
应用于传统批处理中的转换(transformation)回答了“计算出什么结果?”这个问题。即使你可能对传统批处理已经非常熟悉了,我们也还是要从传统批处理开始说起,因为它是我们添加其他所有概念的基础。
在本章的剩余部分中(实际上,在本书的绝大部分内容中),我们都将研究一个例子:在一个由9个值组成的简单数据集上计算按键分类的整数和。假设我们已经开发了一个基于团队的移动游戏,并且我们想构建一个流水线来计算团队的得分,团队得分由团队的各个成员的手机上报的单独得分求和而成。我们需要在一个名为UserScores
的SQL表中获取9条得分数据的示例,它可能看起来像这样:
> SELECT * FROM UserScores ORDER BY EventTime;
------------------------------------------------
| Name | Team | Score | EventTime | ProcTime |
------------------------------------------------
| Julie | TeamX | 5 | 12:00:26 | 12:05:19 |
| Frank | TeamX | 9 | 12:01:26 | 12:08:19 |
| Ed | TeamX | 7 | 12:02:26 | 12:05:39 |
| Julie | TeamX | 8 | 12:03:06 | 12:07:06 |
| Amy | TeamX | 3 | 12:03:39 | 12:06:13 |
| Fred | TeamX | 4 | 12:04:19 | 12:06:39 |
| Naomi | TeamX | 3 | 12:06:39 | 12:07:19 |
| Becky | TeamX | 8 | 12:07:26 | 12:08:39 |
| Naomi | TeamX | 1 | 12:07:46 | 12:09:00 |
------------------------------------------------
注意,这一示例中的所有得分均来自同一团队的成员。为了使示例简单易懂,我们在后面的图表中只涉及了有限的几个维度。同时,由于本示例中的参与者是按团队分组的,因此我们实际只关注SQL表中的最后3列数据。
Score
与此事件相关的单个成员的得分。
EventTime
得分的事件时间,也就是得分产生的时间。
ProcTime
得分的处理时间,也就是得分被流水线观测到的时间。
对于每个流水线,我们通过一张延时图来呈现数据是如何随时间演化的。这张图将这9条得分数据绘制在我们关注的两个时间维度,即事件时间(位于x轴)和处理时间(位于y轴)上。图2-1展示了输入数据的静态图。
本书后面展示了一些延时图(动图版本可以从异步社区本书对应的网页中获取),让你了解数据是如何随着时间的变化被处理的(更多相关内容很快将在此延时图后 讲到)。
图2-1 在事件时间和处理时间的维度上绘制的9条输入记录
为了让流水线的定义更加具体,我们在每个示例前会提供Apache Beam Java SDK伪代码的简短片段。从某种意义上讲,它确实是伪代码,因为为了让示例更清晰,有时我会打破代码的规则,省略部分代码细节(如使用具体的I/O源)或简化名称(Beam Java 2.x以及之前版本中,触发器的名称冗长繁杂,我使用了简化的名称让示例更清晰)。除这些小问题以外,这些代码都是真实的Beam代码(本章中的所有示例的代码都可以从异步社区本书对应的网页中获得)。
如果你已熟悉Spark或Flink之类的大数据框架,那么你应该很容易理解Beam代码的功能。但是,为了提供一个速成教程,我们先介绍Beam中的两个基本原语。
PCollection
表示可以执行并行转换的(可能是海量的)数据集(因此名称以“P”开头,代表parallel的英文首字母)。
PTransform
PTransform
直接应用于PCollection
,以创建新的PCollection
。PTransform
可以执行逐个元素转换,可以将多个元素分组/聚合在一起,也可以与其他PTransform
组合,如图2-2所示。
图2-2 不同类型的转换
就我们的示例而言,我们通常假设从一个预先加载的、名为“input”的PCollection<KV <Team, Integer>>
(由Team
和Integer
的键/值对组成的PCollection
,其中Team
是代表团队名称的字符串,而Integer
是对应团队中单个成员的得分)开始。在真实的流水线中,我们从I/O源读取原始数据(如日志记录)的PCollection<String>
,然后将日志记录解析为对应的键/值对,再将其转换为PCollection<KV<Team, Integer>>
来获取输入。为了更清晰地讲解这个示例,我在所有这些步骤中都提供了伪代码。但在后面的示例中,为简化起见,我会忽略与I/O和解析相关的代码。
对于简单地从I/O源读取数据的流水线,解析团队/得分对,并计算每个团队的总分,我们将得到示例2-1所示的内容。
示例2-1 求和流水线
PCollection<String> raw = IO.read(...);
PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn());
PCollection<KV<Team, Integer>> totals =
input.apply(Sum.integersPerKey());
从I/O源读取键/值数据,其中Team
(如团队名称的字符串)作为键,Integer
(如单个团队成员的得分)作为值。然后将每个键的值相加,以在输出集合中生成每个键的总和(如团队总分)。
对于接下来的所有示例,我们研究完对描述需要分析的流水线的代码片段后,将研究一张展示在单个键的具体数据集上的流水线执行情况的延时图。你可以想象成在实际业务的流水线中,类似的操作在多台机器上并行执行,但鉴于我们的示例,应尽量简化才能看得更加清晰。
如前所述,本书纸质书版本使用关键帧的静态序列来表现流水线随时间推进的执行情况,在异步社区本书对应的网页中提供完整的动图版本。
每张图中都包含了事件时间(位于x轴)和处理时间(位于y轴)两个维度的输入和输出。实时展示的流水线观测到的处理过程是自下而上的,如随着时间的推进沿着处理时间坐标轴上升的加粗的水平黑线所示。输入数据用圆圈表示,圆圈内的数字表示该特定记录的值。这些圆圈一开始呈现浅灰色,当数据被流水线观测后,这些圆圈的颜色变深。
当流水线观测到数据值时,会将它们在中间状态中进行累积,并最终将聚合结果进行物化输出。数据处理的状态和输出用矩形表示(浅灰色表示状态,深灰色表示输出),聚合结果的值显示在矩形顶部,矩形覆盖的区域表示事件时间和处理时间中已累积到结果中的部分。示例2-1中的流水线在传统的批处理引擎上执行的过程如图2-3所示。
图2-3 传统的批处理
此示例是一个批处理流水线,它会累积状态,直到已观测到所有的输入数据(由顶部的绿色虚线表示),在此时间点产生数值为48的单个输出。在这个示例中,我们基于所有的事件时间计算出一个总和,因为我们没有应用任何具体的开窗转换。因此,表示状态和输出的矩形覆盖了整个x轴。但是,如果我们要处理无界数据源,传统的批处理就无法满足需求了。我们也无法等待所有输入结束,因为无界数据源的输入实际上永远不会结束。我们需要的概念之一是在第1章中介绍的开窗。因此,在2.2.2节中,我们先简要地回顾一下开窗的概念。
2.2.2 位置:开窗
如第1章所述,开窗是沿时间边界切分数据源的过程。常见的开窗策略包括固定窗口、滑动窗口和会话,如图2-4所示。
图2-4 开窗策略示例。每个示例都展示3个不同的键,突出显示对齐的窗口(适用于所有数据)和未对齐的窗口(适用于数据的一个子集)之间的差异
为了更好地了解实际业务中的开窗策略,我们把整数求和流水线放入窗口周期为2 min的固定窗口。在使用Beam的情况下,要做出这种改变只需简单添加一个Window.into
变换,如示例2-2中的突出显示部分。
示例2-2 开窗求和代码
PCollection<KV<Team, Integer>> totals = input
.apply(Window.into(FixedWindows.of(TWO_MINUTES)))
.apply(Sum.integersPerKey());
回想一下,Beam提供的统一模型在批处理和流式处理中同样适用,因为批处理在语义上只是流式处理的一个子集。我们先在批处理引擎上执行这个流水线,因为批处理的机制更为直接,而且,当我们切换到流式引擎时,这个结果将为我们提供直接的对比。图2-5展示了批处理的结果。
和之前一样,输入数据在状态中累积,直到它们被完整消耗掉,然后产生输出结果。然而,在这种情况下,我们得到的是4个单独的输出,而不是一个输出总和,其中每个输出都分别对应4个相关的2 min事件时间窗口中的一个。
至此,我们重新审视了我在第1章中介绍的两个主要概念:事件时间和处理时间之间的关系,以及开窗。如果想进一步了解这些概念,我们需要讨论2.1节开头提到的几个新概念:触发器(trigger)、水位(watermark)和累积(accumulation)。
图2-5 在批处理引擎中执行的开窗求和流水线