Ohhnews

分类导航

$ cd ..
foojay原文

Java与Kafka的事件驱动架构实践

#java#kafka#事件驱动架构#响应式编程#mongodb

目录

响应式 Java 非常适合现代流式、事件驱动型应用。在本文中,我们将通过一个结合响应式 Java 与 MongoDB 的应用示例进行讲解。具体内容包括:

  • 引入响应式 Java 的原因及其与传统 Java 编程的区别。
  • 响应式 Java 的关键要素详解:MonoFluxflatMap
  • 示例应用演练,对比使用响应式 Streams MongoDB 驱动的响应式代码版本与使用同步 MongoDB 驱动的传统代码版本。

流数据 - 空中交通管制示例

作为现代化国家空域系统全球倡议的关键组成部分,许多国家的航空管理局已发布强制令,要求大多数非军事飞机使用一种称为“自动相关监视广播”(ADS-B)的系统来传输基于 GPS 的位置数据。

这些数据既用于地面空中交通管制,也用于飞行员在移动地图显示器上查看附近飞机的航向、速度和高度,提供了旧式地面雷达系统所不具备的情境感知能力。

对于我(一名小型私人螺旋桨飞机的飞行员)来说,ADS-B 的普及最酷的一点在于,我拥有的附近飞机位置数据比 10 年前大多数商用喷气式客机飞行员所能获取的还要多。而我仅仅使用了一台 iPad 应用和一个廉价的数据接收器(由树莓派、亚马逊上订购的 USB 无线电接收器以及优秀的开源项目 Stratux 软件构建而成)。这对提升飞行安全、避免空中及机场地面碰撞起到了巨大的推动作用。

[LOADING...]

从数据处理的角度来看,ADS-B 数据是典型的现代流式、事件驱动系统的特征。消息本身结构相对简单且较小,大多数接收系统通常将其转换为 JSON,这使得 MongoDB 非常适合处理此类数据。以下是一条示例消息:

消息内容
{ "_id": { "$oid": "699fd7673ab2c79dfe6666b0" }, "lastgnssdiffalt": 8700, "lastextrapolation": { "$date": { "$numberLong": "-62135594185300" } }, "ageextrapolation": 88.86000061035156, "distance": 3566.14013671875, "addrtype": 0, "track": 2, "lastseen": { "$date": { "$numberLong": "-62135594096360" } }, "onground": false, "nacp": 10, "age": 0.46000000834465027, "lastgnssdiff": { "$date": { "$numberLong": "-62135594096140" } }, "lastspeed": { "$date": { "$numberLong": "-62135594096190" } }, "bearingdistvalid": true, "distanceestimated": 1804.67041015625, "squawk": 1732, "lastalt": { "$date": { "$numberLong": "-62135594096140" } }, "bearing": 28.342145919799805, "vvel": -64, "tail": "UAL401", "emittercategory": 3, "alt": 8700, "gnssdifffrombaroalt": 500, "distanceestimatedlastts": { "$date": "2025-08-22T14:36:20.868Z" }, "targettype": 1, "agelastalt": 0.07000000029802322, "extrapolatedposition": false, "altfix": 9725, "receivedmsgs": 2751, "icaoaddr": 10561259, "nic": 8, "timestamp": { "$date": "2026-02-26T05:17:27.328Z" }, "prioritystatus": 0, "latfix": 39.53397750854492, "signallevel": -6.158007621765137, "lat": 39.62754821777344, "lng": -104.66268920898438, "speed": 254, "speedvalid": true, "lastsource": 1, "lngfix": -104.6051025390625, "reg": "N17315", "positionvalid": true, "altisgnss": false, "turnrate": 0 }

虽然 ADS-B 消息结构并不复杂,但其传输和接收的频率在某些情况下会带来挑战。每架飞机每 500 毫秒广播一次位置更新。在大型机场附近,这可能导致每秒产生数百条消息——按现代标准来看通常不是问题。但在我担任 MongoDB 开发倡导者的角色中,最近有国家级 ADS-B 数据聚合商寻求帮助,希望优化他们对全国接收网络流式传输的数据进行的 MongoDB 聚合操作。在高峰期,他们每秒处理约 20,000 条消息。

