跳转至

Setup

SynchronousMultiTracingProcessor

Bases: TracingProcessor

按注册顺序将所有调用转发到 TracingProcessor 列表中的每一个。

Source code in agents/tracing/setup.py
class SynchronousMultiTracingProcessor(TracingProcessor):
    """
    按注册顺序将所有调用转发到 TracingProcessor 列表中的每一个。
    """

    def __init__(self):
        # 使用元组以避免在遍历处理器时出现竞态条件
        self._processors: tuple[TracingProcessor, ...] = ()
        self._lock = threading.Lock()

    def add_tracing_processor(self, tracing_processor: TracingProcessor):
        """
        向处理器列表中添加一个处理器。每个处理器都会接收到所有的 trace/span。
        """
        with self._lock:
            self._processors += (tracing_processor,)

    def set_processors(self, processors: list[TracingProcessor]):
        """
        设置处理器列表。这将替换当前的处理器列表。
        """
        with self._lock:
            self._processors = tuple(processors)

    def on_trace_start(self, trace: Trace) -> None:
        """
        当 trace 开始时调用。
        """
        for processor in self._processors:
            processor.on_trace_start(trace)

    def on_trace_end(self, trace: Trace) -> None:
        """
        当 trace 结束时调用。
        """
        for processor in self._processors:
            processor.on_trace_end(trace)

    def on_span_start(self, span: Span[Any]) -> None:
        """
        当 span 开始时调用。
        """
        for processor in self._processors:
            processor.on_span_start(span)

    def on_span_end(self, span: Span[Any]) -> None:
        """
        当 span 结束时调用。
        """
        for processor in self._processors:
            processor.on_span_end(span)

    def shutdown(self) -> None:
        """
        当应用程序停止时调用。
        """
        for processor in self._processors:
            logger.debug(f"Shutting down trace processor {processor}")
            processor.shutdown()

    def force_flush(self):
        """
        强制处理器刷新它们的缓冲区。
        """
        for processor in self._processors:
            processor.force_flush()

add_tracing_processor

add_tracing_processor(tracing_processor: TracingProcessor)

向处理器列表中添加一个处理器。每个处理器都会接收到所有的 trace/span。

Source code in agents/tracing/setup.py
def add_tracing_processor(self, tracing_processor: TracingProcessor):
    """
    向处理器列表中添加一个处理器。每个处理器都会接收到所有的 trace/span。
    """
    with self._lock:
        self._processors += (tracing_processor,)

set_processors

set_processors(processors: list[TracingProcessor])

设置处理器列表。这将替换当前的处理器列表。

Source code in agents/tracing/setup.py
def set_processors(self, processors: list[TracingProcessor]):
    """
    设置处理器列表。这将替换当前的处理器列表。
    """
    with self._lock:
        self._processors = tuple(processors)

on_trace_start

on_trace_start(trace: Trace) -> None

当 trace 开始时调用。

Source code in agents/tracing/setup.py
def on_trace_start(self, trace: Trace) -> None:
    """
    当 trace 开始时调用。
    """
    for processor in self._processors:
        processor.on_trace_start(trace)

on_trace_end

on_trace_end(trace: Trace) -> None

当 trace 结束时调用。

Source code in agents/tracing/setup.py
def on_trace_end(self, trace: Trace) -> None:
    """
    当 trace 结束时调用。
    """
    for processor in self._processors:
        processor.on_trace_end(trace)

on_span_start

on_span_start(span: Span[Any]) -> None

当 span 开始时调用。

Source code in agents/tracing/setup.py
def on_span_start(self, span: Span[Any]) -> None:
    """
    当 span 开始时调用。
    """
    for processor in self._processors:
        processor.on_span_start(span)

on_span_end

on_span_end(span: Span[Any]) -> None

当 span 结束时调用。

Source code in agents/tracing/setup.py
def on_span_end(self, span: Span[Any]) -> None:
    """
    当 span 结束时调用。
    """
    for processor in self._processors:
        processor.on_span_end(span)

shutdown

shutdown() -> None

当应用程序停止时调用。

Source code in agents/tracing/setup.py
def shutdown(self) -> None:
    """
    当应用程序停止时调用。
    """
    for processor in self._processors:
        logger.debug(f"Shutting down trace processor {processor}")
        processor.shutdown()

