Ohhnews

分类导航

$ cd ..
InfoQ Java原文

Kafka和Flink管道中的Schema过度扩张问题及解决方案

#kafka#flink#schema过度扩张#schema合并#事件驱动架构

核心要点

  • 一对一的事件-模式映射在开始时很简单,但随着系统扩展,它会逐渐累积下游的复杂性:查询碎片化、维护成本上升和模式漂移。
  • 具有80%到95%结构重叠的事件模式可以通过使用判别器枚举字段进行合并,从而减少表数量(例如,从十多个表减少到两个),并实现单表消费者查询。
  • 可空属性块支持向后兼容的模式演进,允许在不破坏现有消费者的情况下添加新的事件变体。
  • 分层适配器设计将转换逻辑与框架集成分离,使得在现有Apache Flink管道中更容易实现和测试模式合并。
  • 围绕消费者访问模式设计模式可以使查询更简单,并减少事件驱动系统中的长期维护开销。

引言

大多数构建Apache KafkaApache Flink管道的团队,在事件类型目录达到几十种类型时,都会遇到同样的障碍。最初,每个事件都有自己的模式,系统干净,但逐渐成为维护负担。查询变得复杂,模式变更变成协调工作,数据湖开始看起来更像数据沼泽。本文聚焦于这个特定问题:一对一的事件-模式映射导致的模式扩散。这是一个结构性问题,缓慢积累,修复代价高昂。解决方案是基于判别器的模式合并,这是一种减少模式数量、简化下游消费、并使模式演进可控的模式。例子使用共享出行领域,因为它使问题具体化。同样的模式可以应用于任何高基数事件类型具有显著结构重叠的系统,包括联络中心平台、金融交易系统和医疗保健事件管道。

大规模下一对一映射的样子

让我们从简单的例子开始。一个共享出行平台跟踪司机的多种事件类型:

  • 接单
  • 开始行程
  • 完成行程
  • 取消行程

这四种事件类型代表了司机参与行程请求的完整生命周期,从司机接受请求到最终完成或取消。每个事件都带有时间戳、司机标识符、行程标识符以及出现在每个变体中的城市上下文。每个事件还根据行程类型而变化:

  • 标准
  • 拼车
  • 预约

标准行程是单乘客行程,跟踪车辆类型和动态定价。拼车行程合并多个乘客,跟踪乘客数量和拼车效率评分。预约行程是提前预订的,带有预定出发时间和提前预订的分钟数。直觉是为每个组合创建一个模式。这种方法会得到:

DriverRideAcceptedStandardEvent
DriverRideAcceptedSharedEvent
DriverRideAcceptedScheduledEvent
DriverRideStartedStandardEvent
DriverRideStartedSharedEvent
...

四种事件类型和三种行程类型共十二个模式。实际平台有更多事件类型、更多子类型,而且随着新功能发布,列表还在增长。每个模式映射到数据湖中的一个独立下游表。十二个模式意味着十二个表。如果你在S3上使用Apache Iceberg,那就是要管理十二个表、十二个Schema Registry条目进行版本管理,以及Flink管道中的十二个适配器类。系统在一开始还能工作,直到变得难以管理,如图1所示,其中N种事件类型发散出N个独立的模式和N个独立的Iceberg表。跨变体查询需要对所有表进行UNION操作。

[LOADING...] 图1. 每个事件变体一个模式的模式扩散(之前)。(作者图片。)

问题:模式扩散

当系统积累模式的速度超过了团队能够证明其复杂性的合理性时,就会出现模式扩散。它有四个主要症状。

查询复杂性

当数据消费者想回答“司机4821在过去一小时做了什么?”时,他需要联合所有相关表。如果问题跨越事件类型和行程类型,查询可能需要涉及八到十个表。这不是一个查询,而是一个项目。

维护开销

在扩散系统中,大多数模式共享80%到95%的字段。当一个共享字段发生变化时,该变化必须复制到所有包含它的模式中。单个字段重命名意味着二十个模式更新、二十个适配器更新和二十轮测试。

