通过MCP实现长调用结果的多次流式返回
要求:
大模型请求1分钟内每10秒钟的温度,MCP server每10秒返回一次温度给大模型。
通过Server Notification的方式返回,MCP Server示例代码如下(这个任务千问3.6没有完成,让google ai完成的):import asyncio import random from fastmcp import FastMCP, Context
初始化 FastMCP 服務,並為其命名
mcp = FastMCP("TemperatureStreamer")
@mcp.tool()
async def monitor_temperature(duration_seconds: int = 60, interval_seconds: int = 10, ctx: Context = None) -> str:
"""
在指定的時間內,每隔固定秒數監測並返回一次環境溫度。
Args:
duration_seconds: 總監測時長(秒),預設 60 秒
interval_seconds: 數據採集間隔(秒),預設 10 秒
"""
# 計算總共需要採集幾次數據
total_steps = duration_seconds // interval_seconds
temperatures_log = []
print(f"開始溫度監控任務:預計執行 {duration_seconds} 秒,每 {interval_seconds} 秒採集一次...")
for step in range(1, total_steps + 1):
# 模擬讀取硬體傳感器溫度
current_temp = round(random.uniform(22.0, 28.0), 2)
temperatures_log.append(current_temp)
# 構造本次採集的進度訊息
status_msg = f"第 {step * interval_seconds} 秒 - 當前實時溫度: {current_temp}°C"
print(f"[Server Log] {status_msg}")
# 【核心】如果客戶端支持並傳入了 Context,則通過 SSE 通道即時推送進度通知
if ctx:
# report_progress 參數: (當前進度值, 總進度值, 附帶的文本訊息)
await ctx.report_progress(
progress=step,
total=total_steps,
message=status_msg
)
# 異步等待下一次採集
if step < total_steps:
await asyncio.sleep(interval_seconds)
# 任務結束,返回最終的標準 Response
avg_temp = round(sum(temperatures_log) / len(temperatures_log), 2)
return f"【監控任務完成】共採集 {total_steps} 次數據。歷史溫度: {temperatures_log}。平均溫度: {avg_temp}°C"
if name == "main":
以 sse 傳輸模式啟動服務,默認會掛載在 http://127.0.0
# 備註:新版 FastMCP 也支持 "http" (Streamable HTTP),兩者在網路層皆可實現流式推送
mcp.run(transport="sse", host="127.0.0.1", port=8000)
在Inpector中,仍然是采用常规的tools调用方式,不同的是,每过10秒在右下角会出现`notifications/progress`的返回结果,例如:
```json
{
"method": "notifications/progress",
"params": {
"progress": 1,
"total": 6,
"message": "第 10 秒 - 當前實時溫度: 23.22°C"
}
}
在此期间,工具一直是Running状态,在60秒结束后,才会返回正常的结果。