跳转至

MCP Servers

MCPServer

Bases: ABC

模型上下文协议(Model Context Protocol)服务器的基类。

Source code in src/agents/mcp/server.py
class MCPServer(abc.ABC):
    """模型上下文协议(Model Context Protocol)服务器的基类。"""

    @abc.abstractmethod
    async def connect(self):
        """连接到服务器。例如,这可能意味着启动一个子进程或打开一个网络连接。
        服务器应保持连接状态,直到调用 `cleanup()` 方法。
        """
        pass

    @property
    @abc.abstractmethod
    def name(self) -> str:
        """服务器的可读名称。"""
        pass

    @abc.abstractmethod
    async def cleanup(self):
        """清理服务器。例如,这可能意味着关闭一个子进程或关闭一个网络连接。"""
        pass

    @abc.abstractmethod
    async def list_tools(self) -> list[MCPTool]:
        """列出服务器上可用的工具。"""
        pass

    @abc.abstractmethod
    async def call_tool(self, tool_name: str, arguments: dict[str, Any] | None) -> CallToolResult:
        """在服务器上调用一个工具。"""
        pass

name abstractmethod property

name: str

服务器的可读名称。

connect abstractmethod async

connect()

连接到服务器。例如,这可能意味着启动一个子进程或打开一个网络连接。 服务器应保持连接状态,直到调用 cleanup() 方法。

Source code in src/agents/mcp/server.py
@abc.abstractmethod
async def connect(self):
    """连接到服务器。例如,这可能意味着启动一个子进程或打开一个网络连接。
    服务器应保持连接状态,直到调用 `cleanup()` 方法。
    """
    pass

cleanup abstractmethod async

cleanup()

清理服务器。例如,这可能意味着关闭一个子进程或关闭一个网络连接。

Source code in src/agents/mcp/server.py
@abc.abstractmethod
async def cleanup(self):
    """清理服务器。例如,这可能意味着关闭一个子进程或关闭一个网络连接。"""
    pass

list_tools abstractmethod async

list_tools() -> list[Tool]

列出服务器上可用的工具。

Source code in src/agents/mcp/server.py
@abc.abstractmethod
async def list_tools(self) -> list[MCPTool]:
    """列出服务器上可用的工具。"""
    pass

call_tool abstractmethod async

call_tool(
    tool_name: str, arguments: dict[str, Any] | None
) -> CallToolResult

在服务器上调用一个工具。

Source code in src/agents/mcp/server.py
@abc.abstractmethod
async def call_tool(self, tool_name: str, arguments: dict[str, Any] | None) -> CallToolResult:
    """在服务器上调用一个工具。"""
    pass

MCPServerStdioParams

Bases: TypedDict

mcp.client.stdio.StdioServerParameters 相对应,但允许你无需额外导入即可传递参数。

Source code in src/agents/mcp/server.py
class MCPServerStdioParams(TypedDict):
    """与 `mcp.client.stdio.StdioServerParameters` 相对应,但允许你无需额外导入即可传递参数。"""

    command: str
    """用于启动服务器的可执行文件。例如,`python` 或 `node`。"""

    args: NotRequired[list[str]]
    """传递给 `command` 可执行文件的命令行参数。例如,`['foo.py']` 或 `['server.js', '--port', '8080']`。"""

    env: NotRequired[dict[str, str]]
    """为服务器设置的环境变量。"""

    cwd: NotRequired[str | Path]
    """启动进程时使用的工作目录。"""

    encoding: NotRequired[str]
    """与服务器发送/接收消息时使用的文本编码。默认为 `utf-8`。"""

    encoding_error_handler: NotRequired[Literal["strict", "ignore", "replace"]]
    """文本编码错误处理方式。默认为 `strict`。

    参见 https://docs.python.org/3/library/codecs.html#codec-base-classes 获取可能值的说明。
    """

command instance-attribute

command: str

用于启动服务器的可执行文件。例如,pythonnode

