Walkthrough | 手写 Async Generator¶
难度:⭐⭐⭐ 时间:~1h 目标:理解 async generator 模式,实现一个简单的消息流
1. Async Generator 是什么¶
Async Generator = TypeScript 的 async function* + yield。
- 异步 + 流式
- 懒求值
- 可取消
- backpressure 友好
Claude Code 用例: - API 流式响应 - Bash 输出 - 进度通知
2. 目标¶
手写一个简化版 async generator: - 模拟 API 流式响应 - 处理 cancel - 处理 backpressure - 错误处理
3. 完整代码¶
// mini-async-stream.ts
// 1. 简单流
export async function* simpleStream(items: string[]) {
for (const item of items) {
await new Promise((r) => setTimeout(r, 100)) // 模拟延迟
yield item
}
}
// 2. 带 cancel
export async function* streamWithCancel(
items: string[],
signal: AbortSignal,
) {
for (const item of items) {
if (signal.aborted) {
console.log('cancelled')
return
}
await new Promise((r) => setTimeout(r, 100))
yield item
}
}
// 3. 模拟 Claude API 流
export async function* mockClaudeStream(prompt: string): AsyncGenerator<{
type: 'text' | 'done'
content: string
}> {
// 模拟响应
const response = `Response to: ${prompt}`
for (const char of response) {
await new Promise((r) => setTimeout(r, 20))
yield { type: 'text', content: char }
}
yield { type: 'done', content: '' }
}
// 4. 带错误
export async function* streamWithError(): AsyncGenerator<any> {
try {
yield 'a'
yield 'b'
throw new Error('mid-stream error')
} catch (e) {
yield { type: 'error', error: e }
}
}
// 5. Backpressure 友好
export async function* backpressureStream(
source: AsyncIterable<number>,
process: (n: number) => Promise<void>,
): AsyncGenerator<void> {
for await (const item of source) {
await process(item) // 等待消费者就绪
yield // 让出控制
}
}
// 6. 实际用法
async function main() {
console.log('1. Simple stream:')
for await (const item of simpleStream(['a', 'b', 'c'])) {
console.log(item)
}
console.log('\n2. With cancel:')
const controller = new AbortController()
setTimeout(() => controller.abort(), 150)
try {
for await (const item of streamWithCancel(
['a', 'b', 'c', 'd', 'e'],
controller.signal,
)) {
console.log(item)
}
} catch (e) {
// ignore
}
console.log('\n3. Mock Claude:')
for await (const event of mockClaudeStream('Hello')) {
if (event.type === 'text') process.stdout.write(event.content)
if (event.type === 'done') console.log()
}
}
main()
~100 行。
4. Async Generator 基础¶
4.1 声明¶
async function*。
4.2 使用¶
for await...of。
4.3 取消¶
const controller = new AbortController()
setTimeout(() => controller.abort(), 1000)
for await (const item of gen(controller.signal)) {
// 会被取消
}
AbortController。
4.4 错误处理¶
try/catch。
5. Claude API 流式示例¶
import Anthropic from '@anthropic-ai/sdk'
const client = new Anthropic({ apiKey: process.env.ANTHROPIC_API_KEY! })
async function streamConversation(prompt: string) {
const stream = await client.messages.create({
model: 'claude-sonnet-4-6',
max_tokens: 1024,
messages: [{ role: 'user', content: prompt }],
stream: true,
})
for await (const event of stream) {
if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
process.stdout.write(event.delta.text)
}
}
console.log() // 换行
}
streamConversation('Tell a joke')
真实。
6. 5 个 generator 模式¶
6.1 简单 yield¶
基础。
6.2 动态 yield¶
动态。
6.3 无限流¶
无限。
6.4 Transform¶
async function* upper(source: AsyncIterable<string>) {
for await (const s of source) {
yield s.toUpperCase()
}
}
transform。
6.5 Merge¶
merge。
7. 5 个 Claude Code 用例¶
7.1 API 流¶
const stream = await client.messages.create({ ..., stream: true })
for await (const event of stream) { ... }
API。
7.2 Bash 输出¶
Bash。
7.3 Progress 通知¶
async function* progress(total: number) {
for (let i = 0; i < total; i++) {
yield { progress: i, total }
await work()
}
}
Progress。
7.4 消息流¶
async function* messages(turns: Turn[]) {
for (const turn of turns) {
yield turn.user
yield* assistantResponse(turn)
}
}
Messages。
7.5 File 读取¶
async function* readLines(path: string) {
const stream = createReadStream(path, { encoding: 'utf-8' })
const rl = createInterface({ input: stream })
for await (const line of rl) yield line
}
File。
8. 5 个关键洞察¶
async function*是声明for await...of是消费yield是产出AbortController是取消- backpressure 自动
9. 5 个练习¶
- 加 map ——
map<T, U>(source, fn) - 加 filter ——
filter<T>(source, pred) - 加 take ——
take<T>(source, n) - 加 merge ——
merge<T>(...sources) - 加 retry —— 出错时重试
5 步。
10. 总结¶
手写 Async Generator = 理解 async + 流式 + 懒求值。
核心:
- async function* 声明
- for await...of 消费
- yield 产出
- AbortController 取消
- 自动 backpressure
下一步: - 用在 Claude API 流式 - 用在 Bash 输出 - 用在 progress 通知