跳转至

Deep Dive | QueryEngine.ts 1295 行 class 状态机拆解

重要性:⭐⭐⭐⭐(理解 Claude Code 状态机的最佳范本) 真实位置src/QueryEngine.ts1295 行核心组成: - 行 1-128:imports(40+ 业务模块) - 行 130-183:QueryEngineConfig 类型(54 行) - 行 184-1185:class QueryEngine1001 行) - 行 1186-1295:顶层 ask() 函数110 行

关联phase-06-agent-loop.mdtopics/async-generator-pattern.md


1. 文件结构总览

QueryEngine.ts (1295 行)
├── 行 1-128  :imports(40+ 业务模块)
├── 行 130-183:QueryEngineConfig 类型(54 行的"全功能配置")
├── 行 184    :class 注释(说明设计意图)
├── 行 200    :constructor(config)
├── 行 209    :async *submitMessage(...) — 每轮对话入口
├── 行 1186   :顶层 ask() 函数(兼容旧 API)
└── 行 1295   :EOF

2. 行 1-128:Imports —— 40+ 业务模块

import { feature } from 'bun:bundle'
import type { ContentBlockParam } from '@anthropic-ai/sdk/resources/messages.mjs'
import { randomUUID } from 'crypto'
import last from 'lodash-es/last.js'

// 会话状态
import { getSessionId, isSessionPersistenceDisabled } from 'src/bootstrap/state.js'

// SDK 类型(Claude Code 作为库被嵌入时的接口)
import type { SDKCompactBoundaryMessage, SDKMessage, SDKStatus, ... } from 'src/entrypoints/agentSdkTypes.js'

// API 客户端
import { accumulateUsage, updateUsage } from 'src/services/api/claude.js'
import type { NonNullableUsage } from 'src/services/api/logging.js'
import { EMPTY_USAGE } from 'src/services/api/logging.js'
import stripAnsi from 'strip-ansi'

// 业务模块
import type { Command } from './commands.js'
import { getSlashCommandToolSkills } from './commands.js'
import { LOCAL_COMMAND_STDERR_TAG, LOCAL_COMMAND_STDOUT_TAG } from './constants/xml.js'
import { getModelUsage, getTotalAPIDuration, getTotalCost } from './cost-tracker.js'
import type { CanUseToolFn } from './hooks/useCanUseTool.js'

// 记忆
import { loadMemoryPrompt } from './memdir/memdir.js'
import { hasAutoMemPathOverride } from './memdir/paths.js'

// agent 循环
import { query } from './query.js'
import { categorizeRetryableAPIError } from './services/api/errors.js'

// MCP
import type { MCPServerConnection } from './services/mcp/types.js'

// State
import type { AppState } from './state/AppState.js'

// Tool 系统
import { type Tools, type ToolUseContext, toolMatchesName } from './Tool.js'
import type { AgentDefinition } from './tools/AgentTool/loadAgentsDir.js'
import { SYNTHETIC_OUTPUT_TOOL_NAME } from './tools/SyntheticOutputTool/SyntheticOutputTool.js'

// 消息
import type { Message } from './types/message.js'
import type { OrphanedPermission } from './types/textInputTypes.js'

// 工具函数
import { createAbortController } from './utils/abortController.js'
import type { AttributionState } from './utils/commitAttribution.js'
import { getGlobalConfig } from './utils/config.js'
import { getCwd } from './utils/cwd.js'
import { isBareMode, isEnvTruthy } from './utils/envUtils.js'
import { getFastModeState } from './utils/fastMode.js'
// ... 还有 20+ 业务模块

关键洞察:QueryEngine 是个 "控制中心" —— 它知道整个项目的所有业务模块,但只 import 类型 + 调函数没有继承、没有依赖注入框架(直接接 config)。


3. 行 130-183:QueryEngineConfig 类型(54 行)

3.1 完整类型定义

