Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 148 additions & 66 deletions src/drivers/redis_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { resolveRetention } from '../utils.js'
const redisKey = 'jobs'
const schedulesKey = 'schedules'
const schedulesIndexKey = 'schedules::index'
const schedulesDueKey = 'schedules::due'
type RedisConfig = Redis | RedisOptions

/**
Expand Down Expand Up @@ -352,76 +353,98 @@ const GET_JOB_SCRIPT = `
`

/**
* Lua script for atomically claiming a due schedule.
* Iterates the schedule index server-side and claims the first due schedule.
* Returns the schedule data if claimed, nil otherwise.
* Lua script for atomically claiming a due schedule using a sorted set index.
*
* Uses ZRANGEBYSCORE on schedules::due (scored by next_run_at) for O(log N)
* lookup instead of scanning all schedule hashes via SMEMBERS.
*
* Stale entries (paused, exhausted, deleted) are cleaned from the ZSET on
* sight so subsequent calls skip them.
*
* KEYS[1] = schedules::due (the ZSET)
* KEYS[2] = schedule key prefix (e.g. "schedules::")
* ARGV[1] = now (epoch milliseconds)
*/
const CLAIM_SCHEDULE_SCRIPT = `
local schedules_index_key = KEYS[1]
local schedule_key_prefix = KEYS[2]
local due_key = KEYS[1]
local prefix = KEYS[2]
local now = tonumber(ARGV[1])

local ids = redis.call('SMEMBERS', schedules_index_key)
while true do
local candidates = redis.call('ZRANGEBYSCORE', due_key, '-inf', tostring(now), 'LIMIT', 0, 1)

for i = 1, #ids do
local schedule_key = schedule_key_prefix .. ids[i]
if #candidates == 0 then
return nil
end

local id = candidates[1]
local schedule_key = prefix .. id

-- Get schedule data
local data = redis.call('HGETALL', schedule_key)
if #data > 0 then

-- Deleted schedule still in ZSET
if #data == 0 then
redis.call('ZREM', due_key, id)
else
-- Convert HGETALL result to table
local schedule = {}
for j = 1, #data, 2 do
schedule[data[j]] = data[j + 1]
end

-- Check if schedule is due
if schedule.status == 'active' then
local next_run_at = tonumber(schedule.next_run_at)

if next_run_at and next_run_at <= now then
local run_count = tonumber(schedule.run_count or '0')
local run_limit = schedule.run_limit and tonumber(schedule.run_limit) or nil
local to_date = schedule.to_date and tonumber(schedule.to_date) or nil

-- Check limits
if not (run_limit and run_count >= run_limit) and not (to_date and now > to_date) then
-- This schedule is claimable - atomically update it
local new_run_count = run_count + 1

-- Calculate new next_run_at (simple interval-based for now)
-- Complex cron calculation happens in the caller
local new_next_run_at = ''
local every_ms = schedule.every_ms and tonumber(schedule.every_ms) or nil
if every_ms then
new_next_run_at = tostring(now + every_ms)
end

-- Check if we've hit the limit after this run
if run_limit and new_run_count >= run_limit then
new_next_run_at = ''
end

-- Check if past end date
if to_date and new_next_run_at ~= '' and tonumber(new_next_run_at) > to_date then
new_next_run_at = ''
end

-- Update the schedule atomically
redis.call('HSET', schedule_key,
'next_run_at', new_next_run_at,
'last_run_at', tostring(now),
'run_count', tostring(new_run_count))

-- Return the schedule data (before update) as JSON
return cjson.encode(schedule)
-- Check if schedule is active
if schedule.status ~= 'active' then
redis.call('ZREM', due_key, id)
else
local run_count = tonumber(schedule.run_count or '0')
local run_limit = schedule.run_limit and tonumber(schedule.run_limit) or nil
local to_date = schedule.to_date and tonumber(schedule.to_date) or nil

-- Check limits
if (run_limit and run_count >= run_limit) or (to_date and now > to_date) then
redis.call('ZREM', due_key, id)
else
-- This schedule is claimable - atomically update it
local new_run_count = run_count + 1

-- Calculate new next_run_at (simple interval-based for now)
-- Complex cron calculation happens in the caller
local new_next_run_at = ''
local every_ms = schedule.every_ms and tonumber(schedule.every_ms) or nil
if every_ms then
new_next_run_at = tostring(now + every_ms)
end

-- Check if we've hit the limit after this run
if run_limit and new_run_count >= run_limit then
new_next_run_at = ''
end

-- Check if past end date
if to_date and new_next_run_at ~= '' and tonumber(new_next_run_at) > to_date then
new_next_run_at = ''
end

-- Update the schedule atomically
redis.call('HSET', schedule_key,
'next_run_at', new_next_run_at,
'last_run_at', tostring(now),
'run_count', tostring(new_run_count))

-- Update or remove from ZSET
if new_next_run_at ~= '' then
redis.call('ZADD', due_key, tonumber(new_next_run_at), id)
else
redis.call('ZREM', due_key, id)
end

-- Return the schedule data (before update) as JSON
return cjson.encode(schedule)
end
end
end
end

return nil
`

