Walkthrough | Building an Async Stream
Difficulty: ⭐⭐  Time: ~1 h  Goal: write an async stream utility by hand, similar to the Anthropic SDK
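1. What is an async stream?
An async stream is asynchronous, streamed, and consumable one piece at a time. In this walkthrough it is used to:
- mimic `streamMessage` from the Anthropic SDK
- handle SSE (Server-Sent Events)
- provide helper functions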
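2. Goals
Write a `MiniAnthropic` class by hand that provides:
- `messages.create()`: a non-streaming request that returns the whole response at once
- `messages.stream()`: an async stream of events
- a helper that aggregates the streamed text
- error handling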
3. Full code
```typescript
// mini-anthropic.ts
type Message = {
  // The Messages API only accepts 'user' and 'assistant' here;
  // the system prompt goes in the top-level `system` parameter
  role: 'user' | 'assistant'
  content: string
}

type Usage = {
  input_tokens: number
  output_tokens: number
}

// Named MessageResponse so it does not shadow the global `Response` type used by fetch
type MessageResponse = {
  id: string
  content: Array<{ type: 'text'; text: string }>
  usage: Usage
  stop_reason: 'end_turn' | 'max_tokens' | 'stop_sequence' | null // null in message_start
  model: string
}

type CreateParams = {
  model: string
  max_tokens: number
  messages: Message[]
  system?: string
}

type StreamEvent =
  | { type: 'message_start'; message: MessageResponse }
  | { type: 'content_block_start'; index: number; content_block: any }
  | { type: 'content_block_delta'; index: number; delta: { type: 'text_delta'; text: string } }
  | { type: 'content_block_stop'; index: number }
  // usage.output_tokens in message_delta is cumulative for the whole message
  | { type: 'message_delta'; delta: { stop_reason: string | null }; usage?: { output_tokens: number } }
  | { type: 'message_stop' }
  | { type: 'ping' } // keep-alive event

class AnthropicError extends Error {
  constructor(public status: number, message: string) {
    super(message)
    this.name = 'AnthropicError'
  }
}

export class MiniAnthropic {
  private apiKey: string
  private baseURL: string

  constructor(apiKey: string, baseURL = 'https://api.anthropic.com') {
    this.apiKey = apiKey
    this.baseURL = baseURL
  }

  // Arrow functions capture the client instance as `this`. A plain
  // `function*` here would be called with `this === client.messages` instead.
  messages = {
    // 1. Non-streaming: one request, one complete response
    create: async (params: CreateParams): Promise<MessageResponse> => {
      const response = await fetch(`${this.baseURL}/v1/messages`, {
        method: 'POST',
        headers: this.headers(false),
        body: JSON.stringify(params),
      })
      if (!response.ok) {
        throw new AnthropicError(response.status, await response.text())
      }
      return (await response.json()) as MessageResponse
    },
    // 2. Streaming: arrow functions cannot be generators, so delegate to a method
    stream: (params: CreateParams): AsyncGenerator<StreamEvent> => this.streamEvents(params),
  }

  private async *streamEvents(params: CreateParams): AsyncGenerator<StreamEvent> {
    const response = await fetch(`${this.baseURL}/v1/messages`, {
      method: 'POST',
      headers: this.headers(true),
      body: JSON.stringify({ ...params, stream: true }),
    })
    if (!response.ok || !response.body) {
      throw new AnthropicError(response.status, await response.text())
    }
    const reader = response.body.getReader()
    const decoder = new TextDecoder()
    let buffer = ''
    try {
      while (true) {
        const { done, value } = await reader.read()
        if (done) break
        buffer += decoder.decode(value, { stream: true })
        // SSE: events are separated by a blank line ("\n\n"). The last piece
        // may be incomplete, so keep it in the buffer until more data arrives.
        const events = buffer.split('\n\n')
        buffer = events.pop() ?? ''
        for (const event of events) {
          // Each event looks like "event: <type>\ndata: <json>"
          let eventType = ''
          let dataLine = ''
          for (const line of event.split('\n')) {
            if (line.startsWith('event: ')) eventType = line.slice(7)
            if (line.startsWith('data: ')) dataLine = line.slice(6)
          }
          // '[DONE]' guards against OpenAI-style terminators; harmless here
          if (!eventType || !dataLine || dataLine === '[DONE]') continue
          let parsed: StreamEvent
          try {
            parsed = JSON.parse(dataLine)
          } catch {
            continue // skip malformed JSON instead of aborting the stream
          }
          yield parsed
        }
      }
    } finally {
      // Runs on completion, on error, and when the consumer breaks out early;
      // cancelling closes the HTTP body so the connection is not left open
      await reader.cancel().catch(() => {})
    }
  }

  // 3. Helper: drain a stream, returning the concatenated text and token usage
  async collectText(stream: AsyncIterable<StreamEvent>): Promise<{
    text: string
    usage: Usage | null
  }> {
    let text = ''
    let usage: Usage | null = null
    for await (const event of stream) {
      if (event.type === 'message_start') {
        // input_tokens is final here; output_tokens is only an initial count
        usage = { ...event.message.usage }
      } else if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
        text += event.delta.text
      } else if (event.type === 'message_delta' && event.usage && usage) {
        // Cumulative count: assign, do not add
        usage.output_tokens = event.usage.output_tokens
      }
    }
    return { text, usage }
  }

  // 4. Helper: retry with exponential backoff (1 s, 2 s, ...).
  //    maxAttempts counts the first call too.
  async withRetry<T>(fn: () => Promise<T>, maxAttempts = 3): Promise<T> {
    let lastError: unknown
    for (let i = 0; i < maxAttempts; i++) {
      try {
        return await fn()
      } catch (e) {
        lastError = e
        // Do not retry 4xx client errors, except 429 (rate limited)
        if (e instanceof AnthropicError && e.status < 500 && e.status !== 429) {
          throw e
        }
        // No point in waiting after the final attempt
        if (i < maxAttempts - 1) {
          await new Promise((r) => setTimeout(r, 1000 * 2 ** i))
        }
      }
    }
    throw lastError
  }

  private headers(stream: boolean): Record<string, string> {
    return {
      'Content-Type': 'application/json',
      'x-api-key': this.apiKey,
      'anthropic-version': '2023-06-01',
      ...(stream ? { Accept: 'text/event-stream' } : {}),
    }
  }
}
```
About 150 lines.
4. Usage examples
4.1 Non-streaming call
```typescript
const client = new MiniAnthropic(process.env.ANTHROPIC_API_KEY!)
const response = await client.messages.create({
  model: 'claude-sonnet-4-6',
  max_tokens: 1024,
  messages: [{ role: 'user', content: 'Hello' }],
})
console.log(response.content[0].text)
```
`create` waits for the complete response and returns it in one piece.
4.2 Streaming + text collection
```typescript
const stream = client.messages.stream({
  model: 'claude-sonnet-4-6',
  max_tokens: 1024,
  messages: [{ role: 'user', content: 'Tell a story' }],
})
const { text, usage } = await client.collectText(stream)
console.log('Text:', text)
console.log('Tokens:', usage)
```
`collectText` consumes the stream and returns the full text together with the token usage.
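4.3 Real-time streaming
```typescript
const params = {
  model: 'claude-sonnet-4-6', max_tokens: 1024,
  messages: [{ role: 'user' as const, content: 'Tell a story' }],
}
for await (const event of client.messages.stream(params)) {
  if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
    process.stdout.write(event.delta.text) // print each fragment as it arrives
  }
}
```
Each text fragment is printed as soon as it arrives, not after the whole message is done.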
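4.4 Retries
Wrap a promise-returning call in `withRetry`, for example `await client.withRetry(() => client.messages.create(params))`, where `params` is a request object like the ones above. Network errors, 5xx responses and 429 (rate limited) are retried with exponential backoff; other 4xx errors are rethrown at once. Because `messages.stream()` only sends its request once iteration begins, wrap the code that consumes the stream (such as a `collectText` call) rather than the `stream()` call alone.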
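5. Five key design choices
5.1 Async Iterator
`messages.stream` is built on an async generator (`async function*`), so callers consume events with `for await...of` as soon as each one is parsed. The generator is lazy: its request is only sent when iteration starts, and leaving the loop early ends it.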
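A minimal sketch of the two generator properties the stream relies on: the body runs only when iteration starts, and breaking out of `for await` runs the generator's `finally` block, which is where a real stream would close its HTTP body. `numbers` and `takeTwo` are hypothetical names.

```typescript
// Records lifecycle steps so their order can be inspected
const log: string[] = []

// Hypothetical stand-in for messages.stream(): yields 1..5 with a short delay
async function* numbers(): AsyncGenerator<number> {
  log.push('start') // runs on the first next(), not when numbers() is called
  try {
    for (let i = 1; i <= 5; i++) {
      await new Promise((resolve) => setTimeout(resolve, 10)) // simulated network wait
      yield i
    }
  } finally {
    log.push('cleanup') // runs on completion, on error, or when the consumer breaks
  }
}

async function takeTwo(): Promise<number[]> {
  const stream = numbers()
  log.push('created') // the generator body has not started yet
  const seen: number[] = []
  for await (const n of stream) {
    seen.push(n)
    if (n === 2) break // calls stream.return(), which runs the finally block
  }
  return seen
}
```

`takeTwo()` resolves to `[1, 2]`, and `log` ends up as `['created', 'start', 'cleanup']`.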
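5.2 SSE parsing
Server-Sent Events arrive as text blocks separated by a blank line (`\n\n`), each carrying `event:` and `data:` fields. A network chunk can end in the middle of an event, so the parser splits its buffer on `\n\n` and keeps the last, possibly incomplete, piece for the next read.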
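Here is the buffering step in isolation: a sketch of the same split-on-blank-line approach as a small incremental parser. `SSEParser` is a hypothetical name. Like the walkthrough's code, it expects `\n` line endings and one `data:` line per event.

```typescript
type SSEEvent = { event: string; data: string }

// Feed it text chunks in arrival order; it returns every event completed so far
class SSEParser {
  private buffer = ''

  push(chunk: string): SSEEvent[] {
    this.buffer += chunk
    const blocks = this.buffer.split('\n\n')
    this.buffer = blocks.pop() ?? '' // possibly incomplete: wait for more data
    const out: SSEEvent[] = []
    for (const block of blocks) {
      let event = ''
      let data = ''
      for (const line of block.split('\n')) {
        if (line.startsWith('event: ')) event = line.slice(7)
        else if (line.startsWith('data: ')) data = line.slice(6)
      }
      if (event && data) out.push({ event, data })
    }
    return out
  }
}
```

A chunk that stops halfway through an event yields nothing for that event until the rest arrives. The full SSE specification also allows `\r\n` line endings, `data:` without a trailing space, and multi-line `data` fields joined with `\n`, so a production parser should handle those too.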
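5.3 Collection helper
`collectText` drains the stream and concatenates every `text_delta`, so callers who only need the final text never handle individual events.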
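Below is a self-contained version of the collection helper, run against a fake event sequence. It assumes, following Anthropic's streaming documentation, that `message_start` carries the initial `usage` and that `message_delta` carries a cumulative `output_tokens` count. The token numbers below are made up.

```typescript
type Usage = { input_tokens: number; output_tokens: number }

// Only the event shapes this helper looks at
type StreamEvent =
  | { type: 'message_start'; message: { usage: Usage } }
  | { type: 'content_block_delta'; index: number; delta: { type: 'text_delta'; text: string } }
  | { type: 'message_delta'; delta: { stop_reason: string | null }; usage?: { output_tokens: number } }
  | { type: 'message_stop' }

async function collectText(
  stream: AsyncIterable<StreamEvent>,
): Promise<{ text: string; usage: Usage | null }> {
  let text = ''
  let usage: Usage | null = null
  for await (const event of stream) {
    if (event.type === 'message_start') {
      usage = { ...event.message.usage } // input_tokens plus an initial output count
    } else if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
      text += event.delta.text
    } else if (event.type === 'message_delta' && event.usage && usage) {
      usage.output_tokens = event.usage.output_tokens // cumulative: assign, do not add
    }
  }
  return { text, usage }
}

// Fake event sequence in the order the API sends them (token counts made up)
async function* fakeStream(): AsyncGenerator<StreamEvent> {
  yield { type: 'message_start', message: { usage: { input_tokens: 12, output_tokens: 1 } } }
  yield { type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'Once upon ' } }
  yield { type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'a time.' } }
  yield { type: 'message_delta', delta: { stop_reason: 'end_turn' }, usage: { output_tokens: 6 } }
  yield { type: 'message_stop' }
}
```

`collectText(fakeStream())` resolves to `{ text: 'Once upon a time.', usage: { input_tokens: 12, output_tokens: 6 } }`.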
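5.4 Retry with backoff
Each retry waits twice as long as the previous one (1 s, 2 s, 4 s, ...), giving a rate-limited or overloaded server time to recover.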
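The schedule in `withRetry` is `1000 * 2 ** i` milliseconds. This sketch names that formula and adds a cap. The jittered variant is the common "full jitter" approach: it picks a random delay between 0 and the capped value, which keeps many clients from retrying in lockstep. `backoffDelay`, `backoffWithJitter` and the 30-second cap are illustrative choices, not values from this walkthrough.

```typescript
// Delay before retry number `attempt` (0-based): baseMs * 2^attempt, capped at maxMs
function backoffDelay(attempt: number, baseMs = 1000, maxMs = 30_000): number {
  return Math.min(maxMs, baseMs * 2 ** attempt)
}

// "Full jitter": a random delay in [0, backoffDelay). `random` is injectable for testing.
function backoffWithJitter(
  attempt: number,
  baseMs = 1000,
  maxMs = 30_000,
  random: () => number = Math.random,
): number {
  return Math.floor(random() * backoffDelay(attempt, baseMs, maxMs))
}
```

Without jitter the delays for attempts 0 to 3 are 1000, 2000, 4000 and 8000 ms. With jitter, attempt 3 waits anywhere from 0 to 7999 ms.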
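5.5 No retries on 4xx
A 4xx status other than 429 means the request itself is wrong (bad parameters, a missing or invalid API key, and so on), so sending it again would fail the same way. 429 (rate limited) and 5xx errors are usually temporary, so those are retried.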
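6. Five extensions
6.1 Token accumulation
```typescript
// Accumulates token usage across many requests
class TokenCounter {
  private total: Usage = { input_tokens: 0, output_tokens: 0 }

  add(usage: Usage): void {
    this.total.input_tokens += usage.input_tokens
    this.total.output_tokens += usage.output_tokens
  }

  // Return a copy so callers cannot mutate the running total
  get(): Usage {
    return { ...this.total }
  }

  // TODO: getCost(model: string): number, in USD (see the cost function in 6.2)
}
```
`TokenCounter` keeps a running total across requests. For example, you can call `add()` with each non-null `usage` returned by `collectText`.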
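6.2 Cost calculation
```typescript
// USD per 1M tokens, as given in this walkthrough; check current pricing before relying on it
const PRICING: Record<string, { in: number; out: number }> = {
  'claude-sonnet-4-6': { in: 3, out: 15 },
}

function cost(model: string, usage: Usage): number {
  const p = PRICING[model]
  if (!p) throw new Error(`No pricing for model: ${model}`)
  return (usage.input_tokens * p.in + usage.output_tokens * p.out) / 1_000_000
}
```
Prices are quoted in USD per million tokens, hence the division by 1,000,000.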
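Here is a worked example using the `PRICING` rates for `claude-sonnet-4-6` (3 USD in, 15 USD out per million tokens), for a hypothetical call that used 1,000 input tokens and 500 output tokens:

$$
\text{cost} = \frac{1000 \times 3 + 500 \times 15}{1\,000\,000} = \frac{3000 + 7500}{1\,000\,000} = \frac{10\,500}{1\,000\,000} = 0.0105\ \text{USD}
$$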
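6.3 Rotating multiple API keys
```typescript
// Cycles through several API keys, e.g. switching to the next one after a 429
class ClientWithRotation {
  private keys: string[]
  private idx = 0
  constructor(keys: string[]) {
    if (keys.length === 0) throw new Error('at least one API key is required')
    this.keys = keys
  }
  current(): string {
    return this.keys[this.idx]
  }
  rotate(): void {
    this.idx = (this.idx + 1) % this.keys.length
  }
}
```
`rotate()` advances to the next key and wraps around at the end of the list.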
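Exercise 5 suggests rotating keys on 429. Here is a minimal sketch of that idea. It uses a hypothetical `HttpError` class in place of `AnthropicError`, and the caller supplies a `call` function that performs the request with a given key.

```typescript
// Hypothetical error type carrying an HTTP status (stands in for AnthropicError)
class HttpError extends Error {
  status: number
  constructor(status: number) {
    super(`HTTP ${status}`)
    this.name = 'HttpError'
    this.status = status
  }
}

// Tries each key at most once, moving to the next key only on 429 (rate limited).
// Any other error is rethrown immediately.
async function callWithKeyRotation<T>(
  keys: string[],
  call: (apiKey: string) => Promise<T>,
): Promise<T> {
  let lastError: unknown = new Error('no API keys provided')
  for (const key of keys) {
    try {
      return await call(key)
    } catch (e) {
      if (!(e instanceof HttpError && e.status === 429)) throw e
      lastError = e // rate limited: try the next key
    }
  }
  throw lastError // every key was rate limited (or the list was empty)
}
```

Only a 429 moves on to the next key. A 401, for instance, would fail with every key, so it is surfaced right away.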
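6.4 Streaming cancel
```typescript
// Stops forwarding events once `signal` is aborted; the check runs between events.
// Returning early also calls the source's return(), so the source can clean up.
async function* streamWithCancel<T>(source: AsyncIterable<T>, signal: AbortSignal): AsyncGenerator<T> {
  for await (const event of source) {
    if (signal.aborted) return
    yield event
  }
}
```
The `aborted` check only runs when the next event arrives. To also abort a request that is still waiting on the network, pass the same signal to `fetch` through its `signal` option.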
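Below is a runnable sketch of `streamWithCancel` driven by an `AbortController`. A hypothetical `fakeEvents` generator replaces a real API stream, and its `finally` block stands in for closing the HTTP body. This shows that cancelling also shuts down the source.

```typescript
async function* streamWithCancel<T>(source: AsyncIterable<T>, signal: AbortSignal): AsyncGenerator<T> {
  for await (const event of source) {
    if (signal.aborted) return // also triggers the source's cleanup
    yield event
  }
}

let sourceClosed = false

// Hypothetical event source: ten deltas, then cleanup in finally
async function* fakeEvents(): AsyncGenerator<string> {
  try {
    for (let i = 0; i < 10; i++) yield `delta-${i}`
  } finally {
    sourceClosed = true // where a real stream would cancel its HTTP body
  }
}

async function demo(): Promise<string[]> {
  const controller = new AbortController()
  const received: string[] = []
  for await (const e of streamWithCancel(fakeEvents(), controller.signal)) {
    received.push(e)
    if (received.length === 3) controller.abort() // e.g. the user pressed "stop"
  }
  return received
}
```

`demo()` resolves to `['delta-0', 'delta-1', 'delta-2']`. The fourth event is produced but dropped when the abort is noticed, and `sourceClosed` is then `true`.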
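6.5 Custom retry policy
```typescript
// Pluggable retry rules: which errors to retry, and how long to wait
interface RetryPolicy {
  // Return true to retry after this failed attempt
  shouldRetry(error: Error, attempt: number): boolean
  // Milliseconds to wait before the next attempt
  delay(attempt: number): number
}
```
Moving the retry rules behind an interface lets callers change which errors are retried and how long to wait, without touching the retry loop.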
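Here is one way to implement `RetryPolicy`, as a sketch. `exponentialPolicy` and `retryWithPolicy` are hypothetical names, `attempt` is assumed to be 0-based, and `HttpError` stands in for `AnthropicError`. The policy reproduces `withRetry`'s rules: retry network errors, 429 and 5xx; stop after `maxAttempts` calls; double the delay each time.

```typescript
interface RetryPolicy {
  shouldRetry(error: Error, attempt: number): boolean
  delay(attempt: number): number
}

// Hypothetical error type carrying an HTTP status (stands in for AnthropicError)
class HttpError extends Error {
  status: number
  constructor(status: number) {
    super(`HTTP ${status}`)
    this.name = 'HttpError'
    this.status = status
  }
}

function exponentialPolicy(maxAttempts = 3, baseMs = 1000): RetryPolicy {
  return {
    shouldRetry: (error, attempt) => {
      if (attempt >= maxAttempts - 1) return false // that was the last allowed call
      if (error instanceof HttpError) return error.status === 429 || error.status >= 500
      return true // e.g. a network failure thrown by fetch
    },
    delay: (attempt) => baseMs * 2 ** attempt,
  }
}

// The loop knows nothing about status codes; the policy makes every decision
async function retryWithPolicy<T>(fn: () => Promise<T>, policy: RetryPolicy): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn()
    } catch (e) {
      const error = e instanceof Error ? e : new Error(String(e))
      if (!policy.shouldRetry(error, attempt)) throw error
      await new Promise((resolve) => setTimeout(resolve, policy.delay(attempt)))
    }
  }
}
```

Swapping in a different policy, such as one with jitter or a longer wait for 429, needs no change to `retryWithPolicy`.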
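7. Five key takeaways
- `async function*` gives you a stream that callers consume with `for await`
- SSE parsing comes down to buffering and splitting on blank lines
- `collectText` hides event handling from callers who only want the text
- Retry with exponential backoff is essential for network APIs
- Not retrying 4xx errors (except 429) avoids repeating requests that cannot succeed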
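8. Comparison with the real SDK
| Aspect | Mini | Real SDK |
|---|---|---|
| Lines of code | 150 | 3419 |
| Streaming | Basic | Complete |
| Retry | Simple | Jitter + policy |
| Caching | None | Automatic |
| Multiple keys | None | Rotation |
| Beta features | None | Full support |

The mini version trades completeness for readability.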
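9. Five exercises
- Add token accumulation: a `TokenCounter` class
- Add cost calculation: a `PRICING` table
- Add stream cancellation: `AbortController`
- Add a custom retry policy: a `RetryPolicy` interface
- Add multi-key rotation: rotate to the next key on 429

Each exercise corresponds to one of the extensions in section 6.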
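10. Summary
Building an async stream = `async function*` + SSE parsing + helpers.

Key points:
- a simplified version of about 150 lines
- both `stream` and `create`
- the `collectText` helper
- retry with backoff

Next steps:
- use it in QueryEngine
- add token accumulation
- add cancellation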