Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@athenna/queue",
"version": "5.32.0",
"version": "5.33.0",
"description": "The Athenna queue handler.",
"license": "MIT",
"author": "João Lenon <lenon@athenna.io>",
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export * from '#src/drivers/DatabaseDriver'
export * from '#src/factories/ConnectionFactory'

export * from '#src/facades/Queue'
export * from '#src/worker/BaseWorker'
export * from '#src/worker/WorkerImpl'
export * from '#src/providers/QueueProvider'
export * from '#src/providers/WorkerProvider'
Expand Down
12 changes: 12 additions & 0 deletions src/types/ConnectionOptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,5 +125,17 @@ export type ConnectionOptions = {
* @default Parser.timeToMs('5m')
*/
workerTimeoutMs?: number

/**
* Define how many independent consumer loops run in parallel for this
* connection. Each loop still pulls and processes ONE job at a time, so a
* value of `N` yields an effective concurrency of `N`. This is honored by
* the `@athenna/event` consumer; raise it to drain a backed-up queue faster
* without spinning up extra processes. When the option is `null`/unset it
* defaults to `0`, which falls back to a single serial loop (one-by-one).
*
* @default Config.get(`queue.connections.${connection}.workerConcurrency`, 0)
*/
workerConcurrency?: number
}
}
7 changes: 5 additions & 2 deletions src/types/WorkerOptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ export type WorkerOptions = {
name?: string

/**
* Define how much instances of the same worker could run in parallel.
* Define how many instances of the same worker run in parallel. Each
* instance still processes one job at a time, so a value of `N` yields an
* effective concurrency of `N`. When omitted, the worker falls back to the
* connection's `workerConcurrency` config and, if that is `0`/unset, to `1`.
*
* @default 1
* @default Config.get(`queue.connections.${connection}.workerConcurrency`, 1)
*/
concurrency?: number

Expand Down
65 changes: 65 additions & 0 deletions src/worker/BaseWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* @athenna/queue
*
* (c) João Lenon <lenon@athenna.io>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

import 'reflect-metadata'

import { Queue } from '#src/facades/Queue'
import { Annotation } from '@athenna/ioc'
import type { QueueImpl } from '#src/queue/QueueImpl'

/**
* Base class for workers. Extend it to get a queue instance already bound to
* the worker's own connection, so you don't need to call `Queue.connection()`
* on every operation.
*
* @example
* ```ts
* @Worker()
* export class HelloWorker extends BaseWorker {
* public async handle(ctx: Context) {
* await this.queue.add({ hello: 'world' })
* }
* }
* ```
*/
export class BaseWorker {
/**
* Cached queue instance bound to this worker's connection.
*/
private _queue?: QueueImpl

/**
* The queue connection name of this worker. It is resolved from the
* worker's `@Worker({ connection })` metadata, falling back to the default
* connection (`queue.default`) when the worker is not annotated.
*/
public get connection() {
const meta = Annotation.getMeta(this.constructor)

return meta?.connection ?? Config.get('queue.default')
}

/**
* A queue instance already bound to this worker's connection. Use it to
* enqueue or inspect jobs without calling `Queue.connection(...)` on every
* operation.
*
* @example
* ```ts
* await this.queue.add({ email: 'lenon@athenna.io' })
* ```
*/
public get queue() {
if (!this._queue) {
this._queue = Queue.connection(this.connection)
}

return this._queue
}
}
25 changes: 24 additions & 1 deletion src/worker/WorkerTaskBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,36 @@ export class WorkerTaskBuilder {
return
}

const n = this.worker.concurrency ?? 1
const n = this.resolveConcurrency()

for (let i = 0; i < n; i++) {
this.spawn()
}
}

/**
* Resolve how many worker loops to spawn. An explicit `concurrency` set on
* the worker (e.g. via `@Worker({ concurrency })`) wins, then the worker
* `options.workerConcurrency`, then the connection's `workerConcurrency`
* config. When none is a positive number it falls back to a single serial
* loop.
*/
private resolveConcurrency(): number {
const explicit =
this.worker.concurrency ?? this.worker.options?.workerConcurrency

if (Is.Number(explicit) && explicit > 0) {
return explicit
}

const configured = Config.get(
`queue.connections.${this.worker.connection}.workerConcurrency`,
0
)

return configured > 0 ? configured : 1
}

/**
* Use spawn to force a worker instance to run.
*/
Expand Down
4 changes: 2 additions & 2 deletions templates/worker.edge
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Worker, type Context } from '@athenna/queue'
import { Worker, BaseWorker, type Context } from '@athenna/queue'

