模型上下文协议(MCP)正在成为游戏规则的改变者,正在重新定义AI系统如何理解和响应真实的场景。MCP通过一个标准协议,正在重新定义大型语言模型如何获得高质量的上下文信息。
最近我在处理一个临床操作案例,旨在构建多代理系统,该系统需要从外部来源获取数据,这些数据可以通过API接口访问。我的想法是通过创建MCP服务器来整合这些外部来源,这样用户就可以通过MCP服务器来获取这些数据。
我遇到的大多数MCP教程都在本地服务器上实现MCP服务器。我想远程托管MCP服务器并通过MCP客户端访问它。本文将介绍架构、设置步骤和一些心得体会。
在介绍用例和解决方案之前,先简单讲一下MCP。
你知道为什么是模型上下文协议(MCP)吗?在企业环境中,典型的AI解决方案需要与多个系统集成以获取上下文。许多系统各自开发出不同的方法来获取上下文信息,导致了碎片化的生态系统和缺乏标准化的问题。这些限制带来了诸多挑战。
集成挑战 — 没有一个通用的框架来集成多个AI系统以纳入新的数据源,这需要大量的定制开发工作。这导致了复杂性增加和维护成本上升,从而增加了整体难度和费用。
可扩展性问题: 自定义解决方案可能会带来可扩展性问题,影响生产效率的提升,并影响新功能的添加。
不可预测的结果可能包括: 当不同的系统使用不同的方法来处理和利用上下文时,因为处理方式和理解上下文的方法不同,AI的输出也会不一样,这样也可能导致无法预料的结果。
安全与合规问题: 缺乏适当的安全标准使得保障数据隐私和系统之间的安全通信变得困难。定制的解决方案可能会无意中暴露难以检测和解决的安全漏洞,从而给系统带来风险。
MCP 提供了哪些?MCP 提供了一种标准化、可扩展且安全的方法,将上下文信息整合到 AI 应用中。它能够赋予 AI 代理更丰富的上下文信息,帮助它们做出更加智能的决策。
MCP是如何应对这些挑战的呢?统一通信协议: MCP 定义了 AI 代理和外部系统之间数据交换的明确协议,从而实现无缝集成并大大减少了开发负担。
提高了预测性和可靠性:标准化上下文管理有助于提升输出的准确性,从而使AI系统更加可预测。
加速开发:MCP 减少了创建自定义连接器和适配器所需的开发工作,从而加快了开发速度。
更安全的保障: MCP 遵循通用的安全和验证标准,确保数据传输和处理的安全,从而减少数据泄露,确保符合合规标准。
我的使用场景回到我的用例,我的想法是实现多个远程的MCP服务器,每个服务器提供不同的上下文环境,并创建一个MCP客户端来集成这些服务器。用户可以向MCP客户端提供自然语言查询,客户端会自动找到合适的服务器并获取上下文环境。MCP客户端可以发现所有的MCP服务器,并确定哪个服务器可用于为查询提供上下文,然后连接到该服务器,调用相应的MCP工具并检索上下文。
我最初尝试在 Azure Functions 中托管服务器并通过 SSE(服务器端事件)协议来暴露 MCP 工具。但由于 Azure Functions 对 HTTP 流和 SSE 事件的支持有限,如 这里 所述,未能成功实现。然后我转向了 Azure 应用服务。
技术堆栈· MCP 服务器是用 Python 编写的,使用了 FastAPI 和 FastMCP。MCP 服务器托管在 Azure 应用服务中。
· MCP客户端是用Python编写的
我们使用 LangGraph 来支持自然语言的查询
该解决方案由两个MCP服务器(MCP Server)和两个不同的Azure应用服务上的托管,以及在本地机器上运行的MCP客户端(MCP Client)程序组成。
出于功能性考虑,我选择了两个选项,比如天气信息和简易计算器,这两个例子非常常见。
MCP 服务器1# — 提供天气信息服务
接收一个城市的名字作为参数,并返回该城市的天气信息。
MCP 服务器#2 — 提供基本的计算器功能
这是一款简单的计算器服务,可以接受两个数字并执行基本计算,比如加减乘除。
MCP 程序
MCP客户端应用
· 接受用户输入的自然语言查询
· 找到所有的MCP工具,这些工具都在MCP服务器上可用。
根据查询结果决定调用哪个服务器实例
· 调用恰当工具并获取结果
下面是一步步的方法和代码
第一步 — 创建一个MCP服务器来提供天气信息
1.1. 创建一个 Visual Studio Code 项目,名为 mcp_weather_server
requirements.txt 列出了以下包。
#requirements.txt
uvicorn
asyncpg
click
fastapi
gunicorn
h11
psycopg2-binary
pydantic
starlette
websockets
fastmcp
运行 pip install -r requirements.txt
来安装依赖。
1.2. 创建一个 FastAPI 应用和几个测试接口,以便在部署后测试
在项目根目录下创建一个 main.py 文件。
# 在 main.py 文件中
from fastapi import FastAPI
from fastmcp import FastMCP
from mcp.server.fastmcp import FastMCP
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.routing import Mount, Route
import uvicorn
# 定义一些测试端点
app = FastAPI(title="使用 FastAPI 端点的 REST API")
@app.get("/")
async def root():
return {"message": "服务已启动"}
@app.get("/health")
async def health():
return {"status": "状态"}
1.3. 创建MCP实例和SSE端点
我们需要 SSE 终点来捕获来自 MCP 客户端的 SSE 请求。
# 在main.py中
mcp = FastMCP("mcp_weather_server")
def 创建SSE服务器(mcp: FastMCP):
"""创建处理SSE连接及消息的Starlette应用"""
transport = SseServerTransport("/messages/")
# 定义处理函数如下
async def handle_sse(request):
async with transport.connect_sse(
request.scope, request.receive, request._send
) as streams:
await mcp._mcp_server.run(
streams[0], streams[1], mcp._mcp_server.create_initialization_options()
)
# 创建Starlette路由
routes = [
Route("/sse/", endpoint=handle_sse),
Mount("/messages/", app=transport.handle_post_message),
]
# 创建SSE服务
return Starlette(routes=routes)
app.mount("/", create_sse_server(mcp))
这篇文章说明了如何创建具备SSE功能的MCP服务器。
1.4. 制作MCP工具(MCP指…,请在此处添加解释如果需要的话)
# 在 main.py 文件中
# 定义工具
# 这里我使用了硬编码的天气数据。在实际应用中,可以与真实的天气数据源进行集成。
@mcp.tool() # 这是一个装饰器注释
def get_weather(city: str) -> str:
"""
获取指定城市的天气情况。
参数说明:
city (str): 要获取天气情况的城市名称。
返回值:
字符串: 给定城市的天气情况。
"""
result = f"""
*************************
{city} 的天气晴好
温度 - 30 度
湿度 - 70%
云覆盖率 - 60%
能见度 - 7 公里(约4.3英里)
****************************
"""
return result
在这个例子中,我直接设定了天气数据。它可以换成真正的功能。
这是MCP服务器#1的代码完成。为了测试服务器,添加了以下代码:
# 供本地测试
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port=8000)
第二步 为计算器服务创建MCP服务器
流程和步骤#1一样。在VSCode中创建一个新的项目mcp_calculator_server,并重复步骤。只有使用的工具会有所不同。这里是一个简单的计算器服务。完整的代码会在最后提供。
第3步 创建MCP客户端应用程序
3.1 在 VSCode 中创建一个新的代码文件名为 mcp_client。requirements.txt 文件需要包含如下依赖项
#requirements.txt
mcp
langchain-core
langchain-community
langchain-mcp-adapters
langgraph
langchain-openai
3.2 导入所有必需的模块,并实例化大型语言模型
from langchain_mcp_adapters.client import MultiServerMCPClient
import asyncio
from langgraph.prebuilt import ToolNode
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langchain_core.messages import SystemMessage, HumanMessage
from dotenv import load_dotenv
load_dotenv()
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4")
3.3 以下是与多个MCP服务器交互的代码。因为我们有多个MCP服务器,所以我使用了MultiServerMCPClient(来自langchain_mcp_adapters)。
async def run_agent(question):
async with MultiServerMCPClient(
{
"weather": {
#"url":"http://127.0.0.1:8000/sse/", #这是在部署前本地测试应用的注释
"url": "https://<your app server name>.azurewebsites.net/sse/",
"transport": "sse",
},
"calculator": {
#"url":"http://127.0.0.1:8001/sse/", #这是在部署前本地测试应用的注释
"url": "https://<your app server name>.azurewebsites.net/sse/",
"transport": "sse",
},
}
) as client:
tools = client.get_tools()
print("********* TOOLS ************")
print(tools)
llm_with_tools = llm.bind_tools(tools)
tool_node = ToolNode(tools)
下面的代码将查找各个服务器中可用的MCP工具,然后将其与之前创建的ChatOpenAI实例关联起来。
3.4 处理用户自然语言查询的代码
我用LangGraph内置图来处理用户的查询,下面是代码。
class GraphState(TypedDict):
question: str
messages: str
def root_node(state: GraphState):
print("********** IN Node**********")
instruction = """
你是一个专业的助手,帮助用户找到所需的信息。
你有工具可以获取天气信息并进行计算。
根据用户的请求,使用适当的工具来获取信息。
"""
sys_msg = instruction
user_msg = state["question"]
messages = [
SystemMessage(content=sys_msg),
HumanMessage(content=user_msg)
]
response = llm_with_tools.invoke(messages)
print("\n\n")
print("RESPONSE = " , response)
return {"messages": [response]}
def should_continue(state: GraphState):
messages = state["messages"]
print("\n MESSAGES ====", messages)
last_message = messages[0]
if last_message.tool_calls:
print("&&&& IN TOOL CALLS &&&&&")
return 'tools'
return 'END'
graph_builder = StateGraph(GraphState)
graph_builder.add_node("root_node",root_node)
graph_builder.add_node("tools",tool_node)
graph_builder.add_edge(START,"root_node")
graph_builder.add_conditional_edges("root_node", should_continue, ["tools", END])
graph_builder.add_edge("tools",END)
代理 = graph_builder.compile()
input = {
"question":question
}
result = await agent.ainvoke(input)
print("********* RESULT ************")
print(result)
# 注释:#await agent.ainvoke({"messages": 提示}, {"recursion_limit": 3})
return await result
4. 本地测试应用一下.
在 mcp_weather_server 文件夹中运行 main.py,这将会启动 MCP 服务器于 http://127.0.0.1:8000。
b. 在 mcp_calculator_server 目录下执行 main.py。这会启动一个服务器,地址为 http://127.0.0.1:8001。
在 MCP 客户端中,取消注释 MultiServerMCPClient 代码中的本地服务器代码(127.0.0.1:8000 和 127.0.0.1:8001)。同时,注释掉远程 URL 地址。
下面的代码是用于测试功能的代码
def get_weather():
question = "提供班加罗尔市当前的天气情况"
result = asyncio.run(run_agent(question))
result_formatted = result["messages"][0].content
print("最终结果:", result_formatted)
def getSum():
question = "将57和75这两个数字相加"
result = asyncio.run(run_agent(question))
result_formatted = result["messages"][0].content
print("最终结果:", result_formatted)
if __name__ == "__main__":
#get_weather() # 测试天气服务器
getSum() # 测试加法功能
5. 在远程环境中部署MCP服务器
一旦本地测试成功了,我们就可以把MCP服务器部署到远程App服务中去了。
在 Azure 中创建两个应用实例
将 mcp_weather_server 和 mcp_calculator_server 部署到各自的应用服务中,并将客户端代码中的 URL(<your app server name>)用相应的 URL 替换它们。
同样,在应用服务中,在“设置 -> 启动命令”设置中添加"uvicorn main:app --host 0.0.0.0 --reload"如下
当MCP客户端执行get_weather()函数时,会调用MCP服务器并获取结果。
当调用 getSum() 函数时,客户端会根据加、减、乘、除选择合适的服务器和相应的工具来执行相应操作。
下面这段完整的代码
- mcp_weather_server/main.py
from fastapi import FastAPI
from fastmcp import FastMCP
from mcp.server.fastmcp import FastMCP
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.routing import Mount, Route
import uvicorn
# 定义一些用于测试的端点
app = FastAPI(title="使用FastAPI端点的REST API")
@app.get("/")
async def root():
return {"message": "服务器已启动"}
@app.get("/health")
async def health():
return {"status": "ok"}
####################################################
mcp = FastMCP("mcp_weather_server")
def create_sse_server(mcp: FastMCP):
"""创建一个管理SSE连接和消息处理的Starlette应用"""
transport = SseServerTransport("/messages/")
# 定义SSE处理函数
async def handle_sse(request):
async with transport.connect_sse(
request.scope, request.receive, request._send
) as streams:
await mcp._mcp_server.run(
streams[0], streams[1], mcp._mcp_server.create_initialization_options()
)
# 创建Starlette路由用于SSE和消息处理
routes = [
Route("/sse/", endpoint=handle_sse),
Mount("/messages/", app=transport.handle_post_message),
]
# 创建Starlette应用
return Starlette(routes=routes)
app.mount("/", create_sse_server(mcp))
####################################################
# 定义工具
# 示例中使用固定天气数据。实际应用中,可集成实时天气数据。
@mcp.tool()
def get_weather(city: str) -> str:
"""
获取给定城市的天气。
参数:
city (str): 获取天气的城市名称。
返回:
str: 给定城市的天气信息。
"""
result = f"""\
*************************
{city}的天气是晴朗
温度 - 30°C
湿度 - 70%
云覆盖率 - 60%
能见距离 - 7公里
****************************
"""
return result
# 用于本地测试
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port=8000)
2. mcp_calculator_server/main.py
from fastapi import FastAPI
from fastmcp import FastMCP
from mcp.server.fastmcp import FastMCP
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.routing import Mount, Route
# 定义一些测试端点
app = FastAPI(title="使用 FastAPI EndPoints 的 REST API")
@app.get("/")
async def root():
return {"message": "服务器启动"}
@app.get("/health")
async def health():
return {"status": "正常"}
##############################################
mcp = FastMCP("mcp_calculator_server")
def create_sse_server(mcp: FastMCP):
"""创建一个处理 SSE 连接和消息处理的 Starlette 应用"""
transport = SseServerTransport("/messages/")
# 定义处理程序
async def handle_sse(request):
async with transport.connect_sse(
request.scope, request.receive, request._send
) as streams:
await mcp._mcp_server.run(
streams[0], streams[1], mcp._mcp_server.create_initialization_options()
)
# 为 SSE 和消息处理创建 Starlette 路由
routes = [
Route("/sse/", endpoint=handle_sse),
Mount("/messages/", app=transport.handle_post_message),
]
# 创建一个 Starlette 应用
return Starlette(routes=routes)
app.mount("/", create_sse_server(mcp))
###########################################################################
# 简单的计算器工具。
@mcp.tool()
def add(a: int, b: int) -> int:
"""
用于将两个数字相加的函数
参数:
a (int): 要相加的第一个数字
b (int): 要相加的第二个数字
返回:
int: 数字 a 和 b 的和
"""
return a + b
@mcp.tool()
def subtract(a: int, b: int) -> int:
"""
用于将两个数字减去的函数
参数:
a (int): 要减去的第一个数字
b (int): 要减去的第二个数字
返回:
int: 两个数字的差
"""
return a - b
@mcp.tool()
def multiply(a: int, b: int) -> int:
"""
用于将两个数字乘以的函数
参数:
a (int): 要乘以的第一个数字
b (int): 要乘以的第二个数字
返回:
int: 两个数字的乘积
"""
return a * b
@mcp.tool()
def divide(a: int, b: int) -> float:
"""
用于将两个数字除以的函数
参数:
a (int): 要除以的第一个数字
b (int): 要除以的第二个数字
返回:
float: 两个数字的商
"""
return a / b
# 本地运行测试
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8001)
3. mcp_client代码
from langchain_mcp_adapters.client import MultiServerMCPClient
import asyncio
from langgraph.prebuilt import ToolNode
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
from langchain_core.messages import SystemMessage, HumanMessage
from dotenv import load_dotenv
_ = load_dotenv()
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o")
class GraphState(TypedDict):
question: str
messages: str
async def run_agent(question):
async with MultiServerMCPClient(
{
"weather": {
"url": "https://vinmcpappservice-cka9dwcfe0b8exgh.centralus-01.azurewebsites.net/sse/", # 这是在部署前用于本地测试的注释
"transport": "sse",
},
"calculator": {
"url": "https://vinmcpcalcservice-dthrfxanbdb0bfcj.centralus-01.azurewebsites.net/sse/", # 这是在部署前用于本地测试的注释
"transport": "sse",
},
}
) as client:
tools = client.get_tools()
print("********* TOOLS ************")
print(tools)
llm_with_tools = llm.bind_tools(tools)
tool_node = ToolNode(tools)
#*********************************************************************************************
#***************************************************************************************
def root_node(state: GraphState):
print("********** IN Node**********")
instruction = """
您是一位专业的助手,帮助用户查找信息。您有工具可以获取天气信息,并使用计算器进行计算。根据用户的查询,使用适当的工具来获取所需的信息。
"""
sys_msg = instruction
user_msg = state["question"]
messages = [
SystemMessage(content=sys_msg),
HumanMessage(content=user_msg)
]
response = llm_with_tools.invoke(messages)
print("\n\n")
print("RESPONSE = " , response)
return {"messages": [response]}
def should_continue(state: GraphState):
messages = state["messages"]
print("n\n MESSAGES ====", messages)
last_message = messages[0]
if last_message.tool_calls:
print("&&&& IN TOOL CALLS &&&&&")
return "tools"
return END
graph_builder = StateGraph(GraphState)
graph_builder.add_node("root_node",root_node)
graph_builder.add_node("tools",tool_node)
graph_builder.add_edge(START,"root_node")
graph_builder.add_conditional_edges("root_node", should_continue, ["tools", END])
graph_builder.add_edge("tools",END)
agent = graph_builder.compile()
input = {
"question":question
}
result = agent.ainvoke(input)
print("********* RESULT ************")
print(result)
return await result
def get_weather():
question = "提供班加罗尔市当前天气"
result = asyncio.run(run_agent(question))
print("FINAL RESULT =======" ,result)
def getSum():
question = "计算57和75的和"
result = asyncio.run(run_agent(question))
result_formatted = result["messages"][0].content
print("FINAL RESULT =======" ,result_formatted)
if __name__ == "__main__":
#get_weather()
getSum()
参考文献
MCP AI:Charles Sprinter编写《开发指南:使用模型上下文协议构建的下一代AI代理程序一书》
共同學(xué)習(xí),寫下你的評論
評論加載中...
作者其他優(yōu)質(zhì)文章