Java与Kafka的事件驱动架构实践
目录
响应式 Java 非常适合现代流式、事件驱动型应用。在本文中,我们将通过一个结合响应式 Java 与 MongoDB 的应用示例进行讲解。具体内容包括:
- 引入响应式 Java 的原因及其与传统 Java 编程的区别。
- 响应式 Java 的关键要素详解:Mono、Flux 和 flatMap。
- 示例应用演练,对比使用响应式 Streams MongoDB 驱动的响应式代码版本与使用同步 MongoDB 驱动的传统代码版本。
流数据 - 空中交通管制示例
作为现代化国家空域系统全球倡议的关键组成部分,许多国家的航空管理局已发布强制令,要求大多数非军事飞机使用一种称为“自动相关监视广播”(ADS-B)的系统来传输基于 GPS 的位置数据。
这些数据既用于地面空中交通管制,也用于飞行员在移动地图显示器上查看附近飞机的航向、速度和高度,提供了旧式地面雷达系统所不具备的情境感知能力。
对于我(一名小型私人螺旋桨飞机的飞行员)来说,ADS-B 的普及最酷的一点在于,我拥有的附近飞机位置数据比 10 年前大多数商用喷气式客机飞行员所能获取的还要多。而我仅仅使用了一台 iPad 应用和一个廉价的数据接收器(由树莓派、亚马逊上订购的 USB 无线电接收器以及优秀的开源项目 Stratux 软件构建而成)。这对提升飞行安全、避免空中及机场地面碰撞起到了巨大的推动作用。
[LOADING...]
从数据处理的角度来看,ADS-B 数据是典型的现代流式、事件驱动系统的特征。消息本身结构相对简单且较小,大多数接收系统通常将其转换为 JSON,这使得 MongoDB 非常适合处理此类数据。以下是一条示例消息:
虽然 ADS-B 消息结构并不复杂,但其传输和接收的频率在某些情况下会带来挑战。每架飞机每 500 毫秒广播一次位置更新。在大型机场附近,这可能导致每秒产生数百条消息——按现代标准来看通常不是问题。但在我担任 MongoDB 开发倡导者的角色中,最近有国家级 ADS-B 数据聚合商寻求帮助,希望优化他们对全国接收网络流式传输的数据进行的 MongoDB 聚合操作。在高峰期,他们每秒处理约 20,000 条消息。
在讨论过程中,我意识到 MongoDB 的 Java 响应式 Streams 驱动程序可能非常适合处理此类流式数据。于是我决定研究一下,与使用标准同步驱动程序相比,使用该驱动程序能带来多大的差异。
响应式 Java - 简史
响应式 Java 的出现是为了解决 Java 中传统同步阻塞 I/O 模型的局限性,特别是在高并发、事件驱动和流式应用中。传统的“每个请求一个线程”模型往往会导致资源过度消耗,因为线程在等待 I/O 操作(如数据库查询或 API 调用)完成时会被阻塞。
该框架旨在解决这个“阻塞问题”,从而提高系统的响应能力和可扩展性。它通过实现响应式 Streams 规范来实现非阻塞异步编程。这种范式将数据作为事件流进行管理,采用了“发布者 (Publishers)”和“订阅者 (Subscribers)”的概念,实现了 30 多年前在经典的“四人帮 (Gang of Four)”著作中描述的“观察者”设计模式。
该框架的一个关键要素是线程可以异步处理活动,允许它们在等待耗时任务完成时立即转而处理其他任务,而不是处于阻塞状态。在某些用例中,这有助于减少整体资源占用,提高内存和 CPU 的使用效率。
在本文的代码示例中,我们将使用响应式 Streams 规范的 Project Reactor 实现。Project Reactor 有两种主要的发布者类型,实现了响应式 Streams 的 Publisher<T> 接口:
- Mono:是一个旨在发射最多一个条目,然后终止(成功返回一个值或报错)的数据流(发布者)。它用于返回零个或一个结果的操作,例如通过 ID 查找单个用户记录或将实体保存到数据库,是 Future 或 Optional 的响应式等价物。
- Flux:是一个旨在发射零个或多个条目,然后可选择性地以成功完成或错误终止的数据流(发布者)。它用于可能随时间返回多个结果的操作,例如返回记录集合的数据库查询或实时事件流(如接收到的 ADS-B 消息),是标准集合或无限流的响应式等价物。
为了演示响应式 Java 如何提升应用性能,请看以下传统的同步代码示例:
此函数执行两项操作:
- 调用一个 web 服务,该服务在返回“success”消息前会休眠指定的秒数。
- 调用一个函数来计算指定位数的圆周率(Pi)。
函数输出包含完成这两项操作所需的时间。值得注意的是,该实现必须等待 web 服务调用返回后,才能开始计算圆周率。web 服务调用是阻塞的,执行线程在调用完成前无法执行任何其他工作。
现在看看使用 Project Reactor 编写的等效代码:
这段代码使用了非阻塞的 WebClient 类,并将对 sleep web 服务的调用封装在响应式管道中:
响应式管道在概念上类似于 MongoDB 的聚合操作,即每个阶段的输出成为下一个阶段的输入。最终阶段发射定义的 (Mono<String>) 类型。
需要注意的两点是:管道的执行是惰性的,直到被订阅时才会发生;且管道的订阅者不会被阻塞以等待管道完成,它们可以自由地执行其他任务,仅在生成的 Mono 发射完成或错误事件时才会“响应”。
第二个 Mono 定义封装了对圆周率计算器的调用,使用 fromCallable 再次实现惰性执行,推迟圆周率计算直到 Mono 被订阅:
第三个 Mono 定义 (Mono.zip) 用于订阅并执行前两个 Mono,组合它们的输出:
- Mono.zip(sleepMono, piMono):订阅两个 Mono 并并发运行它们。当两者都完成时,发射一个 Tuple2<String, BigDecimal>。
- .doOnSuccess(tuple -> { ... }):当组合结果准备好时执行:
- tuple.getT1():获取 sleep web 服务返回的状态。
- tuple.getT2():获取计算出的圆周率位数。
- Duration.between(start, Instant.now()):计算总耗时。
- .block():订阅管道并阻塞当前线程直到完成。这才是真正触发 HTTP 调用和圆周率计算的操作。
此版本代码的关键在于,sleep web 服务调用和圆周率计算是并发执行的。
查看两个版本的应用输出(指定休眠 5 秒,计算圆周率到 10,000 位):
- 阻塞版本运行耗时:11.805 秒。
- 响应式版本运行耗时:6.85 秒。
执行时间的差异几乎正好是 5 秒,这正如我们对顺序执行与并发执行所预期的那样,也展示了响应式 Java 正是为解决此类缓慢 I/O 操作而设计的。通过不等待 web 服务调用完成就启动独立的圆周率计算,我们减少了总完成时间,并避免了可能闲置的 CPU 核心和线程。## 在 Java 中使用 MongoDB 和 Kafka 实现响应式编程
为了测试在处理 ADS-B 数据流时使用响应式 Java(Reactive Java)的影响,我创建了一个应用程序,旨在:
- 从 MongoDB 集合中按顺序读取一组预先捕获的 ADS-B 消息。
- 将这些消息放入 Kafka 主题,供下游应用程序消费。
我选择这种方法而不是直接从接收器读取实时消息,是因为我希望能够:
- 控制消息发送到 Kafka 主题的速率。
- 确保测试的可重复性。
- 将数据和应用程序发布在 GitHub 上供任何人使用,而无需配备 ADS-B 接收器。
我选择使用 Kafka 是为了让场景更具真实感,因为它是现代流式/事件驱动应用程序中分发消息最常用的方式之一,具有高可用性、交付保证和灵活分发的优势。为了简化操作,该应用程序使用了 Confluent 提供的 Kafka Docker 镜像。
为了处理从 Kafka 主题接收和处理消息流,我创建了两个功能相同的 Java 应用程序——一个使用传统的阻塞代码,另一个使用 Project Reactor 以及用于 Kafka 和 MongoDB 的响应式流驱动程序。
在添加 Java 应用程序后,整体应用程序流程如下:
- 流处理应用程序从 MongoDB 读取 ADS-B 消息。
- 流处理应用程序以指定的速率(每秒消息数)将每条消息推送到 Kafka。每条消息的原始时间戳都会更新为当前日期/时间。当 MongoDB 中的所有消息读取完毕后,应用程序将从集合中的第一条 ADS-B 消息开始重新循环。
- Java 应用程序依次从 Kafka 主题读取每条消息。
- 对于每条接收到的消息,Java 应用程序会根据从联邦航空管理局(FAA)下载并保存在第二个 MongoDB 集合中的注册数据,查找飞机的注册号(尾号)。如果找到匹配的注册信息,Java 应用程序将通过添加来自 FAA 数据的飞机制造商和型号来丰富(enrich)原始 ADS-B 消息。
- 最后,Java 应用程序将更新/丰富后的 ADS-B 消息保存到一个新的 MongoDB 集合中。
阻塞代码版本
在阻塞式应用程序中,使用简单的轮询机制从 Kafka 检索 ADS-B 消息。代码的主要部分如下:
这段代码创建了一个 KafkaConsumer 实例,轮询 Kafka 主题 500 毫秒,然后在再次轮询之前处理每条接收到的消息。此过程一直持续到应用程序被中断,并维护写入 MongoDB 的消息计数。
AircraftEnricher 和 MongoMessageWriter 类使用标准的同步 MongoDB 驱动程序处理与 MongoDB 的交互。AircraftEnricher 类的 enrich 方法如下:
MongoMessageWriter 类的 write 方法执行简单的 MongoDB insertOne 操作,将丰富后的 ADS-B 消息写入目标输出集合:
响应式代码版本
响应式代码版本使用了 Project Reactor,包括其响应式 KafkaReceiver 类,以及 MongoDB 的响应式流(Reactive Streams) Java 驱动程序。该版本的主要函数如下:
此类响应式流水线的有趣之处在于,每个阶段的输出都会成为下一个阶段的输入。在我们的流水线中,每个阶段都会产生一个 Flux,允许对流中的每条消息进行异步、非阻塞处理:
在丰富消息时使用 .flatMap() 操作符,以及随后将丰富后的消息写入 MongoDB 的过程值得说明。
响应式流水线通常包含实现 map/reduce 类型语义的操作符,map 风格的操作符产生 Flux 输出,而 reduce 风格的操作符产生 Mono。
在我们的流水线中,执行 MongoDB 数据库操作时使用了 .flatMap() 操作符。与同步版本的 AircraftEnricher.enrich() 和 MongoMessageWriter.write() 方法相比,响应式版本执行相同的操作,但具有不同的返回类型:
public Mono<AdsbMessage> enrich(AdsbMessage message)
public Mono<Void> write(AdsbMessage message)
enrich() 方法返回 Mono<AdsbMessage>,而 write() 返回 Mono<Void>。在调用 write() 时,我们通过链式调用 .thenReturn(msg) 将其映射为 Mono<AdsbMessage>。
Map 类型操作总是返回一个 Flux,但如果我们使用 map 而不是 flatMap 来调用这些方法,输出将是:
Flux<Mono<AdsbMessage>>
这将导致两个问题:
- 由于 map 操作从不订阅这些 Mono,因此 MongoDB 操作实际上永远不会执行——大多数响应式生产者使用惰性执行,这意味着只有在被订阅时才会开始执行。
- 下游操作期望的是
Flux<AdsbMessage>,而不是Flux<Mono<AdsbMessage>>。
与 .map() 相比,.flatMap() 的执行逻辑如下:
- 它从输入的 Flux 中获取每个
AdsbMessage。 - 它执行 MongoDB 操作(
enricher.enrich(msg)或writer.write(msg).thenReturn(msg))以获得一个Mono<AdsbMessage>。 - 它订阅该 Mono。
- 当 Mono 发出信号时,该
AdsbMessage会进一步在输出的 Flux 上发出,传递给下游操作。
正如你现在所预料的那样,所有这些操作都是异步处理的,允许从 Kafka 到达的消息流在没有任何阻塞操作的情况下被处理。
阻塞与响应式应用程序的性能对比
在两个版本的应用程序完成后,我们使用以下参数进行了测试:
- 测试在 MacBook Pro M4 Pro(48GB 内存)上进行。
- 流处理应用程序被设置为每秒向 Kafka 主题推送 1000 条消息。
- 每个应用程序运行 2 分钟,并测量写入 MongoDB 的文档数量。
- Kafka 部署在本地 Docker Desktop 的容器中。
- MongoDB 部署为本地单节点副本集,版本为 Community Edition 8.2.3。
- 应用程序使用 openjdk 21.0.7 2025-04-15 LTS 运行。
在每个版本的应用程序运行两分钟后,我们观察到:
- 阻塞式应用程序处理了 17,172 条消息。
- 响应式应用程序处理了 119,211 条消息。
这表明响应式应用程序处理消息的速度基本上与消息推送到 Kafka 的速度一致,而阻塞版本则无法跟上消息流。
最终思考
尽管测试两个版本应用程序的指标表明,在处理流式数据时,响应式 Java 比传统 Java 技术具有显著的性能优势,但毫无疑问,许多人会观察到阻塞版本的代码是单线程的,因此处于劣势。实际上,当我测试一个显式使用多线程的阻塞代码版本时,由于两个数据库操作本质上是顺序的——在 enrich 操作完成之前无法将 ADS-B 消息写入 MongoDB——两个版本代码之间的性能差异微乎其微。这凸显了在考虑响应式方法时的一个关键考量:
- 当存在可以在 IO 绑定任务被阻塞时并行执行的任务时,异步响应式程序能提供最大的收益。
正如我们在最初的圆周率计算示例中所看到的,响应式方法在任务可以异步执行时,其优势可能是巨大的。
除了性能考量之外,响应式方法对于流式应用程序的另一个优势是它允许采用优雅、声明式的编码风格。相比之下,向阻塞代码中添加线程处理会增加复杂性,并需要更冗长、命令式的代码风格。
如果你正在 Java 中处理流式数据,特别是来自 Kafka 或类似消息系统的数据,结合 MongoDB 及其响应式流驱动程序是一个非常引人注目的数据库选择。也请务必查看 Atlas Stream Processing,它使用熟悉的 MongoDB 聚合语义,直接在 MongoDB Atlas DBAAS 服务中提供消息流处理功能。
本文使用的所有代码均可在 GitHub 上找到。