/**
Expand Down Expand Up @@ -700,10 +723,11 @@ export class RedisAdapter implements Adapter {
const id = config.id ?? randomUUID()
const now = Date.now()
const scheduleKey = `${schedulesKey}::${id}`
const [existingRunCount, existingCreatedAt] = await this.#connection.hmget(
const [existingRunCount, existingCreatedAt, existingNextRunAt] = await this.#connection.hmget(
scheduleKey,
'run_count',
'created_at'
'created_at',
'next_run_at'
)

const scheduleData: Record<string, string> = {
Expand All @@ -722,13 +746,17 @@ export class RedisAdapter implements Adapter {
if (config.to !== undefined) scheduleData.to_date = config.to.getTime().toString()
if (config.limit !== undefined) scheduleData.run_limit = config.limit.toString()

// Upsert schedule and clear stale optional fields from previous config.
await this.#connection
const multi = this.#connection
.multi()
.hdel(scheduleKey, 'cron_expression', 'every_ms', 'from_date', 'to_date', 'run_limit')
.hset(scheduleKey, scheduleData)
.sadd(schedulesIndexKey, id)
.exec()

if (existingNextRunAt) {
multi.zadd(schedulesDueKey, Number.parseInt(existingNextRunAt, 10), id)
}

await multi.exec()

return id
}
Expand Down Expand Up @@ -804,22 +832,42 @@ export class RedisAdapter implements Adapter {
}
if (updates.runCount !== undefined) data.run_count = updates.runCount.toString()

if (Object.keys(data).length > 0) {
await this.#connection.hset(scheduleKey, data)
if (Object.keys(data).length === 0) return

const multi = this.#connection.multi().hset(scheduleKey, data)

if (updates.nextRunAt) {
multi.zadd(schedulesDueKey, updates.nextRunAt.getTime(), id)
} else if (updates.nextRunAt === null || updates.status === 'paused') {
multi.zrem(schedulesDueKey, id)
}

if (updates.status === 'active' && updates.nextRunAt === undefined) {
const existing = await this.#connection.hget(scheduleKey, 'next_run_at')
if (existing) {
multi.zadd(schedulesDueKey, Number.parseInt(existing, 10), id)
}
}

await multi.exec()
}

async deleteSchedule(id: string): Promise<void> {
const scheduleKey = `${schedulesKey}::${id}`
await this.#connection.multi().del(scheduleKey).srem(schedulesIndexKey, id).exec()
await this.#connection
.multi()
.del(scheduleKey)
.srem(schedulesIndexKey, id)
.zrem(schedulesDueKey, id)
.exec()
}

async claimDueSchedule(): Promise<ScheduleData | null> {
const now = Date.now()
const result = await this.#connection.eval(
CLAIM_SCHEDULE_SCRIPT,
2,
schedulesIndexKey,
schedulesDueKey,
`${schedulesKey}::`,
now.toString()
)
Expand All @@ -841,7 +889,6 @@ export class RedisAdapter implements Adapter {
})
const nextRun = cron.next().toDate().getTime()

// Check limits before updating
const runCount = Number.parseInt(data.run_count || '0', 10) + 1
const runLimit = data.run_limit ? Number.parseInt(data.run_limit, 10) : null
const toDate = data.to_date ? Number.parseInt(data.to_date, 10) : null
Expand All @@ -854,16 +901,51 @@ export class RedisAdapter implements Adapter {
newNextRunAt = ''
}

await this.#connection.hset(
`${schedulesKey}::${data.id}`,
'next_run_at',
newNextRunAt.toString()
)
const scheduleKey = `${schedulesKey}::${data.id}`
const multi = this.#connection
.multi()
.hset(scheduleKey, 'next_run_at', newNextRunAt.toString())

if (typeof newNextRunAt === 'number') {
multi.zadd(schedulesDueKey, newNextRunAt, data.id)
} else {
multi.zrem(schedulesDueKey, data.id)
}

await multi.exec()
}

return this.#hashToScheduleData(data)
}

async backfillDueIndex(): Promise<number> {
const ids = await this.#connection.smembers(schedulesIndexKey)
if (ids.length === 0) return 0

const pipeline = this.#connection.pipeline()
for (const id of ids) {
pipeline.hmget(`${schedulesKey}::${id}`, 'next_run_at', 'status')
}
const results = await pipeline.exec()
if (!results) return 0

const addPipeline = this.#connection.pipeline()
let count = 0

for (let i = 0; i < ids.length; i++) {
const [err, values] = results[i]
if (err || !values) continue
const [nextRunAt, status] = values as [string | null, string | null]
if (nextRunAt && status === 'active') {
addPipeline.zadd(schedulesDueKey, Number.parseInt(nextRunAt, 10), ids[i])
count++
}
}

if (count > 0) await addPipeline.exec()
return count
}

#hashToScheduleData(data: Record<string, string>): ScheduleData {
return {
id: data.id,
Expand Down
Loading