7. 流式与多轮交互(LangGraph 示例)¶
Helloworld 示例演示了 A2A 的基本机制。对于更高级的特性,如强大的流式处理、任务状态管理和由 LLM 驱动的多轮对话,我们将转向位于 a2a-samples/samples/python/agents/langgraph/ 的 LangGraph 示例。
该示例包含一个“Currency Agent”,它通过 LangChain 和 LangGraph 使用 Gemini 模型来回答货币兑换问题。
设置 LangGraph 示例¶
-
创建一个 Gemini API Key,如果还没有的话。
-
环境变量:
在
a2a-samples/samples/python/agents/langgraph/目录下创建一个.env文件:将
YOUR_API_KEY_HERE替换为你实际的 Gemini API key。 -
安装依赖(如果尚未安装):
langgraph示例有自己的pyproject.toml,其中包含了如langchain-google-genai和langgraph等依赖。当你在a2a-samples根目录用pip install -e .[dev]安装 SDK 时,也会安装工作区示例所需的依赖,包括langgraph-example。如果遇到导入错误,请确保你已在根目录成功安装了主 SDK。
运行 LangGraph 服务器¶
在终端中导航到 a2a-samples/samples/python/agents/langgraph/app 目录,并确保已激活(SDK 根目录下的)虚拟环境。
启动 LangGraph agent 服务器:
这将启动服务器,通常在 http://localhost:10000。
与 LangGraph Agent 交互¶
打开一个新的终端窗口,激活你的虚拟环境,并导航到 a2a-samples/samples/python/agents/langgraph/app。
运行其测试客户端:
现在,你可以在运行 __main__.py 的终端窗口中输入 Ctrl+C 来关闭服务器。
关键特性演示¶
langgraph 示例展示了几个重要的 A2A 概念:
-
LLM 集成:
agent.py定义了CurrencyAgent。它使用ChatGoogleGenerativeAI和 LangGraph 的create_react_agent处理用户查询。- 这演示了如何用真实的 LLM 驱动 agent 逻辑。
-
任务状态管理:
-
samples/langgraph/__main__.py使用InMemoryTaskStore初始化了DefaultRequestHandler。httpx_client = httpx.AsyncClient() request_handler = DefaultRequestHandler( agent_executor=CurrencyAgentExecutor(), task_store=InMemoryTaskStore(), push_notifier=InMemoryPushNotifier(httpx_client), ) server = A2AStarletteApplication( agent_card=agent_card, http_handler=request_handler ) uvicorn.run(server.build(), host=host, port=port) -
CurrencyAgentExecutor(在samples/langgraph/agent_executor.py中),当其execute方法被DefaultRequestHandler调用时,会与包含当前任务(如果有)的RequestContext交互。 - 对于
message/send,DefaultRequestHandler使用TaskStore在多次交互中持久化和检索任务状态。如果 agent 的执行流程涉及多步或产生持久任务,则message/send的响应会是一个完整的Task对象。 test_client.py的run_single_turn_test演示了如何获取Task对象并用get_task查询。
-
-
流式处理与
TaskStatusUpdateEvent和TaskArtifactUpdateEvent:CurrencyAgentExecutor的execute方法负责处理非流式和流式请求,由DefaultRequestHandler协调。- 当 LangGraph agent 处理请求(可能涉及调用如
get_exchange_rate的工具)时,CurrencyAgentExecutor会将不同类型的事件入队到EventQueue:TaskStatusUpdateEvent:用于中间状态更新(如“正在查找汇率...”,“正在处理汇率...”)。这些事件的final标志为False。TaskArtifactUpdateEvent:当最终答案准备好时,以 artifact 形式入队。lastChunk标志为True。- 一个最终的
TaskStatusUpdateEvent,其state=TaskState.completed且final=True,用于标记流式任务的结束。
test_client.py的run_streaming_test函数会在收到这些事件分片时逐个打印。
-
多轮对话(
TaskState.input_required):- 当用户的查询不明确时(如“how much is 100 USD?”),
CurrencyAgent可以请求澄清。 - 这时,
CurrencyAgentExecutor会入队一个TaskStatusUpdateEvent,其中status.state为TaskState.input_required,status.message包含 agent 的问题(如“你想兑换成哪种货币?”)。该事件对当前交互流的final为True。 test_client.py的run_multi_turn_test函数演示了这一过程:- 它发送一个不明确的初始查询。
- agent 响应(通过
DefaultRequestHandler处理入队事件)一个状态为input_required的Task。 - 客户端随后发送第二条消息,包含第一次
Task响应中的taskId和contextId,以补充缺失信息(如“in GBP”)。这样就继续了同一个任务。
- 当用户的查询不明确时(如“how much is 100 USD?”),
代码探索¶
花些时间浏览这些文件:
__main__.py:使用A2AStarletteApplication和DefaultRequestHandler进行服务器设置。注意AgentCard定义中包含capabilities.streaming=True。agent.py:包含 LangGraph、LLM 模型和工具定义的CurrencyAgent。agent_executor.py:实现了execute(和cancel)方法的CurrencyAgentExecutor。它使用RequestContext理解当前任务,并用EventQueue发送各种事件(TaskStatusUpdateEvent、TaskArtifactUpdateEvent,如果没有任务则通过第一个事件隐式返回新Task对象)。test_client.py:演示了多种交互模式,包括获取任务 ID 并用于多轮对话。
该示例更丰富地展示了 A2A 如何促进 agent 之间复杂、有状态和异步的交互。