跳转至

Processor interface

TracingProcessor

Bases: ABC

处理 span 的接口。

Source code in agents/tracing/processor_interface.py
class TracingProcessor(abc.ABC):
    """处理 span 的接口。"""

    @abc.abstractmethod
    def on_trace_start(self, trace: "Trace") -> None:
        """当 trace 开始时调用。

        参数:
            trace: 已开始的 trace。
        """
        pass

    @abc.abstractmethod
    def on_trace_end(self, trace: "Trace") -> None:
        """当 trace 结束时调用。

        参数:
            trace: 已结束的 trace。
        """
        pass

    @abc.abstractmethod
    def on_span_start(self, span: "Span[Any]") -> None:
        """当 span 开始时调用。

        参数:
            span: 已开始的 span。
        """
        pass

    @abc.abstractmethod
    def on_span_end(self, span: "Span[Any]") -> None:
        """当 span 结束时调用。此方法不应阻塞或抛出异常。

        参数:
            span: 已结束的 span。
        """
        pass

    @abc.abstractmethod
    def shutdown(self) -> None:
        """当应用程序停止时调用。"""
        pass

    @abc.abstractmethod
    def force_flush(self) -> None:
        """强制立即刷新所有排队的 span/trace。"""
        pass

on_trace_start abstractmethod

on_trace_start(trace: Trace) -> None

当 trace 开始时调用。

参数

trace: 已开始的 trace。

Source code in agents/tracing/processor_interface.py
@abc.abstractmethod
def on_trace_start(self, trace: "Trace") -> None:
    """当 trace 开始时调用。

    参数:
        trace: 已开始的 trace。
    """
    pass

on_trace_end abstractmethod

on_trace_end(trace: Trace) -> None

当 trace 结束时调用。

参数

trace: 已结束的 trace。

Source code in agents/tracing/processor_interface.py
@abc.abstractmethod
def on_trace_end(self, trace: "Trace") -> None:
    """当 trace 结束时调用。

    参数:
        trace: 已结束的 trace。
    """
    pass

on_span_start abstractmethod

on_span_start(span: Span[Any]) -> None

当 span 开始时调用。

参数

span: 已开始的 span。

Source code in agents/tracing/processor_interface.py
@abc.abstractmethod
def on_span_start(self, span: "Span[Any]") -> None:
    """当 span 开始时调用。

    参数:
        span: 已开始的 span。
    """
    pass

on_span_end abstractmethod

on_span_end(span: Span[Any]) -> None

当 span 结束时调用。此方法不应阻塞或抛出异常。

参数

span: 已结束的 span。

Source code in agents/tracing/processor_interface.py
@abc.abstractmethod
def on_span_end(self, span: "Span[Any]") -> None:
    """当 span 结束时调用。此方法不应阻塞或抛出异常。

    参数:
        span: 已结束的 span。
    """
    pass

shutdown abstractmethod

shutdown() -> None

当应用程序停止时调用。

Source code in agents/tracing/processor_interface.py
@abc.abstractmethod
def shutdown(self) -> None:
    """当应用程序停止时调用。"""
    pass

force_flush abstractmethod

force_flush() -> None

强制立即刷新所有排队的 span/trace。

Source code in agents/tracing/processor_interface.py
@abc.abstractmethod
def force_flush(self) -> None:
    """强制立即刷新所有排队的 span/trace。"""
    pass

TracingExporter

Bases: ABC

导出 trace 和 span。例如,可以将它们记录或发送到后端。

Source code in agents/tracing/processor_interface.py
class TracingExporter(abc.ABC):
    """导出 trace 和 span。例如,可以将它们记录或发送到后端。"""

    @abc.abstractmethod
    def export(self, items: list["Trace | Span[Any]"]) -> None:
        """导出 trace 和 span 的列表。

        参数:
            items: 要导出的条目。
        """
        pass

export abstractmethod

export(items: list[Trace | Span[Any]]) -> None

导出 trace 和 span 的列表。

参数

items: 要导出的条目。

Source code in agents/tracing/processor_interface.py
@abc.abstractmethod
def export(self, items: list["Trace | Span[Any]"]) -> None:
    """导出 trace 和 span 的列表。

    参数:
        items: 要导出的条目。
    """
    pass