Ohhnews

分类导航

$ cd ..
Baeldung原文

Apache Paimon Java API 指南

#apache paimon#java api#数据湖#crud#hdfs

1. 概述

在本教程中,我们将探讨 Apache PaimonJava API,这对于从自定义应用程序管理 Paimon 数据库至关重要。我们将使用它在 HDFS 存储中创建包含表的数据湖,并对其执行 CRUD 操作。

这种开源数据湖仓一体格式为实时流处理、批处理和 OLAP 提供了统一的存储层。 它既可作为流处理应用的消息队列,也可作为批处理应用的 HIVE 表。它支持在各种存储系统上构建数据湖,包括对象存储、HDFS 和本地文件系统。此外,它还能与 Apache Spark、Apache Flink 和 Trino 等流行的分布式数据处理引擎无缝集成,用于构建可扩展的数据管道。

2. 前提条件

我们从 Java 应用程序集成 Apache Paimon 所需的 Maven 依赖 开始:

$ xml
<dependency>
    <groupId>org.apache.paimon</groupId>
    <artifactId>paimon-bundle</artifactId>
    <version>1.4.1</version>
</dependency>

在我们的示例中,将使用 HDFS 作为存储,因此需要包含用于与之交互的 运行时库

$ xml
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client-runtime</artifactId>
    <version>3.4.3</version>
</dependency>

3. 创建数据湖

让我们考虑一个 数据湖,用于存储由轮询设备的监控工具收集的 IT 基础设施性能指标。这些指标会迅速积累大量数据,而像 Paimon 这样的数据湖可以长期可靠地存储它们。

定义数据模型: [LOADING...]

在将监控数据导入 Apache Paimon 之前,我们需要通过创建目录(Catalog)并定义表模式来设置存储层。在 Paimon 中,目录作为数据湖的入口点,而表则定义了数据如何组织、类型化和版本化。

接下来,我们来看一个简单的工具类,负责初始化数据湖并创建指标表:

$ java
public class PaimonDatabaseManager {
    public static Catalog createCatalog(String warehousePath) {
        CatalogContext context = CatalogContext.create(new Path(warehousePath));
        return CatalogFactory.createCatalog(context);
    }
    public static Identifier createTable(Catalog catalog) throws Exception {
        Schema schema = Schema.newBuilder()
          .column("device_id", DataTypes.STRING())
          .column("metrics_name", DataTypes.STRING())
          .column("metrics_value", DataTypes.DOUBLE())
          .column("source", DataTypes.STRING())
          .column("create_time", DataTypes.TIMESTAMP(3))
          .column("state", DataTypes.STRING())
          .primaryKey("device_id", "metrics_name", "create_time")  
          .build();
        Identifier tableId = Identifier.create("metric_db", "metrics");
        catalog.createDatabase("metric_db", true);
        catalog.createTable(tableId, schema, false);
        return tableId;
    }
}

在这个类中,PaimonDatabaseManager.createCatalog() 通过使用 CatalogContext 配置仓库路径,创建了一个由本地文件系统支持的 Catalog 实例。 最后,CatalogFactory.createCatalog() 使用上下文创建 Paimon 数据库。

此外,PaimonDatabaseManager.createTable() 方法为指标表构建了一个 Schema,并使用 Identifiermetric_db 数据库下注册该表。表模式包含了带类型的列和一个复合主键。

现在,我们使用 PaimonDatabaseManager 类创建一个 Paimon 数据库表:

$ java
void whenCallCreateTable_thenTableCreated() throws Exception {
    catalog = PaimonDatabaseManager.createCatalog(WAREHOUSE_PATH);
    assertNotNull(catalog);
    
    tableId = PaimonDatabaseManager.createTable(catalog);
    assertNotNull(tableId);
    assertTrue(catalog.listTables("metric_db").contains("metrics"));
}

在程序中,我们首先调用 PaimonDatabaseManager.createCatalog(WAREHOUSE_PATH) 创建 Paimon 数据库目录。参数 WAREHOUSE_PATH 包含为数据库创建的临时目录路径。然后调用 PaimonDatabaseManager.createTable() 在目录中创建 metrics 表。最后,调用 Catalog.listTables() 列出 metric_db 数据库中的所有表,并确认其中包含 metrics 表。

