实现领域驱动设计
上QQ阅读APP看本书,新人免费读10天
设备和账号都新为新人

事件驱动架构

事件驱动架构(Event-Driven Architecture,EDA)是一种用于处理事件的生成、发现和处理等任务的软件架构。[Wikipedia,EDA]

图4.4所示的六边形架构表示了一个事件驱动架构系统的例子。事件驱动架构不见得必须与六边形架构一同使用,但是对于理解事件驱动架构来说,引入六边形架构是有好处的。对于一个新启动的项目来说,我们应该优先考虑采用六边形架构。

在图4.4中,三角形表示了限界上下文所使用的消息机制。输入事件所使用的端口和其他客户端所使用的端口是不同的。同样,输出事件也将使用另一个不同的端口。在前文中我们已经提到,处理消息的端口可以是使用AMQP协议的消息机制,这种方式有别于使用HTTP协议的其他客户端。无论采用哪种类型的消息机制,我们都使用“三角形”来表示事件的进出。

进出六边形的事件可能有不同的类型,而我们需要特别关心的是领域事件。除此之外还可能有系统事件,比如用于系统日志和监控的事件等。但是在DDD中,我们需要关注的是领域事件。

我们可以引入多个六边形来表示整个企业范围内的事件架构,如图4.7所示。再次提醒,这里并不是说每个系统都必须使用六边形架构,而重点在于阐述如何将事件驱动架构用于多个六边形架构系统。你完全可以将这里的六边形架构替换成分层架构或其他架构。

img

图4.7 在一个事件驱动架构中融入了六边形架构风格。该事件驱动架构通过消息机制完成对所有系统的解耦。

一个系统的输出端口所发出的领域事件将被发送到另一个系统的输入端口,此后输入端口的事件订阅方将对事件进行处理。对于不同的限界上下文来说,不同的领域事件具有不同含义,也有可能没有任何含义[5]。在一个限界上下文处理某个事件时,应用程序API将采用该事件中的属性值来执行相应的操作。应用程序API所执行的命令操作将反映到命令模型中。

有可能出现这样一种情况:在一个多任务处理过程中,某种领域事件只能表示该过程中的一部分。只有在所有的参与事件都得到处理之后,我们才能认为这个多任务处理过程完成了。但是,这个过程是如何开始的?它是如何分布在整个企业范围之内的?我们如何跟踪处理进度?这些问题我们将在“长时处理过程”一节中一一回答。这里,我们可以先学习一些基础知识。基于消息的系统通常呈现出一种管道和过滤器风格。

管道和过滤器

以下的shell命令便是一种最简单的管道和过滤器:

上面的Linux命令用于在phone_numbers.txt文件中统计含有电话区号“303”的所有文本行的数量。该命令同时使用了管道和过滤器:

1. cat命令工具用于向标准输出流(Standard Output Stream)输出phone_numbers.txt文件中的内容。通常来说,标准输出流与终端相连。但是在使用了“|”符号之后,输出将会通过管道转向下一个命令工具。

2. 下一步,grep命令从标准输出流中读取数据,此时的标准输出流即是cat命令的输出结果。grep命令的参数表示匹配含有“303”的所有行。grep命令查找到的所有结果都将输出到标准输出流。和cat命令一样,grep命令的输出流通过管道被转向到下一个wc命令。

3. 最后,wc命令读取标准输出流,此时即grep命令的输出结果。wc命令的命令行参数为“-l”,表示统计所读行的数量。wc命令的结果将输出到终端,此时为3,表明grep命令查找到了3个含有“303”的文本行。请注意,此时的输出直接显示在了终端,因为在wc命令之后没有另外的管道了。在Windows下,我们可以使用以下命令来达到相同的效果:

思考一下,在以上的命令工具中都发生了些什么。每个工具都接收一个数据集,对其进行处理,再输出另一个数据集。输出数据集和输入数据集是不同的,因为每一个命令都充当着过滤器的作用。在整个过滤过程完成之后,输出数据和输入数据可能完全不一样了。在本例中,最原始的输入是一个文本文件,但最终的输出则只有一个数字“3”。

