From 787c121dde57b9c06be3ff32998d894e4bcae4b0 Mon Sep 17 00:00:00 2001 From: jlenon7 Date: Fri, 24 Apr 2026 17:09:01 -0300 Subject: [PATCH] fix: also open context when worker fails --- package-lock.json | 4 +- package.json | 2 +- src/drivers/AwsSqsDriver.ts | 52 ++++++----- src/drivers/DatabaseDriver.ts | 68 +++++++------- src/drivers/Driver.ts | 25 ++++++ src/drivers/FakeDriver.ts | 15 ++++ src/drivers/MemoryDriver.ts | 50 ++++++----- src/queue/QueueImpl.ts | 3 +- src/worker/WorkerTaskBuilder.ts | 118 ++++++++++++++++--------- tests/unit/kernels/WorkerKernelTest.ts | 48 +++++++++- 10 files changed, 259 insertions(+), 126 deletions(-) diff --git a/package-lock.json b/package-lock.json index 3e66a08..4f8b864 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@athenna/queue", - "version": "5.28.0", + "version": "5.29.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@athenna/queue", - "version": "5.28.0", + "version": "5.29.0", "license": "MIT", "dependencies": { "@aws-sdk/client-sqs": "^3.1019.0" diff --git a/package.json b/package.json index 48bef70..c0f0a91 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@athenna/queue", - "version": "5.28.0", + "version": "5.29.0", "description": "The Athenna queue handler.", "license": "MIT", "author": "João Lenon ", diff --git a/src/drivers/AwsSqsDriver.ts b/src/drivers/AwsSqsDriver.ts index 9fc1313..28f595f 100644 --- a/src/drivers/AwsSqsDriver.ts +++ b/src/drivers/AwsSqsDriver.ts @@ -413,32 +413,36 @@ export class AwsSqsDriver extends Driver { heartbeatTimeout = undefined } - try { - startHeartbeat() - - await processor({ - id: job.id, - attempts: job.attempts, - data: job.data - }) - - stopHeartbeat() + const workerJob = { + id: job.id, + attempts: job.attempts, + data: job.data + } - if (!AwsSqsDriver.ackedIds.has(job.id)) { - await this.changeJobVisibility( - job.id, - this.msToS(this.noAckDelayMs + requeueJitterMs) - ) + await this.runScopedQueueProcessor(processor, workerJob, async () => { + try { + startHeartbeat() + + await processor(workerJob) + + stopHeartbeat() + + if (!AwsSqsDriver.ackedIds.has(job.id)) { + await this.changeJobVisibility( + job.id, + this.msToS(this.noAckDelayMs + requeueJitterMs) + ) + } + } catch (error) { + await new AwsSqsDriverExceptionHandler().handle({ + job, + error, + driver: this, + stopHeartbeat, + requeueJitterMs + }) } - } catch (error) { - await new AwsSqsDriverExceptionHandler().handle({ - job, - error, - driver: this, - stopHeartbeat, - requeueJitterMs - }) - } + }) } /** diff --git a/src/drivers/DatabaseDriver.ts b/src/drivers/DatabaseDriver.ts index 1956a0e..306504e 100644 --- a/src/drivers/DatabaseDriver.ts +++ b/src/drivers/DatabaseDriver.ts @@ -326,38 +326,42 @@ export class DatabaseDriver extends Driver { DatabaseDriver.ackedIds.delete(job.id) - try { - await processor({ - id: job.id, - attempts: job.attempts, - data: job.data - }) - - /** - * If the job still exists after processing, it means that the job was - * not processed for some reason, so we need to make it available again - * after a delay. - */ - if (!DatabaseDriver.ackedIds.has(job.id)) { - job.reservedUntil = null - job.availableAt = Date.now() + this.noAckDelayMs + requeueJitterMs - - await this.client - .table(this.table) - .where('queue', this.queueName) - .where('id', job.id) - .update({ - availableAt: job.availableAt, - reservedUntil: job.reservedUntil - }) - } - } catch (error) { - await new DatabaseDriverExceptionHandler().handle({ - job, - error, - driver: this, - requeueJitterMs - }) + const workerJob = { + id: job.id, + attempts: job.attempts, + data: job.data } + + await this.runScopedQueueProcessor(processor, workerJob, async () => { + try { + await processor(workerJob) + + /** + * If the job still exists after processing, it means that the job was + * not processed for some reason, so we need to make it available again + * after a delay. + */ + if (!DatabaseDriver.ackedIds.has(job.id)) { + job.reservedUntil = null + job.availableAt = Date.now() + this.noAckDelayMs + requeueJitterMs + + await this.client + .table(this.table) + .where('queue', this.queueName) + .where('id', job.id) + .update({ + availableAt: job.availableAt, + reservedUntil: job.reservedUntil + }) + } + } catch (error) { + await new DatabaseDriverExceptionHandler().handle({ + job, + error, + driver: this, + requeueJitterMs + }) + } + }) } } diff --git a/src/drivers/Driver.ts b/src/drivers/Driver.ts index 654214d..8bbb802 100644 --- a/src/drivers/Driver.ts +++ b/src/drivers/Driver.ts @@ -11,6 +11,17 @@ import { Utils } from '#src/utils' import { Config } from '@athenna/config' import type { ConnectionOptions } from '#src/types' +export const RUN_WITH_WORKER_CONTEXT = Symbol.for( + '@athenna/queue.runWithWorkerContext' +) + +export type ScopedQueueProcessor = ((data: T) => any | Promise) & { + [RUN_WITH_WORKER_CONTEXT]?: ( + data: T, + callback: () => any | Promise + ) => any | Promise +} + export abstract class Driver { /** * Set if this instance is connected. @@ -164,6 +175,20 @@ export abstract class Driver { return random } + protected runScopedQueueProcessor( + processor: ScopedQueueProcessor, + data: T, + callback: () => any | Promise + ) { + const runner = processor[RUN_WITH_WORKER_CONTEXT] + + if (runner) { + return runner(data, callback) + } + + return callback() + } + /** * Connect to client. */ diff --git a/src/drivers/FakeDriver.ts b/src/drivers/FakeDriver.ts index b41460f..027365c 100644 --- a/src/drivers/FakeDriver.ts +++ b/src/drivers/FakeDriver.ts @@ -11,6 +11,7 @@ import { Log } from '@athenna/logger' import { Json, Options } from '@athenna/common' import type { ConnectionOptions } from '#src/types' import { ConnectionFactory } from '#src/factories/ConnectionFactory' +import { RUN_WITH_WORKER_CONTEXT, type ScopedQueueProcessor } from '#src/drivers/Driver' export class FakeDriver { public constructor(connection?: string, client?: any) { @@ -224,6 +225,20 @@ export class FakeDriver { return 0 } + protected runScopedQueueProcessor( + processor: ScopedQueueProcessor, + data: T, + callback: () => any | Promise + ) { + const runner = processor[RUN_WITH_WORKER_CONTEXT] + + if (runner) { + return runner(data, callback) + } + + return callback() + } + /** * Process the next job of the queue with a handler. * diff --git a/src/drivers/MemoryDriver.ts b/src/drivers/MemoryDriver.ts index b4b7921..9678263 100644 --- a/src/drivers/MemoryDriver.ts +++ b/src/drivers/MemoryDriver.ts @@ -264,29 +264,33 @@ export class MemoryDriver extends Driver { job.attempts-- job.reservedUntil = Date.now() + this.visibilityTimeout - try { - await processor({ - id: job.id, - attempts: job.attempts, - data: job.data - }) - - /** - * If the job still exists after processing, it means that the job was - * not processed for some reason, so we need to make it available again - * after a delay. - */ - if (!MemoryDriver.ackedIds.has(job.id)) { - job.reservedUntil = null - job.availableAt = Date.now() + this.noAckDelayMs + requeueJitterMs - } - } catch (error) { - await new MemoryDriverExceptionHandler().handle({ - job, - error, - driver: this, - requeueJitterMs - }) + const workerJob = { + id: job.id, + attempts: job.attempts, + data: job.data } + + await this.runScopedQueueProcessor(processor, workerJob, async () => { + try { + await processor(workerJob) + + /** + * If the job still exists after processing, it means that the job was + * not processed for some reason, so we need to make it available again + * after a delay. + */ + if (!MemoryDriver.ackedIds.has(job.id)) { + job.reservedUntil = null + job.availableAt = Date.now() + this.noAckDelayMs + requeueJitterMs + } + } catch (error) { + await new MemoryDriverExceptionHandler().handle({ + job, + error, + driver: this, + requeueJitterMs + }) + } + }) } } diff --git a/src/queue/QueueImpl.ts b/src/queue/QueueImpl.ts index 638bab6..3a233cb 100644 --- a/src/queue/QueueImpl.ts +++ b/src/queue/QueueImpl.ts @@ -13,11 +13,10 @@ import type { Job, ConnectionOptions } from '#src/types' import type { FakeDriver } from '#src/drivers/FakeDriver' import type { AwsSqsDriver } from '#src/drivers/AwsSqsDriver' import type { MemoryDriver } from '#src/drivers/MemoryDriver' -import type { Driver as DriverImpl } from '#src/drivers/Driver' import type { DatabaseDriver } from '#src/drivers/DatabaseDriver' import { ConnectionFactory } from '#src/factories/ConnectionFactory' -export class QueueImpl extends Macroable { +export class QueueImpl extends Macroable { /** * The connection name used for this instance. */ diff --git a/src/worker/WorkerTaskBuilder.ts b/src/worker/WorkerTaskBuilder.ts index e544090..161d868 100644 --- a/src/worker/WorkerTaskBuilder.ts +++ b/src/worker/WorkerTaskBuilder.ts @@ -12,6 +12,7 @@ import { Queue } from '#src/facades/Queue' import { WorkerImpl } from '#src/worker/WorkerImpl' import { Is, Module, Parser } from '@athenna/common' import type { Context, ConnectionOptions } from '#src/types' +import { RUN_WITH_WORKER_CONTEXT } from '#src/drivers/Driver' import type { WorkerHandler } from '#src/types/WorkerHandler' import { WorkerTimeoutException } from '#src/exceptions/WorkerTimeoutException' @@ -51,6 +52,7 @@ export class WorkerTaskBuilder { } = {} private timers: NodeJS.Timeout[] = [] + private rawHandler?: WorkerHandler public constructor() { this.worker.connection = Config.get('queue.default') @@ -98,39 +100,10 @@ export class WorkerTaskBuilder { * ``` */ public handler(handler: WorkerHandler) { - const logIfEnabled = (ctx: any) => { - if (WorkerImpl.loggerIsSet) { - const channel = Config.get('worker.logger.channel', 'worker') - const isToLogRequest = Config.get('worker.logger.isToLogRequest') - - if (!isToLogRequest) { - return Log.channelOrVanilla(channel).info(ctx) - } - - if (isToLogRequest(ctx)) { - return Log.channelOrVanilla(channel).info(ctx) - } - } - } + this.rawHandler = handler this.worker.handler = async ctx => { - const execute = async () => { - ctx.traceId = WorkerImpl.rTracerPlugin - ? WorkerImpl.rTracerPlugin.id() - : ctx.traceId ?? null - - return this.runWithOtelContext(ctx, async () => { - await handler(ctx) - - logIfEnabled(ctx) - }) - } - - if (WorkerImpl.rTracerPlugin) { - return WorkerImpl.rTracerPlugin.runWithId(execute) - } - - return execute() + return this.executeInWorkerContext(ctx, () => this.executeHandler(ctx)) } const task = WorkerImpl.tasks.find( @@ -196,17 +169,9 @@ export class WorkerTaskBuilder { options: this.worker.options }) - await queue.process(job => { - const ctx = { - name: this.worker.name, - traceId: null, - connection: this.worker.connection, - options: this.worker.options, - job - } + const processor = this.createScopedProcessor() - return this.worker.handler(ctx) - }) + await queue.process(processor) } /** @@ -354,6 +319,77 @@ export class WorkerTaskBuilder { return Math.floor(Math.random() * (max + 1)) } + private createScopedProcessor(): (data: unknown) => any | Promise { + let currentCtx: Context | null = null + + const processor: (data: unknown) => any | Promise = async job => { + const ctx = currentCtx || this.createContext(job as Context['job']) + + return this.executeHandler(ctx) + } + + processor[RUN_WITH_WORKER_CONTEXT] = async (job, callback) => { + const ctx = this.createContext(job) + + currentCtx = ctx + + try { + return await this.executeInWorkerContext(ctx, callback) + } finally { + currentCtx = null + } + } + + return processor + } + + private createContext(job: Context['job']): Context { + return { + name: this.worker.name, + traceId: null, + connection: this.worker.connection, + options: this.worker.options, + job + } + } + + private async executeHandler(ctx: Context) { + await this.rawHandler(ctx) + + this.logIfEnabled(ctx) + } + + private logIfEnabled(ctx: any) { + if (WorkerImpl.loggerIsSet) { + const channel = Config.get('worker.logger.channel', 'worker') + const isToLogRequest = Config.get('worker.logger.isToLogRequest') + + if (!isToLogRequest) { + return Log.channelOrVanilla(channel).info(ctx) + } + + if (isToLogRequest(ctx)) { + return Log.channelOrVanilla(channel).info(ctx) + } + } + } + + private executeInWorkerContext(ctx: Context, callback: () => T): T { + const execute = () => { + ctx.traceId = WorkerImpl.rTracerPlugin + ? WorkerImpl.rTracerPlugin.id() + : ctx.traceId ?? null + + return this.runWithOtelContext(ctx, callback) + } + + if (WorkerImpl.rTracerPlugin) { + return WorkerImpl.rTracerPlugin.runWithId(execute) + } + + return execute() + } + private runWithOtelContext(ctx: any, callback: () => T): T { if (!Config.is('worker.otel.contextEnabled', true) || !otelModule) { return callback() diff --git a/tests/unit/kernels/WorkerKernelTest.ts b/tests/unit/kernels/WorkerKernelTest.ts index 3e38bf6..993b448 100644 --- a/tests/unit/kernels/WorkerKernelTest.ts +++ b/tests/unit/kernels/WorkerKernelTest.ts @@ -11,7 +11,7 @@ import { Queue } from '#src/facades/Queue' import { Worker } from '#src/facades/Worker' import { OtelProvider } from '@athenna/otel' import { Path, Sleep } from '@athenna/common' -import { LoggerProvider } from '@athenna/logger' +import { Log, LoggerProvider } from '@athenna/logger' import { WorkerImpl } from '#src/worker/WorkerImpl' import { WorkerKernel } from '#src/kernels/WorkerKernel' import { constants } from '#tests/fixtures/constants/index' @@ -143,6 +143,52 @@ export class WorkerKernelTest { assert.equal(values.connection, 'memory') } + @Test() + @Cleanup(() => { + Config.set('worker.otel.contextEnabled', false) + }) + @Cleanup(() => { + Config.set('worker.otel.contextBindings', []) + }) + @Cleanup(() => { + Config.set('worker.logger.prettifyException', true) + }) + public async shouldKeepOtelContextActiveDuringWorkerExceptionLogging({ assert }: Context) { + const workerNameKey = createContextKey('worker.exception.name') + const workerConnectionKey = createContextKey('worker.exception.connection') + const values = { + name: null, + connection: null + } + + Config.set('worker.otel.contextEnabled', true) + Config.set('worker.otel.contextBindings', [ + { key: workerNameKey, resolve: ctx => ctx.name }, + { key: workerConnectionKey, resolve: ctx => ctx.connection } + ]) + Config.set('worker.logger.prettifyException', false) + + Log.when('channelOrVanilla').return({ + error: () => { + values.name = context.active().getValue(workerNameKey) as any + values.connection = context.active().getValue(workerConnectionKey) as any + } + }) + + Worker.task() + .name('otel_worker_exception') + .connection('memory') + .handler(async () => { + throw new Error('testing') + }) + + await Queue.add({ test: 1 }) + await Queue.worker().runByName('otel_worker_exception') + + assert.equal(values.name, 'otel_worker_exception') + assert.equal(values.connection, 'memory') + } + @Test() public async shouldBeAbleToRegisterWorkersOfTheRcFileWithAndWithoutAnnotations({ assert }: Context) { const kernel = new WorkerKernel()