export type QueryEngineConfig = {
  // 必需:会话基础
  cwd: string
  tools: Tools
  commands: Command[]
  mcpClients: MCPServerConnection[]
  agents: AgentDefinition[]
  canUseTool: CanUseToolFn

  // 必需:State 桥接
  getAppState: () => AppState
  setAppState: (f: (prev: AppState) => AppState) => void

  // 可选:历史消息
  initialMessages?: Message[]

  // 可选:文件缓存
  readFileCache: FileStateCache

  // 可选:System prompt
  customSystemPrompt?: string
  appendSystemPrompt?: string

  // 可选:模型选择
  userSpecifiedModel?: string
  fallbackModel?: string
  thinkingConfig?: ThinkingConfig

  // 可选:限制
  maxTurns?: number
  maxBudgetUsd?: number
  taskBudget?: { total: number }

  // 可选:结构化输出
  jsonSchema?: Record<string, unknown>

  // 可选:调试
  verbose?: boolean
  replayUserMessages?: boolean

  // 可选:MCP elicitation
  handleElicitation?: ToolUseContext['handleElicitation']

  // 可选:流式输出
  includePartialMessages?: boolean
  setSDKStatus?: (status: SDKStatus) => void

  // 可选:取消
  abortController?: AbortController

  // 可选:孤儿权限
  orphanedPermission?: OrphanedPermission

  // 可选:历史裁剪
  snipReplay?: (
    yieldedSystemMsg: Message,
    store: Message[],
  ) => { messages: Message[]; executed: boolean } | undefined
}

3.2 关键设计

1. State 是依赖注入的

getAppState: () => AppState
setAppState: (f: (prev: AppState) => AppState) => void

不是 直接 import store而是接收 getter/setter
好处: - 测试时可注入 mock state - 解耦:QueryEngine 不依赖具体 store 实现 - 可同时支持多个 store 实例(REPL、SDK 各自一个)

2. 25 个可选字段 + 7 个必需字段 = 32 个配置点

为什么这么多? - 每个特性都是 opt-in(默认不启用) - 兼容多种使用场景(REPL / SDK / 测试) - 不影响未启用的代码路径

3. snipReplay 字段揭示了高级特性

注释说:"Snip-boundary handler: receives each yielded system message plus the current mutableMessages store. Returns undefined if the message is not a snip boundary..."

"Snip" = 上下文裁剪。长会话时裁剪历史消息以节省 token。
仅 SDK 模式启用(REPL 因为 UI 需要滚动保留全历史,不裁剪)。

4. setSDKStatus 暴露生命周期事件

setSDKStatus?: (status: SDKStatus) => void

SDK 消费者可以监听 compacting / idle / running 等状态。
这是 SDK 的"事件回调" —— 让外部知道内部在做什么。


4. class QueryEngine 注释(行 184-199)

/**
 * QueryEngine owns the query lifecycle and session state for a conversation.
 * It extracts the core logic from ask() into a standalone class that can be
 * used by both the headless/SDK path and (in a future phase) the REPL.
 *
 * One QueryEngine per conversation. Each submitMessage() call starts a new
 * turn within the same conversation. State (messages, file cache, usage, etc.)
 * persists across turns.
 */

3 个关键设计: 1. "一个对话 = 一个 QueryEngine" —— 生命周期模型 2. "submitMessage 启动新轮" —— 多轮对话模型 3. "State 在多轮之间持续" —— 持久化模型

前端类比:和数据库连接的"长连接"模型一样。连接 = 一段对话query = 一轮请求


5. 行 200-208:constructor

constructor(config: QueryEngineConfig) {
  this.config = config
  this.mutableMessages = config.initialMessages ?? []
  this.abortController = config.abortController ?? createAbortController()
  this.permissionDenials = []
  this.readFileState = config.readFileCache
  this.totalUsage = EMPTY_USAGE
}

5 行初始化 5 个状态: | 字段 | 类型 | 初始值 | 含义 | |---|---|---|---| | config | QueryEngineConfig | config | 配置(注入)| | mutableMessages | Message[] | []config.initialMessages | 消息历史(mutable)| | abortController | AbortController | 新建 | 取消信号 | | permissionDenials | SDKPermissionDenial[] | [] | 拒绝历史(避免重问)| | readFileState | FileStateCache | config.readFileCache | 文件读取缓存 | | totalUsage | NonNullableUsage | EMPTY_USAGE | token / 费用累计 |

