流式响应
了解如何在 Mint Cloud AI 平台上实现流式响应,提升用户体验
工作原理
SSE (Server-Sent Events)
流式响应基于 SSE 协议实现。当设置 stream: true 时,服务器会逐步返回内容片段, 客户端可以实时渲染响应,无需等待完整结果。
- 设置
stream: true启用流式模式 - 逐步接收内容片段,减少等待时间
- 支持实时渲染,提升用户体验
- 兼容 OpenAI 和 Anthropic 流式协议
OpenAI 协议流式
使用 OpenAI 兼容的流式接口,通过设置 stream: true 获取增量响应。
curl -X POST https://api.mint-cloud.ai/v1/chat/completions \
-H "Authorization: Bearer $MINT_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"model": "openai/gpt-4o",
"messages": [{"role": "user", "content": "写一首诗"}],
"stream": true
}'Anthropic 协议流式
使用 Anthropic 原生流式接口,通过消息流式 API 获取实时响应。
from anthropic import Anthropic
client = Anthropic(
api_key="your-mint-api-key",
base_url="https://api.mint-cloud.ai/v1"
)
with client.messages.stream(
model="anthropic/claude-sonnet-4-5",
max_tokens=1024,
messages=[{"role": "user", "content": "写一首诗"}]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)流式 + Function Calling
在流式响应中处理函数调用时,需要监听 tool_calls 事件。
from openai import OpenAI
client = OpenAI(
api_key="your-mint-api-key",
base_url="https://api.mint-cloud.ai/v1"
)
stream = client.chat.completions.create(
model="openai/gpt-4o",
messages=[{
"role": "user",
"content": "北京的天气怎么样?"
}],
tools=[{
"type": "function",
"function": {
"name": "get_weather",
"description": "获取天气信息",
"parameters": {
"type": "object",
"properties": {
"location": {"type": "string", "description": "城市名称"}
},
"required": ["location"]
}
}
}],
stream=True
)
for chunk in stream:
delta = chunk.choices[0].delta
# 处理内容片段
if delta.content:
print(f"内容: {delta.content}")
# 处理函数调用
if delta.tool_calls:
for tool_call in delta.tool_calls:
print(f"函数调用: {tool_call.function.name}")
print(f"参数: {tool_call.function.arguments}")错误处理和重连
流式连接可能因网络问题中断,实现重连机制可以提高可靠性。
import time
from openai import OpenAI
client = OpenAI(
api_key="your-mint-api-key",
base_url="https://api.mint-cloud.ai/v1"
)
def stream_with_retry(messages, max_retries=5, base_delay=1):
"""带指数退避的重试机制"""
for attempt in range(max_retries):
try:
stream = client.chat.completions.create(
model="openai/gpt-4o",
messages=messages,
stream=True
)
full_content = ""
for chunk in stream:
if chunk.choices[0].delta.content:
full_content += chunk.choices[0].delta.content
print(chunk.choices[0].delta.content, end="", flush=True)
return full_content
except Exception as e:
if attempt == max_retries - 1:
raise Exception(f"最大重试次数已达上限: {e}")
# 指数退避策略
delay = base_delay * (2 ** attempt)
print(f"\n连接中断,{delay}秒后重试... (尝试 {attempt + 1}/{max_retries})")
time.sleep(delay)
# 使用示例
messages = [{"role": "user", "content": "讲一个故事"}]
stream_with_retry(messages)最佳实践
使用 flush=True
在 Python 的 print 语句中使用 flush=True,确保内容立即输出, 而不是等待缓冲区刷新,提升实时性。
实现连接保活
配置适当的超时时间,使用 HTTP Keep-Alive 保持连接, 减少连接建立的开销,提升流式响应效率。
优雅处理中断
实现重连机制时,使用指数退避策略,避免频繁重试造成服务器压力。 同时保存已接收的内容,支持断点续传。
流式状态管理
前端组件需要正确管理加载状态,使用 isStreaming 状态控制 UI 展示, 避免用户看到不完整的响应内容产生困惑。