@Worker()
export class {{ namePascal }} {
export class {{ namePascal }} extends BaseWorker {
public async handle(ctx: Context) {
//
}
Expand Down
7 changes: 7 additions & 0 deletions tests/fixtures/config/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ export default {
workerTimeoutMs: 200
},

memoryConcurrent: {
driver: 'memory',
queue: 'default',
deadletter: 'deadletter',
workerConcurrency: 3
},

aws_sqs: {
driver: 'aws_sqs',
type: 'standard',
Expand Down
82 changes: 82 additions & 0 deletions tests/unit/worker/BaseWorkerTest.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/**
* @athenna/queue
*
* (c) João Lenon <lenon@athenna.io>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

import { Path } from '@athenna/common'
import { Worker, BaseWorker, QueueImpl } from '#src'
import { LoggerProvider } from '@athenna/logger'
import { QueueProvider } from '#src/providers/QueueProvider'
import { Test, BeforeEach, AfterEach, type Context } from '@athenna/test'

@Worker({ connection: 'fake' })
class FakeConnectionWorker extends BaseWorker {
public async handle() {}
}

@Worker({ connection: 'memory' })
class MemoryConnectionWorker extends BaseWorker {
public async handle() {}
}

@Worker()
class DefaultConnectionWorker extends BaseWorker {
public async handle() {}
}

export class BaseWorkerTest {
@BeforeEach()
public async beforeEach() {
await Config.loadAll(Path.fixtures('config'))

new LoggerProvider().register()
new QueueProvider().register()
}

@AfterEach()
public async afterEach() {
await new QueueProvider().shutdown()

ioc.reconstruct()
Config.clear()
}

@Test()
public async shouldResolveTheConnectionFromTheWorkerMetadata({ assert }: Context) {
const worker = new FakeConnectionWorker()

assert.equal(worker.connection, 'fake')
assert.equal(worker.queue.connectionName, 'fake')
}

@Test()
public async shouldFallBackToTheDefaultConnectionWhenNotAnnotatedWithOne({ assert }: Context) {
const worker = new DefaultConnectionWorker()

assert.equal(worker.connection, Config.get('queue.default'))
assert.equal(worker.queue.connectionName, Config.get('queue.default'))
}

@Test()
public async shouldExposeAReadyToUseQueueInstanceBoundToTheWorkerConnection({ assert }: Context) {
const worker = new MemoryConnectionWorker()

assert.instanceOf(worker.queue, QueueImpl)
assert.equal(worker.queue.connectionName, 'memory')
assert.isTrue(worker.queue.isConnected())
}

@Test()
public async shouldCacheTheQueueInstanceBetweenAccesses({ assert }: Context) {
const worker = new FakeConnectionWorker()

const first = worker.queue
const second = worker.queue

assert.isTrue(first === second)
}
}
62 changes: 62 additions & 0 deletions tests/unit/worker/WorkerImplTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,68 @@ export class WorkerImplTest {
assert.deepEqual(task?.worker.connection, 'fake')
}

@Test()
public async shouldSpawnASingleLoopWhenNoConcurrencyIsConfigured({ assert }: Context) {
const builder = Queue.worker()
.task()
.name('default_concurrency')
.connection('memory')
.handler(() => {})

builder.start()

assert.lengthOf((builder as any).timers, 1)

builder.stop()
}

@Test()
public async shouldSpawnConcurrentLoopsBasedOnConnectionWorkerConcurrencyConfig({ assert }: Context) {
const builder = Queue.worker()
.task()
.name('config_concurrency')
.connection('memoryConcurrent')
.handler(() => {})

builder.start()

assert.lengthOf((builder as any).timers, 3)

builder.stop()
}

@Test()
public async shouldSpawnConcurrentLoopsFromWorkerOptionsWorkerConcurrency({ assert }: Context) {
const builder = Queue.worker()
.task()
.name('options_concurrency')
.connection('memory')
.options({ workerConcurrency: 4 })
.handler(() => {})

builder.start()

assert.lengthOf((builder as any).timers, 4)

builder.stop()
}

@Test()
public async shouldLetExplicitConcurrencyOverrideTheConnectionWorkerConcurrencyConfig({ assert }: Context) {
const builder = Queue.worker()
.task()
.name('explicit_concurrency')
.connection('memoryConcurrent')
.concurrency(5)
.handler(() => {})

builder.start()

assert.lengthOf((builder as any).timers, 5)

builder.stop()
}

@Test()
public async shouldBeAbleToCreateAWorkerTaskWithCustomOptions({ assert }: Context) {
Queue.worker()
Expand Down
Loading