在讨论过程中,我意识到 MongoDB 的 Java 响应式 Streams 驱动程序可能非常适合处理此类流式数据。于是我决定研究一下,与使用标准同步驱动程序相比,使用该驱动程序能带来多大的差异。

响应式 Java - 简史

响应式 Java 的出现是为了解决 Java 中传统同步阻塞 I/O 模型的局限性,特别是在高并发、事件驱动和流式应用中。传统的“每个请求一个线程”模型往往会导致资源过度消耗,因为线程在等待 I/O 操作(如数据库查询或 API 调用)完成时会被阻塞。

该框架旨在解决这个“阻塞问题”,从而提高系统的响应能力和可扩展性。它通过实现响应式 Streams 规范来实现非阻塞异步编程。这种范式将数据作为事件流进行管理,采用了“发布者 (Publishers)”和“订阅者 (Subscribers)”的概念,实现了 30 多年前在经典的“四人帮 (Gang of Four)”著作中描述的“观察者”设计模式。

该框架的一个关键要素是线程可以异步处理活动,允许它们在等待耗时任务完成时立即转而处理其他任务,而不是处于阻塞状态。在某些用例中,这有助于减少整体资源占用,提高内存和 CPU 的使用效率。

在本文的代码示例中,我们将使用响应式 Streams 规范的 Project Reactor 实现。Project Reactor 有两种主要的发布者类型,实现了响应式 Streams 的 Publisher<T> 接口:

  • Mono:是一个旨在发射最多一个条目,然后终止(成功返回一个值或报错)的数据流(发布者)。它用于返回零个或一个结果的操作,例如通过 ID 查找单个用户记录或将实体保存到数据库,是 Future 或 Optional 的响应式等价物。
  • Flux:是一个旨在发射零个或多个条目,然后可选择性地以成功完成或错误终止的数据流(发布者)。它用于可能随时间返回多个结果的操作,例如返回记录集合的数据库查询或实时事件流(如接收到的 ADS-B 消息),是标准集合或无限流的响应式等价物。

为了演示响应式 Java 如何提升应用性能,请看以下传统的同步代码示例:

$ java
private void run(RestTemplate restTemplate, String serviceUrl, int sleepSeconds, int piDecimalPlaces) {
    Instant start = Instant.now();
    String url = serviceUrl + "/sleep/" + sleepSeconds;
    Map response = restTemplate.getForObject(url, Map.class);
    String status = (String) response.get("status");
    BigDecimal pi = PiCalculator.calculate(piDecimalPlaces);
    Duration elapsed = Duration.between(start, Instant.now());
    System.out.println("Sleep service response: " + status);
    System.out.println("Pi calculated to " + piDecimalPlaces + " decimal places");
    System.out.printf("Total time: %.3f seconds%n", elapsed.toMillis() / 1000.0);
}

此函数执行两项操作:

  1. 调用一个 web 服务,该服务在返回“success”消息前会休眠指定的秒数。
  2. 调用一个函数来计算指定位数的圆周率(Pi)。

函数输出包含完成这两项操作所需的时间。值得注意的是,该实现必须等待 web 服务调用返回后,才能开始计算圆周率。web 服务调用是阻塞的,执行线程在调用完成前无法执行任何其他工作。

现在看看使用 Project Reactor 编写的等效代码:

$ java
private void run(String serviceUrl, int sleepSeconds, int piDecimalPlaces) {
    WebClient webClient = WebClient.create(serviceUrl);
    Instant start = Instant.now();
    
    Mono<String> sleepMono = webClient.get()
        .uri("/sleep/{seconds}", sleepSeconds)
        .accept(MediaType.APPLICATION_JSON)
        .retrieve()
        .bodyToMono(Map.class)
        .map(response -> (String) response.get("status"));
        
    Mono<BigDecimal> piMono = Mono.fromCallable(() -> PiCalculator.calculate(piDecimalPlaces));
    
    Mono.zip(sleepMono, piMono)
        .doOnSuccess(tuple -> {
            String status = tuple.getT1();
            BigDecimal pi = tuple.getT2();
            Duration elapsed = Duration.between(start, Instant.now());
            System.out.println("Sleep service response: " + status);
            System.out.println("Pi calculated to " + piDecimalPlaces + " decimal places");
            System.out.printf("Total time: %.3f seconds%n", elapsed.toMillis() / 1000.0);
        })
        .block();
}

