跳转至

Deep Dive | bridge/replBridge.ts 2406 行 REPL ↔ Bridge 拆解

重要性:⭐⭐⭐⭐(CLI 与 IDE 实时双向同步的具体实现) 真实位置src/bridge/replBridge.ts2406 行角色:把 bridgeMain.ts 的协调协议落到 REPL 这一端 关联topics/deep-dive-bridge-main.mdphase-07-advanced.md § 7.3


1. 与 bridgeMain.ts 的关系

┌────────────────────────────────────────┐
│           bridgeMain.ts                │  协调者
│  (2999 行:协议 + 状态机 + 重连)        │
└────────────────┬───────────────────────┘
                 │ 协调 / 路由
┌────────────────────────────────────────┐
│           replBridge.ts                │  CLI 端实现
│  (2406 行:双向同步 + 消息路由)          │
└────────────────┬───────────────────────┘
        ┌────────┴─────────┐
        ▼                  ▼
   ┌─────────┐        ┌──────────┐
   │ REPL.tsx│        │ IDE/IDE  │
   │ (UI)    │        │ (外部)   │
   └─────────┘        └──────────┘

bridgeMain.ts 是"协议层"(消息定义、状态机)。
replBridge.ts 是"应用层"(REPL ↔ 协议 ↔ IDE 双向映射)。


2. 文件结构总览

replBridge.ts (2406 行)
├── 行 1-69   :imports
├── 行 70-228 :类型定义
│   ├── ReplBridgeHandle (行 70-82)
│   ├── BridgeState (行 83-89)
│   ├── BridgeCoreParams (行 91-227)
│   ├── BridgeCoreHandle (行 228-242)
├── 行 243-259:常量
│   ├── POLL_ERROR_INITIAL_DELAY_MS (行 244)
│   ├── POLL_ERROR_MAX_DELAY_MS (行 245)
│   ├── POLL_ERROR_GIVE_UP_MS (行 246)
├── 行 260-450:initBridgeCore (主入口)
├── 行 1851-1900:startWorkPollLoop (轮询循环)
├── ... 中间是大量实现
└── 推测:消息处理 + 状态同步 + 错误恢复

3. 类型定义详解(行 70-242)

3.1 ReplBridgeHandle(行 70-82)

export type ReplBridgeHandle = {
  // 推测:
  send: (msg: BridgeMessage) => void
  close: () => Promise<void>
  state: () => BridgeState
  onMessage: (handler: (msg: BridgeMessage) => void) => () => void
  // ...
}

Bridge handle —— REPL 端的"接口",让 REPL 能"发消息" + "收消息" + "关连接"。

3.2 BridgeState(行 83-89)

export type BridgeState = 'ready' | 'connected' | 'reconnecting' | 'failed'

4 状态机: - ready —— 准备好但还没连 - connected —— 已连 - reconnecting —— 断连后重连中 - failed —— 放弃重连

3.3 BridgeCoreParams(行 91-227,~135 行)

export type BridgeCoreParams = {
  // 推测:
  cwd: string
  sessionId: string
  // 各种 callback
  onMessage?: (msg: BridgeMessage) => void
  onStateChange?: (state: BridgeState) => void
  onPermissionRequest?: (...) => void
  onUserInput?: (text: string) => void
  // ... 推测 20+ 字段
}

135 行的配置类型 —— 推测包含 20+ 字段,覆盖所有 REPL ↔ Bridge 交互。

3.4 BridgeCoreHandle(行 228-242)

export type BridgeCoreHandle = ReplBridgeHandle & {
  // 推测:
  start: () => Promise<void>
  stop: () => Promise<void>
  reconnect: () => Promise<void>
  getState: () => BridgeState
}

handle 扩展 —— 在 ReplBridgeHandle 基础上加 start / stop / reconnect / getState。


4. 常量详解(行 244-246)

const POLL_ERROR_INITIAL_DELAY_MS = 2_000      // 首次重试延迟 2s
const POLL_ERROR_MAX_DELAY_MS = 60_000         // 最大重试间隔 60s
const POLL_ERROR_GIVE_UP_MS = 15 * 60 * 1000  // 15 分钟放弃

轮询重试策略(pull 模式): - 首次失败等 2s - 指数退避到最大 60s - 持续 15 分钟后放弃 - 比 WebSocket 重连更保守(因为是 pull 模式)


5. 推测的 initBridgeCore(行 260-450,~190 行)

5.1 函数签名

export async function initBridgeCore(
  params: BridgeCoreParams,
): Promise<BridgeCoreHandle>

