跳转至

A2A 中的流式传输和异步操作

Agent2Agent (A2A) 协议设计用于处理可能无法立即完成的任务。许多 AI 驱动的操作可能需要长时间运行、涉及多个步骤、产生增量结果或需要人工干预。A2A 提供了强大的机制来管理这种异步交互,确保客户端能够有效地接收更新,无论它们是保持持续连接还是以离线的方式运行。

1. 使用服务器发送事件(SSE)进行流式传输

对于产生增量结果(如生成长文档或流媒体)或提供持续状态更新的任务,A2A 支持使用服务器发送事件(SSE)进行实时通信。这特别适用于客户端能够与 A2A Server 保持活动 HTTP 连接的场景。

主要特点:

  • 初始化:客户端使用 message/stream RPC 方法发送初始消息(例如,提示或命令),同时订阅该任务的更新。
  • 服务器能力:A2A Server 必须在其 Agent Card 中通过设置 capabilities.streaming: true 来表明其支持流式传输。
  • 服务器响应(连接):如果订阅成功,服务器会以 HTTP 200 OK 状态和 Content-Type: text/event-stream 响应。这个 HTTP 连接保持开放,以便服务器推送事件。
  • 事件结构:服务器通过这个流发送事件。每个事件的 data 字段包含一个 JSON-RPC 2.0 响应对象,具体是 SendStreamingMessageResponse。这个 JSON-RPC 响应中的 id 与客户端原始 message/stream 请求中的 id 相匹配。
  • 事件类型(在 SendStreamingMessageResponse.result 中):
    • Task:表示 A2A Server 为 A2A 客户端处理的有状态工作单元。
    • TaskStatusUpdateEvent:传达任务生命周期状态的变化(例如,从 working 到 input-required 或 completed)。它还可以提供来自 Agent 的中间消息(例如,"我正在分析数据...")。
    • TaskArtifactUpdateEvent:传递任务生成的新或更新的 Artifacts。这用于分块流式传输大文件或数据结构。这个对象本身包含 appendlastChunk 等字段,以帮助客户端重新组装完整的 artifact。
  • 流终止:服务器通过在 TaskStatusUpdateEvent 中设置 final: true 来发出特定交互周期(即当前 message/stream 请求)更新结束的信号。这通常发生在任务达到终止状态(completed、failed、canceled)或需要输入状态(服务器期望从客户端获得进一步输入)时。在发送 final: true 事件后,服务器通常会关闭该特定请求的 SSE 连接。
  • 重新订阅:如果客户端的 SSE 连接在任务仍然活动时过早断开(且服务器尚未为该阶段发送 final: true 事件),客户端可以尝试使用 tasks/resubscribe RPC 方法重新连接到流。服务器在断开连接期间丢失事件的行为(例如,是否回填或仅发送新更新)取决于具体实现。

何时使用流式传输:

  • 实时监控长时间运行任务的进度。
  • 增量接收大型结果(artifacts),允许在整个结果可用之前就开始处理。
  • 交互式对话交流,其中即时反馈或部分响应是适用的。
  • 需要代理提供低延迟更新的应用程序。

有关详细结构,请参考协议规范:

2. 断开连接场景下的推送通知

对于非常长时间运行的任务(例如,持续数分钟、数小时甚至数天)或当客户端无法或不愿维持持久连接(如移动客户端或无服务器函数)时,A2A 支持通过推送通知进行异步更新。这种机制允许 A2A Server 在发生重要任务更新时,主动通知客户端提供的 webhook。

主要特点:

  • 服务器能力:A2A Server 必须在其 Agent Card 中通过设置 capabilities.pushNotifications: true 来表明其支持此功能。
  • 配置:客户端向服务器提供 PushNotificationConfig

此配置可以通过以下方式提供:

  • 在初始的 message/sendmessage/stream 请求中(通过 TaskSendParams 中的可选 pushNotification 参数)。
  • 单独使用 tasks/pushNotificationConfig/set RPC 方法为现有任务设置。

