diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 274a034..be1b343 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -53,7 +53,7 @@ jobs: uses: actions/setup-node@v4 with: node-version: 24 - cache: "yarn" + cache: 'yarn' - name: Install dependencies run: yarn diff --git a/.gitignore b/.gitignore index 3df4c92..9153b8c 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,9 @@ coverage npm-debug.log yarn-error.log +# OS specific +.DS_Store + # Editors specific .fleet .idea diff --git a/README.md b/README.md index aafc6cf..8647213 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,7 @@ npm install @boringnode/queue - **Priority Queues**: Process high-priority jobs first - **Bulk Dispatch**: Efficiently dispatch thousands of jobs at once - **Job Grouping**: Organize related jobs for monitoring +- **Job Deduplication**: Prevent duplicate jobs with custom IDs - **Retry with Backoff**: Exponential, linear, or fixed backoff strategies - **Job Timeout**: Fail or retry jobs that exceed a time limit - **Job History**: Retain completed/failed jobs for debugging @@ -131,6 +132,85 @@ await SendEmailJob.dispatchMany(recipients).group('newsletter-jan-2025') The `groupId` is stored with job data and accessible via `job.data.groupId`. +## Job Deduplication + +Prevent the same job from being pushed multiple times. Four modes, all via `.dedup()`: + +### Simple (skip while job exists) + +```typescript +// First dispatch - job is created +await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run() + +// Second dispatch with same dedup ID - silently skipped +await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run() +``` + +### Throttle (skip within TTL window) + +```typescript +// Within 5s, duplicates are skipped. After 5s, a new job is created. 
+await SendEmailJob.dispatch({ to: 'user@example.com' }) + .dedup({ id: 'welcome-123', ttl: '5s' }) + .run() +``` + +### Extend (reset TTL on duplicate) + +```typescript +// Each duplicate push resets the TTL timer. +await RateLimitJob.dispatch({ userId: 42 }).dedup({ id: 'rate-42', ttl: '1m', extend: true }).run() +``` + +### Debounce (replace payload + reset TTL) + +```typescript +// Within the 2s window, the latest payload overwrites the previous pending job. +await SaveDraftJob.dispatch({ content: 'latest draft' }) + .dedup({ id: 'draft-42', ttl: '2s', replace: true, extend: true }) + .run() +``` + +### Inspecting the outcome + +`DispatchResult` tells you what happened: + +```typescript +const { jobId, deduped } = await SaveDraftJob.dispatch({ content: '...' }) + .dedup({ id: 'draft-42', ttl: '2s', replace: true }) + .run() + +// deduped: 'added' | 'skipped' | 'replaced' | 'extended' +// jobId: the UUID of the job (the existing one when deduped) +``` + +### How it works + +- The dedup ID is automatically prefixed with the job name (`SendInvoiceJob::order-123`), so different job types can reuse the same key. +- The user-supplied `id` must be ≤ 400 characters, and the combined `::` key must be ≤ 510 characters (constrained by the Knex storage column). Both limits are validated at `.dedup()` time. +- `ttl` accepts a Duration (`'5s'`, `'1m'`) or milliseconds, and must be **positive** when provided. Use `0` or omit `ttl` if you want no expiry — `ttl: 0` is rejected to avoid an ambiguous "expired immediately vs no-expiry" interpretation across engines. +- `extend` and `replace` **require** `ttl` — calling them without `ttl` throws. +- `replace` only applies to jobs in `pending` or `delayed` state. Jobs that are active (executing) or retained in history (`completed`/`failed` with retention) are left alone; the dispatch returns `{ deduped: 'skipped' }`. 
+- `replace` swaps the **payload only** — priority, queue, delay, groupId, and stored dedup options of the existing job are retained. To change those, use a different dedup id or wait for the TTL to expire. +- `extend` resets the TTL clock but never changes the window length. The window length is fixed to the `ttl` from the first dispatch that created the dedup slot. Later dispatches that pass a different `ttl` only reset the clock; their `ttl` value is ignored. To resize the window, let the slot expire and start over with a new dispatch. +- `extend` works in **all states** — even when the existing job is `active` (executing) or retained in history. Unlike `replace` (which is no-op on non-replaceable states), `extend` always refreshes the dedup TTL window. Use this when you want the dedup slot to keep blocking new dispatches for the lifetime of a long-running job. +- `extend` requires the **first** dispatch to have set a `ttl`. If the slot was created without a `ttl`, later `extend` dispatches have no window to refresh and return `{ deduped: 'skipped' }` instead of `'extended'`. +- `retryJob` does not touch the dedup entry — a retried job continues to occupy the dedup slot. TTL runs on wall-clock time, so long-running retries may outlive the TTL window. Use a generous TTL or no TTL if retries must stay deduped. +- Atomic and race-free: + - **Redis**: a single Lua script per dispatch performs the dedup-key lookup, state check (pending/delayed ZSCORE), payload swap, and TTL refresh atomically. + - **Knex**: transactional `SELECT ... FOR UPDATE` + insert/update inside a transaction. A nested savepoint catches unique-constraint violations under concurrent inserts and returns `{ deduped: 'skipped' }` pointing at the winner. + - **SyncAdapter**: executes inline, no dedup support. + +### Caveats + +- Without `.dedup()`, jobs use auto-generated UUIDs and are never deduplicated. 
+- The **Sync adapter** ignores `.dedup()` entirely — every dispatch executes inline and `deduped` is always `undefined` on the result. Use Redis or Knex if you need real deduplication. +- `.dedup()` is only available on single dispatch. `dispatchMany` / `pushManyOn` reject jobs with a `dedup` field. +- Scheduled jobs (`.schedule()`) do not support dedup — each cron/interval fire is an independent dispatch. +- With no `ttl`, dedup persists until the job is removed (completed/failed without retention). When retention keeps the record, re-dispatch stays blocked until the record is pruned. +- With `ttl`, dedup expires after the window — a new job (new UUID) is created. The old job still runs. +- Knex MySQL concurrent race: MySQL does not support partial unique indexes, so two `pushOn` calls with the same dedup id firing at the exact same instant can both succeed. Serialize at the app layer if strict guarantees are required, or use Postgres / SQLite / Redis (all of which serialize correctly via the partial unique index or Lua atomicity). + ## Job History & Retention Keep completed and failed jobs for debugging: @@ -536,7 +616,7 @@ import * as boringqueue from '@boringnode/queue' const instrumentation = new QueueInstrumentation({ messagingSystem: 'boringqueue', // default - executionSpanLinkMode: 'link', // or 'parent' + executionSpanLinkMode: 'link', // or 'parent' }) instrumentation.enable() @@ -549,19 +629,19 @@ The instrumentation patches `QueueManager.init()` to automatically inject its wr The instrumentation uses standard [OTel messaging semantic conventions](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/) where they map cleanly, plus a few queue-specific custom attributes. 
-| Attribute | Kind | Description | -| ------------------------------- | ------- | ------------------------------------------ | -| `messaging.system` | Semconv | `'boringqueue'` (configurable) | -| `messaging.operation.name` | Semconv | `'publish'` or `'process'` | -| `messaging.destination.name` | Semconv | Queue name | -| `messaging.message.id` | Semconv | Job ID for single-message spans | -| `messaging.batch.message_count` | Semconv | Number of jobs in a batch dispatch | -| `messaging.message.retry.count` | Custom | Retry count (0-based) for a job attempt | -| `messaging.job.name` | Custom | Job class name (e.g. `SendEmailJob`) | -| `messaging.job.status` | Custom | `'completed'`, `'failed'`, or `'retrying'` | -| `messaging.job.group_id` | Custom | Queue-specific group identifier | -| `messaging.job.priority` | Custom | Queue-specific job priority | -| `messaging.job.delay_ms` | Custom | Delay before the job becomes available | +| Attribute | Kind | Description | +| ------------------------------- | ------- | --------------------------------------------- | +| `messaging.system` | Semconv | `'boringqueue'` (configurable) | +| `messaging.operation.name` | Semconv | `'publish'` or `'process'` | +| `messaging.destination.name` | Semconv | Queue name | +| `messaging.message.id` | Semconv | Job ID for single-message spans | +| `messaging.batch.message_count` | Semconv | Number of jobs in a batch dispatch | +| `messaging.message.retry.count` | Custom | Retry count (0-based) for a job attempt | +| `messaging.job.name` | Custom | Job class name (e.g. 
`SendEmailJob`) | +| `messaging.job.status` | Custom | `'completed'`, `'failed'`, or `'retrying'` | +| `messaging.job.group_id` | Custom | Queue-specific group identifier | +| `messaging.job.priority` | Custom | Queue-specific job priority | +| `messaging.job.delay_ms` | Custom | Delay before the job becomes available | | `messaging.job.queue_time_ms` | Custom | Time spent waiting in queue before processing | ### Trace Context Propagation diff --git a/package.json b/package.json index 9c78240..8cd6350 100644 --- a/package.json +++ b/package.json @@ -129,4 +129,4 @@ "engines": { "node": ">=24.0.0" } -} +} \ No newline at end of file diff --git a/src/contracts/adapter.ts b/src/contracts/adapter.ts index e05067e..8bd8e12 100644 --- a/src/contracts/adapter.ts +++ b/src/contracts/adapter.ts @@ -1,4 +1,5 @@ import type { + DedupOutcome, JobData, JobRecord, JobRetention, @@ -7,6 +8,17 @@ import type { ScheduleListOptions, } from '../types/main.js' +/** + * Result of a push operation when dedup was involved. + * `outcome` tells the dispatcher what happened; `jobId` is the ID of the + * existing job when deduped (skipped/replaced/extended). + */ +export interface PushResult { + outcome: DedupOutcome + /** ID of the existing job when a duplicate was detected, otherwise the newly added job's id. */ + jobId: string +} + /** * A job that has been acquired by a worker for processing. * Extends JobData with the timestamp when the job was acquired. @@ -119,24 +131,27 @@ export interface Adapter { * Push a job to the default queue for immediate processing. * * @param jobData - The job data to push + * @returns PushResult if jobData.dedup is set, otherwise void */ - push(jobData: JobData): Promise + push(jobData: JobData): Promise /** * Push a job to a specific queue for immediate processing. 
* * @param queue - The queue name to push to * @param jobData - The job data to push + * @returns PushResult if jobData.dedup is set, otherwise void */ - pushOn(queue: string, jobData: JobData): Promise + pushOn(queue: string, jobData: JobData): Promise /** * Push a job to the default queue with a delay. * * @param jobData - The job data to push * @param delay - Delay in milliseconds before the job becomes available + * @returns PushResult if jobData.dedup is set, otherwise void */ - pushLater(jobData: JobData, delay: number): Promise + pushLater(jobData: JobData, delay: number): Promise /** * Push a job to a specific queue with a delay. @@ -144,8 +159,9 @@ export interface Adapter { * @param queue - The queue name to push to * @param jobData - The job data to push * @param delay - Delay in milliseconds before the job becomes available + * @returns PushResult if jobData.dedup is set, otherwise void */ - pushLaterOn(queue: string, jobData: JobData, delay: number): Promise + pushLaterOn(queue: string, jobData: JobData, delay: number): Promise /** * Push multiple jobs to the default queue for immediate processing. 
diff --git a/src/drivers/fake_adapter.ts b/src/drivers/fake_adapter.ts index ddc528a..3671cc4 100644 --- a/src/drivers/fake_adapter.ts +++ b/src/drivers/fake_adapter.ts @@ -2,7 +2,7 @@ import assert from 'node:assert/strict' import { randomUUID } from 'node:crypto' import { isDeepStrictEqual } from 'node:util' import { CronExpressionParser } from 'cron-parser' -import type { Adapter, AcquiredJob } from '../contracts/adapter.js' +import type { Adapter, AcquiredJob, PushResult } from '../contracts/adapter.js' import type { JobData, JobClass, @@ -16,6 +16,14 @@ import { DEFAULT_PRIORITY } from '../constants.js' import { parse } from '../utils.js' import { Job } from '../job.js' +interface DedupEntry { + jobId: string + createdAt: number + ttl?: number + replace?: boolean + extend?: boolean +} + interface ActiveJob { job: JobData acquiredAt: number @@ -71,6 +79,7 @@ export class FakeAdapter implements Adapter { #pendingTimeouts = new Set() #schedules = new Map() #pushedJobs: FakeJobRecord[] = [] + #dedupIndex = new Map>() #onDispose?: () => void /** @@ -116,6 +125,7 @@ export class FakeAdapter implements Adapter { this.#failedJobs.clear() this.#schedules.clear() this.#pushedJobs = [] + this.#dedupIndex.clear() } assertPushed(matcher: FakeJobMatcher, query?: FakeJobQuery): void { @@ -155,24 +165,36 @@ export class FakeAdapter implements Adapter { return jobs.length } - async push(jobData: JobData): Promise { + async push(jobData: JobData): Promise { return this.pushOn('default', jobData) } - async pushOn(queue: string, jobData: JobData): Promise { + async pushOn(queue: string, jobData: JobData): Promise { + const deduped = await this.#applyDedup(queue, jobData) + if (deduped) return deduped + this.#recordPush(queue, jobData) this.#enqueue(queue, jobData) + + if (jobData.dedup) { + return { outcome: 'added', jobId: jobData.id } + } } - async pushLater(jobData: JobData, delay: number): Promise { + async pushLater(jobData: JobData, delay: number): Promise { return 
this.pushLaterOn('default', jobData, delay) } - pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { + async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { + const deduped = await this.#applyDedup(queue, jobData) + if (deduped) return deduped + this.#recordPush(queue, jobData, delay) this.#schedulePush(queue, jobData, delay) - return Promise.resolve() + if (jobData.dedup) { + return { outcome: 'added', jobId: jobData.id } + } } async pushMany(jobs: JobData[]): Promise { @@ -180,6 +202,10 @@ export class FakeAdapter implements Adapter { } async pushManyOn(queue: string, jobs: JobData[]): Promise { + if (jobs.some((j) => j.dedup)) { + throw new Error('dedup is not supported in batch dispatch; use single dispatch') + } + for (const job of jobs) { await this.pushOn(queue, job) } @@ -226,6 +252,7 @@ export class FakeAdapter implements Adapter { this.#activeJobs.delete(jobId) if (removeOnComplete === undefined || removeOnComplete === true) { + this.#cleanupDedupForJob(queue, active.job) return } @@ -244,6 +271,7 @@ export class FakeAdapter implements Adapter { this.#activeJobs.delete(jobId) if (removeOnFail === undefined || removeOnFail === true) { + this.#cleanupDedupForJob(queue, active.job) return } @@ -298,6 +326,7 @@ export class FakeAdapter implements Adapter { if (currentStalledCount >= maxStalledCount) { // Fail permanently - just remove from active this.#activeJobs.delete(jobId) + this.#cleanupDedupForJob(active.queue, active.job) continue } @@ -526,26 +555,131 @@ export class FakeAdapter implements Adapter { records.push(record) if (retention && retention !== true) { - this.#applyRetention(records, retention) + this.#applyRetention(records, retention, queue) } } - #applyRetention(records: JobRecord[], retention: JobRetention) { + #applyRetention(records: JobRecord[], retention: JobRetention, queue: string) { if (retention === false || retention === true) { return } + const pruned: JobRecord[] = [] + if (retention.age 
!== undefined) { const maxAgeMs = parse(retention.age) if (maxAgeMs > 0) { const cutoff = Date.now() - maxAgeMs - const filtered = records.filter((record) => (record.finishedAt ?? 0) >= cutoff) - records.splice(0, records.length, ...filtered) + const kept: JobRecord[] = [] + for (const record of records) { + if ((record.finishedAt ?? 0) >= cutoff) { + kept.push(record) + } else { + pruned.push(record) + } + } + records.splice(0, records.length, ...kept) } } if (retention.count !== undefined && retention.count > 0 && records.length > retention.count) { - records.splice(0, records.length - retention.count) + const excess = records.length - retention.count + pruned.push(...records.slice(0, excess)) + records.splice(0, excess) + } + + for (const record of pruned) { + this.#cleanupDedupForJob(queue, record.data) + } + } + + #applyDedup(queue: string, jobData: JobData): PushResult | null { + if (!jobData.dedup) return null + + const dedupId = jobData.dedup.id + const now = Date.now() + const entry = this.#getDedupEntry(queue, dedupId) + + if (entry) { + const withinTtl = !entry.ttl || now - entry.createdAt < entry.ttl + if (withinTtl) { + const existing = this.#findJobById(queue, entry.jobId) + if (existing) { + const replaceable = existing.location === 'pending' || existing.location === 'delayed' + if (jobData.dedup.replace && replaceable) { + existing.job.payload = structuredClone(jobData.payload) + if (jobData.dedup.extend && entry.ttl) { + entry.createdAt = now + } + return { outcome: 'replaced', jobId: entry.jobId } + } + if (jobData.dedup.extend && entry.ttl) { + entry.createdAt = now + return { outcome: 'extended', jobId: entry.jobId } + } + return { outcome: 'skipped', jobId: entry.jobId } + } + } + } + + this.#setDedupEntry(queue, dedupId, { + jobId: jobData.id, + createdAt: now, + ttl: jobData.dedup.ttl, + replace: jobData.dedup.replace, + extend: jobData.dedup.extend, + }) + + return null + } + + #findJobById( + queue: string, + jobId: string + ): { + job: 
JobData + location: 'pending' | 'delayed' | 'active' | 'completed' | 'failed' + } | null { + const active = this.#activeJobs.get(jobId) + if (active && active.queue === queue) { + return { job: active.job, location: 'active' } + } + const pending = this.#queues.get(queue)?.find((j) => j.id === jobId) + if (pending) { + return { job: pending, location: 'pending' } + } + const delayed = this.#delayedJobs.get(queue)?.get(jobId) + if (delayed) { + return { job: delayed.job, location: 'delayed' } + } + const completed = this.#findHistory(this.#completedJobs, queue, jobId) + if (completed) { + return { job: completed.data, location: 'completed' } + } + const failed = this.#findHistory(this.#failedJobs, queue, jobId) + if (failed) { + return { job: failed.data, location: 'failed' } + } + return null + } + + #getDedupEntry(queue: string, dedupId: string): DedupEntry | undefined { + return this.#dedupIndex.get(queue)?.get(dedupId) + } + + #setDedupEntry(queue: string, dedupId: string, entry: DedupEntry): void { + if (!this.#dedupIndex.has(queue)) { + this.#dedupIndex.set(queue, new Map()) + } + this.#dedupIndex.get(queue)!.set(dedupId, entry) + } + + #cleanupDedupForJob(queue: string, job: JobData): void { + if (!job.dedup) return + const entry = this.#getDedupEntry(queue, job.dedup.id) + // Only delete if the entry points to THIS job (replace may have re-pointed it elsewhere) + if (entry && entry.jobId === job.id) { + this.#dedupIndex.get(queue)?.delete(job.dedup.id) } } diff --git a/src/drivers/knex_adapter.ts b/src/drivers/knex_adapter.ts index 56bcd34..a2a17cc 100644 --- a/src/drivers/knex_adapter.ts +++ b/src/drivers/knex_adapter.ts @@ -1,8 +1,9 @@ import { randomUUID } from 'node:crypto' import KnexPkg from 'knex' import type { Knex } from 'knex' -import type { Adapter, AcquiredJob } from '../contracts/adapter.js' +import type { Adapter, AcquiredJob, PushResult } from '../contracts/adapter.js' import type { + DedupOutcome, JobData, JobRecord, JobRetention, @@ -117,9 
+118,7 @@ export class KnexAdapter implements Adapter { // Update job to active status // For SQLite (no SKIP LOCKED), add status='pending' guard to prevent double-claim - const updateQuery = trx(this.#jobsTable) - .where('id', job.id) - .where('queue', queue) + const updateQuery = trx(this.#jobsTable).where('id', job.id).where('queue', queue) if (!this.#supportsSkipLocked()) { updateQuery.where('status', 'pending') @@ -178,14 +177,11 @@ export class KnexAdapter implements Adapter { const priority = jobData.priority ?? DEFAULT_PRIORITY const score = calculateScore(priority, now) - await trx(this.#jobsTable) - .where('id', job.id) - .where('queue', queue) - .update({ - status: 'pending', - score, - execute_at: null, - }) + await trx(this.#jobsTable).where('id', job.id).where('queue', queue).update({ + status: 'pending', + score, + execute_at: null, + }) } }) } @@ -331,45 +327,49 @@ export class KnexAdapter implements Adapter { if (retryAt && retryAt.getTime() > now) { // Move to delayed - await this.#connection(this.#jobsTable) - .where('id', jobId) - .where('queue', queue) - .update({ - status: 'delayed', - data: updatedData, - worker_id: null, - acquired_at: null, - score: null, - execute_at: retryAt.getTime(), - }) + await this.#connection(this.#jobsTable).where('id', jobId).where('queue', queue).update({ + status: 'delayed', + data: updatedData, + worker_id: null, + acquired_at: null, + score: null, + execute_at: retryAt.getTime(), + }) } else { // Move back to pending const priority = jobData.priority ?? 
DEFAULT_PRIORITY const score = calculateScore(priority, now) - await this.#connection(this.#jobsTable) - .where('id', jobId) - .where('queue', queue) - .update({ - status: 'pending', - data: updatedData, - worker_id: null, - acquired_at: null, - score, - execute_at: null, - }) + await this.#connection(this.#jobsTable).where('id', jobId).where('queue', queue).update({ + status: 'pending', + data: updatedData, + worker_id: null, + acquired_at: null, + score, + execute_at: null, + }) } } - async push(jobData: JobData): Promise { + async push(jobData: JobData): Promise { return this.pushOn('default', jobData) } - async pushOn(queue: string, jobData: JobData): Promise { + async pushOn(queue: string, jobData: JobData): Promise { const priority = jobData.priority ?? DEFAULT_PRIORITY const timestamp = Date.now() const score = calculateScore(priority, timestamp) + if (jobData.dedup) { + return this.#pushWithDedup(queue, jobData, { + id: jobData.id, + queue, + status: 'pending', + data: JSON.stringify(jobData), + score, + }) + } + await this.#connection(this.#jobsTable).insert({ id: jobData.id, queue, @@ -379,13 +379,23 @@ export class KnexAdapter implements Adapter { }) } - async pushLater(jobData: JobData, delay: number): Promise { + async pushLater(jobData: JobData, delay: number): Promise { return this.pushLaterOn('default', jobData, delay) } - async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { + async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { const executeAt = Date.now() + delay + if (jobData.dedup) { + return this.#pushWithDedup(queue, jobData, { + id: jobData.id, + queue, + status: 'delayed', + data: JSON.stringify(jobData), + execute_at: executeAt, + }) + } + await this.#connection(this.#jobsTable).insert({ id: jobData.id, queue, @@ -395,6 +405,138 @@ export class KnexAdapter implements Adapter { }) } + async #pushWithDedup( + queue: string, + jobData: JobData, + insertRow: Record + ): Promise { + const dedup = 
jobData.dedup! + + try { + return await this.#pushWithDedupTxn(queue, jobData, insertRow, dedup) + } catch (err) { + if (this.#isMissingDedupColumn(err)) { + throw new Error( + `Dedup columns missing on "${this.#jobsTable}". Run QueueSchemaService.addDedupColumns() on your jobs table before dispatching jobs with .dedup().`, + { cause: err } + ) + } + throw err + } + } + + #isMissingDedupColumn(err: unknown): boolean { + if (!err || typeof err !== 'object') return false + const message = (err as { message?: string }).message + if (!message) return false + // Postgres: 'column "dedup_id" does not exist' + // SQLite: 'no such column: dedup_id' + // MySQL: "Unknown column 'dedup_id' in 'where clause'" + return ( + /dedup_id/.test(message) && /(does not exist|no such column|Unknown column)/i.test(message) + ) + } + + #pushWithDedupTxn( + queue: string, + jobData: JobData, + insertRow: Record, + dedup: NonNullable + ): Promise { + return this.#connection.transaction(async (trx) => { + const existing = await trx(this.#jobsTable) + .where('queue', queue) + .where('dedup_id', dedup.id) + .orderBy('dedup_at', 'desc') + .forUpdate() + .first() + + const now = Date.now() + + if (existing) { + const dedupAt = existing.dedup_at != null ? Number(existing.dedup_at) : null + const dedupTtl = existing.dedup_ttl != null ? Number(existing.dedup_ttl) : null + const withinTtl = dedupTtl === null || (dedupAt !== null && now - dedupAt < dedupTtl) + + if (withinTtl) { + const status = existing.status as JobStatus + const replaceable = status === 'pending' || status === 'delayed' + + if (dedup.replace && replaceable) { + const storedData = + typeof existing.data === 'string' ? 
JSON.parse(existing.data) : existing.data + const newData = { ...storedData, payload: jobData.payload } + const updates: Record = { data: JSON.stringify(newData) } + if (dedup.extend && dedupTtl) { + updates.dedup_at = now + } + await trx(this.#jobsTable).where({ id: existing.id, queue }).update(updates) + return { outcome: 'replaced' as DedupOutcome, jobId: existing.id as string } + } + + if (dedup.extend && dedupTtl) { + await trx(this.#jobsTable).where({ id: existing.id, queue }).update({ dedup_at: now }) + return { outcome: 'extended' as DedupOutcome, jobId: existing.id as string } + } + + return { outcome: 'skipped' as DedupOutcome, jobId: existing.id as string } + } + // TTL expired — release the dedup slot from the old row so the new + // insert can claim it. The old job keeps running to completion; only + // its dedup identity is cleared. Retained history rows are excluded + // from the partial unique index predicate, so no update needed there. + const status = existing.status as JobStatus + if (status === 'pending' || status === 'delayed') { + await trx(this.#jobsTable) + .where({ id: existing.id, queue }) + .update({ dedup_id: null, dedup_at: null, dedup_ttl: null }) + } + } + + let raceLost = false + try { + await trx.transaction(async (sp) => { + await sp(this.#jobsTable).insert({ + ...insertRow, + dedup_id: dedup.id, + dedup_at: now, + dedup_ttl: dedup.ttl ?? 
null, + }) + }) + } catch (err) { + if (this.#isUniqueViolation(err)) { + raceLost = true + } else { + throw err + } + } + + if (raceLost) { + const winner = await trx(this.#jobsTable) + .where('queue', queue) + .where('dedup_id', dedup.id) + .whereIn('status', ['pending', 'delayed']) + .orderBy('dedup_at', 'desc') + .first() + if (winner) { + return { outcome: 'skipped' as DedupOutcome, jobId: winner.id as string } + } + } + + return { outcome: 'added' as DedupOutcome, jobId: jobData.id } + }) + } + + #isUniqueViolation(err: unknown): boolean { + if (!err || typeof err !== 'object') return false + const e = err as { code?: string; message?: string } + return ( + e.code === '23505' || + e.code === 'SQLITE_CONSTRAINT_UNIQUE' || + /UNIQUE constraint/i.test(e.message ?? '') + ) + } + async pushMany(jobs: JobData[]): Promise { return this.pushManyOn('default', jobs) } @@ -402,6 +544,10 @@ export class KnexAdapter implements Adapter { async pushManyOn(queue: string, jobs: JobData[]): Promise { if (jobs.length === 0) return + if (jobs.some((j) => j.dedup)) { + throw new Error('dedup is not supported in batch dispatch; use single dispatch') + } + const now = Date.now() const rows = jobs.map((job) => ({ id: job.id, @@ -458,10 +604,7 @@ export class KnexAdapter implements Adapter { if (currentStalledCount >= maxStalledCount) { // Fail permanently - remove the job - await trx(this.#jobsTable) - .where('id', row.id) - .where('queue', queue) - .delete() + await trx(this.#jobsTable).where('id', row.id).where('queue', queue).delete() } else { // Recover: increment stalledCount and put back in pending jobData.stalledCount = currentStalledCount + 1 @@ -534,9 +677,9 @@ export class KnexAdapter implements Adapter { } async getSchedule(id: string): Promise { - const row = (await this.#connection(this.#schedulesTable) - .where('id', id) - .first()) as ScheduleRow | undefined + const row = (await this.#connection(this.#schedulesTable).where('id', id).first()) as + | ScheduleRow + | 
undefined if (!row) return null return this.#rowToScheduleData(row) @@ -565,16 +708,12 @@ export class KnexAdapter implements Adapter { if (updates.runCount !== undefined) data.run_count = updates.runCount if (Object.keys(data).length > 0) { - await this.#connection(this.#schedulesTable) - .where('id', id) - .update(data) + await this.#connection(this.#schedulesTable).where('id', id).update(data) } } async deleteSchedule(id: string): Promise { - await this.#connection(this.#schedulesTable) - .where('id', id) - .delete() + await this.#connection(this.#schedulesTable).where('id', id).delete() } async claimDueSchedule(): Promise { @@ -629,13 +768,11 @@ export class KnexAdapter implements Adapter { } // Update atomically - await trx(this.#schedulesTable) - .where('id', row.id) - .update({ - next_run_at: nextRunAt, - last_run_at: now, - run_count: newRunCount, - }) + await trx(this.#schedulesTable).where('id', row.id).update({ + next_run_at: nextRunAt, + last_run_at: now, + run_count: newRunCount, + }) // Return schedule data (before update state for payload) return this.#rowToScheduleData(row) diff --git a/src/drivers/redis_adapter.ts b/src/drivers/redis_adapter.ts index 27fe11d..c12a400 100644 --- a/src/drivers/redis_adapter.ts +++ b/src/drivers/redis_adapter.ts @@ -2,7 +2,8 @@ import { randomUUID } from 'node:crypto' import { Redis, type RedisOptions } from 'ioredis' import { DEFAULT_PRIORITY } from '../constants.js' import { calculateScore } from '../utils.js' -import type { Adapter, AcquiredJob } from '../contracts/adapter.js' +import type { Adapter, AcquiredJob, PushResult } from '../contracts/adapter.js' +import type { DedupOutcome } from '../types/main.js' import type { JobData, JobRecord, @@ -35,6 +36,79 @@ const PUSH_JOB_SCRIPT = ` return 1 ` +/** + * Lua script for pushing a dedup job. + * + * Behavior: + * - If dedup key exists AND job still exists AND within TTL: apply replace/extend, skip insert. 
+ * - If dedup key exists but job data missing (orphan): proceed to insert new. + * - If TTL expired or no prior entry: insert new job, record dedup key with TTL. + * + * Replace only applies to jobs in pending or delayed state. Active and + * retained completed/failed jobs are left untouched (returns 'skipped'). + * Replace swaps the payload only — priority/queue/delay/groupId/dedup + * options of the existing job are preserved. + * + * Extend uses the ORIGINAL ttl recorded on the existing job (stored in + * its dedup field), not the ttl arg of the current dispatch. Matches + * Knex/Fake behavior: extend resets the clock but never changes the + * window length. + * + * Returns {outcome, job_id}: outcome ∈ 'added' | 'skipped' | 'replaced' | 'extended'. + */ +const PUSH_DEDUP_JOB_SCRIPT = ` + local data_key = KEYS[1] + local pending_key = KEYS[2] + local dedup_key = KEYS[3] + local delayed_key = KEYS[4] + local job_id = ARGV[1] + local job_data = ARGV[2] + local score = tonumber(ARGV[3]) + local ttl = tonumber(ARGV[4]) + local extend = tonumber(ARGV[5]) + local replace = tonumber(ARGV[6]) + + local existing = redis.call('GET', dedup_key) + if existing then + local existing_data = redis.call('HGET', data_key, existing) + if existing_data then + local in_pending = redis.call('ZSCORE', pending_key, existing) + local in_delayed = redis.call('ZSCORE', delayed_key, existing) + local replaceable = in_pending or in_delayed + local ok_e, existing_job = pcall(cjson.decode, existing_data) + local original_ttl = nil + if ok_e and type(existing_job) == 'table' and existing_job.dedup then + original_ttl = tonumber(existing_job.dedup.ttl) + end + if replace == 1 and replaceable then + local ok_n, new_job = pcall(cjson.decode, job_data) + if ok_e and ok_n and existing_job and new_job then + existing_job.payload = new_job.payload + redis.call('HSET', data_key, existing, cjson.encode(existing_job)) + if extend == 1 and original_ttl and original_ttl > 0 then + redis.call('PEXPIRE', 
dedup_key, original_ttl) + end + return {'replaced', existing} + end + return {'skipped', existing} + end + if extend == 1 and original_ttl and original_ttl > 0 then + redis.call('PEXPIRE', dedup_key, original_ttl) + return {'extended', existing} + end + return {'skipped', existing} + end + end + + redis.call('HSET', data_key, job_id, job_data) + redis.call('ZADD', pending_key, score, job_id) + redis.call('SET', dedup_key, job_id) + if ttl > 0 then + redis.call('PEXPIRE', dedup_key, ttl) + end + return {'added', job_id} +` + /** * Lua script for pushing a delayed job. * Stores job data in the central hash and adds jobId to delayed ZSET. @@ -52,6 +126,63 @@ const PUSH_DELAYED_JOB_SCRIPT = ` return 1 ` +/** + * Lua script for pushing a dedup delayed job. + * Same semantics as PUSH_DEDUP_JOB_SCRIPT but adds to delayed ZSET. + */ +const PUSH_DEDUP_DELAYED_JOB_SCRIPT = ` + local data_key = KEYS[1] + local delayed_key = KEYS[2] + local dedup_key = KEYS[3] + local pending_key = KEYS[4] + local job_id = ARGV[1] + local job_data = ARGV[2] + local execute_at = tonumber(ARGV[3]) + local ttl = tonumber(ARGV[4]) + local extend = tonumber(ARGV[5]) + local replace = tonumber(ARGV[6]) + + local existing = redis.call('GET', dedup_key) + if existing then + local existing_data = redis.call('HGET', data_key, existing) + if existing_data then + local in_pending = redis.call('ZSCORE', pending_key, existing) + local in_delayed = redis.call('ZSCORE', delayed_key, existing) + local replaceable = in_pending or in_delayed + local ok_e, existing_job = pcall(cjson.decode, existing_data) + local original_ttl = nil + if ok_e and type(existing_job) == 'table' and existing_job.dedup then + original_ttl = tonumber(existing_job.dedup.ttl) + end + if replace == 1 and replaceable then + local ok_n, new_job = pcall(cjson.decode, job_data) + if ok_e and ok_n and existing_job and new_job then + existing_job.payload = new_job.payload + redis.call('HSET', data_key, existing, cjson.encode(existing_job)) + 
if extend == 1 and original_ttl and original_ttl > 0 then + redis.call('PEXPIRE', dedup_key, original_ttl) + end + return {'replaced', existing} + end + return {'skipped', existing} + end + if extend == 1 and original_ttl and original_ttl > 0 then + redis.call('PEXPIRE', dedup_key, original_ttl) + return {'extended', existing} + end + return {'skipped', existing} + end + end + + redis.call('HSET', data_key, job_id, job_data) + redis.call('ZADD', delayed_key, execute_at, job_id) + redis.call('SET', dedup_key, job_id) + if ttl > 0 then + redis.call('PEXPIRE', dedup_key, ttl) + end + return {'added', job_id} +` + /** * Lua script for atomic job acquisition. * 1. Check and process delayed jobs @@ -110,16 +241,30 @@ const ACQUIRE_JOB_SCRIPT = ` /** * Lua script for removing a job completely (no history). + * Also cleans up the dedup key if the job had dedup metadata. */ const REMOVE_JOB_SCRIPT = ` local data_key = KEYS[1] local active_key = KEYS[2] local job_id = ARGV[1] + local dedup_prefix = ARGV[2] if redis.call('HEXISTS', active_key, job_id) == 0 then return 0 end + -- Read job data to extract dedup.id before deleting + local job_data = redis.call('HGET', data_key, job_id) + if job_data then + local ok, job = pcall(cjson.decode, job_data) + if ok and job and job.dedup and job.dedup.id then + local dkey = dedup_prefix .. job.dedup.id + if redis.call('GET', dkey) == job_id then + redis.call('DEL', dkey) + end + end + end + redis.call('HDEL', active_key, job_id) redis.call('HDEL', data_key, job_id) @@ -129,6 +274,7 @@ const REMOVE_JOB_SCRIPT = ` /** * Lua script for finalizing a job in history. * Removes from active, stores finalization info, and prunes old records. + * When pruning removes job data, also deletes the associated dedup key. 
*/ const FINALIZE_JOB_SCRIPT = ` local data_key = KEYS[1] @@ -140,6 +286,7 @@ const FINALIZE_JOB_SCRIPT = ` local max_age = tonumber(ARGV[3]) local max_count = tonumber(ARGV[4]) local error_message = ARGV[5] + local dedup_prefix = ARGV[6] -- Verify job is active if redis.call('HEXISTS', active_key, job_id) == 0 then @@ -159,11 +306,28 @@ const FINALIZE_JOB_SCRIPT = ` redis.call('HSET', history_key, job_id, cjson.encode(record)) redis.call('ZADD', index_key, now, job_id) + local function delete_dedup_for(ids) + for i = 1, #ids do + local id = ids[i] + local d = redis.call('HGET', data_key, id) + if d then + local ok, job = pcall(cjson.decode, d) + if ok and job and job.dedup and job.dedup.id then + local dkey = dedup_prefix .. job.dedup.id + if redis.call('GET', dkey) == id then + redis.call('DEL', dkey) + end + end + end + end + end + -- Prune by age if max_age and max_age > 0 then local cutoff = now - max_age local expired = redis.call('ZRANGEBYSCORE', index_key, 0, cutoff) if #expired > 0 then + delete_dedup_for(expired) redis.call('ZREM', index_key, unpack(expired)) redis.call('HDEL', history_key, unpack(expired)) redis.call('HDEL', data_key, unpack(expired)) @@ -177,6 +341,7 @@ const FINALIZE_JOB_SCRIPT = ` local excess = size - max_count local stale = redis.call('ZRANGE', index_key, 0, excess - 1) if #stale > 0 then + delete_dedup_for(stale) redis.call('ZREM', index_key, unpack(stale)) redis.call('HDEL', history_key, unpack(stale)) redis.call('HDEL', data_key, unpack(stale)) @@ -250,6 +415,7 @@ const RECOVER_STALLED_JOBS_SCRIPT = ` local now = tonumber(ARGV[1]) local stalled_threshold = tonumber(ARGV[2]) local max_stalled_count = tonumber(ARGV[3]) + local dedup_prefix = ARGV[4] local recovered = 0 local stalled_cutoff = now - stalled_threshold @@ -275,7 +441,13 @@ const RECOVER_STALLED_JOBS_SCRIPT = ` -- Check if job has exceeded max stalled count if current_stalled_count >= max_stalled_count then - -- Job failed permanently, remove data too + -- Job failed 
permanently, remove data + dedup key (only if pointer still ours) + if job.dedup and job.dedup.id then + local dkey = dedup_prefix .. job.dedup.id + if redis.call('GET', dkey) == job_id then + redis.call('DEL', dkey) + end + end redis.call('HDEL', data_key, job_id) else -- Recover: increment stalledCount and put back in pending @@ -476,6 +648,14 @@ export class RedisAdapter implements Adapter { } } + #getDedupKey(queue: string, dedupId: string): string { + return `${this.#getDedupPrefix(queue)}${dedupId}` + } + + #getDedupPrefix(queue: string): string { + return `${redisKey}::${queue}::dedup::` + } + setWorkerId(workerId: string): void { this.#workerId = workerId } @@ -514,10 +694,11 @@ export class RedisAdapter implements Adapter { async completeJob(jobId: string, queue: string, removeOnComplete?: JobRetention): Promise { const keys = this.#getKeys(queue) + const dedupPrefix = this.#getDedupPrefix(queue) const { keep, maxAge, maxCount } = resolveRetention(removeOnComplete) if (!keep) { - await this.#connection.eval(REMOVE_JOB_SCRIPT, 2, keys.data, keys.active, jobId) + await this.#connection.eval(REMOVE_JOB_SCRIPT, 2, keys.data, keys.active, jobId, dedupPrefix) return } @@ -532,7 +713,8 @@ export class RedisAdapter implements Adapter { Date.now().toString(), maxAge.toString(), maxCount.toString(), - '' + '', + dedupPrefix ) } @@ -543,10 +725,11 @@ export class RedisAdapter implements Adapter { removeOnFail?: JobRetention ): Promise { const keys = this.#getKeys(queue) + const dedupPrefix = this.#getDedupPrefix(queue) const { keep, maxAge, maxCount } = resolveRetention(removeOnFail) if (!keep) { - await this.#connection.eval(REMOVE_JOB_SCRIPT, 2, keys.data, keys.active, jobId) + await this.#connection.eval(REMOVE_JOB_SCRIPT, 2, keys.data, keys.active, jobId, dedupPrefix) return } @@ -561,7 +744,8 @@ export class RedisAdapter implements Adapter { Date.now().toString(), maxAge.toString(), maxCount.toString(), - error?.message || '' + error?.message || '', + 
dedupPrefix ) } @@ -604,18 +788,37 @@ export class RedisAdapter implements Adapter { return JSON.parse(result as string) } - push(jobData: JobData): Promise { + push(jobData: JobData): Promise { return this.pushOn('default', jobData) } - pushLater(jobData: JobData, delay: number): Promise { + pushLater(jobData: JobData, delay: number): Promise { return this.pushLaterOn('default', jobData, delay) } - async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { + async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { const keys = this.#getKeys(queue) const executeAt = Date.now() + delay + if (jobData.dedup) { + const dedupKey = this.#getDedupKey(queue, jobData.dedup.id) + const result = (await this.#connection.eval( + PUSH_DEDUP_DELAYED_JOB_SCRIPT, + 4, + keys.data, + keys.delayed, + dedupKey, + keys.pending, + jobData.id, + JSON.stringify(jobData), + executeAt.toString(), + (jobData.dedup.ttl ?? 0).toString(), + jobData.dedup.extend ? '1' : '0', + jobData.dedup.replace ? '1' : '0' + )) as [string, string] + return { outcome: result[0] as DedupOutcome, jobId: result[1] } + } + await this.#connection.eval( PUSH_DELAYED_JOB_SCRIPT, 2, @@ -627,12 +830,31 @@ export class RedisAdapter implements Adapter { ) } - async pushOn(queue: string, jobData: JobData): Promise { + async pushOn(queue: string, jobData: JobData): Promise { const keys = this.#getKeys(queue) const priority = jobData.priority ?? DEFAULT_PRIORITY const timestamp = Date.now() const score = calculateScore(priority, timestamp) + if (jobData.dedup) { + const dedupKey = this.#getDedupKey(queue, jobData.dedup.id) + const result = (await this.#connection.eval( + PUSH_DEDUP_JOB_SCRIPT, + 4, + keys.data, + keys.pending, + dedupKey, + keys.delayed, + jobData.id, + JSON.stringify(jobData), + score.toString(), + (jobData.dedup.ttl ?? 0).toString(), + jobData.dedup.extend ? '1' : '0', + jobData.dedup.replace ? 
'1' : '0' + )) as [string, string] + return { outcome: result[0] as DedupOutcome, jobId: result[1] } + } + await this.#connection.eval( PUSH_JOB_SCRIPT, 2, @@ -651,6 +873,10 @@ export class RedisAdapter implements Adapter { async pushManyOn(queue: string, jobs: JobData[]): Promise { if (jobs.length === 0) return + if (jobs.some((j) => j.dedup)) { + throw new Error('dedup is not supported in batch dispatch; use single dispatch') + } + const keys = this.#getKeys(queue) const now = Date.now() const multi = this.#connection.multi() @@ -690,7 +916,8 @@ export class RedisAdapter implements Adapter { keys.pending, now.toString(), stalledThreshold.toString(), - maxStalledCount.toString() + maxStalledCount.toString(), + this.#getDedupPrefix(queue) ) return recovered as number diff --git a/src/drivers/sync_adapter.ts b/src/drivers/sync_adapter.ts index 796db00..d97fa55 100644 --- a/src/drivers/sync_adapter.ts +++ b/src/drivers/sync_adapter.ts @@ -59,6 +59,10 @@ export class SyncAdapter implements Adapter { } async pushManyOn(queue: string, jobs: JobData[]): Promise { + if (jobs.some((j) => j.dedup)) { + throw new Error('dedup is not supported in batch dispatch; use single dispatch') + } + for (const job of jobs) { await this.pushOn(queue, job) } diff --git a/src/job_batch_dispatcher.ts b/src/job_batch_dispatcher.ts index 61864af..35452aa 100644 --- a/src/job_batch_dispatcher.ts +++ b/src/job_batch_dispatcher.ts @@ -165,7 +165,6 @@ export class JobBatchDispatcher { const message: JobDispatchMessage = { jobs, queue: this.#queue } - await dispatchChannel.tracePromise(async () => { await wrapInternal(() => adapter.pushManyOn(this.#queue, jobs)) }, message) diff --git a/src/job_dispatcher.ts b/src/job_dispatcher.ts index 9198895..355f45c 100644 --- a/src/job_dispatcher.ts +++ b/src/job_dispatcher.ts @@ -15,11 +15,12 @@ import { parse } from './utils.js' * * ``` * Job.dispatch(payload) - * .toQueue('emails') // optional: target queue - * .priority(1) // optional: 1-10, lower = 
higher priority - * .in('5m') // optional: delay before processing - * .with('redis') // optional: specific adapter - * .run() // dispatch the job + * .toQueue('emails') // optional: target queue + * .priority(1) // optional: 1-10, lower = higher priority + * .in('5m') // optional: delay before processing + * .dedup({ id: 'order-123' }) // optional: deduplication + * .with('redis') // optional: specific adapter + * .run() // dispatch the job * ``` * * @typeParam T - The payload type for this job @@ -47,6 +48,12 @@ export class JobDispatcher { #delay?: Duration #priority?: number #groupId?: string + #dedup?: { + id: string + ttl?: number + extend?: boolean + replace?: boolean + } /** * Create a new job dispatcher. @@ -148,6 +155,88 @@ export class JobDispatcher { return this } + /** + * Configure deduplication for this job. + * + * Modes: + * - **Simple** (`{ id }`): skip duplicates while the job exists. + * - **Throttle** (`{ id, ttl }`): skip duplicates within a TTL window. + * - **Extend** (`{ id, ttl, extend: true }`): reset the TTL clock on each duplicate. + * The window length stays at the original ttl from the first dispatch. + * - **Replace** (`{ id, ttl, replace: true }`): swap the payload of the existing + * pending/delayed job on duplicate within TTL. Active jobs and retained + * completed/failed jobs return `'skipped'`. Only `payload` changes — + * priority/queue/delay/groupId are preserved. + * - **Debounce** (`{ id, ttl, replace: true, extend: true }`): replace + reset TTL. + * + * The id is automatically prefixed with the job name to prevent collisions + * between different job types. + * + * @param options.id - Unique deduplication key + * @param options.ttl - TTL as Duration ('5s', 5000). Required for extend/replace. + * @param options.extend - Reset the TTL clock on duplicate within window. Window + * length stays at the original ttl; this option's `ttl` arg is ignored on extend. 
+ * @param options.replace - Swap payload of existing pending/delayed job within + * window. Active and retained jobs are not modified. + * + * @example + * ```typescript + * // Simple dedup + * await SendInvoiceJob.dispatch({ orderId: 123 }) + * .dedup({ id: 'order-123' }) + * + * // Throttle: 5 second window + * await SendEmailJob.dispatch({ to: 'x' }) + * .dedup({ id: 'welcome', ttl: '5s' }) + * + * // Debounce: replace payload within window + * await SaveDraftJob.dispatch({ content: 'latest' }) + * .dedup({ id: 'draft-42', ttl: '2s', replace: true, extend: true }) + * ``` + */ + dedup(options: { id: string; ttl?: Duration; extend?: boolean; replace?: boolean }): this { + if (!options.id) { + throw new Error('Dedup ID must be a non-empty string') + } + + if (options.id.length > 400) { + throw new Error('Dedup ID must be 400 characters or less') + } + + // The stored dedup key is `::` and must fit within the + // adapter storage limit (Knex column is VARCHAR(510)). Reject long + // combinations early so the failure surfaces at dispatch time rather + // than at insert. + const prefixedLength = this.#name.length + 2 + options.id.length + if (prefixedLength > 510) { + throw new Error( + `Dedup ID combined with job name exceeds 510 characters ` + + `(got ${prefixedLength}). Shorten either the job name or the dedup id.` + ) + } + + if ((options.extend || options.replace) && options.ttl === undefined) { + throw new Error('dedup.ttl is required when extend or replace is set') + } + + let parsedTtl: number | undefined + if (options.ttl !== undefined) { + parsedTtl = parse(options.ttl) + if (!Number.isFinite(parsedTtl) || parsedTtl <= 0) { + throw new Error('dedup.ttl must be a positive duration') + } + } + + this.#dedup = { + id: options.id, + ttl: parsedTtl, + extend: options.extend, + replace: options.replace, + } + + return this + } + /** * Use a specific adapter for this job. 
* @@ -182,6 +271,7 @@ export class JobDispatcher { */ async run(): Promise { const id = randomUUID() + const dedupId = this.#dedup ? `${this.#name}::${this.#dedup.id}` : undefined debug('dispatching job %s with id %s using payload %s', this.#name, id, this.#payload) @@ -197,18 +287,40 @@ export class JobDispatcher { priority: this.#priority, groupId: this.#groupId, createdAt: Date.now(), + ...(dedupId + ? { + dedup: { + id: dedupId, + ttl: this.#dedup!.ttl, + extend: this.#dedup!.extend, + replace: this.#dedup!.replace, + }, + } + : {}), } const message: JobDispatchMessage = { jobs: [jobData], queue: this.#queue, delay: parsedDelay } + let pushResult: { outcome: DispatchResult['deduped']; jobId: string } | undefined await dispatchChannel.tracePromise(async () => { - if (parsedDelay !== undefined) { - await wrapInternal(() => adapter.pushLaterOn(this.#queue, jobData, parsedDelay)) - } else { - await wrapInternal(() => adapter.pushOn(this.#queue, jobData)) + const result = + parsedDelay !== undefined + ? await wrapInternal(() => adapter.pushLaterOn(this.#queue, jobData, parsedDelay)) + : await wrapInternal(() => adapter.pushOn(this.#queue, jobData)) + + if (result && typeof result === 'object' && 'outcome' in result) { + pushResult = { outcome: result.outcome, jobId: result.jobId } + message.dedupOutcome = result.outcome } }, message) + if (pushResult && this.#dedup) { + return { + jobId: pushResult.jobId, + deduped: pushResult.outcome, + } + } + return { jobId: id } } diff --git a/src/queue_config_resolver.ts b/src/queue_config_resolver.ts index 1679f85..02e863b 100644 --- a/src/queue_config_resolver.ts +++ b/src/queue_config_resolver.ts @@ -102,10 +102,7 @@ export class QueueConfigResolver { * merge like `retry.maxRetries`. 
*/ #normalizeJobRetryConfig(jobOptions?: JobOptions): RetryConfig | undefined { - if ( - !jobOptions || - (jobOptions.retry === undefined && jobOptions.maxRetries === undefined) - ) { + if (!jobOptions || (jobOptions.retry === undefined && jobOptions.maxRetries === undefined)) { return undefined } diff --git a/src/services/queue_schema.ts b/src/services/queue_schema.ts index ed53eef..241c752 100644 --- a/src/services/queue_schema.ts +++ b/src/services/queue_schema.ts @@ -26,13 +26,65 @@ export class QueueSchemaService { table.bigint('execute_at').unsigned().nullable() table.bigint('finished_at').unsigned().nullable() table.text('error').nullable() + table.string('dedup_id', 510).nullable() + table.bigint('dedup_at').unsigned().nullable() + table.bigint('dedup_ttl').unsigned().nullable() table.primary(['id', 'queue']) table.index(['queue', 'status', 'score']) table.index(['queue', 'status', 'execute_at']) table.index(['queue', 'status', 'finished_at']) + table.index(['queue', 'dedup_id']) extend?.(table) }) + + await this.#createDedupActiveUniqueIndex(tableName) + } + + /** + * Idempotent migration: adds dedup columns (dedup_id, dedup_at, dedup_ttl) + * and a (queue, dedup_id) index to an existing jobs table. + * + * Safe to run multiple times. Uses hasColumn checks so it won't fail on re-runs. + * For large Postgres tables, consider pausing workers during the run. 
+ */ + async addDedupColumns(tableName: string = 'queue_jobs'): Promise { + const hasDedupId = await this.#connection.schema.hasColumn(tableName, 'dedup_id') + const hasDedupAt = await this.#connection.schema.hasColumn(tableName, 'dedup_at') + const hasDedupTtl = await this.#connection.schema.hasColumn(tableName, 'dedup_ttl') + + if (!hasDedupId || !hasDedupAt || !hasDedupTtl) { + await this.#connection.schema.alterTable(tableName, (table) => { + if (!hasDedupId) table.string('dedup_id', 510).nullable() + if (!hasDedupAt) table.bigint('dedup_at').unsigned().nullable() + if (!hasDedupTtl) table.bigint('dedup_ttl').unsigned().nullable() + }) + } + + if (!hasDedupId) { + await this.#connection.schema.alterTable(tableName, (table) => { + table.index(['queue', 'dedup_id']) + }) + } + + await this.#createDedupActiveUniqueIndex(tableName) + } + + /** + * Partial unique index on (queue, dedup_id) for active dedup slots. + * Prevents two concurrent inserts with the same dedup_id from both succeeding. + * Only PG and SQLite support partial unique indexes; MySQL is skipped. + */ + async #createDedupActiveUniqueIndex(tableName: string): Promise { + const client = this.#connection.client.config.client + if (client !== 'pg' && client !== 'better-sqlite3' && client !== 'sqlite3') return + + const indexName = `${tableName}_dedup_active_uidx` + await this.#connection.raw( + `CREATE UNIQUE INDEX IF NOT EXISTS ?? ON ?? 
("queue", "dedup_id") ` + + `WHERE "dedup_id" IS NOT NULL AND "status" IN ('pending', 'delayed')`, + [indexName, tableName] + ) } /** @@ -57,10 +109,7 @@ export class QueueSchemaService { table.integer('run_count').unsigned().notNullable().defaultTo(0) table.timestamp('next_run_at').nullable() table.timestamp('last_run_at').nullable() - table - .timestamp('created_at') - .notNullable() - .defaultTo(this.#connection.fn.now()) + table.timestamp('created_at').notNullable().defaultTo(this.#connection.fn.now()) table.index(['status', 'next_run_at']) extend?.(table) diff --git a/src/types/main.ts b/src/types/main.ts index 141a9b4..024ccfc 100644 --- a/src/types/main.ts +++ b/src/types/main.ts @@ -55,9 +55,20 @@ export type JobStatus = 'pending' | 'active' | 'delayed' | 'completed' | 'failed * console.log(`Dispatched job: ${jobId}`) * ``` */ +/** + * Outcome of a dedup-enabled dispatch. + * - `added`: new job was inserted + * - `skipped`: duplicate found within TTL, skipped silently + * - `replaced`: duplicate found within TTL, existing job's payload was replaced + * - `extended`: duplicate found within TTL, TTL window was reset + */ +export type DedupOutcome = 'added' | 'skipped' | 'replaced' | 'extended' + export interface DispatchResult { /** Unique identifier for this specific job instance */ jobId: string + /** Dedup outcome (only present when `.dedup()` was used). */ + deduped?: DedupOutcome } /** @@ -145,6 +156,41 @@ export interface JobData { * Injected by OTel plugin at dispatch time. */ traceContext?: Record + + /** + * Deduplication configuration for this job. + * When set, adapters apply dedup semantics keyed on `dedup.id`. + * Set automatically when `.dedup()` is called on the dispatcher. + */ + dedup?: { + /** + * Dedup key, prefixed with the job name (e.g. `SendInvoiceJob::order-123`). + * The combined `::` length is capped at 510 characters by the + * Knex storage column. Dispatcher validates this at `.dedup()` time. 
+ */ + id: string + /** + * TTL in milliseconds (must be positive). When set, dedup lock auto-expires + * after TTL. After expiry, the same dedup id produces a brand-new job + * (coexists with prior). Omit `ttl` entirely for a no-expiry lock that + * persists until the job is removed. + */ + ttl?: number + /** + * Reset the TTL clock when a duplicate arrives within the window. + * The window length stays at the original ttl — passing a different + * `ttl` on the duplicating dispatch does not resize the window. + */ + extend?: boolean + /** + * Swap the payload of the existing pending or delayed job when a + * duplicate arrives within the TTL window. Active jobs and retained + * completed/failed jobs return `'skipped'` without mutation. Only + * `payload` is swapped — priority, queue, delay, and groupId of the + * existing job are preserved. + */ + replace?: boolean + } } /** diff --git a/src/types/tracing_channels.ts b/src/types/tracing_channels.ts index e2507d8..77d2057 100644 --- a/src/types/tracing_channels.ts +++ b/src/types/tracing_channels.ts @@ -6,7 +6,7 @@ */ import type { AcquiredJob } from '../contracts/adapter.js' -import type { JobData } from './main.js' +import type { DedupOutcome, JobData } from './main.js' /** * Tracing data structure for job dispatch events. @@ -21,6 +21,13 @@ export type JobDispatchMessage = { /** Delay in milliseconds before the job becomes available */ delay?: number + /** + * Deduplication outcome when the job used `.dedup()`. Allows OTel/tracing + * consumers to distinguish added vs skipped/replaced/extended dispatches. + * Populated by the dispatcher after the push call completes. 
+ */ + dedupOutcome?: DedupOutcome + /** Error that caused the dispatch to fail */ error?: Error } diff --git a/src/worker.ts b/src/worker.ts index 5316d00..83cec94 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -7,7 +7,13 @@ import { JobPool } from './job_pool.js' import { JobExecutionRuntime } from './job_runtime.js' import { dispatchChannel, executeChannel } from './tracing_channels.js' import type { Adapter, AcquiredJob } from './contracts/adapter.js' -import type { JobContext, JobOptions, JobRetention, QueueManagerConfig, WorkerCycle } from './types/main.js' +import type { + JobContext, + JobOptions, + JobRetention, + QueueManagerConfig, + WorkerCycle, +} from './types/main.js' import type { JobDispatchMessage, JobExecuteMessage } from './types/tracing_channels.js' import { Locator } from './locator.js' import { DEFAULT_PRIORITY } from './constants.js' @@ -347,11 +353,26 @@ export class Worker { return executeChannel.tracePromise(async () => { try { await runtime.execute(instance, payload, context) - await this.#wrapInternal(() => this.#adapter.completeJob(job.id, queue, retention.removeOnComplete)) + await this.#wrapInternal(() => + this.#adapter.completeJob(job.id, queue, retention.removeOnComplete) + ) executeMessage.status = 'completed' - debug('worker %s: successfully executed job %s in %dms', this.#id, job.id, (performance.now() - startTime).toFixed(2)) + debug( + 'worker %s: successfully executed job %s in %dms', + this.#id, + job.id, + (performance.now() - startTime).toFixed(2) + ) } catch (e) { - await this.#handleExecutionFailure({ error: e as Error, job, queue, instance, runtime, retention, executeMessage }) + await this.#handleExecutionFailure({ + error: e as Error, + job, + queue, + instance, + runtime, + retention, + executeMessage, + }) } executeMessage.duration = Number((performance.now() - startTime).toFixed(2)) @@ -377,7 +398,12 @@ export class Worker { if (outcome.type === 'failed') { options.executeMessage.status = 'failed' await 
this.#wrapInternal(() => - this.#adapter.failJob(options.job.id, options.queue, outcome.storageError, options.retention.removeOnFail) + this.#adapter.failJob( + options.job.id, + options.queue, + outcome.storageError, + options.retention.removeOnFail + ) ) await options.instance.failed?.(outcome.hookError) return @@ -389,8 +415,15 @@ export class Worker { options.executeMessage.nextRetryAt = outcome.retryAt if (outcome.retryAt) { - debug('worker %s: job %s will retry at %s', this.#id, options.job.id, outcome.retryAt.toISOString()) - await this.#wrapInternal(() => this.#adapter.retryJob(options.job.id, options.queue, outcome.retryAt)) + debug( + 'worker %s: job %s will retry at %s', + this.#id, + options.job.id, + outcome.retryAt.toISOString() + ) + await this.#wrapInternal(() => + this.#adapter.retryJob(options.job.id, options.queue, outcome.retryAt) + ) } else { await this.#wrapInternal(() => this.#adapter.retryJob(options.job.id, options.queue)) } @@ -426,7 +459,9 @@ export class Worker { } catch (error) { debug('worker %s: failed to initialize job %s (%s)', this.#id, job.id, job.name) const retention = QueueManager.getConfigResolver().resolveJobOptions(queue) - await this.#wrapInternal(() => this.#adapter.failJob(job.id, queue, error as Error, retention.removeOnFail)) + await this.#wrapInternal(() => + this.#adapter.failJob(job.id, queue, error as Error, retention.removeOnFail) + ) throw error } } diff --git a/tests/_mocks/memory_adapter.ts b/tests/_mocks/memory_adapter.ts index 08fbc12..fa64e31 100644 --- a/tests/_mocks/memory_adapter.ts +++ b/tests/_mocks/memory_adapter.ts @@ -1,5 +1,5 @@ import { randomUUID } from 'node:crypto' -import type { Adapter, AcquiredJob } from '../../src/contracts/adapter.js' +import type { Adapter, AcquiredJob, PushResult } from '../../src/contracts/adapter.js' import type { JobData, JobRecord, @@ -21,6 +21,14 @@ interface DelayedJob { executeAt: number } +interface DedupEntry { + jobId: string + createdAt: number + ttl?: number + 
replace?: boolean + extend?: boolean +} + export function memory() { return () => new MemoryAdapter() } @@ -33,6 +41,7 @@ export class MemoryAdapter implements Adapter { #failedJobs: Map = new Map() #pendingTimeouts: Set = new Set() #schedules: Map = new Map() + #dedupIndex: Map> = new Map() setWorkerId(_workerId: string): void {} @@ -46,23 +55,33 @@ export class MemoryAdapter implements Adapter { return jobs.length } - async push(jobData: JobData): Promise { + async push(jobData: JobData): Promise { return this.pushOn('default', jobData) } - async pushOn(queue: string, jobData: JobData): Promise { + async pushOn(queue: string, jobData: JobData): Promise { + const deduped = this.#applyDedup(queue, jobData) + if (deduped) return deduped + if (!this.#queues.has(queue)) { this.#queues.set(queue, []) } this.#queues.get(queue)!.push(jobData) + + if (jobData.dedup) { + return { outcome: 'added', jobId: jobData.id } + } } - async pushLater(jobData: JobData, delay: number): Promise { + async pushLater(jobData: JobData, delay: number): Promise { return this.pushLaterOn('default', jobData, delay) } - pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { + async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise { + const deduped = this.#applyDedup(queue, jobData) + if (deduped) return deduped + if (!this.#delayedJobs.has(queue)) { this.#delayedJobs.set(queue, new Map()) } @@ -73,12 +92,17 @@ export class MemoryAdapter implements Adapter { const timeout = setTimeout(() => { this.#pendingTimeouts.delete(timeout) this.#delayedJobs.get(queue)?.delete(jobData.id) - void this.pushOn(queue, jobData) + if (!this.#queues.has(queue)) { + this.#queues.set(queue, []) + } + this.#queues.get(queue)!.push(jobData) }, delay) this.#pendingTimeouts.add(timeout) - return Promise.resolve() + if (jobData.dedup) { + return { outcome: 'added', jobId: jobData.id } + } } async pushMany(jobs: JobData[]): Promise { @@ -86,6 +110,10 @@ export class MemoryAdapter 
implements Adapter { } async pushManyOn(queue: string, jobs: JobData[]): Promise { + if (jobs.some((j) => j.dedup)) { + throw new Error('dedup is not supported in batch dispatch; use single dispatch') + } + for (const job of jobs) { await this.pushOn(queue, job) } @@ -132,6 +160,7 @@ export class MemoryAdapter implements Adapter { this.#activeJobs.delete(jobId) if (removeOnComplete === undefined || removeOnComplete === true) { + this.#cleanupDedupForJob(queue, active.job) return } @@ -150,6 +179,7 @@ export class MemoryAdapter implements Adapter { this.#activeJobs.delete(jobId) if (removeOnFail === undefined || removeOnFail === true) { + this.#cleanupDedupForJob(queue, active.job) return } @@ -204,6 +234,7 @@ export class MemoryAdapter implements Adapter { if (currentStalledCount >= maxStalledCount) { // Fail permanently - just remove from active this.#activeJobs.delete(jobId) + this.#cleanupDedupForJob(queue, active.job) continue } @@ -396,26 +427,41 @@ export class MemoryAdapter implements Adapter { records.push(record) if (retention && retention !== true) { - this.#applyRetention(records, retention) + this.#applyRetention(records, retention, queue) } } - #applyRetention(records: JobRecord[], retention: JobRetention) { + #applyRetention(records: JobRecord[], retention: JobRetention, queue: string) { if (retention === false || retention === true) { return } + const pruned: JobRecord[] = [] + if (retention.age !== undefined) { const maxAgeMs = parse(retention.age) if (maxAgeMs > 0) { const cutoff = Date.now() - maxAgeMs - const filtered = records.filter((record) => (record.finishedAt ?? 0) >= cutoff) - records.splice(0, records.length, ...filtered) + const kept: JobRecord[] = [] + for (const record of records) { + if ((record.finishedAt ?? 
0) >= cutoff) { + kept.push(record) + } else { + pruned.push(record) + } + } + records.splice(0, records.length, ...kept) } } if (retention.count !== undefined && retention.count > 0 && records.length > retention.count) { - records.splice(0, records.length - retention.count) + const excess = records.length - retention.count + pruned.push(...records.slice(0, excess)) + records.splice(0, excess) + } + + for (const record of pruned) { + this.#cleanupDedupForJob(queue, record.data) } } @@ -425,4 +471,85 @@ export class MemoryAdapter implements Adapter { return records.find((record) => record.data.id === jobId) ?? null } + + #applyDedup(queue: string, jobData: JobData): PushResult | null { + if (!jobData.dedup) return null + + const dedupId = jobData.dedup.id + const now = Date.now() + const entry = this.#dedupIndex.get(queue)?.get(dedupId) + + if (entry) { + const withinTtl = !entry.ttl || now - entry.createdAt < entry.ttl + if (withinTtl) { + const existing = this.#findJobById(queue, entry.jobId) + if (existing) { + const replaceable = existing.location === 'pending' || existing.location === 'delayed' + if (jobData.dedup.replace && replaceable) { + existing.job.payload = structuredClone(jobData.payload) + if (jobData.dedup.extend && entry.ttl) { + entry.createdAt = now + } + return { outcome: 'replaced', jobId: entry.jobId } + } + if (jobData.dedup.extend && entry.ttl) { + entry.createdAt = now + return { outcome: 'extended', jobId: entry.jobId } + } + return { outcome: 'skipped', jobId: entry.jobId } + } + } + } + + if (!this.#dedupIndex.has(queue)) { + this.#dedupIndex.set(queue, new Map()) + } + this.#dedupIndex.get(queue)!.set(dedupId, { + jobId: jobData.id, + createdAt: now, + ttl: jobData.dedup.ttl, + replace: jobData.dedup.replace, + extend: jobData.dedup.extend, + }) + + return null + } + + #findJobById( + queue: string, + jobId: string + ): { + job: JobData + location: 'pending' | 'delayed' | 'active' | 'completed' | 'failed' + } | null { + const active = 
this.#activeJobs.get(jobId) + if (active && active.queue === queue) { + return { job: active.job, location: 'active' } + } + const pending = this.#queues.get(queue)?.find((j) => j.id === jobId) + if (pending) { + return { job: pending, location: 'pending' } + } + const delayed = this.#delayedJobs.get(queue)?.get(jobId) + if (delayed) { + return { job: delayed.job, location: 'delayed' } + } + const completed = this.#findHistory(this.#completedJobs, queue, jobId) + if (completed) { + return { job: completed.data, location: 'completed' } + } + const failed = this.#findHistory(this.#failedJobs, queue, jobId) + if (failed) { + return { job: failed.data, location: 'failed' } + } + return null + } + + #cleanupDedupForJob(queue: string, job: JobData): void { + if (!job.dedup) return + const entry = this.#dedupIndex.get(queue)?.get(job.dedup.id) + if (entry && entry.jobId === job.id) { + this.#dedupIndex.get(queue)?.delete(job.dedup.id) + } + } } diff --git a/tests/_utils/register_driver_test_suite.ts b/tests/_utils/register_driver_test_suite.ts index ecf5727..0a50d0e 100644 --- a/tests/_utils/register_driver_test_suite.ts +++ b/tests/_utils/register_driver_test_suite.ts @@ -613,7 +613,9 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { assert.isNull(job3) }) - test('recoverStalledJobs should only recover jobs from the targeted queue', async ({ assert }) => { + test('recoverStalledJobs should only recover jobs from the targeted queue', async ({ + assert, + }) => { const adapter = await options.createAdapter() adapter.setWorkerId('worker-1') @@ -1647,4 +1649,664 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) { assert.equal(second!.id, 'medium') assert.equal(third!.id, 'low') }) + + test('pushOn with dedup should skip duplicate job', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('test-queue', { + id: 'TestJob::order-1', + name: 
'TestJob', + payload: { attempt: 1 }, + attempts: 0, + dedup: { id: 'order-1' }, + }) + + await adapter.pushOn('test-queue', { + id: 'TestJob::order-1', + name: 'TestJob', + payload: { attempt: 2 }, + attempts: 0, + dedup: { id: 'order-1' }, + }) + + const size = await adapter.sizeOf('test-queue') + assert.equal(size, 1) + + const job = await adapter.popFrom('test-queue') + assert.deepEqual(job!.payload, { attempt: 1 }) + }) + + test('pushOn without dedup should insert normally', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('test-queue', { + id: 'job-1', + name: 'TestJob', + payload: { data: 'first' }, + attempts: 0, + }) + + await adapter.pushOn('test-queue', { + id: 'job-2', + name: 'TestJob', + payload: { data: 'second' }, + attempts: 0, + }) + + const size = await adapter.sizeOf('test-queue') + assert.equal(size, 2) + }) + + test('pushLaterOn with dedup should skip duplicate delayed job', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushLaterOn( + 'test-queue', + { + id: 'TestJob::delayed-1', + name: 'TestJob', + payload: { attempt: 1 }, + attempts: 0, + dedup: { id: 'delayed-1' }, + }, + 60_000 + ) + + await adapter.pushLaterOn( + 'test-queue', + { + id: 'TestJob::delayed-1', + name: 'TestJob', + payload: { attempt: 2 }, + attempts: 0, + dedup: { id: 'delayed-1' }, + }, + 60_000 + ) + + const job = await adapter.getJob('TestJob::delayed-1', 'test-queue') + assert.isNotNull(job) + assert.deepEqual(job!.data.payload, { attempt: 1 }) + }) + + test('pushLaterOn dedup replace preserves the original job id', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushLaterOn( + 'rep-delayed-queue', + { + id: 'delayed-rep-uuid-1', + name: 'TestJob', + payload: { version: 1 }, + attempts: 0, + dedup: { id: 'TestJob::delayed-rep-1', ttl: 
10_000, replace: true }, + }, + 50 + ) + + const second = await adapter.pushLaterOn( + 'rep-delayed-queue', + { + id: 'delayed-rep-uuid-2', + name: 'TestJob', + payload: { version: 2 }, + attempts: 0, + dedup: { id: 'TestJob::delayed-rep-1', ttl: 10_000, replace: true }, + }, + 50 + ) + assert.equal(second && typeof second === 'object' && second.outcome, 'replaced') + assert.equal(second && typeof second === 'object' && second.jobId, 'delayed-rep-uuid-1') + + await new Promise((r) => setTimeout(r, 80)) + + const job = await adapter.popFrom('rep-delayed-queue') + assert.isNotNull(job) + assert.equal(job!.id, 'delayed-rep-uuid-1') + assert.deepEqual(job!.payload, { version: 2 }) + }) + + test('pushOn with dedup should allow same id on different queues', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('queue-a', { + id: 'TestJob::shared-id', + name: 'TestJob', + payload: { queue: 'a' }, + attempts: 0, + dedup: { id: 'shared-id' }, + }) + + await adapter.pushOn('queue-b', { + id: 'TestJob::shared-id', + name: 'TestJob', + payload: { queue: 'b' }, + attempts: 0, + dedup: { id: 'shared-id' }, + }) + + const sizeA = await adapter.sizeOf('queue-a') + const sizeB = await adapter.sizeOf('queue-b') + assert.equal(sizeA, 1) + assert.equal(sizeB, 1) + }) + + test('dedup TTL: new job allowed after TTL expires', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('ttl-queue', { + id: 'uuid-1', + name: 'TestJob', + payload: { n: 1 }, + attempts: 0, + dedup: { id: 'TestJob::ttl-1', ttl: 80 }, + }) + + const second = await adapter.pushOn('ttl-queue', { + id: 'uuid-2', + name: 'TestJob', + payload: { n: 2 }, + attempts: 0, + dedup: { id: 'TestJob::ttl-1', ttl: 80 }, + }) + assert.equal(second && typeof second === 'object' && second.outcome, 'skipped') + + await new Promise((r) => setTimeout(r, 150)) + + const third = await 
adapter.pushOn('ttl-queue', {
+ id: 'uuid-3',
+ name: 'TestJob',
+ payload: { n: 3 },
+ attempts: 0,
+ dedup: { id: 'TestJob::ttl-1', ttl: 80 },
+ })
+ assert.equal(third && typeof third === 'object' && third.outcome, 'added')
+ })
+
+ test('dedup replace: duplicate within TTL swaps payload on pending job', async ({ assert }) => {
+ const adapter = await options.createAdapter()
+ adapter.setWorkerId('worker-1')
+
+ await adapter.pushOn('rep-queue', {
+ id: 'rep-uuid-1',
+ name: 'TestJob',
+ payload: { version: 1 },
+ attempts: 0,
+ dedup: { id: 'TestJob::rep-1', ttl: 10_000, replace: true },
+ })
+
+ const second = await adapter.pushOn('rep-queue', {
+ id: 'rep-uuid-2',
+ name: 'TestJob',
+ payload: { version: 2 },
+ attempts: 0,
+ dedup: { id: 'TestJob::rep-1', ttl: 10_000, replace: true },
+ })
+ assert.equal(second && typeof second === 'object' && second.outcome, 'replaced')
+ assert.equal(second && typeof second === 'object' && second.jobId, 'rep-uuid-1')
+
+ const size = await adapter.sizeOf('rep-queue')
+ assert.equal(size, 1)
+
+ const job = await adapter.popFrom('rep-queue')
+ assert.equal(job!.id, 'rep-uuid-1')
+ assert.deepEqual(job!.payload, { version: 2 })
+ })
+
+ test('dedup extend: duplicate within TTL resets the window', async ({ assert }) => {
+ const adapter = await options.createAdapter()
+ adapter.setWorkerId('worker-1')
+
+ await adapter.pushOn('ext-queue', {
+ id: 'ext-uuid-1',
+ name: 'TestJob',
+ payload: { n: 1 },
+ attempts: 0,
+ dedup: { id: 'TestJob::ext-1', ttl: 100, extend: true },
+ })
+
+ await new Promise((r) => setTimeout(r, 60))
+
+ const second = await adapter.pushOn('ext-queue', {
+ id: 'ext-uuid-2',
+ name: 'TestJob',
+ payload: { n: 2 },
+ attempts: 0,
+ dedup: { id: 'TestJob::ext-1', ttl: 100, extend: true },
+ })
+ assert.equal(second && typeof second === 'object' && second.outcome, 'extended')
+
+ await new Promise((r) => setTimeout(r, 60))
+
+ // Without extend, 120ms total elapsed > 100ms TTL would've expired. 
+ const third = await adapter.pushOn('ext-queue', { + id: 'ext-uuid-3', + name: 'TestJob', + payload: { n: 3 }, + attempts: 0, + dedup: { id: 'TestJob::ext-1', ttl: 100, extend: true }, + }) + assert.equal(third && typeof third === 'object' && third.outcome, 'extended') + }) + + test('dedup: cleanup removes dedup entry when job is completed without retention', async ({ + assert, + }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('clean-queue', { + id: 'clean-uuid-1', + name: 'TestJob', + payload: { n: 1 }, + attempts: 0, + dedup: { id: 'TestJob::clean-1' }, + }) + + const popped = await adapter.popFrom('clean-queue') + await adapter.completeJob(popped!.id, 'clean-queue', true) + + // Dedup should be cleaned — new push should succeed + const second = await adapter.pushOn('clean-queue', { + id: 'clean-uuid-2', + name: 'TestJob', + payload: { n: 2 }, + attempts: 0, + dedup: { id: 'TestJob::clean-1' }, + }) + assert.equal(second && typeof second === 'object' && second.outcome, 'added') + }) + + test('dedup: cleanup removes dedup entry when job fails without retention', async ({ + assert, + }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('clean-fail', { + id: 'fail-uuid-1', + name: 'TestJob', + payload: { n: 1 }, + attempts: 0, + dedup: { id: 'TestJob::fail-1' }, + }) + + const popped = await adapter.popFrom('clean-fail') + await adapter.failJob(popped!.id, 'clean-fail', new Error('boom'), true) + + const second = await adapter.pushOn('clean-fail', { + id: 'fail-uuid-2', + name: 'TestJob', + payload: { n: 2 }, + attempts: 0, + dedup: { id: 'TestJob::fail-1' }, + }) + assert.equal(second && typeof second === 'object' && second.outcome, 'added') + }) + + test('dedup: retryJob preserves dedup entry (new dispatch stays blocked)', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await 
adapter.pushOn('retry-queue', { + id: 'retry-uuid-1', + name: 'TestJob', + payload: { n: 1 }, + attempts: 0, + dedup: { id: 'TestJob::retry-1' }, + }) + + const popped = await adapter.popFrom('retry-queue') + await adapter.retryJob(popped!.id, 'retry-queue') + + // retry puts job back — dedup entry still points to same job + const second = await adapter.pushOn('retry-queue', { + id: 'retry-uuid-2', + name: 'TestJob', + payload: { n: 2 }, + attempts: 0, + dedup: { id: 'TestJob::retry-1' }, + }) + assert.equal(second && typeof second === 'object' && second.outcome, 'skipped') + }) + + test('dedup: pushManyOn rejects jobs with dedup', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await assert.rejects( + () => + adapter.pushManyOn('batch-queue', [ + { id: 'a', name: 'TestJob', payload: {}, attempts: 0 }, + { + id: 'b', + name: 'TestJob', + payload: {}, + attempts: 0, + dedup: { id: 'TestJob::batch-1' }, + }, + ]), + /dedup is not supported in batch dispatch/ + ) + }) + + test('dedup TTL: old pending job still runs after TTL expiry, new dispatch adds as new entry', async ({ + assert, + }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('ttl-keep-queue', { + id: 'keep-uuid-1', + name: 'TestJob', + payload: { n: 1 }, + attempts: 0, + dedup: { id: 'TestJob::keep-1', ttl: 50 }, + }) + + await new Promise((r) => setTimeout(r, 120)) + + const second = await adapter.pushOn('ttl-keep-queue', { + id: 'keep-uuid-2', + name: 'TestJob', + payload: { n: 2 }, + attempts: 0, + dedup: { id: 'TestJob::keep-1', ttl: 50 }, + }) + assert.equal(second && typeof second === 'object' && second.outcome, 'added') + assert.equal(second && typeof second === 'object' && second.jobId, 'keep-uuid-2') + + assert.equal(await adapter.sizeOf('ttl-keep-queue'), 2) + + const first = await adapter.popFrom('ttl-keep-queue') + assert.equal(first!.id, 'keep-uuid-1') + 
assert.deepEqual(first!.payload, { n: 1 }) + + const next = await adapter.popFrom('ttl-keep-queue') + assert.equal(next!.id, 'keep-uuid-2') + assert.deepEqual(next!.payload, { n: 2 }) + }) + + test('dedup replace: preserves priority and groupId of the existing job', async ({ assert }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('rep-preserve-queue', { + id: 'preserve-uuid-1', + name: 'TestJob', + payload: { version: 1 }, + attempts: 0, + priority: 1, + groupId: 'group-a', + dedup: { id: 'TestJob::preserve-1', ttl: 10_000, replace: true }, + }) + + const second = await adapter.pushOn('rep-preserve-queue', { + id: 'preserve-uuid-2', + name: 'TestJob', + payload: { version: 2 }, + attempts: 0, + priority: 9, + dedup: { id: 'TestJob::preserve-1', ttl: 10_000, replace: true }, + }) + assert.equal(second && typeof second === 'object' && second.outcome, 'replaced') + assert.equal(second && typeof second === 'object' && second.jobId, 'preserve-uuid-1') + + const record = await adapter.getJob('preserve-uuid-1', 'rep-preserve-queue') + assert.isNotNull(record) + assert.deepEqual(record!.data.payload, { version: 2 }) + assert.equal(record!.data.priority, 1) + assert.equal(record!.data.groupId, 'group-a') + }) + + test('dedup replace: leaves retained completed jobs untouched, returns skipped', async ({ + assert, + }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('rep-retain-queue', { + id: 'retain-uuid-1', + name: 'TestJob', + payload: { version: 1 }, + attempts: 0, + dedup: { id: 'TestJob::retain-1', ttl: 10_000, replace: true }, + }) + + const popped = await adapter.popFrom('rep-retain-queue') + await adapter.completeJob(popped!.id, 'rep-retain-queue', false) + + const second = await adapter.pushOn('rep-retain-queue', { + id: 'retain-uuid-2', + name: 'TestJob', + payload: { version: 2 }, + attempts: 0, + dedup: { id: 'TestJob::retain-1', ttl: 
10_000, replace: true },
+ })
+ assert.equal(second && typeof second === 'object' && second.outcome, 'skipped')
+ assert.equal(second && typeof second === 'object' && second.jobId, 'retain-uuid-1')
+
+ const record = await adapter.getJob('retain-uuid-1', 'rep-retain-queue')
+ assert.isNotNull(record)
+ assert.deepEqual(record!.data.payload, { version: 1 })
+ })
+
+ test('dedup extend: window length stays the original ttl even when later dispatches pass a different ttl', async ({
+ assert,
+ }) => {
+ const adapter = await options.createAdapter()
+ adapter.setWorkerId('worker-1')
+
+ await adapter.pushOn('extend-original-queue', {
+ id: 'extend-orig-uuid-1',
+ name: 'TestJob',
+ payload: { n: 1 },
+ attempts: 0,
+ dedup: { id: 'TestJob::extend-orig-1', ttl: 100, extend: true },
+ })
+
+ await new Promise((r) => setTimeout(r, 50))
+
+ const second = await adapter.pushOn('extend-original-queue', {
+ id: 'extend-orig-uuid-2',
+ name: 'TestJob',
+ payload: { n: 2 },
+ attempts: 0,
+ dedup: { id: 'TestJob::extend-orig-1', ttl: 5000, extend: true },
+ })
+ assert.equal(second && typeof second === 'object' && second.outcome, 'extended')
+
+ // 200ms after the reset (T50). Original 100ms window expired at T150.
+ // If the engine were honoring the new 5000ms ttl, the slot would still
+ // be alive and this dispatch would return 'extended'. 
+ await new Promise((r) => setTimeout(r, 200)) + + const third = await adapter.pushOn('extend-original-queue', { + id: 'extend-orig-uuid-3', + name: 'TestJob', + payload: { n: 3 }, + attempts: 0, + dedup: { id: 'TestJob::extend-orig-1', ttl: 100, extend: true }, + }) + assert.equal(third && typeof third === 'object' && third.outcome, 'added') + assert.equal(third && typeof third === 'object' && third.jobId, 'extend-orig-uuid-3') + }) + + test('dedup debounce: replace + extend swaps payload and resets the TTL window', async ({ + assert, + }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('debounce-queue', { + id: 'debounce-uuid-1', + name: 'TestJob', + payload: { version: 1 }, + attempts: 0, + dedup: { id: 'TestJob::debounce-1', ttl: 200, extend: true, replace: true }, + }) + + await new Promise((r) => setTimeout(r, 120)) + + const second = await adapter.pushOn('debounce-queue', { + id: 'debounce-uuid-2', + name: 'TestJob', + payload: { version: 2 }, + attempts: 0, + dedup: { id: 'TestJob::debounce-1', ttl: 200, extend: true, replace: true }, + }) + assert.equal(second && typeof second === 'object' && second.outcome, 'replaced') + assert.equal(second && typeof second === 'object' && second.jobId, 'debounce-uuid-1') + + const midRecord = await adapter.getJob('debounce-uuid-1', 'debounce-queue') + assert.deepEqual(midRecord!.data.payload, { version: 2 }) + + // 240ms total elapsed > original 200ms TTL, but the second dispatch reset + // the window at T=120. Only 120ms into the new window → still alive. 
+ await new Promise((r) => setTimeout(r, 120)) + + const third = await adapter.pushOn('debounce-queue', { + id: 'debounce-uuid-3', + name: 'TestJob', + payload: { version: 3 }, + attempts: 0, + dedup: { id: 'TestJob::debounce-1', ttl: 200, extend: true, replace: true }, + }) + assert.equal(third && typeof third === 'object' && third.outcome, 'replaced') + assert.equal(third && typeof third === 'object' && third.jobId, 'debounce-uuid-1') + + const finalRecord = await adapter.getJob('debounce-uuid-1', 'debounce-queue') + assert.deepEqual(finalRecord!.data.payload, { version: 3 }) + }) + + test('dedup replace: returns skipped when existing job is already active (in-flight)', async ({ + assert, + }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('active-rep-queue', { + id: 'active-rep-uuid-1', + name: 'TestJob', + payload: { version: 1 }, + attempts: 0, + dedup: { id: 'TestJob::active-rep-1', ttl: 10_000, replace: true }, + }) + + // Move job to active state — worker has popped it. + const popped = await adapter.popFrom('active-rep-queue') + assert.equal(popped!.id, 'active-rep-uuid-1') + + const second = await adapter.pushOn('active-rep-queue', { + id: 'active-rep-uuid-2', + name: 'TestJob', + payload: { version: 2 }, + attempts: 0, + dedup: { id: 'TestJob::active-rep-1', ttl: 10_000, replace: true }, + }) + + assert.equal(second && typeof second === 'object' && second.outcome, 'skipped') + assert.equal(second && typeof second === 'object' && second.jobId, 'active-rep-uuid-1') + + // Payload must not be swapped while job is in-flight. 
+ assert.deepEqual(popped!.payload, { version: 1 }) + }) + + test('dedup extend: refreshes TTL even when existing job is already active (in-flight)', async ({ + assert, + }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + await adapter.pushOn('active-ext-queue', { + id: 'active-ext-uuid-1', + name: 'TestJob', + payload: { n: 1 }, + attempts: 0, + dedup: { id: 'TestJob::active-ext-1', ttl: 200, extend: true }, + }) + + // Move to active mid-window. + await new Promise((r) => setTimeout(r, 80)) + const popped = await adapter.popFrom('active-ext-queue') + assert.equal(popped!.id, 'active-ext-uuid-1') + + // Extend against an in-flight job — implementation refreshes the dedup TTL + // even though the existing job is active (not replaceable). + const second = await adapter.pushOn('active-ext-queue', { + id: 'active-ext-uuid-2', + name: 'TestJob', + payload: { n: 2 }, + attempts: 0, + dedup: { id: 'TestJob::active-ext-1', ttl: 200, extend: true }, + }) + assert.equal(second && typeof second === 'object' && second.outcome, 'extended') + assert.equal(second && typeof second === 'object' && second.jobId, 'active-ext-uuid-1') + + // Without the extend, the slot would have expired by now (80 + 150 > 200). + // With the extend at T=80, the window restarted; at T=230 only 150ms into + // new window → still blocking. 
+ await new Promise((r) => setTimeout(r, 150)) + + const third = await adapter.pushOn('active-ext-queue', { + id: 'active-ext-uuid-3', + name: 'TestJob', + payload: { n: 3 }, + attempts: 0, + dedup: { id: 'TestJob::active-ext-1', ttl: 200, extend: true }, + }) + assert.equal(third && typeof third === 'object' && third.outcome, 'extended') + assert.equal(third && typeof third === 'object' && third.jobId, 'active-ext-uuid-1') + }) + + test('dedup: concurrent pushOn with same id - only one wins, rest skipped', async ({ + assert, + }) => { + const adapter = await options.createAdapter() + adapter.setWorkerId('worker-1') + + const dispatches = Array.from({ length: 5 }, (_, i) => + adapter.pushOn('concurrent-dedup-queue', { + id: `concurrent-uuid-${i}`, + name: 'TestJob', + payload: { n: i }, + attempts: 0, + dedup: { id: 'TestJob::concurrent-1' }, + }) + ) + + const results = await Promise.all(dispatches) + const outcomes = results.map((r) => (r && typeof r === 'object' ? r.outcome : undefined)) + + assert.equal( + outcomes.filter((o) => o === 'added').length, + 1, + `Expected exactly one 'added' outcome, got ${JSON.stringify(outcomes)}` + ) + assert.equal( + outcomes.filter((o) => o === 'skipped').length, + 4, + `Expected four 'skipped' outcomes, got ${JSON.stringify(outcomes)}` + ) + + const size = await adapter.sizeOf('concurrent-dedup-queue') + assert.equal(size, 1) + + // All skipped results must point at the same winner job id. 
+ const winners = results + .filter((r) => r && typeof r === 'object' && r.outcome === 'added') + .map((r) => (r as { jobId: string }).jobId) + const skippedJobIds = results + .filter((r) => r && typeof r === 'object' && r.outcome === 'skipped') + .map((r) => (r as { jobId: string }).jobId) + for (const id of skippedJobIds) { + assert.equal(id, winners[0], 'skipped dispatch should reference the winning job id') + } + }) } diff --git a/tests/adapter.spec.ts b/tests/adapter.spec.ts index a276625..cfa4286 100644 --- a/tests/adapter.spec.ts +++ b/tests/adapter.spec.ts @@ -2,7 +2,7 @@ import Knex from 'knex' import { test } from '@japa/runner' import { Redis } from 'ioredis' import { MemoryAdapter } from './_mocks/memory_adapter.js' -import { RedisAdapter } from '../src/drivers/redis_adapter.js' +import { redis, RedisAdapter } from '../src/drivers/redis_adapter.js' import { KnexAdapter } from '../src/drivers/knex_adapter.js' import { QueueSchemaService } from '../src/services/queue_schema.js' import { registerDriverTestSuite } from './_utils/register_driver_test_suite.js' @@ -147,6 +147,275 @@ test.group('Adapter | Redis', (group) => { 'deleteSchedule should be emitted in a single write window to avoid partial state' ) }) + + test('completeJob should not delete a newer TTL dedup lock when Redis keyPrefix is disabled', async ({ + assert, + }) => { + const redisOptions = { + host: process.env.REDIS_HOST || 'localhost', + port: Number.parseInt(process.env.REDIS_PORT || '6379', 10), + db: 15, + keyPrefix: '', + } + const inspectorConnection = new Redis(redisOptions) + const adapter = redis(redisOptions)() + const queue = 'raw-ttl-clean-queue' + const dedupId = 'TestJob::raw-ttl-clean-1' + const dedupKey = `jobs::${queue}::dedup::${dedupId}` + + await connection.flushdb() + + try { + await adapter.pushOn(queue, { + id: 'raw-ttl-clean-uuid-1', + name: 'TestJob', + payload: { n: 1 }, + attempts: 0, + dedup: { id: dedupId, ttl: 80 }, + }) + + const first = await 
adapter.popFrom(queue) + assert.equal(first!.id, 'raw-ttl-clean-uuid-1') + + await new Promise((r) => setTimeout(r, 150)) + + const second = await adapter.pushOn(queue, { + id: 'raw-ttl-clean-uuid-2', + name: 'TestJob', + payload: { n: 2 }, + attempts: 0, + dedup: { id: dedupId, ttl: 10_000 }, + }) + assert.equal(second && typeof second === 'object' && second.outcome, 'added') + assert.equal(await inspectorConnection.get(dedupKey), 'raw-ttl-clean-uuid-2') + + await adapter.completeJob(first!.id, queue, true) + + assert.equal(await inspectorConnection.get(dedupKey), 'raw-ttl-clean-uuid-2') + + const third = await adapter.pushOn(queue, { + id: 'raw-ttl-clean-uuid-3', + name: 'TestJob', + payload: { n: 3 }, + attempts: 0, + dedup: { id: dedupId, ttl: 10_000 }, + }) + + assert.equal(third && typeof third === 'object' && third.outcome, 'skipped') + assert.equal(third && typeof third === 'object' && third.jobId, 'raw-ttl-clean-uuid-2') + } finally { + await connection.flushdb() + await adapter.destroy() + await inspectorConnection.quit() + } + }) + + test('history pruning should not delete a newer dedup lock when Redis keyPrefix is disabled', async ({ + assert, + }) => { + const redisOptions = { + host: process.env.REDIS_HOST || 'localhost', + port: Number.parseInt(process.env.REDIS_PORT || '6379', 10), + db: 15, + keyPrefix: '', + } + const inspectorConnection = new Redis(redisOptions) + const adapter = redis(redisOptions)() + const queue = 'raw-finalize-prune-queue' + const dedupId = 'TestJob::raw-finalize-prune-1' + const dedupKey = `jobs::${queue}::dedup::${dedupId}` + + await connection.flushdb() + + try { + await adapter.pushOn(queue, { + id: 'raw-finalize-prune-uuid-1', + name: 'TestJob', + payload: { n: 1 }, + attempts: 0, + dedup: { id: dedupId, ttl: 80 }, + }) + + const first = await adapter.popFrom(queue) + assert.equal(first!.id, 'raw-finalize-prune-uuid-1') + + await adapter.completeJob(first!.id, queue, { count: 1 }) + + await new Promise((r) => 
setTimeout(r, 150)) + + const second = await adapter.pushOn(queue, { + id: 'raw-finalize-prune-uuid-2', + name: 'TestJob', + payload: { n: 2 }, + attempts: 0, + dedup: { id: dedupId }, + }) + assert.equal(second && typeof second === 'object' && second.outcome, 'added') + assert.equal(await inspectorConnection.get(dedupKey), 'raw-finalize-prune-uuid-2') + + const popped = await adapter.popFrom(queue) + assert.equal(popped!.id, 'raw-finalize-prune-uuid-2') + + await adapter.completeJob(popped!.id, queue, { count: 1 }) + + assert.equal(await inspectorConnection.get(dedupKey), 'raw-finalize-prune-uuid-2') + + const third = await adapter.pushOn(queue, { + id: 'raw-finalize-prune-uuid-3', + name: 'TestJob', + payload: { n: 3 }, + attempts: 0, + dedup: { id: dedupId }, + }) + + assert.equal(third && typeof third === 'object' && third.outcome, 'skipped') + assert.equal(third && typeof third === 'object' && third.jobId, 'raw-finalize-prune-uuid-2') + } finally { + await connection.flushdb() + await adapter.destroy() + await inspectorConnection.quit() + } + }) + + test('recoverStalledJobs should not delete a newer dedup lock when Redis keyPrefix is disabled', async ({ + assert, + }) => { + const redisOptions = { + host: process.env.REDIS_HOST || 'localhost', + port: Number.parseInt(process.env.REDIS_PORT || '6379', 10), + db: 15, + keyPrefix: '', + } + const inspectorConnection = new Redis(redisOptions) + const adapter = redis(redisOptions)() + const queue = 'raw-stall-dedup-queue' + const dedupId = 'TestJob::raw-stall-dedup-1' + const dedupKey = `jobs::${queue}::dedup::${dedupId}` + + await connection.flushdb() + + try { + await adapter.pushOn(queue, { + id: 'raw-stall-dedup-uuid-1', + name: 'TestJob', + payload: { n: 1 }, + attempts: 0, + stalledCount: 0, + dedup: { id: dedupId, ttl: 80 }, + }) + + const first = await adapter.popFrom(queue) + assert.equal(first!.id, 'raw-stall-dedup-uuid-1') + + await new Promise((r) => setTimeout(r, 150)) + + const second = await 
adapter.pushOn(queue, { + id: 'raw-stall-dedup-uuid-2', + name: 'TestJob', + payload: { n: 2 }, + attempts: 0, + dedup: { id: dedupId }, + }) + assert.equal(second && typeof second === 'object' && second.outcome, 'added') + assert.equal(await inspectorConnection.get(dedupKey), 'raw-stall-dedup-uuid-2') + + // First job still active + stalled. With maxStalledCount=0 it fails permanently. + const recovered = await adapter.recoverStalledJobs(queue, 10, 0) + assert.equal(recovered, 0) + + assert.equal(await inspectorConnection.get(dedupKey), 'raw-stall-dedup-uuid-2') + + const third = await adapter.pushOn(queue, { + id: 'raw-stall-dedup-uuid-3', + name: 'TestJob', + payload: { n: 3 }, + attempts: 0, + dedup: { id: dedupId }, + }) + + assert.equal(third && typeof third === 'object' && third.outcome, 'skipped') + assert.equal(third && typeof third === 'object' && third.jobId, 'raw-stall-dedup-uuid-2') + } finally { + await connection.flushdb() + await adapter.destroy() + await inspectorConnection.quit() + } + }) + + test('dedup replace should return skipped when stored job_data is malformed JSON', async ({ + assert, + }) => { + const adapter = new RedisAdapter(connection) + const queue = 'malformed-dedup-queue' + const dataKey = `jobs::${queue}::data` + + await adapter.pushOn(queue, { + id: 'malformed-uuid-1', + name: 'TestJob', + payload: { version: 1 }, + attempts: 0, + dedup: { id: 'TestJob::malformed-1', ttl: 10_000, replace: true }, + }) + + await connection.hset(dataKey, 'malformed-uuid-1', '{not valid json') + + const second = await adapter.pushOn(queue, { + id: 'malformed-uuid-2', + name: 'TestJob', + payload: { version: 2 }, + attempts: 0, + dedup: { id: 'TestJob::malformed-1', ttl: 10_000, replace: true }, + }) + assert.equal(second && typeof second === 'object' && second.outcome, 'skipped') + assert.equal(second && typeof second === 'object' && second.jobId, 'malformed-uuid-1') + + const stored = await connection.hget(dataKey, 'malformed-uuid-1') + 
assert.equal(stored, '{not valid json') + }) + + test('dedup: orphan dedup pointer is reclaimed when job data is missing', async ({ assert }) => { + const adapter = new RedisAdapter(connection) + const queue = 'orphan-dedup-queue' + const dataKey = `jobs::${queue}::data` + const dedupKey = `jobs::${queue}::dedup::TestJob::orphan-1` + + await adapter.pushOn(queue, { + id: 'orphan-uuid-1', + name: 'TestJob', + payload: { version: 1 }, + attempts: 0, + dedup: { id: 'TestJob::orphan-1' }, + }) + + // Simulate the pointer outliving the job data (e.g. an external pruner + // removes the hash entry and pending ZSET entry while the dedup key has + // not expired yet). The dedup pointer remains pointing at a vanished id. + const pendingKey = `jobs::${queue}::pending` + await connection.hdel(dataKey, 'orphan-uuid-1') + await connection.zrem(pendingKey, 'orphan-uuid-1') + + const before = await connection.get(dedupKey) + assert.equal(before, 'orphan-uuid-1', 'dedup pointer should still reference the orphaned id') + + // A fresh dispatch should treat the orphan pointer as reclaimable and add + // a new job, repointing the dedup key to the new winner. 
+ const second = await adapter.pushOn(queue, { + id: 'orphan-uuid-2', + name: 'TestJob', + payload: { version: 2 }, + attempts: 0, + dedup: { id: 'TestJob::orphan-1' }, + }) + + assert.equal(second && typeof second === 'object' && second.outcome, 'added') + assert.equal(second && typeof second === 'object' && second.jobId, 'orphan-uuid-2') + + const after = await connection.get(dedupKey) + assert.equal(after, 'orphan-uuid-2', 'dedup pointer should be reclaimed for the new job') + + const size = await adapter.sizeOf(queue) + assert.equal(size, 1) + }) }) test.group('Adapter | Knex (SQLite)', (group) => { @@ -353,4 +622,104 @@ test.group('Adapter | Knex (PostgreSQL)', (group) => { `Expected a single schedule DELETE query, got ${scheduleDeleteQueries.length}` ) }) + + test('concurrent dedup pushes should not both insert when no existing row is lockable', async ({ + assert, + }) => { + const dedupId = 'TestJob::pg-concurrent-missing-row' + const barrierFunction = 'queue_jobs_test_dedup_insert_barrier' + const barrierTrigger = 'queue_jobs_test_dedup_insert_barrier_trigger' + + await connection.raw(` + CREATE OR REPLACE FUNCTION ${barrierFunction}() + RETURNS trigger AS $$ + DECLARE + attempts integer := 0; + BEGIN + IF NEW.dedup_id = '${dedupId}' THEN + IF pg_try_advisory_lock(90312001) THEN + LOOP + EXIT WHEN NOT pg_try_advisory_lock(90312002); + PERFORM pg_advisory_unlock(90312002); + attempts := attempts + 1; + IF attempts > 1000 THEN + RAISE EXCEPTION 'timed out waiting for concurrent insert'; + END IF; + PERFORM pg_sleep(0.01); + END LOOP; + ELSE + PERFORM pg_advisory_lock(90312002); + END IF; + END IF; + + RETURN NEW; + END; + $$ LANGUAGE plpgsql + `) + + await connection.raw(` + CREATE TRIGGER ${barrierTrigger} + BEFORE INSERT ON ${tableName} + FOR EACH ROW + EXECUTE FUNCTION ${barrierFunction}() + `) + + const createConnection = () => + Knex({ + client: 'pg', + connection: { + host: process.env.PG_HOST || 'localhost', + port: Number.parseInt(process.env.PG_PORT 
|| '5432', 10), + user: process.env.PG_USER || 'postgres', + password: process.env.PG_PASSWORD || 'postgres', + database: process.env.PG_DATABASE || 'queue_test', + }, + pool: { min: 1, max: 1 }, + }) + + const connectionA = createConnection() + const connectionB = createConnection() + const adapterA = new KnexAdapter({ connection: connectionA, tableName, schedulesTableName }) + const adapterB = new KnexAdapter({ connection: connectionB, tableName, schedulesTableName }) + + try { + const results = await Promise.all([ + adapterA.pushOn('pg-dedup-race-queue', { + id: 'pg-dedup-race-uuid-1', + name: 'TestJob', + payload: { n: 1 }, + attempts: 0, + dedup: { id: dedupId }, + }), + adapterB.pushOn('pg-dedup-race-queue', { + id: 'pg-dedup-race-uuid-2', + name: 'TestJob', + payload: { n: 2 }, + attempts: 0, + dedup: { id: dedupId }, + }), + ]) + + const outcomes = results.map((result) => + result && typeof result === 'object' ? result.outcome : undefined + ) + assert.equal(outcomes.filter((outcome) => outcome === 'added').length, 1) + assert.equal(outcomes.filter((outcome) => outcome === 'skipped').length, 1) + + const count = await connection(tableName) + .where('queue', 'pg-dedup-race-queue') + .where('dedup_id', dedupId) + .count<{ total: string }[]>('* as total') + .first() + + assert.equal(Number(count?.total), 1) + } finally { + await adapterA.destroy() + await adapterB.destroy() + await connectionA.destroy() + await connectionB.destroy() + await connection.raw(`DROP TRIGGER IF EXISTS ${barrierTrigger} ON ${tableName}`) + await connection.raw(`DROP FUNCTION IF EXISTS ${barrierFunction}()`) + } + }) }) diff --git a/tests/fake_adapter.spec.ts b/tests/fake_adapter.spec.ts index a4eae64..94924b6 100644 --- a/tests/fake_adapter.spec.ts +++ b/tests/fake_adapter.spec.ts @@ -91,6 +91,64 @@ test.group('FakeAdapter', () => { await adapter.destroy() }) + test('should skip duplicate pushOn when dedup is set', async ({ assert }) => { + const adapter = fake()() + + await 
adapter.pushOn('default', { + id: 'TestJob::order-1', + name: 'TestJob', + payload: { attempt: 1 }, + attempts: 0, + dedup: { id: 'order-1' }, + }) + + await adapter.pushOn('default', { + id: 'TestJob::order-1', + name: 'TestJob', + payload: { attempt: 2 }, + attempts: 0, + dedup: { id: 'order-1' }, + }) + + const size = await adapter.size() + assert.equal(size, 1) + adapter.assertPushedCount(1) + + await adapter.destroy() + }) + + test('should skip duplicate pushLaterOn when dedup is set', async () => { + const adapter = fake()() + + await adapter.pushLaterOn( + 'default', + { + id: 'TestJob::delayed-1', + name: 'TestJob', + payload: { attempt: 1 }, + attempts: 0, + dedup: { id: 'delayed-1' }, + }, + 5000 + ) + + await adapter.pushLaterOn( + 'default', + { + id: 'TestJob::delayed-1', + name: 'TestJob', + payload: { attempt: 2 }, + attempts: 0, + dedup: { id: 'delayed-1' }, + }, + 5000 + ) + + adapter.assertPushedCount(1) + + await adapter.destroy() + }) + test('should support job class matchers', async ({ assert }) => { const adapter = fake()() @@ -128,5 +186,4 @@ test.group('FakeAdapter', () => { await adapter.destroy() }) - }) diff --git a/tests/job_dispatcher.spec.ts b/tests/job_dispatcher.spec.ts index 7c71311..9184a83 100644 --- a/tests/job_dispatcher.spec.ts +++ b/tests/job_dispatcher.spec.ts @@ -317,6 +317,273 @@ test.group('JobDispatcher | groupId', () => { }) }) +test.group('JobDispatcher | dedup', () => { + test('should throw error when dedup id is empty', async ({ assert }) => { + assert.throws( + () => new JobDispatcher('TestJob', { data: 'test' }).dedup({ id: '' }), + 'Dedup ID must be a non-empty string' + ) + }) + + test('should store dedup id prefixed with job name', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + const result = await new JobDispatcher('SendInvoiceJob', { orderId: 123 }) + .dedup({ id: 'order-123' }) + .run() + + 
assert.match(result.jobId, /^[0-9a-f-]{36}$/) + assert.equal(result.deduped, 'added') + + const job = await sharedAdapter.pop() + assert.isNotNull(job) + assert.equal(job!.id, result.jobId) + assert.equal(job!.dedup?.id, 'SendInvoiceJob::order-123') + }) + + test('should set dedup field on job data when dedup is configured', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + await new JobDispatcher('UniqueJob', { data: 'test' }).dedup({ id: 'my-id' }).run() + + const job = await sharedAdapter.pop() + assert.isNotNull(job) + assert.equal(job!.dedup?.id, 'UniqueJob::my-id') + }) + + test('should not set dedup field when dedup is not configured', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + await new JobDispatcher('RegularJob', { data: 'test' }).run() + + const job = await sharedAdapter.pop() + assert.isNotNull(job) + assert.isUndefined(job!.dedup) + }) + + test('should silently skip duplicate job with same dedup id', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + await new JobDispatcher('DedupJob', { attempt: 1 }).dedup({ id: 'dedup-1' }).run() + await new JobDispatcher('DedupJob', { attempt: 2 }).dedup({ id: 'dedup-1' }).run() + + const size = await sharedAdapter.size() + assert.equal(size, 1) + + const job = await sharedAdapter.pop() + assert.deepEqual(job!.payload, { attempt: 1 }) + }) + + test('should allow same dedup id for different job names', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + await new JobDispatcher('JobA', { type: 'a' }).dedup({ id: 'same-id' }).run() + await new JobDispatcher('JobB', { 
type: 'b' }).dedup({ id: 'same-id' }).run() + + const size = await sharedAdapter.size() + assert.equal(size, 2) + }) + + test('should work with other options like priority and queue', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + const { jobId, deduped } = await new JobDispatcher('PriorityDedupJob', { task: 'important' }) + .dedup({ id: 'task-1' }) + .toQueue('high') + .priority(1) + .run() + + assert.match(jobId, /^[0-9a-f-]{36}$/) + assert.equal(deduped, 'added') + + const job = await sharedAdapter.popFrom('high') + assert.isNotNull(job) + assert.equal(job!.priority, 1) + assert.equal(job!.dedup?.id, 'PriorityDedupJob::task-1') + }) + + test('should throw when extend is set without ttl', ({ assert }) => { + assert.throws( + () => new JobDispatcher('TestJob', {}).dedup({ id: 'x', extend: true }), + 'dedup.ttl is required when extend or replace is set' + ) + }) + + test('should throw when replace is set without ttl', ({ assert }) => { + assert.throws( + () => new JobDispatcher('TestJob', {}).dedup({ id: 'x', replace: true }), + 'dedup.ttl is required when extend or replace is set' + ) + }) + + test('should throw when ttl is negative', ({ assert }) => { + assert.throws( + () => new JobDispatcher('TestJob', {}).dedup({ id: 'x', ttl: -1 }), + 'dedup.ttl must be a positive duration' + ) + }) + + test('should throw when ttl is zero', ({ assert }) => { + assert.throws( + () => new JobDispatcher('TestJob', {}).dedup({ id: 'x', ttl: 0 }), + 'dedup.ttl must be a positive duration' + ) + }) + + test('should throw when dedup id exceeds 400 chars', ({ assert }) => { + assert.throws( + () => new JobDispatcher('TestJob', {}).dedup({ id: 'a'.repeat(401) }), + 'Dedup ID must be 400 characters or less' + ) + }) + + test('should throw when job name + dedup id combined exceeds 510 chars', ({ assert }) => { + const longJobName = 'A'.repeat(200) + assert.throws( + () => 
new JobDispatcher(longJobName, {}).dedup({ id: 'b'.repeat(400) }),
+ /combined with job name exceeds 510 characters/
+ )
+ })
+
+ test('TTL: new job allowed after TTL expires', async ({ assert }) => {
+ const sharedAdapter = memory()()
+
+ await QueueManager.init({
+ default: 'memory',
+ adapters: { memory: () => sharedAdapter },
+ })
+
+ const first = await new JobDispatcher('ThrottleJob', { n: 1 })
+ .dedup({ id: 'throttle-1', ttl: 80 })
+ .run()
+ assert.equal(first.deduped, 'added')
+
+ const second = await new JobDispatcher('ThrottleJob', { n: 2 })
+ .dedup({ id: 'throttle-1', ttl: 80 })
+ .run()
+ assert.equal(second.deduped, 'skipped')
+
+ await setTimeout(150)
+
+ const third = await new JobDispatcher('ThrottleJob', { n: 3 })
+ .dedup({ id: 'throttle-1', ttl: 80 })
+ .run()
+ assert.equal(third.deduped, 'added')
+ assert.notEqual(third.jobId, first.jobId)
+
+ const size = await sharedAdapter.size()
+ assert.equal(size, 2)
+ })
+
+ test('extend: duplicate within TTL resets the window', async ({ assert }) => {
+ const sharedAdapter = memory()()
+
+ await QueueManager.init({
+ default: 'memory',
+ adapters: { memory: () => sharedAdapter },
+ })
+
+ const first = await new JobDispatcher('ExtendJob', { n: 1 })
+ .dedup({ id: 'ext-1', ttl: 100, extend: true })
+ .run()
+ assert.equal(first.deduped, 'added')
+
+ await setTimeout(60)
+
+ const second = await new JobDispatcher('ExtendJob', { n: 2 })
+ .dedup({ id: 'ext-1', ttl: 100, extend: true })
+ .run()
+ assert.equal(second.deduped, 'extended')
+ assert.equal(second.jobId, first.jobId)
+
+ await setTimeout(60)
+
+ // Without extend, the original 100ms TTL would have expired (120ms elapsed).
+ // With extend, the second push reset the timer → only 60ms since the reset, still within window.
+ const third = await new JobDispatcher('ExtendJob', { n: 3 }) + .dedup({ id: 'ext-1', ttl: 100, extend: true }) + .run() + assert.equal(third.deduped, 'extended') + }) + + test('replace: duplicate within TTL swaps the pending job payload', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + const first = await new JobDispatcher('ReplaceJob', { version: 1 }) + .dedup({ id: 'draft-1', ttl: 100, replace: true }) + .run() + assert.equal(first.deduped, 'added') + + const second = await new JobDispatcher('ReplaceJob', { version: 2 }) + .dedup({ id: 'draft-1', ttl: 100, replace: true }) + .run() + assert.equal(second.deduped, 'replaced') + assert.equal(second.jobId, first.jobId) + + const size = await sharedAdapter.size() + assert.equal(size, 1) + + const job = await sharedAdapter.pop() + assert.deepEqual(job!.payload, { version: 2 }) + }) + + test('replace: active job is not replaced (returns skipped)', async ({ assert }) => { + const sharedAdapter = memory()() + + await QueueManager.init({ + default: 'memory', + adapters: { memory: () => sharedAdapter }, + }) + + await new JobDispatcher('ActiveReplaceJob', { version: 1 }) + .dedup({ id: 'ar-1', ttl: 1000, replace: true }) + .run() + + await sharedAdapter.pop() // moves to active + + const second = await new JobDispatcher('ActiveReplaceJob', { version: 2 }) + .dedup({ id: 'ar-1', ttl: 1000, replace: true }) + .run() + + assert.equal(second.deduped, 'skipped') + }) +}) + test.group('JobBatchDispatcher', () => { test('should dispatch multiple jobs correctly', async ({ assert }) => { const sharedAdapter = memory()() diff --git a/tests/otel.spec.ts b/tests/otel.spec.ts index 4fe4b93..70a7a71 100644 --- a/tests/otel.spec.ts +++ b/tests/otel.spec.ts @@ -22,13 +22,17 @@ function makeJob(overrides: Partial = {}): AcquiredJob { * Creates an instrumentation with a fake QueueManager, * captures the injected wrappers from 
the patched init. */ -async function setupWithWrappers(config: ConstructorParameters[0] = {}) { +async function setupWithWrappers( + config: ConstructorParameters[0] = {} +) { const instrumentation = new QueueInstrumentation(config) instrumentation.enable() let capturedConfig: any const fakeManager = { - init: async (cfg: any) => { capturedConfig = cfg }, + init: async (cfg: any) => { + capturedConfig = cfg + }, } instrumentation.manuallyRegister({ QueueManager: fakeManager }) @@ -37,13 +41,21 @@ async function setupWithWrappers(config: ConstructorParameters(fn: () => Promise, job: AcquiredJob, queue: string) => Promise, - internalOperationWrapper: capturedConfig.internalOperationWrapper as (fn: () => Promise) => Promise, + executionWrapper: capturedConfig.executionWrapper as ( + fn: () => Promise, + job: AcquiredJob, + queue: string + ) => Promise, + internalOperationWrapper: capturedConfig.internalOperationWrapper as ( + fn: () => Promise + ) => Promise, } } test.group('QueueInstrumentation | lifecycle', (group) => { - group.setup(() => { setupTracing() }) + group.setup(() => { + setupTracing() + }) group.each.setup(() => resetSpans()) test('enable() is idempotent', ({ assert }) => { @@ -80,7 +92,9 @@ test.group('QueueInstrumentation | lifecycle', (group) => { }) test.group('QueueInstrumentation | dispatch via DC', (group) => { - group.setup(() => { setupTracing() }) + group.setup(() => { + setupTracing() + }) group.each.setup(() => resetSpans()) test('creates PRODUCER span when no active span', async ({ assert }) => { @@ -120,7 +134,10 @@ test.group('QueueInstrumentation | dispatch via DC', (group) => { assert.isDefined(parentSpan) assert.isDefined(producerSpan) assert.equal(producerSpan!.parentSpanContext?.spanId, parentSpan!.spanContext().spanId) - assert.equal(jobData.traceContext?.traceparent?.split('-')[2], producerSpan!.spanContext().spanId) + assert.equal( + jobData.traceContext?.traceparent?.split('-')[2], + producerSpan!.spanContext().spanId + ) 
instrumentation.disable() }) @@ -188,7 +205,9 @@ test.group('QueueInstrumentation | dispatch via DC', (group) => { }) test.group('QueueInstrumentation | execute via executionWrapper', (group) => { - group.setup(() => { setupTracing() }) + group.setup(() => { + setupTracing() + }) group.each.setup(() => resetSpans()) test('creates CONSUMER span with semconv attributes', async ({ assert }) => { @@ -321,10 +340,21 @@ test.group('QueueInstrumentation | execute via executionWrapper', (group) => { test('queue_time_ms end-to-end via dispatch then execute', async ({ assert }) => { const { instrumentation, executionWrapper } = await setupWithWrappers() - const jobData: JobData = { id: 'e2e-qt-1', name: 'E2EQueueTimeJob', payload: {}, attempts: 0, createdAt: Date.now() } + const jobData: JobData = { + id: 'e2e-qt-1', + name: 'E2EQueueTimeJob', + payload: {}, + attempts: 0, + createdAt: Date.now(), + } await dispatchChannel.tracePromise(async () => {}, { jobs: [jobData], queue: 'default' }) - const job = makeJob({ id: 'e2e-qt-1', name: 'E2EQueueTimeJob', createdAt: jobData.createdAt, traceContext: jobData.traceContext }) + const job = makeJob({ + id: 'e2e-qt-1', + name: 'E2EQueueTimeJob', + createdAt: jobData.createdAt, + traceContext: jobData.traceContext, + }) const message: JobExecuteMessage = { job, queue: 'default', status: 'completed' } await executeChannel.tracePromise(async () => { @@ -347,11 +377,15 @@ test.group('QueueInstrumentation | execute via executionWrapper', (group) => { const message: JobExecuteMessage = { job, queue: 'default', status: 'completed' } await executeChannel.tracePromise(async () => { - await executionWrapper(async () => { - const tracer = trace.getTracer('test-child') - const childSpan = tracer.startSpan('child-operation') - childSpan.end() - }, job, 'default') + await executionWrapper( + async () => { + const tracer = trace.getTracer('test-child') + const childSpan = tracer.startSpan('child-operation') + childSpan.end() + }, + job, + 
'default' + ) }, message) const spans = getFinishedSpans() @@ -367,7 +401,9 @@ test.group('QueueInstrumentation | execute via executionWrapper', (group) => { }) test.group('QueueInstrumentation | trace linking', (group) => { - group.setup(() => { setupTracing() }) + group.setup(() => { + setupTracing() + }) group.each.setup(() => resetSpans()) test('link mode links consumer to dispatch trace', async ({ assert }) => { @@ -394,7 +430,9 @@ test.group('QueueInstrumentation | trace linking', (group) => { }) test('parent mode makes consumer child of dispatch', async ({ assert }) => { - const { instrumentation, executionWrapper } = await setupWithWrappers({ executionSpanLinkMode: 'parent' }) + const { instrumentation, executionWrapper } = await setupWithWrappers({ + executionSpanLinkMode: 'parent', + }) const jobData: JobData = { id: 'parent-1', name: 'ParentedJob', payload: {}, attempts: 0 } await dispatchChannel.tracePromise(async () => {}, { jobs: [jobData], queue: 'default' }) @@ -417,7 +455,9 @@ test.group('QueueInstrumentation | trace linking', (group) => { }) test.group('QueueInstrumentation | manuallyRegister', (group) => { - group.setup(() => { setupTracing() }) + group.setup(() => { + setupTracing() + }) group.each.setup(() => resetSpans()) test('patches init to inject wrappers', async ({ assert }) => { @@ -448,7 +488,9 @@ test.group('QueueInstrumentation | manuallyRegister', (group) => { }) test.group('QueueInstrumentation | custom config', (group) => { - group.setup(() => { setupTracing() }) + group.setup(() => { + setupTracing() + }) group.each.setup(() => resetSpans()) test('custom messagingSystem attribute', async ({ assert }) => { diff --git a/tests/sync_adapter.spec.ts b/tests/sync_adapter.spec.ts index 5bf0e00..d2a64bf 100644 --- a/tests/sync_adapter.spec.ts +++ b/tests/sync_adapter.spec.ts @@ -58,9 +58,7 @@ test.group('SyncAdapter', (group) => { assert.deepEqual(contextJobIds, Array(contextJobIds.length).fill(jobId)) }) - test('should log delayed sync 
job failures without unhandled rejections', async ({ - assert, - }) => { + test('should log delayed sync job failures without unhandled rejections', async ({ assert }) => { const logger = new MemoryLogger() let unhandledError: unknown const onUnhandledRejection = (error: unknown) => { @@ -117,4 +115,32 @@ test.group('SyncAdapter', (group) => { assert.instanceOf(logger.logs[0].obj?.err, Error) assert.equal((logger.logs[0].obj?.err as Error).message, 'failed hook exploded') }) + + test('should ignore .dedup() and execute every dispatch inline', async ({ assert }) => { + const executedPayloads: Array<{ n: number }> = [] + + class DedupIgnoredSyncJob extends Job<{ n: number }> { + async execute() { + executedPayloads.push(this.payload) + } + } + + await QueueManager.init({ + default: 'sync', + adapters: { sync: sync() }, + }) + + Locator.register('DedupIgnoredSyncJob', DedupIgnoredSyncJob) + + const first = await DedupIgnoredSyncJob.dispatch({ n: 1 }).dedup({ id: 'sync-dedup-1' }).run() + const second = await DedupIgnoredSyncJob.dispatch({ n: 2 }).dedup({ id: 'sync-dedup-1' }).run() + const third = await DedupIgnoredSyncJob.dispatch({ n: 3 }) + .dedup({ id: 'sync-dedup-1', ttl: 10_000, replace: true }) + .run() + + assert.deepEqual(executedPayloads, [{ n: 1 }, { n: 2 }, { n: 3 }]) + assert.isUndefined(first.deduped) + assert.isUndefined(second.deduped) + assert.isUndefined(third.deduped) + }) }) diff --git a/tests/worker.spec.ts b/tests/worker.spec.ts index 226531b..fbe5b65 100644 --- a/tests/worker.spec.ts +++ b/tests/worker.spec.ts @@ -549,7 +549,9 @@ test.group('Worker', () => { const controller = new AbortController() const originalTimeout = AbortSignal.timeout const originalAddEventListener = controller.signal.addEventListener.bind(controller.signal) - const originalRemoveEventListener = controller.signal.removeEventListener.bind(controller.signal) + const originalRemoveEventListener = controller.signal.removeEventListener.bind( + controller.signal + ) let 
addedAbortListeners = 0 let removedAbortListeners = 0