跳转至

阶段 6 | Agent 循环 + 流式 API

目标:理解 Claude Code 的"思考 → 行动 → 观察"循环怎么从前端代码角度实现 —— 流式响应解析、工具调用、错误重试、上下文压缩、多 agent 协调。 时长:2~3 天 前端类比:聊天 IM 客户端的核心循环 —— WebSocket 收消息 → 解析 → 渲染 → 发消息。


6.1 三大件

src/query.ts                1729 行   query()  +  queryLoop()       ← 核心循环
src/QueryEngine.ts          1295 行   class QueryEngine + ask()    ← 状态机封装
src/services/api/claude.ts  3419 行   API 客户端(SDK 包装 + 流处理)  ← 传输层

⚠️ 这三个文件任何一个都 > 1000 行,别想顺序读完。从接口和类型入手。

6.2 核心循环:query.tsasync function* query()

// src/query.ts:219
export async function* query(
  messages: Message[],
  systemPrompt: SystemPrompt,
  context: ToolUseContext,
  canUseTool: CanUseToolFn,
  options: QueryOptions
): AsyncGenerator<StreamEvent, void, void>

// src/query.ts:241
async function* queryLoop(
  // ... 同 query 但内部循环
): AsyncGenerator<StreamEvent, void, void>

6.2.1 关键设计:异步生成器

async function* + yield 是 Claude Code 流式架构的基石:

async function* query(...) {
  while (true) {
    // 1. 构造请求
    const stream = await makeStreamRequest(...)

    // 2. 逐 token 解析
    for await (const event of stream) {
      yield event  // 转发每个 stream event
    }

    // 3. 检查是否需要继续
    if (shouldTerminate(state)) break

    // 4. 准备下一轮
  }
}

为什么用 async function* 而不是 Promise?

模式 能力
Promise<T> 一次返回,无法中途消费
EventEmitter 多消费者、但缺 backpressure
Observable(RxJS) 强大但重、学习曲线
async function*(TC39) 标准、backpressure 友好、可被 for await 消费、可被 for await break 取消

前端类比:和 React Server Components 的 streaming、fetchReadableStream、WebSocket 的 message event 是同种思想。Claude Code 选 async function*最符合 JavaScript 标准的方式

6.2.2 queryLoop 的核心职责

观察 1729 行的 query.ts,循环内部必做的事:

  1. 构造 messages 数组 —— 加入 system prompt、tool results、上轮 assistant 响应
  2. 发起 API 请求 —— services/api/claude.ts 的流式调用
  3. 解析 stream 事件
  4. message_start → 标记新消息开始
  5. content_block_start → 文本/tool_use block 开始
  6. content_block_delta流式更新(每个 token)
  7. content_block_stop → block 结束
  8. message_delta → finish_reason、usage 统计
  9. message_stop → 整条消息结束
  10. 检查 tool_use:如果 assistant 返回了 tool_use,准备执行
  11. 执行工具(带权限检查):canUseTool(tool, input) → 用户授权 → 执行
  12. 工具结果回填:构造 tool_result message,拼到 messages 末尾
  13. 检查 finish_reason
  14. end_turn → 用户完成,break
  15. tool_use → 工具调用后继续循环
  16. max_tokens / stop_sequence → 异常处理
  17. 错误重试withRetry.ts 处理可重试错误(429、5xx)
  18. 上下文压缩(如果 token 接近上限):compact/autoCompact.ts
  19. 回到步骤 1(如果还需要继续)

6.3 状态机封装:QueryEngine class

// src/QueryEngine.ts:184
export class QueryEngine {
  // 内部状态
  // 事件源
  // 重试策略
  // 配额追踪

  // 公开方法
  async *ask(messages, options): AsyncGenerator<...>
}

6.3.1 为什么用 class 不用函数?