这段代码使用了非阻塞的 WebClient 类,并将对 sleep web 服务的调用封装在响应式管道中:

$ java
Mono<String> sleepMono = webClient.get()
    .uri("/sleep/{seconds}", sleepSeconds)
    .accept(MediaType.APPLICATION_JSON)
    .retrieve()
    .bodyToMono(Map.class)
    .map(response -> (String) response.get("status"));
阶段方法输入类型输出类型目的
1webClient.get()---RequestHeadersUriSpec定义 GET 请求
2.uri("/sleep/{seconds}", sleepSeconds)RequestHeadersUriSpecRequestBodySpec设置 URL 路径并填充 {seconds} 参数
3.accept(MediaType.APPLICATION_JSON)RequestBodySpecRequestBodySpec设置 accept: application/json 头
4.retrieve()RequestBodySpecResponseSpec执行请求
5.bodyToMono(Map.class)ResponseSpecMono<Map>将 JSON 响应体解析为 Map 并封装在 Mono 中
6.map(response -> (String) response.get("status"))Mono<Map>Mono<String>提取 "status" 值并转换为 Mono<String>

响应式管道在概念上类似于 MongoDB 的聚合操作,即每个阶段的输出成为下一个阶段的输入。最终阶段发射定义的 (Mono<String>) 类型。

需要注意的两点是:管道的执行是惰性的,直到被订阅时才会发生;且管道的订阅者不会被阻塞以等待管道完成,它们可以自由地执行其他任务,仅在生成的 Mono 发射完成或错误事件时才会“响应”。

第二个 Mono 定义封装了对圆周率计算器的调用,使用 fromCallable 再次实现惰性执行,推迟圆周率计算直到 Mono 被订阅:

$ java
Mono<BigDecimal> piMono = Mono.fromCallable(() -> PiCalculator.calculate(piDecimalPlaces));

第三个 Mono 定义 (Mono.zip) 用于订阅并执行前两个 Mono,组合它们的输出:

$ java
Mono.zip(sleepMono, piMono)
    .doOnSuccess(tuple -> {
        String status = tuple.getT1();
        BigDecimal pi = tuple.getT2();
        Duration elapsed = Duration.between(start, Instant.now());
        System.out.println("Sleep service response: " + status);
        System.out.println("Pi calculated to " + piDecimalPlaces + " decimal places");
        System.out.printf("Total time: %.3f seconds%n", elapsed.toMillis() / 1000.0);
    })
    .block();
  • Mono.zip(sleepMono, piMono):订阅两个 Mono 并并发运行它们。当两者都完成时,发射一个 Tuple2<String, BigDecimal>
  • .doOnSuccess(tuple -> { ... }):当组合结果准备好时执行:
    • tuple.getT1():获取 sleep web 服务返回的状态。
    • tuple.getT2():获取计算出的圆周率位数。
    • Duration.between(start, Instant.now()):计算总耗时。
  • .block():订阅管道并阻塞当前线程直到完成。这才是真正触发 HTTP 调用和圆周率计算的操作。

此版本代码的关键在于,sleep web 服务调用和圆周率计算是并发执行的。

查看两个版本的应用输出(指定休眠 5 秒,计算圆周率到 10,000 位):

  • 阻塞版本运行耗时:11.805 秒
  • 响应式版本运行耗时:6.85 秒

执行时间的差异几乎正好是 5 秒,这正如我们对顺序执行与并发执行所预期的那样,也展示了响应式 Java 正是为解决此类缓慢 I/O 操作而设计的。通过不等待 web 服务调用完成就启动独立的圆周率计算,我们减少了总完成时间,并避免了可能闲置的 CPU 核心和线程。## 在 Java 中使用 MongoDB 和 Kafka 实现响应式编程

为了测试在处理 ADS-B 数据流时使用响应式 Java(Reactive Java)的影响,我创建了一个应用程序,旨在:

  • 从 MongoDB 集合中按顺序读取一组预先捕获的 ADS-B 消息。
  • 将这些消息放入 Kafka 主题,供下游应用程序消费。

