Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
330 changes: 257 additions & 73 deletions dist/app.py

Large diffs are not rendered by default.

331 changes: 258 additions & 73 deletions dist/apps/code-review.py

Large diffs are not rendered by default.

330 changes: 257 additions & 73 deletions dist/apps/incident-management.py

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions examples/code_review/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ class CodeReviewStateOverrides(BaseModel):

pr_url: str | None = None
repo: str | None = None
environment: str | None = None
base_branch: str | None = None
pr_number: int | None = None
49 changes: 33 additions & 16 deletions src/runtime/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import os
from contextlib import asynccontextmanager
from pathlib import Path
from typing import AsyncIterator, Literal
from typing import Any, AsyncIterator, Literal

from fastapi import APIRouter, FastAPI, HTTPException, Request, Response, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
Expand Down Expand Up @@ -120,6 +120,7 @@
class SessionStartBody(BaseModel):
query: str
environment: str
state_overrides: dict[str, Any] | None = None
# Generic submitter dict — the framework projects ``id``/``team``
# onto the row's reporter columns; apps interpret the rest. The
# legacy ``reporter_id`` / ``reporter_team`` fields were removed
Expand Down Expand Up @@ -359,22 +360,13 @@
# at root for monitor / load-balancer health-check conventions.
api_v1 = APIRouter(prefix="/api/v1")

# CORS: env-driven so the React dev server (Vite at :5173) can call
# every endpoint, SSE included. Override via ``ASR_CORS_ORIGINS``
# (comma-separated) — production deployments lock the origin list
# down by setting the env var to the narrower allow-list.
# ``allow_credentials=False`` matches the bearer-token auth pattern
# (no cookies); methods are explicit so OPTIONS preflights are
# handled the same way for every route.
_cors_origins_raw = os.environ.get(
"ASR_CORS_ORIGINS",
"http://localhost:5173,http://127.0.0.1:5173", # Vite dev defaults
)
_cors_origins = [o.strip() for o in _cors_origins_raw.split(",") if o.strip()]
# CORS is config-driven via ``cfg.api``. ``ASR_CONFIG`` selects the
# config file; deployments set origins/credentials in YAML rather
# than via a second env-var override path.
fastapi_app.add_middleware(
CORSMiddleware,
allow_origins=_cors_origins,
allow_credentials=False,
allow_origins=cfg.api.cors_origins,
allow_credentials=cfg.api.cors_allow_credentials,
allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
allow_headers=["*"],
)
Expand Down Expand Up @@ -529,9 +521,34 @@
"""
svc = request.app.state.service
try:
if (
body.state_overrides is not None
and "environment" in body.state_overrides
and body.environment != body.state_overrides["environment"]
):
raise HTTPException(

Check failure on line 529 in src/runtime/api.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Document this HTTPException with status code 422 in the "responses" parameter.

See more on https://sonarcloud.io/project/issues?id=RandomCodeSpace_asr&issues=AZ4xZ8clBO_6kV5uXnqA&open=AZ4xZ8clBO_6kV5uXnqA&pullRequest=41
status_code=422,
detail={
"error": {
"code": "conflicting_environment",
"message": (
"environment and state_overrides.environment "
"must match when both are supplied"
),
"details": {
"environment": body.environment,
"state_overrides_environment": (
body.state_overrides["environment"]
),
},
}
},
)
state_overrides = dict(body.state_overrides or {})
state_overrides.setdefault("environment", body.environment)
sid = svc.start_session(
query=body.query,
state_overrides={"environment": body.environment},
state_overrides=state_overrides,
submitter=body.submitter,
)
except Exception as e: # noqa: BLE001
Expand Down
30 changes: 28 additions & 2 deletions src/runtime/api_recent_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from typing import AsyncIterator
from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import StreamingResponse
from runtime.storage.event_log import SessionEvent

_SSE_MEDIA_TYPE = "text/event-stream"
_SESSION_KINDS = frozenset({
Expand All @@ -23,6 +24,29 @@
})


def _event_payload(orch, ev: SessionEvent) -> dict:
    """Return the payload dict to send for one session event.

    The result starts as a shallow copy of ``ev.payload`` (an empty dict
    when that is falsy). Keys are only filled in when the event did not
    already carry them:

    * ``id`` is defaulted to ``ev.session_id`` for every event kind.
    * For ``session.status_changed``, ``status`` is defaulted to the
      payload's ``to`` value (``None`` when ``to`` is absent).
    * For ``session.created``, ``status``, ``created_at``, ``updated_at``
      and ``label`` are defaulted from the object returned by
      ``orch.store.load(ev.session_id)``. If that load raises or returns
      ``None`` the payload is returned without these extras.

    ``label`` is the first truthy value among the loaded object's
    ``query`` attribute, ``extra_fields["query"]``, and its ``id``.
    """
    enriched = dict(ev.payload) if ev.payload else {}
    enriched.setdefault("id", ev.session_id)

    kind = ev.kind
    if kind == "session.status_changed":
        enriched.setdefault("status", enriched.get("to"))
        return enriched
    if kind != "session.created":
        return enriched

    try:
        row = orch.store.load(ev.session_id)
    except Exception:  # noqa: BLE001 — SSE enrichment is best effort
        return enriched
    if row is None:
        return enriched

    # Same key order as before so the serialized payload is unchanged.
    for key in ("status", "created_at", "updated_at"):
        enriched.setdefault(key, getattr(row, key))

    # Fall through the label candidates until one is truthy.
    label = getattr(row, "query", None)
    if not label:
        label = (row.extra_fields or {}).get("query")
    if not label:
        label = row.id
    enriched.setdefault("label", label)
    return enriched


def add_recent_events_routes(api_v1: APIRouter) -> None:
"""Mount the /sessions/recent/events SSE handler on the api_v1 router.

