Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/contracts/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,16 @@ export interface Adapter {
*/
destroy(): Promise<void>

/**
* Run adapter-specific migrations needed after a major version upgrade.
*
* This method is idempotent — it is always safe to call multiple times.
* Adapters that have no pending migrations return immediately.
*
* Call this once during your deployment process before starting workers.
*/
migrate(): Promise<void>

/**
* Create or update a schedule.
*
Expand Down
4 changes: 4 additions & 0 deletions src/drivers/fake_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,10 @@ export class FakeAdapter implements Adapter {
return Promise.resolve()
}

migrate(): Promise<void> {
return Promise.resolve()
}

async upsertSchedule(config: ScheduleConfig): Promise<string> {
const id = config.id ?? randomUUID()
const existing = this.#schedules.get(id)
Expand Down
2 changes: 2 additions & 0 deletions src/drivers/knex_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ export class KnexAdapter implements Adapter {
}
}

async migrate(): Promise<void> {}

async pop(): Promise<AcquiredJob | null> {
return this.popFrom('default')
}
Expand Down
95 changes: 79 additions & 16 deletions src/drivers/redis_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
const redisKey = 'jobs'
const schedulesKey = 'schedules'
const schedulesIndexKey = 'schedules::index'
const schedulesDueKey = 'schedules::due'
type RedisConfig = Redis | RedisOptions

/**
Expand Down Expand Up @@ -65,7 +66,6 @@ export class RedisAdapter implements Adapter {
readonly #connection: Redis
readonly #ownsConnection: boolean
#workerId: string = ''

constructor(connection: Redis, ownsConnection: boolean = false) {
this.#connection = connection
this.#ownsConnection = ownsConnection
Expand Down Expand Up @@ -408,10 +408,11 @@ export class RedisAdapter implements Adapter {
const id = config.id ?? randomUUID()
const now = Date.now()
const scheduleKey = `${schedulesKey}::${id}`
const [existingRunCount, existingCreatedAt] = await this.#connection.hmget(
const [existingRunCount, existingCreatedAt, existingNextRunAt] = await this.#connection.hmget(
scheduleKey,
'run_count',
'created_at'
'created_at',
'next_run_at'
)

const scheduleData: Record<string, string> = {
Expand All @@ -430,13 +431,17 @@ export class RedisAdapter implements Adapter {
if (config.to !== undefined) scheduleData.to_date = config.to.getTime().toString()
if (config.limit !== undefined) scheduleData.run_limit = config.limit.toString()

// Upsert schedule and clear stale optional fields from previous config.
await this.#connection
const multi = this.#connection
.multi()
.hdel(scheduleKey, 'cron_expression', 'every_ms', 'from_date', 'to_date', 'run_limit')
.hset(scheduleKey, scheduleData)
.sadd(schedulesIndexKey, id)
.exec()

if (existingNextRunAt) {
multi.zadd(schedulesDueKey, Number.parseInt(existingNextRunAt, 10), id)
}

await multi.exec()

return id
}
Expand Down Expand Up @@ -512,22 +517,46 @@ export class RedisAdapter implements Adapter {
}
if (updates.runCount !== undefined) data.run_count = updates.runCount.toString()

if (Object.keys(data).length > 0) {
await this.#connection.hset(scheduleKey, data)
if (Object.keys(data).length === 0) return

const multi = this.#connection.multi().hset(scheduleKey, data)

if (updates.nextRunAt) {
multi.zadd(schedulesDueKey, updates.nextRunAt.getTime(), id)
} else if (updates.nextRunAt === null || updates.status === 'paused') {
multi.zrem(schedulesDueKey, id)
}

if (updates.status === 'active' && updates.nextRunAt === undefined) {
const existing = await this.#connection.hget(scheduleKey, 'next_run_at')
if (existing) {
multi.zadd(schedulesDueKey, Number.parseInt(existing, 10), id)
}
}

await multi.exec()
}

async deleteSchedule(id: string): Promise<void> {
const scheduleKey = `${schedulesKey}::${id}`
await this.#connection.multi().del(scheduleKey).srem(schedulesIndexKey, id).exec()
await this.#connection
.multi()
.del(scheduleKey)
.srem(schedulesIndexKey, id)
.zrem(schedulesDueKey, id)
.exec()
}

async migrate(): Promise<void> {
await this.backfillDueIndex()
}

async claimDueSchedule(): Promise<ScheduleData | null> {
const now = Date.now()
const result = await this.#connection.eval(
CLAIM_SCHEDULE_SCRIPT,
2,
schedulesIndexKey,
schedulesDueKey,
`${schedulesKey}::`,
now.toString()
)
Expand All @@ -549,7 +578,6 @@ export class RedisAdapter implements Adapter {
})
const nextRun = cron.next().toDate().getTime()

// Check limits before updating
const runCount = Number.parseInt(data.run_count || '0', 10) + 1
const runLimit = data.run_limit ? Number.parseInt(data.run_limit, 10) : null
const toDate = data.to_date ? Number.parseInt(data.to_date, 10) : null
Expand All @@ -562,16 +590,51 @@ export class RedisAdapter implements Adapter {
newNextRunAt = ''
}

await this.#connection.hset(
`${schedulesKey}::${data.id}`,
'next_run_at',
newNextRunAt.toString()
)
const scheduleKey = `${schedulesKey}::${data.id}`
const multi = this.#connection
.multi()
.hset(scheduleKey, 'next_run_at', newNextRunAt.toString())

if (typeof newNextRunAt === 'number') {
multi.zadd(schedulesDueKey, newNextRunAt, data.id)
} else {
multi.zrem(schedulesDueKey, data.id)
}

await multi.exec()
}

return this.#hashToScheduleData(data)
}

async backfillDueIndex(): Promise<number> {
const ids = await this.#connection.smembers(schedulesIndexKey)
if (ids.length === 0) return 0

const pipeline = this.#connection.pipeline()
for (const id of ids) {
pipeline.hmget(`${schedulesKey}::${id}`, 'next_run_at', 'status')
}
const results = await pipeline.exec()
if (!results) return 0

const addPipeline = this.#connection.pipeline()
let count = 0

for (let i = 0; i < ids.length; i++) {
const [err, values] = results[i]
if (err || !values) continue
const [nextRunAt, status] = values as [string | null, string | null]
if (nextRunAt && status === 'active') {
addPipeline.zadd(schedulesDueKey, Number.parseInt(nextRunAt, 10), ids[i])
count++
}
}

if (count > 0) await addPipeline.exec()
return count
}

#hashToScheduleData(data: Record<string, string>): ScheduleData {
return {
id: data.id,
Expand Down
125 changes: 78 additions & 47 deletions src/drivers/redis_scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -461,74 +461,105 @@ ${REDIS_JOB_STORAGE_LUA}
`

/**
* Lua script for atomically claiming a due schedule.
* Iterates the schedule index server-side and claims the first due schedule.
* Returns the schedule data if claimed, nil otherwise.
* Lua script for atomically claiming a due schedule using a sorted set index.
*
* Uses ZRANGEBYSCORE on schedules::due (scored by next_run_at) for O(log N)
* lookup instead of scanning all schedule hashes via SMEMBERS.
*
* Stale entries (paused, exhausted, deleted) are cleaned from the ZSET on
* sight so subsequent calls skip them.
*
* KEYS[1] = schedules::due (the ZSET)
* KEYS[2] = schedule key prefix (e.g. "schedules::")
* ARGV[1] = now (epoch milliseconds)
*/
export const CLAIM_SCHEDULE_SCRIPT = `
local schedules_index_key = KEYS[1]
local schedule_key_prefix = KEYS[2]
local due_key = KEYS[1]
local prefix = KEYS[2]
local now = tonumber(ARGV[1])

local ids = redis.call('SMEMBERS', schedules_index_key)
while true do
local candidates = redis.call('ZRANGEBYSCORE', due_key, '-inf', tostring(now), 'LIMIT', 0, 1)

if #candidates == 0 then
return nil
end

for i = 1, #ids do
local schedule_key = schedule_key_prefix .. ids[i]
local id = candidates[1]
local schedule_key = prefix .. id

-- Get schedule data
local data = redis.call('HGETALL', schedule_key)
if #data > 0 then

-- Deleted schedule still in ZSET
if #data == 0 then
redis.call('ZREM', due_key, id)
else
-- Convert HGETALL result to table
local schedule = {}
for j = 1, #data, 2 do
schedule[data[j]] = data[j + 1]
end

-- Check if schedule is due
if schedule.status == 'active' then
local next_run_at = tonumber(schedule.next_run_at)

if next_run_at and next_run_at <= now then
local run_count = tonumber(schedule.run_count or '0')
local run_limit = schedule.run_limit and tonumber(schedule.run_limit) or nil
local to_date = schedule.to_date and tonumber(schedule.to_date) or nil

-- Check limits
if not (run_limit and run_count >= run_limit) and not (to_date and now > to_date) then
-- This schedule is claimable - atomically update it
local new_run_count = run_count + 1

-- Calculate new next_run_at (simple interval-based for now)
-- Complex cron calculation happens in the caller
local new_next_run_at = ''
local every_ms = schedule.every_ms and tonumber(schedule.every_ms) or nil
if every_ms then
new_next_run_at = tostring(now + every_ms)
end
-- Check if schedule is active
if schedule.status ~= 'active' then
redis.call('ZREM', due_key, id)
else
-- Hash is the source of truth for next_run_at.
-- If the ZSET score is stale, repair it and skip this candidate.
local hash_nra = schedule.next_run_at
if not hash_nra or hash_nra == '' then
redis.call('ZREM', due_key, id)
elseif tonumber(hash_nra) > now then
redis.call('ZADD', due_key, tonumber(hash_nra), id)
else
local run_count = tonumber(schedule.run_count or '0')
local run_limit = schedule.run_limit and tonumber(schedule.run_limit) or nil
local to_date = schedule.to_date and tonumber(schedule.to_date) or nil

-- Check if we've hit the limit after this run
if run_limit and new_run_count >= run_limit then
new_next_run_at = ''
end
-- Check limits
if (run_limit and run_count >= run_limit) or (to_date and now > to_date) then
redis.call('ZREM', due_key, id)
else
-- This schedule is claimable - atomically update it
local new_run_count = run_count + 1

-- Calculate new next_run_at (simple interval-based for now)
-- Complex cron calculation happens in the caller
local new_next_run_at = ''
local every_ms = schedule.every_ms and tonumber(schedule.every_ms) or nil
if every_ms then
new_next_run_at = tostring(now + every_ms)
end

-- Check if past end date
if to_date and new_next_run_at ~= '' and tonumber(new_next_run_at) > to_date then
new_next_run_at = ''
end
-- Check if we've hit the limit after this run
if run_limit and new_run_count >= run_limit then
new_next_run_at = ''
end

-- Update the schedule atomically
redis.call('HSET', schedule_key,
'next_run_at', new_next_run_at,
'last_run_at', tostring(now),
'run_count', tostring(new_run_count))
-- Check if past end date
if to_date and new_next_run_at ~= '' and tonumber(new_next_run_at) > to_date then
new_next_run_at = ''
end

-- Return the schedule data (before update) as JSON
return cjson.encode(schedule)
-- Update the schedule atomically
redis.call('HSET', schedule_key,
'next_run_at', new_next_run_at,
'last_run_at', tostring(now),
'run_count', tostring(new_run_count))

-- Update or remove from ZSET
if new_next_run_at ~= '' then
redis.call('ZADD', due_key, tonumber(new_next_run_at), id)
else
redis.call('ZREM', due_key, id)
end

-- Return the schedule data (before update) as JSON
return cjson.encode(schedule)
end
end
end
end
end

return nil
`
4 changes: 4 additions & 0 deletions src/drivers/sync_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ export class SyncAdapter implements Adapter {
return Promise.resolve()
}

migrate(): Promise<void> {
return Promise.resolve()
}

upsertSchedule(_config: ScheduleConfig): Promise<string> {
// No-op: schedules don't make sense for sync adapter
// Return a fake ID so code doesn't break in dev
Expand Down
4 changes: 4 additions & 0 deletions tests/_mocks/memory_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,10 @@ export class MemoryAdapter implements Adapter {
return Promise.resolve()
}

migrate(): Promise<void> {
return Promise.resolve()
}

async upsertSchedule(config: ScheduleConfig): Promise<string> {
const id = config.id ?? randomUUID()
const existing = this.#schedules.get(id)
Expand Down
Loading