我选择这种方法而不是直接从接收器读取实时消息,是因为我希望能够:

  • 控制消息发送到 Kafka 主题的速率。
  • 确保测试的可重复性。
  • 将数据和应用程序发布在 GitHub 上供任何人使用,而无需配备 ADS-B 接收器。

我选择使用 Kafka 是为了让场景更具真实感,因为它是现代流式/事件驱动应用程序中分发消息最常用的方式之一,具有高可用性、交付保证和灵活分发的优势。为了简化操作,该应用程序使用了 Confluent 提供的 Kafka Docker 镜像。

为了处理从 Kafka 主题接收和处理消息流,我创建了两个功能相同的 Java 应用程序——一个使用传统的阻塞代码,另一个使用 Project Reactor 以及用于 Kafka 和 MongoDB 的响应式流驱动程序。

在添加 Java 应用程序后,整体应用程序流程如下:

  1. 流处理应用程序从 MongoDB 读取 ADS-B 消息。
  2. 流处理应用程序以指定的速率(每秒消息数)将每条消息推送到 Kafka。每条消息的原始时间戳都会更新为当前日期/时间。当 MongoDB 中的所有消息读取完毕后,应用程序将从集合中的第一条 ADS-B 消息开始重新循环。
  3. Java 应用程序依次从 Kafka 主题读取每条消息。
  4. 对于每条接收到的消息,Java 应用程序会根据从联邦航空管理局(FAA)下载并保存在第二个 MongoDB 集合中的注册数据,查找飞机的注册号(尾号)。如果找到匹配的注册信息,Java 应用程序将通过添加来自 FAA 数据的飞机制造商和型号来丰富(enrich)原始 ADS-B 消息。
  5. 最后,Java 应用程序将更新/丰富后的 ADS-B 消息保存到一个新的 MongoDB 集合中。

阻塞代码版本

在阻塞式应用程序中,使用简单的轮询机制从 Kafka 检索 ADS-B 消息。代码的主要部分如下:

