这是 OpenAI 文档中流式响应的代码 platform.openai.com/docs/guides…
1import { OpenAI } from "openai"; 2const client = new OpenAI(); 3 4const stream = await client.responses.create({ 5 model: "gpt-5", 6 input: [ 7 { 8 role: "user", 9 content: "Say 'double bubble bath' ten times fast.", 10 }, 11 ], 12 stream: true, 13}); 14 15for await (const event of stream) 16 console.log(event); 17} 18
在我第一次看到这段代码时,有无数的疑惑出现在了我的大脑中:
stream是什么?- 为什么可以通过
for await of来遍历? - 这和异步有什么关系吗?
- 服务端要如何将
stream一点点返回给前端? - 前端要如何接收数据?
- ……
如果你也有类似的疑问,请耐心阅读本文,相信你一定能找到答案。
本文的代码在这里 github.com/wangkaiwd/a…
Iterable protocol 和 Iterator protocol
支持 for...of 循环的变量,一定要符合 Iterable protocol 和 Iterator protocol
Iterable protocol :
- 变量是一个对象
- 对象必须实现
[Symbol.iterator]方法 [Symbol.iterator]方法必须返回遵循Iterator protocol约定的对象
Iterator protocol :
- 变量是一个对象
- 对象必须实现
next方法 next方法要返回一个对象{ done: boolean, value: any }done表示迭代是否结束value表示迭代器的返回值
下面是一个示例:
1function makeIterableObj (array: any[]) { 2 return { 3 [Symbol.iterator] () { 4 let nextIndex = 0 5 return { 6 next () { 7 if (nextIndex < array.length) { 8 const result = { value: array[nextIndex], done: false } 9 nextIndex++ 10 return result 11 } 12 return { done: true, value: undefined } 13 }, 14 } 15 }, 16 } 17} 18 19const iterableObj = makeIterableObj(['one', 'two']) 20
可以手动循环 iterableObj
1const iterator = iterableObj[Symbol.iterator]() 2while (true) { 3 const { value, done } = iterator.next() 4 if (done) { 5 break 6 } 7 console.log('value', value) 8} 9 10// 输出结果 11// value one 12// value two 13
也可以通过 for...of 来循环 iterableObj :
1// 这里的 item 就是 next 方法执行后得到的 value 2for (const item of iterableObj) { 3 console.log('item', item) 4} 5 6// 输出结果 7// item one 8// item two 9
Async iterable protocol 和 Async iterator protocol
理解了 iterable protocol 和 iterator protocol 再来理解 async iterable protocol 和 async iterator protocol 就会容易很多。
异步相比于同步,有以下区别:
- 对象必须有 [Symbol.asyncIterator]() 方法
- [Symbol.asyncIterator]() 返回
async iterator async iterator的next方法返回Promise,Promise成功时的值为{ value: any, done: boolean }
同样的示例改为异步版本:
1const sleep = (result: IResult) => { 2 return new Promise<IResult>((resolve) => { 3 setTimeout(() => { 4 resolve(result) 5 }, 1000) 6 }) 7} 8 9function makeIterableObj (array: any[]) { 10 return { 11 [Symbol.asyncIterator] () { 12 let nextIndex = 0 13 return { 14 next () { 15 if (nextIndex < array.length) { 16 const promise = sleep({ value: array[nextIndex], done: false }) 17 nextIndex++ 18 return promise 19 } 20 return sleep({ done: true, value: undefined }) 21 }, 22 } 23 }, 24 } 25} 26
手动循环:
1const asyncIterableObj = makeIterableObj(['one', 'two']) 2const iterator = asyncIterableObj[Symbol.asyncIterator]() 3while (true) { 4 const { value, done } = await iterator.next() 5 if (done) { 6 break 7 } 8 console.log('value', value) 9} 10
使用 for await ... of 循环
1for await (const item of makeIterableObj(['one', 'two'])) { 2 console.log('item', item) 3} 4
此时再回到开篇的示例:
1const stream = await client.responses.create() 2
stream 其实就是一个遵循 async iterable protocol 的对象
可读流 ReadableStream
下面是一个 ReadableStream 的示例:每隔 1s 向流中写入4个字符,直到字符完全写入到流中
1let mockData = [`This is a sample string that will be streamed in chunks.`](https://xplanc.org/primers/document/zh/03.HTML/EX.HTML%20%E5%85%83%E7%B4%A0/EX.a.md) 2 3let timer: any = null 4const step = 4 5 6const stream = new ReadableStream({ 7 start (controller) { 8 timer = setInterval(() => { 9 const chunk = mockData.slice(0, step) 10 // 删除已经写入的字符 11 mockData = mockData.slice(step) 12 if (!mockData) { 13 // 字符处理完成后,停止写入 14 controller.close() 15 if (timer) { 16 clearInterval(timer) 17 timer = null 18 } 19 } 20 // 添加字符到 stream 21 controller.enqueue(chunk) 22 }, 1000) 23 }, 24 cancel () { 25 clearInterval(timer) 26 }, 27}) 28
ReadableStream 默认实现了 Symbol.asyncIterator ,所以它是一个异步可迭代对象,可以使用 for await ... of 来循环
1for await (const chunk of stream) { 2 console.log('chunk', chunk) 3} 4
ReadableStream 自己也提供了 getReader 方法来读取流:
1const stream = createStream() 2const reader = stream.getReader() 3// 循环直到 done 为 true 时结束 4while (true) { 5 const { done, value } = await reader.read() 6 if (done) { 7 break 8 } 9 console.log('value', value) 10} 11
这是 mdn 官方仓库中的一个示例,也可以结合一起学习:github.com/mdn/dom-exa…
服务端 SSE
目前的 AI 应用服务端流式响应使用 Server-Sent Events 来实现,简称 SSE 。下面是 ChatGPT 网页版的响应内容:
mdn 的相关介绍在这里:developer.mozilla.org/en-US/docs/…
sse 示例
MDN 的示例是使用 PHP 实现的,代码比较难懂,我也没有找到一个可以直接运行的案例。为了方便理解,我参考 stackoverflow.com/questions/3… ,使用 express 实现了流式响应:
1import express from 'express' 2 3const app = express() 4app.use(express.static('public')) 5 6app.get('/countdown', function (req, res) { 7 // sse 响应头设置 8 res.writeHead(200, { 9 'Content-Type': 'text/event-stream', 10 'Cache-Control': 'no-cache', 11 'Connection': 'keep-alive', 12 }) 13 let timer: NodeJS.Timeout | null = null 14 let count = 10 15 timer = setInterval(() => { 16 if (count >= 0) { 17 // 返回内容必须严格遵守格式 18 res.write('data: ' + count + '\n\n') 19 count-- 20 return 21 } 22 // count 小于0时,停止响应 23 if (timer) { 24 clearInterval(timer) 25 timer = null 26 } 27 res.end() 28 }, 1000) 29}) 30 31app.listen(3000, () => console.log('SSE app listening on port 3000')) 32
这段代码会每隔 1s 在响应中写入 count ,直到 count < 0 时结束响应。
代码中以下内容需要注意:
- 响应头设置:
'Content-Type': 'text/event-stream' - 返回内容必须严格遵守格式:
data:+ 空格 + 字符串 + 两个换行符 (\n\n)
AI 流式响应
上面我们先实现了一个简单的流式响应,现在我们把 AI 结合进来
1const client = new OpenAI({ 2 apiKey: process.env.OPENAI_API_KEY, 3 baseURL: 'https://api.deepseek.com', 4}) 5 6const app = express() 7app.use(express.static('public')) 8 9app.get('/chat', async function (req, res) { 10 res.writeHead(200, { 11 'Content-Type': 'text/event-stream', 12 'Cache-Control': 'no-cache', 13 'Connection': 'keep-alive', 14 }) 15 const stream = await client.chat.completions.create({ 16 model: 'deepseek-chat', 17 messages: [{ role: 'user', content: '你是谁?' }], 18 stream: true, 19 }) 20 for await (const chunk of stream) { 21 const content = chunk.choices[0].delta.content 22 // 注意:这里通过 JSON.stringify 来返回 JSON 字符串,更加灵活 23 res.write([`data: ${JSON.stringify({ content })}\n\n`](https://xplanc.org/primers/document/zh/03.HTML/EX.HTML%20%E5%85%83%E7%B4%A0/EX.data.md)) 24 } 25 res.write([`data: [DONE]\n\n`](https://xplanc.org/primers/document/zh/03.HTML/EX.HTML%20%E5%85%83%E7%B4%A0/EX.data.md)) 26 res.end() 27}) 28 29app.listen(3000, () => console.log(` 30SSE app listening on port 3000 31Open http://localhost:3000/sse-ai.html in your browser to access page. 32`)) 33
有以下几点需要注意:
- 如果使用的是
OpenAI兼容的api,例如我在当前示例中使用的deepseek,要使用之前的OpenAI请求标准:github.com/openai/open… 用法和传参都不一样,需要特别留意
- 返回内容要通过
JSON.stringify来处理,方便我们给前端返回更多字段 - 结束时返回
res.write(data: [DONE]\n\n),方便前端使用EventSource时终止请求
前端处理流式响应
EventSource
前端可以使用 EventSource 来处理 sse 响应的内容,代码如下:
1const stop = document.getElementById('stop') 2const start = document.getElementById('start') 3let eventSource = null 4start.addEventListener('click', () => { 5 const eventSource = new EventSource('/chat') 6 eventSource.onmessage = function (event) { 7 // 要手动关闭,否则会一直请求服务器 8 if (event.data === '[DONE]') { 9 eventSource.close() 10 return 11 } 12 const json = JSON.parse(event.data) 13 document.getElementById('log').innerHTML += json.content 14 } 15}) 16stop.addEventListener('click', function () { 17 eventSource.close() 18}) 19
EventSource 有一个细节需要注意:
如果没有调用 eventSource.close() 方法,那么请求会一直不停的发起。所以我在服务端特意在响应结束时返回 data: [DONE]\n\n 来让前端知道什么时候关闭 eventSource
fetch
前面我们介绍了通过 EventSource 来处理服务端的流式响应,但其实它存在很多问题:
- 只能发起
get请求 - 请求参数只能在
url中传递,但是一般要传入给AI的提示词长度可能较大,容易超过url长度的最大限制 - 无法自定义请求头来设置
Authorization,给服务端传递用户token
基于上述的这些原因,我们通常会使用 fetch 方法来处理服务端的流式响应。github.com/Azure/fetch… 就是基于 fetch 实现的用来发起 EventSource 请求的开源库,下面是它的使用示例:
1<script type="module"> 2 import { fetchEventSource } from "https://esm.sh/@microsoft/fetch-event-source"; 3 4 const stop = document.getElementById("stop"); 5 const start = document.getElementById("start"); 6 const controller = new AbortController(); 7 start.addEventListener("click", () => { 8 // 发起post请求 9 fetchEventSource("/chat", { 10 signal: controller.signal, 11 method: "POST", 12 // 一点点处理服务端响应 13 onmessage: (event) => { 14 const data = event.data; 15 if (data === "[DONE]") { 16 console.log("done"); 17 return; 18 } 19 const json = JSON.parse(data); 20 document.getElementById("log").innerHTML += json.content; 21 }, 22 }); 23 }); 24 stop.addEventListener("click", function () { 25 controller.abort(); 26 }); 27</script> 28
这里使用的 POST 请求,我把服务端的示例改为了 all 方法来接收请求,可以同时处理 GET 和 POST 请求
我们也可以自己通过 fetch 请求来看看具体的响应内容
1const response = await fetch("/chat", { 2 signal: controller.signal, 3 method: "POST", 4}); 5
这里的 response.body 就是一个 ReadableStream (ps: 前面的章节有介绍过ReadableStream ,忘记的同学可以再回去看一下 ),所以我们可以通过 for await ... of 或者 getReader 方法来拿到 ReadableStream 中的数据:
1const textDecoder = new TextDecoder(); 2// response.body 是可读流 3for await (const chunk of response.body) { 4 // chunk 是 Uint8Array ,通过 TextDecoder 转换为字符串 5 console.log('chunk', chunk) 6 const text = textDecoder.decode(chunk); 7 if (text === "[DONE]") { 8 console.log("done"); 9 return; 10 } 11 console.log('text', text) 12} 13 14// 使用 getReader 方法获取数据 15// const reader = response.body.getReader(); 16// while (true) { 17// const { done, value } = await reader.read(); 18// if (done) { 19// break; 20// } 21// const text = textDecoder.decode(value); 22// if (text === "[DONE]") { 23// console.log("done"); 24// return; 25// } 26// console.log('text', text) 27// } 28
最终结果如下:
我们拿到的是服务端返回符合 SSE 规范的字符串,将字符根据规则解析后,就能拿到最终的结果了。这其实就是 fetch-event-source 帮我们实现的逻辑
踩坑
我在使用 fetch-event-source 的过程中发现了如下问题:
如果服务端返回的内容只包含 \n ,那么前端接收到的内容为空字符。在 markdown 渲染的场景下,会导致格式完全错乱。 下面是伪代码,方便理解
1// 服务端如果返回的内容如果只包含 \n 2res.write('data: ' + '\n\n' + '\n\n') 3 4// 前端拿到的内容为空字符串 5onmessage: (event) => { 6 const data = event.data; 7 // true 8 console.log(data === '') 9} 10
官方也有相关的 issue 一直没有修复:github.com/Azure/fetch…
所以在使用 fetch-event-source 时可以通过 JSON.stringify 来传入 json 字符串,防止前端接收到空字符串
1const content = chunk.choices[0].delta.content 2// JSON.stringify 避免了返回内容只有 `\n` 的情况 3res.write([`data: ${JSON.stringify({ content })}\n\n`](https://xplanc.org/primers/document/zh/03.HTML/EX.HTML%20%E5%85%83%E7%B4%A0/EX.data.md)) 4
结语
在 AI 出现之前,这些知识很少有使用场景。但随着 AI 的快速发展,这些代码不断地出现在我眼前,也让我有了更多实践的机会。这篇文章是我在实践中的一些沉淀和总结,希望能帮到你。
参考
- developer.mozilla.org/en-US/docs/…
- developer.mozilla.org/en-US/docs/…
- developer.mozilla.org/en-US/docs/…
- github.com/mdn/dom-exa…
- developer.mozilla.org/en-US/docs/…
- tpiros.dev/blog/stream…
《一文搞懂 AI 流式响应》 是转载文章,点击查看原文。