模式漂移

独立维护的模式会随时间产生差异。一个团队为同一概念添加的字段名称与另一个团队使用的略有不同。可空性决策变得不一致。类型发生分歧。没有人有意造成这种不一致,只是没有强制机制来保持模式对齐。

生产者-消费者不匹配

一对一模式设计为生产者优化。从消费者角度看,所有这些事件都是同一概念的变化:行程生命周期中发生了某事。消费者希望一起查询它们,而不必考虑十二个表中哪个包含他们所需的记录。以生产者为中心的模式和以消费者为中心的查询之间的不匹配是大多数模式扩散痛苦的根源。

解决方案:合并模式设计

高基数系统中的大多数事件变体共享一个结构核心。DriverRideAcceptedStandardEventDriverRideAcceptedSharedEvent 之间的差异很小。其他一切都相同。不是每个变体一个模式,而是每个逻辑域使用一个合并模式,如图2所示,使用判别器字段来标识变体,并使用可空属性块来携带变体特定的数据。合并方法将十二个事件类型变体折叠成一个单一的DriverRideActivityRecord模式。消费者查询一个表并按判别器字段过滤,无需UNION。

[LOADING...] 图2. 合并模式:基于判别器的路由(之后)。(作者图片。)

判别器字段

合并模式中的每条记录都带有明确的判别器字段,以确切标识它所代表的事件类型:

eventType: enum (ACCEPTED, STARTED, COMPLETED, CANCELLED)
rideType: enum (STANDARD, SHARED, SCHEDULED)

这些字段始终被填充。消费者无需查询十二个表来查找完成行程,只需查询一个表并按eventType = 'COMPLETED'过滤。使用枚举而非自由格式字符串很重要。枚举在适配器代码中提供编译时安全,在下游查询引擎中实现高效谓词过滤,并明确记录哪些值是有效的。如果使用普通字符串字段,无法阻止生产者写入“complete”而非“COMPLETED”,从而静默破坏下游过滤器。

共享字段

无论变体如何,每条记录中都会出现的字段放在合并模式的顶层:

eventTime: timestamp
driverId: string
rideId: string
cityId: string
fareAmount: double (null for pre-completion events)
durationMins: int (null for pre-completion events)

可空属性块

仅适用于某些行程类型的字段被分组为可空嵌套结构。每条记录恰好填充一个块。其他所有块为null。

$ cat
{
  "type": "record",
  "name": "DriverRideActivityRecord",
  "fields": [
    {"name": "eventTime", "type": "long"},
    {"name": "driverId", "type": "string"},
    {"name": "rideId", "type": "string"},
    {"name": "eventType", "type": {"type": "enum", "name": "EventType", "symbols": ["ACCEPTED","STARTED","COMPLETED","CANCELLED"]}},
    {"name": "rideType", "type": {"type": "enum", "name": "RideType", "symbols": ["STANDARD","SHARED","SCHEDULED"]}},
    {"name": "standardRideAttributes", "type": ["null", {
      "type": "record", "name": "StandardRideAttributes",
      "fields": [
        {"name": "vehicleClass", "type": "string"},
        {"name": "surgeMultiplier", "type": "double"}
      ]
    }], "default": null},
    {"name": "sharedRideAttributes", "type": ["null", {
      "type": "record", "name": "SharedRideAttributes",
      "fields": [
        {"name": "passengerCount", "type": "int"},
        {"name": "poolingScore", "type": "double"}
      ]
    }], "default": null},
    {"name": "scheduledRideAttributes", "type": ["null", {
      "type": "record", "name": "ScheduledRideAttributes",
      "fields": [
        {"name": "scheduledTime", "type": "long"},
        {"name": "advanceBookingMinutes", "type": "int"}
      ]
    }], "default": null}
  ]
}

如图3所示,拼车行程的记录会填充sharedRideAttributes,并将所有其他属性块留为null。判别器字段rideType = SHARED告诉消费者检查哪个块。

