Processors

ConsoleSpanExporter

Bases: TracingExporter

Prints the traces and spans to the console.

Source code in src/agents/tracing/processors.py
class ConsoleSpanExporter(TracingExporter):
    """将 traces 和 spans 打印到控制台。"""

    def export(self, items: list[Trace | Span[Any]]) -> None:
        for item in items:
            if isinstance(item, Trace):
                print(f"[Exporter] Export trace_id={item.trace_id}, name={item.name}, ")
            else:
                print(f"[Exporter] Export span: {item.export()}")

BackendSpanExporter

Bases: TracingExporter

Source code in src/agents/tracing/processors.py
class BackendSpanExporter(TracingExporter):
    def __init__(
        self,
        api_key: str | None = None,
        organization: str | None = None,
        project: str | None = None,
        endpoint: str = "https://api.openai.com/v1/traces/ingest",
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 30.0,
    ):
        """
        Args:
            api_key: The API key for the "Authorization" header. Defaults to
                `os.environ["OPENAI_API_KEY"]` if not provided.
            organization: The OpenAI organization to use. Defaults to
                `os.environ["OPENAI_ORG_ID"]` if not provided.
            project: The OpenAI project to use. Defaults to
                `os.environ["OPENAI_PROJECT_ID"]` if not provided.
            endpoint: The HTTP endpoint to which traces/spans are uploaded.
            max_retries: Maximum number of retries upon failures.
            base_delay: Base delay (in seconds) for the first backoff.
            max_delay: Maximum delay (in seconds) for backoff growth.
        """
        self._api_key = api_key
        self._organization = organization
        self._project = project
        self.endpoint = endpoint
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay

        # Keep a client open for connection pooling across multiple export calls
        self._client = httpx.Client(timeout=httpx.Timeout(timeout=60, connect=5.0))

    def set_api_key(self, api_key: str):
        """设置导出器使用的 OpenAI API 密钥。

        参数:
            api_key: 要使用的 OpenAI API 密钥。与 OpenAI Python 客户端使用的密钥相同。
        """
        # We're specifically setting the underlying cached property as well
        self._api_key = api_key
        self.api_key = api_key

    @cached_property
    def api_key(self):
        return self._api_key or os.environ.get("OPENAI_API_KEY")

    @cached_property
    def organization(self):
        return self._organization or os.environ.get("OPENAI_ORG_ID")

    @cached_property
    def project(self):
        return self._project or os.environ.get("OPENAI_PROJECT_ID")

    def export(self, items: list[Trace | Span[Any]]) -> None:
        if not items:
            return

        if not self.api_key:
            logger.warning("OPENAI_API_KEY is not set, skipping trace export")
            return

        data = [item.export() for item in items if item.export()]
        payload = {"data": data}

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
            "OpenAI-Beta": "traces=v1",
        }

        if self.organization:
            headers["OpenAI-Organization"] = self.organization

        if self.project:
            headers["OpenAI-Project"] = self.project

        # Exponential backoff loop
        attempt = 0
        delay = self.base_delay
        while True:
            attempt += 1
            try:
                response = self._client.post(url=self.endpoint, headers=headers, json=payload)

                # If the response is successful, break out of the loop
                if response.status_code < 300:
                    logger.debug(f"Exported {len(items)} items")
                    return

                # If the response is a client error (4xx), we won't retry
                if 400 <= response.status_code < 500:
                    logger.error(
                        f"[non-fatal] Tracing client error {response.status_code}: {response.text}"
                    )
                    return

                # For 5xx or other unexpected codes, treat it as transient and retry
                logger.warning(
                    f"[non-fatal] Tracing: server error {response.status_code}, retrying."
                )
            except httpx.RequestError as exc:
                # Network or other I/O error, we'll retry
                logger.warning(f"[non-fatal] Tracing: request failed: {exc}")

            # If we reach here, we need to retry or give up
            if attempt >= self.max_retries:
                logger.error("[non-fatal] Tracing: max retries reached, giving up on this batch.")
                return

            # Exponential backoff + jitter
            sleep_time = delay + random.uniform(0, 0.1 * delay)  # 10% jitter
            time.sleep(sleep_time)
            delay = min(delay * 2, self.max_delay)

    def close(self):
        """Close the underlying HTTP client."""
        self._client.close()
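
The retry loop above doubles the delay after each failed attempt, capped at max_delay, with up to 10% jitter added. A standalone sketch of that schedule (the helper name backoff_delays is hypothetical, not part of the module):

import random

def backoff_delays(base_delay: float = 1.0, max_delay: float = 30.0, retries: int = 5):
    """Yield the sleep time before each retry, mirroring the loop in export()."""
    delay = base_delay
    for _ in range(retries):
        # Same formula as above: the current delay plus up to 10% jitter.
        yield delay + random.uniform(0, 0.1 * delay)
        delay = min(delay * 2, max_delay)

# With the defaults, successive waits are roughly 1s, 2s, 4s, 8s, 16s (plus jitter).
print([round(d, 2) for d in backoff_delays()])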

__init__

__init__(
    api_key: str | None = None,
    organization: str | None = None,
    project: str | None = None,
    endpoint: str = "https://api.openai.com/v1/traces/ingest",
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
)
Parameters

api_key: The API key for the "Authorization" header. Defaults to os.environ["OPENAI_API_KEY"] if not provided.
organization: The OpenAI organization to use. Defaults to os.environ["OPENAI_ORG_ID"] if not provided.
project: The OpenAI project to use. Defaults to os.environ["OPENAI_PROJECT_ID"] if not provided.
endpoint: The HTTP endpoint to which traces/spans are uploaded.
max_retries: Maximum number of retries upon failures.
base_delay: Base delay (in seconds) for the first backoff.
max_delay: Maximum delay (in seconds) for backoff growth.
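
For example, a minimal construction sketch; the key and endpoint values below are placeholders, not real credentials or a real service:

from agents.tracing.processors import BackendSpanExporter

exporter = BackendSpanExporter(
    api_key="sk-example",                          # placeholder; omit to fall back to OPENAI_API_KEY
    endpoint="https://traces.example.com/ingest",  # placeholder ingest URL
    max_retries=5,
    base_delay=0.5,
    max_delay=10.0,
)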

Source code in src/agents/tracing/processors.py
def __init__(
    self,
    api_key: str | None = None,
    organization: str | None = None,
    project: str | None = None,
    endpoint: str = "https://api.openai.com/v1/traces/ingest",
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
):
    """
    Args:
        api_key: The API key for the "Authorization" header. Defaults to
            `os.environ["OPENAI_API_KEY"]` if not provided.
        organization: The OpenAI organization to use. Defaults to
            `os.environ["OPENAI_ORG_ID"]` if not provided.
        project: The OpenAI project to use. Defaults to
            `os.environ["OPENAI_PROJECT_ID"]` if not provided.
        endpoint: The HTTP endpoint to which traces/spans are uploaded.
        max_retries: Maximum number of retries upon failures.
        base_delay: Base delay (in seconds) for the first backoff.
        max_delay: Maximum delay (in seconds) for backoff growth.
    """
    self._api_key = api_key
    self._organization = organization
    self._project = project
    self.endpoint = endpoint
    self.max_retries = max_retries
    self.base_delay = base_delay
    self.max_delay = max_delay

    # Keep a client open for connection pooling across multiple export calls
    self._client = httpx.Client(timeout=httpx.Timeout(timeout=60, connect=5.0))

set_api_key

set_api_key(api_key: str)

Sets the OpenAI API key for the exporter.

Parameters

api_key: The OpenAI API key to use. This is the same key used by the OpenAI Python client.

Source code in src/agents/tracing/processors.py
def set_api_key(self, api_key: str):
    """设置导出器使用的 OpenAI API 密钥。

    参数:
        api_key: 要使用的 OpenAI API 密钥。与 OpenAI Python 客户端使用的密钥相同。
    """
    # We're specifically setting the underlying cached property as well
    self._api_key = api_key
    self.api_key = api_key
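
A short sketch of rotating the key on a live exporter (the key value is a placeholder):

from agents.tracing.processors import BackendSpanExporter

exporter = BackendSpanExporter()
# Replaces both the stored key and the cached `api_key` property, so the next
# export() call sends the new "Authorization" header.
exporter.set_api_key("sk-rotated-example")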

close

close()

Close the underlying HTTP client.

Source code in src/agents/tracing/processors.py
def close(self):
    """Close the underlying HTTP client."""
    self._client.close()
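
Because the exporter holds a pooled httpx.Client, it is worth closing it once the process is done exporting. A sketch using the standard library's atexit:

import atexit

from agents.tracing.processors import BackendSpanExporter

exporter = BackendSpanExporter()
atexit.register(exporter.close)  # release pooled HTTP connections at interpreter exit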

BatchTraceProcessor

Bases: TracingProcessor

Some implementation notes:

1. Using Queue, which is thread-safe.
2. Using a background thread to export spans, to minimize any performance impact.
3. Spans are stored in memory until they are exported.
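
A minimal wiring sketch, assuming the SDK re-exports set_trace_processors from agents.tracing (adjust the import to however your setup registers processors):

from agents.tracing import set_trace_processors  # assumed re-export
from agents.tracing.processors import BatchTraceProcessor, ConsoleSpanExporter

# Route all traces/spans through a batching processor that prints to the console.
set_trace_processors([BatchTraceProcessor(exporter=ConsoleSpanExporter())])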

Source code in src/agents/tracing/processors.py
class BatchTraceProcessor(TracingProcessor):
    """一些实现说明:
    1. 使用 Queue,线程安全。
    2. 使用后台线程导出 span,以最小化性能影响。
    3. span 会存储在内存中,直到被导出。
    """

    def __init__(
        self,
        exporter: TracingExporter,
        max_queue_size: int = 8192,
        max_batch_size: int = 128,
        schedule_delay: float = 5.0,
        export_trigger_ratio: float = 0.7,
    ):
        """
        Args:
            exporter: The exporter to use.
            max_queue_size: The maximum number of spans to store in the queue. After this, we
                will start dropping spans.
            max_batch_size: The maximum number of spans to export in a single batch.
            schedule_delay: The delay between checks for new spans to export.
            export_trigger_ratio: The ratio of the queue size at which we will trigger an export.
        """
        self._exporter = exporter
        self._queue: queue.Queue[Trace | Span[Any]] = queue.Queue(maxsize=max_queue_size)
        self._max_queue_size = max_queue_size
        self._max_batch_size = max_batch_size
        self._schedule_delay = schedule_delay
        self._shutdown_event = threading.Event()

        # The queue size threshold at which we export immediately.
        self._export_trigger_size = int(max_queue_size * export_trigger_ratio)

        # Track when we next *must* perform a scheduled export
        self._next_export_time = time.time() + self._schedule_delay

        self._worker_thread = threading.Thread(target=self._run, daemon=True)
        self._worker_thread.start()

    def on_trace_start(self, trace: Trace) -> None:
        try:
            self._queue.put_nowait(trace)
        except queue.Full:
            logger.warning("Queue is full, dropping trace.")

    def on_trace_end(self, trace: Trace) -> None:
        # We send traces via on_trace_start, so we don't need to do anything here.
        pass

    def on_span_start(self, span: Span[Any]) -> None:
        # We send spans via on_span_end, so we don't need to do anything here.
        pass

    def on_span_end(self, span: Span[Any]) -> None:
        try:
            self._queue.put_nowait(span)
        except queue.Full:
            logger.warning("Queue is full, dropping span.")

    def shutdown(self, timeout: float | None = None):
        """
        Called when the application stops. We signal our underlying thread to stop, then join it.
        """
        self._shutdown_event.set()
        self._worker_thread.join(timeout=timeout)

    def force_flush(self):
        """
        Forces an immediate flush of all queued spans.
        """
        self._export_batches(force=True)

    def _run(self):
        while not self._shutdown_event.is_set():
            current_time = time.time()
            queue_size = self._queue.qsize()

            # If it's time for a scheduled flush or queue is above the trigger threshold
            if current_time >= self._next_export_time or queue_size >= self._export_trigger_size:
                self._export_batches(force=False)
                # Reset the next scheduled flush time
                self._next_export_time = time.time() + self._schedule_delay
            else:
                # Sleep a short interval so we don't busy-wait.
                time.sleep(0.2)

        # Final drain after shutdown
        self._export_batches(force=True)

    def _export_batches(self, force: bool = False):
        """清空队列并分批导出。如果 force=True,则导出所有内容。
        否则,每次最多导出 `max_batch_size`,直到队列为空或低于某个阈值为止。
        """
        while True:
            items_to_export: list[Span[Any] | Trace] = []

            # Gather a batch of spans up to max_batch_size
            while not self._queue.empty() and (
                force or len(items_to_export) < self._max_batch_size
            ):
                try:
                    items_to_export.append(self._queue.get_nowait())
                except queue.Empty:
                    # Another thread might have emptied the queue between checks
                    break

            # If we collected nothing, we're done
            if not items_to_export:
                break

            # Export the batch
            self._exporter.export(items_to_export)

__init__

__init__(
    exporter: TracingExporter,
    max_queue_size: int = 8192,
    max_batch_size: int = 128,
    schedule_delay: float = 5.0,
    export_trigger_ratio: float = 0.7,
)
Parameters

exporter: The exporter to use.
max_queue_size: The maximum number of spans to store in the queue. After this, we will start dropping spans.
max_batch_size: The maximum number of spans to export in a single batch.
schedule_delay: The delay between checks for new spans to export.
export_trigger_ratio: The ratio of the queue size at which we will trigger an export.
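
To see how export_trigger_ratio interacts with max_queue_size, a sketch with illustrative values:

from agents.tracing.processors import BatchTraceProcessor, ConsoleSpanExporter

# int(1000 * 0.5) = 500: an export fires as soon as 500 items are queued,
# instead of waiting out the 5-second schedule_delay.
processor = BatchTraceProcessor(
    exporter=ConsoleSpanExporter(),
    max_queue_size=1000,
    max_batch_size=100,
    schedule_delay=5.0,
    export_trigger_ratio=0.5,
)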

Source code in src/agents/tracing/processors.py
def __init__(
    self,
    exporter: TracingExporter,
    max_queue_size: int = 8192,
    max_batch_size: int = 128,
    schedule_delay: float = 5.0,
    export_trigger_ratio: float = 0.7,
):
    """
    Args:
        exporter: The exporter to use.
        max_queue_size: The maximum number of spans to store in the queue. After this, we will
            start dropping spans.
        max_batch_size: The maximum number of spans to export in a single batch.
        schedule_delay: The delay between checks for new spans to export.
        export_trigger_ratio: The ratio of the queue size at which we will trigger an export.
    """
    self._exporter = exporter
    self._queue: queue.Queue[Trace | Span[Any]] = queue.Queue(maxsize=max_queue_size)
    self._max_queue_size = max_queue_size
    self._max_batch_size = max_batch_size
    self._schedule_delay = schedule_delay
    self._shutdown_event = threading.Event()

    # The queue size threshold at which we export immediately.
    self._export_trigger_size = int(max_queue_size * export_trigger_ratio)

    # Track when we next *must* perform a scheduled export
    self._next_export_time = time.time() + self._schedule_delay

    self._worker_thread = threading.Thread(target=self._run, daemon=True)
    self._worker_thread.start()

shutdown

shutdown(timeout: float | None = None)

Called when the application stops. We signal our underlying thread to stop, then join it.

Source code in src/agents/tracing/processors.py
def shutdown(self, timeout: float | None = None):
    """
    Called when the application stops. We signal our underlying thread to stop, then join it.
    """
    self._shutdown_event.set()
    self._worker_thread.join(timeout=timeout)
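
A sketch of a bounded, graceful shutdown (the 5-second timeout is illustrative):

from agents.tracing.processors import BatchTraceProcessor, ConsoleSpanExporter

processor = BatchTraceProcessor(exporter=ConsoleSpanExporter())
try:
    ...  # application work that produces traces/spans
finally:
    # Give the worker thread up to 5 seconds to drain and export what remains.
    processor.shutdown(timeout=5.0)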

force_flush

force_flush()

Forces an immediate flush of all queued spans.

Source code in src/agents/tracing/processors.py
def force_flush(self):
    """
    Forces an immediate flush of all queued spans.
    """
    self._export_batches(force=True)
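
This is useful in short-lived scripts that might exit before the scheduled export fires; a sketch:

from agents.tracing.processors import BatchTraceProcessor, ConsoleSpanExporter

processor = BatchTraceProcessor(exporter=ConsoleSpanExporter())
...  # produce some traces/spans
processor.force_flush()  # export everything queued now, without waiting for the schedule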

default_exporter

default_exporter() -> BackendSpanExporter

The default exporter, which exports traces and spans to the backend in batches.

Source code in src/agents/tracing/processors.py
def default_exporter() -> BackendSpanExporter:
    """默认的导出器,将 traces 和 spans 批量导出到后端。"""
    return _global_exporter
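
For example, a sketch of supplying a runtime key to the shared exporter (the key value is a placeholder):

from agents.tracing.processors import default_exporter

# Override the key on the process-wide exporter instead of relying on OPENAI_API_KEY.
default_exporter().set_api_key("sk-runtime-example")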

default_processor

default_processor() -> BatchTraceProcessor

The default processor, which exports traces and spans to the backend in batches.

Source code in src/agents/tracing/processors.py
def default_processor() -> BatchTraceProcessor:
    """默认的处理器,将 traces 和 spans 批量导出到后端。"""
    return _global_processor
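
And a sketch of draining the shared processor before a short-lived process exits:

from agents.tracing.processors import default_processor

default_processor().force_flush()  # export any queued traces/spans immediately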