Apache Paimon Java API 指南
1. 概述
在本教程中,我们将探讨 Apache Paimon 的 Java API,这对于从自定义应用程序管理 Paimon 数据库至关重要。我们将使用它在 HDFS 存储中创建包含表的数据湖,并对其执行 CRUD 操作。
这种开源数据湖仓一体格式为实时流处理、批处理和 OLAP 提供了统一的存储层。 它既可作为流处理应用的消息队列,也可作为批处理应用的 HIVE 表。它支持在各种存储系统上构建数据湖,包括对象存储、HDFS 和本地文件系统。此外,它还能与 Apache Spark、Apache Flink 和 Trino 等流行的分布式数据处理引擎无缝集成,用于构建可扩展的数据管道。
2. 前提条件
我们从 Java 应用程序集成 Apache Paimon 所需的 Maven 依赖 开始:
在我们的示例中,将使用 HDFS 作为存储,因此需要包含用于与之交互的 运行时库:
3. 创建数据湖
让我们考虑一个 数据湖,用于存储由轮询设备的监控工具收集的 IT 基础设施性能指标。这些指标会迅速积累大量数据,而像 Paimon 这样的数据湖可以长期可靠地存储它们。
定义数据模型: [LOADING...]
在将监控数据导入 Apache Paimon 之前,我们需要通过创建目录(Catalog)并定义表模式来设置存储层。在 Paimon 中,目录作为数据湖的入口点,而表则定义了数据如何组织、类型化和版本化。
接下来,我们来看一个简单的工具类,负责初始化数据湖并创建指标表:
在这个类中,PaimonDatabaseManager.createCatalog() 通过使用 CatalogContext 配置仓库路径,创建了一个由本地文件系统支持的 Catalog 实例。 最后,CatalogFactory.createCatalog() 使用上下文创建 Paimon 数据库。
此外,PaimonDatabaseManager.createTable() 方法为指标表构建了一个 Schema,并使用 Identifier 在 metric_db 数据库下注册该表。表模式包含了带类型的列和一个复合主键。
现在,我们使用 PaimonDatabaseManager 类创建一个 Paimon 数据库表:
在程序中,我们首先调用 PaimonDatabaseManager.createCatalog(WAREHOUSE_PATH) 创建 Paimon 数据库目录。参数 WAREHOUSE_PATH 包含为数据库创建的临时目录路径。然后调用 PaimonDatabaseManager.createTable() 在目录中创建 metrics 表。最后,调用 Catalog.listTables() 列出 metric_db 数据库中的所有表,并确认其中包含 metrics 表。
4. 向表中插入记录
在本节中,我们将学习用于向 Paimon metrics 表中插入指标记录的 API:
首先,我们从 Catalog 对象中获取与 tableId 关联的 Table 对象。然后,通过调用 Table#newBatchWriteBuilder() 获取 BatchWriteBuilder 类的实例。该构建器对象有助于创建用于对表执行写入操作的 BatchTableWrite 对象。
接着,我们在 createGenericRow() 方法中将每个指标记录转换为 GenericRow 对象:
在此过程中,每个列值都被转换为对应的 Paimon 兼容数据类型,例如 BinaryString 和 Timestamp。
在 insert 方法中创建 GenericRow 对象后,BatchTableWrite.write() 将每一行写入 metrics 表。最后,在循环外将数据作为一个批次提交。
让我们运行 PaimonTableDataManager.insert() 向 metrics 表中插入一些记录:
首先,通过断言确保 metrics 表存在于目录中。参数 catalog 和 tableId 分别保存了我们在创建 metrics 表时获取并存储的 Catalog 对象和表标识符。getMetrics() 方法可能是一个从监控工具获取指标数据的服务。在我们的示例中,我们从 CSV 文件 metrics.out 中检索合成数据:
然而,实际应用通常按固定时间间隔轮询监控工具以获取最新指标。最后,我们调用 insert 方法将新记录插入到 metrics 表中。
5. 从表中查询记录
收集原始指标后,需要从数据库中获取并处理以得出可操作的分析结果。API 支持过滤器下推和列投影,有助于提升性能。此外,API 同时支持批处理读取和流式数据,适用于分析和实时场景。
让我们使用 Paimon Java API 实现批处理读取操作:
fetchMetricsBySourceAndDateRange() 方法查询 metrics 表,以获取在指定日期范围内来自特定来源的指标数据。 在该方法中,PredicateBuilder 帮助创建带有对 source 和 create_time 列的过滤条件的 Predicate 对象。Table#newReadBuilder() 创建一个 ReadBuilder 对象,随后 ReadBuilder#withFilter() 设置必要的谓词。此外,我们还定义了一个 projection 数组,仅获取所需列。
接着,构建器创建数据分片(splits),这些分片可以分配到多个线程或进程中,从而实现更快的执行。这些分片类似于对应表的分区或数据文件。然后,ReadBuilder#newRead() 创建一个 TableRead 对象,并在其上调用 executeFilter() 方法。之后,TableRead#createReader() 为生成的分片创建一个 RecordReader。多个读取器可以并发处理不同的分片,从而实现可扩展且高效的并行读取。最后,调用 RecordReader#forEachRemaining() 遍历行并获取指标属性。
现在,让我们运行程序,从特定来源和日期范围内获取指标:
程序获取了 4 条来源为 collector、创建时间介于 2026-04-21 18:58:19 和 2026-04-21 18:58:30 之间的指标行。
6. 更新表记录
Paimon 表没有专门的更新记录的 Java API。然而,当我们执行写入操作时,就像对具有主键的现有记录执行插入操作一样,不会插入新行,而是更新现有行。
为了理解更新操作,我们在 PaimonTableDataManager 类中定义了以下方法:
首先,我们通过调用 fetchMetricByDeviceIdMetricNameAndCreatedDate() 方法获取由主键(包含设备 ID、指标名称和创建日期)过滤的指标记录。该方法与上一节中从指标表查询记录的方法类似。
然后,我们将 Metric#state 属性更新为新的状态。其余步骤与前面描述的插入方法大致相同。然而,并非创建新记录,而是将现有指标的状态更新为新状态。
让我们执行 updateMetricStateByDeviceIdMetricNameAndCreatedDate():
我们通过复合主键获取了一条指标记录,并将其状态更新为 inactive。随后,我们再次获取该记录,并确认状态已更新为 inactive。## 7. 删除表记录
接下来,Paimon Java API 也支持删除操作:
首先,我们通过调用 PaimonTableTableManager.fetchMetricByDeviceIdMetricNameAndCreatedDate() 获取了一条指标记录。接下来的步骤与更新方法几乎相同,只是我们没有修改指标记录中的任何字段。随后,在调用 BatchTableWrite#write() 之前,我们通过调用 BatchTableWrite#setRowKind(RowKind.DELETE) 将操作标记为删除。
现在,通过执行以下方法来验证 deleteRecordsByDeviceIdMetricNameAndCreatedDate() 方法:
首先,我们通过调用 PaimonTableTableManager.fetchMetricByDeviceIdMetricNameAndCreatedDate() 获取与设备对应的指标记录。确认记录不为 null 后,我们调用删除方法。最后,我们再次查询指标表,并确认 Paimon DB 中已不存在该记录。
8. 结论
在本文中,我们探讨了管理 Paimon 数据库的核心 Java API,包括创建表、执行批量插入、更新、删除以及执行带过滤条件的数据查询。我们使用本地文件系统创建了 Paimon DB,但无论选择何种存储层,这些 API 都是标准且有效的。
像往常一样,源代码可以在 GitHub 上找到。