[LOADING...] 图3. 可空属性块:每条记录恰好填充一个块。(作者图片。)

在Flink管道中实现此模式

在Kafka-Flink管道中,合并发生在Kafka摄取与下游序列化之间的处理层。架构使用两个层。

第一层:转换逻辑

每个源事件类型都有一个专用适配器类,负责将该事件映射到合并模式:

$ java
public class SharedRideAcceptedAdapter implements RecordAdapter {
    @Override
    public DriverRideActivityRecord adapt(String orgId, DriverRideAcceptedSharedEvent event) {
        DriverRideActivityRecord record = new DriverRideActivityRecord();
        record.setEventTime(event.getTimestamp());
        record.setDriverId(event.getDriverId());
        record.setRideId(event.getRideId());
        record.setCityId(event.getCityId());
        record.setEventType(EventType.ACCEPTED);
        record.setRideType(RideType.SHARED);
        SharedRideAttributes attrs = new SharedRideAttributes();
        attrs.setPassengerCount(event.getPassengerCount());
        attrs.setPoolingScore(event.getPoolingScore());
        record.setSharedRideAttributes(attrs);
        return record;
    }
}

这些适配器类不依赖Flink框架。它们是纯转换逻辑,无需任何框架设置即可直接进行单元测试。

第二层:框架集成

$ java
public class RideActivityConsolidationJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);

        KafkaSource source = KafkaSource.builder()
            .setBootstrapServers(config.getKafkaBrokers())
            .setTopics("ride-events")
            .setGroupId("ride-consolidation-consumer")
            .setValueOnlyDeserializer(new RawRideEventDeserializer())
            .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka ride events")
            .map(new ConsolidationAdapter(adapterRegistry))
            .sinkTo(icebergSink);

        env.execute("Ride Activity Schema Consolidation");
    }
}

该作业从单个Kafka主题读取原始事件,通过ConsolidationAdapter路由每个事件(它根据事件类型查找正确的适配器并调用adapt()),然后将合并后的记录写入S3上的Iceberg表。adapterRegistry是一个从事件类型到对应适配器实例的映射,在作业启动时初始化一次并传入ConsolidationAdapter类。有意调用WatermarkStrategy.noWatermarks(),因为模式合并是无状态转换,不涉及事件时间窗口,因此没有需要推进的水位线。精确一次检查点协调Kafka偏移提交与Iceberg的事务提交协议,因此失败后作业重启会从最后一个一致检查点重放,不会产生重复记录。三十秒的最小暂停防止慢速检查点立即触发下一个检查点。检查点间隔控制重启时重放的数据量,应根据吞吐量和可接受的恢复时间进行调优。

使用Apache Avro的模式演进

Apache Avro是一种基于模式的二进制序列化格式,广泛用于Kafka管道。它与Schema Registry配合管理模式版本并强制执行兼容性规则,随着系统增长,这些规则变得越来越重要。

团队对合并模式提出的主要担忧之一是演进。如果所有内容都放入一个模式,添加新事件类型是否需要触及每个消费者?不需要。规则是:新的属性块必须始终为可空且默认值为null。当发布新的行程类型时,模式更新添加一个可空的premiumRideAttributes块(默认null),并在rideType枚举中加入PREMIUM。基于旧模式编译的现有消费者读取新记录时,会看到premiumRideAttributes为null。它们不会中断,也不需重新部署。

有一个边界情况值得注意:在某些Schema Registry兼容模式下,向Avro枚举添加新值并不总是安全的。在向后兼容模式下,基于旧模式编译的消费者可能会遇到未知符号并抛出反序列化错误。向前兼容则涵盖相反的方面:较旧的生产者写入记录,而较新的消费者需要读取。对于会随时间演进的合并模式,FullFull_Transitive兼容性是最安全的设置,因为它同时验证两个方向,并在枚举添加到达生产环境之前捕获它们。

相比之下,一对一的方法:添加高级行程类型意味着新模式、新表、新适配器代码、新Schema Registry条目,而且希望跨所有行程类型查询的消费者现在需要在它们的UNION中添加另一个表。