关键洞察mutableMessages —— 虽然是 mutable,但对外暴露为 read-only(DeepImmutable 类型)。
为什么用 mutable: - 避免每次 setStatearray spread(性能) - 注释里多次提到 "avoids array spreading per message"

性能 trade-off: - ✅ 不创建新数组(节省 GC) - ❌ 需要手动维护不变性 - 约定:外部通过 setAppState 修改,不能直接 push


6. 行 209:submitMessage() —— 核心方法

async *submitMessage(
  prompt: string | ContentBlockParam[],
  options?: { uuid?: string; isMeta?: boolean },
): AsyncGenerator<SDKMessage, void, unknown>

签名分析: - prompt: string | ContentBlockParam[] —— 用户输入(纯文本 OR 富内容) - options.uuid —— 消息 ID(用于追踪) - options.isMeta —— 是否元消息(不显示给用户) - 返回 AsyncGenerator<SDKMessage> —— 流式输出 SDK 消息

关键submitMessageasync generator(不是 Promise)—— 流式返回 SDK 消息。

6.1 推测的内部流程

async *submitMessage(prompt, options) {
  // 1. 检查 abort
  if (this.abortController.signal.aborted) {
    return
  }

  // 2. 处理 prompt(如果含 slash command,提取命令)
  if (prompt.startsWith('/')) {
    const cmdResult = await this.handleSlashCommand(prompt)
    if (cmdResult.handled) {
      yield* cmdResult.messages
      return
    }
  }

  // 3. 追加用户消息
  this.mutableMessages.push({
    role: 'user',
    content: prompt,
    uuid: options?.uuid ?? randomUUID(),
  })

  // 4. 持久化
  if (!isSessionPersistenceDisabled()) {
    await this.persistSession()
  }

  // 5. 跑 agent 循环
  for await (const event of query(
    this.mutableMessages,
    this.buildSystemPrompt(),
    this.buildToolUseContext(),
    this.config.canUseTool,
    { /* ... */ }
  )) {
    // 6. 转换 SDK 消息 + 累计 usage
    this.processQueryEvent(event)
    yield event
  }
}

关键设计点: - slash command 在 submitMessage 内部处理(不是单独的"命令模式") - 持久化是异步的,但不阻塞流式输出 - agent 循环用底层 query()query.ts 的函数)


7. class 内部状态机(推测)

stateDiagram-v2
    [*] --> Created: new QueryEngine(config)
    Created: 5 个字段初始化

    Created --> Idle: ready
    Idle: 等待用户输入
    Idle --> Submitting: submitMessage()

    Submitting: 处理 prompt
    Submitting --> SlashCommand: prompt 是 /xxx
    Submitting --> UserMessage: prompt 是普通输入

    SlashCommand: 调命令处理器
    SlashCommand --> Persisting: 命令产生消息
    SlashCommand --> Idle: 命令无副作用

    UserMessage: 追加 user message
    UserMessage --> Persisting

    Persisting: 写 sessionStorage
    Persisting --> Querying

    Querying: 跑 query() 流
    Querying --> Yielding: 流式返回
    Yielding --> Querying: 继续
    Querying --> Compact: 触发 compact
    Compact --> Querying: 完成

    Querying --> Idle: query() 结束

    Idle --> Aborted: abortController.abort()
    Aborted --> [*]

    Idle --> [*]: 关闭

关键状态: - CreatedIdleSubmittingPersistingQueryingIdle(循环) - 任何状态 都可以 → Aborted(用户取消)


8. ask() 顶层函数(行 1186-1295,110 行)

8.1 签名

export async function* ask({
  // 110 行的解构...
}): AsyncGenerator<SDKMessage, void, unknown>

8.2 推测的实现