force_flush

force_flush()

强制处理器刷新它们的缓冲区。

Source code in agents/tracing/setup.py
def force_flush(self):
    """
    强制处理器刷新它们的缓冲区。
    """
    for processor in self._processors:
        processor.force_flush()

TraceProvider

Source code in agents/tracing/setup.py
class TraceProvider:
    def __init__(self):
        self._multi_processor = SynchronousMultiTracingProcessor()
        self._disabled = os.environ.get("OPENAI_AGENTS_DISABLE_TRACING", "false").lower() in (
            "true",
            "1",
        )

    def register_processor(self, processor: TracingProcessor):
        """
        向处理器列表中添加一个处理器。每个处理器都会接收到所有的 trace/span。
        """
        self._multi_processor.add_tracing_processor(processor)

    def set_processors(self, processors: list[TracingProcessor]):
        """
        设置处理器列表。这将替换当前的处理器列表。
        """
        self._multi_processor.set_processors(processors)

    def get_current_trace(self) -> Trace | None:
        """
        返回当前激活的 trace(如果有)。
        """
        return Scope.get_current_trace()

    def get_current_span(self) -> Span[Any] | None:
        """
        返回当前激活的 span(如果有)。
        """
        return Scope.get_current_span()

    def set_disabled(self, disabled: bool) -> None:
        """
        设置是否禁用 tracing。
        """
        self._disabled = disabled

    def create_trace(
        self,
        name: str,
        trace_id: str | None = None,
        group_id: str | None = None,
        metadata: dict[str, Any] | None = None,
        disabled: bool = False,
    ) -> Trace:
        """
        创建一个新的 trace。
        """
        if self._disabled or disabled:
            logger.debug(f"Tracing is disabled. Not creating trace {name}")
            return NoOpTrace()

        trace_id = trace_id or util.gen_trace_id()

        logger.debug(f"Creating trace {name} with id {trace_id}")

        return TraceImpl(
            name=name,
            trace_id=trace_id,
            group_id=group_id,
            metadata=metadata,
            processor=self._multi_processor,
        )

    def create_span(
        self,
        span_data: TSpanData,
        span_id: str | None = None,
        parent: Trace | Span[Any] | None = None,
        disabled: bool = False,
    ) -> Span[TSpanData]:
        """
        创建一个新的 span。
        """
        if self._disabled or disabled:
            logger.debug(f"Tracing is disabled. Not creating span {span_data}")
            return NoOpSpan(span_data)

        if not parent:
            current_span = Scope.get_current_span()
            current_trace = Scope.get_current_trace()
            if current_trace is None:
                logger.error(
                    "No active trace. Make sure to start a trace with `trace()` first"
                    "Returning NoOpSpan."
                )
                return NoOpSpan(span_data)
            elif isinstance(current_trace, NoOpTrace) or isinstance(current_span, NoOpSpan):
                logger.debug(
                    f"Parent {current_span} or {current_trace} is no-op, returning NoOpSpan"
                )
                return NoOpSpan(span_data)

            parent_id = current_span.span_id if current_span else None
            trace_id = current_trace.trace_id

        elif isinstance(parent, Trace):
            if isinstance(parent, NoOpTrace):
                logger.debug(f"Parent {parent} is no-op, returning NoOpSpan")
                return NoOpSpan(span_data)
            trace_id = parent.trace_id
            parent_id = None
        elif isinstance(parent, Span):
            if isinstance(parent, NoOpSpan):
                logger.debug(f"Parent {parent} is no-op, returning NoOpSpan")
                return NoOpSpan(span_data)
            parent_id = parent.span_id
            trace_id = parent.trace_id

        logger.debug(f"Creating span {span_data} with id {span_id}")

        return SpanImpl(
            trace_id=trace_id,
            span_id=span_id,
            parent_id=parent_id,
            processor=self._multi_processor,
            span_data=span_data,
        )

    def shutdown(self) -> None:
        if self._disabled:
            return

        try:
            logger.debug("Shutting down trace provider")
            self._multi_processor.shutdown()
        except Exception as e:
            logger.error(f"Error shutting down trace provider: {e}")

register_processor

register_processor(processor: TracingProcessor)

向处理器列表中添加一个处理器。每个处理器都会接收到所有的 trace/span。

