跳转至

7. 流式与多轮交互(LangGraph 示例)

Helloworld 示例演示了 A2A 的基本机制。对于更高级的特性,如强大的流式处理、任务状态管理和由 LLM 驱动的多轮对话,我们将转向位于 a2a-samples/samples/python/agents/langgraph/ 的 LangGraph 示例。

该示例包含一个“Currency Agent”,它通过 LangChain 和 LangGraph 使用 Gemini 模型来回答货币兑换问题。

设置 LangGraph 示例

  1. 创建一个 Gemini API Key,如果还没有的话。

  2. 环境变量:

    a2a-samples/samples/python/agents/langgraph/ 目录下创建一个 .env 文件:

    echo "GOOGLE_API_KEY=YOUR_API_KEY_HERE" > .env
    

    YOUR_API_KEY_HERE 替换为你实际的 Gemini API key。

  3. 安装依赖(如果尚未安装):

    langgraph 示例有自己的 pyproject.toml,其中包含了如 langchain-google-genailanggraph 等依赖。当你在 a2a-samples 根目录用 pip install -e .[dev] 安装 SDK 时,也会安装工作区示例所需的依赖,包括 langgraph-example。如果遇到导入错误,请确保你已在根目录成功安装了主 SDK。

运行 LangGraph 服务器

在终端中导航到 a2a-samples/samples/python/agents/langgraph/app 目录,并确保已激活(SDK 根目录下的)虚拟环境。

启动 LangGraph agent 服务器:

python __main__.py

这将启动服务器,通常在 http://localhost:10000

与 LangGraph Agent 交互

打开一个新的终端窗口,激活你的虚拟环境,并导航到 a2a-samples/samples/python/agents/langgraph/app

运行其测试客户端:

python test_client.py

现在,你可以在运行 __main__.py 的终端窗口中输入 Ctrl+C 来关闭服务器。

关键特性演示

langgraph 示例展示了几个重要的 A2A 概念:

  1. LLM 集成

    • agent.py 定义了 CurrencyAgent。它使用 ChatGoogleGenerativeAI 和 LangGraph 的 create_react_agent 处理用户查询。
    • 这演示了如何用真实的 LLM 驱动 agent 逻辑。
  2. 任务状态管理

    • samples/langgraph/__main__.py 使用 InMemoryTaskStore 初始化了 DefaultRequestHandler

      httpx_client = httpx.AsyncClient()
      request_handler = DefaultRequestHandler(
          agent_executor=CurrencyAgentExecutor(),
          task_store=InMemoryTaskStore(),
          push_notifier=InMemoryPushNotifier(httpx_client),
      )
      server = A2AStarletteApplication(
          agent_card=agent_card, http_handler=request_handler
      )
      
      uvicorn.run(server.build(), host=host, port=port)
      
    • CurrencyAgentExecutor(在 samples/langgraph/agent_executor.py 中),当其 execute 方法被 DefaultRequestHandler 调用时,会与包含当前任务(如果有)的 RequestContext 交互。

    • 对于 message/sendDefaultRequestHandler 使用 TaskStore 在多次交互中持久化和检索任务状态。如果 agent 的执行流程涉及多步或产生持久任务,则 message/send 的响应会是一个完整的 Task 对象。
    • test_client.pyrun_single_turn_test 演示了如何获取 Task 对象并用 get_task 查询。
  3. 流式处理与 TaskStatusUpdateEventTaskArtifactUpdateEvent

    • CurrencyAgentExecutorexecute 方法负责处理非流式和流式请求,由 DefaultRequestHandler 协调。
    • 当 LangGraph agent 处理请求(可能涉及调用如 get_exchange_rate 的工具)时,CurrencyAgentExecutor 会将不同类型的事件入队到 EventQueue
      • TaskStatusUpdateEvent:用于中间状态更新(如“正在查找汇率...”,“正在处理汇率...”)。这些事件的 final 标志为 False
      • TaskArtifactUpdateEvent:当最终答案准备好时,以 artifact 形式入队。lastChunk 标志为 True
      • 一个最终的 TaskStatusUpdateEvent,其 state=TaskState.completedfinal=True,用于标记流式任务的结束。
    • test_client.pyrun_streaming_test 函数会在收到这些事件分片时逐个打印。
  4. 多轮对话(TaskState.input_required

    • 当用户的查询不明确时(如“how much is 100 USD?”),CurrencyAgent 可以请求澄清。
    • 这时,CurrencyAgentExecutor 会入队一个 TaskStatusUpdateEvent,其中 status.stateTaskState.input_requiredstatus.message 包含 agent 的问题(如“你想兑换成哪种货币?”)。该事件对当前交互流的 finalTrue
    • test_client.pyrun_multi_turn_test 函数演示了这一过程:
      • 它发送一个不明确的初始查询。
      • agent 响应(通过 DefaultRequestHandler 处理入队事件)一个状态为 input_requiredTask
      • 客户端随后发送第二条消息,包含第一次 Task 响应中的 taskIdcontextId,以补充缺失信息(如“in GBP”)。这样就继续了同一个任务。

代码探索

花些时间浏览这些文件:

  • __main__.py:使用 A2AStarletteApplicationDefaultRequestHandler 进行服务器设置。注意 AgentCard 定义中包含 capabilities.streaming=True
  • agent.py:包含 LangGraph、LLM 模型和工具定义的 CurrencyAgent
  • agent_executor.py:实现了 execute(和 cancel)方法的 CurrencyAgentExecutor。它使用 RequestContext 理解当前任务,并用 EventQueue 发送各种事件(TaskStatusUpdateEventTaskArtifactUpdateEvent,如果没有任务则通过第一个事件隐式返回新 Task 对象)。
  • test_client.py:演示了多种交互模式,包括获取任务 ID 并用于多轮对话。

该示例更丰富地展示了 A2A 如何促进 agent 之间复杂、有状态和异步的交互。