我们如何将上例中的基本原则用于事件驱动架构呢?事实上,我们是可以发现一些共性的。接下来,我们将围绕管道和过滤器消息模式[Hohpe,Woolf]展开讨论。这里需要注意的是,用于消息的管道和过滤器与上面命令行例子并不完全一样。比如,一个事件驱动架构的过滤器可能并不需要过滤任何数据,它可以用于执行某些操作,但是不会修改消息数据。但是,事件驱动架构中的管道和过滤器却与上面的命令行例子拥有某些相同的特征。下面我们将讲到这些特征,如果你是一个高级读者,请“过滤”掉下面的一节。

表4.2包含了基于消息的管道和过滤器处理过程的基本特征。

表4.2 基于消息的管道和过滤器处理过程的基本特征

img

现在,如果我们将上例中的cat,grep和wc看成是事件驱动架构中的不同组件会发生什么情况?如果要通过创建消息发送方和接收方这样的组件来完成上例中的电话号码过滤功能,我们又应该怎么做?(这里我并不是演示如何使用消息组件来替换命令行,而是演示如何使用消息机制来达到相同的目标。)

以下是管道和过滤器的工作流程,如图4.8所示:

1. 我们可以从PhoneNumbersPublisher组件开始,该组件读取phone_number.txt中的数据,然后创建一个含有所有文本行的事件消息并发送该消息,事件名为AllPhoneNumbersListed。一旦消息发送出去,整个处理过程便开始了。

img

图4.8 对于发送的事件,由过滤器进行处理,这个过程组成了一个管道。

2. 通过配置,一个名为PhoneNumberFin d e r的消息处理组件将订阅AllPhoneNumbersListed事件。该消息处理组件是整个处理流程中的第一个过滤器。该过滤器查找出包含有“303”的所有文本行,然后创建一个名为PhoneNumberMatched的事件,该事件包含所有的查找结果。接下来,过滤器将发出一个新的事件,整个处理流程继续。

3. 一个名为M a t ch e d Ph o n eNu mb e r C ou nt e r的消息处理组件订阅了PhoneNumbersMatched事件。该消息处理组件是第二个过滤器,它的职责是统计所接收事件中的电话号码数量,再把结果以新的事件形式——MatchedPhoneNumbersCounted事件发布出去。该事件中包含了count属性,在本例中count的值为3。

4. 最后,MatchedPhoneNumbersCounted 事件的订阅方——PhoneNumberExectuive组件将记录下最终的结果,包括电话号码数量、消息接收的日期和时间:

3phone numbers matched on July 15, 2012at 11:15PM

到此,整个处理流程执行完毕[6]

以上的流程处理是灵活的。如果我们需要向其中新加入一个过滤器,我们只需要创建一种新的事件即可。我们应该慎重地配置各个处理器的作用顺序。当然,这并不像命令行例子那么简单。然而,通常来说,我们并不会经常性地改变领域事件的处理流程。虽然上面的例子本身没有多大意义,但却向我们展示了事件驱动架构中的管道和过滤器的工作机制。

如果你已经厌倦了上面的例子,那说明有可能你已经掌握了管道和过滤器的相关知识,但是还有很多人并没有像你那么高级。上面的例子主要是向读者解释管道和过滤器中的不同概念。在真实的企业应用里,我们将通过这种模式将一个大问题分解成若干个较小的步骤来完成,这使得分布式处理更容易理解和管理。

在真实的DDD应用场景中,领域事件的名字将反映业务操作。在上例中的第1步,所发出的事件表示某个限界上下文中某个聚合的行为输出。第2步到第4步可以发生在相同的限界上下文中,也可以发生在不同的上下文中,它们将接收上一步所发出的事件,处理之后再发布新的事件以通知下一步。同时,这3个步骤还可以创建和修改相应上下文中的聚合。这些都是在管道和过滤器架构中处理领域事件的常见输出。

就像领域事件(8)所讲的,这些并不是简单的事件通知。他们显式地对业务过程进行建模,这对于整个领域范围内的订阅方来说是有好处的。当然,我们还可以对这种同步式的、逐步式的处理方式进行扩展,以使其同时处理多个任务。

长时处理过程(也叫Saga)

我们可以对上面的管道和过滤器的例子进行扩展,从而得到另一种事件驱动的、分布式的并行处理模式——长时处理过程(Long-Running Process)。一个长时处理过程有时也称为Saga,但是这个名字可能与另一个已有的模式存在冲突。关于Saga的一个早期描述可以参考[Garcia-Molina & Salem]。为了消除混淆和歧义,在本书中我将使用“长时处理过程”这个名称,有时为了简单甚至直接使用“过程”。

