Forwarder component

date
Nov 24, 2023
slug
storing-data-and-publishing-an-event-in-one-transaction
status
Published
tags
Review
summary
在一个事务中实现事件发送,并通过数据库存储数据
type
Post
Created Time
Nov 24, 2023 07:30 AM
Updated Time
Nov 24, 2023 09:59 AM
AI summary
Status

我们为什么需要关心在事务中发布消息

在使用事件驱动应用程序时,您可能需要在某个时刻存储应用程序的状态,并同时发布一条消息,告诉系统中的其他部分刚刚发生了什么。在理想的情况下,您希望在事务中持久化应用程序状态时,能够成功发布消息,因为不这样做可能会导致数据一致性方面的问题。为了在一个事务中同时提交数据存储和事件发送,您需要能够将消息发布到用于数据存储的同一数据库,或者自行实现两阶段提交(2PC)。如果您不想将消息代理更改为数据库,也不想再次重复发明轮子,您可以通过使用Watermill 的 Forwarder component 使自己的生活更加轻松!

Forwarder component 是什么?

您可以将 Forwarder 想象成一个后台运行的守护程序,它等待发布到数据库的消息,并确保它们最终到达消息代理。
notion image
为了使 Forwarder 通用且能够透明地使用,它监听基于发布/订阅的中间数据库上的单个事件,其中通过使用装饰的 Forwarder Publisher 发送封装的消息。Forwarder 解封它们并发送到消息代理上的指定目标主题。
notion image

案例

让我们考虑以下示例:有一个命令的任务是运行彩票抽奖。它必须从系统中注册的用户中随机选择一个作为获奖者。在执行此操作的同时,它还应该通过将数据存储至数据库来持久化所做的决定,将唯一的彩票 ID 与所选用户的 ID 关联起来。此外,作为一个事件驱动的系统,它应该发出一个LotteryConcluded 事件,以便其他组件可以适当地对其做出反应。准确地说,将会有一个负责向彩票获奖者发送奖品的组件。它将接收 LotteryConcluded 事件,并使用事件中传入的彩票 ID 来验证谁是获奖者,与数据库条目进行核对。
在我们的案例中,数据库是 MySQL,消息代理是 Google Pub/Sub,但它可以是任何其他两种技术。
在实现这样一个命令时,我们可以采用多种方式。下面我们将介绍三种可能的尝试,并指出它们的优缺点。

首先发布事件,然后存储数据。

在这种方法中,该命令首先会发布一个事件,然后紧接着存储数据。虽然在大多数情况下这种方法可能运行良好,但让我们尝试找出可能出现的问题。
该命令需要执行三个基本操作:
  1. 选取用户 A 作为彩票的获奖者。
  1. 发布一个 LotteryConcluded 事件,通知抽奖结果 B 已经结束。
  1. 在数据库中存储抽奖结果 B 已被用户 A 获得的信息。
这些步骤中的每一步都有可能失败,打破了我们命令的流程。如果第一步失败,后果并不严重 - 我们只需返回一个错误并认为整个命令失败。没有数据会被存储,也没有事件会被发布。我们可以简单地重新运行该命令。
如果第二步失败,我们仍然没有发布事件,数据库中也没有存储任何数据。我们可以重新运行该命令并再次尝试。
最有趣的是,如果第三步失败会发生什么。在第二步之后,我们已经发布了事件,但最终没有数据存储到数据库中。其他组件会收到彩票已经结束的信号,但在事件中发送的彩票 ID 将没有与任何获奖者相关联。它们将无法验证获奖者是谁,因此它们的操作也必须被视为失败。
我们需要可以摆脱这种情况,但很可能需要一些手动操作,例如使用已发布事件中的彩票 ID 重新运行命令。

先存储数据,然后发布事件。

在第二种方法中,我们将尝试解决第一种方法的缺点。如果我们在数据库中没有正确持久化状态,我们不会通过不发布事件来泄露我们的失败给外部组件。这意味着我们将改变我们的操作顺序如下:
  1. 选择一个随机用户 A 作为彩票的获奖者。
  1. 在数据库中存储抽奖结果 B 已被用户 A 获得的信息。
  1. 发布一个 LotteryConcluded 事件,通知抽奖结果 B 已经结束。
与第一种方法一样,如果前两个操作失败,我们不会有任何后果。如果第三步失败,我们的数据将持久化在数据库中,但没有事件被发布。在这种情况下,我们不会将失败通知给彩票组件之外的部分。然而,考虑到系统的预期行为,我们的获奖者将无法获得奖品,因为没有事件会传递到负责此操作的组件。
这可能可以通过一些手动操作来修复,例如手动发布事件。我们仍然需要做得更好。

将数据存储和事件发布合并为一个事务

让我们想象一下,我们的命令可以同时执行第二个和第三个步骤。它们将以原子方式提交,这意味着其中任何一个在另一个失败时都无法成功。这可以通过利用大多数数据库当前实现的事务机制来实现。我们示例中使用的 MySQL 就是其中之一。
为了在一个事务中同时提交数据存储和事件发布,我们需要能够将消息发布到 MySQL。因为我们不想在整个系统中将消息代理器更改为由 MySQL 支持,所以我们必须找到其他方法来实现。
好消息是:Watermill 提供了所有必要的工具!如果您使用的数据库是 MySQL、PostgreSQL(或任何其他SQL)、Firestore 或 Bolt 之一,您可以将消息发布到这些数据库中。Forwarder 组件将帮助您选择在数据库中发布的所有消息,并将它们转发到您的消息代理器。
您需要做的就是确保:
  1. 您的命令使用在数据库事务上下文中工作的发布器(例如SQL、Firestore、Bolt)。
  1. Forwarder组件正在运行,使用数据库订阅者和消息代理器发布器。
在这种情况下,命令可能如下所示:
为了使 Forwarder 组件在后台工作,并将消息从 MySQL 转发到 Google Pub/Sub,您需要按照以下步骤进行设置:
如果您希望更深入地了解这个示例,您可以在此处找到其实现代码。

© 孙东辉 2022 - 2024