
feat(mollifier): trigger burst smoothing — Phase 1 (monitoring)#3614

Open

d-cs wants to merge 50 commits into main from mollifier-phase-2

Conversation

@d-cs
Collaborator

@d-cs d-cs commented May 13, 2026

Summary

  • Introduce the Mollifier: a Redis-backed buffer for trigger() API calls during traffic spikes, with a per-env trip evaluator and a drainer ack-loop.
  • Phase 1 is dual-write monitoring — every mollified trigger is buffered to Redis AND continues to engine.trigger. No customer-facing behaviour change.
  • Telemetry events: mollifier.would_mollify, mollifier.buffered, mollifier.drained, plus the mollifier.decisions counter.
  • Gated behind a feature flag (default off).

Test plan

  • pnpm run test --filter @trigger.dev/redis-worker
  • pnpm run test --filter webapp -- mollifier
  • Manual: with flag off, no behaviour change vs main
  • Manual: with flag on + threshold lowered, observe mollifier.buffered + mollifier.drained log pairs with matching runId

@d-cs d-cs self-assigned this May 13, 2026
@changeset-bot

changeset-bot Bot commented May 13, 2026

🦋 Changeset detected

Latest commit: 5255c47

The changes in this PR will be included in the next version bump.


@coderabbitai
Contributor

coderabbitai Bot commented May 13, 2026

Review Change Stack

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

This PR implements the first two phases of a trigger burst-smoothing system ("mollifier"): it adds a Redis-backed MollifierBuffer and MollifierDrainer, Zod schemas and payload (de)serialization, a real trip evaluator and mollifier gate wired into RunEngineTriggerTaskService, OpenTelemetry metrics, worker startup drainer wiring, environment configuration and feature flag, package re-exports, and comprehensive tests (unit, integration, and fuzz) validating buffer, drainer, gate, and evaluator behavior while keeping fail-open semantics and deferring full activation to later phases.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (2 warnings)

  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 9.52%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.
  • Description check: ⚠️ Warning. The PR description is incomplete and does not follow the required template structure; required sections and structured information are missing. Resolution: add the missing template sections (issue reference, completed testing details, and screenshots if applicable).

✅ Passed checks (3 passed)

  • Title check: ✅ Passed. The title clearly and specifically summarizes the main change: introducing the Mollifier feature for trigger burst smoothing, with Phase 1 focused on monitoring/dual-write behaviour.
  • Linked Issues check: ✅ Passed. Skipped: no linked issues were found for this pull request.
  • Out of Scope Changes check: ✅ Passed. Skipped: no linked issues were found for this pull request.



@d-cs
Collaborator Author

d-cs commented May 14, 2026

Code review

Found 2 issues:

  1. resolveOrgFlag resolves the global feature flag, not a per-org flag. flag() is called without overrides, so it reads only the global FeatureFlag row — other call sites in the webapp (e.g. canAccessAi/canAccessQuery) explicitly pass the org's featureFlags JSON as overrides. With the current code, the variable named orgFlagEnabled and the call-site comment "if the org has the mollifier feature flag enabled" don't match behaviour: the moment the global flag flips on, every org is mollified — there's no per-org rollout.

isMollifierEnabled: () => env.MOLLIFIER_ENABLED === "1",
isShadowModeOn: () => env.MOLLIFIER_SHADOW_MODE === "1",
resolveOrgFlag: () =>
  flag({ key: FEATURE_FLAG.mollifierEnabled, defaultValue: false }),
evaluator: defaultEvaluator,
logShadow: (inputs, decision) =>

  2. evaluateGate is awaited outside the try block in the trigger hot path. evaluateGate itself has no internal try/catch around resolveOrgFlag() (a DB feature-flag read) or evaluator() (a Redis Lua call), so any transient failure once MOLLIFIER_ENABLED=1 is flipped will throw past the call site and fail the customer's trigger — turning a monitoring-only Phase 1 into a fleet-wide trigger outage on a Redis/DB blip. Suggest wrapping the gate call in a try/catch that falls back to pass_through (sketched after the snippet below).

};
const mollifierOutcome = await this.evaluateGate({
  envId: environment.id,
  orgId: environment.organizationId,
  taskId,
});
try {
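
A minimal sketch of the suggested fail-open wrapper. safeEvaluateGate is a hypothetical name, and the pass_through outcome shape is assumed from the mollify / shadow_log / pass_through cascade described elsewhere in this PR:

// Outcome shape assumed from the gate cascade; not the committed type.
type GateOutcome = { action: "mollify" | "shadow_log" | "pass_through" };

// Wrap the gate so any transient Redis/DB failure degrades to "no
// mollification" instead of failing the customer's trigger.
async function safeEvaluateGate<I>(
  evaluate: (inputs: I) => Promise<GateOutcome>,
  inputs: I
): Promise<GateOutcome> {
  try {
    return await evaluate(inputs);
  } catch (error) {
    // The real code would use the webapp logger; console.error keeps
    // this sketch self-contained.
    console.error("mollifier gate failed; falling back to pass_through", error);
    return { action: "pass_through" };
  }
}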

🤖 Generated with Claude Code


@d-cs
Collaborator Author

d-cs commented May 14, 2026

Nitpicks (test-mock removal) also addressed in 98c1520:

  • apps/webapp/test/mollifierGate.test.ts: dropped vi.fn. makeDeps now returns real closure-based dependencies with plain counter/array spies; the 16-row truth table and decision-log shape tests are unchanged.
  • apps/webapp/test/mollifierTripEvaluator.test.ts: removed the vi.fn/cast fakeBuffer doubles. Each test now uses redisTest from @internal/testcontainers to spin up Redis and exercise a real MollifierBuffer. The trip case uses threshold: 2 plus three real evaluateTrip calls to deterministically trip via the production Lua window; the fail-open case closes the buffer up front so the next evaluateTrip hits "Connection is closed" — a real failure mode rather than a stub.
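
For reference, the closure-spy shape this swaps in looks roughly like the following (illustrative, not the committed code; the entry shape is assumed):

// A plain async closure that records its calls into a captured array,
// replacing vi.fn while keeping the same assertions available.
function makeHandlerSpy() {
  const calls: Array<{ runId: string }> = [];
  const handler = async (entry: { runId: string }) => {
    calls.push(entry);
  };
  return { handler, calls };
}

// In a test: pass spy.handler as the dependency, then assert on
// spy.calls.length and spy.calls[0] instead of toHaveBeenCalled*.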


@d-cs
Collaborator Author

d-cs commented May 14, 2026

Code review (follow-up)

Four additional issues from the earlier scan, posting now since the gate-naming fix landed:

  1. Webapp CLAUDE.md says "Always use findFirst instead of findUnique" — new test introduces a findUnique. Trivial swap.

expect(result).toBeDefined();
expect(result?.run.friendlyId).toBeDefined();
const pgRun = await prisma.taskRun.findUnique({ where: { id: result!.run.id } });
expect(pgRun).not.toBeNull();
expect(pgRun!.friendlyId).toBe(result!.run.friendlyId);

  2. Drainer is started inside init() unconditionally — workerQueue.initialize() two lines above is gated on WORKER_ENABLED === "true", but getMollifierDrainer() is not. Once MOLLIFIER_ENABLED=1 flips on, every webapp replica (including API-only ones) will spin up a polling loop + Redis connection and race for the same buffer. Consider gating on WORKER_ENABLED (or a dedicated MOLLIFIER_DRAINER_ENABLED).

if (env.WORKER_ENABLED === "true") {
  await workerQueue.initialize();
}
try {
  const drainer = getMollifierDrainer();
  if (drainer) {
    // The drainer owns a polling loop and a Redis client; let it drain
    // in-flight pops on shutdown rather than tearing the process down
    // mid-handler. Idempotent — `drainer.stop()` short-circuits if already
    // stopped, so registering on both signals is safe.
    const stopDrainer = () => {
      drainer.stop().catch((error) => {
        logger.error("Failed to stop mollifier drainer", { error });
      });
    };
    process.once("SIGTERM", stopDrainer);
    process.once("SIGINT", stopDrainer);
  }
} catch (error) {
  logger.error("Failed to initialise mollifier drainer", { error });
}
}

  3. includeTtl: true was added to the delayed and pending-version re-enqueue paths, but the producer-side comment in enqueueSystem.ts still explicitly warns these callers off: "Re-enqueues (waitpoint, checkpoint, delayed, pending version) must not add TTL." Either the comment needs updating to reflect the new design, or the new callers are wrong.

const timestamp = (run.queueTimestamp ?? run.createdAt).getTime() - run.priorityMs;
// Include TTL only when explicitly requested (first enqueue from trigger).
// Re-enqueues (waitpoint, checkpoint, delayed, pending version) must not add TTL.
let ttlExpiresAt: number | undefined;
if (includeTtl && run.ttl) {
  const expireAt = parseNaturalLanguageDuration(run.ttl);

  batchId: run.batchId ?? undefined,
  skipRunLock: true,
  includeTtl: true,
});

// for a worker version). Arm TTL here so the TTL system can expire it
// if it sits queued waiting on a concurrency slot.
includeTtl: true,
});

  4. The comment calls count a "sliding-window counter", but the Lua is INCR + PEXPIRE-on-first — a fixed window. At a window boundary this can briefly admit ~2x threshold before tripping. Additionally, PSETEX trippedKey, holdMs rearms the hold TTL on every overage call, so under sustained overload the hold never elapses while traffic stays above threshold. Both may be intentional for Phase 1, but the docstring should match the implementation (a contrasting sliding-window sketch follows this list).

} from "./mollifierTelemetry.server";
// `count` is the *single-instance* sliding-window counter, not a fleet-wide
// aggregate. Each webapp instance maintains its own Redis key, so the fleet
// effective ceiling is `instance_count * threshold`. Phase 2 consumers must
// not treat `count` as a global rate.

numberOfKeys: 2,
lua: `
  local rateKey = KEYS[1]
  local trippedKey = KEYS[2]
  local windowMs = tonumber(ARGV[1])
  local threshold = tonumber(ARGV[2])
  local holdMs = tonumber(ARGV[3])
  local count = redis.call('INCR', rateKey)
  if count == 1 then
    redis.call('PEXPIRE', rateKey, windowMs)
  end
  if count > threshold then
    redis.call('PSETEX', trippedKey, holdMs, '1')
  end
  local tripped = redis.call('EXISTS', trippedKey)
  return {count, tripped}
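
For contrast, a true sliding window is typically kept as a sorted set of call timestamps rather than a single counter. A hedged sketch of that variant (illustrative only; in production the three commands would live in one Lua script for atomicity):

import { Redis } from "ioredis";

// Count calls in the last windowMs by trimming and querying a ZSET.
// Unlike INCR + PEXPIRE-on-first, this admits at most `threshold` calls
// in ANY windowMs-wide interval, avoiding the ~2x boundary burst.
async function slidingWindowCount(
  redis: Redis,
  rateKey: string,
  windowMs: number
): Promise<number> {
  const now = Date.now();
  await redis.zremrangebyscore(rateKey, 0, now - windowMs); // drop expired entries
  await redis.zadd(rateKey, now, `${now}-${Math.random()}`); // one member per call
  await redis.pexpire(rateKey, windowMs); // garbage-collect idle keys
  return redis.zcard(rateKey);
}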

🤖 Generated with Claude Code


@d-cs d-cs force-pushed the mollifier-phase-2 branch from b4b5719 to f43df01 Compare May 14, 2026 08:54

@d-cs d-cs force-pushed the mollifier-phase-2 branch 2 times, most recently from 088e071 to 2a0d5ee Compare May 14, 2026 12:49

@d-cs d-cs force-pushed the mollifier-phase-2 branch from e51fac9 to a9400b1 Compare May 14, 2026 14:09

d-cs and others added 13 commits May 14, 2026 16:43
Redis-backed burst-smoothing layer behind MOLLIFIER_ENABLED=0 (default).
With the kill switch off, the gate short-circuits on its first env check
and production behaviour is identical to main.

@trigger.dev/redis-worker:
- MollifierBuffer: atomic Lua-backed FIFO with accept / pop / ack /
  requeue / fail + TTL. Per-env queues with HSET entry storage,
  atomic RPOP + status transition, FIFO retry ordering.
- MollifierDrainer: generic round-robin worker with concurrency cap,
  retry semantics, and a stop deadline to avoid livelock on a hung
  handler. Phase 3 will wire the handler to engine.trigger().
- Full testcontainers-backed test suite (21 tests).

apps/webapp:
- evaluateGate cascade-check (kill switch -> org feature flag ->
  shadow mode -> trip evaluator -> mollify / shadow_log / pass_through).
  Dependencies injected for testability; the trip evaluator stub
  returns { divert: false } in phase 1.
- Inserted into RunEngineTriggerTaskService.call() before
  traceEventConcern.traceRun. The mollify branch throws (unreachable
  in phase 1).
- Lazy MollifierBuffer + MollifierDrainer singletons; no Redis
  connection unless MOLLIFIER_ENABLED=1.
- 12 MOLLIFIER_* env vars (all safe defaults) and a mollifierEnabled
  feature flag in the global catalog.
- Drainer booted from worker.server.ts on first import.
- Read-fallback stub for phase 3.
- Gate cascade tests + .env loader so env.server validates in vitest
  workers.

Phase 2 will land the real trip evaluator; phase 3 will activate the
buffer-write + drain path.
…dual-write monitoring + drainer ack loop)

Phase 1 of the trigger-burst smoothing initiative. Adds the A-side trip
evaluator (atomic Lua sliding-window per env) and wires it into the trigger
hot path. When the per-org mollifierEnabled feature flag is on AND the
evaluator says divert, the canonical replay payload is buffered to Redis
(via buffer.accept) AND the trigger continues through engine.trigger —
i.e. dual-write. The drainer pops + acks (no-op handler) to prove the
dequeue mechanism works end-to-end. Operators audit by joining
mollifier.buffered (write) and mollifier.drained (consume) logs by runId.

Buffer primitives hardened:
- accept is idempotent on duplicate runId (Lua EXISTS guard)
- pop skips orphan queue references (entry HASH TTL'd while runId queued)
- fail no-ops on missing entry (no partial FAILED hash leak)
- mollifier:envs set pruned on draining pop, restored on requeue
- 16-row truth-table test enumerates the gate cascade
- BufferedTriggerPayload defines the canonical replay shape Phase 2 will
  use to invoke engine.trigger
- payload hash for audit-equivalence computed off the hot path (in the
  drainer) to avoid CPU during a spike

Regression tests in apps/webapp/test/engine/triggerTask.test.ts pin the
mollifier integration:
- validation throws BEFORE the gate runs (no orphan buffer write on
  rejected triggers)
- mollify dual-write happy path (Postgres + Redis both reflect the run)
- pass_through path does NOT call buffer.accept
- engine.trigger throwing AFTER buffer.accept leaves an orphan
  (documented behaviour — drainer auto-cleans; audit-trail surfaces it)
- idempotency-key match short-circuits BEFORE the gate is consulted
- debounce match produces an orphan (documented behaviour — Phase 2
  must lift handleDebounce upfront before buffer.accept)

Behaviour with MOLLIFIER_ENABLED=0 (default) is byte-identical to main.
With MOLLIFIER_ENABLED=1 and the flag off, only mollifier.would_mollify
logs fire (no buffer state). With the flag on, dual-write activates.

Includes two opt-in *.fuzz.test.ts suites (gated on FUZZ=1) that
randomise operation sequences against evaluateTrip and the drainer to
find timing edges. They are clearly marked TEMPORARY in their headers.
- changeset: drop "deferred" wording — phase-1 actively dual-writes + runs
  the drainer ack loop.
- worker.server.ts: wrap mollifier drainer init in try/catch + register
  SIGTERM/SIGINT handlers so the polling loop stops cleanly on shutdown.
- bufferedTriggerPayload: only serialise idempotencyKeyExpiresAt when an
  idempotencyKey is present (avoid impossible orphan-expiry payloads).
- mollifierTelemetry: narrow recordDecision reason to DecisionReason union
  to keep OTEL attribute cardinality bounded.
- mollifierGate: rename resolveOrgFlag → resolveFlag. The underlying
  FeatureFlag table is global by key, so the "org" prefix was misleading;
  per-org gating is out of scope for phase-1.
- tests: drop vi.fn mocks. mollifierGate now uses plain closure spies;
  mollifierTripEvaluator runs against a real MollifierBuffer backed by a
  redisTest container (closed client exercises the fail-open path).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…stacking

Worker.init() is called per request from entry.server.tsx, so the
process.once SIGTERM/SIGINT pair added in 98c1520 would stack a fresh
listener every request under dev hot-reload (process.once only removes
after firing). Gate registration on a process-global flag, matching the
existing __worker__ pattern.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
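
A sketch of the process-global guard pattern this commit adopts. The flag name matches the __mollifierShutdownRegistered__ guard referenced later in this PR; the helper itself is illustrative:

// Make this file a module so `declare global` is legal.
export {};

declare global {
  // eslint-disable-next-line no-var
  var __mollifierShutdownRegistered__: boolean | undefined;
}

// Register signal handlers at most once per process, surviving dev
// hot-reloads that re-run module initialisation on every request.
function registerShutdownHandlersOnce(stopDrainer: () => void) {
  if (globalThis.__mollifierShutdownRegistered__) return;
  globalThis.__mollifierShutdownRegistered__ = true;
  process.once("SIGTERM", stopDrainer);
  process.once("SIGINT", stopDrainer);
}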
…tion.featureFlags

The mollifier gate's resolveOrgFlag was a global feature-flag lookup
named as if org-scoped. Phase-1 plan and design doc both intended
per-org gating; the implementation regressed because the global
flag() helper has no orgId parameter.

Adopt the existing per-org feature-flag pattern (used by canAccessAi,
canAccessPrivateConnections, compute beta gating): pass
`Organization.featureFlags` through as `flag()` overrides. Per-org
opt-in now works admin-toggleable via the existing
Organization.featureFlags JSON column — no schema migration needed.

- mollifierGate: revert resolveFlag/flagEnabled back to
  resolveOrgFlag/orgFlagEnabled (the name now matches reality).
  GateInputs gains `orgFeatureFlags`; the default resolver passes
  them as overrides to `flag()`.
- triggerTask.server.ts: thread `environment.organization.featureFlags`
  into the gate call.
- tests: three new postgresTest cases exercise the real DB-backed
  resolveOrgFlag end-to-end, proving (a) per-org opt-in isolation,
  (b) unrelated beta flags don't bleed across, (c) per-org overrides
  take precedence over the global FeatureFlag row.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…nect

The unit cascade tests in mollifierGate.test.ts import the gate module,
which transitively pulls in ~/db.server. That module constructs the
prisma singleton at import time and eagerly calls $connect(), which
fails against localhost:5432 in the unit-test shard and surfaces as an
unhandled rejection that fails the whole vitest run. Mocking the module
keeps the cascade tests pure and leaves the postgresTest cases on the
testcontainer-fixture prisma untouched.
- Gate drainer init on WORKER_ENABLED so only worker replicas run the polling loop.
- Update the enqueueSystem TTL comment now that delayed/pending-version are first enqueues.
- Correct the mollifier gate docstring to describe the fixed-window counter and tripped-key rearm.
- Swap findUnique for findFirst in the trigger task test to match the webapp Prisma rule.
…eFlags

The gate's `GateInputs` now requires `orgFeatureFlags`, but the surface type used by the trigger service was still the pre-org-scope shape, so the default evaluator wasn't assignable and the call site couldn't pass the flag overrides.
…est startup

The per-org isolation suite uses `postgresTest`, which spins up a fresh Postgres testcontainer per case. On CI the 5s vitest default regularly times out on container start before the test body runs. Match the 30s `vi.setConfig` used by other postgresTest suites in this app.
…rrors

resolveOrgFlag now checks the per-org Organization.featureFlags override
in-memory before falling back to the global flag() helper, so the common
per-org enablement path resolves without a Prisma round-trip on every
trigger call. evaluateGate also wraps the flag resolution in try/catch
and fails open to false on error, mirroring the trip evaluator.
…exit

Pass a configurable timeout to drainer.stop() so SIGTERM/SIGINT can't hang
forever if an in-flight handler is wedged. Matches the precedent set by
BATCH_TRIGGER_WORKER_SHUTDOWN_TIMEOUT_MS (default 30s).
processOneFromEnv now catches buffer.pop() failures so one env's hiccup
doesn't reject Promise.all and bubble up to the loop's outer catch. The
polling loop itself wraps each runOnce in try/catch and backs off with
capped exponential delay (up to 5s) instead of exiting permanently on the
first listEnvs/pop error. Stop semantics are unchanged: only the stopping
flag breaks the loop.

Adds two regression tests using a stub buffer (no Redis container) so
fault injection is deterministic.
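
A sketch of the loop shape this describes (names illustrative; the base and cap delays are assumptions beyond the 5s cap stated above):

// Each tick is isolated in try/catch; failures back off with capped
// exponential delay instead of exiting the loop permanently. Only the
// stopping flag breaks the loop, matching the stop semantics above.
async function pollLoop(
  runOnce: () => Promise<void>,
  isStopping: () => boolean,
  baseDelayMs = 100,
  maxDelayMs = 5_000
) {
  let failures = 0;
  while (!isStopping()) {
    try {
      await runOnce();
      failures = 0; // a healthy tick resets the backoff
    } catch (error) {
      failures += 1;
      const backoffMs = Math.min(baseDelayMs * 2 ** failures, maxDelayMs);
      await new Promise((resolve) => setTimeout(resolve, backoffMs));
    }
  }
}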
The phase-1 scaffolding referenced MollifierBuffer, getMollifierBuffer,
and deserialiseMollifierSnapshot without importing them — CI typecheck
fails with TS2304. The runtime path is gated behind MOLLIFIER_ENABLED=0
so this never produced a runtime symptom, but the types must resolve.
With the drainer walking listOrgs → listEnvsForOrg → pop, the flat
mollifier:envs SET has no consumer — `mollifier:orgs` and the per-org
`mollifier:org-envs:${orgId}` SETs cover everything the drainer needs.
Removing it drops three Lua write ops per accept/pop/requeue and one
Redis key per active env.

Changes:
- Lua: acceptMollifierEntry, popAndMarkDraining, requeueMollifierEntry
  no longer touch mollifier:envs. Their KEYS arrays shrink by one.
- TS: listEnvs() method removed; only listOrgs() and listEnvsForOrg()
  remain. TS bindings updated to match the new arg shapes.
- buffer.test.ts: listEnvs() assertions converted to listEnvsForOrg(
  "org_1") so they verify the equivalent org-level membership. The
  "stale envs SET cleanup on empty-pop" test is removed (envs SET is
  gone). The "pop skips orphans" test's trailing-cleanup assertion is
  updated to document the deliberate stale-tolerance in the no-runId
  branch of popAndMarkDraining (can't read orgId without a popped
  entry, so org-envs cleanup is skipped there).
- drainer.test.ts: stub helper moved to module scope and gains an
  `eachEnvAsOwnOrg(envs)` convenience that supplies listOrgs +
  listEnvsForOrg in tests where each env is its own org. Stub helpers
  duplicated across describe blocks are removed in favour of the
  shared one.

24/24 drainer tests pass; buffer tests pass in isolation (a few timeout
under full-suite contention against the shared redis container —
unrelated to this change).
Contributor

@devin-ai-integration devin-ai-integration Bot left a comment


Devin Review found 0 new potential issues.

View 14 additional findings in Devin Review.


d-cs and others added 4 commits May 15, 2026 11:19
…olver

`triggerTask` is the highest-throughput code path in the system and the
webapp CLAUDE.md forbids new DB queries there. The previous resolver fell
back to `flag()` (a Prisma read against `FeatureFlag`) when the org had
no `mollifierEnabled` override, which added a query to every trigger
whenever `MOLLIFIER_ENABLED=1`. The fleet-wide kill switch already lives
in `MOLLIFIER_ENABLED`; rollout is per-org via `Organization.featureFlags`
JSON, matching `canAccessAi`/`hasComputeAccess`/etc. Drop the fallback so
the resolver is purely in-memory.

Tests no longer need a postgres testcontainer or `makeFlag(prisma)`; the
per-org isolation suite now asserts directly on `Organization.featureFlags`
shape and adds a regression test for the no-override -> false contract.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
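
A sketch of the purely in-memory resolver contract this lands. The flag key comes from the feature-flag catalog named earlier; the helper shape is illustrative:

// Resolve mollifier enablement from the org's featureFlags JSON only.
// No Prisma fallback: no override means disabled, and the fleet-wide
// kill switch stays in MOLLIFIER_ENABLED.
function resolveOrgFlag(orgFeatureFlags: Record<string, unknown> | null): boolean {
  const override = orgFeatureFlags?.["mollifierEnabled"];
  if (typeof override === "boolean") return override; // per-org opt-in wins
  return false; // the no-override -> false contract from the regression test
}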
…own deadlines

Two correctness/perf fixes on top of the phase-2 drainer:

1. `runOnce` was awaiting `listEnvsForOrg` serially before any pop ran.
   At the default `maxOrgsPerTick=500` and a ~1ms RTT, that's a ~500ms
   per-tick latency floor before `pLimit` even sees work. `Promise.all`
   over the org slice lets ioredis auto-pipeline the SMEMBERS into a
   single round-trip. Order is preserved so the org→envs pairing stays
   deterministic and `pickEnvForOrg` still rotates per org.

2. The SIGTERM handler is sync fire-and-forget: `drainer.stop({timeoutMs})`
   returns a promise that keeps the loop alive, but in cluster mode the
   primary process runs its own `GRACEFUL_SHUTDOWN_TIMEOUT` and will hit
   `process.exit(0)` independently. If the drainer's deadline exceeds
   the primary's, the drainer's "log a warning on timeout" turns into
   "hard exit with no log". Assert at boot that
   `MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS <= GRACEFUL_SHUTDOWN_TIMEOUT - 1s`
   so a misconfig fails loud instead of disappearing at shutdown.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…tracking

The previous wording still described the in-memory env→org cache and the
"uncached envs treated as their own pseudo-org for one tick" sentinel —
both removed when the buffer started tracking `mollifier:orgs` and
`mollifier:org-envs:${orgId}` atomically. Re-describe the drainer in
terms of the current org-walk so the published changelog matches the
shipped code.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@d-cs d-cs marked this pull request as ready for review May 15, 2026 10:56

Comment thread apps/webapp/app/runEngine/services/triggerTask.server.ts Outdated
d-cs added 2 commits May 15, 2026 13:04
Replace each vi.fn(async handler) with a plain async closure that
records calls via captured counter/array variables. Assertions move
from handler.mock.* / toHaveBeenCalled* matchers to checks against
the captured state, e.g. handlerCalls.length / handlerCalls[0].
Functionally equivalent; aligns with the package convention of using
real testcontainers + closure-based probes (cf. mollifierGate.test.ts
and mollifierTripEvaluator.test.ts) rather than vitest fakes.
…g polling loop

The drainer was started inside the singleton factory, with the
shutdown-timeout-vs-GRACEFUL_SHUTDOWN_TIMEOUT reconciliation living in
worker.server.ts init() afterwards. If that validation threw, the polling
loop was already running and the SIGTERM handler registration below it
was never reached — the loop kept polling with no graceful-shutdown
path, and the singleton was cached in its running state (so subsequent
init() calls returned the same drainer and validation kept failing).

Move the timeout check into initializeMollifierDrainer() before
drainer.start(). singleton() uses ??=, so a throw inside the factory
leaves the cache slot unset and the next getMollifierDrainer() call
re-runs the factory — no half-started state, no missing SIGTERM
handler. The catch in worker.server.ts init() still logs and aborts
drainer registration on either the validation error or a Redis init
failure.

d-cs added 7 commits May 15, 2026 13:23
…g polling loop

Move the MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS / GRACEFUL_SHUTDOWN_TIMEOUT
reconciliation check from worker.server.ts init() into
initializeMollifierDrainer() — BEFORE drainer.start() — so a
misconfigured deploy fails loud at module-load time instead of starting
the polling loop and then throwing back at the caller before the SIGTERM
handler can be registered.

The singleton() helper uses ??=, so a throw inside the factory leaves
the cache slot unset and the next getMollifierDrainer() call re-runs the
factory. No half-started state, no missing SIGTERM handler. The catch in
worker.server.ts init() still logs and aborts drainer registration on
either the validation error or a Redis init failure — same observable
behaviour from the caller's perspective.
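
The load-bearing property, sketched (the singleton helper shape is assumed from the description):

// ??= only assigns after the right-hand side returns, so a factory that
// throws leaves the cache slot unset, and the next call re-runs it;
// no half-started drainer can ever be cached.
const store: Record<string, unknown> = {};

function singleton<T>(key: string, factory: () => T): T {
  return (store[key] ??= factory()) as T;
}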
initializeMollifierDrainer() no longer calls drainer.start() — it
returns a configured-but-stopped drainer. worker.server.ts init() now
invokes drainer.start() AFTER the SIGTERM/SIGINT handlers are
registered, gated on the same __mollifierShutdownRegistered__ guard so
dev hot-reloads can't double-start.

Closes the residual race window between drainer.start() (previously
fired inside the singleton factory) and process.once("SIGTERM",
stopDrainer) in worker.server.ts. With construction and starting
separated, a signal landing during boot can never find the polling
loop running without a graceful-stop path.
…er.server.ts

worker.server.ts is the original graphile-worker / ZodWorker file —
every task in its catalog is annotated "@deprecated, moved to
commonWorker.server.ts" (or similar). Adding new lifecycle wiring
there during phase-2 was a mis-routing.

Move the SIGTERM/SIGINT registration + drainer.start() call into a new
mollifierDrainerWorker.server.ts alongside the redis-worker workers,
and invoke its initMollifierDrainerWorker() from entry.server.tsx
right after Worker.init(). The drainer's own factory still validates
shutdown timeouts before constructing; the bootstrap registers signal
handlers BEFORE calling start(), preserving the create+start contract.

Also adds a header to worker.server.ts marking it legacy and pointing
new lifecycle code at the redis-worker pattern, so the next person
doesn't have to re-derive the routing rule.
The drainer's polling loop has been gated on WORKER_ENABLED, which
couples it to the legacy ZodWorker role. To split the drainer onto a
dedicated worker service in cloud (and keep all other replicas as
producer-only), introduce its own switch.

Semantics:
  - Unset                              → inherits MOLLIFIER_ENABLED.
    Single-container self-hosters with MOLLIFIER_ENABLED=1 get the
    drainer for free, no second flag to remember.
  - Explicit MOLLIFIER_DRAINER_ENABLED=0 → drainer off on this replica.
    Cloud sets this everywhere except the dedicated drainer service.
  - Explicit MOLLIFIER_DRAINER_ENABLED=1 → drainer on, subject to
    MOLLIFIER_ENABLED still being the master kill switch (a drainer
    can't construct without the gate-side buffer singleton).

The bootstrap in mollifierDrainerWorker.server.ts now gates on the new
flag instead of WORKER_ENABLED, so the drainer's lifecycle is no longer
coupled to the legacy worker role.
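
The resolution order, sketched as a hedged helper (env names from this commit, pre-rename; the function itself is illustrative):

// Unset inherits the master switch; an explicit value wins, but the
// master kill switch still gates drainer construction either way.
function isDrainerEnabled(env: NodeJS.ProcessEnv): boolean {
  const master = env.MOLLIFIER_ENABLED === "1";
  const drainer = env.MOLLIFIER_DRAINER_ENABLED;
  if (drainer === undefined) return master; // single-container default
  return drainer === "1" && master;
}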
All MOLLIFIER_* env vars renamed to TRIGGER_MOLLIFIER_*. The mollifier
primitive is generic — buffer + drainer + trip evaluator with no
trigger-specific assumptions at the redis-worker layer — but this
PR's webapp wiring is specifically the trigger-task mollifier, with
PII-sensitive payload handling and trigger-flow semantics. If we later
mollify another surface (deploys, schedules, etc.) those will want
their own env-var namespace; pre-prefixing now avoids a breaking
rename later.

Renames are mechanical: schema keys in env.server.ts, env.* references
across the v3/mollifier* modules, and a handful of doc-comment
mentions. The bootstrap fallback that has DRAINER_ENABLED default to
the ENABLED value is updated to read TRIGGER_MOLLIFIER_ENABLED from
process.env too. Code-side naming (classes, file names, the literal
word "mollifier") stays unchanged — the rename is env-var only.
The literal reading of "never mock anything" trips up AI reviewers
(and humans new to the repo) — they flag any `vi.mock` / `vi.fn` /
`vi.spyOn` they see, even when the usage isn't actually faking
behavior. Three patterns are fine and should NOT be flagged:

1. Module-load workarounds — vi.mock("~/db.server") at the top of a
   unit test to stop prisma.$connect() firing at import. Cuts the
   import graph, doesn't fake DB behavior.
2. Hand-written DI doubles where the real implementation has its own
   dedicated infra-backed tests (CapturingMollifierBuffer, MockPayloadProcessor,
   etc.). Unit test covers wiring, integration test covers the seam target.
3. vi.fn as a DI-seam probe — convenience for "was the seam called."
   Equivalent to a closure-counter; not load-bearing on what's proven.

Still 🔴: spying on the code path under test then asserting the spy was
called (tautology), or replacing real infra with mocks in tests meant
to cover real behavior (e.g. mocking Redis in a Redis-queue test).

@d-cs d-cs force-pushed the mollifier-phase-2 branch 2 times, most recently from 9483092 to 50868ff Compare May 15, 2026 16:22

d-cs added 3 commits May 15, 2026 17:27
…oop wins the race

The Promise.race between this.loopPromise and this.delay(timeoutMs)
discarded the timeout's underlying setTimeout handle whenever the loop
branch won. The discarded timer was still ref'd by libuv and pinned the
Node event loop alive for the remainder of `timeoutMs` — exactly the
shutdown slack the timeout was supposed to bound.

Inline the timer in stop() with a captured handle and clearTimeout() it
in a finally block, so every exit path (loop-won, timeout-won, throw)
releases the ref. The in-loop delay() calls are unchanged — they're
awaited normally and their timers fire-and-clear themselves.
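
The fix in sketch form (the stop() internals are assumed from the description above):

// Capture the timer handle so every exit path (loop-won, timeout-won,
// throw) releases the libuv ref via clearTimeout in finally.
async function stopWithDeadline(loopPromise: Promise<void>, timeoutMs: number) {
  let handle: NodeJS.Timeout | undefined;
  try {
    await Promise.race([
      loopPromise,
      new Promise<void>((resolve) => {
        handle = setTimeout(resolve, timeoutMs);
      }),
    ]);
  } finally {
    if (handle !== undefined) clearTimeout(handle);
  }
}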
`process.once("SIGTERM", stopDrainer)` was the odd one out — every
other webapp service (runsReplicationInstance, llmPricingRegistry,
dynamicFlushScheduler, marqs, eventLoopMonitor) registers through
`signalsEmitter` from `~/services/signals.server`, an EventEmitter
backed by a single `process.on()` that fans out to all listeners.

Switching gets us:
  - codebase consistency;
  - `.on` (not `.once`) so a second SIGTERM, if the orchestrator emits
    one before SIGKILL, still reaches us;
  - if SIGTERM lands in the narrow gap between the listener attaching
    and drainer.start() below, the first invocation no-ops (stop()
    returns early because isRunning is false) but the listener stays
    attached for any subsequent signal, instead of being consumed and
    leaving the now-running drainer with no graceful-stop path.
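
The fan-out shape, reconstructed as a hedged sketch (the real module is ~/services/signals.server; this reconstruction is illustrative):

import { EventEmitter } from "node:events";

// One process.on per signal fans out to every listener, and consumers
// attach with .on rather than .once, so a second SIGTERM still reaches them.
const signalsEmitter = new EventEmitter();

for (const signal of ["SIGTERM", "SIGINT"] as const) {
  process.on(signal, () => signalsEmitter.emit(signal));
}

// Consumer side:
// signalsEmitter.on("SIGTERM", () => { void drainer.stop(); });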
This addition was applied while phase-2 was already in review and is
out of scope for the mollifier PR. The underlying clarification is
worth landing — just not on this branch.

evaluateGate ran on every trigger regardless of TRIGGER_MOLLIFIER_ENABLED.
With the flag off (the default everywhere it hasn't been opted in), the
gate still produced a `pass_through` decision after allocating a
GateInputs object, spreading defaultGateDependencies inside evaluateGate,
and incrementing the `mollifier.decisions{outcome=pass_through}` OTel
counter. Cheap individually, but triggerTask is the hottest code path in
the system — multiply by trigger rate and the unnecessary work compounds.

Guard the gate call with a direct env.TRIGGER_MOLLIFIER_ENABLED check at
the call site. When the flag is off, mollifierOutcome is null and the
downstream `mollifierOutcome?.action === "mollify"` branch skips the
buffer dual-write entirely — zero allocation, zero counter increment on
the disabled path. When the flag is on, behaviour is unchanged.

Lost-signal note: with mollifier off, we no longer count "pass_through"
decisions in the OTel counter (the gate never runs). That's a non-issue
— "pass_through count when feature is off" is just total trigger rate,
which is already observable via the trigger handler's own spans/counters
upstream. The gate counter remains the source of truth for the
mollify/shadow/pass_through ratio when the feature is on, which is the
load-bearing signal.
Contributor

@devin-ai-integration devin-ai-integration Bot left a comment


Devin Review found 1 new potential issue.

View 18 additional findings in Devin Review.


Comment on lines +346 to +355
const mollifierOutcome: GateOutcome | null =
  env.TRIGGER_MOLLIFIER_ENABLED === "1"
    ? await this.evaluateGate({
        envId: environment.id,
        orgId: environment.organizationId,
        taskId,
        orgFeatureFlags:
          (environment.organization.featureFlags as Record<string, unknown> | null) ?? null,
      })
    : null;
Contributor


🔴 Hard-coded env.TRIGGER_MOLLIFIER_ENABLED check bypasses DI'd evaluateGate, making mollifier integration tests unreachable

The short-circuit at triggerTask.server.ts:347 reads the global env singleton directly (env.TRIGGER_MOLLIFIER_ENABLED === "1") instead of going through the DI'd evaluateGate. When this env var is not set (defaults to "0" per env.server.ts:1057), mollifierOutcome is unconditionally null, so the injected gate is never called and the buffer-write branch at line 377 is unreachable.

This means three mollifier integration tests will fail:

  • "mollify action triggers dual-write" (line 1328: expect(buffer.accepted).toHaveLength(1) gets 0)
  • "engine.trigger throwing AFTER buffer.accept" (line 1491: same assertion)
  • "debounce match produces an orphan buffer entry" (line 1730: same assertion)

The .env file does not exist in CI (confirmed: apps/webapp/.env is a symlink to ../../.env which is absent), and .env.example contains no TRIGGER_MOLLIFIER vars, so dotenv silently no-ops and the Zod default of "0" takes effect.

The DI pattern is correctly wired at construction (lines 91–105) but defeated by the env read at the call site.

The constructor accepts optional evaluateGate / getMollifierBuffer hooks and stores them as instance fields. But the call site gates on the module-level env singleton before reaching those hooks. The fix is to make the enabled check part of the DI surface — e.g. check this.evaluateGate !== defaultEvaluateGate to skip the env guard when a custom gate is injected, or make the env check itself injectable.

Prompt for agents
The issue is in RunEngineTriggerTaskService.call() in apps/webapp/app/runEngine/services/triggerTask.server.ts at lines 346-355. The code reads env.TRIGGER_MOLLIFIER_ENABLED directly from the global env singleton to short-circuit the mollifier gate evaluation. This bypasses the DI'd evaluateGate that was injected via the constructor (lines 91-105), making the mollifier integration tests in test/engine/triggerTask.test.ts unreachable because the env var defaults to '0' in test environments.

The production intent is correct: skip the gate when the feature is globally off. The problem is that the env check is not part of the DI surface.

Possible approaches:
1. Add an isMollifierEnabled callback to the constructor options (like evaluateGate and getMollifierBuffer), defaulting to () => env.TRIGGER_MOLLIFIER_ENABLED === '1'. Tests can override it to return true.
2. Move the env check inside evaluateGate itself (it already has isMollifierEnabled in GateDependencies), and remove the outer check. The DI'd evaluateGate in tests already returns a fixed outcome, so the inner check is irrelevant. The production evaluateGate already checks isMollifierEnabled internally, so the outer check is redundant.
3. Skip the env guard when a non-default evaluateGate is injected (detect by comparing to the default import).

Approach 2 is simplest: just remove the ternary at lines 346-355 and always call this.evaluateGate. When TRIGGER_MOLLIFIER_ENABLED is '0' in production, the default evaluateGate's isMollifierEnabled() check returns false and it returns pass_through with an OTel counter increment — but the comment at line 340-345 explicitly says the short-circuit exists to avoid that counter increment. If that overhead is acceptable, approach 2 works. Otherwise approach 1 is the cleanest DI fix.
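
Approach 1 in sketch form; names beyond those quoted in the finding (MollifierHooks, the sketch class name, the type shapes) are assumptions:

// Minimal shapes assumed from the call site quoted above.
type GateInputs = {
  envId: string;
  orgId: string;
  taskId: string;
  orgFeatureFlags: Record<string, unknown> | null;
};
type GateOutcome = { action: "mollify" | "shadow_log" | "pass_through" };

type MollifierHooks = {
  evaluateGate?: (inputs: GateInputs) => Promise<GateOutcome>;
  isMollifierEnabled?: () => boolean; // tests inject () => true
};

class RunEngineTriggerTaskServiceSketch {
  private readonly isMollifierEnabled: () => boolean;

  constructor(hooks: MollifierHooks = {}) {
    // Default preserves the production short-circuit; tests override it
    // so the injected evaluateGate stays reachable.
    this.isMollifierEnabled =
      hooks.isMollifierEnabled ?? (() => process.env.TRIGGER_MOLLIFIER_ENABLED === "1");
  }
}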