牛仔的逻辑

LB:“我认为Saga就像连续剧Dallas和Dynasty一样!”

AJ:“对于你们所有的德国读者来说,Dynasty即是Der Denver Clan。”

作为对前一个例子的扩展,我们只需添加一个过滤器——TotalPhoneNumberCounter,便可以创建一个并行的处理流程。该过滤器订阅了AllPhoneNumbersListed事件,它与PhoneNumberFinder几乎同时接收到AllPhoneNumbersListed事件。这个新的过滤器目的很简单,即统计所有的电话号码数量。和先前的例子不同的是,此时的长时处理过程将由PhoneNumberExecutive来启动,同时它还将对处理过程进行跟踪。PhoneNumberExecutive可以重用PhoneNumbersPublisher,也可以不再重用,让我们重点看看PhoneNumberExecutive有哪些新的功能。PhoneNumberExcecutive可以通过应用服务或者命令处理器的形式实现,它将跟踪长时处理过程的各个阶段。同时,PhoneNumberExcecutive它还知道一个长时处理过程何时执行完毕,并在这些过程执行完毕之后,再执行其他任务。长时处理过程请参考图4.9:

img

图4.9 长时处理过程启动多个并行的处理过程,然后对其进行跟踪。图中大的箭头表示并行处理的开始,之后两个过滤器将接收到相同的事件。

设计长时处理过程的不同方法

设计长时处理过程有三种方法:

·将处理过程设计成一个组合任务,使用一个执行组件对任务进行跟踪,并对各个步骤和任务完成情况进行持久化。我们将详尽地讨论这种方法。

·将处理过程设计成一组聚合,这些聚合在一系列的活动中相互协作。一个或多个聚合实例充当执行组件并维护整个处理过程的状态。这种方式被Amazon的Pat Helland所提倡[Helland]。

·设计一个无状态的处理过程,其中每一个消息处理组件都将对所接收到的消息进行扩充——即向其中加入额外的数据信息——然后再将消息发送到下一个处理组件。在这种方法种,整个处理过程的状态包含在每条消息中。

现在,由于两个过滤器组件同时订阅了最初的事件,它们将几乎同时接收到该事件。原有的过滤器功能不变,即匹配含有“303”的文本行。新的过滤器将统计所有的文本行,在执行完毕时发出AllPhoneNumbersCounted事件,该事件中包含了所有电话号码的总数量。例如,如果总共有15个电话号码,那么该事件的count属性值即为15。

PhoneNumberExecutive同时订阅了MatchedPhoneNumbersCounted和AllPhoneNumbersCounted事件。只有在两个领域事件都被PhoneNumberExecutive接收到时,整个并行处理过程才算完成,此时两个并行处理的结果将合二为一。PhoneNumbersExecutive所生成的日志记录为:

3of 15phone numbers matched on July 15, 2012at 11:27PM

日志输出在原来的基础上包含了电话号码的总数信息。虽然这里演示的例子非常简单,但是它们却是并行运行的。同时,由于有些订阅组件被部署在不同的计算节点上,此时的并行处理过程也是分布式的。

然而,这个长时处理过程还存在一个问题。PhoneNumberExecutive无法知道所接收到的两个领域事件是否来自同一个并行处理过程。处理过程并行启动,完成事件无序地产生,那么PhoneNunmberExecutive如何知道是哪个处理过程执行完毕了呢?在电话号码统计这个例子中,出现这个问题可能并不严重。但是,当处理真实的企业业务领域时,这样的问题却有可能是灾难性的。

解决这个问题的第一步是在每个领域事件中加入处理过程的身份标识。这个标识可以和引发处理过程的领域事件的标识相同,比如AllPhoneNumbersListed事件。此时我们可以使用UUID,请参考实体(5)和领域事件(8)。PhoneNumberExecutive只有在接收到具有相同标识的领域事件时才会输出日志记录。然而,PhoneNumberExecutive并不会等待所有事件的到达,它也是一个事件订阅方,在事件到达时将自动启动相应的处理过程。

执行器和跟踪器?