PushNotificationConfig 包括:

  • url:A2A Server 应该发送(POST)任务更新通知的绝对 HTTPS webhook URL。
  • token(可选):客户端生成的不透明字符串(例如,密钥或任务特定标识符)。服务器应该在通知请求中包含此令牌(例如,在自定义头部如 X-A2A-Notification-Token 中),以供客户端的 webhook 接收器验证。
  • authentication(可选):指定 A2A Server 应该如何向客户端的 webhook URL 进行身份验证的 AuthenticationInfo 对象。客户端(webhook 的接收者)定义这些身份验证要求。

  • 通知触发:A2A Server 决定何时发送推送通知。通常,这发生在任务达到重要状态变化时,例如转换到终止状态(completed、failed、canceled、rejected)或需要输入或需要认证的状态,特别是在其关联的消息和 artifacts 完全生成并稳定之后。

  • 通知负载:A2A 协议本身并不严格定义服务器发送给客户端 webhook 的推送通知的 HTTP 主体负载。但是,通知应该包含足够的信息,使客户端能够识别任务 ID 并理解更新的基本性质(例如,新的 TaskState)。服务器可能发送最小负载(仅包含任务 ID 和新状态)或更全面的负载(例如,摘要或完整的 Task 对象)。

  • 客户端操作:在收到推送通知(并成功验证其真实性和相关性)后,客户端通常使用 tasks/get RPC 方法,通过通知中的任务 ID 获取完整的、更新后的 Task 对象,包括任何新的 artifacts 或详细消息。

推送通知服务(客户端 Webhook 基础设施):

PushNotificationConfig.url 中指定的目标 URL 指向一个推送通知服务。该服务是客户端端(或客户端订阅的服务)的一个组件,负责接收来自 A2A Server 的 HTTP POST 通知。

其职责包括:

  • 对传入的通知进行身份验证(即验证其来自合法的 A2A Server)。
  • 验证通知的相关性(例如,检查令牌)。
  • 将通知或其内容中继到适当的客户端应用程序逻辑或系统。

在简单场景中(例如,本地开发),客户端应用程序本身可能直接暴露 webhook 端点。

在企业或生产环境中,这通常是一个健壮、安全的服务,用于处理传入的 webhook、验证调用者身份并路由消息(例如,到消息队列、内部 API、移动推送通知网关或其他事件驱动系统)。

推送通知的安全考虑

由于推送通知的异步性和服务器发起的出站特性,安全性至关重要。A2A Server(发送通知方)和客户端的 webhook 接收器都有相应的责任。

A2A服务器安全性(向客户端Webhook发送通知时)

1、Webhook URL 验证:

  • 服务器不应该盲目信任并发送 POST 请求到客户端在 PushNotificationConfig 中提供的任何 URL。恶意客户端可能提供指向内部服务或无关第三方系统的 URL,以造成损害(服务器端请求伪造 - SSRF 攻击)或作为分布式拒绝服务(DDoS)放大器。

    缓解策略:

    • 允许列表:如果可行,维护一个受信任域名或 IP 范围的允许列表,用于 webhook URL。
    • 所有权验证/质询-响应机制:在发送实际通知之前,服务器可以(理想情况下应该)执行验证步骤。例如,它可以向 webhook URL 发出带有唯一 validationToken(作为查询参数或头部)的 HTTP GET 或 OPTIONS 请求。webhook 服务必须适当响应(例如,回显令牌或确认就绪)以证明所有权和可达性。
    • 网络控制:使用出口防火墙或网络策略来限制 A2A Server 可以发送出站 HTTP 请求的位置。

2、向客户端的 Webhook 进行身份验证:

A2A Server 必须按照 PushNotificationConfig.authentication 中指定的方案向客户端的 webhook URL 进行身份验证。

服务器到服务器 webhook 的常见身份验证方案包括:

  • Bearer 令牌(OAuth 2.0):A2A Server 获取一个访问令牌(例如,如果 webhook 提供者支持,使用 OAuth 2.0 客户端凭证授权流程)用于代表客户端 webhook 的受众/范围,并将其包含在通知 POST 请求的 Authorization: Bearer <token> 头部中。
  • API 密钥:A2A Server 在特定 HTTP 头部(例如,X-Api-Key)中包含的预共享 API 密钥。
  • HMAC 签名:A2A Server 使用共享密钥通过 HMAC 对请求负载(或请求的部分)进行签名,并将签名包含在头部中(例如,X-Hub-Signature)。webhook 接收器随后验证此签名。
  • 双向 TLS(mTLS):如果客户端的 webhook 基础设施支持,A2A Server 可以出示客户端 TLS 证书。