代码片段
try (MongoClient mongoClient = MongoClients.create(config.getMongoUri())) { // 获取 MongoDB 数据库引用 MongoDatabase database = mongoClient.getDatabase(config.getMongoDatabase()); // 获取包含 FAA 注册数据的集合引用 MongoCollection<Document> registeredAircraft = database.getCollection(REGISTERED_AIRCRAFT_COLLECTION); // 获取用于存放装饰后消息的输出集合引用 MongoCollection<Document> outputCollection = database.getCollection(config.getOutputCollection()); // 如果输出集合已存在则删除它,写入时会自动重新创建 outputCollection.drop(); AircraftEnricher enricher = new AircraftEnricher(registeredAircraft); MongoMessageWriter writer = new MongoMessageWriter(outputCollection); try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) { consumer.subscribe(Collections.singletonList(config.getKafkaTopic())); try { while (true) { // 重复轮询 Kafka 以获取 ADS-B 消息 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500)); for (ConsumerRecord<String, String> record : records) { try { // 将每条接收到的消息转换为 POJO AdsbMessage msg = AdsbMessage.fromJson(record.value()); // 查找飞机制造商和型号 enricher.enrich(msg); // 将丰富后的消息写入 MongoDB writer.write(msg); // 统计写入 MongoDB 的文档数量 written.incrementAndGet(); } catch (Exception e) { log.warn("处理消息失败: {}", e.getMessage()); } } } } catch (org.apache.kafka.common.errors.WakeupException e) { // 关闭时预期的异常 } } }

这段代码创建了一个 KafkaConsumer 实例,轮询 Kafka 主题 500 毫秒,然后在再次轮询之前处理每条接收到的消息。此过程一直持续到应用程序被中断,并维护写入 MongoDB 的消息计数。

AircraftEnricherMongoMessageWriter 类使用标准的同步 MongoDB 驱动程序处理与 MongoDB 的交互。AircraftEnricher 类的 enrich 方法如下:

代码片段
public void enrich(AdsbMessage message) { // 从 ADS-B 消息中获取飞机注册号 String reg = message.getReg(); if (reg == null) { return; } // registeredAircraft 是包含 FAA 注册数据的 MongoDB 集合引用 Document aircraft = registeredAircraft // 美国飞机注册号均以字母 'N' 开头,因此使用 'n_number' // 此查询返回 'n_number' 字段与 ADS-B 消息 reg 字段匹配的第一条文档 .find(Filters.eq("n_number", reg)) .first(); if (aircraft == null) { return; } Object manufacturer = aircraft.get("manufacturer"); Object model = aircraft.get("model"); if (manufacturer != null) { message.setManufacturer(manufacturer.toString()); } if (model != null) { message.setModel(model.toString()); } }

MongoMessageWriter 类的 write 方法执行简单的 MongoDB insertOne 操作,将丰富后的 ADS-B 消息写入目标输出集合:

代码片段
public void write(AdsbMessage message) { collection.insertOne(message.toDocument()); }

响应式代码版本

响应式代码版本使用了 Project Reactor,包括其响应式 KafkaReceiver 类,以及 MongoDB 的响应式流(Reactive Streams) Java 驱动程序。该版本的主要函数如下:

代码片段
public static void main(String[] args) { // 连接到 MongoDB MongoClient mongoClient = MongoClients.create(config.getMongoUri()); // 获取 MongoDB 数据库引用 MongoDatabase database = mongoClient.getDatabase(config.getMongoDatabase()); // 获取包含 FAA 注册数据的集合引用 MongoCollection<Document> registeredAircraft = database.getCollection(REGISTERED_AIRCRAFT_COLLECTION); // 获取用于存放装饰后消息的输出集合引用 MongoCollection<Document> outputCollection = database.getCollection(config.getOutputCollection()); // 删除输出集合 - 写入时会自动重新创建 Mono.from(outputCollection.drop()).block(); AircraftEnricher enricher = new AircraftEnricher(registeredAircraft); MongoMessageWriter writer = new MongoMessageWriter(outputCollection); ReceiverOptions<String, String> receiverOptions = ReceiverOptions.<String, String>create(consumerProps) .subscription(Collections.singleton(config.getKafkaTopic())); // 订阅 Kafka 主题并运行以下操作流水线 Disposable pipeline = KafkaReceiver.create(receiverOptions) // 从 Kafka 接收消息 .receive() // 将 JSON 消息转换为 POJO。mapNotNull 会过滤掉解析时抛出异常的消息 .mapNotNull(record -> { try { return AdsbMessage.fromJson(record.value()); } catch (Exception e) { log.warn("解析消息失败: {}", e.getMessage()); return null; } }) // 查找并添加飞机制造商和型号 .flatMap(enricher::enrich) // 将装饰后的消息写入输出 MongoDB 集合 .flatMap(msg -> writer.write(msg).thenReturn(msg)) // 统计写入 MongoDB 的装饰后消息数量 .doOnNext(msg -> { written.incrementAndGet(); }) .subscribe(); try { // 这里将阻塞直到主 JVM 线程被中断。 // 响应式流水线异步运行 // 如果没有这一行,应用程序将立即终止。 Thread.currentThread().join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }

此类响应式流水线的有趣之处在于,每个阶段的输出都会成为下一个阶段的输入。在我们的流水线中,每个阶段都会产生一个 Flux,允许对流中的每条消息进行异步、非阻塞处理:

阶段输入类型输出类型
receive()---Flux<ReceiverRecord<String, String>>
mapNotNull()Flux<ReceiverRecord<String, String>>Flux<AdsbMessage>
flatMap (enrich)Flux<AdsbMessage>Flux<AdsbMessage>
flatMap (write)Flux<AdsbMessage>Flux<AdsbMessage>
doOnNext ()Flux<AdsbMessage>Flux<AdsbMessage>
subscribe()Flux<AdsbMessage>Disposable

在丰富消息时使用 .flatMap() 操作符,以及随后将丰富后的消息写入 MongoDB 的过程值得说明。

响应式流水线通常包含实现 map/reduce 类型语义的操作符,map 风格的操作符产生 Flux 输出,而 reduce 风格的操作符产生 Mono。

在我们的流水线中,执行 MongoDB 数据库操作时使用了 .flatMap() 操作符。与同步版本的 AircraftEnricher.enrich()MongoMessageWriter.write() 方法相比,响应式版本执行相同的操作,但具有不同的返回类型:

public Mono<AdsbMessage> enrich(AdsbMessage message)

public Mono<Void> write(AdsbMessage message)

enrich() 方法返回 Mono<AdsbMessage>,而 write() 返回 Mono<Void>。在调用 write() 时,我们通过链式调用 .thenReturn(msg) 将其映射为 Mono<AdsbMessage>

Map 类型操作总是返回一个 Flux,但如果我们使用 map 而不是 flatMap 来调用这些方法,输出将是:

Flux<Mono<AdsbMessage>>

这将导致两个问题:

  • 由于 map 操作从不订阅这些 Mono,因此 MongoDB 操作实际上永远不会执行——大多数响应式生产者使用惰性执行,这意味着只有在被订阅时才会开始执行。
  • 下游操作期望的是 Flux<AdsbMessage>,而不是 Flux<Mono<AdsbMessage>>

.map() 相比,.flatMap() 的执行逻辑如下:

  1. 它从输入的 Flux 中获取每个 AdsbMessage
  2. 它执行 MongoDB 操作(enricher.enrich(msg)writer.write(msg).thenReturn(msg))以获得一个 Mono<AdsbMessage>
  3. 它订阅该 Mono。
  4. 当 Mono 发出信号时,该 AdsbMessage 会进一步在输出的 Flux 上发出,传递给下游操作。

正如你现在所预料的那样,所有这些操作都是异步处理的,允许从 Kafka 到达的消息流在没有任何阻塞操作的情况下被处理。

阻塞与响应式应用程序的性能对比

在两个版本的应用程序完成后,我们使用以下参数进行了测试:

  1. 测试在 MacBook Pro M4 Pro(48GB 内存)上进行。
  2. 流处理应用程序被设置为每秒向 Kafka 主题推送 1000 条消息。
  3. 每个应用程序运行 2 分钟,并测量写入 MongoDB 的文档数量。
  4. Kafka 部署在本地 Docker Desktop 的容器中。
  5. MongoDB 部署为本地单节点副本集,版本为 Community Edition 8.2.3。
  6. 应用程序使用 openjdk 21.0.7 2025-04-15 LTS 运行。

在每个版本的应用程序运行两分钟后,我们观察到:

  • 阻塞式应用程序处理了 17,172 条消息
  • 响应式应用程序处理了 119,211 条消息

这表明响应式应用程序处理消息的速度基本上与消息推送到 Kafka 的速度一致,而阻塞版本则无法跟上消息流。

最终思考

尽管测试两个版本应用程序的指标表明,在处理流式数据时,响应式 Java 比传统 Java 技术具有显著的性能优势,但毫无疑问,许多人会观察到阻塞版本的代码是单线程的,因此处于劣势。实际上,当我测试一个显式使用多线程的阻塞代码版本时,由于两个数据库操作本质上是顺序的——在 enrich 操作完成之前无法将 ADS-B 消息写入 MongoDB——两个版本代码之间的性能差异微乎其微。这凸显了在考虑响应式方法时的一个关键考量:

  • 当存在可以在 IO 绑定任务被阻塞时并行执行的任务时,异步响应式程序能提供最大的收益。

正如我们在最初的圆周率计算示例中所看到的,响应式方法在任务可以异步执行时,其优势可能是巨大的。

除了性能考量之外,响应式方法对于流式应用程序的另一个优势是它允许采用优雅、声明式的编码风格。相比之下,向阻塞代码中添加线程处理会增加复杂性,并需要更冗长、命令式的代码风格。

如果你正在 Java 中处理流式数据,特别是来自 Kafka 或类似消息系统的数据,结合 MongoDB 及其响应式流驱动程序是一个非常引人注目的数据库选择。也请务必查看 Atlas Stream Processing,它使用熟悉的 MongoDB 聚合语义,直接在 MongoDB Atlas DBAAS 服务中提供消息流处理功能。

本文使用的所有代码均可在 GitHub 上找到。