4. 向表中插入记录

在本节中,我们将学习用于向 Paimon metrics 表中插入指标记录的 API:

$ java
public class PaimonTableDataManager {
    public static void insert(Catalog catalog, Identifier tableId, List metrics) throws Exception {
        Table table = catalog.getTable(tableId);
        BatchWriteBuilder builder = table.newBatchWriteBuilder();
        BatchTableWrite write = builder.newWrite();
        metrics.forEach(metric -> {
          try { 
              GenericRow row = createGenericRow(metric);
              write.write(row, 0);
          } catch (Exception e) {
              logger.error("Error writing metric", e);
          }
        });
        List messages = write.prepareCommit();
        BatchTableCommit commit = builder.newCommit();
        commit.commit(messages);
    }
}

首先,我们从 Catalog 对象中获取与 tableId 关联的 Table 对象。然后,通过调用 Table#newBatchWriteBuilder() 获取 BatchWriteBuilder 类的实例。该构建器对象有助于创建用于对表执行写入操作的 BatchTableWrite 对象。

接着,我们在 createGenericRow() 方法中将每个指标记录转换为 GenericRow 对象:

$ java
private static GenericRow createGenericRow(Metric metric) {
    GenericRow row = GenericRow.of(
      BinaryString.fromString(metric.getDeviceId()),
      BinaryString.fromString(metric.getMetricsName()),
      metric.getMetricsValue(),
      BinaryString.fromString(metric.getSource()),
      convertToTimestamp(metric.getCreateTime()),
      BinaryString.fromString(metric.getCreatedBy()),
      BinaryString.fromString(metric.getState())
    );
    return row;
}

在此过程中,每个列值都被转换为对应的 Paimon 兼容数据类型,例如 BinaryStringTimestamp

在 insert 方法中创建 GenericRow 对象后,BatchTableWrite.write() 将每一行写入 metrics 表。最后,在循环外将数据作为一个批次提交。

让我们运行 PaimonTableDataManager.insert()metrics 表中插入一些记录:

$ java
void whenCallInsertRecords_thenRecordsInserted() throws Exception {
    assertTrue(catalog.listTables("metric_db").contains("metrics"));
    PaimonTableDataManager.insert(catalog, tableId, getMetrics());
}

首先,通过断言确保 metrics 表存在于目录中。参数 catalogtableId 分别保存了我们在创建 metrics 表时获取并存储的 Catalog 对象和表标识符。getMetrics() 方法可能是一个从监控工具获取指标数据的服务。在我们的示例中,我们从 CSV 文件 metrics.out 中检索合成数据:

$ bash
device_id,metric_name,create_time,metric_value,source
dev_101,cpu_usage,2026-04-21 18:57:01,72.5,agent
dev_102,cpu_usage,2026-04-21 18:57:02,65.1,agent
dev_103,memory_usage,2026-04-21 18:57:03,58.3,agent
...

然而,实际应用通常按固定时间间隔轮询监控工具以获取最新指标。最后,我们调用 insert 方法将新记录插入到 metrics 表中。

5. 从表中查询记录

收集原始指标后,需要从数据库中获取并处理以得出可操作的分析结果。API 支持过滤器下推和列投影,有助于提升性能。此外,API 同时支持批处理读取和流式数据,适用于分析和实时场景。

让我们使用 Paimon Java API 实现批处理读取操作:

