From 812af117dc3c00ea7afaf0a0cac07bf0479a3725 Mon Sep 17 00:00:00 2001 From: Isaac <91521821+isimisi@users.noreply.github.com> Date: Fri, 15 May 2026 10:41:30 +0200 Subject: [PATCH] perf(redis): use sorted set index for O(log N) schedule claiming --- src/drivers/redis_adapter.ts | 214 ++++++++++++++++++++++++----------- 1 file changed, 148 insertions(+), 66 deletions(-) diff --git a/src/drivers/redis_adapter.ts b/src/drivers/redis_adapter.ts index 27fe11d..12e6ba8 100644 --- a/src/drivers/redis_adapter.ts +++ b/src/drivers/redis_adapter.ts @@ -16,6 +16,7 @@ import { resolveRetention } from '../utils.js' const redisKey = 'jobs' const schedulesKey = 'schedules' const schedulesIndexKey = 'schedules::index' +const schedulesDueKey = 'schedules::due' type RedisConfig = Redis | RedisOptions /** @@ -352,76 +353,98 @@ const GET_JOB_SCRIPT = ` ` /** - * Lua script for atomically claiming a due schedule. - * Iterates the schedule index server-side and claims the first due schedule. - * Returns the schedule data if claimed, nil otherwise. + * Lua script for atomically claiming a due schedule using a sorted set index. + * + * Uses ZRANGEBYSCORE on schedules::due (scored by next_run_at) for O(log N) + * lookup instead of scanning all schedule hashes via SMEMBERS. + * + * Stale entries (paused, exhausted, deleted) are cleaned from the ZSET on + * sight so subsequent calls skip them. + * + * KEYS[1] = schedules::due (the ZSET) + * KEYS[2] = schedule key prefix (e.g. "schedules::") + * ARGV[1] = now (epoch milliseconds) */ const CLAIM_SCHEDULE_SCRIPT = ` - local schedules_index_key = KEYS[1] - local schedule_key_prefix = KEYS[2] + local due_key = KEYS[1] + local prefix = KEYS[2] local now = tonumber(ARGV[1]) - local ids = redis.call('SMEMBERS', schedules_index_key) + while true do + local candidates = redis.call('ZRANGEBYSCORE', due_key, '-inf', tostring(now), 'LIMIT', 0, 1) - for i = 1, #ids do - local schedule_key = schedule_key_prefix .. ids[i] + if #candidates == 0 then + return nil + end + + local id = candidates[1] + local schedule_key = prefix .. id -- Get schedule data local data = redis.call('HGETALL', schedule_key) - if #data > 0 then + + -- Deleted schedule still in ZSET + if #data == 0 then + redis.call('ZREM', due_key, id) + else -- Convert HGETALL result to table local schedule = {} for j = 1, #data, 2 do schedule[data[j]] = data[j + 1] end - -- Check if schedule is due - if schedule.status == 'active' then - local next_run_at = tonumber(schedule.next_run_at) - - if next_run_at and next_run_at <= now then - local run_count = tonumber(schedule.run_count or '0') - local run_limit = schedule.run_limit and tonumber(schedule.run_limit) or nil - local to_date = schedule.to_date and tonumber(schedule.to_date) or nil - - -- Check limits - if not (run_limit and run_count >= run_limit) and not (to_date and now > to_date) then - -- This schedule is claimable - atomically update it - local new_run_count = run_count + 1 - - -- Calculate new next_run_at (simple interval-based for now) - -- Complex cron calculation happens in the caller - local new_next_run_at = '' - local every_ms = schedule.every_ms and tonumber(schedule.every_ms) or nil - if every_ms then - new_next_run_at = tostring(now + every_ms) - end - - -- Check if we've hit the limit after this run - if run_limit and new_run_count >= run_limit then - new_next_run_at = '' - end - - -- Check if past end date - if to_date and new_next_run_at ~= '' and tonumber(new_next_run_at) > to_date then - new_next_run_at = '' - end - - -- Update the schedule atomically - redis.call('HSET', schedule_key, - 'next_run_at', new_next_run_at, - 'last_run_at', tostring(now), - 'run_count', tostring(new_run_count)) - - -- Return the schedule data (before update) as JSON - return cjson.encode(schedule) + -- Check if schedule is active + if schedule.status ~= 'active' then + redis.call('ZREM', due_key, id) + else + local run_count = tonumber(schedule.run_count or '0') + local run_limit = schedule.run_limit and tonumber(schedule.run_limit) or nil + local to_date = schedule.to_date and tonumber(schedule.to_date) or nil + + -- Check limits + if (run_limit and run_count >= run_limit) or (to_date and now > to_date) then + redis.call('ZREM', due_key, id) + else + -- This schedule is claimable - atomically update it + local new_run_count = run_count + 1 + + -- Calculate new next_run_at (simple interval-based for now) + -- Complex cron calculation happens in the caller + local new_next_run_at = '' + local every_ms = schedule.every_ms and tonumber(schedule.every_ms) or nil + if every_ms then + new_next_run_at = tostring(now + every_ms) + end + + -- Check if we've hit the limit after this run + if run_limit and new_run_count >= run_limit then + new_next_run_at = '' + end + + -- Check if past end date + if to_date and new_next_run_at ~= '' and tonumber(new_next_run_at) > to_date then + new_next_run_at = '' end + + -- Update the schedule atomically + redis.call('HSET', schedule_key, + 'next_run_at', new_next_run_at, + 'last_run_at', tostring(now), + 'run_count', tostring(new_run_count)) + + -- Update or remove from ZSET + if new_next_run_at ~= '' then + redis.call('ZADD', due_key, tonumber(new_next_run_at), id) + else + redis.call('ZREM', due_key, id) + end + + -- Return the schedule data (before update) as JSON + return cjson.encode(schedule) end end end end - - return nil ` /** @@ -700,10 +723,11 @@ export class RedisAdapter implements Adapter { const id = config.id ?? randomUUID() const now = Date.now() const scheduleKey = `${schedulesKey}::${id}` - const [existingRunCount, existingCreatedAt] = await this.#connection.hmget( + const [existingRunCount, existingCreatedAt, existingNextRunAt] = await this.#connection.hmget( scheduleKey, 'run_count', - 'created_at' + 'created_at', + 'next_run_at' ) const scheduleData: Record = { @@ -722,13 +746,17 @@ export class RedisAdapter implements Adapter { if (config.to !== undefined) scheduleData.to_date = config.to.getTime().toString() if (config.limit !== undefined) scheduleData.run_limit = config.limit.toString() - // Upsert schedule and clear stale optional fields from previous config. - await this.#connection + const multi = this.#connection .multi() .hdel(scheduleKey, 'cron_expression', 'every_ms', 'from_date', 'to_date', 'run_limit') .hset(scheduleKey, scheduleData) .sadd(schedulesIndexKey, id) - .exec() + + if (existingNextRunAt) { + multi.zadd(schedulesDueKey, Number.parseInt(existingNextRunAt, 10), id) + } + + await multi.exec() return id } @@ -804,14 +832,34 @@ export class RedisAdapter implements Adapter { } if (updates.runCount !== undefined) data.run_count = updates.runCount.toString() - if (Object.keys(data).length > 0) { - await this.#connection.hset(scheduleKey, data) + if (Object.keys(data).length === 0) return + + const multi = this.#connection.multi().hset(scheduleKey, data) + + if (updates.nextRunAt) { + multi.zadd(schedulesDueKey, updates.nextRunAt.getTime(), id) + } else if (updates.nextRunAt === null || updates.status === 'paused') { + multi.zrem(schedulesDueKey, id) + } + + if (updates.status === 'active' && updates.nextRunAt === undefined) { + const existing = await this.#connection.hget(scheduleKey, 'next_run_at') + if (existing) { + multi.zadd(schedulesDueKey, Number.parseInt(existing, 10), id) + } } + + await multi.exec() } async deleteSchedule(id: string): Promise { const scheduleKey = `${schedulesKey}::${id}` - await this.#connection.multi().del(scheduleKey).srem(schedulesIndexKey, id).exec() + await this.#connection + .multi() + .del(scheduleKey) + .srem(schedulesIndexKey, id) + .zrem(schedulesDueKey, id) + .exec() } async claimDueSchedule(): Promise { @@ -819,7 +867,7 @@ export class RedisAdapter implements Adapter { const result = await this.#connection.eval( CLAIM_SCHEDULE_SCRIPT, 2, - schedulesIndexKey, + schedulesDueKey, `${schedulesKey}::`, now.toString() ) @@ -841,7 +889,6 @@ export class RedisAdapter implements Adapter { }) const nextRun = cron.next().toDate().getTime() - // Check limits before updating const runCount = Number.parseInt(data.run_count || '0', 10) + 1 const runLimit = data.run_limit ? Number.parseInt(data.run_limit, 10) : null const toDate = data.to_date ? Number.parseInt(data.to_date, 10) : null @@ -854,16 +901,51 @@ export class RedisAdapter implements Adapter { newNextRunAt = '' } - await this.#connection.hset( - `${schedulesKey}::${data.id}`, - 'next_run_at', - newNextRunAt.toString() - ) + const scheduleKey = `${schedulesKey}::${data.id}` + const multi = this.#connection + .multi() + .hset(scheduleKey, 'next_run_at', newNextRunAt.toString()) + + if (typeof newNextRunAt === 'number') { + multi.zadd(schedulesDueKey, newNextRunAt, data.id) + } else { + multi.zrem(schedulesDueKey, data.id) + } + + await multi.exec() } return this.#hashToScheduleData(data) } + async backfillDueIndex(): Promise { + const ids = await this.#connection.smembers(schedulesIndexKey) + if (ids.length === 0) return 0 + + const pipeline = this.#connection.pipeline() + for (const id of ids) { + pipeline.hmget(`${schedulesKey}::${id}`, 'next_run_at', 'status') + } + const results = await pipeline.exec() + if (!results) return 0 + + const addPipeline = this.#connection.pipeline() + let count = 0 + + for (let i = 0; i < ids.length; i++) { + const [err, values] = results[i] + if (err || !values) continue + const [nextRunAt, status] = values as [string | null, string | null] + if (nextRunAt && status === 'active') { + addPipeline.zadd(schedulesDueKey, Number.parseInt(nextRunAt, 10), ids[i]) + count++ + } + } + + if (count > 0) await addPipeline.exec() + return count + } + #hashToScheduleData(data: Record): ScheduleData { return { id: data.id,