跳转至

Walkthrough | 手写 QueryEngine

难度:⭐⭐⭐⭐ 时间:~2h 目标:理解 QueryEngine 核心逻辑,写一个简化版


1. QueryEngine 是什么

QueryEngine = Claude Code 的消息循环核心。 - 接收用户输入 - 调 Claude API - 处理 tool_use - 流式更新 UI - 处理错误

详见 deep-dive-query-engine.md


2. 目标

手写一个简化版 QueryEngine: - 单轮对话 - 1 个 tool(Read) - 流式输出 - 基础错误处理


3. 完整代码

// mini-query-engine.ts
import Anthropic from '@anthropic-ai/sdk'

interface Message {
  role: 'user' | 'assistant'
  content: string | ContentBlock[]
}

interface ContentBlock {
  type: 'text' | 'tool_use' | 'tool_result'
  // ...
}

interface Tool {
  name: string
  description: string
  inputSchema: any
  execute: (input: any) => Promise<string>
}

export class MiniQueryEngine {
  private client: Anthropic
  private messages: Message[] = []
  private tools: Tool[] = []

  constructor(apiKey: string, model: string = 'claude-sonnet-4-6') {
    this.client = new Anthropic({ apiKey })
    this.model = model
  }

  registerTool(tool: Tool) {
    this.tools.push(tool)
  }

  async ask(prompt: string): Promise<string> {
    // 1. 加 user message
    this.messages.push({ role: 'user', content: prompt })

    // 2. 调 API
    let response = await this.callAPI()

    // 3. 循环处理 tool_use
    while (this.hasToolUse(response)) {
      // 4. 加 assistant message
      this.messages.push({
        role: 'assistant',
        content: response.content,
      })

      // 5. 执行 tool
      const toolResults = await this.executeTools(response.content)

      // 6. 加 tool_result
      this.messages.push({
        role: 'user',
        content: toolResults,
      })

      // 7. 调 API again
      response = await this.callAPI()
    }

    // 8. 加 final assistant message
    const text = this.extractText(response)
    this.messages.push({
      role: 'assistant',
      content: text,
    })

    return text
  }

  private async callAPI() {
    return await this.client.messages.create({
      model: this.model,
      max_tokens: 4096,
      tools: this.tools.map((t) => ({
        name: t.name,
        description: t.description,
        input_schema: t.inputSchema,
      })),
      messages: this.messages,
    })
  }

  private hasToolUse(response: any): boolean {
    return response.content.some((b: any) => b.type === 'tool_use')
  }

  private async executeTools(blocks: ContentBlock[]): Promise<ContentBlock[]> {
    const results: ContentBlock[] = []

    for (const block of blocks) {
      if (block.type !== 'tool_use') continue

      const tool = this.tools.find((t) => t.name === block.name)
      if (!tool) {
        results.push({
          type: 'tool_result',
          tool_use_id: block.id,
          content: `Tool ${block.name} not found`,
          is_error: true,
        })
        continue
      }

      try {
        const output = await tool.execute(block.input)
        results.push({
          type: 'tool_result',
          tool_use_id: block.id,
          content: output,
        })
      } catch (e) {
        results.push({
          type: 'tool_result',
          tool_use_id: block.id,
          content: `Error: ${(e as Error).message}`,
          is_error: true,
        })
      }
    }

    return results
  }

  private extractText(response: any): string {
    return response.content
      .filter((b: any) => b.type === 'text')
      .map((b: any) => b.text)
      .join('')
  }
}

~120 行


4. 使用示例

const engine = new MiniQueryEngine(process.env.ANTHROPIC_API_KEY!)

engine.registerTool({
  name: 'Read',
  description: 'Read a file',
  inputSchema: {
    type: 'object',
    properties: { path: { type: 'string' } },
    required: ['path'],
  },
  execute: async ({ path }) => {
    return require('fs').readFileSync(path, 'utf-8')
  },
})

const answer = await engine.ask('Read /etc/hostname')
console.log(answer)

简单


5. 关键设计点

5.1 消息循环

while (this.hasToolUse(response)) {
  // execute tools
  // call API again
}

循环 直到没有 tool_use。

5.2 Tool 执行

const toolResults = await this.executeTools(response.content)

并行 推测(这里串行)。

5.3 错误处理

catch (e) {
  results.push({
    type: 'tool_result',
    tool_use_id: block.id,
    content: `Error: ${(e as Error).message}`,
    is_error: true,
  })
}

错误 → tool_result (is_error: true)


6. 6 个扩展

6.1 流式输出

private async *streamAPI() {
  const stream = await this.client.messages.create({
    model: this.model,
    max_tokens: 4096,
    messages: this.messages,
    stream: true,
  })
  for await (const event of stream) {
    yield event
  }
}

stream: true

6.2 多 tool 并行

private async executeTools(blocks: ContentBlock[]): Promise<ContentBlock[]> {
  return await Promise.all(
    blocks
      .filter((b) => b.type === 'tool_use')
      .map((b) => this.executeOneTool(b))
  )
}

Promise.all

6.3 Tool 权限

private async checkPermission(tool: Tool, input: any): Promise<boolean> {
  // 实现 6 层决策
  return true  // 或 false
}

6 层

6.4 缓存优化

// 静态 system prompt 缓存
const systemPrompt = `You are ...`
// 同样内容 → 同样 prompt → cache 命中

content hash

6.5 工具子集

registerTool(tool)  // 注册
unregisterTool(name) // 注销

动态

6.6 Session 持久化

save(): string  // 返 session ID
load(sessionId: string): void

持久化


7. 5 个关键洞察

  1. 消息循环 是核心
  2. tool execution 是关键
  3. 错误处理 必走 tool_result
  4. 流式 用 stream: true
  5. 持久化 是 session 基础

8. 对比真实 QueryEngine

维度 Mini 真实
行数 120 4000+
工具 1 100+
模式 串行 并行 + 投机
流式 简单 复杂
错误 简单 25+ validator
持久化 完整

简化 vs 真实


9. 5 个练习

  1. 加 streaming —— 用 stream: true
  2. 加 token 累加 —— 记录 input/output
  3. 加 tool 权限 —— 简单 allow/deny
  4. 加 session 持久化 —— save/load JSON
  5. 加 prompt cache —— 静态 system prompt

5 步


10. 总结

手写 QueryEngine = 理解消息循环 + tool 执行 + 错误处理

核心: - 120 行简化版 - 1 个 tool - 流式可选 - 错误隔离

下一步: - 看真实 deep-dive-query-engine.md - 加 streaming - 加多 tool