From 3887a4b856109cc67f391b0adb68148c8aa18c06 Mon Sep 17 00:00:00 2001 From: Arul Sharma <31745423+arul28@users.noreply.github.com> Date: Mon, 30 Mar 2026 12:24:22 -0400 Subject: [PATCH 01/18] Implement interrupt to cancel active subagents --- .../services/chat/agentChatService.test.ts | 205 ++++++++++++++++++ .../main/services/chat/agentChatService.ts | 21 +- docs/architecture/AI_INTEGRATION.md | 6 +- docs/features/CHAT.md | 25 +++ 4 files changed, 254 insertions(+), 3 deletions(-) diff --git a/apps/desktop/src/main/services/chat/agentChatService.test.ts b/apps/desktop/src/main/services/chat/agentChatService.test.ts index 6e8c314e4..f91dcffa7 100644 --- a/apps/desktop/src/main/services/chat/agentChatService.test.ts +++ b/apps/desktop/src/main/services/chat/agentChatService.test.ts @@ -2807,6 +2807,211 @@ describe("createAgentChatService", () => { service.interrupt({ sessionId: "unknown-session-id" }), ).rejects.toThrow(/not found/i); }); + + it("emits subagent_result stopped for active subagents on claude interrupt", async () => { + const events: AgentChatEventEnvelope[] = []; + + // The stream function is called multiple times: once for warmup, once for the actual turn. + let streamCall = 0; + let hangResolve: (() => void) | null = null; + const hangPromise = new Promise((resolve) => { hangResolve = resolve; }); + const send = vi.fn().mockResolvedValue(undefined); + const setPermissionMode = vi.fn().mockResolvedValue(undefined); + const stream = vi.fn(() => (async function* () { + streamCall += 1; + if (streamCall === 1) { + // Warmup stream — init + result to complete prewarm + yield { + type: "system", + subtype: "init", + session_id: "sdk-interrupt-sub-1", + slash_commands: [], + }; + yield { type: "result", usage: { input_tokens: 1, output_tokens: 1 } }; + return; + } + // Actual turn stream — emit two task_started events, then hang + yield { + type: "system", + subtype: "task_started", + task_id: "sub-task-1", + description: "Subagent A", + }; + yield { + type: "system", + subtype: "task_started", + task_id: "sub-task-2", + description: "Subagent B", + }; + // Hang until test resolves the promise (simulating a long-running turn) + await hangPromise; + yield { type: "result", usage: { input_tokens: 1, output_tokens: 1 } }; + })()); + vi.mocked(unstable_v2_createSession).mockReturnValue({ + send, + stream, + close: vi.fn(), + sessionId: "sdk-interrupt-sub-1", + setPermissionMode, + } as any); + + const { service } = createService({ + onEvent: (event: AgentChatEventEnvelope) => events.push(event), + }); + + const session = await service.createSession({ + laneId: "lane-1", + provider: "claude", + model: "sonnet", + }); + + // Wait for the warmup to complete (it runs in background) + await new Promise((resolve) => setTimeout(resolve, 50)); + + // Start the turn (don't await — it will hang) + const sendPromise = service.sendMessage({ + sessionId: session.id, + text: "Do something with subagents", + }); + + // Wait for the subagent_started events to appear + await waitForEvent( + events, + (e): e is AgentChatEventEnvelope => + e.event.type === "subagent_started" && (e.event as any).taskId === "sub-task-2", + ); + + // Now interrupt — should emit subagent_result "stopped" for both + await service.interrupt({ sessionId: session.id }); + + const stoppedEvents = events.filter( + (e) => e.event.type === "subagent_result" && (e.event as any).status === "stopped", + ); + expect(stoppedEvents).toHaveLength(2); + + const stoppedTaskIds = stoppedEvents.map((e) => (e.event as any).taskId).sort(); + expect(stoppedTaskIds).toEqual(["sub-task-1", "sub-task-2"]); + + // After interrupt, listSubagents should reflect the stopped status + const subagents = service.listSubagents({ sessionId: session.id }); + const stoppedSubagents = subagents.filter((s: any) => s.status === "stopped"); + expect(stoppedSubagents).toHaveLength(2); + + // Clean up: unblock the hanging stream so sendPromise resolves + hangResolve!(); + await sendPromise.catch(() => {}); + }); + + it("claude interrupt idempotency — second call is a no-op", async () => { + const events: AgentChatEventEnvelope[] = []; + const { service } = createService({ + onEvent: (event: AgentChatEventEnvelope) => events.push(event), + }); + + const session = await service.createSession({ + laneId: "lane-1", + provider: "claude", + model: "sonnet", + }); + + // First interrupt creates the runtime (if not already) and sets interrupted=true + await service.interrupt({ sessionId: session.id }); + + // Record event count after first interrupt + const eventsAfterFirst = events.length; + + // Second interrupt should hit the idempotency guard and return immediately + await service.interrupt({ sessionId: session.id }); + + // No new events at all from the second interrupt (it returned early) + const newEvents = events.slice(eventsAfterFirst); + expect(newEvents).toHaveLength(0); + }); + + it("claude interrupt with no active subagents emits no subagent events", async () => { + const events: AgentChatEventEnvelope[] = []; + const { service } = createService({ + onEvent: (event: AgentChatEventEnvelope) => events.push(event), + }); + + const session = await service.createSession({ + laneId: "lane-1", + provider: "claude", + model: "sonnet", + }); + + // Interrupt with no active subagents (default empty map) + await service.interrupt({ sessionId: session.id }); + + // No subagent_result events should have been emitted + const subagentResultEvents = events.filter( + (e) => e.event.type === "subagent_result", + ); + expect(subagentResultEvents).toHaveLength(0); + + // Verify interrupt did execute by confirming a second call is a no-op + const eventsAfterFirst = events.length; + await service.interrupt({ sessionId: session.id }); + const newEvents = events.slice(eventsAfterFirst); + expect(newEvents).toHaveLength(0); + }); + + it("unified interrupt idempotency — second call is a no-op", async () => { + const events: AgentChatEventEnvelope[] = []; + + // Mock streamText to create a stream that hangs, giving us a unified + // runtime in a busy state so we can interrupt it. + let hangResolve: (() => void) | null = null; + const hangPromise = new Promise((resolve) => { hangResolve = resolve; }); + vi.mocked(streamText).mockImplementation(() => ({ + fullStream: (async function* () { + yield { type: "start-step", stepNumber: 0 }; + yield { type: "text-delta", textDelta: "Thinking..." }; + // Hang until resolved + await hangPromise; + yield { type: "finish", usage: {} }; + })(), + }) as any); + + const { service } = createService({ + onEvent: (event: AgentChatEventEnvelope) => events.push(event), + }); + + const session = await service.createSession({ + laneId: "lane-1", + provider: "unified", + model: "anthropic/claude-sonnet-4-6-api", + modelId: "anthropic/claude-sonnet-4-6-api", + permissionMode: "edit", + }); + + // Start a turn so the unified runtime gets created + const sendPromise = service.sendMessage({ + sessionId: session.id, + text: "Do something", + }); + + // Wait for the text event to confirm the stream is running + await waitForEvent( + events, + (e): e is AgentChatEventEnvelope => e.event.type === "text", + ); + + // First interrupt — sets runtime.interrupted = true + await service.interrupt({ sessionId: session.id }); + const eventsAfterFirst = events.length; + + // Second interrupt — should be a no-op due to idempotency guard + await service.interrupt({ sessionId: session.id }); + const newEvents = events.slice(eventsAfterFirst); + + // The second interrupt should have produced no new events at all + expect(newEvents).toHaveLength(0); + + // Clean up + hangResolve!(); + await sendPromise.catch(() => {}); + }); }); // -------------------------------------------------------------------------- diff --git a/apps/desktop/src/main/services/chat/agentChatService.ts b/apps/desktop/src/main/services/chat/agentChatService.ts index 1957c1e25..943725e8d 100644 --- a/apps/desktop/src/main/services/chat/agentChatService.ts +++ b/apps/desktop/src/main/services/chat/agentChatService.ts @@ -5538,6 +5538,7 @@ export function createAgentChatService(args: { // ── Stream processing loop ── const streamSupportsReasoning = runtime.modelDescriptor.capabilities.reasoning; for await (const part of stream.fullStream as AsyncIterable) { + if (runtime.interrupted) break; if (!part || typeof part !== "object") continue; markFirstStreamEvent(String(part.type ?? "unknown")); @@ -8360,6 +8361,7 @@ export function createAgentChatService(args: { // Unified runtime interrupt — auto-decline pending approvals to prevent orphans if (managed.runtime?.kind === "unified") { + if (managed.runtime.interrupted) return; managed.runtime.interrupted = true; managed.runtime.abortController?.abort(); for (const [itemId, approval] of managed.runtime.pendingApprovals) { @@ -8380,6 +8382,8 @@ export function createAgentChatService(args: { } const runtime = ensureClaudeSessionRuntime(managed); + // Idempotency guard: skip if already interrupted (e.g. rapid cancel clicks) + if (runtime.interrupted) return; logger.info("agent_chat.turn_interrupt_requested", { sessionId, provider: "claude", @@ -8387,13 +8391,28 @@ export function createAgentChatService(args: { busy: runtime.busy, warmupInFlight: Boolean(runtime.v2WarmupDone), }); + // Set interrupted before closing the session so the streaming loop sees it + // and breaks cleanly rather than throwing from a closed session. runtime.interrupted = true; cancelClaudeWarmup(managed, runtime, "interrupt"); runtime.activeQuery?.interrupt().catch(() => {}); - // Close the V2 session on interrupt — it will be recreated on the next turn + // Close the V2 session — it will be recreated on the next turn. try { runtime.v2Session?.close(); } catch { /* ignore */ } runtime.v2Session = null; runtime.v2StreamGen = null; + + // Emit subagent_result "stopped" for every active subagent so the UI + // properly transitions them from "running" → "stopped" (matching Claude Code CLI behaviour). + const turnId = runtime.activeTurnId ?? undefined; + for (const { taskId } of runtime.activeSubagents.values()) { + emitChatEvent(managed, { + type: "subagent_result", + taskId, + status: "stopped", + summary: "Interrupted by user", + turnId, + }); + } runtime.activeSubagents.clear(); logger.info("agent_chat.turn_interrupt_completed", { sessionId, diff --git a/docs/architecture/AI_INTEGRATION.md b/docs/architecture/AI_INTEGRATION.md index 26a74aa0c..703243e3f 100644 --- a/docs/architecture/AI_INTEGRATION.md +++ b/docs/architecture/AI_INTEGRATION.md @@ -2,7 +2,7 @@ > Roadmap reference: `docs/final-plan/README.md` is the canonical future plan and sequencing source. -> Last updated: 2026-03-24 +> Last updated: 2026-03-30 The AI integration layer replaces the previous hosted agent with a local-first, provider-flexible approach. Instead of a cloud backend with remote job queues, ADE routes work to configured runtimes (CLI subscriptions, API-key/OpenRouter providers, and local endpoints such as LM Studio/Ollama/vLLM), coordinates tooling through MCP, and manages multi-step workflows via an AI orchestrator. @@ -1519,6 +1519,8 @@ type ChatEvent = | { type: "approval_request"; itemId: string; kind: "command" | "file_change"; description: string; detail: unknown } | { type: "system_notice"; noticeKind: "auth" | "rate_limit" | "hook" | "file_persist" | "info" | "memory" | "provider_health" | "thread_error"; message: string; detail?: string | AgentChatNoticeDetail } | { type: "status"; turnStatus: "started" | "completed" | "interrupted" | "failed"; error?: string } + | { type: "subagent_started"; taskId: string; description: string; turnId?: string } + | { type: "subagent_result"; taskId: string; status: "completed" | "stopped" | "failed"; summary: string; turnId?: string } | { type: "error"; message: string; errorInfo?: string } | { type: "done"; turnId: string }; @@ -1576,7 +1578,7 @@ send({ method: "initialized", params: {} }); | `createSession()` | `thread/start` | Params: `model`, `cwd` (lane worktree), `approvalPolicy`, `sandbox` | | `sendMessage()` | `turn/start` | Input array: `[{ type: "text", text }, ...attachments]` | | `steer()` | `turn/steer` | Appends to in-flight turn; cannot change model/sandbox | -| `interrupt()` | `turn/interrupt` | Turn completes with `status: "interrupted"` | +| `interrupt()` | `turn/interrupt` | Turn completes with `status: "interrupted"`. Idempotent — second call is a no-op. Claude/unified runtimes emit `subagent_result` "stopped" for active subagents. | | `resumeSession()` | `thread/resume` | Params: `threadId`, optional `personality` | | `listSessions()` | `thread/list` | Filter by `cwd` to scope to lane | | `approveToolUse()` | Response to `requestApproval` | Payload: `accept`/`acceptForSession`/`decline`/`cancel` | diff --git a/docs/features/CHAT.md b/docs/features/CHAT.md index 5339b5dbe..cdff16726 100644 --- a/docs/features/CHAT.md +++ b/docs/features/CHAT.md @@ -44,6 +44,31 @@ the abort infrastructure. When a turn exceeds this limit, an error event is emitted and the turn is terminated, preventing a single stalled provider call from blocking the session indefinitely. +## Turn Interruption + +Users can interrupt an in-flight turn via `interrupt()`. The behavior +is provider-specific: + +- **Claude**: The runtime's `interrupted` flag is set, the warmup is + cancelled, the active query is interrupted, and the V2 session is + closed (it will be recreated on the next turn). The streaming loop + checks the `interrupted` flag on each iteration and breaks cleanly + rather than throwing from a closed session. All active subagents are + transitioned from "running" to "stopped" by emitting a + `subagent_result` event with `status: "stopped"` for each one, + matching Claude Code CLI behaviour. The `activeSubagents` map is + then cleared. +- **Codex**: The interrupt is forwarded to the app-server via the + `turn/interrupt` JSON-RPC method. The turn completes with + `status: "interrupted"`. +- **Unified**: The abort controller is signalled and all pending + approval promises are auto-declined to prevent orphaned + human-in-the-loop requests. + +All providers implement an **idempotency guard**: if the runtime is +already interrupted, a second `interrupt()` call is a no-op. This +prevents duplicate side-effects from rapid cancel clicks. + ### Text Batching Streaming assistant text events from Codex and unified providers are From e923182c2bd21639fd278cca886fd7575822f7b3 Mon Sep 17 00:00:00 2001 From: Arul Sharma <31745423+arul28@users.noreply.github.com> Date: Mon, 30 Mar 2026 16:17:44 -0400 Subject: [PATCH 02/18] Add synthetic tool_result and artifact serving Introduce synthetic tool_result generation and local artifact serving to improve proof ingestion and previews. Key changes: - Add new syntheticToolResult module and unit/integration tests that scan tool args for absolute artifact file paths and build synthetic `tool_result` events so the proof observer can ingest screenshots, videos and traces. - Integrate maybeSyntheticToolResult into agentChatService to emit synthetic tool_result events when tools (e.g. Bash, Read, MCP tools) run but the SDK stream omits tool results. - Register a custom ade-artifact protocol in main.ts and add a handler that streams local files (with MIME detection and Range support for video seeking) for renderer previews. - Add an IPC handler (computerUseReadArtifactPreview) that returns base64 data URLs for image previews. - Improve lane import/rebase logic in laneService: resolve remote/local branch refs more robustly, attempt to detect parent lane via merge-base, create tracking branches when needed, and perform cleanup (worktree/branch) on failure. - Update conflictService to deduplicate PR-target rebase needs and emit a deduplicated rebase-needs-updated event. - Add tests for lane import and many synthetic tool result flows. These changes enable the UI to preview and ingest local artifacts produced by tools and make branch import/rebase detection more resilient. --- apps/desktop/src/main/main.ts | 77 ++- .../main/services/chat/agentChatService.ts | 13 + .../computerUse/syntheticToolResult.test.ts | 343 ++++++++++++++ .../computerUse/syntheticToolResult.ts | 82 ++++ .../services/conflicts/conflictService.ts | 12 +- .../src/main/services/ipc/registerIpc.ts | 23 + .../main/services/lanes/laneService.test.ts | 162 +++++++ .../src/main/services/lanes/laneService.ts | 308 +++++++++--- apps/desktop/src/preload/global.d.ts | 1 + apps/desktop/src/preload/preload.ts | 2 + .../chat/AgentChatComposer.test.tsx | 45 +- .../components/chat/AgentChatComposer.tsx | 370 ++------------- .../components/chat/AgentChatPane.tsx | 1 + .../components/chat/ChatComputerUsePanel.tsx | 444 ++++++++---------- .../components/lanes/CreateLaneDialog.tsx | 310 ++++++------ .../components/lanes/LanesPage.test.ts | 26 +- .../renderer/components/lanes/LanesPage.tsx | 62 ++- .../components/prs/CreatePrModal.test.tsx | 36 +- .../renderer/components/prs/CreatePrModal.tsx | 86 +++- apps/desktop/src/shared/ipc.ts | 1 + 20 files changed, 1558 insertions(+), 846 deletions(-) create mode 100644 apps/desktop/src/main/services/computerUse/syntheticToolResult.test.ts create mode 100644 apps/desktop/src/main/services/computerUse/syntheticToolResult.ts diff --git a/apps/desktop/src/main/main.ts b/apps/desktop/src/main/main.ts index f465ed47c..621c0d199 100644 --- a/apps/desktop/src/main/main.ts +++ b/apps/desktop/src/main/main.ts @@ -1,4 +1,4 @@ -import { app, BrowserWindow, nativeImage, shell } from "electron"; +import { app, BrowserWindow, nativeImage, protocol, shell } from "electron"; import path from "node:path"; type NodePtyType = typeof import("node-pty"); import { registerIpc } from "./services/ipc/registerIpc"; @@ -394,7 +394,82 @@ async function createWindow(logger?: Logger): Promise { return win; } +// Register custom protocol for serving local artifact files (images, videos) to the renderer. +// Must be called before app.whenReady(). +protocol.registerSchemesAsPrivileged([ + { scheme: "ade-artifact", privileges: { standard: false, supportFetchAPI: true, stream: true, bypassCSP: true } }, +]); + app.whenReady().then(async () => { + // Handle ade-artifact:// requests — serves local files for proof drawer previews. + // Path is encoded in the URL: ade-artifact:///absolute/path/to/file.png + protocol.handle("ade-artifact", (request) => { + const url = new URL(request.url); + let filePath = decodeURIComponent(url.pathname); + // On Windows, pathname starts with /C:/... — strip leading slash + if (process.platform === "win32" && /^\/[a-zA-Z]:/.test(filePath)) { + filePath = filePath.slice(1); + } + try { + const stat = fs.statSync(filePath); + if (!stat.isFile()) return new Response("Not found", { status: 404 }); + const fileSize = stat.size; + const ext = path.extname(filePath).replace(/^\./, "").toLowerCase(); + const mimeMap: Record = { + png: "image/png", jpg: "image/jpeg", jpeg: "image/jpeg", webp: "image/webp", + gif: "image/gif", bmp: "image/bmp", svg: "image/svg+xml", + mp4: "video/mp4", webm: "video/webm", mov: "video/quicktime", avi: "video/x-msvideo", mkv: "video/x-matroska", + }; + const mime = mimeMap[ext] ?? "application/octet-stream"; + + // Support Range requests — required for