Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 62 additions & 3 deletions dist/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6458,7 +6458,11 @@ async def _run() -> None:
),
config=orch._thread_config(session_id),
)
if not await orch._is_graph_paused(session_id):
if await orch._is_graph_paused(session_id):
# Issue #42: mark the row awaiting_input so UIs
# filtering by that status see the paused session.
await orch._mark_session_paused_async(session_id)
else:
await orch._finalize_session_status_async(session_id)
except asyncio.CancelledError:
raise
Expand Down Expand Up @@ -14882,6 +14886,54 @@ async def _finalize_session_status_async(
async with self._locks.acquire(session_id):
return self._finalize_session_status(session_id)

async def _mark_session_paused_async(
self, session_id: str,
) -> str | None:
"""Lock-guarded write that flips a paused session to
``"awaiting_input"`` and emits the matching ``status_changed``
events.

Companion to :meth:`_finalize_session_status_async`. The
finalizer handles graph-completed runs; this handles
graph-paused runs (HITL gate). Without it, sessions that
pause at a gate keep their old status (typically ``"new"`` /
``"in_progress"``) and UIs that filter by ``"awaiting_input"``
(the approvals queue, the rail's Active group) miss them — see
issue #42.

No-op (returns ``None``) when the session is already at
``"awaiting_input"``, already in a terminal status (a late
pause-write must not unwind a finalize that landed in
between), or the row is missing. Otherwise transitions the
row to ``"awaiting_input"`` and emits both per-session
``status_changed`` and cross-session
``session.status_changed`` events via
:func:`_emit_status_changed_event`.
"""
async with self._locks.acquire(session_id):
try:
inc = self.store.load(session_id)
except (FileNotFoundError, ValueError, KeyError, LookupError):
return None
if inc.status == "awaiting_input":
return None
statuses = getattr(getattr(self, "cfg", None), "orchestrator", None)
if statuses is not None:
current_def = statuses.statuses.get(inc.status)
if current_def is not None and current_def.terminal:
return None
from_status = inc.status
inc.status = "awaiting_input"
self.store.save(inc)
_emit_status_changed_event(
orch=self,
inc=inc,
from_status=from_status,
to_status="awaiting_input",
cause="gate_paused",
)
return "awaiting_input"

async def _is_graph_paused(self, session_id: str) -> bool:
"""Return True iff the compiled graph has a pending step waiting
to resume (i.e. it's paused at an ``interrupt()`` boundary).
Expand Down Expand Up @@ -15082,7 +15134,9 @@ async def start_session(self, *, query: str,
last_agent=None, error=None),
config=self._thread_config(inc.id),
)
if not await self._is_graph_paused(inc.id):
if await self._is_graph_paused(inc.id):
await self._mark_session_paused_async(inc.id)
else:
await self._finalize_session_status_async(inc.id)
return inc.id

Expand Down Expand Up @@ -15140,7 +15194,10 @@ async def stream_session(self, *, query: str, environment: str,
# session is waiting for operator approval, not done. Stamping
# default_terminal_status here would orphan the pending_approval
# ToolCall row written by the gateway just before the pause.
# Issue #42: still flip the session row to awaiting_input so
# the approvals queue + sessions rail Active group pick it up.
if await self._is_graph_paused(inc.id):
await self._mark_session_paused_async(inc.id)
yield {"event": "session_paused", "incident_id": inc.id,
"ts": _event_ts()}
else:
Expand Down Expand Up @@ -15422,8 +15479,10 @@ async def _retry_session_locked(self, session_id: str) -> AsyncIterator[dict]:
yield self._to_ui_event(ev, session_id)
# See ``stream_session`` for why pause-detection guards the
# finalize call: a HITL pause must not be coerced into a
# terminal status.
# terminal status. Issue #42: still flip to awaiting_input so
# UIs filtering by that status see the retried session.
if await self._is_graph_paused(session_id):
await self._mark_session_paused_async(session_id)
yield {"event": "session_paused", "incident_id": session_id,
"ts": _event_ts()}
else:
Expand Down
65 changes: 62 additions & 3 deletions dist/apps/code-review.py
Original file line number Diff line number Diff line change
Expand Up @@ -6511,7 +6511,11 @@ async def _run() -> None:
),
config=orch._thread_config(session_id),
)
if not await orch._is_graph_paused(session_id):
if await orch._is_graph_paused(session_id):
# Issue #42: mark the row awaiting_input so UIs
# filtering by that status see the paused session.
await orch._mark_session_paused_async(session_id)
else:
await orch._finalize_session_status_async(session_id)
except asyncio.CancelledError:
raise
Expand Down Expand Up @@ -14935,6 +14939,54 @@ async def _finalize_session_status_async(
async with self._locks.acquire(session_id):
return self._finalize_session_status(session_id)

async def _mark_session_paused_async(
self, session_id: str,
) -> str | None:
"""Lock-guarded write that flips a paused session to
``"awaiting_input"`` and emits the matching ``status_changed``
events.

Companion to :meth:`_finalize_session_status_async`. The
finalizer handles graph-completed runs; this handles
graph-paused runs (HITL gate). Without it, sessions that
pause at a gate keep their old status (typically ``"new"`` /
``"in_progress"``) and UIs that filter by ``"awaiting_input"``
(the approvals queue, the rail's Active group) miss them — see
issue #42.

No-op (returns ``None``) when the session is already at
``"awaiting_input"``, already in a terminal status (a late
pause-write must not unwind a finalize that landed in
between), or the row is missing. Otherwise transitions the
row to ``"awaiting_input"`` and emits both per-session
``status_changed`` and cross-session
``session.status_changed`` events via
:func:`_emit_status_changed_event`.
"""
async with self._locks.acquire(session_id):
try:
inc = self.store.load(session_id)
except (FileNotFoundError, ValueError, KeyError, LookupError):
return None
if inc.status == "awaiting_input":
return None
statuses = getattr(getattr(self, "cfg", None), "orchestrator", None)
if statuses is not None:
current_def = statuses.statuses.get(inc.status)
if current_def is not None and current_def.terminal:
return None
from_status = inc.status
inc.status = "awaiting_input"
self.store.save(inc)
_emit_status_changed_event(
orch=self,
inc=inc,
from_status=from_status,
to_status="awaiting_input",
cause="gate_paused",
)
return "awaiting_input"

async def _is_graph_paused(self, session_id: str) -> bool:
"""Return True iff the compiled graph has a pending step waiting
to resume (i.e. it's paused at an ``interrupt()`` boundary).
Expand Down Expand Up @@ -15135,7 +15187,9 @@ async def start_session(self, *, query: str,
last_agent=None, error=None),
config=self._thread_config(inc.id),
)
if not await self._is_graph_paused(inc.id):
if await self._is_graph_paused(inc.id):
await self._mark_session_paused_async(inc.id)
else:
await self._finalize_session_status_async(inc.id)
return inc.id

Expand Down Expand Up @@ -15193,7 +15247,10 @@ async def stream_session(self, *, query: str, environment: str,
# session is waiting for operator approval, not done. Stamping
# default_terminal_status here would orphan the pending_approval
# ToolCall row written by the gateway just before the pause.
# Issue #42: still flip the session row to awaiting_input so
# the approvals queue + sessions rail Active group pick it up.
if await self._is_graph_paused(inc.id):
await self._mark_session_paused_async(inc.id)
yield {"event": "session_paused", "incident_id": inc.id,
"ts": _event_ts()}
else:
Expand Down Expand Up @@ -15475,8 +15532,10 @@ async def _retry_session_locked(self, session_id: str) -> AsyncIterator[dict]:
yield self._to_ui_event(ev, session_id)
# See ``stream_session`` for why pause-detection guards the
# finalize call: a HITL pause must not be coerced into a
# terminal status.
# terminal status. Issue #42: still flip to awaiting_input so
# UIs filtering by that status see the retried session.
if await self._is_graph_paused(session_id):
await self._mark_session_paused_async(session_id)
yield {"event": "session_paused", "incident_id": session_id,
"ts": _event_ts()}
else:
Expand Down
65 changes: 62 additions & 3 deletions dist/apps/incident-management.py
Original file line number Diff line number Diff line change
Expand Up @@ -6523,7 +6523,11 @@ async def _run() -> None:
),
config=orch._thread_config(session_id),
)
if not await orch._is_graph_paused(session_id):
if await orch._is_graph_paused(session_id):
# Issue #42: mark the row awaiting_input so UIs
# filtering by that status see the paused session.
await orch._mark_session_paused_async(session_id)
else:
await orch._finalize_session_status_async(session_id)
except asyncio.CancelledError:
raise
Expand Down Expand Up @@ -14947,6 +14951,54 @@ async def _finalize_session_status_async(
async with self._locks.acquire(session_id):
return self._finalize_session_status(session_id)

async def _mark_session_paused_async(
self, session_id: str,
) -> str | None:
"""Lock-guarded write that flips a paused session to
``"awaiting_input"`` and emits the matching ``status_changed``
events.

Companion to :meth:`_finalize_session_status_async`. The
finalizer handles graph-completed runs; this handles
graph-paused runs (HITL gate). Without it, sessions that
pause at a gate keep their old status (typically ``"new"`` /
``"in_progress"``) and UIs that filter by ``"awaiting_input"``
(the approvals queue, the rail's Active group) miss them — see
issue #42.

No-op (returns ``None``) when the session is already at
``"awaiting_input"``, already in a terminal status (a late
pause-write must not unwind a finalize that landed in
between), or the row is missing. Otherwise transitions the
row to ``"awaiting_input"`` and emits both per-session
``status_changed`` and cross-session
``session.status_changed`` events via
:func:`_emit_status_changed_event`.
"""
async with self._locks.acquire(session_id):
try:
inc = self.store.load(session_id)
except (FileNotFoundError, ValueError, KeyError, LookupError):
return None
if inc.status == "awaiting_input":
return None
statuses = getattr(getattr(self, "cfg", None), "orchestrator", None)
if statuses is not None:
current_def = statuses.statuses.get(inc.status)
if current_def is not None and current_def.terminal:
return None
from_status = inc.status
inc.status = "awaiting_input"
self.store.save(inc)
_emit_status_changed_event(
orch=self,
inc=inc,
from_status=from_status,
to_status="awaiting_input",
cause="gate_paused",
)
return "awaiting_input"

async def _is_graph_paused(self, session_id: str) -> bool:
"""Return True iff the compiled graph has a pending step waiting
to resume (i.e. it's paused at an ``interrupt()`` boundary).
Expand Down Expand Up @@ -15147,7 +15199,9 @@ async def start_session(self, *, query: str,
last_agent=None, error=None),
config=self._thread_config(inc.id),
)
if not await self._is_graph_paused(inc.id):
if await self._is_graph_paused(inc.id):
await self._mark_session_paused_async(inc.id)
else:
await self._finalize_session_status_async(inc.id)
return inc.id

Expand Down Expand Up @@ -15205,7 +15259,10 @@ async def stream_session(self, *, query: str, environment: str,
# session is waiting for operator approval, not done. Stamping
# default_terminal_status here would orphan the pending_approval
# ToolCall row written by the gateway just before the pause.
# Issue #42: still flip the session row to awaiting_input so
# the approvals queue + sessions rail Active group pick it up.
if await self._is_graph_paused(inc.id):
await self._mark_session_paused_async(inc.id)
yield {"event": "session_paused", "incident_id": inc.id,
"ts": _event_ts()}
else:
Expand Down Expand Up @@ -15487,8 +15544,10 @@ async def _retry_session_locked(self, session_id: str) -> AsyncIterator[dict]:
yield self._to_ui_event(ev, session_id)
# See ``stream_session`` for why pause-detection guards the
# finalize call: a HITL pause must not be coerced into a
# terminal status.
# terminal status. Issue #42: still flip to awaiting_input so
# UIs filtering by that status see the retried session.
if await self._is_graph_paused(session_id):
await self._mark_session_paused_async(session_id)
yield {"event": "session_paused", "incident_id": session_id,
"ts": _event_ts()}
else:
Expand Down
59 changes: 57 additions & 2 deletions src/runtime/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,54 @@
async with self._locks.acquire(session_id):
return self._finalize_session_status(session_id)

async def _mark_session_paused_async(
self, session_id: str,
) -> str | None:
"""Lock-guarded write that flips a paused session to
``"awaiting_input"`` and emits the matching ``status_changed``
events.

Companion to :meth:`_finalize_session_status_async`. The
finalizer handles graph-completed runs; this handles
graph-paused runs (HITL gate). Without it, sessions that
pause at a gate keep their old status (typically ``"new"`` /
``"in_progress"``) and UIs that filter by ``"awaiting_input"``
(the approvals queue, the rail's Active group) miss them — see
issue #42.

No-op (returns ``None``) when the session is already at
``"awaiting_input"``, already in a terminal status (a late
pause-write must not unwind a finalize that landed in
between), or the row is missing. Otherwise transitions the
row to ``"awaiting_input"`` and emits both per-session
``status_changed`` and cross-session
``session.status_changed`` events via
:func:`_emit_status_changed_event`.
"""
async with self._locks.acquire(session_id):
try:
inc = self.store.load(session_id)
except (FileNotFoundError, ValueError, KeyError, LookupError):

Check warning on line 1121 in src/runtime/orchestrator.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this redundant Exception class; it derives from another which is already caught.

See more on https://sonarcloud.io/project/issues?id=RandomCodeSpace_asr&issues=AZ4xrpV9brWrxHmeE3kd&open=AZ4xrpV9brWrxHmeE3kd&pullRequest=45
return None
if inc.status == "awaiting_input":
return None
statuses = getattr(getattr(self, "cfg", None), "orchestrator", None)
if statuses is not None:
current_def = statuses.statuses.get(inc.status)
if current_def is not None and current_def.terminal:
return None
from_status = inc.status
inc.status = "awaiting_input"
self.store.save(inc)
_emit_status_changed_event(
orch=self,
inc=inc,
from_status=from_status,
to_status="awaiting_input",
cause="gate_paused",
)
return "awaiting_input"

async def _is_graph_paused(self, session_id: str) -> bool:
"""Return True iff the compiled graph has a pending step waiting
to resume (i.e. it's paused at an ``interrupt()`` boundary).
Expand Down Expand Up @@ -1291,7 +1339,9 @@
last_agent=None, error=None),
config=self._thread_config(inc.id),
)
if not await self._is_graph_paused(inc.id):
if await self._is_graph_paused(inc.id):
await self._mark_session_paused_async(inc.id)
else:
await self._finalize_session_status_async(inc.id)
return inc.id

Expand Down Expand Up @@ -1349,7 +1399,10 @@
# session is waiting for operator approval, not done. Stamping
# default_terminal_status here would orphan the pending_approval
# ToolCall row written by the gateway just before the pause.
# Issue #42: still flip the session row to awaiting_input so
# the approvals queue + sessions rail Active group pick it up.
if await self._is_graph_paused(inc.id):
await self._mark_session_paused_async(inc.id)
yield {"event": "session_paused", "incident_id": inc.id,
"ts": _event_ts()}
else:
Expand Down Expand Up @@ -1631,8 +1684,10 @@
yield self._to_ui_event(ev, session_id)
# See ``stream_session`` for why pause-detection guards the
# finalize call: a HITL pause must not be coerced into a
# terminal status.
# terminal status. Issue #42: still flip to awaiting_input so
# UIs filtering by that status see the retried session.
if await self._is_graph_paused(session_id):
await self._mark_session_paused_async(session_id)
yield {"event": "session_paused", "incident_id": session_id,
"ts": _event_ts()}
else:
Expand Down
Loading
Loading