主入口 —— 创建 BridgeCore 句柄,异步初始化

5.2 推测的内部流程

export async function initBridgeCore(params) {
  // 1. 验证参数
  validateParams(params)

  // 2. 创建内部状态
  const state: InternalState = {
    bridgeState: 'ready',
    connection: null,
    listeners: new Set(),
    lastError: null,
  }

  // 3. 启动连接
  await connect(params, state)

  // 4. 注册消息处理
  setupMessageHandler(params, state)

  // 5. 启动轮询循环(pull 模式)
  if (params.transport === 'poll') {
    startWorkPollLoop(params, state)
  }

  // 6. 启动保活
  startKeepalive(state)

  // 7. 返回 handle
  return createHandle(state, params)
}

6 步初始化 —— 大部分逻辑"按顺序启动"。


6. 推测的核心方法(行 450-1850)

6.1 消息路由

// 推测
function routeMessage(msg: BridgeMessage, params: BridgeCoreParams, state: InternalState) {
  switch (msg.type) {
    // 用户输入(IDE → CLI)
    case 'user_input':
      params.onUserInput?.(msg.data.text)
      break

    // 权限响应(IDE → CLI)
    case 'permission_decision':
      params.onPermissionDecision?.(msg.data)
      break

    // 提问回答(IDE → CLI)
    case 'question_answer':
      params.onQuestionAnswer?.(msg.data)
      break

    // 文件变化(IDE → CLI)
    case 'file_saved':
      params.onFileSaved?.(msg.data.path)
      break

    // 选中变化(IDE → CLI)
    case 'selection_changed':
      params.onSelectionChanged?.(msg.data)
      break

    // 取消请求(IDE → CLI)
    case 'cancel_request':
      params.onCancel?.(msg.data)
      break

    // ... 20+ 消息类型
  }
}

30+ 消息类型分发 —— 每种消息调对应 callback。

6.2 状态推送

// 推测
function pushStateUpdate(state: InternalState, params: BridgeCoreParams) {
  // 1. 收集当前状态
  const update: StateUpdate = {
    messages: params.getMessages(),
    currentModel: params.getCurrentModel(),
    cost: params.getCost(),
    tokens: params.getTokens(),
    // ... 推测 20+ 字段
  }

  // 2. 发送给 IDE
  state.connection.send({
    type: 'state_update',
    data: update,
  })
}

状态推送 —— 每次 REPL 状态变化时推给 IDE。

6.3 错误恢复

// 推测
async function handleConnectionError(
  err: Error,
  state: InternalState,
  params: BridgeCoreParams,
): Promise<void> {
  state.bridgeState = 'reconnecting'
  state.lastError = err

  let attempt = 0
  while (state.bridgeState === 'reconnecting') {
    attempt++

    // 计算延迟
    const delay = Math.min(
      POLL_ERROR_INITIAL_DELAY_MS * Math.pow(2, attempt - 1),
      POLL_ERROR_MAX_DELAY_MS,
    )

    // 加 jitter
    const jitter = delay * 0.2 * (Math.random() * 2 - 1)
    const totalDelay = delay + jitter

    // 检查是否超时
    if (Date.now() - state.firstErrorTime > POLL_ERROR_GIVE_UP_MS) {
      state.bridgeState = 'failed'
      params.onStateChange?.('failed')
      return
    }

    // 等待 + 重试
    await sleep(totalDelay)

    try {
      await connect(params, state)
      state.bridgeState = 'connected'
      return  // 成功
    } catch (err) {
      // 继续重试
    }
  }
}

错误恢复 —— 指数退避 + jitter + 超时放弃。


7. startWorkPollLoop(行 1851-1900+)

7.1 函数签名

async function startWorkPollLoop(
  params: BridgeCoreParams,
  state: InternalState,
): Promise<void>

轮询循环 —— 在 pull 模式下定期从 server 拉取工作。

7.2 推测的循环逻辑

async function startWorkPollLoop(params, state) {
  while (state.bridgeState !== 'failed') {
    try {
      // 1. 长轮询
      const response = await fetch(`${serverUrl}/poll`, {
        method: 'POST',
        body: JSON.stringify({ sessionId, lastEventId: state.lastEventId }),
        // 30s 长轮询
        signal: AbortSignal.timeout(30_000),
      })

      // 2. 处理响应
      const events = await response.json()

      for (const event of events) {
        await routeMessage(event, params, state)
        state.lastEventId = event.id
      }
    } catch (err) {
      // 错误处理(指数退避)
      await handleConnectionError(err, state, params)
    }
  }
}

