OpenAI Agents SDK #2:Runner 到底在跑什么?Agent Loop 全解析

从「调用 Runner.run() 之后发生了什么」切入,系统拆解 Agent Loop 的 while-true 执行机制、三种运行方式(run / run_sync / run_streamed)的选择逻辑、RunConfig 的四类配置用途,并通过完整代码示例逐一讲解 4 种对话管理策略和 6 种异常类型的处理方式。结尾附三条可立即落地的实践建议。

研究速览

上篇讲了 Agent 是什么、怎么配置。本篇是续集,聚焦 Runner 运行机制——那个真正让 Agent「动起来」的引擎。
你调用 Runner.run(agent, "帮我写首诗"),然后呢?从这行代码到最终输出,中间发生了什么?很多人用了很久 SDK 还是说不清。这篇把它拆开来看。

Agent Loop:一个你绕不开的 while True

Runner 的核心是一个循环1
  1. 调用当前 Agent 的 LLM
  2. 检查返回结果——
    • final_output(纯文本输出且无工具调用)→ 退出循环,返回结果
    • 有 handoff(要移交给另一个 Agent)→ 切换当前 Agent,重新跑循环
    • 有工具调用 → 执行工具,把结果拼回输入,继续循环
  3. 超过 max_turns 上限 → 抛出 MaxTurnsExceeded
就这三步,反复跑。听起来简单,但「一个 turn」的定义值得留意:一次 LLM 调用算一个 turn,包括该轮触发的所有工具调用1。一个设置 max_turns=10 的 Agent,可能实际跑了 10 次 LLM + N 次工具。
社区里有人用「WhileTrue 编排模式」来形容这个设计2——循环检查 max_turns、执行 _run_single_turn,根据 NextStepFinalOutputNextStepAction 决定下一步,超限触发 MaxTurnsExceeded。简洁强大。

三种运行方式,怎么选

SDK 提供三个入口1
方法返回适用场景
Runner.run()RunResult(异步)生产环境主力,FastAPI / asyncio 环境
Runner.run_sync()RunResult(同步)脚本、命令行工具、快速验证
Runner.run_streamed()RunResultStreaming(异步)需要逐字输出的聊天界面
最小化示例,6 行跑起来3
from agents import Agent, Runner

agent = Agent(name="Assistant", instructions="You are a helpful assistant")
result = await Runner.run(agent, "Write a haiku about recursion in programming.")
print(result.final_output)
run_sync 是对 run 的同步封装,内部直接调 .run()。官方文档明说了:如果当前环境已经有事件循环(Jupyter notebook、FastAPI、已运行的 asyncio context),直接用 run_sync 会报错,必须换成 await Runner.run()4。这个坑踩过的人不少。
掘金上的实践者也总结得很直接5:简单场景推荐 run_sync 入门,复杂场景(并行多 Agent、多工具、流式输出)推荐异步,以避免阻塞。
深蓝调代码编辑器,展示 Python 异步代码,与文章技术主题呼应
深蓝调代码编辑器,展示 Python 异步代码,与文章技术主题呼应

RunConfig:一个 run 的全局控制台

每次调用 Runner 时可以传一个 run_config,它能覆盖所有 Agent 的配置,优先级高于单个 Agent 的设置1
按用途分四类4
模型层model / model_provider / model_settings — 全局切换模型或参数,比如让所有 Agent 都用 temperature=0 跑确定性测试
护栏与 Handoff 层input_guardrails / output_guardrails / handoff_input_filter / nest_handoff_historynest_handoff_history 是 opt-in beta,开启后会在 handoff 前把对话历史折叠成一条 assistant 消息,减少下游 Agent 的 token 消耗
追踪层tracing_disabled / workflow_name / trace_id / group_id / trace_metadata — 建议至少设置 workflow_name,方便在 OpenAI 控制台里区分不同业务流程
输入过滤层call_model_input_filter — 在每次 LLM 调用前的最后一刻修改输入,典型用途是截断过长历史1
from agents import Agent, Runner, RunConfig
from agents.run import CallModelData, ModelInputData