Source code in agents/tracing/setup.py
def register_processor(self, processor: TracingProcessor):
    """
    向处理器列表中添加一个处理器。每个处理器都会接收到所有的 trace/span。
    """
    self._multi_processor.add_tracing_processor(processor)

set_processors

set_processors(processors: list[TracingProcessor])

设置处理器列表。这将替换当前的处理器列表。

Source code in agents/tracing/setup.py
def set_processors(self, processors: list[TracingProcessor]):
    """
    设置处理器列表。这将替换当前的处理器列表。
    """
    self._multi_processor.set_processors(processors)

get_current_trace

get_current_trace() -> Trace | None

返回当前激活的 trace(如果有)。

Source code in agents/tracing/setup.py
def get_current_trace(self) -> Trace | None:
    """
    返回当前激活的 trace(如果有)。
    """
    return Scope.get_current_trace()

get_current_span

get_current_span() -> Span[Any] | None

返回当前激活的 span(如果有)。

Source code in agents/tracing/setup.py
def get_current_span(self) -> Span[Any] | None:
    """
    返回当前激活的 span(如果有)。
    """
    return Scope.get_current_span()

set_disabled

set_disabled(disabled: bool) -> None

设置是否禁用 tracing。

Source code in agents/tracing/setup.py
def set_disabled(self, disabled: bool) -> None:
    """
    设置是否禁用 tracing。
    """
    self._disabled = disabled

create_trace

create_trace(name: str, trace_id: str | None = None, group_id: str | None = None, metadata: dict[str, Any] | None = None, disabled: bool = False) -> Trace

创建一个新的 trace。

Source code in agents/tracing/setup.py
def create_trace(
    self,
    name: str,
    trace_id: str | None = None,
    group_id: str | None = None,
    metadata: dict[str, Any] | None = None,
    disabled: bool = False,
) -> Trace:
    """
    创建一个新的 trace。
    """
    if self._disabled or disabled:
        logger.debug(f"Tracing is disabled. Not creating trace {name}")
        return NoOpTrace()

    trace_id = trace_id or util.gen_trace_id()

    logger.debug(f"Creating trace {name} with id {trace_id}")

    return TraceImpl(
        name=name,
        trace_id=trace_id,
        group_id=group_id,
        metadata=metadata,
        processor=self._multi_processor,
    )

create_span

create_span(span_data: TSpanData, span_id: str | None = None, parent: Trace | Span[Any] | None = None, disabled: bool = False) -> Span[TSpanData]

创建一个新的 span。

Source code in agents/tracing/setup.py
def create_span(
    self,
    span_data: TSpanData,
    span_id: str | None = None,
    parent: Trace | Span[Any] | None = None,
    disabled: bool = False,
) -> Span[TSpanData]:
    """
    创建一个新的 span。
    """
    if self._disabled or disabled:
        logger.debug(f"Tracing is disabled. Not creating span {span_data}")
        return NoOpSpan(span_data)

    if not parent:
        current_span = Scope.get_current_span()
        current_trace = Scope.get_current_trace()
        if current_trace is None:
            logger.error(
                "No active trace. Make sure to start a trace with `trace()` first"
                "Returning NoOpSpan."
            )
            return NoOpSpan(span_data)
        elif isinstance(current_trace, NoOpTrace) or isinstance(current_span, NoOpSpan):
            logger.debug(
                f"Parent {current_span} or {current_trace} is no-op, returning NoOpSpan"
            )
            return NoOpSpan(span_data)

        parent_id = current_span.span_id if current_span else None
        trace_id = current_trace.trace_id

    elif isinstance(parent, Trace):
        if isinstance(parent, NoOpTrace):
            logger.debug(f"Parent {parent} is no-op, returning NoOpSpan")
            return NoOpSpan(span_data)
        trace_id = parent.trace_id
        parent_id = None
    elif isinstance(parent, Span):
        if isinstance(parent, NoOpSpan):
            logger.debug(f"Parent {parent} is no-op, returning NoOpSpan")
            return NoOpSpan(span_data)
        parent_id = parent.span_id
        trace_id = parent.trace_id

    logger.debug(f"Creating span {span_data} with id {span_id}")

    return SpanImpl(
        trace_id=trace_id,
        span_id=span_id,
        parent_id=parent_id,
        processor=self._multi_processor,
        span_data=span_data,
    )