基于Spring AI和MongoDB Atlas构建AI运维助手(三):有状态工作流与人机协作
这是本系列的第三篇也是最后一篇文章。第一部分讲述了 RAG 基础——将操作手册加载到向量存储中,并将模型答案基于真实文档。第二部分增加了短期和长期对话记忆。本文介绍有状态工作流检查点、工具调用以及暂停/恢复机制,使多步骤调查能够在会话边界之后继续执行。
尚存的缺口
我们之前进行到哪一步了?在本教程第二部分的结尾,我们的助手已经能够维持由多轮交流组成的对话,并记住跨多个会话交换的信息。例如,运维人员可以就 CPU 峰值告警提出多个问题,助手会根据之前的回答不断累积,考虑用户的偏好或历史选择。实际上,在所有会话中,助手都会记得运维人员曾表示更喜欢使用 Helm Chart 进行回滚操作,以及支付服务运行在由 16 个 Pod 组成的 Kubernetes 基础设施上。
但我们仍然差一步才能让一切真正完整。假设我们的运维人员在调查过程中需要休息一下——毕竟,我们是人类。运维人员离开半个小时,回来后想继续调查。
短期记忆是存在的;它会在 MongoDB 中保存助手和运维人员之间的每一条消息交换记录,但我们无法正确表达任务暂停的概念。此时,助手无法理解我们在调查中的哪个环节,哪些决策仍然待定,哪些自动检查已经完成,哪些还有待执行。重建对话的唯一途径是通过历史记录,但这只有在用户明确要求查看时才能实现。
在类似活动中,这一缺口尤为突出,因为真实的事故很少在单个不间断的会话中解决。如果考虑真实的生产场景,事故很少局限在单个点上,其解决也很少涉及单个操作。此外,解决方案的影响通常涉及运维的多个领域(云工程师、安全专家、存储、网络等),且每个领域的专家都需要批准各自的操作。如果我们想构建一个能够管理这种复杂性的助手,就需要迈出一步。
我们在构建什么
在第三部分中,我们将引入一个检查点系统:每个正在进行的对话都将附加一个持久化文档,用于跟踪工作流的当前状态、调查的整体状态、对外部工具的每次调用,以及所有在中断后智能恢复调查所需的信息。
除此之外,我们还将增加另一个主动功能,即一个 Spring AI 工具,模型可以调用它来获取服务指标的实时列表。到目前为止,助手一直是被动的:它接收问题、读取操作手册并回答。有了这个新功能,助手能够分析服务的当前状态,并利用这些信息丰富其推理过程。
最后,我们将所有这些功能与暂停和恢复对话的能力结合起来:只需调用一次 API,助手就能将状态重新加载到上下文中,从对话中断的地方精确接续。
核心理念:外部化工作流状态
让我们快速回顾几个概念:LLM 本身是一个无状态对象。每次调用开始时都没有任何先前上下文。我们在第二部分中添加的记忆层为对话提供了连续性,但缺少一个关键要素:它没有向模型提供有关哪个任务正在执行的信息。
检查点模式通过在 MongoDB(模型外部)存储任务的状态来解决此问题。其基本思路是使用外部跟踪记录来重建调查状态,而不是通过回溯对话历史来重建。在每个步骤中,模型都能准确知道自己的位置,而无需重建此信息。
我们在分布式系统中看到了相同的原则,目的是使其具有弹性:系统不能依赖自己的内部内存,因为内部内存无法在重启操作后存活。状态被外部化到一个持久化对象中,这样任何需要知道工作确切状态的进程都可以从该点获取。
检查点文档
让我们尝试勾勒并结构化我们的检查点文档。checkpoints 集合的一种可能结构如下:
conversationId:将检查点与对话和短期记忆关联taskId:当前正在执行的操作任务的标识符workflowName:与工作流关联的可读标签(例如incident-investigation)currentStep:描述工作流当前阶段的字符串(例如INIT、PROCESSING、WAITING_APPROVAL)status:表示状态的枚举(例如RUNNING、WAITING_APPROVAL、COMPLETED、FAILED)stateData:键值对映射<String, Object>,允许在对话过程中收集数据和事实,例如模型的最新响应、时间戳以及诊断活动的任何中间结果toolExecutionRefs:指向ToolExecution文档的 ID 列表,用于跟踪调查期间所做的每次工具调用expiresAt:带有 TTL 索引的时间戳,用于清理不再有用的检查点。将此 TTL 设置为覆盖实际事故的生存时间(小时或天,而不是分钟),这样隔夜暂停的调查在运维人员恢复之前不会被删除。在生产环境中,通常只过期终态检查点(COMPLETED 或 FAILED),而不是活跃的检查点。
该文档设计为灵活使用 stateData 映射来包含支持状态所需的所有信息。
检查点生命周期
检查点生命周期完全由 CheckpointService 类管理。该类允许我们创建检查点、前进一个步骤、加载对话的最新检查点、添加外部工具操作的执行引用,最后将其标记为已完成或失败。
实现状态转换的关键方法是 updateStep:该方法将当前对话的最新检查点加载到内存中,通过设置名称和状态来设置新步骤,将 stateData 映射中的所有数据与当前映射合并,并增加 expiresAt 字段中的过期时间。每次更改都仅针对当前状态文档,该文档代表调查的当前状态。
现在让我们看看控制此模型的状态机是如何工作的。
[LOADING...]
当对话中收到第一条消息时创建检查点。ChatController 在处理每个请求之前调用 checkpointService.loadLatest(conversationId) 方法;如果该调用的结果为空,则会创建一个新的检查点,状态为 RUNNING,步骤为 INIT。后续所有消息都会找到现有检查点,将其前进到 PROCESSING,然后在模型响应后前进到 WAITING_APPROVAL。
WAITING_APPROVAL 是消息交换之间的状态:在每次模型响应后,助手将最新交换的消息写入 stateData,并将状态设置为 WAITING_APPROVAL。从检查点的角度来看,模型已产生响应,现在正在等待运维人员决定如何继续。
让模型具备观察系统的能力
我们在第一和第二部分共同构建的助手有一个主要限制:它纯粹是被动的。它当然能够检索信息并记住我们已经进行的对话,但无法与任何正在运行的系统交互。
在最后一部分中,我们将为助手添加第一个主动能力:ServiceStatusTool。显然,这只是一个示例,但可以用于进一步演进助手并根据业务需求扩展其能力。
Spring AI 允许我们使用 @Tool 注解标记一个方法,以便模型可以调用它。在我们的例子中,ChatClient 被配置为通过 defaultTools(serviceStatusTool) 方法调用 ServiceStatusTool:这样,模型现在知道当它需要特定服务的 CPU、内存、错误率和指标的实时信息时,可以调用 getServiceStatus(serviceName, environment) 方法。
这种交互的特别之处在于,模型会自行决定何时调用该工具:当运维人员询问支付服务的健康状态时,模型会调用该工具并实时检索数据,而无需猜测或假设。
出于显而易见的原因,在我们的教程示例中,返回的指标都是模拟的,显示服务降级值(CPU 87%,内存 62%,99 百分位延迟 1240 毫秒)。在实际应用中,这个模拟调用必须替换为对可观测性平台(如 Datadog、Dynatrace、New Relic 或任何其他使用的平台)的调用。但该模型的优势恰恰在于:整个链路保持不变,只需在边缘进行改动。
将上下文传播到工具方法中
使用 @Tool 注解的方法是普通的 Java 方法:它们无法访问 HTTP 调用、ChatClient 或应用程序堆栈中的任何其他内容。除了被专门提供给方法或加载到当前线程中的内容之外,什么都没有。
这是一个问题,因为工具必须能够访问当前对话的 ID,以便记录日志并写入一个审计记录,关联到正在进行的调查。简单的解决方案是向方法添加一个额外的参数,但这会将操作概念与审计概念混在一起。
更好的方法是采用不同的方式:我们将通过 ConversationContextHolder 类使用 JVM 的 ThreadLocal,这允许我们在需要插入与对话上下文相关的信息时与 ThreadLocal 进行交互。这是一种众所周知的模式,我们为了审计目的将上下文信息存储在 ThreadLocal 中。当我们需要添加、修改或删除信息时,我们直接与 ThreadLocal 交互,它随后作为整个调用流程中的传输机制。
需要注意的重要一点是,ThreadLocal 本质上是线程受限的:为某个请求设置的值只会保留在该请求的线程上。需要指出一个注意事项:由于服务器会重复使用线程池中的线程,因此请求完成后(例如在 finally 块中)必须清除该持有者,否则陈旧的对话 ID 可能会传递到重用该线程的下一个请求中。
使用 ToolExecution 进行审计追踪
每次调用 @Tool 注解的方法都会在 tool_executions 集合中写入一个 ToolExecution 文档,记录执行 ID、对话 ID、工具名称、模型提供的输入、返回给模型的响应、状态以及开始和结束时间戳。
此工作流包含两个关键部分:
- 首先,我们将
executionId追加到检查点文档中的toolExecutionRefs列表中,这样我们就能知道调用了哪些工具,如果需要更多信息,可以查询详细信息。 - 其次,我们在
tool_executions集合中创建一个文档,以跟踪工具调用期间执行的操作,确保审计性和对执行步骤的可见性。
所有这些都是为了尽可能了解助手执行的操作:我们不是要构建一个无懈可击的系统,而是要使系统在发生意外情况时可读且可理解。
暂停、检查、恢复
现在我们已经构建了基础,是时候将功能暴露给图形界面了。我们将通过在 ChatController 中暴露两个新端点来实现:
GET /api/ops/chat/{conversationId}/state返回指定对话的当前检查点。演示 UI 在每次聊天响应后调用此端点,以突出显示工作流的状态。POST /api/ops/chat/{conversationId}/resume是暂停/恢复机制的核心。当运维人员暂停并恢复调查,或者对话移交给第二个运维人员时,将触发此 API。控制器会加载最新的可用检查点,将其设置为RUNNING状态,并构造一个提示来启动恢复:此提示包含stateData映射中的上下文信息。这个结构化内容被作为用户消息注入到对ChatClient的新调用中。在收到提供恢复调查所需所有信息的响应后,检查点被设置回WAITING_APPROVAL状态,循环继续。
演示场景
现在是时候测试我们构建的内容了。像往常一样,启动应用程序并导航到 http://localhost:8080 的 UI。这次,尝试在聊天中输入:“支付服务 CPU 告警刚触发。我应该首先检查什么?” 助手会基于我们的操作手册回复诊断步骤。但在幕后,发生了很多事情:
- 创建了一个状态为
RUNNING的检查点。 - 模型调用了
getServiceStatus("payment-service", "prod")工具。 - 一个
ToolExecution文档被写入 MongoDB。 executionID被链接到检查点。- 检查点转换为
WAITING_APPROVAL状态,并将问题和答案保存在stateData映射中。
此时,刷新 Workflow State 面板将显示我们工作流的状态,它将显示为 WAITING_APPROVAL,步骤为 WAITING_APPROVAL,名称为 incident-investigation,并包含刚才所做的工具调用的记录。
现在让我们尝试恢复活动:关闭浏览器然后重新打开。在恢复流程中输入对话 ID(或者在更高级的模型中,从我们的列表中选择要恢复的活动),然后点击“Resume Task”按钮。
点击恢复后,助手会回复说我们正在调查支付服务的高 CPU 使用率,并推荐下一步操作,与到目前为止所做的保持一致。
这种架构所代表的可能性
如果我们回顾构建助手所做的所有工作,会发现一个共同点。在这个原型中,MongoDB 充当了统一的持久化层,管理着四个不同的方面:
- 结构化知识(
knowledge_chunks集合中的操作手册) - 对话会话历史(短期记忆)
- 长期个人知识
- 工作流状态(检查点和审计日志)
每个功能都有不同的访问模式,从向量相似性搜索到按对话 ID 进行键查找,所有这些都在同一个集群中,只需维护和管理一个基础设施。
这种统一建模提供了独特的优势,尤其是在未来开发方面。事实上,每次添加新功能时,我们都不需要添加新的基础设施组件,否则肯定会增加运维开销。将所有内容保持在一起不一定能降低复杂性,但可以让我们将其集中在一个地方。
我们使用 Spring AI 构建的顾问链也值得一提;这是一个有意为之的架构选择,允许我们再次在不触及助手入口和出口点的情况下添加功能。控制器完全不知道它在链中会遇到哪些顾问:它将始终调用 chatClient.prompt()...call() 方法,而链会负责按顺序插入一个顾问。
结论
我们已经来到了本系列文章的终点,该系列引导我们创建了一个能够使用真实文档回答问题、保持对话连贯性并记住所讨论和分析内容的助手。此外,通过我们刚才迈出的最后一步,助手还可以实时检索服务状态,即使在长时间中断后也能恢复该状态。
所有这一切都是可能的,而无需构建任何自定义集成来与特定的 LLM、向量存储或抽象内存管理框架对接。Spring AI 让我们通过抽象聊天、顾问、记忆、向量存储和 MCP 工具的概念来完成这一切,并统一使用 MongoDB 作为持久化层。这是一款专注于领域特定操作逻辑而非基础设施复杂性的 AI 应用程序。
最终结果是一个完整的系统,随时可以进行分叉和定制:插入真实的操作手册,修改监控系统的模拟以进行真实的 API 调用,并添加新的工具来集成助手可以执行的新特定操作。这种架构允许应用程序水平扩展,MongoDB Atlas 集群也是如此,因为工作流状态存在于 MongoDB 中,而不是任何单个实例的内存中。剩下的就是测试助手并等待下一次事故。
本文的代码可在以下仓库中找到。使用不同的操作手册和文档修改内容,观察其在不同用例中的行为。
本文《使用 Spring AI 和 MongoDB Atlas 构建 AI 驱动运维助手——第三部分:有状态工作流与人工干预》最初发布于 foojay。