铸云 AI

流式响应

了解如何在 Mint Cloud AI 平台上实现流式响应,提升用户体验

工作原理

SSE (Server-Sent Events)

流式响应基于 SSE 协议实现。当设置 stream: true 时,服务器会逐步返回内容片段, 客户端可以实时渲染响应,无需等待完整结果。

  • 设置 stream: true 启用流式模式
  • 逐步接收内容片段,减少等待时间
  • 支持实时渲染,提升用户体验
  • 兼容 OpenAI 和 Anthropic 流式协议

OpenAI 协议流式

使用 OpenAI 兼容的流式接口,通过设置 stream: true 获取增量响应。

curl -X POST https://api.mint-cloud.ai/v1/chat/completions \
  -H "Authorization: Bearer $MINT_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "model": "openai/gpt-4o",
    "messages": [{"role": "user", "content": "写一首诗"}],
    "stream": true
  }'

Anthropic 协议流式

使用 Anthropic 原生流式接口,通过消息流式 API 获取实时响应。

from anthropic import Anthropic

client = Anthropic(
    api_key="your-mint-api-key",
    base_url="https://api.mint-cloud.ai/v1"
)

with client.messages.stream(
    model="anthropic/claude-sonnet-4-5",
    max_tokens=1024,
    messages=[{"role": "user", "content": "写一首诗"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

流式 + Function Calling

在流式响应中处理函数调用时,需要监听 tool_calls 事件。

from openai import OpenAI

client = OpenAI(
    api_key="your-mint-api-key",
    base_url="https://api.mint-cloud.ai/v1"
)

stream = client.chat.completions.create(
    model="openai/gpt-4o",
    messages=[{
        "role": "user",
        "content": "北京的天气怎么样?"
    }],
    tools=[{
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "获取天气信息",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {"type": "string", "description": "城市名称"}
                },
                "required": ["location"]
            }
        }
    }],
    stream=True
)

for chunk in stream:
    delta = chunk.choices[0].delta
    # 处理内容片段
    if delta.content:
        print(f"内容: {delta.content}")
    # 处理函数调用
    if delta.tool_calls:
        for tool_call in delta.tool_calls:
            print(f"函数调用: {tool_call.function.name}")
            print(f"参数: {tool_call.function.arguments}")

错误处理和重连

流式连接可能因网络问题中断,实现重连机制可以提高可靠性。

import time
from openai import OpenAI

client = OpenAI(
    api_key="your-mint-api-key",
    base_url="https://api.mint-cloud.ai/v1"
)

def stream_with_retry(messages, max_retries=5, base_delay=1):
    """带指数退避的重试机制"""
    for attempt in range(max_retries):
        try:
            stream = client.chat.completions.create(
                model="openai/gpt-4o",
                messages=messages,
                stream=True
            )
            
            full_content = ""
            for chunk in stream:
                if chunk.choices[0].delta.content:
                    full_content += chunk.choices[0].delta.content
                    print(chunk.choices[0].delta.content, end="", flush=True)
            return full_content
            
        except Exception as e:
            if attempt == max_retries - 1:
                raise Exception(f"最大重试次数已达上限: {e}")
            
            # 指数退避策略
            delay = base_delay * (2 ** attempt)
            print(f"\n连接中断,{delay}秒后重试... (尝试 {attempt + 1}/{max_retries})")
            time.sleep(delay)

# 使用示例
messages = [{"role": "user", "content": "讲一个故事"}]
stream_with_retry(messages)

最佳实践

使用 flush=True

在 Python 的 print 语句中使用 flush=True,确保内容立即输出, 而不是等待缓冲区刷新,提升实时性。

实现连接保活

配置适当的超时时间,使用 HTTP Keep-Alive 保持连接, 减少连接建立的开销,提升流式响应效率。

优雅处理中断

实现重连机制时,使用指数退避策略,避免频繁重试造成服务器压力。 同时保存已接收的内容,支持断点续传。

流式状态管理

前端组件需要正确管理加载状态,使用 isStreaming 状态控制 UI 展示, 避免用户看到不完整的响应内容产生困惑。