diff --git a/dist/app.py b/dist/app.py index 62ee945..8fa2033 100644 --- a/dist/app.py +++ b/dist/app.py @@ -6458,7 +6458,11 @@ async def _run() -> None: ), config=orch._thread_config(session_id), ) - if not await orch._is_graph_paused(session_id): + if await orch._is_graph_paused(session_id): + # Issue #42: mark the row awaiting_input so UIs + # filtering by that status see the paused session. + await orch._mark_session_paused_async(session_id) + else: await orch._finalize_session_status_async(session_id) except asyncio.CancelledError: raise @@ -14882,6 +14886,54 @@ async def _finalize_session_status_async( async with self._locks.acquire(session_id): return self._finalize_session_status(session_id) + async def _mark_session_paused_async( + self, session_id: str, + ) -> str | None: + """Lock-guarded write that flips a paused session to + ``"awaiting_input"`` and emits the matching ``status_changed`` + events. + + Companion to :meth:`_finalize_session_status_async`. The + finalizer handles graph-completed runs; this handles + graph-paused runs (HITL gate). Without it, sessions that + pause at a gate keep their old status (typically ``"new"`` / + ``"in_progress"``) and UIs that filter by ``"awaiting_input"`` + (the approvals queue, the rail's Active group) miss them — see + issue #42. + + No-op (returns ``None``) when the session is already at + ``"awaiting_input"``, already in a terminal status (a late + pause-write must not unwind a finalize that landed in + between), or the row is missing. Otherwise transitions the + row to ``"awaiting_input"`` and emits both per-session + ``status_changed`` and cross-session + ``session.status_changed`` events via + :func:`_emit_status_changed_event`. + """ + async with self._locks.acquire(session_id): + try: + inc = self.store.load(session_id) + except (FileNotFoundError, ValueError, KeyError, LookupError): + return None + if inc.status == "awaiting_input": + return None + statuses = getattr(getattr(self, "cfg", None), "orchestrator", None) + if statuses is not None: + current_def = statuses.statuses.get(inc.status) + if current_def is not None and current_def.terminal: + return None + from_status = inc.status + inc.status = "awaiting_input" + self.store.save(inc) + _emit_status_changed_event( + orch=self, + inc=inc, + from_status=from_status, + to_status="awaiting_input", + cause="gate_paused", + ) + return "awaiting_input" + async def _is_graph_paused(self, session_id: str) -> bool: """Return True iff the compiled graph has a pending step waiting to resume (i.e. it's paused at an ``interrupt()`` boundary). @@ -15082,7 +15134,9 @@ async def start_session(self, *, query: str, last_agent=None, error=None), config=self._thread_config(inc.id), ) - if not await self._is_graph_paused(inc.id): + if await self._is_graph_paused(inc.id): + await self._mark_session_paused_async(inc.id) + else: await self._finalize_session_status_async(inc.id) return inc.id @@ -15140,7 +15194,10 @@ async def stream_session(self, *, query: str, environment: str, # session is waiting for operator approval, not done. Stamping # default_terminal_status here would orphan the pending_approval # ToolCall row written by the gateway just before the pause. + # Issue #42: still flip the session row to awaiting_input so + # the approvals queue + sessions rail Active group pick it up. if await self._is_graph_paused(inc.id): + await self._mark_session_paused_async(inc.id) yield {"event": "session_paused", "incident_id": inc.id, "ts": _event_ts()} else: @@ -15422,8 +15479,10 @@ async def _retry_session_locked(self, session_id: str) -> AsyncIterator[dict]: yield self._to_ui_event(ev, session_id) # See ``stream_session`` for why pause-detection guards the # finalize call: a HITL pause must not be coerced into a - # terminal status. + # terminal status. Issue #42: still flip to awaiting_input so + # UIs filtering by that status see the retried session. if await self._is_graph_paused(session_id): + await self._mark_session_paused_async(session_id) yield {"event": "session_paused", "incident_id": session_id, "ts": _event_ts()} else: diff --git a/dist/apps/code-review.py b/dist/apps/code-review.py index 067613d..7599cac 100644 --- a/dist/apps/code-review.py +++ b/dist/apps/code-review.py @@ -6511,7 +6511,11 @@ async def _run() -> None: ), config=orch._thread_config(session_id), ) - if not await orch._is_graph_paused(session_id): + if await orch._is_graph_paused(session_id): + # Issue #42: mark the row awaiting_input so UIs + # filtering by that status see the paused session. + await orch._mark_session_paused_async(session_id) + else: await orch._finalize_session_status_async(session_id) except asyncio.CancelledError: raise @@ -14935,6 +14939,54 @@ async def _finalize_session_status_async( async with self._locks.acquire(session_id): return self._finalize_session_status(session_id) + async def _mark_session_paused_async( + self, session_id: str, + ) -> str | None: + """Lock-guarded write that flips a paused session to + ``"awaiting_input"`` and emits the matching ``status_changed`` + events. + + Companion to :meth:`_finalize_session_status_async`. The + finalizer handles graph-completed runs; this handles + graph-paused runs (HITL gate). Without it, sessions that + pause at a gate keep their old status (typically ``"new"`` / + ``"in_progress"``) and UIs that filter by ``"awaiting_input"`` + (the approvals queue, the rail's Active group) miss them — see + issue #42. + + No-op (returns ``None``) when the session is already at + ``"awaiting_input"``, already in a terminal status (a late + pause-write must not unwind a finalize that landed in + between), or the row is missing. Otherwise transitions the + row to ``"awaiting_input"`` and emits both per-session + ``status_changed`` and cross-session + ``session.status_changed`` events via + :func:`_emit_status_changed_event`. + """ + async with self._locks.acquire(session_id): + try: + inc = self.store.load(session_id) + except (FileNotFoundError, ValueError, KeyError, LookupError): + return None + if inc.status == "awaiting_input": + return None + statuses = getattr(getattr(self, "cfg", None), "orchestrator", None) + if statuses is not None: + current_def = statuses.statuses.get(inc.status) + if current_def is not None and current_def.terminal: + return None + from_status = inc.status + inc.status = "awaiting_input" + self.store.save(inc) + _emit_status_changed_event( + orch=self, + inc=inc, + from_status=from_status, + to_status="awaiting_input", + cause="gate_paused", + ) + return "awaiting_input" + async def _is_graph_paused(self, session_id: str) -> bool: """Return True iff the compiled graph has a pending step waiting to resume (i.e. it's paused at an ``interrupt()`` boundary). @@ -15135,7 +15187,9 @@ async def start_session(self, *, query: str, last_agent=None, error=None), config=self._thread_config(inc.id), ) - if not await self._is_graph_paused(inc.id): + if await self._is_graph_paused(inc.id): + await self._mark_session_paused_async(inc.id) + else: await self._finalize_session_status_async(inc.id) return inc.id @@ -15193,7 +15247,10 @@ async def stream_session(self, *, query: str, environment: str, # session is waiting for operator approval, not done. Stamping # default_terminal_status here would orphan the pending_approval # ToolCall row written by the gateway just before the pause. + # Issue #42: still flip the session row to awaiting_input so + # the approvals queue + sessions rail Active group pick it up. if await self._is_graph_paused(inc.id): + await self._mark_session_paused_async(inc.id) yield {"event": "session_paused", "incident_id": inc.id, "ts": _event_ts()} else: @@ -15475,8 +15532,10 @@ async def _retry_session_locked(self, session_id: str) -> AsyncIterator[dict]: yield self._to_ui_event(ev, session_id) # See ``stream_session`` for why pause-detection guards the # finalize call: a HITL pause must not be coerced into a - # terminal status. + # terminal status. Issue #42: still flip to awaiting_input so + # UIs filtering by that status see the retried session. if await self._is_graph_paused(session_id): + await self._mark_session_paused_async(session_id) yield {"event": "session_paused", "incident_id": session_id, "ts": _event_ts()} else: diff --git a/dist/apps/incident-management.py b/dist/apps/incident-management.py index a619331..649c9ff 100644 --- a/dist/apps/incident-management.py +++ b/dist/apps/incident-management.py @@ -6523,7 +6523,11 @@ async def _run() -> None: ), config=orch._thread_config(session_id), ) - if not await orch._is_graph_paused(session_id): + if await orch._is_graph_paused(session_id): + # Issue #42: mark the row awaiting_input so UIs + # filtering by that status see the paused session. + await orch._mark_session_paused_async(session_id) + else: await orch._finalize_session_status_async(session_id) except asyncio.CancelledError: raise @@ -14947,6 +14951,54 @@ async def _finalize_session_status_async( async with self._locks.acquire(session_id): return self._finalize_session_status(session_id) + async def _mark_session_paused_async( + self, session_id: str, + ) -> str | None: + """Lock-guarded write that flips a paused session to + ``"awaiting_input"`` and emits the matching ``status_changed`` + events. + + Companion to :meth:`_finalize_session_status_async`. The + finalizer handles graph-completed runs; this handles + graph-paused runs (HITL gate). Without it, sessions that + pause at a gate keep their old status (typically ``"new"`` / + ``"in_progress"``) and UIs that filter by ``"awaiting_input"`` + (the approvals queue, the rail's Active group) miss them — see + issue #42. + + No-op (returns ``None``) when the session is already at + ``"awaiting_input"``, already in a terminal status (a late + pause-write must not unwind a finalize that landed in + between), or the row is missing. Otherwise transitions the + row to ``"awaiting_input"`` and emits both per-session + ``status_changed`` and cross-session + ``session.status_changed`` events via + :func:`_emit_status_changed_event`. + """ + async with self._locks.acquire(session_id): + try: + inc = self.store.load(session_id) + except (FileNotFoundError, ValueError, KeyError, LookupError): + return None + if inc.status == "awaiting_input": + return None + statuses = getattr(getattr(self, "cfg", None), "orchestrator", None) + if statuses is not None: + current_def = statuses.statuses.get(inc.status) + if current_def is not None and current_def.terminal: + return None + from_status = inc.status + inc.status = "awaiting_input" + self.store.save(inc) + _emit_status_changed_event( + orch=self, + inc=inc, + from_status=from_status, + to_status="awaiting_input", + cause="gate_paused", + ) + return "awaiting_input" + async def _is_graph_paused(self, session_id: str) -> bool: """Return True iff the compiled graph has a pending step waiting to resume (i.e. it's paused at an ``interrupt()`` boundary). @@ -15147,7 +15199,9 @@ async def start_session(self, *, query: str, last_agent=None, error=None), config=self._thread_config(inc.id), ) - if not await self._is_graph_paused(inc.id): + if await self._is_graph_paused(inc.id): + await self._mark_session_paused_async(inc.id) + else: await self._finalize_session_status_async(inc.id) return inc.id @@ -15205,7 +15259,10 @@ async def stream_session(self, *, query: str, environment: str, # session is waiting for operator approval, not done. Stamping # default_terminal_status here would orphan the pending_approval # ToolCall row written by the gateway just before the pause. + # Issue #42: still flip the session row to awaiting_input so + # the approvals queue + sessions rail Active group pick it up. if await self._is_graph_paused(inc.id): + await self._mark_session_paused_async(inc.id) yield {"event": "session_paused", "incident_id": inc.id, "ts": _event_ts()} else: @@ -15487,8 +15544,10 @@ async def _retry_session_locked(self, session_id: str) -> AsyncIterator[dict]: yield self._to_ui_event(ev, session_id) # See ``stream_session`` for why pause-detection guards the # finalize call: a HITL pause must not be coerced into a - # terminal status. + # terminal status. Issue #42: still flip to awaiting_input so + # UIs filtering by that status see the retried session. if await self._is_graph_paused(session_id): + await self._mark_session_paused_async(session_id) yield {"event": "session_paused", "incident_id": session_id, "ts": _event_ts()} else: diff --git a/src/runtime/orchestrator.py b/src/runtime/orchestrator.py index 2be5e03..6b7c7a5 100644 --- a/src/runtime/orchestrator.py +++ b/src/runtime/orchestrator.py @@ -1091,6 +1091,54 @@ async def _finalize_session_status_async( async with self._locks.acquire(session_id): return self._finalize_session_status(session_id) + async def _mark_session_paused_async( + self, session_id: str, + ) -> str | None: + """Lock-guarded write that flips a paused session to + ``"awaiting_input"`` and emits the matching ``status_changed`` + events. + + Companion to :meth:`_finalize_session_status_async`. The + finalizer handles graph-completed runs; this handles + graph-paused runs (HITL gate). Without it, sessions that + pause at a gate keep their old status (typically ``"new"`` / + ``"in_progress"``) and UIs that filter by ``"awaiting_input"`` + (the approvals queue, the rail's Active group) miss them — see + issue #42. + + No-op (returns ``None``) when the session is already at + ``"awaiting_input"``, already in a terminal status (a late + pause-write must not unwind a finalize that landed in + between), or the row is missing. Otherwise transitions the + row to ``"awaiting_input"`` and emits both per-session + ``status_changed`` and cross-session + ``session.status_changed`` events via + :func:`_emit_status_changed_event`. + """ + async with self._locks.acquire(session_id): + try: + inc = self.store.load(session_id) + except (FileNotFoundError, ValueError, KeyError, LookupError): + return None + if inc.status == "awaiting_input": + return None + statuses = getattr(getattr(self, "cfg", None), "orchestrator", None) + if statuses is not None: + current_def = statuses.statuses.get(inc.status) + if current_def is not None and current_def.terminal: + return None + from_status = inc.status + inc.status = "awaiting_input" + self.store.save(inc) + _emit_status_changed_event( + orch=self, + inc=inc, + from_status=from_status, + to_status="awaiting_input", + cause="gate_paused", + ) + return "awaiting_input" + async def _is_graph_paused(self, session_id: str) -> bool: """Return True iff the compiled graph has a pending step waiting to resume (i.e. it's paused at an ``interrupt()`` boundary). @@ -1291,7 +1339,9 @@ async def start_session(self, *, query: str, last_agent=None, error=None), config=self._thread_config(inc.id), ) - if not await self._is_graph_paused(inc.id): + if await self._is_graph_paused(inc.id): + await self._mark_session_paused_async(inc.id) + else: await self._finalize_session_status_async(inc.id) return inc.id @@ -1349,7 +1399,10 @@ async def stream_session(self, *, query: str, environment: str, # session is waiting for operator approval, not done. Stamping # default_terminal_status here would orphan the pending_approval # ToolCall row written by the gateway just before the pause. + # Issue #42: still flip the session row to awaiting_input so + # the approvals queue + sessions rail Active group pick it up. if await self._is_graph_paused(inc.id): + await self._mark_session_paused_async(inc.id) yield {"event": "session_paused", "incident_id": inc.id, "ts": _event_ts()} else: @@ -1631,8 +1684,10 @@ async def _retry_session_locked(self, session_id: str) -> AsyncIterator[dict]: yield self._to_ui_event(ev, session_id) # See ``stream_session`` for why pause-detection guards the # finalize call: a HITL pause must not be coerced into a - # terminal status. + # terminal status. Issue #42: still flip to awaiting_input so + # UIs filtering by that status see the retried session. if await self._is_graph_paused(session_id): + await self._mark_session_paused_async(session_id) yield {"event": "session_paused", "incident_id": session_id, "ts": _event_ts()} else: diff --git a/src/runtime/service.py b/src/runtime/service.py index afaad7b..74ffd74 100644 --- a/src/runtime/service.py +++ b/src/runtime/service.py @@ -559,7 +559,11 @@ async def _run() -> None: ), config=orch._thread_config(session_id), ) - if not await orch._is_graph_paused(session_id): + if await orch._is_graph_paused(session_id): + # Issue #42: mark the row awaiting_input so UIs + # filtering by that status see the paused session. + await orch._mark_session_paused_async(session_id) + else: await orch._finalize_session_status_async(session_id) except asyncio.CancelledError: raise diff --git a/tests/test_finalizer_paths.py b/tests/test_finalizer_paths.py index 04801f8..b3aab0d 100644 --- a/tests/test_finalizer_paths.py +++ b/tests/test_finalizer_paths.py @@ -103,6 +103,44 @@ async def aget_state(self, config): return SimpleNamespace(next=()) +class _PausedAtGateGraph: + """Fake graph that simulates a HITL gate pause: writes a + ``pending_approval`` ToolCall, returns from ``ainvoke``, then + reports a non-empty ``next`` tuple to ``aget_state`` so the + orchestrator's ``_is_graph_paused`` returns True. Mirrors what + langgraph 1.x does when an ``interrupt()`` boundary is hit.""" + + def __init__(self, store, *, service=None) -> None: + self.store = store + self.service = service + self.captured_entries = [] + + async def ainvoke(self, state, *, config): + inc = state["session"] + if self.service is not None: + self.captured_entries.append(self.service._registry.get(inc.id)) + # Mid-run state: an in-flight tool call that the gateway has + # parked at a HITL gate. Status stays at the pre-pause value + # ('new' / 'in_progress') until the orchestrator writes + # 'awaiting_input' itself. + inc.tool_calls.append(ToolCall( + agent="resolution", + tool="apply_fix", + args={"target": "payments-svc"}, + result=None, + ts="2026-01-01T00:00:00Z", + status="pending_approval", + )) + self.store.save(inc) + return {} + + async def aget_state(self, config): + # A non-empty ``next`` tuple is langgraph's way of saying + # "the graph has steps queued to run when resumed" — i.e. + # paused at an ``interrupt()``. + return SimpleNamespace(next=("resume_node",)) + + @pytest.mark.asyncio async def test_orchestrator_start_session_finalizes_completed_non_streaming_run( tmp_path, @@ -141,3 +179,108 @@ async def _await_background_task(): assert orch.store.load(sid).status == "resolved" finally: service.shutdown() + + +# --------------------------------------------------------------------------- +# Issue #42: paused-at-gate sessions must transition to 'awaiting_input'. +# Sibling of the C#1 finalizer-asymmetry fix above — the finalizer skips +# paused graphs (correct: a HITL pause must not be coerced into a terminal +# status), but the row needs a paused-side status write so UIs filtering by +# 'awaiting_input' (approvals queue, sessions rail Active group) pick it up. +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_orchestrator_start_session_marks_paused_run_awaiting_input(tmp_path): + """Direct (non-streaming) start_session: when the graph pauses at a + HITL gate, the session row should flip to 'awaiting_input' AND a + status_changed event should be emitted.""" + orch = await Orchestrator.create(_cfg(tmp_path)) + try: + orch.graph = _PausedAtGateGraph(orch.store) + sid = await orch.start_session(query="db pool exhausted") + loaded = orch.store.load(sid) + assert loaded.status == "awaiting_input" + # The pending_approval ToolCall written by the fake gateway + # must still be on the row — pause-write should not clobber it. + assert any( + tc.status == "pending_approval" for tc in loaded.tool_calls + ), loaded.tool_calls + # status_changed (per-session) + session.status_changed + # (cross-session SSE) events must be on the event log so the + # React UI's recent-events stream surfaces the transition. + kinds = [ev.kind for ev in orch.event_log.iter_for(sid)] + assert "status_changed" in kinds + assert "session.status_changed" in kinds + finally: + await orch.aclose() + + +def test_service_background_start_session_marks_paused_run_awaiting_input(tmp_path): + """OrchestratorService background _run() path: same guarantee as + the direct path — the inner finalizer block now mirrors + orchestrator.start_session's pause handling.""" + service = OrchestratorService.get_or_create(_cfg(tmp_path)) + service.start() + try: + async def _install_graph(): + orch = await service._ensure_orchestrator() + graph = _PausedAtGateGraph(orch.store, service=service) + orch.graph = graph + return graph + + graph = service.submit_and_wait(_install_graph(), timeout=10.0) + sid = service.start_session(query="db pool exhausted") + + async def _await_background_task(): + while not graph.captured_entries: + await asyncio.sleep(0.01) + entry = graph.captured_entries[0] + if entry is not None and entry.task is not None: + await entry.task + + service.submit_and_wait(_await_background_task(), timeout=10.0) + orch = service.submit_and_wait(service._ensure_orchestrator(), timeout=10.0) + loaded = orch.store.load(sid) + assert loaded.status == "awaiting_input" + kinds = [ev.kind for ev in orch.event_log.iter_for(sid)] + assert "session.status_changed" in kinds + finally: + service.shutdown() + + +@pytest.mark.asyncio +async def test_mark_session_paused_is_no_op_when_already_awaiting_input(tmp_path): + """Guard: calling _mark_session_paused_async on a session that's + already awaiting_input must NOT emit a spurious status_changed + event (it would re-trigger the React UI's row update with an + identical to-status).""" + orch = await Orchestrator.create(_cfg(tmp_path)) + try: + inc = orch.store.create(query="x", environment="dev") + inc.status = "awaiting_input" + orch.store.save(inc) + before = list(orch.event_log.iter_for(inc.id)) + result = await orch._mark_session_paused_async(inc.id) + after = list(orch.event_log.iter_for(inc.id)) + assert result is None + assert len(after) == len(before) + finally: + await orch.aclose() + + +@pytest.mark.asyncio +async def test_mark_session_paused_is_no_op_on_terminal_status(tmp_path): + """Guard: a late paused-write must not unwind a finalize that + landed in between (e.g. the gateway raised after the audit row + landed but before pausing, and the finalizer ran first).""" + orch = await Orchestrator.create(_cfg(tmp_path)) + try: + inc = orch.store.create(query="x", environment="dev") + inc.status = "resolved" # terminal per _cfg + orch.store.save(inc) + result = await orch._mark_session_paused_async(inc.id) + assert result is None + assert orch.store.load(inc.id).status == "resolved" + finally: + await orch.aclose() diff --git a/tests/test_retry_session_locked_post_policy.py b/tests/test_retry_session_locked_post_policy.py index 852771d..3121183 100644 --- a/tests/test_retry_session_locked_post_policy.py +++ b/tests/test_retry_session_locked_post_policy.py @@ -70,6 +70,13 @@ async def _is_graph_paused(self, sid: str) -> bool: async def _finalize_session_status_async(self, sid: str) -> str | None: return self._finalized + async def _mark_session_paused_async(self, sid: str) -> str | None: + # Issue #42: retry path now writes 'awaiting_input' on the + # paused branch in addition to the existing finalize call. + # The stub records that it was called (paused branch flag). + self._marked_paused_calls = getattr(self, "_marked_paused_calls", 0) + 1 + return "awaiting_input" if self._paused else None + @pytest.fixture def store(tmp_path) -> SessionStore: diff --git a/tests/test_triggers/test_orchestrator_trigger_kwarg.py b/tests/test_triggers/test_orchestrator_trigger_kwarg.py index be4d76c..231a5c6 100644 --- a/tests/test_triggers/test_orchestrator_trigger_kwarg.py +++ b/tests/test_triggers/test_orchestrator_trigger_kwarg.py @@ -12,6 +12,14 @@ async def _always_paused() -> bool: return True +async def _noop_paused_write() -> None: + # Issue #42 stub: the new _mark_session_paused_async would touch + # self._locks; these tests bypass __init__ and never wire the + # lock manager. The trigger-stamp tests only care that the + # paused-branch is taken, not that the row gets a status write. + return None + + @pytest.mark.asyncio async def test_orchestrator_start_session_records_trigger(tmp_path, monkeypatch): """``Orchestrator.start_session(trigger=...)`` stamps provenance on @@ -51,6 +59,10 @@ async def ainvoke(self, state, config): orch.graph = _FakeGraph() orch._thread_config = lambda sid: {"configurable": {"thread_id": sid}} orch._is_graph_paused = lambda sid: _always_paused() + # Issue #42: paused branch now calls _mark_session_paused_async, + # which uses orch._locks. Stub it out since this test bypasses + # __init__ and never wires the lock manager. + orch._mark_session_paused_async = lambda sid: _noop_paused_write() # Tests that bypass __init__ must set the dedup pipeline # attribute to ``None`` so the dedup-check shortcut returns False # without touching the (uninitialised) attribute. @@ -96,6 +108,7 @@ async def ainvoke(self, state, config): orch.graph = _FakeGraph() orch._thread_config = lambda sid: {"configurable": {"thread_id": sid}} orch._is_graph_paused = lambda sid: _always_paused() + orch._mark_session_paused_async = lambda sid: _noop_paused_write() orch.dedup_pipeline = None sid = await orch.start_session(query="q", environment="dev")