有人认为执行器和跟踪器这两种概念合并成一个对象——聚合——是最简单的方法。此时,我们在领域模型中实现这样一个聚合,再通过该聚合来跟踪长时处理过程的状态。这是一种解放性的技术,我们不需要开发一个单独的跟踪器来作为状态机,而事实上这也是实现基本长时处理过程的最好方法。

在六边形架构中,端口-适配器的消息处理组件将简单地将任务分发给应用服务(或命令处理器),之后应用服务加载目标聚合,再调用聚合上的命令方法。同样,聚合也会发出领域事件,该事件表明聚合已经完成了它的处理任务。

这种方式很像Pat Helland所提倡的方法,他将此称为合伙活动(Partner Activity)[Helland],这也是在“设计长时处理过程的不同方法”一节中所提到的第二种方法。然而,将执行器和跟踪器分开讨论是一种更有效的方法。

在实际的领域中,一个长时处理过程的执行器将创建一个新的类似聚合的状态对象来跟踪事件的完成情况。该状态对象在处理过程开始时创建,它将与所有的领域事件共享一个唯一标识。同时,将处理过程开始时的时间戳保存在该状态对象中也是有好处的(原因请参考本章后续内容)。长时处理过程的状态对象如图4.10所示。

当并行处理的每个执行流运行完毕时,执行器都会接收到相应的完成事件。然后,执行器根据事件中的过程标识获取到与该过程相对应的状态跟踪对象实例,再在这个对象实例中修改该执行流所对应的属性值。

img

图4.10 PhoneNumberStateTracker作为长时处理过程的状态对象,它用于跟踪处理进度。PhoneNumberStateTracker是一个聚合。

长时处理过程的状态实例通常有一个名为isCompleted()的方法。每当某个执行流执行完成,其对应的状态属性也将随之更新,随后执行器将调用isCompleted()方法。该方法检查所有的并行执行流是否全部运行完毕。当isCompleted()返回true时,执行器将根据业务需要发布最终的领域事件。如果该长时处理过程是更大的并行处理过程的一个分支,那么向外发布该事件便是非常有必要的了。

有些消息机制可能并不能保证消息的单次投递(Single Delivery)[7]。对于一个领域事件有可能被多次投递的情况,我们可以通过长时处理过程的状态实例来消除重复。那么,这是否需要消息机制提供额外的特殊功能呢?让我们看看在没有这些特殊功能的时候应该如何处理。

当一个完成事件到达时,执行器将检查该事件中相应的状态属性,该状态属性表示该事件是否已经存在。如果状态已经被设值,那么该事件便是一个重复事件,执行器将忽略该事件,但是还是会对该事件做出应答[8]。另一种方式是将状态对象设计成幂等的。这样,如果执行器接收到了重复消息,它将同等对待,即执行器依然会使用该消息来更新处理过程的状态,但是此时的更新不会产生任何效果。在以上两中方法中,虽然只有第二种方法将状态对象本身设计成幂等的,但是在结果上他们都能达到消息传输的幂等性。关于事件消重的更多讨论,请参考领域事件(8)。

对于跟踪有些长时处理过程来说,我们需要考虑时间敏感性。在过程处理超时,我们既可以采用被动的,亦可以采取主动。回忆一下,状态跟踪器可以包含处理过程开始时的时间戳。如果再向跟踪器添加一个最大允许处理时间,那么执行器便可以管理那些对时间敏感的长时处理过程了。

被动超时检查由执行器在每次并行执行流的完成事件到达时执行。执行器根据状态跟踪器来决定是否出现超时,比如调用名为hasTimedOut()的方法。如果执行流的处理时间超过了最大允许处理时间,状态跟踪器将被标记为“遗弃”状态。此时,执行器甚至可以发布一个表明处理失败的领域事件。被动超时检查的一个缺点是,如果由于某些原因导致执行器始终接收不到完成领域事件,那么即便处理过程已经超时,执行器还是会认为处理过程正处于活跃状态。如果还有更大的并发过程依赖于该过程处理,那么这将是不可接受的。