def drop_old_messages(data: CallModelData[None]) -> ModelInputData:
    # 只保留最近 5 条消息
    trimmed = data.model_data.input[-5:]
    return ModelInputData(input=trimmed, instructions=data.model_data.instructions)

result = Runner.run_sync(
    agent,
    "Explain quines",
    run_config=RunConfig(call_model_input_filter=drop_old_messages),
)
还有一个不起眼但实际很有用的参数:reasoning_item_id_policy。设为 "omit" 可以解决多轮对话中 reasoning item 的 Responses API 400 报错(错误信息类似 Item 'rs_...' of type 'reasoning' was provided without its required following item4。这类错误平时很难排查,改一个参数就能规避。

对话管理:4 种策略,一次说清

多轮对话是 Agent 应用里最容易乱的地方。SDK 给了 4 种策略,每次选一种,混用会导致上下文重复1

策略 1:result.to_input_list() — 手动拼接

最透明、最灵活,你自己管历史:
agent = Agent(name="Assistant", instructions="Reply very concisely.")

result = await Runner.run(agent, "What city is the Golden Gate Bridge in?")
print(result.final_output)  # San Francisco

# 手动把上轮输出 + 新问题拼在一起
new_input = result.to_input_list() + [{"role": "user", "content": "What state is it in?"}]
result = await Runner.run(agent, new_input)
print(result.final_output)  # California
适合小型对话循环,或需要完全掌控历史内容的场景。

策略 2:session — 自动历史管理

把历史托管给 SDK,自动读取和写入6
from agents import Agent, Runner, SQLiteSession

agent = Agent(name="Assistant", instructions="Reply very concisely.")
session = SQLiteSession("conversation_123")

# 第一轮
result = await Runner.run(agent, "What city is the Golden Gate Bridge in?", session=session)
# 第二轮 — Agent 自动记住上下文
result = await Runner.run(agent, "What state is it in?", session=session)
SQLiteSession 支持 db_path 参数,把会话持久化到本地数据库,重启进程仍能恢复历史6。适合需要跨请求持久化的场景,比如带历史记忆的聊天机器人。

策略 3:conversation_id — 服务端命名对话

让 OpenAI 服务端管理对话,可以跨进程、跨服务共享同一个对话1
from openai import AsyncOpenAI

client = AsyncOpenAI()
conversation = await client.conversations.create()
conv_id = conversation.id

while True:
    user_input = input("You: ")
    result = await Runner.run(agent, user_input, conversation_id=conv_id)
    print(f"Assistant: {result.final_output}")
每次只传新的用户输入,历史由 OpenAI 服务端维护。仅推荐纯 OpenAI 模型场景使用,其他模型 provider 无法写入 Conversation 对象4

策略 4:previous_response_id — 轻量链式对话

最轻的服务端方案,不创建对话资源,只链接上一轮的 response ID7
agent = Agent(name="Assistant", instructions="Reply very concisely.")
previous_response_id = None

while True:
    user_input = input("You: ")
    result = await Runner.run(
        agent,
        user_input,
        previous_response_id=previous_response_id,
        auto_previous_response_id=True,
    )
    previous_response_id = result.last_response_id
    print(f"Assistant: {result.final_output}")
不用重传完整历史,token 消耗更低。注意:Responses API 的 response 有效期 30 天,生产环境建议同时存储过期时间7
conversation_idprevious_response_id 互斥1。用前者你拥有一个可命名、可跨系统共享的对话资源;用后者只是最简的「上一轮 → 这一轮」链接。session(客户端管理)和这两个服务端方案也不能在同一次 run 里混用。

流式输出:run_streamed 的双层事件

Runner.run_streamed() 返回 RunResultStreaming,调用 .stream_events() 得到事件流8
事件分两层:
低层:RawResponsesStreamEvent — LLM 原始输出,逐 token 流,event.type == "raw_response_event",适合需要「第一个字尽快出现」的场景:
from openai.types.responses import ResponseTextDeltaEvent

result = Runner.run_streamed(agent, input="Tell me 5 jokes.")
async for event in result.stream_events():
    if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
        print(event.data.delta, end="", flush=True)
高层:RunItemStreamEventAgentUpdatedStreamEvent — 语义级别的事件,一条工具调用、一条消息输出、一次 Agent 切换各自触发一个事件8
from agents import Agent, ItemHelpers, Runner, function_tool

async for event in result.stream_events():
    if event.type == "raw_response_event":
        continue  # 忽略低层 token 事件
    elif event.type == "agent_updated_stream_event":
        print(f"切换到 Agent: {event.new_agent.name}")
    elif event.type == "run_item_stream_event":
        if event.item.type == "tool_call_item":
            print("-- 工具被调用")
        elif event.item.type == "tool_call_output_item":
            print(f"-- 工具输出: {event.item.output}")
        elif event.item.type == "message_output_item":
            print(f"-- 消息: {ItemHelpers.text_message_output(event.item)}")
一个容易踩的坑:流迭代结束 ≠ 运行完成8stream_events() 的 async iterator 跑完后,SDK 可能还在做 session 持久化、审批状态写入、历史压缩等后处理。只有 result.is_completeTrue 时才算真正结束。
需要中途停止?result.cancel() 立即停,result.cancel(mode="after_turn") 等当前 turn 完成后再停8
知乎上有开发者总结了流式输出最直接的场景9:通过检查 event.type == "raw_response_event"event.data.delta 实现实时逐字显示,是 Web 聊天界面「打字机效果」最简单的做法。

6 种异常,分清楚才能优雅处理

SDK 的异常体系1
正在加载统计卡片…
MaxTurnsExceeded 支持自定义 error handler,返回一个优雅的 fallback 而不是直接抛错1
from agents import Agent, RunErrorHandlerInput, RunErrorHandlerResult, Runner

def on_max_turns(_data: RunErrorHandlerInput[None]) -> RunErrorHandlerResult:
    return RunErrorHandlerResult(
        final_output="任务太复杂了,请把请求拆得更细一些。",
        include_in_history=False,  # 不把这条 fallback 加入对话历史
    )

result = Runner.run_sync(
    agent,
    "分析这份超长报告",
    max_turns=3,
    error_handlers={"max_turns": on_max_turns},
)
print(result.final_output)
护栏异常(InputGuardrailTripwireTriggered / OutputGuardrailTripwireTriggered)建议用 try/except 捕获,做降级处理10
UserError 是你写错了代码——比如 call_model_input_filter 返回了非 ModelInputData 类型,或者配置了互斥的参数组合。看到这个异常先查代码,不是 SDK bug。

三条落地建议

1. 先用 run_sync 验证逻辑,再改成异步
脚本和快速 Demo 阶段 run_sync 简单直接,没有 async/await 的心智负担。等逻辑稳定、需要处理并发或流式输出时,再切换到 Runner.run()5
2. RunConfig 是统一管多 Agent 行为的最好入口
如果你的 workflow 里有多个 Agent,与其给每个 Agent 分别设置 model_settings,不如在 RunConfig 里统一设置一次。call_model_input_filter 截断历史也在这里做——比在每个 Agent 里加逻辑简洁得多11
3. 对话管理策略在项目开始就想清楚,别等到重构
to_input_list 最灵活但你要自己管;SQLiteSession 适合单机持久化;conversation_id 适合分布式多服务共享;previous_response_id 轻量但有 30 天有效期6。这四种策略改起来会牵动整个对话逻辑——早决定,省事。

下篇会继续往下走,聊 Tools 工具系统——@function_tool 怎么工作、Pydantic 验证、工具超时处理,以及 built-in tools(网页搜索、文件搜索、计算机操作)的接入方式。

围绕这条内容继续补充观点或上下文。

  • 登录后可发表评论。