长轮询 —— 每次最多等 30s,server 端"hold"住连接,新事件立即返回。

优点:HTTP 友好(任何环境都能用)。
缺点:30s 内必须"刷新"连接(连接池开销)。


8. 推测的消息处理(行 1900-2406)

8.1 出站:REPL → IDE

// 推测的 20+ 出站方法

function notifyMessageAdded(msg: Message): void
function notifyToolUseStart(id: string, name: string, input: unknown): void
function notifyToolUseResult(id: string, result: unknown, isError: boolean): void
function notifyCostUpdate(cost: number, tokens: number): void
function notifyFileChanged(path: string, kind: 'modified' | 'created' | 'deleted'): void
function notifyPermissionRequest(id: string, tool: string, input: unknown, reason: string): void
function notifyQuestionRequest(id: string, question: string, options: string[]): void
function notifyPlanApprovalRequest(id: string, plan: string): void
function notifySelectionChange(text: string, path: string, range: Range): void
function notifyIdeState(openFiles: string[], activeFile: string): void
function notifyError(code: string, message: string, stack?: string): void
function notifyProgress(id: string, stage: string, percent: number): void
function notifyElicitation(id: string, schema: Schema, message: string): void
function notifyAttachment(path: string, type: string, size: number): void

20+ 业务方法 —— 每个对应一种 Bridge 消息。

8.2 入站:IDE → REPL

// 推测的 20+ 入站处理

function handleUserInput(text: string): void
function handlePermissionDecision(id: string, allow: boolean, remember?: boolean): void
function handlePlanDecision(id: string, approved: boolean): void
function handleQuestionAnswer(id: string, answer: string | string[]): void
function handleCancelRequest(id?: string): void
function handleResetSession(): void
function handleSwitchSession(sessionId: string): void
function handlePauseSession(): void
function handleResumeSession(): void
function handleSetModel(model: string): void
function handleSetPermissionMode(mode: PermissionMode): void
function handleApplyEdit(filePath: string, newContent: string): void
function handleRejectEdit(filePath: string, reason: string): void
function handleFileSelected(path: string): void
function handleFileSaved(path: string, content?: string): void
function handleSelectionChanged(text: string, path: string, range: Range): void
function handleOpenFile(path: string, line?: number, column?: number): void
function handleShowDiff(filePath: string, oldContent: string, newContent: string): void
function handleSetIdeTheme(theme: string): void
function handleGetStatus(): void

20+ 入站处理 —— 每个把 Bridge 消息转成 REPL 状态变化。


9. 关键设计

9.1 双向同步的"事件源"

// 推测的 4 个事件源

// 1. REPL 本地状态变化(用户键入、工具完成)
appState.subscribe(() => pushStateUpdate(state))

// 2. IDE 消息到达(连接的消息)
connection.onMessage(msg => routeMessage(msg, params))

// 3. 轮询(pull 模式)
startWorkPollLoop(params, state)

// 4. 错误重连
handleConnectionError(err, state, params)

4 个事件源 —— 任意一个变化都要同步。

9.2 "重连 + 状态恢复"

// 推测
async function reconnect(state, params) {
  // 1. 重新建立连接
  await connect(params, state)

  // 2. 重发所有"未确认"消息(带 lastEventId)
  const lastEventId = state.lastEventId
  await fetch(`${serverUrl}/sync`, {
    body: JSON.stringify({ sessionId, lastEventId }),
  })

  // 3. 接收"漏掉的事件"
  for (const event of missedEvents) {
    routeMessage(event, params, state)
  }
}

断连恢复 —— 用 lastEventId 同步增量。

9.3 "双向冲突"处理

// 推测:last-write-wins(按时间戳)
function resolveConflict(localUpdate: Update, remoteUpdate: Update): Update {
  return localUpdate.timestamp > remoteUpdate.timestamp ? localUpdate : remoteUpdate
}

冲突解决 —— 时间戳 newer 胜。

前端类比:和 CRDT 思想相反,Claude Code 选了"简单粗暴"。

9.4 "缓冲 + 批处理"

// 推测
const pendingMessages: BridgeMessage[] = []

function send(msg) {
  pendingMessages.push(msg)
  scheduleFlush()  // setImmediate
}

function scheduleFlush() {
  if (flushScheduled) return
  flushScheduled = true
  setImmediate(flush)
}

function flush() {
  const batch = pendingMessages.splice(0, 50)
  connection.send(batch)
  flushScheduled = false
}

批处理 —— 50 条/批(不是 1 条/批)。


10. 完整消息流(推测)

10.1 用户在 IDE 选中文件

