Deep Dive | bridge/replBridge.ts 2406 行 REPL ↔ Bridge 拆解¶
重要性:⭐⭐⭐⭐(CLI 与 IDE 实时双向同步的具体实现) 真实位置:
src/bridge/replBridge.ts(2406 行) 角色:把bridgeMain.ts的协调协议落到 REPL 这一端 关联:topics/deep-dive-bridge-main.md、phase-07-advanced.md § 7.3
1. 与 bridgeMain.ts 的关系¶
┌────────────────────────────────────────┐
│ bridgeMain.ts │ 协调者
│ (2999 行:协议 + 状态机 + 重连) │
└────────────────┬───────────────────────┘
│ 协调 / 路由
▼
┌────────────────────────────────────────┐
│ replBridge.ts │ CLI 端实现
│ (2406 行:双向同步 + 消息路由) │
└────────────────┬───────────────────────┘
│
┌────────┴─────────┐
▼ ▼
┌─────────┐ ┌──────────┐
│ REPL.tsx│ │ IDE/IDE │
│ (UI) │ │ (外部) │
└─────────┘ └──────────┘
bridgeMain.ts 是"协议层"(消息定义、状态机)。
replBridge.ts 是"应用层"(REPL ↔ 协议 ↔ IDE 双向映射)。
2. 文件结构总览¶
replBridge.ts (2406 行)
│
├── 行 1-69 :imports
├── 行 70-228 :类型定义
│ ├── ReplBridgeHandle (行 70-82)
│ ├── BridgeState (行 83-89)
│ ├── BridgeCoreParams (行 91-227)
│ ├── BridgeCoreHandle (行 228-242)
│
├── 行 243-259:常量
│ ├── POLL_ERROR_INITIAL_DELAY_MS (行 244)
│ ├── POLL_ERROR_MAX_DELAY_MS (行 245)
│ ├── POLL_ERROR_GIVE_UP_MS (行 246)
│
├── 行 260-450:initBridgeCore (主入口)
│
├── 行 1851-1900:startWorkPollLoop (轮询循环)
│
├── ... 中间是大量实现
│
└── 推测:消息处理 + 状态同步 + 错误恢复
3. 类型定义详解(行 70-242)¶
3.1 ReplBridgeHandle(行 70-82)¶
export type ReplBridgeHandle = {
// 推测:
send: (msg: BridgeMessage) => void
close: () => Promise<void>
state: () => BridgeState
onMessage: (handler: (msg: BridgeMessage) => void) => () => void
// ...
}
Bridge handle —— REPL 端的"接口",让 REPL 能"发消息" + "收消息" + "关连接"。
3.2 BridgeState(行 83-89)¶
4 状态机:
- ready —— 准备好但还没连
- connected —— 已连
- reconnecting —— 断连后重连中
- failed —— 放弃重连
3.3 BridgeCoreParams(行 91-227,~135 行)¶
export type BridgeCoreParams = {
// 推测:
cwd: string
sessionId: string
// 各种 callback
onMessage?: (msg: BridgeMessage) => void
onStateChange?: (state: BridgeState) => void
onPermissionRequest?: (...) => void
onUserInput?: (text: string) => void
// ... 推测 20+ 字段
}
135 行的配置类型 —— 推测包含 20+ 字段,覆盖所有 REPL ↔ Bridge 交互。
3.4 BridgeCoreHandle(行 228-242)¶
export type BridgeCoreHandle = ReplBridgeHandle & {
// 推测:
start: () => Promise<void>
stop: () => Promise<void>
reconnect: () => Promise<void>
getState: () => BridgeState
}
handle 扩展 —— 在 ReplBridgeHandle 基础上加 start / stop / reconnect / getState。
4. 常量详解(行 244-246)¶
const POLL_ERROR_INITIAL_DELAY_MS = 2_000 // 首次重试延迟 2s
const POLL_ERROR_MAX_DELAY_MS = 60_000 // 最大重试间隔 60s
const POLL_ERROR_GIVE_UP_MS = 15 * 60 * 1000 // 15 分钟放弃
轮询重试策略(pull 模式): - 首次失败等 2s - 指数退避到最大 60s - 持续 15 分钟后放弃 - 比 WebSocket 重连更保守(因为是 pull 模式)
5. 推测的 initBridgeCore(行 260-450,~190 行)¶
5.1 函数签名¶
主入口 —— 创建 BridgeCore 句柄,异步初始化。
5.2 推测的内部流程¶
export async function initBridgeCore(params) {
// 1. 验证参数
validateParams(params)
// 2. 创建内部状态
const state: InternalState = {
bridgeState: 'ready',
connection: null,
listeners: new Set(),
lastError: null,
}
// 3. 启动连接
await connect(params, state)
// 4. 注册消息处理
setupMessageHandler(params, state)
// 5. 启动轮询循环(pull 模式)
if (params.transport === 'poll') {
startWorkPollLoop(params, state)
}
// 6. 启动保活
startKeepalive(state)
// 7. 返回 handle
return createHandle(state, params)
}
6 步初始化 —— 大部分逻辑"按顺序启动"。
6. 推测的核心方法(行 450-1850)¶
6.1 消息路由¶
// 推测
function routeMessage(msg: BridgeMessage, params: BridgeCoreParams, state: InternalState) {
switch (msg.type) {
// 用户输入(IDE → CLI)
case 'user_input':
params.onUserInput?.(msg.data.text)
break
// 权限响应(IDE → CLI)
case 'permission_decision':
params.onPermissionDecision?.(msg.data)
break
// 提问回答(IDE → CLI)
case 'question_answer':
params.onQuestionAnswer?.(msg.data)
break
// 文件变化(IDE → CLI)
case 'file_saved':
params.onFileSaved?.(msg.data.path)
break
// 选中变化(IDE → CLI)
case 'selection_changed':
params.onSelectionChanged?.(msg.data)
break
// 取消请求(IDE → CLI)
case 'cancel_request':
params.onCancel?.(msg.data)
break
// ... 20+ 消息类型
}
}
30+ 消息类型分发 —— 每种消息调对应 callback。
6.2 状态推送¶
// 推测
function pushStateUpdate(state: InternalState, params: BridgeCoreParams) {
// 1. 收集当前状态
const update: StateUpdate = {
messages: params.getMessages(),
currentModel: params.getCurrentModel(),
cost: params.getCost(),
tokens: params.getTokens(),
// ... 推测 20+ 字段
}
// 2. 发送给 IDE
state.connection.send({
type: 'state_update',
data: update,
})
}
状态推送 —— 每次 REPL 状态变化时推给 IDE。
6.3 错误恢复¶
// 推测
async function handleConnectionError(
err: Error,
state: InternalState,
params: BridgeCoreParams,
): Promise<void> {
state.bridgeState = 'reconnecting'
state.lastError = err
let attempt = 0
while (state.bridgeState === 'reconnecting') {
attempt++
// 计算延迟
const delay = Math.min(
POLL_ERROR_INITIAL_DELAY_MS * Math.pow(2, attempt - 1),
POLL_ERROR_MAX_DELAY_MS,
)
// 加 jitter
const jitter = delay * 0.2 * (Math.random() * 2 - 1)
const totalDelay = delay + jitter
// 检查是否超时
if (Date.now() - state.firstErrorTime > POLL_ERROR_GIVE_UP_MS) {
state.bridgeState = 'failed'
params.onStateChange?.('failed')
return
}
// 等待 + 重试
await sleep(totalDelay)
try {
await connect(params, state)
state.bridgeState = 'connected'
return // 成功
} catch (err) {
// 继续重试
}
}
}
错误恢复 —— 指数退避 + jitter + 超时放弃。
7. startWorkPollLoop(行 1851-1900+)¶
7.1 函数签名¶
轮询循环 —— 在 pull 模式下定期从 server 拉取工作。
7.2 推测的循环逻辑¶
async function startWorkPollLoop(params, state) {
while (state.bridgeState !== 'failed') {
try {
// 1. 长轮询
const response = await fetch(`${serverUrl}/poll`, {
method: 'POST',
body: JSON.stringify({ sessionId, lastEventId: state.lastEventId }),
// 30s 长轮询
signal: AbortSignal.timeout(30_000),
})
// 2. 处理响应
const events = await response.json()
for (const event of events) {
await routeMessage(event, params, state)
state.lastEventId = event.id
}
} catch (err) {
// 错误处理(指数退避)
await handleConnectionError(err, state, params)
}
}
}
长轮询 —— 每次最多等 30s,server 端"hold"住连接,新事件立即返回。
优点:HTTP 友好(任何环境都能用)。
缺点:30s 内必须"刷新"连接(连接池开销)。
8. 推测的消息处理(行 1900-2406)¶
8.1 出站:REPL → IDE¶
// 推测的 20+ 出站方法
function notifyMessageAdded(msg: Message): void
function notifyToolUseStart(id: string, name: string, input: unknown): void
function notifyToolUseResult(id: string, result: unknown, isError: boolean): void
function notifyCostUpdate(cost: number, tokens: number): void
function notifyFileChanged(path: string, kind: 'modified' | 'created' | 'deleted'): void
function notifyPermissionRequest(id: string, tool: string, input: unknown, reason: string): void
function notifyQuestionRequest(id: string, question: string, options: string[]): void
function notifyPlanApprovalRequest(id: string, plan: string): void
function notifySelectionChange(text: string, path: string, range: Range): void
function notifyIdeState(openFiles: string[], activeFile: string): void
function notifyError(code: string, message: string, stack?: string): void
function notifyProgress(id: string, stage: string, percent: number): void
function notifyElicitation(id: string, schema: Schema, message: string): void
function notifyAttachment(path: string, type: string, size: number): void
20+ 业务方法 —— 每个对应一种 Bridge 消息。
8.2 入站:IDE → REPL¶
// 推测的 20+ 入站处理
function handleUserInput(text: string): void
function handlePermissionDecision(id: string, allow: boolean, remember?: boolean): void
function handlePlanDecision(id: string, approved: boolean): void
function handleQuestionAnswer(id: string, answer: string | string[]): void
function handleCancelRequest(id?: string): void
function handleResetSession(): void
function handleSwitchSession(sessionId: string): void
function handlePauseSession(): void
function handleResumeSession(): void
function handleSetModel(model: string): void
function handleSetPermissionMode(mode: PermissionMode): void
function handleApplyEdit(filePath: string, newContent: string): void
function handleRejectEdit(filePath: string, reason: string): void
function handleFileSelected(path: string): void
function handleFileSaved(path: string, content?: string): void
function handleSelectionChanged(text: string, path: string, range: Range): void
function handleOpenFile(path: string, line?: number, column?: number): void
function handleShowDiff(filePath: string, oldContent: string, newContent: string): void
function handleSetIdeTheme(theme: string): void
function handleGetStatus(): void
20+ 入站处理 —— 每个把 Bridge 消息转成 REPL 状态变化。
9. 关键设计¶
9.1 双向同步的"事件源"¶
// 推测的 4 个事件源
// 1. REPL 本地状态变化(用户键入、工具完成)
appState.subscribe(() => pushStateUpdate(state))
// 2. IDE 消息到达(连接的消息)
connection.onMessage(msg => routeMessage(msg, params))
// 3. 轮询(pull 模式)
startWorkPollLoop(params, state)
// 4. 错误重连
handleConnectionError(err, state, params)
4 个事件源 —— 任意一个变化都要同步。
9.2 "重连 + 状态恢复"¶
// 推测
async function reconnect(state, params) {
// 1. 重新建立连接
await connect(params, state)
// 2. 重发所有"未确认"消息(带 lastEventId)
const lastEventId = state.lastEventId
await fetch(`${serverUrl}/sync`, {
body: JSON.stringify({ sessionId, lastEventId }),
})
// 3. 接收"漏掉的事件"
for (const event of missedEvents) {
routeMessage(event, params, state)
}
}
断连恢复 —— 用 lastEventId 同步增量。
9.3 "双向冲突"处理¶
// 推测:last-write-wins(按时间戳)
function resolveConflict(localUpdate: Update, remoteUpdate: Update): Update {
return localUpdate.timestamp > remoteUpdate.timestamp ? localUpdate : remoteUpdate
}
冲突解决 —— 时间戳 newer 胜。
前端类比:和 CRDT 思想相反,Claude Code 选了"简单粗暴"。
9.4 "缓冲 + 批处理"¶
// 推测
const pendingMessages: BridgeMessage[] = []
function send(msg) {
pendingMessages.push(msg)
scheduleFlush() // setImmediate
}
function scheduleFlush() {
if (flushScheduled) return
flushScheduled = true
setImmediate(flush)
}
function flush() {
const batch = pendingMessages.splice(0, 50)
connection.send(batch)
flushScheduled = false
}
批处理 —— 50 条/批(不是 1 条/批)。
10. 完整消息流(推测)¶
10.1 用户在 IDE 选中文件¶
IDE: user selects file
↓
IDE 发送 'selection_changed' { text, path, range }
↓
replBridge: handleSelectionChanged(text, path, range)
↓
REPL: setState({ selectedFile: ... })
↓
REPL: notifyIdeState(...) // 推回 IDE
↓
IDE 收到更新
双向同步 —— 1 个操作 → 2 条消息。
10.2 用户在 IDE 输入 prompt¶
IDE: user types "fix the bug"
↓
IDE 发送 'user_input' { text: 'fix the bug' }
↓
replBridge: handleUserInput('fix the bug')
↓
REPL: engine.submitMessage('fix the bug')
↓
REPL: 处理流式响应
↓
REPL: notifyMessageAdded(userMsg)
REPL: notifyMessageAdded(assistantMsg)
...
↓
IDE 显示新消息
同步周期 —— IDE 输入 → CLI 处理 → IDE 显示。
10.3 CLI 请求权限¶
CLI: tool.call → needs permission
↓
replBridge: notifyPermissionRequest(id, 'Bash', { cmd: 'rm' }, 'destructive command')
↓
IDE 弹权限窗
↓
用户批准
↓
IDE 发送 'permission_decision' { id, allow: true, remember: false }
↓
replBridge: handlePermissionDecision(id, true, false)
↓
CLI: tool.call 继续
同步权限决策 —— IDE 端的 UI 决定 CLI 行为。
11. 实战:写一个简化版 REPL Bridge¶
// 简化版(~50 行)
class SimpleReplBridge {
private listeners: Set<(msg: any) => void> = new Set()
// REPL 端调
notify(text: string) {
this.broadcast({ type: 'message_added', data: { text } })
}
// IDE 消息到达
onMessage(msg: any) {
this.listeners.forEach(l => l(msg))
}
// 内部
private broadcast(msg: any) {
this.listeners.forEach(l => l(msg))
}
// 订阅
subscribe(handler: (msg: any) => void) {
this.listeners.add(handler)
return () => this.listeners.delete(handler)
}
}
// 用法
const bridge = new SimpleReplBridge()
bridge.subscribe(msg => {
if (msg.type === 'user_input') {
// 处理用户输入
}
})
bridge.notify('hello') // REPL 推消息
对比 Claude Code: - 简化版没有重连 - 简化版没有轮询 - 简化版没有冲突解决 - Claude Code 多了 50+ 边界处理
12. 关键洞察¶
12.1 "REPL 端 vs 协议层"的清晰分离¶
replBridge.ts(2406 行)—— 应用层,处理 REPL 业务bridgeMain.ts(2999 行)—— 协议层,处理消息定义flushGate.ts(推测 200 行)—— 基础设施,背压控制
3 个文件 = 3 个职责。没有混合。
12.2 "20+ 出站 + 20+ 入站"是事件驱动架构¶
每个业务事件 = 一个 Bridge 消息。
REPL 状态变化 = 触发出站消息。
IDE 用户操作 = 触发入站消息。
这是"事件驱动"的教科书实现。
12.3 "重连 + 状态恢复"是分布式系统基础¶
// 推测
async function reconnect(state, params) {
await connect(params, state)
// 同步 lastEventId 之后的事件
const missed = await fetchMissedEvents(state.lastEventId)
for (const event of missed) {
routeMessage(event, params, state)
}
}
和数据库复制、消息队列订阅同种思想。
12.4 "双向冲突"用时间戳解决¶
不引入 CRDT,时间戳 newer 胜。
适合"大多数情况",不适合"高频双向编辑"。
12.5 "批处理"是性能关键¶
50 条/批 + setImmediate flush = OS 原生 microtask 利用。
这是生产级消息总线的"小细节"。
13. 阅读清单¶
- ✅ 完整通读
src/bridge/replBridge.ts(2406 行) - ✅ 读 topics/deep-dive-bridge-main.md 配合
- ✅ 读 phase-07-advanced.md § 7.3
- 📌 读 docs/BRIDGE_PROTOCOL.md(仓库根 docs/)看 30+ 消息类型
- 📌 读
src/bridge/flushGate.ts背压控制 - 📌 读
src/bridge/bridgeMessaging.ts序列化
14. 练习任务¶
- 数出
replBridge.ts里的所有方法 —— 推测 30-40 个 - 画出双向同步的时序图 —— 5 个常见操作(输入/选中/权限/问题/取消)
- 设计你自己的"双端同步"系统 —— 不考虑 IDE,先做"双 CLI 同步"
- 思考:
lastEventId同步模式 vs CRDT,哪个更适合 Claude Code?为什么?