args instance-attribute

args: NotRequired[list[str]]

传递给 command 可执行文件的命令行参数。例如,['foo.py']['server.js', '--port', '8080']

env instance-attribute

env: NotRequired[dict[str, str]]

为服务器设置的环境变量。

cwd instance-attribute

cwd: NotRequired[str | Path]

启动进程时使用的工作目录。

encoding instance-attribute

encoding: NotRequired[str]

与服务器发送/接收消息时使用的文本编码。默认为 utf-8

encoding_error_handler instance-attribute

encoding_error_handler: NotRequired[
    Literal["strict", "ignore", "replace"]
]

文本编码错误处理方式。默认为 strict

参见 https://docs.python.org/3/library/codecs.html#codec-base-classes 获取可能值的说明。

MCPServerStdio

Bases: _MCPServerWithClientSession

使用 stdio 传输的 MCP 服务器实现。详见 [规范] (https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#stdio)。

Source code in src/agents/mcp/server.py
class MCPServerStdio(_MCPServerWithClientSession):
    """使用 stdio 传输的 MCP 服务器实现。详见 [规范]
    (https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#stdio)。
    """

    def __init__(
        self,
        params: MCPServerStdioParams,
        cache_tools_list: bool = False,
        name: str | None = None,
        client_session_timeout_seconds: float | None = 5,
    ):
        """基于 stdio 传输创建新的 MCP 服务器。

        参数:
            params: 配置服务器的参数。包括用于启动服务器的命令、传递给命令的参数、为服务器设置的环境变量、
                启动进程时使用的工作目录,以及与服务器发送/接收消息时使用的文本编码。
            cache_tools_list: 是否缓存工具列表。如果为 `True`,工具列表将被缓存,并且只会从服务器获取一次。
                如果为 `False`,每次调用 `list_tools()` 时都会从服务器获取工具列表。可以通过调用
                `invalidate_tools_cache()` 使缓存失效。如果你确定服务器的工具列表不会变化,建议设置为
                `True`,因为这样可以显著提升延迟表现(避免每次都与服务器进行一次往返通信)。
            name: 服务器的可读名称。如果未提供,则会根据命令自动生成。
            client_session_timeout_seconds: 传递给 MCP ClientSession 的读取超时时间(秒)。
        """
        super().__init__(cache_tools_list, client_session_timeout_seconds)

        self.params = StdioServerParameters(
            command=params["command"],
            args=params.get("args", []),
            env=params.get("env"),
            cwd=params.get("cwd"),
            encoding=params.get("encoding", "utf-8"),
            encoding_error_handler=params.get("encoding_error_handler", "strict"),
        )

        self._name = name or f"stdio: {self.params.command}"

    def create_streams(
        self,
    ) -> AbstractAsyncContextManager[
        tuple[
            MemoryObjectReceiveStream[SessionMessage | Exception],
            MemoryObjectSendStream[SessionMessage],
            GetSessionIdCallback | None
        ]
    ]:
        """为服务器创建流。"""
        return stdio_client(self.params)

    @property
    def name(self) -> str:
        """服务器的可读名称。"""
        return self._name

name property

name: str

服务器的可读名称。

__init__

__init__(
    params: MCPServerStdioParams,
    cache_tools_list: bool = False,
    name: str | None = None,
    client_session_timeout_seconds: float | None = 5,
)

基于 stdio 传输创建新的 MCP 服务器。

参数

params: 配置服务器的参数。包括用于启动服务器的命令、传递给命令的参数、为服务器设置的环境变量、 启动进程时使用的工作目录,以及与服务器发送/接收消息时使用的文本编码。 cache_tools_list: 是否缓存工具列表。如果为 True,工具列表将被缓存,并且只会从服务器获取一次。 如果为 False,每次调用 list_tools() 时都会从服务器获取工具列表。可以通过调用 invalidate_tools_cache() 使缓存失效。如果你确定服务器的工具列表不会变化,建议设置为 True,因为这样可以显著提升延迟表现(避免每次都与服务器进行一次往返通信)。 name: 服务器的可读名称。如果未提供,则会根据命令自动生成。 client_session_timeout_seconds: 传递给 MCP ClientSession 的读取超时时间(秒)。

Source code in src/agents/mcp/server.py
def __init__(
    self,
    params: MCPServerStdioParams,
    cache_tools_list: bool = False,
    name: str | None = None,
    client_session_timeout_seconds: float | None = 5,
):
    """基于 stdio 传输创建新的 MCP 服务器。

    参数:
        params: 配置服务器的参数。包括用于启动服务器的命令、传递给命令的参数、为服务器设置的环境变量、
            启动进程时使用的工作目录,以及与服务器发送/接收消息时使用的文本编码。
        cache_tools_list: 是否缓存工具列表。如果为 `True`,工具列表将被缓存,并且只会从服务器获取一次。
            如果为 `False`,每次调用 `list_tools()` 时都会从服务器获取工具列表。可以通过调用
            `invalidate_tools_cache()` 使缓存失效。如果你确定服务器的工具列表不会变化,建议设置为
            `True`,因为这样可以显著提升延迟表现(避免每次都与服务器进行一次往返通信)。
        name: 服务器的可读名称。如果未提供,则会根据命令自动生成。
        client_session_timeout_seconds: 传递给 MCP ClientSession 的读取超时时间(秒)。
    """
    super().__init__(cache_tools_list, client_session_timeout_seconds)

    self.params = StdioServerParameters(
        command=params["command"],
        args=params.get("args", []),
        env=params.get("env"),
        cwd=params.get("cwd"),
        encoding=params.get("encoding", "utf-8"),
        encoding_error_handler=params.get("encoding_error_handler", "strict"),
    )

    self._name = name or f"stdio: {self.params.command}"

create_streams

create_streams() -> AbstractAsyncContextManager[
    tuple[
        MemoryObjectReceiveStream[
            SessionMessage | Exception
        ],
        MemoryObjectSendStream[SessionMessage],
        GetSessionIdCallback | None,
    ]
]

为服务器创建流。

Source code in src/agents/mcp/server.py
def create_streams(
    self,
) -> AbstractAsyncContextManager[
    tuple[
        MemoryObjectReceiveStream[SessionMessage | Exception],
        MemoryObjectSendStream[SessionMessage],
        GetSessionIdCallback | None
    ]
]:
    """为服务器创建流。"""
    return stdio_client(self.params)

connect async

connect()

连接到服务器。

Source code in src/agents/mcp/server.py
async def connect(self):
    """连接到服务器。"""
    try:
        transport = await self.exit_stack.enter_async_context(self.create_streams())
        # streamablehttp_client returns (read, write, get_session_id)
        # sse_client returns (read, write)

        read, write, *_ = transport

        session = await self.exit_stack.enter_async_context(
            ClientSession(
                read,
                write,
                timedelta(seconds=self.client_session_timeout_seconds)
                if self.client_session_timeout_seconds
                else None,
            )
        )
        server_result = await session.initialize()
        self.server_initialize_result = server_result
        self.session = session
    except Exception as e:
        logger.error(f"Error initializing MCP server: {e}")
        await self.cleanup()
        raise

cleanup async

cleanup()

清理服务器。

Source code in src/agents/mcp/server.py
async def cleanup(self):
    """清理服务器。"""
    async with self._cleanup_lock:
        try:
            await self.exit_stack.aclose()
        except Exception as e:
            logger.error(f"Error cleaning up server: {e}")
        finally:
            self.session = None

list_tools async

list_tools() -> list[Tool]

列出服务器上可用的工具。

Source code in src/agents/mcp/server.py
async def list_tools(self) -> list[MCPTool]:
    """列出服务器上可用的工具。"""
    if not self.session:
        raise UserError("Server not initialized. Make sure you call `connect()` first.")

    # Return from cache if caching is enabled, we have tools, and the cache is not dirty
    if self.cache_tools_list and not self._cache_dirty and self._tools_list:
        return self._tools_list

    # Reset the cache dirty to False
    self._cache_dirty = False

    # Fetch the tools from the server
    self._tools_list = (await self.session.list_tools()).tools
    return self._tools_list

call_tool async

call_tool(
    tool_name: str, arguments: dict[str, Any] | None
) -> CallToolResult

在服务器上调用一个工具。

Source code in src/agents/mcp/server.py
async def call_tool(self, tool_name: str, arguments: dict[str, Any] | None) -> CallToolResult:
    """在服务器上调用一个工具。"""
    if not self.session:
        raise UserError("Server not initialized. Make sure you call `connect()` first.")

    return await self.session.call_tool(tool_name, arguments)

invalidate_tools_cache

invalidate_tools_cache()

使工具缓存失效。

Source code in src/agents/mcp/server.py
def invalidate_tools_cache(self):
    """使工具缓存失效。"""
    self._cache_dirty = True

MCPServerSseParams

Bases: TypedDict

mcp.client.sse.sse_client 中的参数相对应。

Source code in src/agents/mcp/server.py
class MCPServerSseParams(TypedDict):
    """与 `mcp.client.sse.sse_client` 中的参数相对应。"""

    url: str
    """服务器的 URL。"""

    headers: NotRequired[dict[str, str]]
    """发送给服务器的请求头。"""

    timeout: NotRequired[float]
    """HTTP 请求的超时时间。默认为 5 秒。"""

    sse_read_timeout: NotRequired[float]
    """SSE 连接的超时时间(秒)。默认为 5 分钟。"""

url instance-attribute

url: str

服务器的 URL。

headers instance-attribute

headers: NotRequired[dict[str, str]]

发送给服务器的请求头。

timeout instance-attribute

timeout: NotRequired[float]

HTTP 请求的超时时间。默认为 5 秒。

sse_read_timeout instance-attribute

sse_read_timeout: NotRequired[float]

SSE 连接的超时时间(秒)。默认为 5 分钟。

MCPServerSse

Bases: _MCPServerWithClientSession

使用 HTTP + SSE 传输的 MCP 服务器实现。详见 [规范] (https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse)。

Source code in src/agents/mcp/server.py
class MCPServerSse(_MCPServerWithClientSession):
    """使用 HTTP + SSE 传输的 MCP 服务器实现。详见 [规范]
    (https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse)。
    """

    def __init__(
        self,
        params: MCPServerSseParams,
        cache_tools_list: bool = False,
        name: str | None = None,
        client_session_timeout_seconds: float | None = 5,
    ):
        """基于 HTTP + SSE 传输创建新的 MCP 服务器。

        参数:
            params: 配置服务器的参数。包括服务器的 URL、发送给服务器的请求头、HTTP 请求的超时时间,以及 SSE 连接的超时时间。

            cache_tools_list: 是否缓存工具列表。如果为 `True`,工具列表将被缓存,并且只会从服务器获取一次。
                如果为 `False`,每次调用 `list_tools()` 时都会从服务器获取工具列表。可以通过调用
                `invalidate_tools_cache()` 使缓存失效。如果你确定服务器的工具列表不会变化,建议设置为
                `True`,因为这样可以显著提升延迟表现(避免每次都与服务器进行一次往返通信)。

            name: 服务器的可读名称。如果未提供,则会根据 URL 自动生成。

            client_session_timeout_seconds: 传递给 MCP ClientSession 的读取超时时间(秒)。
        """
        super().__init__(cache_tools_list, client_session_timeout_seconds)

        self.params = params
        self._name = name or f"sse: {self.params['url']}"

    def create_streams(
        self,
    ) -> AbstractAsyncContextManager[
        tuple[
            MemoryObjectReceiveStream[SessionMessage | Exception],
            MemoryObjectSendStream[SessionMessage],
            GetSessionIdCallback | None
        ]
    ]:
        """为服务器创建流。"""
        return sse_client(
            url=self.params["url"],
            headers=self.params.get("headers", None),
            timeout=self.params.get("timeout", 5),
            sse_read_timeout=self.params.get("sse_read_timeout", 60 * 5),
        )

    @property
    def name(self) -> str:
        """服务器的可读名称。"""
        return self._name

name property

name: str

服务器的可读名称。

__init__

__init__(
    params: MCPServerSseParams,
    cache_tools_list: bool = False,
    name: str | None = None,
    client_session_timeout_seconds: float | None = 5,
)

基于 HTTP + SSE 传输创建新的 MCP 服务器。

参数

params: 配置服务器的参数。包括服务器的 URL、发送给服务器的请求头、HTTP 请求的超时时间,以及 SSE 连接的超时时间。

cache_tools_list: 是否缓存工具列表。如果为 True,工具列表将被缓存,并且只会从服务器获取一次。 如果为 False,每次调用 list_tools() 时都会从服务器获取工具列表。可以通过调用 invalidate_tools_cache() 使缓存失效。如果你确定服务器的工具列表不会变化,建议设置为 True,因为这样可以显著提升延迟表现(避免每次都与服务器进行一次往返通信)。

name: 服务器的可读名称。如果未提供,则会根据 URL 自动生成。

client_session_timeout_seconds: 传递给 MCP ClientSession 的读取超时时间(秒)。

Source code in src/agents/mcp/server.py
def __init__(
    self,
    params: MCPServerSseParams,
    cache_tools_list: bool = False,
    name: str | None = None,
    client_session_timeout_seconds: float | None = 5,
):
    """基于 HTTP + SSE 传输创建新的 MCP 服务器。

    参数:
        params: 配置服务器的参数。包括服务器的 URL、发送给服务器的请求头、HTTP 请求的超时时间,以及 SSE 连接的超时时间。

        cache_tools_list: 是否缓存工具列表。如果为 `True`,工具列表将被缓存,并且只会从服务器获取一次。
            如果为 `False`,每次调用 `list_tools()` 时都会从服务器获取工具列表。可以通过调用
            `invalidate_tools_cache()` 使缓存失效。如果你确定服务器的工具列表不会变化,建议设置为
            `True`,因为这样可以显著提升延迟表现(避免每次都与服务器进行一次往返通信)。

        name: 服务器的可读名称。如果未提供,则会根据 URL 自动生成。

        client_session_timeout_seconds: 传递给 MCP ClientSession 的读取超时时间(秒)。
    """
    super().__init__(cache_tools_list, client_session_timeout_seconds)

    self.params = params
    self._name = name or f"sse: {self.params['url']}"

create_streams

create_streams() -> AbstractAsyncContextManager[
    tuple[
        MemoryObjectReceiveStream[
            SessionMessage | Exception
        ],
        MemoryObjectSendStream[SessionMessage],
        GetSessionIdCallback | None,
    ]
]

为服务器创建流。

Source code in src/agents/mcp/server.py
def create_streams(
    self,
) -> AbstractAsyncContextManager[
    tuple[
        MemoryObjectReceiveStream[SessionMessage | Exception],
        MemoryObjectSendStream[SessionMessage],
        GetSessionIdCallback | None
    ]
]:
    """为服务器创建流。"""
    return sse_client(
        url=self.params["url"],
        headers=self.params.get("headers", None),
        timeout=self.params.get("timeout", 5),
        sse_read_timeout=self.params.get("sse_read_timeout", 60 * 5),
    )

connect async

connect()

连接到服务器。

Source code in src/agents/mcp/server.py
async def connect(self):
    """连接到服务器。"""
    try:
        transport = await self.exit_stack.enter_async_context(self.create_streams())
        # streamablehttp_client returns (read, write, get_session_id)
        # sse_client returns (read, write)

        read, write, *_ = transport

        session = await self.exit_stack.enter_async_context(
            ClientSession(
                read,
                write,
                timedelta(seconds=self.client_session_timeout_seconds)
                if self.client_session_timeout_seconds
                else None,
            )
        )
        server_result = await session.initialize()
        self.server_initialize_result = server_result
        self.session = session
    except Exception as e:
        logger.error(f"Error initializing MCP server: {e}")
        await self.cleanup()
        raise

cleanup async

cleanup()

清理服务器。

Source code in src/agents/mcp/server.py
async def cleanup(self):
    """清理服务器。"""
    async with self._cleanup_lock:
        try:
            await self.exit_stack.aclose()
        except Exception as e:
            logger.error(f"Error cleaning up server: {e}")
        finally:
            self.session = None

list_tools async

list_tools() -> list[Tool]

列出服务器上可用的工具。

Source code in src/agents/mcp/server.py
async def list_tools(self) -> list[MCPTool]:
    """列出服务器上可用的工具。"""
    if not self.session:
        raise UserError("Server not initialized. Make sure you call `connect()` first.")

    # Return from cache if caching is enabled, we have tools, and the cache is not dirty
    if self.cache_tools_list and not self._cache_dirty and self._tools_list:
        return self._tools_list

    # Reset the cache dirty to False
    self._cache_dirty = False

    # Fetch the tools from the server
    self._tools_list = (await self.session.list_tools()).tools
    return self._tools_list

call_tool async

call_tool(
    tool_name: str, arguments: dict[str, Any] | None
) -> CallToolResult

在服务器上调用一个工具。

Source code in src/agents/mcp/server.py
async def call_tool(self, tool_name: str, arguments: dict[str, Any] | None) -> CallToolResult:
    """在服务器上调用一个工具。"""
    if not self.session:
        raise UserError("Server not initialized. Make sure you call `connect()` first.")

    return await self.session.call_tool(tool_name, arguments)

invalidate_tools_cache

invalidate_tools_cache()

使工具缓存失效。

Source code in src/agents/mcp/server.py
def invalidate_tools_cache(self):
    """使工具缓存失效。"""
    self._cache_dirty = True

MCPServerStreamableHttpParams

Bases: TypedDict

mcp.client.streamable_http.streamablehttp_client 中的参数相对应。

Source code in src/agents/mcp/server.py
class MCPServerStreamableHttpParams(TypedDict):
    """与 `mcp.client.streamable_http.streamablehttp_client` 中的参数相对应。"""

    url: str
    """服务器的 URL。"""

    headers: NotRequired[dict[str, str]]
    """发送给服务器的请求头。"""

    timeout: NotRequired[timedelta]
    """HTTP 请求的超时时间。默认为 5 秒。"""

    sse_read_timeout: NotRequired[timedelta]
    """SSE 连接的超时时间(秒)。默认为 5 分钟。"""

    terminate_on_close: NotRequired[bool]
    """关闭时是否终止连接。"""

url instance-attribute

url: str

服务器的 URL。

headers instance-attribute

headers: NotRequired[dict[str, str]]

发送给服务器的请求头。

timeout instance-attribute

timeout: NotRequired[timedelta]

HTTP 请求的超时时间。默认为 5 秒。

sse_read_timeout instance-attribute

sse_read_timeout: NotRequired[timedelta]

SSE 连接的超时时间(秒)。默认为 5 分钟。

terminate_on_close instance-attribute

terminate_on_close: NotRequired[bool]

关闭时是否终止连接。

MCPServerStreamableHttp

Bases: _MCPServerWithClientSession

使用 Streamable HTTP 传输的 MCP 服务器实现。详见 [规范] (https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http)。

Source code in src/agents/mcp/server.py
class MCPServerStreamableHttp(_MCPServerWithClientSession):
    """使用 Streamable HTTP 传输的 MCP 服务器实现。详见 [规范]
    (https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http)。
    """

    def __init__(
        self,
        params: MCPServerStreamableHttpParams,
        cache_tools_list: bool = False,
        name: str | None = None,
        client_session_timeout_seconds: float | None = 5,
    ):
        """基于 Streamable HTTP 传输创建新的 MCP 服务器。

        参数:
            params: 配置服务器的参数。包括服务器的 URL、发送给服务器的请求头、HTTP 请求的超时时间、
                Streamable HTTP 连接的超时时间,以及是否需要在关闭时终止连接。

            cache_tools_list: 是否缓存工具列表。如果为 `True`,工具列表将被缓存,并且只会从服务器获取一次。
                如果为 `False`,每次调用 `list_tools()` 时都会从服务器获取工具列表。可以通过调用
                `invalidate_tools_cache()` 使缓存失效。如果你确定服务器的工具列表不会变化,建议设置为
                `True`,因为这样可以显著提升延迟表现(避免每次都与服务器进行一次往返通信)。

            name: 服务器的可读名称。如果未提供,则会根据 URL 自动生成。

            client_session_timeout_seconds: 传递给 MCP ClientSession 的读取超时时间(秒)。
        """
        super().__init__(cache_tools_list, client_session_timeout_seconds)

        self.params = params
        self._name = name or f"streamable_http: {self.params['url']}"

    def create_streams(
        self,
    ) -> AbstractAsyncContextManager[
        tuple[
            MemoryObjectReceiveStream[SessionMessage | Exception],
            MemoryObjectSendStream[SessionMessage],
            GetSessionIdCallback | None
        ]
    ]:
        """为服务器创建流。"""
        return streamablehttp_client(
            url=self.params["url"],
            headers=self.params.get("headers", None),
            timeout=self.params.get("timeout", timedelta(seconds=30)),
            sse_read_timeout=self.params.get("sse_read_timeout", timedelta(seconds=60 * 5)),
            terminate_on_close=self.params.get("terminate_on_close", True)
        )

    @property
    def name(self) -> str:
        """服务器的可读名称。"""
        return self._name

name property

name: str

服务器的可读名称。

__init__

__init__(
    params: MCPServerStreamableHttpParams,
    cache_tools_list: bool = False,
    name: str | None = None,
    client_session_timeout_seconds: float | None = 5,
)

基于 Streamable HTTP 传输创建新的 MCP 服务器。

参数

params: 配置服务器的参数。包括服务器的 URL、发送给服务器的请求头、HTTP 请求的超时时间、 Streamable HTTP 连接的超时时间,以及是否需要在关闭时终止连接。

cache_tools_list: 是否缓存工具列表。如果为 True,工具列表将被缓存,并且只会从服务器获取一次。 如果为 False,每次调用 list_tools() 时都会从服务器获取工具列表。可以通过调用 invalidate_tools_cache() 使缓存失效。如果你确定服务器的工具列表不会变化,建议设置为 True,因为这样可以显著提升延迟表现(避免每次都与服务器进行一次往返通信)。

name: 服务器的可读名称。如果未提供,则会根据 URL 自动生成。

client_session_timeout_seconds: 传递给 MCP ClientSession 的读取超时时间(秒)。

Source code in src/agents/mcp/server.py
def __init__(
    self,
    params: MCPServerStreamableHttpParams,
    cache_tools_list: bool = False,
    name: str | None = None,
    client_session_timeout_seconds: float | None = 5,
):
    """基于 Streamable HTTP 传输创建新的 MCP 服务器。

    参数:
        params: 配置服务器的参数。包括服务器的 URL、发送给服务器的请求头、HTTP 请求的超时时间、
            Streamable HTTP 连接的超时时间,以及是否需要在关闭时终止连接。

        cache_tools_list: 是否缓存工具列表。如果为 `True`,工具列表将被缓存,并且只会从服务器获取一次。
            如果为 `False`,每次调用 `list_tools()` 时都会从服务器获取工具列表。可以通过调用
            `invalidate_tools_cache()` 使缓存失效。如果你确定服务器的工具列表不会变化,建议设置为
            `True`,因为这样可以显著提升延迟表现(避免每次都与服务器进行一次往返通信)。

        name: 服务器的可读名称。如果未提供,则会根据 URL 自动生成。

        client_session_timeout_seconds: 传递给 MCP ClientSession 的读取超时时间(秒)。
    """
    super().__init__(cache_tools_list, client_session_timeout_seconds)

    self.params = params
    self._name = name or f"streamable_http: {self.params['url']}"

create_streams

create_streams() -> AbstractAsyncContextManager[
    tuple[
        MemoryObjectReceiveStream[
            SessionMessage | Exception
        ],
        MemoryObjectSendStream[SessionMessage],
        GetSessionIdCallback | None,
    ]
]

为服务器创建流。

Source code in src/agents/mcp/server.py
def create_streams(
    self,
) -> AbstractAsyncContextManager[
    tuple[
        MemoryObjectReceiveStream[SessionMessage | Exception],
        MemoryObjectSendStream[SessionMessage],
        GetSessionIdCallback | None
    ]
]:
    """为服务器创建流。"""
    return streamablehttp_client(
        url=self.params["url"],
        headers=self.params.get("headers", None),
        timeout=self.params.get("timeout", timedelta(seconds=30)),
        sse_read_timeout=self.params.get("sse_read_timeout", timedelta(seconds=60 * 5)),
        terminate_on_close=self.params.get("terminate_on_close", True)
    )

connect async

connect()

连接到服务器。

Source code in src/agents/mcp/server.py
async def connect(self):
    """连接到服务器。"""
    try:
        transport = await self.exit_stack.enter_async_context(self.create_streams())
        # streamablehttp_client returns (read, write, get_session_id)
        # sse_client returns (read, write)

        read, write, *_ = transport

        session = await self.exit_stack.enter_async_context(
            ClientSession(
                read,
                write,
                timedelta(seconds=self.client_session_timeout_seconds)
                if self.client_session_timeout_seconds
                else None,
            )
        )
        server_result = await session.initialize()
        self.server_initialize_result = server_result
        self.session = session
    except Exception as e:
        logger.error(f"Error initializing MCP server: {e}")
        await self.cleanup()
        raise

cleanup async

cleanup()

清理服务器。

Source code in src/agents/mcp/server.py
async def cleanup(self):
    """清理服务器。"""
    async with self._cleanup_lock:
        try:
            await self.exit_stack.aclose()
        except Exception as e:
            logger.error(f"Error cleaning up server: {e}")
        finally:
            self.session = None

list_tools async

list_tools() -> list[Tool]

列出服务器上可用的工具。

Source code in src/agents/mcp/server.py
async def list_tools(self) -> list[MCPTool]:
    """列出服务器上可用的工具。"""
    if not self.session:
        raise UserError("Server not initialized. Make sure you call `connect()` first.")

    # Return from cache if caching is enabled, we have tools, and the cache is not dirty
    if self.cache_tools_list and not self._cache_dirty and self._tools_list:
        return self._tools_list

    # Reset the cache dirty to False
    self._cache_dirty = False

    # Fetch the tools from the server
    self._tools_list = (await self.session.list_tools()).tools
    return self._tools_list

call_tool async

call_tool(
    tool_name: str, arguments: dict[str, Any] | None
) -> CallToolResult

在服务器上调用一个工具。

Source code in src/agents/mcp/server.py
async def call_tool(self, tool_name: str, arguments: dict[str, Any] | None) -> CallToolResult:
    """在服务器上调用一个工具。"""
    if not self.session:
        raise UserError("Server not initialized. Make sure you call `connect()` first.")

    return await self.session.call_tool(tool_name, arguments)

invalidate_tools_cache

invalidate_tools_cache()

使工具缓存失效。

Source code in src/agents/mcp/server.py
def invalidate_tools_cache(self):
    """使工具缓存失效。"""
    self._cache_dirty = True