跳转至

Workflow

VoiceWorkflowBase

Bases: ABC

语音工作流的基类。你必须实现 run 方法。一个“工作流”可以是任何你想要的代码,它接收转录文本并生成将由文本转语音模型转换为语音的文本。 在大多数情况下,你会创建 Agent 并使用 Runner.run_streamed() 来运行它们,从流中返回部分或全部文本事件。你可以使用 VoiceWorkflowHelper 类来帮助从流中提取文本事件。 如果你有一个简单的工作流,只有一个起始 agent 并且没有自定义逻辑,可以直接使用 SingleAgentVoiceWorkflow

Source code in agents/voice/workflow.py
class VoiceWorkflowBase(abc.ABC):
    """
    语音工作流的基类。你必须实现 `run` 方法。一个“工作流”可以是任何你想要的代码,它接收转录文本并生成将由文本转语音模型转换为语音的文本。
    在大多数情况下,你会创建 `Agent` 并使用 `Runner.run_streamed()` 来运行它们,从流中返回部分或全部文本事件。你可以使用 `VoiceWorkflowHelper` 类来帮助从流中提取文本事件。
    如果你有一个简单的工作流,只有一个起始 agent 并且没有自定义逻辑,可以直接使用 `SingleAgentVoiceWorkflow`。
    """

    @abc.abstractmethod
    def run(self, transcription: str) -> AsyncIterator[str]:
        """
        运行语音工作流。你会收到一个输入转录文本,必须生成将要对用户说出的文本。你可以在这里运行任何你想要的逻辑。在大多数情况下,最终逻辑会涉及调用 `Runner.run_streamed()` 并从流中生成任何文本事件。
        """
        pass

run abstractmethod

run(transcription: str) -> AsyncIterator[str]

运行语音工作流。你会收到一个输入转录文本,必须生成将要对用户说出的文本。你可以在这里运行任何你想要的逻辑。在大多数情况下,最终逻辑会涉及调用 Runner.run_streamed() 并从流中生成任何文本事件。

Source code in agents/voice/workflow.py
@abc.abstractmethod
def run(self, transcription: str) -> AsyncIterator[str]:
    """
    运行语音工作流。你会收到一个输入转录文本,必须生成将要对用户说出的文本。你可以在这里运行任何你想要的逻辑。在大多数情况下,最终逻辑会涉及调用 `Runner.run_streamed()` 并从流中生成任何文本事件。
    """
    pass

VoiceWorkflowHelper

Source code in agents/voice/workflow.py
class VoiceWorkflowHelper:
    @classmethod
    async def stream_text_from(cls, result: RunResultStreaming) -> AsyncIterator[str]:
        """包装一个 `RunResultStreaming` 对象,并从流中生成文本事件。"""
        async for event in result.stream_events():
            if (
                event.type == "raw_response_event"
                and event.data.type == "response.output_text.delta"
            ):
                yield event.data.delta

stream_text_from async classmethod

stream_text_from(result: RunResultStreaming) -> AsyncIterator[str]

包装一个 RunResultStreaming 对象,并从流中生成文本事件。

Source code in agents/voice/workflow.py
@classmethod
async def stream_text_from(cls, result: RunResultStreaming) -> AsyncIterator[str]:
    """包装一个 `RunResultStreaming` 对象,并从流中生成文本事件。"""
    async for event in result.stream_events():
        if (
            event.type == "raw_response_event"
            and event.data.type == "response.output_text.delta"
        ):
            yield event.data.delta

SingleAgentWorkflowCallbacks

Source code in agents/voice/workflow.py
class SingleAgentWorkflowCallbacks:
    def on_run(self, workflow: SingleAgentVoiceWorkflow, transcription: str) -> None:
        """当工作流运行时被调用。"""
        pass

on_run

on_run(workflow: SingleAgentVoiceWorkflow, transcription: str) -> None

当工作流运行时被调用。

Source code in agents/voice/workflow.py
def on_run(self, workflow: SingleAgentVoiceWorkflow, transcription: str) -> None:
    """当工作流运行时被调用。"""
    pass

SingleAgentVoiceWorkflow

Bases: VoiceWorkflowBase

一个简单的语音工作流,运行单个 agent。每个转录文本和结果都会被添加到输入历史中。 对于更复杂的工作流(例如多次 Runner 调用、自定义消息历史、自定义逻辑、自定义配置),请继承 VoiceWorkflowBase 并实现你自己的逻辑。

Source code in agents/voice/workflow.py
class SingleAgentVoiceWorkflow(VoiceWorkflowBase):
    """一个简单的语音工作流,运行单个 agent。每个转录文本和结果都会被添加到输入历史中。
    对于更复杂的工作流(例如多次 Runner 调用、自定义消息历史、自定义逻辑、自定义配置),请继承 `VoiceWorkflowBase` 并实现你自己的逻辑。
    """

    def __init__(self, agent: Agent[Any], callbacks: SingleAgentWorkflowCallbacks | None = None):
        """创建一个新的单 agent 语音工作流。

        参数:
            agent: 要运行的 agent。
            callbacks: 在工作流期间调用的可选回调。
        """
        self._input_history: list[TResponseInputItem] = []
        self._current_agent = agent
        self._callbacks = callbacks

    async def run(self, transcription: str) -> AsyncIterator[str]:
        if self._callbacks:
            self._callbacks.on_run(self, transcription)

        # Add the transcription to the input history
        self._input_history.append(
            {
                "role": "user",
                "content": transcription,
            }
        )

        # Run the agent
        result = Runner.run_streamed(self._current_agent, self._input_history)

        # Stream the text from the result
        async for chunk in VoiceWorkflowHelper.stream_text_from(result):
            yield chunk

        # Update the input history and current agent
        self._input_history = result.to_input_list()
        self._current_agent = result.last_agent

__init__

__init__(agent: Agent[Any], callbacks: SingleAgentWorkflowCallbacks | None = None)

创建一个新的单 agent 语音工作流。

参数

agent: 要运行的 agent。 callbacks: 在工作流期间调用的可选回调。

Source code in agents/voice/workflow.py
def __init__(self, agent: Agent[Any], callbacks: SingleAgentWorkflowCallbacks | None = None):
    """创建一个新的单 agent 语音工作流。

    参数:
        agent: 要运行的 agent。
        callbacks: 在工作流期间调用的可选回调。
    """
    self._input_history: list[TResponseInputItem] = []
    self._current_agent = agent
    self._callbacks = callbacks