Expand All @@ -47,7 +71,8 @@ async def _stream() -> AsyncIterator[str]:
if ev.kind in _SESSION_KINDS:
payload = {"seq": ev.seq, "kind": ev.kind,
"session_id": ev.session_id,
"payload": ev.payload, "ts": ev.ts}
"payload": _event_payload(orch, ev),
"ts": ev.ts}
last_seq = ev.seq
yield f"data: {json.dumps(payload)}\n\n"
# Tail: poll for new rows; exit on client disconnect
Expand All @@ -57,7 +82,8 @@ async def _stream() -> AsyncIterator[str]:
if ev.kind in _SESSION_KINDS:
payload = {"seq": ev.seq, "kind": ev.kind,
"session_id": ev.session_id,
"payload": ev.payload, "ts": ev.ts}
"payload": _event_payload(orch, ev),
"ts": ev.ts}
last_seq = ev.seq
yield f"data: {json.dumps(payload)}\n\n"

Expand Down
8 changes: 7 additions & 1 deletion src/runtime/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,13 @@ def resolve_framework_app_config(


class ApiConfig(BaseModel):
"""API surface knobs surfaced to the React frontend."""
"""API surface knobs surfaced to the React frontend.

Values come from the loaded runtime config. Environment variables
choose the config file (``ASR_CONFIG``) but do not override these
CORS fields directly; deployments that need different origins
should set them in YAML.
"""

# CORS origins allowed by the FastAPI CORSMiddleware. Default
# covers the two common React dev-server URLs (Vite, CRA/Next).
Expand Down
6 changes: 6 additions & 0 deletions src/runtime/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,12 @@ async def node(state: GraphState) -> dict:
inc_id, "agent_started",
agent=skill.name, started_at=started_at,
)
event_log.record(
inc_id,
"session.agent_running",
id=inc_id,
agent=skill.name,
)
except Exception: # noqa: BLE001 — telemetry must not break the agent
logger.debug(
"event_log.record(agent_started) failed", exc_info=True,
Expand Down
28 changes: 25 additions & 3 deletions src/runtime/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,19 @@ def _emit_status_changed_event(
return
status_def = statuses.statuses.get(to_status)
if status_def is not None and status_def.terminal:
if event_log is not None:
try:
event_log.record(
inc.id,
"session.agent_running",
id=inc.id,
agent=None,
)
except Exception: # noqa: BLE001 — telemetry must not break finalize
_log.debug(
"event_log.record(session.agent_running) failed",
exc_info=True,
)
_extract_lesson_on_terminal(orch=orch, inc=inc)


Expand Down Expand Up @@ -1235,13 +1248,20 @@ async def start_session(self, *, query: str,
# ``__new__`` (bypassing ``__init__``) working.
state_overrides_cls = getattr(self, "_state_overrides_cls", None)
if state_overrides_cls is not None and state_overrides is not None:
state_overrides_cls.model_validate(state_overrides)
state_overrides = state_overrides_cls.model_validate(
state_overrides
).model_dump(exclude_none=True)
submitter = _coerce_submitter(submitter, reporter_id, reporter_team)
sub_id = (submitter or {}).get("id", "user-mock")
sub_team = (submitter or {}).get("team", "platform")
env = (state_overrides or {}).get("environment", "")
inc = self.store.create(query=query, environment=env,
reporter_id=sub_id, reporter_team=sub_team)
inc = self.store.create(
query=query,
environment=env,
reporter_id=sub_id,
reporter_team=sub_team,
state_overrides=state_overrides,
)
# Emit session.created on the cross-session SSE stream so the
# React UI's Other Sessions monitor lights up the new tile in
# real time. ``session_id`` already lands on the row; the
Expand Down Expand Up @@ -1271,6 +1291,8 @@ async def start_session(self, *, query: str,
last_agent=None, error=None),
config=self._thread_config(inc.id),
)
if not await self._is_graph_paused(inc.id):
await self._finalize_session_status_async(inc.id)
return inc.id

async def start_investigation(self, *, query: str, environment: str,
Expand Down
73 changes: 41 additions & 32 deletions src/runtime/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,6 @@ def start_session(
)
sub_id = (resolved_submitter or {}).get("id", "user-mock")
sub_team = (resolved_submitter or {}).get("team", "platform")
env = (resolved_overrides or {}).get("environment", "")

async def _scheduler() -> str:
# Enforce the concurrency cap on the loop thread so the
Expand All @@ -484,16 +483,24 @@ async def _scheduler() -> str:
if len(self._registry) >= self.max_concurrent_sessions:
raise SessionCapExceeded(self.max_concurrent_sessions)
orch = await self._ensure_orchestrator()
overrides_for_create = resolved_overrides
state_overrides_cls = getattr(orch, "_state_overrides_cls", None)
if state_overrides_cls is not None and overrides_for_create is not None:
overrides_for_create = state_overrides_cls.model_validate(
overrides_for_create
).model_dump(exclude_none=True)
env_for_create = (overrides_for_create or {}).get("environment", "")
# Allocate the row (and its id) synchronously on the loop
# so the caller gets a stable id back. The graph then runs
# in a separate task — registration happens here, before
# the task is created, so ``list_active_sessions`` sees the
# entry immediately.
inc = orch.store.create(
query=query,
environment=env,
environment=env_for_create,
reporter_id=sub_id,
reporter_team=sub_team,
state_overrides=overrides_for_create,
)
session_id = inc.id
# Emit session.created on the cross-session SSE stream so
Expand Down Expand Up @@ -541,8 +548,8 @@ async def _run() -> None:
raise SessionBusy(session_id)
# Hold the per-session lock for the full graph turn,
# including any HITL interrupt() pause (D-01).
async with orch._locks.acquire(session_id):
try:
try:
async with orch._locks.acquire(session_id):
await orch.graph.ainvoke(
GraphState(
session=inc,
Expand All @@ -552,34 +559,36 @@ async def _run() -> None:
),
config=orch._thread_config(session_id),
)
except asyncio.CancelledError:
raise
except Exception as exc: # noqa: BLE001
# Phase 11 (FOC-04 / D-11-04): GraphInterrupt is a
# pending-approval pause, not a failure. Don't stamp
# status='error' on the registry entry -- let
# LangGraph's checkpointer hold the paused state
# and let the UI's Approve/Reject action drive
# resume.
try:
from langgraph.errors import GraphInterrupt
if isinstance(exc, GraphInterrupt):
# Propagate so the underlying Task
# observer (stop_session etc.) still
# sees the exception, but skip the
# status='error' write.
raise
except ImportError: # pragma: no cover
pass
# Mark the registry entry so any concurrent snapshot
# observes the failure before the done-callback
# evicts it. The exception itself is preserved on
# the task object for ``stop_session`` and any
# other observer that holds a Task reference.
e = self._registry.get(session_id)
if e is not None:
e.status = "error"
raise
if not await orch._is_graph_paused(session_id):
await orch._finalize_session_status_async(session_id)
except asyncio.CancelledError:
raise
except Exception as exc: # noqa: BLE001
# Phase 11 (FOC-04 / D-11-04): GraphInterrupt is a
# pending-approval pause, not a failure. Don't stamp
# status='error' on the registry entry -- let
# LangGraph's checkpointer hold the paused state
# and let the UI's Approve/Reject action drive
# resume.
try:
from langgraph.errors import GraphInterrupt
if isinstance(exc, GraphInterrupt):
# Propagate so the underlying Task
# observer (stop_session etc.) still
# sees the exception, but skip the
# status='error' write.
raise
except ImportError: # pragma: no cover
pass
# Mark the registry entry so any concurrent snapshot
# observes the failure before the done-callback
# evicts it. The exception itself is preserved on
# the task object for ``stop_session`` and any
# other observer that holds a Task reference.
e = self._registry.get(session_id)
if e is not None:
e.status = "error"
raise

task = asyncio.create_task(_run(), name=f"session:{session_id}")
entry.task = task
Expand Down
10 changes: 10 additions & 0 deletions src/runtime/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ class IncidentState(Session):


# Status strings accepted wherever a value is annotated as SessionStatus.
SessionStatus = Literal[
"new",
"in_progress",
"awaiting_input",
"resolved",
"escalated",
"stopped",
"error",
"duplicate",
]
# Per-call audit metadata for the risk-rated tool gateway.
ToolRisk = Literal["low", "medium", "high"]
ToolStatus = Literal[
"executed", # auto / legacy default
Expand Down
Loading
Loading