export async function* ask(config: AskConfig): AsyncGenerator<SDKMessage> {
  // 1. 构造 QueryEngine
  const engine = new QueryEngine({
    cwd: config.cwd,
    tools: config.tools,
    // ... 30+ 字段映射
  })

  // 2. 提交第一轮
  yield* engine.submitMessage(config.prompt, {
    uuid: config.uuid,
  })
}

为什么需要 ask() 函数 + QueryEngine class

  • QueryEngine class —— 有状态可复用(多轮对话)
  • ask() 函数 —— 无状态一次性(首轮对话 + 兼容旧 API)

前端类比: - class QueryEngineuseState(带状态) - function ask()fetch(无状态调用)

SDK 消费者更倾向 ask()(简单一次性调用),
REPL 消费者更倾向 QueryEngine class(多轮对话)。


9. QueryEngine 内部机制详解

9.1 拒绝追踪(permissionDenials)

private permissionDenials: SDKPermissionDenial[] = []

作用:避免重复询问用户已经拒绝过的工具调用。

// 推测
if (this.hasDeniedBefore(toolName, input)) {
  // 直接拒绝,不再询问
  return { behavior: 'deny', message: 'Already denied' }
}

9.2 token 累计(totalUsage)

private totalUsage: NonNullableUsage = EMPTY_USAGE

每次 API 响应 都累加 input_tokens、output_tokens、cost。

this.totalUsage = accumulateUsage(this.totalUsage, response.usage)

配套: - getTotalAPIDuration() —— 总 API 调用时长 - getTotalCost() —— 总费用 - cost-tracker.tsgetModelUsage() —— 按模型分

9.3 文件状态缓存(readFileState)

private readFileState: FileStateCache

避免重复读同一个文件

// 推测
const content = this.readFileState.get(path) ?? await fs.readFile(path)
this.readFileState.set(path, content)

配套utils/fileStateCache.tscreateFileStateCacheWithSizeLimit 等。

9.4 历史裁剪(snipReplay)

snipReplay?: (
  yieldedSystemMsg: Message,
  store: Message[],
) => { messages: Message[]; executed: boolean } | undefined

SDK 模式专用 —— 长会话时裁剪历史以节省 token。

触发:每次 yield 系统消息时调 snipReplay。 - 返回 undefined —— 不裁剪 - 返回 { messages, executed } —— 用 messages 替换 store 中的某些消息

为什么仅 SDK 启用:REPL 需要全历史供 UI 滚动。

9.5 孤儿权限处理(orphanedPermission)

orphanedPermission?: OrphanedPermission

submitMessage 启动时有未处理的权限请求("孤儿"),自动恢复而不是新提问。

if (this.config.orphanedPermission && !this.hasHandledOrphanedPermission) {
  this.hasHandledOrphanedPermission = true
  yield* this.recoverOrphanedPermission(this.config.orphanedPermission)
}

10. class 方法完整列表(推测)

