Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export default class ForestadminClientActivityLogPort implements ActivityLogPort

return { id: response.id, index: response.attributes.index };
} catch (cause) {
this.logger.error('Activity log creation failed', {
this.logger.error('Failed to create activity log', {
action: args.action,
collectionId: args.collectionId,
status: (cause as { status?: number }).status,
Expand All @@ -63,10 +63,10 @@ export default class ForestadminClientActivityLogPort implements ActivityLogPort
activityLog: { id: handle.id, attributes: { index: handle.index } },
status: 'completed',
}),
{ logger: this.logger },
{ logger: this.logger, extraRetryStatuses: [404] },
);
} catch (err) {
this.logger.error('Activity log markSucceeded failed after retries', {
this.logger.error('Failed to mark activity log as succeeded', {
handleId: handle.id,
error: extractErrorMessage(err),
});
Expand All @@ -85,10 +85,10 @@ export default class ForestadminClientActivityLogPort implements ActivityLogPort
activityLog: { id: handle.id, attributes: { index: handle.index } },
status: 'failed',
}),
{ logger: this.logger },
{ logger: this.logger, extraRetryStatuses: [404] },
);
} catch (err) {
this.logger.error('Activity log markFailed failed after retries', {
this.logger.error('Failed to mark activity log as failed', {
handleId: handle.id,
stepErrorMessage: errorMessage,
error: extractErrorMessage(err),
Expand Down
12 changes: 7 additions & 5 deletions packages/workflow-executor/src/adapters/with-retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,17 @@ function sleep(ms: number): Promise<void> {
});
}

function isRetryable(err: unknown): boolean {
function isRetryable(err: unknown, extra: number[]): boolean {
const { status } = err as { status?: number };
if (typeof status !== 'number') return false;

return typeof status === 'number' && RETRYABLE_STATUS.has(status);
return RETRYABLE_STATUS.has(status) || extra.includes(status);
}

export default async function withRetry<T>(
label: string,
fn: () => Promise<T>,
{ logger }: { logger: Logger },
{ logger, extraRetryStatuses = [] }: { logger: Logger; extraRetryStatuses?: number[] },
): Promise<T> {
let lastError: unknown;

Expand All @@ -30,9 +31,10 @@ export default async function withRetry<T>(
return await fn();
} catch (err) {
lastError = err;
if (!isRetryable(err) || attempt === RETRY_DELAYS_MS.length) throw err;
logger.info(`"${label}" failed, retrying`, {
if (!isRetryable(err, extraRetryStatuses) || attempt === RETRY_DELAYS_MS.length) throw err;
logger.warn(`"${label}" failed, retrying`, {
attempt: attempt + 1,
status: (err as { status?: number }).status,
error: extractErrorMessage(err),
});
// eslint-disable-next-line no-await-in-loop
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ describe('ForestadminClientActivityLogPort', () => {

expect(handle).toEqual({ id: 'log-2', index: '1' });
expect(service.createActivityLog).toHaveBeenCalledTimes(2);
expect(logger.info).toHaveBeenCalledWith(
expect(logger.warn).toHaveBeenCalledWith(
expect.stringContaining('createPending'),
expect.objectContaining({ attempt: 1 }),
);
Expand All @@ -97,6 +97,17 @@ describe('ForestadminClientActivityLogPort', () => {
expect(service.createActivityLog).toHaveBeenCalledTimes(4);
});

it('throws immediately on 404 — not retriable for createPending (unlike markSucceeded/markFailed)', async () => {
const service = makeService();
service.createActivityLog.mockRejectedValueOnce(makeHttpError(404));
const port = makePort(service);

await expect(
port.createPending({ renderingId: 5, action: 'update', type: 'write' }),
).rejects.toBeInstanceOf(ActivityLogCreationError);
expect(service.createActivityLog).toHaveBeenCalledTimes(1);
});

it('does not retry on 401 (not a transient error)', async () => {
const service = makeService();
service.createActivityLog.mockRejectedValue(makeHttpError(401));
Expand Down Expand Up @@ -183,10 +194,23 @@ describe('ForestadminClientActivityLogPort', () => {
await jest.advanceTimersByTimeAsync(2_600);
await expect(promise).resolves.toBeUndefined();
expect(logger.error).toHaveBeenCalledWith(
expect.stringContaining('markSucceeded failed'),
'Failed to mark activity log as succeeded',
expect.objectContaining({ handleId: 'log-1' }),
);
});

it('retries on 404 — eventual consistency: record may not be visible yet on the read path', async () => {
const service = makeService();
service.updateActivityLogStatus
.mockRejectedValueOnce(makeHttpError(404))
.mockResolvedValueOnce(undefined);
const port = makePort(service);

const promise = port.markSucceeded({ id: 'log-1', index: '0' });
await jest.advanceTimersByTimeAsync(100);
await expect(promise).resolves.toBeUndefined();
expect(service.updateActivityLogStatus).toHaveBeenCalledTimes(2);
});
});

describe('markFailed', () => {
Expand Down Expand Up @@ -222,13 +246,26 @@ describe('ForestadminClientActivityLogPort', () => {
await jest.advanceTimersByTimeAsync(2_600);
await expect(promise).resolves.toBeUndefined();
expect(logger.error).toHaveBeenCalledWith(
expect.stringContaining('markFailed failed'),
'Failed to mark activity log as failed',
expect.objectContaining({
handleId: 'log-1',
stepErrorMessage: 'step-error-msg',
}),
);
});

it('retries on 404 — eventual consistency: record may not be visible yet on the read path', async () => {
const service = makeService();
service.updateActivityLogStatus
.mockRejectedValueOnce(makeHttpError(404))
.mockResolvedValueOnce(undefined);
const port = makePort(service);

const promise = port.markFailed({ id: 'log-1', index: '0' }, 'boom');
await jest.advanceTimersByTimeAsync(100);
await expect(promise).resolves.toBeUndefined();
expect(service.updateActivityLogStatus).toHaveBeenCalledTimes(2);
});
});

describe('drainer integration', () => {
Expand Down
30 changes: 27 additions & 3 deletions packages/workflow-executor/test/adapters/with-retry.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ describe('withRetry', () => {

expect(result).toBe('ok');
expect(fn).toHaveBeenCalledTimes(1);
expect(logger.info).not.toHaveBeenCalled();
expect(logger.warn).not.toHaveBeenCalled();
});

it('retries on retryable HTTP status codes (503)', async () => {
Expand All @@ -44,7 +44,7 @@ describe('withRetry', () => {

await expect(promise).resolves.toBe('ok');
expect(fn).toHaveBeenCalledTimes(2);
expect(logger.info).toHaveBeenCalledWith(
expect(logger.warn).toHaveBeenCalledWith(
'"test" failed, retrying',
expect.objectContaining({ attempt: 1 }),
);
Expand Down Expand Up @@ -114,13 +114,37 @@ describe('withRetry', () => {
expect(fn).toHaveBeenCalledTimes(4);
});

it('retries on status 404 when extraRetryStatuses includes 404', async () => {
const logger = makeLogger();
const fn = jest.fn().mockRejectedValueOnce(makeHttpError(404)).mockResolvedValueOnce('ok');

const promise = withRetry('test', fn, { logger, extraRetryStatuses: [404] });
await jest.advanceTimersByTimeAsync(100);

await expect(promise).resolves.toBe('ok');
expect(fn).toHaveBeenCalledTimes(2);
expect(logger.warn).toHaveBeenCalledWith(
'"test" failed, retrying',
expect.objectContaining({ attempt: 1, status: 404 }),
);
});

it('throws immediately on 404 without extraRetryStatuses', async () => {
const logger = makeLogger();
const fn = jest.fn().mockRejectedValue(makeHttpError(404));

await expect(withRetry('test', fn, { logger })).rejects.toMatchObject({ status: 404 });
expect(fn).toHaveBeenCalledTimes(1);
expect(logger.warn).not.toHaveBeenCalled();
});

it('throws immediately on non-retryable errors (4xx)', async () => {
const logger = makeLogger();
const fn = jest.fn().mockRejectedValue(makeHttpError(400));

await expect(withRetry('test', fn, { logger })).rejects.toMatchObject({ status: 400 });
expect(fn).toHaveBeenCalledTimes(1);
expect(logger.info).not.toHaveBeenCalled();
expect(logger.warn).not.toHaveBeenCalled();
});

it('throws immediately on errors with no status', async () => {
Expand Down
Loading