基于流/响应式编程中的事务/请求-响应模式

Transactions / request-response-pattern in flow based/reactive programming

提问人:Jan Thomä 提问时间:6/6/2019 最后编辑:Jan Thomä 更新时间:11/7/2019 访问量:253

问:

因此,在过去的几天里,我一直在阅读有关基于流程的编程(FBP)的文章,我也一直在阅读J. Paul Morrison关于它的书。但是,我觉得我仍然无法真正理解它。一般的概念是,您将编程视为某种装配线,其中的组件将一些数据包作为输入并生成一些数据包作为输出。您可以连接这些组件,数据包通过网络传输。虽然我完全明白它如何用于 ETL 类型的应用程序或批处理,但我不知道如何处理同步请求/响应模式或数据库事务等事情。

例如,假设我有一个作为 FPB 实现的 Web 服务器。此 Web 服务器有一个 GET /user/{id},它应该返回一个 JSON,其中包含有关用户的一些信息。它还有一个 POST /user/{id},您可以通过将一些 JSON 发送回服务器来更新用户。因此,以下是我想象此流程的样子:

Example Flow

我尝试拥有许多可重用的组件,而不是将处理请求的整个逻辑放入单个组件中。因此,有一个 HTTP 服务器组件,它将请求发送到调度程序组件,然后调度程序组件将请求分派到后续流中。在每个流中,请求由一个通用的“请求解析器”组件解析,该组件将请求的各个部分输出到流的其余部分。

上半部分非常简单,我从 DB 中读取具有给定 ID 的用户实体,将对象序列化为 JSON,然后将其发送回去。但是,在这一点上,我们不再真正引用HTTP请求,那么我怎么知道将此请求发送到哪里呢?

在下半部分,我们有一些额外的复杂性,因为我想以事务方式写入数据库。因此,首先启动一个事务(并行地将请求体解析为某个对象),然后从数据库中检索用户对象并与请求的输入合并。最后,它被写回数据库并提交事务。最后,呼叫者会响应一些“OK”状态。在这里,我还有一个额外的问题,即在提交事务时,我真的不知道要提交哪个事务。当然,在发送响应时,我不知道要将其发送到哪个请求。

因此,这两个问题似乎有一些共同点——一种跨越许多组件的“上下文”。在一个示例中,它是 HTTP 请求/响应上下文,在另一个示例中是事务上下文。在常规编程中,这些上下文通常在线程级别进行处理。由于请求在单个线程中运行,因此事务和请求上下文绑定到线程本地,因此只要所有内容都在同一线程中运行,就可以在任何地方访问它们。

在基于流程的编程中,每个组件都独立运行,理想情况下在单独的线程上运行。这实际上是一件关键的事情,因为它允许并行化和有效使用多个处理器。但是,当线程本地上下文不再存在时,如何在基于流的编程中处理这些问题?如果正确处理错误(我在示例中省略了这一点),这将变得更加复杂。

我认为,当你进行反应式编程时,大部分处理都是异步和多线程的,你也会遇到同样的问题,所以我想知道是否有模式可以处理这个问题。您是否有响应式编程或基于流程的编程的实际经验,并且对我如何解决这个问题有一些提示?

多线程与 语言无关 响应式 基于流程的编程

评论


答:

1赞 Paul Morrison 8/5/2019 #1

我在 Twitter 上写了一个快速的答案 - 我想我也会在这里发布它......对于重复发布,我们深表歉意!

我喜欢这个/这些问题的子流,其中子流中的第一个信息包提供了您正在谈论的“上下文”。这可能会有所帮助:https://github.com/jpaulm/javafbp-websockets...呵呵!

PS 这种循环式网络拓扑结构也是 Facebook 新的“Flux”技术的基础——参见 Jing Chen 的演讲,她在演讲中将这种方法与 MVC 进行了比较: https://www.youtube.com/watch?v=nYkdrAPrdcw

1赞 kimathie 11/7/2019 #2

希望这能推动你朝着正确的方向前进。我遇到了类似的问题,我需要在异步微服务架构中执行同步操作。

我解决它的方法是使用 Observer 模式。我有 3 个组件;一个 HTTP 服务器、一个回调服务器和一个计时器轮。与您的 http 服务器类似的 http 服务器接收传入请求,回调服务器在异步处理后接收总体结果,计时器轮将原始 http 上下文排队并将响应与 http 请求协调起来。

当收到传入的请求时,http 服务器会创建一个相关 id,将其附加到请求元数据中,将回调服务器 url 附加到请求元数据中,最后将请求和原始 http 上下文一起添加到计时器轮中。然后,http 服务器会像您的情况一样将请求传递给调度程序,并将消息发送到相关组件进行异步处理。

根据当前处理组件的执行结果,它将从元数据中检索回调 URL,并将响应发送到回调服务器。在您的情况下,有 json 序列化或数据库写入可以执行此操作。然后,回调服务器将提取附加的相关 ID,并获取相应的 http 上下文并写入响应。

注意:计时器轮中的每个计时器对象都有一个可配置的超时,这样,如果异步处理延迟,它将超时并向相应 http 上下文的 http 客户端返回可配置的消息。