QueryEngine 内部维护跨调用的状态: - 重试计数器 - 上次 API 响应时间 - 配额消耗追踪 - 限流 backoff 状态 - 链式调用追踪(QueryChainTracking

前端类比:和"长连接客户端"的封装方式一样 —— IM 客户端、GraphQL Client、WebSocket Client 通常都用 class。

6.3.2 ask() 方法签名

// src/QueryEngine.ts:1186
export async function* ask(...): AsyncGenerator<...>

ask() 是更上层的 API,封装"一次完整的多轮对话"。可能在内部多次调用 query()

6.4 传输层:services/api/claude.ts

3419 行 —— 整个项目第二大文件(仅次于 REPL.tsx 的 5005 行) 角色:Anthropic API SDK 的完整封装

6.4.1 关键 import 透出的能力

观察 claude.ts 头部 import:

导入 作用
@anthropic-ai/sdk/resources/beta/messages/messages.mjs Beta API(流式、thinking、tools)
utils/model/providers.js 多种 API provider:Anthropic 直连 / AWS Bedrock / GCP Vertex
constants/system.js System prompt 前缀、attribution header
utils/auth.js 凭据管理:API key / OAuth / AWS creds / GCP creds
Tool.js 工具接口
tasks/... 任务协调

关键洞察 1Claude Code 不只接 Anthropic 官方 API —— 还支持 AWS Bedrock、GCP Vertex、first-party OAuth (claude.ai 订阅)。这是企业级部署的标配。

关键洞察 23419 行的封装说明 Claude Code 用了大量 beta API 能力(extended thinking、prompt caching、tool use、message batches...)。这是"吃透 SDK"的活儿

6.4.2 SDK 客户端构造:services/api/client.ts

// src/services/api/client.ts
import Anthropic, { type ClientOptions } from '@anthropic-ai/sdk'
import { getProxyFetchOptions } from 'src/utils/proxy.js'

// 4 种认证方式
const apiKey = await getAnthropicApiKey()
const oauthTokens = await getClaudeAIOAuthTokens()
const awsCreds = await refreshAndGetAwsCredentials()
const gcpCreds = await refreshGcpCredentialsIfNeeded()

// 根据 provider 选 ClientOptions
let options: ClientOptions
if (isFirstPartyAnthropicBaseUrl()) {
  options = { apiKey, authToken, ...getProxyFetchOptions() }
} else if (isAwsBedrock()) {
  options = { awsCreds, region, ... }
} else if (isGcpVertex()) {
  options = { gcpCreds, ... }
}

前端类比:和"API client 工厂"模式一样 —— 根据环境变量选不同的后端。

6.5 重试与限流

核心文件: - src/services/api/withRetry.ts - src/services/api/errors.ts - src/services/api/errorUtils.ts - src/services/api/claudeAiLimits.ts + claudeAiLimitsHook.ts - src/services/policyLimits/index.ts(企业策略限制)

6.5.1 重试策略

// withRetry.ts 推测导出
export async function* withRetry<T>(
  fn: () => AsyncGenerator<T>,
  classifyError: (err) => RetryableError | null
): AsyncGenerator<T>

categorizeRetryableAPIError() 区分: - 可重试:429(限流)、5xx(服务端错误)、网络断开 - 不可重试:400(请求无效)、401(认证失败)、402(欠费)

6.5.2 限流消息

rateLimitMessages.ts 提供了人类可读的限流提示

"You're sending requests too quickly. Please wait 30 seconds."

claudeAiLimits.ts 追踪claude.ai 订阅用户的剩余配额

6.5.3 企业策略

policyLimits.ts 处理企业 IT 设置的限额 —— 老板可以限制员工每天用 Claude Code 的次数。

6.6 上下文压缩系统

核心目录src/services/compact/ 10 个文件 —— 大模型对话的"GC"

6.6.1 压缩类型

文件 触发条件 策略
autoCompact.ts token 接近上限(90%?) 整段对话摘要
microCompact.ts 局部 token 累积 只压缩工具结果
apiMicrocompact.ts API 级别的 microCompact 由 SDK 支持
reactiveCompact.ts (Ant-only) 反应式压缩 DCE 门控
contextCollapse.ts (Ant-only) 上下文坍缩 DCE 门控
compact.ts 手动 /compact 命令 用户主动
compactWarningHook.ts 即将压缩时警告 hook 钩子
compactWarningState.ts 警告状态 UI 状态
sessionMemoryCompact.ts 压缩到 session memory 跨会话记忆
grouping.ts 消息分组(决定压缩哪几段) 算法
timeBasedMCConfig.ts 基于时间的 microCompact 配置 配置
postCompactCleanup.ts 压缩后清理 资源释放

关键洞察压缩是分层策略: - microCompact:每次工具结果太大就压缩(高频、轻量) - autoCompact:累计到阈值就整段摘要(中频、重量) - reactiveCompact:动态判断(中频、智能) - sessionMemoryCompact:跨会话记忆(异步、长期)

前端类比:和"前端性能优化"分层的思路一样 —— 微任务(debounce/throttle)、长任务(virtualization)、长期优化(lazy load)。

6.6.2 压缩的挑战

压缩的本质是"丢信息": - 哪些信息可以丢?grouping.ts 决定 - 压缩比例多少?timeBasedMCConfig.ts 配置 - 什么时候提醒用户?compactWarningHook.ts 触发 - 怎么让 LLM 知道压缩发生了?buildPostCompactMessages()(query.ts 用)

6.7 多 Agent 协调:src/tasks/

核心:Claude Code 的"多 agent swarm"模式

6.7.1 任务类型

任务类型 文件 角色
LocalMainSessionTask LocalMainSessionTask.ts 主会话任务
LocalAgentTask/ 子目录 本地子 agent
RemoteAgentTask/ 子目录 远程 agent
InProcessTeammateTask/ 子目录 进程内队友 agent
LocalShellTask/ 子目录 本地 shell 长跑任务
DreamTask/ 子目录 后台 dream 任务
stopTask.ts 停止任务的工具 任务管理

6.7.2 Task 抽象

// src/Task.ts 头部
export type TaskType =
  | 'local_bash'
  | 'local_agent'
  | 'remote_agent'
  | 'in_process_teammate'
  | 'local_workflow'
  | 'monitor_mcp'
  | 'dream'

export type TaskStatus =
  | 'pending'
  | 'running'
  | 'completed'
  | 'failed'
  | 'killed'

export function isTerminalTaskStatus(status: TaskStatus): boolean

关键设计: - 统一 Task 抽象 —— 不管什么任务都映射到 TaskType + TaskStatus - isTerminalTaskStatus —— 状态机保护,避免向死任务注入消息 - TaskHandle —— cleanup 函数,和 React useEffect cleanup 同形

6.7.3 TaskState 形状

// src/tasks/types.ts
export type TaskState = LocalAgentTaskState | InProcessTeammateTaskState | ...

TypeScript 联合类型 + 类型守卫

function isInProcessTeammateTask(task: TaskState): task is InProcessTeammateTaskState {
  return task.type === 'in_process_teammate'
}

前端类比:和 Redux 的 normalized state + selectors 同种模式。

6.8 Coordinator 模式

核心src/coordinator/

coordinatorMode.ts 推测是多 agent 协调的状态机 —— 决定谁是 leader、谁负责什么、消息怎么路由。

多 agent 协调的关键问题: 1. 任务分发 —— SendMessageTool 把消息发给谁? 2. 状态同步 —— TeamCreateTool / TeamDeleteTool 怎么管理队伍? 3. 退出策略 —— useTeammateViewAutoExit 何时自动退出队友视图? 4. 权限冲突 —— utils/swarm/leaderPermissionBridge.ts 处理 leader 的工具权限代理

6.9 关键洞察

6.9.1 异步生成器是 Claude Code 的"反应式编程底座"

async function* 同时被 query、QueryEngine、tool.call、API client 使用 —— 这是项目的"惯用法"。理解了它,CLAUDE Code 60% 的代码就能读懂。

6.9.2 状态机分两层

  • QueryEngine(class)单次对话 的状态机(重试、配额、链路追踪)
  • TaskState(union + 守卫)多 agent / 后台任务 的状态机

两层不冲突,各管各的领域。

6.9.3 压缩系统的复杂度反推 LLM 限制

compact/ 10 个文件说明 token 上限是个真问题
学习时关注:
- 怎么算"快满了"(isAutoCompactEnabledcalculateTokenWarningState) - 怎么压缩(不同策略的 trade-off) - 怎么告诉 LLM "上下文变小了"(buildPostCompactMessages

6.9.4 流式 API 的"语义边界"

Claude Code 把流式响应解析成结构化事件StreamEvent),然后在 query.ts 里逐事件 yield
关键:yield 给消费方的是已解析的语义事件,不是原始 SSE 字符串。
前端类比:和 GraphQL 客户端的"cache normalized response"是同种"反序列化"思路。

6.10 阅读清单

  1. src/query.ts:1-100(imports + 类型)—— 看流式事件类型
  2. src/query.ts:219-280query() 函数签名 + queryLoop 开头)—— 看循环骨架
  3. src/QueryEngine.ts:184-280(class 定义 + 状态)—— 看状态机
  4. src/services/api/claude.ts:1-50(imports)—— 看 API 维度(Beta / Bedrock / Vertex)
  5. src/services/api/withRetry.ts —— 重试策略
  6. 🔍 src/services/compact/autoCompact.ts —— 自动压缩触发逻辑
  7. 🔍 src/Task.ts(全文)—— 任务抽象
  8. 🔍 src/tasks/LocalAgentTask/(选一个看)—— 具体任务实现
  9. 📌 src/coordinator/coordinatorMode.ts —— 协调器

6.11 练习任务

  1. 手写一个最小 agent 循环 —— 5 行的 async function* 循环:调 API → 解析 tool_use → 模拟执行 → 拼结果 → 继续
  2. 列出 query.ts 里的所有 yield 点grep -n "yield" src/query.ts),理解每个 yield 触发什么 UI 渲染
  3. 设计压缩策略对比表 —— microCompact / autoCompact / reactiveCompact / contextCollapse 在"何时触发、压缩什么、压缩后怎么注入 LLM"三个维度的差异
  4. 思考:如果让你把 async function* 改成 RxJS Observable,会牺牲什么?得到什么?Claude Code 选 async function* 的理由是什么?

6.12 下一步

进入 阶段 7:高级系统 —— MCP、plugins、skills、bridge、voice、vim、native 桥接的专题。