IDE: user selects file
IDE 发送 'selection_changed' { text, path, range }
replBridge: handleSelectionChanged(text, path, range)
REPL: setState({ selectedFile: ... })
REPL: notifyIdeState(...)  // 推回 IDE
IDE 收到更新

双向同步 —— 1 个操作 → 2 条消息。

10.2 用户在 IDE 输入 prompt

IDE: user types "fix the bug"
IDE 发送 'user_input' { text: 'fix the bug' }
replBridge: handleUserInput('fix the bug')
REPL: engine.submitMessage('fix the bug')
REPL: 处理流式响应
REPL: notifyMessageAdded(userMsg)
  REPL: notifyMessageAdded(assistantMsg)
  ...
IDE 显示新消息

同步周期 —— IDE 输入 → CLI 处理 → IDE 显示。

10.3 CLI 请求权限

CLI: tool.call → needs permission
replBridge: notifyPermissionRequest(id, 'Bash', { cmd: 'rm' }, 'destructive command')
IDE 弹权限窗
用户批准
IDE 发送 'permission_decision' { id, allow: true, remember: false }
replBridge: handlePermissionDecision(id, true, false)
CLI: tool.call 继续

同步权限决策 —— IDE 端的 UI 决定 CLI 行为。


11. 实战:写一个简化版 REPL Bridge

// 简化版(~50 行)
class SimpleReplBridge {
  private listeners: Set<(msg: any) => void> = new Set()

  // REPL 端调
  notify(text: string) {
    this.broadcast({ type: 'message_added', data: { text } })
  }

  // IDE 消息到达
  onMessage(msg: any) {
    this.listeners.forEach(l => l(msg))
  }

  // 内部
  private broadcast(msg: any) {
    this.listeners.forEach(l => l(msg))
  }

  // 订阅
  subscribe(handler: (msg: any) => void) {
    this.listeners.add(handler)
    return () => this.listeners.delete(handler)
  }
}

// 用法
const bridge = new SimpleReplBridge()

bridge.subscribe(msg => {
  if (msg.type === 'user_input') {
    // 处理用户输入
  }
})

bridge.notify('hello')  // REPL 推消息

对比 Claude Code: - 简化版没有重连 - 简化版没有轮询 - 简化版没有冲突解决 - Claude Code 多了 50+ 边界处理


12. 关键洞察

12.1 "REPL 端 vs 协议层"的清晰分离

  • replBridge.ts(2406 行)—— 应用层,处理 REPL 业务
  • bridgeMain.ts(2999 行)—— 协议层,处理消息定义
  • flushGate.ts(推测 200 行)—— 基础设施,背压控制

3 个文件 = 3 个职责没有混合

12.2 "20+ 出站 + 20+ 入站"是事件驱动架构

每个业务事件 = 一个 Bridge 消息
REPL 状态变化 = 触发出站消息
IDE 用户操作 = 触发入站消息

这是"事件驱动"的教科书实现

12.3 "重连 + 状态恢复"是分布式系统基础

// 推测
async function reconnect(state, params) {
  await connect(params, state)
  // 同步 lastEventId 之后的事件
  const missed = await fetchMissedEvents(state.lastEventId)
  for (const event of missed) {
    routeMessage(event, params, state)
  }
}

和数据库复制、消息队列订阅同种思想

12.4 "双向冲突"用时间戳解决

不引入 CRDT,时间戳 newer 胜
适合"大多数情况"不适合"高频双向编辑"

12.5 "批处理"是性能关键

50 条/批 + setImmediate flush = OS 原生 microtask 利用

这是生产级消息总线的"小细节"


13. 阅读清单

  1. ✅ 完整通读 src/bridge/replBridge.ts(2406 行)
  2. ✅ 读 topics/deep-dive-bridge-main.md 配合
  3. ✅ 读 phase-07-advanced.md § 7.3
  4. 📌 读 docs/BRIDGE_PROTOCOL.md(仓库根 docs/)看 30+ 消息类型
  5. 📌 读 src/bridge/flushGate.ts 背压控制
  6. 📌 读 src/bridge/bridgeMessaging.ts 序列化

14. 练习任务

  1. 数出 replBridge.ts 里的所有方法 —— 推测 30-40 个
  2. 画出双向同步的时序图 —— 5 个常见操作(输入/选中/权限/问题/取消)
  3. 设计你自己的"双端同步"系统 —— 不考虑 IDE,先做"双 CLI 同步"
  4. 思考lastEventId 同步模式 vs CRDT,哪个更适合 Claude Code?为什么?