一文搞懂 AI 流式响应

作者:只想写个小玩意日期:2025/11/15

这是 OpenAI 文档中流式响应的代码 platform.openai.com/docs/guides…

1import { OpenAI } from "openai";
2const client = new OpenAI();
3
4const stream = await client.responses.create({
5    model: "gpt-5",
6    input: [
7        {
8            role: "user",
9            content: "Say 'double bubble bath' ten times fast.",
10        },
11    ],
12    stream: true,
13});
14
15for await (const event of stream) 
16    console.log(event);
17}
18

在我第一次看到这段代码时,有无数的疑惑出现在了我的大脑中:

  • stream 是什么?
  • 为什么可以通过 for await of 来遍历?
  • 这和异步有什么关系吗?
  • 服务端要如何将 stream 一点点返回给前端?
  • 前端要如何接收数据?
  • ……

如果你也有类似的疑问,请耐心阅读本文,相信你一定能找到答案。

本文的代码在这里 github.com/wangkaiwd/a…

Iterable protocol 和 Iterator protocol

支持 for...of 循环的变量,一定要符合 Iterable protocolIterator protocol

Iterable protocol :

  • 变量是一个对象
  • 对象必须实现 [Symbol.iterator] 方法
  • [Symbol.iterator] 方法必须返回遵循 Iterator protocol 约定的对象

Iterator protocol :

  • 变量是一个对象
  • 对象必须实现 next 方法
  • next 方法要返回一个对象 { done: boolean, value: any }
    • done 表示迭代是否结束
    • value 表示迭代器的返回值

下面是一个示例:

1function makeIterableObj (array: any[]) {
2  return {
3    [Symbol.iterator] () {
4      let nextIndex = 0
5      return {
6        next () {
7          if (nextIndex < array.length) {
8            const result = { value: array[nextIndex], done: false }
9            nextIndex++
10            return result
11          }
12          return { done: true, value: undefined }
13        },
14      }
15    },
16  }
17}
18
19const iterableObj = makeIterableObj(['one', 'two'])
20

可以手动循环 iterableObj

1const iterator = iterableObj[Symbol.iterator]()
2while (true) {
3  const { value, done } = iterator.next()
4  if (done) {
5    break
6  }
7  console.log('value', value)
8}
9
10// 输出结果
11// value one
12// value two
13

也可以通过 for...of 来循环 iterableObj :

1// 这里的 item 就是 next 方法执行后得到的 value
2for (const item of iterableObj) {
3  console.log('item', item)
4}
5
6// 输出结果
7// item one
8// item two
9

Async iterable protocol 和 Async iterator protocol

理解了 iterable protocoliterator protocol 再来理解 async iterable protocolasync iterator protocol 就会容易很多。

异步相比于同步,有以下区别:

同样的示例改为异步版本:

1const sleep = (result: IResult) => {
2  return new Promise<IResult>((resolve) => {
3    setTimeout(() => {
4      resolve(result)
5    }, 1000)
6  })
7}
8
9function makeIterableObj (array: any[]) {
10  return {
11    [Symbol.asyncIterator] () {
12      let nextIndex = 0
13      return {
14        next () {
15          if (nextIndex < array.length) {
16            const promise = sleep({ value: array[nextIndex], done: false })
17            nextIndex++
18            return promise
19          }
20          return sleep({ done: true, value: undefined })
21        },
22      }
23    },
24  }
25}
26

手动循环:

1const asyncIterableObj = makeIterableObj(['one', 'two'])
2const iterator = asyncIterableObj[Symbol.asyncIterator]()
3while (true) {
4  const { value, done } = await iterator.next()
5  if (done) {
6    break
7  }
8  console.log('value', value)
9}
10

使用 for await ... of 循环

1for await (const item of makeIterableObj(['one', 'two'])) {
2  console.log('item', item)
3}
4

此时再回到开篇的示例:

1const stream = await client.responses.create()
2

stream 其实就是一个遵循 async iterable protocol 的对象

可读流 ReadableStream

下面是一个 ReadableStream 的示例:每隔 1s 向流中写入4个字符,直到字符完全写入到流中

