流式处理
流式处理使你能够在代理运行过程中订阅其更新。这对于向终端用户展示进度更新和部分响应很有用。
要进行流式处理,你可以调用Runner.run_streamed()
,它会返回一个RunResultStreaming
。调用result.stream_events()
会返回一个StreamEvent
对象的异步流,下面将对其进行介绍。
原始响应事件
RawResponsesStreamEvent
是直接从大语言模型(LLM)传递的原始事件。它们采用OpenAI响应API格式,这意味着每个事件都有一个类型(如response.created
、response.output_text.delta
等)和数据。如果你想在响应消息生成后立即将其流式传输给用户,这些事件会很有用。
例如,下面的代码将逐个标记输出大语言模型生成的文本。
import asyncio
from openai.types.responses import ResponseTextDeltaEvent
from agents import Agent, Runner
async def main():
agent = Agent(
name="Joker",
instructions="You are a helpful assistant.",
)
result = Runner.run_streamed(agent, input="Please tell me 5 jokes.")
async for event in result.stream_events():
if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
print(event.data.delta, end="", flush=True)
if __name__ == "__main__":
asyncio.run(main())
运行项目事件和智能体事件
RunItemStreamEvent
是更高级别的事件。当一个项目完全生成时,它们会通知你。这使你能够在 “消息生成”、“工具运行” 等层面推送进度更新,而不是每个令牌都推送。同样,当当前智能体发生变化时(例如交接/切换的结果),AgentUpdatedStreamEvent
会为你提供更新。
例如,这将忽略原始事件并向用户流式传输更新。
import asyncio
import random
from agents import Agent, ItemHelpers, Runner, function_tool
@function_tool
def how_many_jokes() -> int:
return random.randint(1, 10)
async def main():
agent = Agent(
name="Joker",
instructions="First call the `how_many_jokes` tool, then tell that many jokes.",
tools=[how_many_jokes],
)
result = Runner.run_streamed(
agent,
input="Hello",
)
print("=== Run starting ===")
async for event in result.stream_events():
# We'll ignore the raw responses event deltas
if event.type == "raw_response_event":
continue
# When the agent updates, print that
elif event.type == "agent_updated_stream_event":
print(f"Agent updated: {event.new_agent.name}")
continue
# When items are generated, print them
elif event.type == "run_item_stream_event":
if event.item.type == "tool_call_item":
print("-- Tool was called")
elif event.item.type == "tool_call_output_item":
print(f"-- Tool output: {event.item.output}")
elif event.item.type == "message_output_item":
print(f"-- Message output:\n {ItemHelpers.text_message_output(event.item)}")
else:
pass # Ignore other event types
print("=== Run complete ===")
if __name__ == "__main__":
asyncio.run(main())