主动超时检查可以通过一个外部定时器来进行管理。比如,一个JMX的TimerMBean实例便可以用来获取一个被Java管理的定时器。在处理过程开始时,定时器便被设以最大允许处理时间。定时时间到时,定时监听器将访问状态跟踪器的状态。如果此时的状态显示处理还未完毕,那么处理状态将被标记为“遗弃”状态。如果此时处理过程已经完毕,那么我们可以终止定时。主动超时检查的一个缺点是,它需要更多的系统资源,这可能加重系统的运行负担。同时,定时器和完成事件之间的竞态条件有可能会造成系统失败。

长时处理过程通常和分布式并行处理联系在一起,但是它与分布式事务没有什么关系。长时处理过程需要的是最终一致性。我们应该慎重地设计长时处理过程,在基础设施或处理过程本身失败的时候,我们应该能够采取适当的修复措施。只有在执行器接收到整个处理过程成功的通知时,我们才能认为处理过程的各个参与方达到了最终一致性。诚然,对于有些长时处理过程来说,整个处理过程的成功并不需要所有的并行执行流都成功。还有可能出现的情况是,一个处理过程在成功完成之前可能会延迟好几天的时间。但是,如果一个处理过程被搁浅,那么所有的参与系统都将处于一种不一致的状态,此时做出一些补偿是必要的。但是,补偿可能增加处理过程的复杂性。还有可能是,业务需求是允许失败情况发生的,而此时采用工作流方案可能更加合适。

SaaSOvation公司在不同的限界上下文之间采用了事件驱动架构。ProjectOvation团队将使用最简单形式的长时处理过程来管理为Product实例创建Discussion的过程。他们首选六边形架构,以在整个企业范围之类发布领域事件。

值得注意的事,长时处理过程的执行器可以发布一个或者多个事件来触发并行处理流程。同时,事件的订阅方也不见得只能有两个,而是可以有多个。换句话说,在一个长时处理过程中,可能存在许多彼此分离的业务处理过程同时运行。因此,上面的简单例子只是向大家演示了长时处理过程的基本概念。

当与遗留系统的集成存在很大的时间延迟时,采用长时处理过程将非常有用。当然,即便时间延迟和遗留系统并不是我们的主要关注点,我们依然能从长时处理过程中得到好处,即由分布式和并行处理所带来的优雅性,这样也有助于我们开发高可伸缩性、高可用性的业务系统。

有些消息机制中已经内建了对长时处理过程的支持,这可以大大提高这些软件本身的采用率。其中一个例子便是[NServiceBus],在NServiceBus中,长时处理过程被称为Saga。另一个例子是[MassTransit]。

事件源

有时,我们的业务可能需要对发生在领域对象上的修改进行跟踪。此时的跟踪有不同的层次,而每个层次又对应有不同的方法。通常来说,业务人员可能只关心某些实体的创建时间、最后修改时间和是谁做的修改等信息。这是一种非常简单的跟踪方式,而对于领域模型中每次单独的改变,这种方式并没有提供任何信息。

随着人们要求更多的变化跟踪,业务层也需要更过的元数据。业务层开始关心每一次单独操作,甚至希望得到某个业务操作的执行时间。这些需求要求维护一份审计日志(Audit Log)。然而,审计日志也是存在局限的,虽然它包含了系统中的一些事件发生信息,甚至可以用于系统调试,但是我们并不能检查单个领域对象在改变前和改变后的状态。那么,如果我们希望从变化跟踪中得到更多的信息,应该怎么办呢?

作为程序员,我们已经接触过一些优秀的变化跟踪工具,其中最常见的非源代码库莫属了,比如CVS、Subversioin、Git或Mercurial等。这些源代码管理工具都有一个相同的功能:它们都知道如何跟踪对每一个源代码文件的修改。这些工具使得我们从头到尾浏览对一个文件的修改。当把所有的源文件都提交给这些工具管理时,我们便可以跟踪软件在整个开发过程中的各种变化。

现在,如果我们将这种概念应用在单个实体上,然后用在单个聚合上,再用于模型中的每个聚合,那么我们便能体会到在对象层面上跟踪变化的好处,进而体会到变化跟踪对于整个系统的好处。因此,我们希望有种方式能够记录下诸如“创建聚合”这样的操作。有了所有操作的历史数据,我们甚至可以支持临时模型。这个层面上的变化跟踪便是事件源(Event Sourcing)的核心[9]。事件源模式如图4.11所示。

img