$ java
public static List<Metric> fetchMetricsBySourceAndDateRange(Catalog catalog, Identifier tableId,
      String source, String startDate, String endDate) throws Exception {
    Table table = catalog.getTable(tableId);
    RowType rowType = table.rowType();
    PredicateBuilder predicateBuilder = new PredicateBuilder(rowType);
    int[] projection = new int[] {0, 1, 2, 3, 4};
    Predicate sourcePredicate = predicateBuilder.equal(3, BinaryString.fromString(source));
    Predicate dateRangePredicate = predicateBuilder.between(
      4, convertToTimestamp(startDate), convertToTimestamp(endDate)
    );
    Predicate predicate = PredicateBuilder.and(sourcePredicate, dateRangePredicate);
    ReadBuilder readBuilder = table.newReadBuilder().withFilter(predicate).withProjection(projection);
    List<Split> splits = readBuilder.newScan().plan().splits();
    TableRead read = readBuilder.newRead().executeFilter();
    RecordReader<InternalRow> reader = read.createReader(splits);
    List<Metric> results = new ArrayList<>();
    reader.forEachRemaining(internalRow -> {
        String deviceId = internalRow.getString(0).toString();
        String metricsName = internalRow.getString(1).toString();
        double metricsValue = internalRow.getDouble(2);
        String sourceValue = internalRow.getString(3).toString();
        Timestamp timestamp = internalRow.getTimestamp(4, 3);
        String createTime = ConvertTimestampToStr(timestamp); 
        Metric metric = new Metric(deviceId, metricsName, metricsValue, sourceValue, createTime, null);
        logger.info("Fetched Metric: {}", metric);
        results.add(metric);
    });
    return results;
}

fetchMetricsBySourceAndDateRange() 方法查询 metrics 表,以获取在指定日期范围内来自特定来源的指标数据。 在该方法中,PredicateBuilder 帮助创建带有对 sourcecreate_time 列的过滤条件的 Predicate 对象。Table#newReadBuilder() 创建一个 ReadBuilder 对象,随后 ReadBuilder#withFilter() 设置必要的谓词。此外,我们还定义了一个 projection 数组,仅获取所需列。

接着,构建器创建数据分片(splits),这些分片可以分配到多个线程或进程中,从而实现更快的执行。这些分片类似于对应表的分区或数据文件。然后,ReadBuilder#newRead() 创建一个 TableRead 对象,并在其上调用 executeFilter() 方法。之后,TableRead#createReader() 为生成的分片创建一个 RecordReader。多个读取器可以并发处理不同的分片,从而实现可扩展且高效的并行读取。最后,调用 RecordReader#forEachRemaining() 遍历行并获取指标属性。

现在,让我们运行程序,从特定来源和日期范围内获取指标:

$ java
void whenCallReadRecords_thenRecordsRead() throws Exception {
    List<Metric> metrics = PaimonTableDataManager.fetchMetricsBySourceAndDateRange(
      catalog, tableId, "collector", "2026-04-21 18:58:19", "2026-04-21 18:58:30");
    assertFalse(metrics.isEmpty());
    assertTrue(metrics.size() == 4);
}

程序获取了 4 条来源为 collector、创建时间介于 2026-04-21 18:58:192026-04-21 18:58:30 之间的指标行。

6. 更新表记录

Paimon 表没有专门的更新记录的 Java API。然而,当我们执行写入操作时,就像对具有主键的现有记录执行插入操作一样,不会插入新行,而是更新现有行。

为了理解更新操作,我们在 PaimonTableDataManager 类中定义了以下方法:

$ java
public static void updateMetricStateByDeviceIdMetricNameAndCreatedDate(Catalog catalog,
      Identifier tableId, String deviceId, String metricName, String newState, String createdDate)
    throws Exception {
    Metric metric = fetchMetricByDeviceIdMetricNameAndCreatedDate(
      catalog, tableId, deviceId, metricName, createdDate
    );
    Table table = catalog.getTable(tableId);
    BatchWriteBuilder builder = table.newBatchWriteBuilder();
    BatchTableWrite write = builder.newWrite();
    metric.setState(newState);
    write.write(createGenericRow(metric), 0);
    List<CommitMessage> messages = write.prepareCommit();
    BatchTableCommit commit = builder.newCommit();
    commit.commit(messages);
}

首先,我们通过调用 fetchMetricByDeviceIdMetricNameAndCreatedDate() 方法获取由主键(包含设备 ID、指标名称和创建日期)过滤的指标记录。该方法与上一节中从指标表查询记录的方法类似。

然后,我们将 Metric#state 属性更新为新的状态。其余步骤与前面描述的插入方法大致相同。然而,并非创建新记录,而是将现有指标的状态更新为新状态。