1let mockData = [`This is a sample string that will be streamed in chunks.`](https://xplanc.org/primers/document/zh/03.HTML/EX.HTML%20%E5%85%83%E7%B4%A0/EX.a.md)
2
3let timer: any = null
4const step = 4
5
6const stream = new ReadableStream({
7  start (controller) {
8    timer = setInterval(() => {
9      const chunk = mockData.slice(0, step)
10      // 删除已经写入的字符
11      mockData = mockData.slice(step)
12      if (!mockData) {
13        // 字符处理完成后,停止写入
14        controller.close()
15        if (timer) {
16          clearInterval(timer)
17          timer = null
18        }
19      }
20      // 添加字符到 stream
21      controller.enqueue(chunk)
22    }, 1000)
23  },
24  cancel () {
25    clearInterval(timer)
26  },
27})
28

ReadableStream 默认实现了 Symbol.asyncIterator ,所以它是一个异步可迭代对象,可以使用 for await ... of 来循环

1for await (const chunk of stream) {
2  console.log('chunk', chunk)
3}
4

ReadableStream 自己也提供了 getReader 方法来读取流:

1const stream = createStream()
2const reader = stream.getReader()
3// 循环直到 done 为 true 时结束
4while (true) {
5  const { done, value } = await reader.read()
6  if (done) {
7    break
8  }
9  console.log('value', value)
10}
11

这是 mdn 官方仓库中的一个示例,也可以结合一起学习:github.com/mdn/dom-exa…

服务端 SSE

目前的 AI 应用服务端流式响应使用 Server-Sent Events 来实现,简称 SSE 。下面是 ChatGPT 网页版的响应内容:

mdn 的相关介绍在这里:developer.mozilla.org/en-US/docs/…

sse 示例

MDN 的示例是使用 PHP 实现的,代码比较难懂,我也没有找到一个可以直接运行的案例。为了方便理解,我参考 stackoverflow.com/questions/3… ,使用 express 实现了流式响应:

1import express from 'express'
2
3const app = express()
4app.use(express.static('public'))
5
6app.get('/countdown', function (req, res) {
7  // sse 响应头设置
8  res.writeHead(200, {
9    'Content-Type': 'text/event-stream',
10    'Cache-Control': 'no-cache',
11    'Connection': 'keep-alive',
12  })
13  let timer: NodeJS.Timeout | null = null
14  let count = 10
15  timer = setInterval(() => {
16    if (count >= 0) {
17      // 返回内容必须严格遵守格式
18      res.write('data: ' + count + '\n\n')
19      count--
20      return
21    }
22    // count 小于0时,停止响应
23    if (timer) {
24      clearInterval(timer)
25      timer = null
26    }
27    res.end()
28  }, 1000)
29})
30
31app.listen(3000, () => console.log('SSE app listening on port 3000'))
32

这段代码会每隔 1s 在响应中写入 count ,直到 count < 0 时结束响应。

代码中以下内容需要注意:

  • 响应头设置: 'Content-Type': 'text/event-stream'
  • 返回内容必须严格遵守格式: data: + 空格 + 字符串 + 两个换行符 (\n\n)

AI 流式响应

上面我们先实现了一个简单的流式响应,现在我们把 AI 结合进来

1const client = new OpenAI({
2  apiKey: process.env.OPENAI_API_KEY,
3  baseURL: 'https://api.deepseek.com',
4})
5
6const app = express()
7app.use(express.static('public'))
8
9app.get('/chat', async function (req, res) {
10  res.writeHead(200, {
11    'Content-Type': 'text/event-stream',
12    'Cache-Control': 'no-cache',
13    'Connection': 'keep-alive',
14  })
15  const stream = await client.chat.completions.create({
16    model: 'deepseek-chat',
17    messages: [{ role: 'user', content: '你是谁?' }],
18    stream: true,
19  })
20  for await (const chunk of stream) {
21    const content = chunk.choices[0].delta.content
22    // 注意:这里通过 JSON.stringify 来返回 JSON 字符串,更加灵活
23    res.write([`data: ${JSON.stringify({ content })}\n\n`](https://xplanc.org/primers/document/zh/03.HTML/EX.HTML%20%E5%85%83%E7%B4%A0/EX.data.md))
24  }
25  res.write([`data: [DONE]\n\n`](https://xplanc.org/primers/document/zh/03.HTML/EX.HTML%20%E5%85%83%E7%B4%A0/EX.data.md))
26  res.end()
27})
28
29app.listen(3000, () => console.log(`
30SSE app listening on port 3000
31Open http://localhost:3000/sse-ai.html in your browser to access page.
32`))
33

有以下几点需要注意:

  1. 如果使用的是 OpenAI 兼容的 api ,例如我在当前示例中使用的 deepseek ,要使用之前的 OpenAI 请求标准:github.com/openai/open… 用法和传参都不一样,需要特别留意
  2. 返回内容要通过 JSON.stringify 来处理,方便我们给前端返回更多字段
  3. 结束时返回 res.write(data: [DONE]\n\n) ,方便前端使用 EventSource 时终止请求

前端处理流式响应

EventSource

前端可以使用 EventSource 来处理 sse 响应的内容,代码如下:

1const stop = document.getElementById('stop')
2const start = document.getElementById('start')
3let eventSource = null
4start.addEventListener('click', () => {
5  const eventSource = new EventSource('/chat')
6  eventSource.onmessage = function (event) {
7    // 要手动关闭,否则会一直请求服务器
8    if (event.data === '[DONE]') {
9      eventSource.close()
10      return
11    }
12    const json = JSON.parse(event.data)
13    document.getElementById('log').innerHTML += json.content
14  }
15})
16stop.addEventListener('click', function () {
17  eventSource.close()
18})
19

完整代码:github.com/wangkaiwd/a…

EventSource 有一个细节需要注意

如果没有调用 eventSource.close() 方法,那么请求会一直不停的发起。所以我在服务端特意在响应结束时返回 data: [DONE]\n\n 来让前端知道什么时候关闭 eventSource

fetch

前面我们介绍了通过 EventSource 来处理服务端的流式响应,但其实它存在很多问题:

  • 只能发起 get 请求
  • 请求参数只能在 url 中传递,但是一般要传入给 AI 的提示词长度可能较大,容易超过 url 长度的最大限制
  • 无法自定义请求头来设置 Authorization ,给服务端传递用户 token

基于上述的这些原因,我们通常会使用 fetch 方法来处理服务端的流式响应。github.com/Azure/fetch… 就是基于 fetch 实现的用来发起 EventSource 请求的开源库,下面是它的使用示例:

1<script type="module">
2  import { fetchEventSource } from "https://esm.sh/@microsoft/fetch-event-source";
3
4  const stop = document.getElementById("stop");
5  const start = document.getElementById("start");
6  const controller = new AbortController();
7  start.addEventListener("click", () => {
8    // 发起post请求
9    fetchEventSource("/chat", {
10      signal: controller.signal,
11      method: "POST",
12      // 一点点处理服务端响应
13      onmessage: (event) => {
14        const data = event.data;
15        if (data === "[DONE]") {
16          console.log("done");
17          return;
18        }
19        const json = JSON.parse(data);
20        document.getElementById("log").innerHTML += json.content;
21      },
22    });
23  });
24  stop.addEventListener("click", function () {
25    controller.abort();
26  });
27</script>
28

完整代码:github.com/wangkaiwd/a…

这里使用的 POST 请求,我把服务端的示例改为了 all 方法来接收请求,可以同时处理 GETPOST 请求

我们也可以自己通过 fetch 请求来看看具体的响应内容

1const response = await fetch("/chat", {
2  signal: controller.signal,
3  method: "POST",
4});
5

这里的 response.body 就是一个 ReadableStream (ps: 前面的章节有介绍过ReadableStream ,忘记的同学可以再回去看一下 ),所以我们可以通过 for await ... of 或者 getReader 方法来拿到 ReadableStream 中的数据:

1const textDecoder = new TextDecoder();
2// response.body 是可读流
3for await (const chunk of response.body) {
4  // chunk 是 Uint8Array ,通过 TextDecoder 转换为字符串
5  console.log('chunk', chunk)
6  const text = textDecoder.decode(chunk);
7  if (text === "[DONE]") {
8    console.log("done");
9    return;
10  }
11  console.log('text', text)
12}
13
14// 使用 getReader 方法获取数据
15//   const reader = response.body.getReader();
16//   while (true) {
17//     const { done, value } = await reader.read();
18//     if (done) {
19//       break;
20//     }
21//     const text = textDecoder.decode(value);
22//     if (text === "[DONE]") {
23//       console.log("done");
24//       return;
25//     }
26//     console.log('text', text)
27//   }
28

最终结果如下:

我们拿到的是服务端返回符合 SSE 规范的字符串,将字符根据规则解析后,就能拿到最终的结果了。这其实就是 fetch-event-source 帮我们实现的逻辑

踩坑

我在使用 fetch-event-source 的过程中发现了如下问题:

如果服务端返回的内容只包含 \n ,那么前端接收到的内容为空字符。在 markdown 渲染的场景下,会导致格式完全错乱。 下面是伪代码,方便理解

1// 服务端如果返回的内容如果只包含 \n
2res.write('data: ' + '\n\n' + '\n\n')
3
4// 前端拿到的内容为空字符串
5onmessage: (event) => {
6  const data = event.data;
7  // true
8  console.log(data === '')
9}
10

官方也有相关的 issue 一直没有修复:github.com/Azure/fetch…

所以在使用 fetch-event-source 时可以通过 JSON.stringify 来传入 json 字符串,防止前端接收到空字符串

1const content = chunk.choices[0].delta.content
2// JSON.stringify 避免了返回内容只有 `\n` 的情况
3res.write([`data: ${JSON.stringify({ content })}\n\n`](https://xplanc.org/primers/document/zh/03.HTML/EX.HTML%20%E5%85%83%E7%B4%A0/EX.data.md))
4

结语

AI 出现之前,这些知识很少有使用场景。但随着 AI 的快速发展,这些代码不断地出现在我眼前,也让我有了更多实践的机会。这篇文章是我在实践中的一些沉淀和总结,希望能帮到你。

参考


一文搞懂 AI 流式响应》 是转载文章,点击查看原文


相关推荐


TRAE SOLO推出1天啦,限免到15号,你还没体验吗
也无风雨也雾晴2025/11/13

Trae SOLO模式已经正式推出一天了 点击左上角的logo,就可以切换solo模式了,切换后的ide布局非常的酷炫,有两个模式 coder:可以用来改bug,写需求这些 builder:适合从0到1的开发,从生成prd开始到构建完整项目,而且trae还提供了便捷部署到vercel可以线上访问的集成 但是没有模型相关的选择,内置进去了,没有暴露出来给用户去选择 我试了一下使用coder改bug,plan模式下提出需求或者bug,会先生成文档,你可以根据文档再次确认需求,没问题后开始实施,这


一文看懂 Agentic AI:搭建单体 vs 多智能体系统,结果出乎意料!
玩转AGI2025/11/12

一文看懂 Agentic AI:搭建单体 vs 多智能体系统,结果出乎意料! 最近,我开始尝试构建不同类型的 Agentic AI 系统,最让我着迷的,是“单智能体(Single-Agent)”和“多智能体(Multi-Agent)”的差异。【AI大模型教程】 说实话,在没真正动手之前,我也只是听过这些概念,觉得听起来很玄。直到我用 LangGraph 和 LangSmith Studio 亲自搭建了两个版本,一个“单兵作战”,一个“多智能体协作”,结果真的让我彻底改观。 我想造一个能帮我追踪


Guava 迭代器增强类介绍
桦说编程2025/11/10

Guava 库为 Java 开发者提供了一系列强大的迭代器增强工具,它们简化了复杂迭代模式的实现。本文将深入探讨 Guava 的 PeekingIterator、AbstractIterator 和 AbstractSequentialIterator。 1. PeekingIterator:洞察先机 标准的 Iterator 接口仅提供 hasNext() 和 next() 方法,这在某些场景下显得力不从心。当需要“预读”下一个元素以做出决策,但又不想立即消耗它时,PeekingIterato


🍎 Electron 桌面端应用合法性问题全流程解决指南(新手友好版)
去码头整点薯片2025/11/8

本文目标:帮助你把本地的 Electron 应用打包成 macOS 的 .dmg,并做到打开不再被 Gatekeeper 拦截(不再提示“来自身份不明的开发者/无法验证是否含有恶意软件”)。 适用对象:个人开发者 & 小团队。 🧩 一、问题场景 当你满心欢喜地将精心开发的 Electron 应用打包分发给用户,却接到反馈:在 macOS 上无法打开,系统弹窗冷冰冰地提示“无法验证开发者”,文件被直接移入废纸篓。 如果这个场景让你感同身受,那么你正遭遇 macOS 强大的 Gatekeepe


Python 的内置函数 globals
IMPYLH2025/11/6

Python 内建函数列表 > Python 的内置函数 globals Python 的内置函数 globals() 是一个非常重要的工具函数,它返回一个字典,表示当前全局符号表。这个字典包含了当前模块中定义的所有全局变量、函数和类的名称及其对应的值。 def globals(): ''' 返回实现当前模块命名空间的字典 :return: 当前模块命名空间的字典 ''' 具体来说: 返回值是一个字典对象字典的键是变量名或函数名(字符串形式)字典的值是


Python 的内置函数 filter
IMPYLH2025/11/1

Python 内建函数列表 > Python 的内置函数 eval Python 的内建函数 filter() 是一个非常有用的高阶函数,它用于对可迭代对象进行筛选过滤。它的基本工作原理是根据指定的函数条件,从输入的可迭代对象中筛选出符合条件的元素,返回一个迭代器对象。 def filter(fn, iterable): ''' 过滤数据 :param fn: 回调函数,返回 True 是元素保留,返回 False 时元素去除 :param iterable


彻底搞懂「字幕」:从格式、软硬到嵌入,告别所有困惑
mortimer2025/10/30

“我明明把字幕加上了,为什么播放时看不到?” “为什么别人的视频能切换中英文字幕,我的就不行?” “软字幕、硬字幕、.srt、.ass……这些到底是什么鬼?” 如果你曾被这些问题困扰,那么这篇文章就是为你准备的。我将用最清晰的逻辑和最易懂的语言,一次性讲透关于字幕的所有核心知识。 第一站:先懂最核心的区别 —— 软字幕 vs. 硬字幕 在讨论任何格式或工具之前,你必须先理解字幕最终呈现的两种形态。这决定了你的字幕能否被关闭或切换。 硬字幕 (Hard Subs):印在画面上的“纹身” 硬字幕


Redis(87)Redis缓存的LRU淘汰策略如何配置?
Victor3562025/10/27

配置Redis缓存的LRU(Least Recently Used,最近最少使用)淘汰策略需要以下几个步骤: 设置最大内存限制 选择和配置淘汰策略 验证配置 以下是详细的步骤和代码示例。 1. 设置最大内存限制 可以在Redis配置文件 redis.conf 中设置 maxmemory 选项来限制Redis使用的最大内存。 # redis.conf # 设置最大内存为256MB maxmemory 256mb 如果你不使用配置文件,可以通过命令行参数直接设置: redis-server -


Redis(80)如何解决Redis的缓存穿透问题?
Victor3562025/10/24

解决Redis缓存穿透问题的方法有多种,具体的解决方案可以根据实际情况选择。以下是几种常见的解决方法及详细的代码示例,包括缓存空结果、使用布隆过滤器以及参数校验。 1. 缓存空结果 当查询数据库返回空结果时,也将其缓存起来,并设置一个较短的过期时间,比如5分钟。这样即使请求数据不存在,也不需要每次都访问数据库。 示例代码: import redis.clients.jedis.Jedis; import com.fasterxml.jackson.databind.ObjectMapper;


零代码改造 + 全链路追踪!Spring AI 最新可观测性详细解读
阿里云云原生2025/10/22

作者:张铭辉(希铭) 本文部分总结于 8 月的 LoongSuite 系列公开课及 AI 原生应用实战营 meetup 分享内容。如需查看原视频欢迎移步:developer.aliyun.com/live/255218… 前言:AI Agent 从 Demo 到生产阶段的挑战 自 2022 年底 GPT-3.5 引爆大模型革命以来,AI 应用经历了从技术探索到产业落地的快速演进。开源模型迭代与低代码平台的兴起,推动了 AI Agent 开发效率的显著提升。然而,行业普遍面临一个核心矛盾:绝大多数

首页编辑器站点地图

Copyright © 2025 聚合阅读

License: CC BY-SA 4.0