客户端Webhook接收器安全性(接收来自A2A服务器的通知时)

1、验证 A2A Server:

  • webhook 端点必须严格验证传入通知请求的真实性,以确保它们来自合法的 A2A Server 而不是冒充者。

    验证签名/令牌:

    • 如果使用 JWT(例如,作为 Bearer 令牌),根据 A2A Server 的可信公钥验证 JWT 的签名(例如,如果适用,从 A2A Server 提供的 JWKS 端点获取)。同时,验证声明如 iss(签发者)、aud(受众 - 应该标识你的 webhook)、iat(签发时间)和 exp(过期时间)。
    • 如果使用 HMAC 签名,使用共享密钥重新计算接收到的负载的签名,并将其与请求头部中的签名进行比较。
    • 如果使用 API 密钥,确保密钥有效且已知。
  • 验证 PushNotificationConfig.token:如果客户端在设置任务通知时在其 PushNotificationConfig 中提供了不透明令牌,webhook 应该检查传入的通知是否包含这个确切的令牌(例如,在自定义头部如 X-A2A-Notification-Token 中)。这有助于确保通知是针对这个特定的客户端上下文和任务的,增加了授权层。

2、防止重放攻击:

  • 时间戳:通知应该理想地包含时间戳(例如,JWT 中的 iat - 签发时间 - 声明,或自定义时间戳头部)。webhook 应该拒绝太旧的通知(例如,超过几分钟的),以防止攻击者重放旧的、被捕获的通知。时间戳应该是签名负载的一部分(如果使用签名),以确保其完整性。

  • 随机数/唯一 ID:对于关键通知,考虑为每个通知使用唯一的、一次性标识符(随机数或事件 ID)。webhook 应该跟踪接收到的 ID(在合理的时间窗口内),以防止处理重复的通知。JWT 的 jti(JWT ID)声明可以用于此目的。

3、安全的密钥管理和轮换:

  • 如果使用加密密钥(用于 HMAC 的对称密钥,或用于 JWT 签名/mTLS 的非对称密钥对),实施安全的密钥管理实践,包括定期密钥轮换。

  • 对于 A2A Server 签名而客户端 webhook 验证的非对称密钥,像 JWKS(JSON Web Key Set)这样的协议允许服务器在众所周知的端点发布其公钥(包括轮换期间的新密钥)。客户端 webhook 然后可以动态获取正确的公钥进行签名验证,促进更顺畅的密钥轮换。

非对称密钥流程示例(JWT + JWKS)
  • 客户端设置 PushNotificationConfig,指定 authentication.schemes: ["Bearer"],可能还包括 JWT 的预期签发者或受众。
  • A2A Server 在发送通知时:
    • 生成 JWT,使用其私钥签名。JWT 包含声明如 iss(签发者)、aud(受众 - webhook)、iat(签发时间)、exp(过期时间)、jti(JWT ID)和 taskId
    • JWT 头部(algkid)指示签名算法和密钥 ID。
    • A2A Server 通过 JWKS 端点(webhook 提供者可能知道或发现此端点的 URL)提供其公钥。
  • 客户端 Webhook 在接收通知时:
    • Authorization 头部提取 JWT。
    • 检查 JWT 头部中的 kid
    • 从 A2A Server 的 JWKS 端点获取相应的公钥(建议缓存密钥)。
    • 使用公钥验证 JWT 签名。
    • 验证声明(issaudiatexpjti)。
    • 如果提供了 PushNotificationConfig.token,则检查它。

这种全面的、分层的方法来确保推送通知的安全性,确保消息是真实的、完整的和及时的,保护了发送方 A2A Server 和接收方客户端 webhook 基础设施。