图4.11 从高层次看事件源,由聚合发布的事件被保存到事件存储中,同时这些事件被用于跟踪模型的状态变化。资源库从事件存储中读取事件,并将这些事件应用于对聚合状态的重建。

事件源存在多种定义,在此做些澄清是有必要的。我们这里所说的事件源是指:对于某个聚合上的每次命令操作,都有至少一个领域事件发布出去,该领域事件描述了操作的执行结果。每一个领域事件都将被保存到事件存储(Event Store,8)中。每次从资源库中获取某个聚合时,我们将根据发生在该聚合上的历史事件来重建该聚合实例,事件的作用顺序应该与它们的产生顺序相同[10]。换句话说,第一个发生的事件将最先起作用,根据事件信息,聚合执行相应的操作以更新自身的状态。接着,第二个发生的事件进一步修改聚合状态。这样依次进行,直到所有的事件都被“重放”为止。此时,聚合的状态应该和最后一次命令执行后的状态相同。

移动的目标?

业界对于事件源的定义并没有达成统一,直到我写这本书时,这个问题都还没有解决。就像其他新技术一样,我们将不断地对事件源进行改进。本书所展现的是事件源在用于DDD时的一些核心方面,同时包含了事件源有可能的一些发展方向。

随着时间的增长,发生在聚合实例上的事件将越来越多,那么,重放这些成百上千的事件是否会对模型的处理造成影响呢?对于那些业务操作繁忙的模型来说,这种影响至少是存在的。

为了避免这种瓶颈,我们可以通过聚合状态“快照”的方式来进行优化。我们可以创建一个聚合内存状态的快照,此时的快照反映了聚合在事件存储历史中某个事件发生后的状态。为了达到这样的目的,我们需要利用该事件及其发生前的所有事件来重建聚合实例,之后对聚合状态进行序列化,再把序列化之后的快照保存在事件存储中。这样一来,我们便可以先通过聚合快照来实例化某个聚合,接着再“重放”比快照更新的事件来修改聚合的状态,最终使聚合达到最后一个事件发生后的状态。

快照并不是随意创建的,而是可以在所发生的事件达到某个数量时才创建的。根据项目的实际情况,团队可以自行确定一个数量值。比如,我发现当两个快照之间的事件数在50到100个之间时,我们可以获得最优的聚合获取性能。

更多的,我们将事件源看成是一种技术解决方案。在没有事件源的情况下,我们同样可以创建能够发布领域事件的领域模型。作为一种持久化机制,事件源可以取代ORM。但同时,事件源和ORM又存在很大的不同。事件通常以二进制的方式保存在事件存储中,这使得事件源不能用于查询操作。事实上,为事件源所设计的资源库只需要一个接受聚合唯一标识为参数的查询方法。因此,我们需要另外一种方法来支持查询,通常将CQRS和事件源一同使用[11]

事件源为我们提供了设计领域模型的新思路。从最基本的层面来看,事件历史可以用来消除系统中的bug,对调试也有很大的益处。事件源有助于获得高吞吐量的领域模型,从而极大地提高事务处理效率。比如,向单张数据库表中追加事件是非常快的。另外,事件源还有助于提高CQRS查询模型的伸缩性,因为此时查询模型的数据源可以在事件存储更新之后得到静默更新。这样做的另外一个好处是,我们可以复制多个查询模型的数据源实例以满足更多的新增客户。

然而,技术上的优势并不总能提高业务上的优势。无论如何,让我们考虑以下几种由事件源的技术优势所带来的业务优势:

• 用新的或者修改后的事件向事件存储打补丁可以修正许多问题。这对于业务来说可能不那么显而易见,但是这些补丁可以在很大程度上减少由模型中的bug所带来的系统问题。

• 除了补丁之外,我们可以通过重放一组事件的方式来重做或撤销对模型的修改。

• 有了所有事件的历史信息,业务层便可以考虑很多诸如“如果……会怎么样?”的问题。即通过重放一组发生在聚合上的事件,业务层可以得到很多问题的答案。通过模拟这些虚拟的业务场景,业务层可以从中获得不少好处,而这也是实现业务智能化的一种方式。

附录A为我们详尽地阐述了如何在聚合上实现事件源,同时还讲解了在CQRS中如何创建视图投射(View Projection)。更多信息请参考[Dahan,CQRS]和[Nijof,CQRS]。