让我们执行 updateMetricStateByDeviceIdMetricNameAndCreatedDate()

$ java
void whenCallUpdateRecord_thenRecordUpdated() throws Exception {
    Metric metric = PaimonTableDataManager
      .fetchMetricByDeviceIdMetricNameAndCreatedDate(
        catalog, tableId, 
        "dev_137", "cpu_usage",
        "2026-04-21 18:58:27"
    );
    assertNotNull(metric);
    assertEquals("active", metric.getState());
    PaimonTableDataManager.updateMetricStateByDeviceIdMetricNameAndCreatedDate(
        catalog, tableId, "dev_137",
            "cpu_usage", "inactive", 
            "2026-04-21 18:58:27"
    );
    metric = PaimonTableDataManager.fetchMetricByDeviceIdMetricNameAndCreatedDate(
        catalog, tableId, "dev_137", 
        "cpu_usage", "2026-04-21 18:58:27"
    );
    assertNotNull(metric);
    assertEquals("inactive", metric.getState());
}

我们通过复合主键获取了一条指标记录,并将其状态更新为 inactive。随后,我们再次获取该记录,并确认状态已更新为 inactive。## 7. 删除表记录

接下来,Paimon Java API 也支持删除操作:

$ java
public static void deleteRecordsByDeviceIdMetricNameAndCreatedDate(Catalog catalog, 
    Identifier tableId, String deviceId, 
    String metricsName, String createdDate) throws Exception {
    Metric metric = PaimonTableTableManager
     .fetchMetricByDeviceIdMetricNameAndCreatedDate(
       catalog, tableId, deviceId, metricsName, createdDate
    );
    Table table = catalog.getTable(tableId);
    BatchWriteBuilder builder = table.newBatchWriteBuilder();
    BatchTableWrite write = builder.newWrite();
    GenericRow deleteRow = createGenericRow(metric);
    deleteRow.setRowKind(RowKind.DELETE);
    write.write(deleteRow, 0);
    List<CommitMessage> messages = write.prepareCommit();
    builder.newCommit().commit(messages);
}

首先,我们通过调用 PaimonTableTableManager.fetchMetricByDeviceIdMetricNameAndCreatedDate() 获取了一条指标记录。接下来的步骤与更新方法几乎相同,只是我们没有修改指标记录中的任何字段。随后,在调用 BatchTableWrite#write() 之前,我们通过调用 BatchTableWrite#setRowKind(RowKind.DELETE) 将操作标记为删除

现在,通过执行以下方法来验证 deleteRecordsByDeviceIdMetricNameAndCreatedDate() 方法:

$ java
void whenCallDeleteRecord_thenRecordDeleted() throws Exception {
    Metric metric = PaimonTableTableManager.fetchMetricByDeviceIdMetricNameAndCreatedDate(
      catalog, tableId, 
      "dev_136", "disk_io", "2026-04-21 18:58:26"
    );
    assertNotNull(metric);
    PaimonTableTableManager.deleteRecordsByDeviceIdMetricNameAndCreatedDate(
      catalog, tableId, 
      "dev_136", "disk_io", "2026-04-21 18:58:26"
    );
    metric = PaimonTableTableManager.fetchMetricByDeviceIdMetricNameAndCreatedDate(
      catalog, tableId, "dev_136", "disk_io", "2026-04-21 18:58:26"
    );
    assertTrue(metric == null);
}

首先,我们通过调用 PaimonTableTableManager.fetchMetricByDeviceIdMetricNameAndCreatedDate() 获取与设备对应的指标记录。确认记录不为 null 后,我们调用删除方法。最后,我们再次查询指标表,并确认 Paimon DB 中已不存在该记录。

8. 结论

在本文中,我们探讨了管理 Paimon 数据库的核心 Java API,包括创建表、执行批量插入、更新、删除以及执行带过滤条件的数据查询。我们使用本地文件系统创建了 Paimon DB,但无论选择何种存储层,这些 API 都是标准且有效的

像往常一样,源代码可以在 GitHub 上找到。