权衡

更宽的记录

合并模式携带可空块,这些块对于大多数记录来说是空的。使用Avro配合正确的null处理,序列化成本极低,但并非为零。在极高吞吐量(每天数十亿事件)下,额外的序列化开销值得在承诺之前进行基准测试。

模式管治

由多个团队拥有的合并模式需要明确的归属。带有强制兼容性规则的Schema Registry自动处理机械方面,但仍需有人来决定什么内容进入模式、什么内容排除在外。没有这一点,你只是用模式扩散换取了另一种漂移。

调试

当特定事件类型出现问题时,问题在一个混有很多其他事件类型的表中。额外的过滤步骤并不昂贵,但这是调试工作流的一个变化,值得向团队说明。像SELECT * FROM driver_ride_activity LIMIT 100这样的查询如果不加WHERE eventType = 'COMPLETED'过滤器,会返回所有事件类型的混合。这个问题很容易修复,但需要团队养成新习惯。

何时不应该使用

当事件类型在结构上重叠且经常一起查询时,此模式才有意义。如果两个事件类型的字段完全不同且从不在同一上下文中被查询,将它们合并并没有实际收益。

这种方法在实践中的表现

部署合并设计后,最直接的变化出现在数据层。之前需要查询十个独立表的事件组现在只查询两个表。对分析师来说,这意味着可以在五分钟内写出的查询与需要知道哪些表包含哪些变体以及如何正确UNION它们的查询之间的区别。

模式变更的传播变得简单得多。当模式共享90%到95%的字段时,以往对共享字段的任何更改都需要更新所有包含该字段的模式、协调适配器更改并跨团队管理下游表更新。使用合并模式,同样的更改只是一个模式更新。添加新的事件变体也遵循相同模式:它是增量的,不触及现有消费者,不需要新表,也不需要从头编写新适配器。

超越适配器:原生多事件支持

此处描述的两层适配器模式是在现有管道框架(每个映射处理一个事件类型)中工作时的正确解决方案。它提供了所有合并带来的好处,而无需更改底层框架。

对于正在构建或扩展处理框架的团队,有一种更简洁的选项:在框架层面原生支持每个映射的多个事件类型。在适配器方法中,框架仍然认为它在处理一个事件类型。合并发生在顶层添加的层中。当框架原生支持多个输入事件类型映射到单个输出模式时,自定义路由层就消失了。转换逻辑本身(将每个源事件映射到合并模式的适配器类)仍然存在。消失的是手动将事件路由到正确适配器的分派器。框架原生接管该责任。

大多数团队遵循的实际路径:首先实现适配器模式,以验证合并模式设计有效。一旦价值明确,原生框架支持的理由不言自明。无论哪种方式,合并模式设计保持不变。适配器层是最终会消失的部分。

结论

模式扩散是一种在感觉可控直到不可控的问题。当它变得痛苦时,分析师正在编写十二表的UNION查询,工程师正在为一次字段重命名更新二十个模式。模式已成为承重墙。修复它们意味着跨团队协调、更新下游消费者以及迁移历史数据。这些都不是不可能的,但全部代价高昂。

这里的合并模式并非适用于每个系统。当事件类型共享结构并经常一起查询时,它工作良好。它增加了需要刻意管理的管治开销。对于某些团队来说,这些权衡可能不值得。但是,如果你面对一个不断扩散的模式目录,而其中大多数模式共享80%到90%的字段,那么值得问的问题不是是否要合并,而是还要等多久。

参考实现可在以下GitHub仓库中找到。

关于作者

[LOADING...]

Spoorthi Basu

Spoorthi Basu 是一名软件工程师,专注于分布式系统、数据工程和实时流处理,在Apache Kafka和Flink上构建容错系统和大型事件驱动管道(将数据写入AWS S3上的Apache Iceberg)方面拥有五年经验。她是Apache Flink CDC的活跃贡献者。