方法 角色
constructor(config) 初始化
submitMessage(prompt, options) 启动一轮对话(主入口
persistSession() 写 sessionStorage
recoverOrphanedPermission(p) 恢复孤儿权限
buildSystemPrompt() 构造 system prompt
buildToolUseContext() 构造 ToolUseContext
processQueryEvent(event) 处理 query() 事件 + 累计 usage
handleSlashCommand(prompt) 处理 /xxx 命令
hasDeniedBefore(tool, input) 检查拒绝历史
accumulateUsage(response) 累加 usage

总方法数推测:15-20 个。


11. class 生命周期详解

[创建]
  new QueryEngine(config)
  → 5 个字段初始化

[第 1 轮]
  for await (msg of engine.submitMessage(prompt1)) {
    // 处理 msg
  }

  // query() 返回,submitMessage 完成
  // mutableMessages 包含 user + assistant 消息

[第 2 轮]
  for await (msg of engine.submitMessage(prompt2)) {
    // 处理 msg
  }

  // mutableMessages 现在有 user1 + assistant1 + user2 + assistant2

[关闭]
  engine.abortController.abort()  // 可选
  // 没有显式 close —— GC 自动清理

关键: - 每次 submitMessage 是独立的"流" - mutableMessages 跨调用持续(class 实例的字段) - 没有 explicit close —— 显式资源释放


12. ask() vs QueryEngine

维度 ask() 函数 QueryEngine class
状态 无状态(每次新构造) 有状态(跨 submitMessage 持续)
用途 一次性对话 多轮对话
复杂度 简单调用 复杂生命周期
SDK 友好度 ⭐⭐⭐⭐ ⭐⭐
REPL 友好度 ⭐⭐ ⭐⭐⭐⭐⭐
测试友好度 ⭐⭐⭐⭐⭐ ⭐⭐⭐

结论: - SDKask()("调一次完事") - REPLQueryEngine("长连接持续")

注释里说"未来阶段 REPL 可能用 QueryEngine" —— 现在 REPL 还没切到 class(可能因为迁移成本),但设计意图清晰。


13. 关键洞察

13.1 class vs function 的"哲学选择"

Claude Code 在大多数地方默认函数少数用 class
class 的判定标准:"跨调用状态"或"长生命周期"。

QueryEngine 是"长生命周期" → class

13.2 State 注入 vs State 导入

QueryEngine 通过 getAppState / setAppState 注入 state,不直接 import store

好处: - 可测试(mock state 注入) - 可多实例(REPL 一个,SDK 一个) - 不耦合(state 实现可换)

前端类比:和 React Context Provider 的"提升状态到公共祖先"是同种思路。

13.3 异步生成器 + class 的"双剑合璧"

  • class 管理"长生命周期状态"
  • async generator 提供"流式输出"

两者结合 = "持续状态 + 流式响应" —— Claude Code 的核心抽象。

13.4 配置驱动的工程哲学

54 行的 QueryEngineConfig 看似夸张,但每个字段都是"必填 vs 选填"显式

好处: - TypeScript 防止缺字段 - 默认值明确(??) - 新功能加 config 字段即可,不动 class 内部

13.5 snipReplay 的"SDK 专属"设计

注释明确说:"SDK-only: the REPL keeps full history for UI scrollback..."

设计哲学不同使用场景有不同 trade-off。 - SDK:长期 headless → 裁剪历史省 token - REPL:有 UI 滚动 → 保留全历史

同一个 class 通过 config 区分


14. 实战:用 QueryEngine 写一个 SDK 客户端

import { QueryEngine, ask } from './QueryEngine.js'

// 方式 1:一次性对话(用 ask())
async function oneShot() {
  for await (const msg of ask({
    cwd: process.cwd(),
    tools: [...],
    commands: [...],
    // ... config
    prompt: 'Hello',
  })) {
    console.log(msg)
  }
}

// 方式 2:多轮对话(用 QueryEngine class)
async function multiTurn() {
  const engine = new QueryEngine({
    cwd: process.cwd(),
    tools: [...],
    commands: [...],
    // ... config
  })

  for await (const msg of engine.submitMessage('Hello')) {
    console.log(msg)
  }

  for await (const msg of engine.submitMessage('How are you?')) {
    console.log(msg)
  }
}

15. 阅读清单

  1. ✅ 完整通读 src/QueryEngine.ts(1295 行)
  2. ✅ 读 phase-06-agent-loop.md 配合
  3. 📌 读 src/query.ts(query() 函数)
  4. 📌 读 src/state/AppStateStore.ts深度专题
  5. 📌 读 src/entrypoints/agentSdkTypes.ts(SDK 类型)

16. 练习任务

  1. 数 class 的方法数 —— 完整列出(推测 15-20 个)
  2. 画 class 字段的关系图 —— mutableMessagesreadFileStatetotalUsage 怎么关联
  3. 设计你自己的 SDK 客户端 —— 用 ask() 函数还是 QueryEngine class?为什么?
  4. 思考:class 状态机的"显式状态转换"和"函数式状态机"(如 React useReducer)哪个好?Claude Code 为什么选 class?