diff --git a/infrastructure/terraform/components/callbacks/README.md b/infrastructure/terraform/components/callbacks/README.md
index 9889ab22..c2060977 100644
--- a/infrastructure/terraform/components/callbacks/README.md
+++ b/infrastructure/terraform/components/callbacks/README.md
@@ -17,6 +17,8 @@
|------|-------------|------|---------|:--------:|
| [applications\_map\_parameter\_name](#input\_applications\_map\_parameter\_name) | SSM Parameter Store path for the clientId-to-applicationData map, where applicationData is currently only the applicationId | `string` | `null` | no |
| [aws\_account\_id](#input\_aws\_account\_id) | The AWS Account ID (numeric) | `string` | n/a | yes |
+| [cb\_cooldown\_period\_ms](#input\_cb\_cooldown\_period\_ms) | Full block duration after circuit opens, before half-open probes begin (ms) | `number` | `120000` | no |
+| [cb\_recovery\_period\_ms](#input\_cb\_recovery\_period\_ms) | Linear ramp-up duration after circuit closes (ms) | `number` | `600000` | no |
| [component](#input\_component) | The variable encapsulating the name of this component | `string` | `"callbacks"` | no |
| [default\_tags](#input\_default\_tags) | A map of default tags to apply to all taggable resources within the component | `map(string)` | `{}` | no |
| [deploy\_mock\_clients](#input\_deploy\_mock\_clients) | Flag to deploy mock webhook lambda for integration testing (test/dev environments only) | `bool` | `false` | no |
@@ -40,7 +42,7 @@
| [parent\_acct\_environment](#input\_parent\_acct\_environment) | Name of the environment responsible for the acct resources used, affects things like DNS zone. Useful for named dev environments | `string` | `"main"` | no |
| [pipe\_event\_patterns](#input\_pipe\_event\_patterns) | value | `list(string)` | `[]` | no |
| [pipe\_log\_level](#input\_pipe\_log\_level) | Log level for the EventBridge Pipe. | `string` | `"ERROR"` | no |
-| [pipe\_sqs\_input\_batch\_size](#input\_pipe\_sqs\_input\_batch\_size) | n/a | `number` | `1` | no |
+| [pipe\_sqs\_input\_batch\_size](#input\_pipe\_sqs\_input\_batch\_size) | n/a | `number` | `10` | no |
| [pipe\_sqs\_max\_batch\_window](#input\_pipe\_sqs\_max\_batch\_window) | n/a | `number` | `2` | no |
| [project](#input\_project) | The name of the tfscaffold project | `string` | n/a | yes |
| [region](#input\_region) | The AWS Region | `string` | n/a | yes |
diff --git a/infrastructure/terraform/components/callbacks/module_client_delivery.tf b/infrastructure/terraform/components/callbacks/module_client_delivery.tf
index cce31bd5..e2d0f55a 100644
--- a/infrastructure/terraform/components/callbacks/module_client_delivery.tf
+++ b/infrastructure/terraform/components/callbacks/module_client_delivery.tf
@@ -41,6 +41,8 @@ module "client_delivery" {
mtls_ca_s3_key = local.mtls_ca_s3_key # gitleaks:allow
token_bucket_burst_capacity = var.token_bucket_burst_capacity
+ cb_cooldown_period_ms = var.cb_cooldown_period_ms
+ cb_recovery_period_ms = var.cb_recovery_period_ms
vpc_subnet_ids = try(local.acct.private_subnets[local.bc_name], [])
lambda_security_group_id = aws_security_group.https_client_lambda.id
diff --git a/infrastructure/terraform/components/callbacks/module_perf_runner_lambda.tf b/infrastructure/terraform/components/callbacks/module_perf_runner_lambda.tf
index 7a77c40c..a7bf92db 100644
--- a/infrastructure/terraform/components/callbacks/module_perf_runner_lambda.tf
+++ b/infrastructure/terraform/components/callbacks/module_perf_runner_lambda.tf
@@ -93,7 +93,23 @@ data "aws_iam_policy_document" "perf_runner_lambda" {
resources = [
module.sqs_inbound_event.sqs_queue_arn,
- "${module.sqs_inbound_event.sqs_queue_arn}-dlq",
+ module.sqs_inbound_event.sqs_dlq_arn,
+ "arn:aws:sqs:${var.region}:${var.aws_account_id}:${local.csi}-*-delivery-queue",
+ "arn:aws:sqs:${var.region}:${var.aws_account_id}:${local.csi}-*-delivery-dlq-queue",
+ ]
+ }
+
+ statement {
+ sid = "SQSGetQueueAttributes"
+ effect = "Allow"
+
+ actions = [
+ "sqs:GetQueueAttributes",
+ ]
+
+ resources = [
+ module.sqs_inbound_event.sqs_queue_arn,
+ module.sqs_inbound_event.sqs_dlq_arn,
"arn:aws:sqs:${var.region}:${var.aws_account_id}:${local.csi}-*-delivery-queue",
"arn:aws:sqs:${var.region}:${var.aws_account_id}:${local.csi}-*-delivery-dlq-queue",
]
diff --git a/infrastructure/terraform/components/callbacks/pre.sh b/infrastructure/terraform/components/callbacks/pre.sh
index 39eb0817..8ae910e8 100755
--- a/infrastructure/terraform/components/callbacks/pre.sh
+++ b/infrastructure/terraform/components/callbacks/pre.sh
@@ -12,10 +12,10 @@ deploy_perf_runner="false"
for _tfvar_file in \
"${base_path}/etc/group_${group}.tfvars" \
"${base_path}/etc/env_${region}_${environment}.tfvars"; do
- if [[ -f "${_tfvar_file}" ]]; then
- _val=$(grep -E '^\s*deploy_mock_clients\s*=' "${_tfvar_file}" | tail -1 | sed 's/.*=\s*//;s/\s*$//')
+ if [ -f "${_tfvar_file}" ]; then
+ _val=$(grep -E '^\s*deploy_mock_clients\s*=' "${_tfvar_file}" | tail -1 | sed 's/.*=\s*//;s/\s*$//;s/^"//;s/"$//')
[ -n "${_val}" ] && deploy_mock_clients="${_val}"
- _val=$(grep -E '^\s*deploy_perf_runner\s*=' "${_tfvar_file}" | tail -1 | sed 's/.*=\s*//;s/\s*$//')
+ _val=$(grep -E '^\s*deploy_perf_runner\s*=' "${_tfvar_file}" | tail -1 | sed 's/.*=\s*//;s/\s*$//;s/^"//;s/"$//')
[ -n "${_val}" ] && deploy_perf_runner="${_val}"
fi
done
diff --git a/infrastructure/terraform/components/callbacks/variables.tf b/infrastructure/terraform/components/callbacks/variables.tf
index 68e4eafd..7d8bb9ed 100644
--- a/infrastructure/terraform/components/callbacks/variables.tf
+++ b/infrastructure/terraform/components/callbacks/variables.tf
@@ -102,7 +102,7 @@ variable "pipe_log_level" {
variable "pipe_sqs_input_batch_size" {
type = number
- default = 1
+ default = 10
}
variable "pipe_sqs_max_batch_window" {
@@ -213,3 +213,15 @@ variable "token_bucket_burst_capacity" {
description = "Token bucket burst capacity used by the rate limiter"
default = 2250
}
+
+variable "cb_cooldown_period_ms" {
+ type = number
+ description = "Full block duration after circuit opens, before half-open probes begin (ms)"
+ default = 120000
+}
+
+variable "cb_recovery_period_ms" {
+ type = number
+ description = "Linear ramp-up duration after circuit closes (ms)"
+ default = 600000
+}
diff --git a/infrastructure/terraform/modules/client-delivery/README.md b/infrastructure/terraform/modules/client-delivery/README.md
index 22b98e26..c2920f71 100644
--- a/infrastructure/terraform/modules/client-delivery/README.md
+++ b/infrastructure/terraform/modules/client-delivery/README.md
@@ -11,6 +11,8 @@ No requirements.
|------|-------------|------|---------|:--------:|
| [applications\_map\_parameter\_name](#input\_applications\_map\_parameter\_name) | SSM Parameter Store path for the clientId-to-applicationData map | `string` | n/a | yes |
| [aws\_account\_id](#input\_aws\_account\_id) | Account ID | `string` | n/a | yes |
+| [cb\_cooldown\_period\_ms](#input\_cb\_cooldown\_period\_ms) | Full block duration after circuit opens, before half-open probes begin (ms) | `number` | `120000` | no |
+| [cb\_recovery\_period\_ms](#input\_cb\_recovery\_period\_ms) | Linear ramp-up duration after circuit closes (ms) | `number` | `600000` | no |
| [client\_bus\_name](#input\_client\_bus\_name) | EventBridge bus name for subscription rules | `string` | n/a | yes |
| [client\_config\_bucket](#input\_client\_config\_bucket) | S3 bucket name containing client subscription configuration | `string` | n/a | yes |
| [client\_config\_bucket\_arn](#input\_client\_config\_bucket\_arn) | S3 bucket ARN containing client subscription configuration | `string` | n/a | yes |
@@ -24,7 +26,8 @@ No requirements.
| [force\_lambda\_code\_deploy](#input\_force\_lambda\_code\_deploy) | Force Lambda code redeployment even when commit tag matches | `bool` | `false` | no |
| [group](#input\_group) | The name of the tfscaffold group | `string` | `null` | no |
| [kms\_key\_arn](#input\_kms\_key\_arn) | KMS Key ARN for encryption at rest | `string` | n/a | yes |
-| [lambda\_batch\_size](#input\_lambda\_batch\_size) | Number of SQS messages per Lambda invocation | `number` | `10` | no |
+| [lambda\_batch\_size](#input\_lambda\_batch\_size) | Number of SQS messages per Lambda invocation | `number` | `100` | no |
+| [lambda\_batching\_window\_in\_seconds](#input\_lambda\_batching\_window\_in\_seconds) | Maximum time in seconds to wait for a full batch before invoking Lambda. Allows the delivery queue to fill to batch\_size, improving Lambda concurrency utilisation. | `number` | `1` | no |
| [lambda\_code\_base\_path](#input\_lambda\_code\_base\_path) | Base path to Lambda source code directories | `string` | n/a | yes |
| [lambda\_memory](#input\_lambda\_memory) | Lambda memory allocation in MB | `number` | `256` | no |
| [lambda\_s3\_bucket](#input\_lambda\_s3\_bucket) | S3 bucket for Lambda function artefacts | `string` | n/a | yes |
diff --git a/infrastructure/terraform/modules/client-delivery/module_https_client_lambda.tf b/infrastructure/terraform/modules/client-delivery/module_https_client_lambda.tf
index a1bb48f2..4e26c3a2 100644
--- a/infrastructure/terraform/modules/client-delivery/module_https_client_lambda.tf
+++ b/infrastructure/terraform/modules/client-delivery/module_https_client_lambda.tf
@@ -53,6 +53,8 @@ module "https_client_lambda" {
MTLS_CERT_S3_KEY = var.mtls_cert_s3_key # gitleaks:allow
QUEUE_URL = module.sqs_delivery.sqs_queue_url
TOKEN_BUCKET_BURST_CAPACITY = tostring(var.token_bucket_burst_capacity)
+ CB_COOLDOWN_PERIOD_MS = tostring(var.cb_cooldown_period_ms)
+ CB_RECOVERY_PERIOD_MS = tostring(var.cb_recovery_period_ms)
}
vpc_config = var.lambda_security_group_id != "" ? {
@@ -62,10 +64,11 @@ module "https_client_lambda" {
}
resource "aws_lambda_event_source_mapping" "sqs_delivery" {
- event_source_arn = module.sqs_delivery.sqs_queue_arn
- function_name = module.https_client_lambda.function_arn
- batch_size = var.lambda_batch_size
- enabled = true
+ event_source_arn = module.sqs_delivery.sqs_queue_arn
+ function_name = module.https_client_lambda.function_arn
+ batch_size = var.lambda_batch_size
+ maximum_batching_window_in_seconds = var.lambda_batching_window_in_seconds
+ enabled = true
function_response_types = ["ReportBatchItemFailures"]
}
diff --git a/infrastructure/terraform/modules/client-delivery/variables.tf b/infrastructure/terraform/modules/client-delivery/variables.tf
index 46f66f45..a8762b13 100644
--- a/infrastructure/terraform/modules/client-delivery/variables.tf
+++ b/infrastructure/terraform/modules/client-delivery/variables.tf
@@ -118,7 +118,13 @@ variable "log_subscription_role_arn" {
variable "lambda_batch_size" {
type = number
description = "Number of SQS messages per Lambda invocation"
- default = 10
+ default = 100
+}
+
+variable "lambda_batching_window_in_seconds" {
+ type = number
+ description = "Maximum time in seconds to wait for a full batch before invoking Lambda. Allows the delivery queue to fill to batch_size, improving Lambda concurrency utilisation."
+ default = 1
}
variable "lambda_memory" {
@@ -210,3 +216,15 @@ variable "lambda_security_group_id" {
description = "Security group ID for the Lambda function"
default = ""
}
+
+variable "cb_cooldown_period_ms" {
+ type = number
+ description = "Full block duration after circuit opens, before half-open probes begin (ms)"
+ default = 120000
+}
+
+variable "cb_recovery_period_ms" {
+ type = number
+ description = "Linear ramp-up duration after circuit closes (ms)"
+ default = 600000
+}
diff --git a/lambdas/https-client-lambda/src/__tests__/handler.test.ts b/lambdas/https-client-lambda/src/__tests__/handler.test.ts
index 14da4bd2..f337f7c7 100644
--- a/lambdas/https-client-lambda/src/__tests__/handler.test.ts
+++ b/lambdas/https-client-lambda/src/__tests__/handler.test.ts
@@ -288,6 +288,28 @@ describe("processRecords", () => {
expect(mockChangeVisibility).toHaveBeenCalledTimes(1);
});
+ it("caps visibility delay at SQS maximum (12 hours) for admission-denied batch", async () => {
+ mockAdmit.mockResolvedValue({
+ allowed: false,
+ reason: "rate_limited",
+ retryAfterMs: 60_000,
+ effectiveRate: 10,
+ });
+
+ const record = makeRecord({
+ attributes: {
+ ApproximateReceiveCount: "1000",
+ SentTimestamp: "0",
+ SenderId: "sender",
+ ApproximateFirstReceiveTimestamp: "0",
+ },
+ });
+
+ await processRecords([record]);
+
+ expect(mockChangeVisibility).toHaveBeenCalledWith("receipt-1", 43_200);
+ });
+
it("changes visibility once for transient failure", async () => {
mockDeliverPayload.mockResolvedValue({
outcome: "transient_failure",
@@ -380,8 +402,7 @@ describe("processRecords", () => {
expect(failures).toEqual([{ itemIdentifier: "msg-1" }]);
const visibilityDelay = mockChangeVisibility.mock.calls[0]![1] as number;
- expect(visibilityDelay).toBeGreaterThanOrEqual(2);
- expect(visibilityDelay).toBeLessThanOrEqual(6);
+ expect(visibilityDelay).toBe(2);
expect(mockSendToDlq).not.toHaveBeenCalled();
expect(mockDeliverPayload).not.toHaveBeenCalled();
});
@@ -398,8 +419,7 @@ describe("processRecords", () => {
expect(failures).toEqual([{ itemIdentifier: "msg-1" }]);
const visibilityDelay = mockChangeVisibility.mock.calls[0]![1] as number;
- expect(visibilityDelay).toBeGreaterThanOrEqual(30);
- expect(visibilityDelay).toBeLessThanOrEqual(34);
+ expect(visibilityDelay).toBe(30);
expect(mockSendToDlq).not.toHaveBeenCalled();
expect(mockDeliverPayload).not.toHaveBeenCalled();
});
diff --git a/lambdas/https-client-lambda/src/handler.ts b/lambdas/https-client-lambda/src/handler.ts
index 782777f2..4c195ed5 100644
--- a/lambdas/https-client-lambda/src/handler.ts
+++ b/lambdas/https-client-lambda/src/handler.ts
@@ -43,11 +43,12 @@ import { flushMetrics, resetMetrics } from "services/delivery-metrics";
type RedisClientType = Awaited>;
const DEFAULT_MAX_RETRY_DURATION_MS = 7_200_000; // 2 hours
-const DEFAULT_CONCURRENCY_LIMIT = 5;
+const DEFAULT_CONCURRENCY_LIMIT = 10;
const BURST_MULTIPLIER = 5;
const MAX_BURST_CAPACITY = Number(
process.env.TOKEN_BUCKET_BURST_CAPACITY ?? "2250",
);
+const SQS_MAX_VISIBILITY_TIMEOUT_SEC = 43_200; // 12 hours
const gateConfig: EndpointGateConfig = {
// Max tokens the bucket can hold — absorbs short traffic bursts without throttling
@@ -118,6 +119,15 @@ async function deliverRecord(
clientId: string,
): Promise<{ success: boolean; dlq: boolean }> {
const correlationId = extractCorrelationId(message);
+ const receiveCount = Number(record.attributes.ApproximateReceiveCount);
+
+ logger.info("Processing delivery record", {
+ correlationId,
+ receiveCount,
+ firstReceivedAt: new Date(
+ Number(record.attributes.ApproximateFirstReceiveTimestamp),
+ ).toISOString(),
+ });
const maxRetryDurationMs =
target.delivery?.maxRetryDurationSeconds === undefined
@@ -147,7 +157,7 @@ async function deliverRecord(
message.targetId,
correlationId,
record.messageId,
- Number(record.attributes.ApproximateReceiveCount),
+ receiveCount,
);
const deliveryStart = Date.now();
const result = await deliverPayload(target, payloadJson, signature, agent);
@@ -171,7 +181,6 @@ async function deliverRecord(
}
if (result.outcome === OUTCOME_RATE_LIMITED) {
- const receiveCount = Number(record.attributes.ApproximateReceiveCount);
recordDeliveryRateLimited(clientId, message.targetId, correlationId);
await handleRateLimitedRecord(
record,
@@ -183,7 +192,6 @@ async function deliverRecord(
return { success: true, dlq: false };
}
- const receiveCount = Number(record.attributes.ApproximateReceiveCount);
const backoffSec = jitteredBackoffSeconds(receiveCount);
recordDeliveryFailure(
clientId,
@@ -209,14 +217,17 @@ async function handleBatchDenied(
reason: string,
retryAfterMs: number,
): Promise {
- const delaySec = Math.ceil(retryAfterMs / 1000);
+ const baseDelaySec = Math.max(1, Math.ceil(retryAfterMs / 1000));
const correlationIds = batch.messages.map((m) => extractCorrelationId(m));
recordAdmissionDenied(clientId, batch.targetId, reason, correlationIds);
const failures: SQSBatchItemFailure[] = [];
for (const record of batch.records) {
- // eslint-disable-next-line sonarjs/pseudo-random -- jitter for backoff, not security-sensitive
- const jitterSec = Math.floor(Math.random() * 5);
- await changeVisibility(record.receiptHandle, delaySec + jitterSec);
+ const receiveCount = Number(record.attributes.ApproximateReceiveCount);
+ const delaySec = Math.min(
+ receiveCount * baseDelaySec,
+ SQS_MAX_VISIBILITY_TIMEOUT_SEC,
+ );
+ await changeVisibility(record.receiptHandle, delaySec);
failures.push({ itemIdentifier: record.messageId });
}
return { failures, deliveredCount: 0, dlqCount: 0 };
@@ -264,23 +275,30 @@ async function processTargetBatch(
const failures: SQSBatchItemFailure[] = [];
let processingFailures = 0;
+ const admittedPairs = admitted.map(
+ (record, i): { record: SQSRecord; message: CallbackDeliveryMessage } => ({
+ record,
+ message: admittedMessages[i], // eslint-disable-line security/detect-object-injection -- i is the numeric index from .map(), not user input
+ }),
+ );
+
const deliveryResults = await pMap(
- admitted,
- async (
+ admittedPairs,
+ async ({
+ message,
record,
- index,
- ): Promise<{ record: SQSRecord; success: boolean; dlq: boolean }> => {
+ }): Promise<{ record: SQSRecord; success: boolean; dlq: boolean }> => {
try {
const outcome = await deliverRecord(
record,
- admittedMessages[index],
+ message,
target,
applicationId,
clientId,
);
return { record, success: outcome.success, dlq: outcome.dlq };
} catch (error) {
- const correlationId = extractCorrelationId(admittedMessages[index]);
+ const correlationId = extractCorrelationId(message);
logger.error("Failed to process record", {
messageId: record.messageId,
correlationId,
@@ -348,7 +366,9 @@ async function processTargetBatch(
rejectedCorrelationIds,
);
for (const record of rejected) {
- await changeVisibility(record.receiptHandle, 1);
+ const receiveCount = Number(record.attributes.ApproximateReceiveCount);
+ const delaySec = receiveCount * 1;
+ await changeVisibility(record.receiptHandle, delaySec);
failures.push({ itemIdentifier: record.messageId });
}
}
diff --git a/lambdas/mock-webhook-lambda/src/__tests__/index.test.ts b/lambdas/mock-webhook-lambda/src/__tests__/index.test.ts
index d7463722..fefa87ed 100644
--- a/lambdas/mock-webhook-lambda/src/__tests__/index.test.ts
+++ b/lambdas/mock-webhook-lambda/src/__tests__/index.test.ts
@@ -367,6 +367,54 @@ describe("Mock Webhook Lambda", () => {
const body = JSON.parse(result.body);
expect(body.message).toBe("Forced status 500");
});
+
+ it("should return forced status code when messageId uses timed format and deadline is in the future", async () => {
+ const futureMs = Date.now() + 60_000;
+ const callback = {
+ data: [
+ {
+ type: "MessageStatus",
+ attributes: {
+ messageId: `force-500-until-${futureMs}-some-uuid`,
+ messageStatus: "delivered",
+ },
+ links: { message: "some-message-link" },
+ meta: { idempotencyKey: "some-idempotency-key" },
+ },
+ ],
+ };
+
+ const event = createMockEvent(JSON.stringify(callback));
+ const result = await handler(event);
+
+ expect(result.statusCode).toBe(500);
+ const body = JSON.parse(result.body);
+ expect(body.message).toBe("Forced status 500");
+ });
+
+ it("should return 200 when messageId uses timed format and deadline has passed", async () => {
+ const pastMs = Date.now() - 60_000;
+ const callback = {
+ data: [
+ {
+ type: "MessageStatus",
+ attributes: {
+ messageId: `force-500-until-${pastMs}-some-uuid`,
+ messageStatus: "delivered",
+ },
+ links: { message: "some-message-link" },
+ meta: { idempotencyKey: "some-idempotency-key" },
+ },
+ ],
+ };
+
+ const event = createMockEvent(JSON.stringify(callback));
+ const result = await handler(event);
+
+ expect(result.statusCode).toBe(200);
+ const body = JSON.parse(result.body);
+ expect(body.message).toBe("Callback received");
+ });
});
describe("Logging", () => {
diff --git a/lambdas/mock-webhook-lambda/src/index.ts b/lambdas/mock-webhook-lambda/src/index.ts
index d0bf582d..ea30c0e6 100644
--- a/lambdas/mock-webhook-lambda/src/index.ts
+++ b/lambdas/mock-webhook-lambda/src/index.ts
@@ -57,153 +57,232 @@ function isClientCallbackPayload(
});
}
-async function buildResponse(
- event: APIGatewayProxyEvent,
-): Promise {
- const eventWithContextFields = event as APIGatewayProxyEvent & {
- rawPath?: string;
- requestContext?: {
- http?: { method?: string };
- elb?: { targetGroupArn: string };
- };
+type EventWithContextFields = APIGatewayProxyEvent & {
+ rawPath?: string;
+ requestContext?: {
+ http?: { method?: string };
+ elb?: { targetGroupArn: string };
};
- const headers = Object.fromEntries(
+};
+
+function normalizeHeaders(
+ event: APIGatewayProxyEvent,
+): Record {
+ return Object.fromEntries(
Object.entries(event.headers).map(([k, v]) => [String(k).toLowerCase(), v]),
) as Record;
+}
- const path = event.path ?? eventWithContextFields.rawPath;
+function resolveMtlsStatus(
+ headers: Record,
+ isAlbInvocation: boolean,
+): boolean {
+ if (!isAlbInvocation) {
+ return false;
+ }
- const isAlbInvocation = Boolean(eventWithContextFields.requestContext?.elb);
const clientCertPresent = Boolean(headers["x-amzn-mtls-clientcert"]);
- let isMtls = false;
- if (isAlbInvocation) {
- const certResult = verifyClientCertificate(
- headers["x-amzn-mtls-clientcert"],
- );
- isMtls = certResult.valid;
- if (isMtls) {
- logger.info("mTLS client certificate verified", {
- fingerprint: headers["x-amzn-mtls-clientcert-fingerprint"] ?? "",
- isMtls: true,
- });
- } else {
- logger.info("Mock webhook invoked without mTLS", {
- isMtls: false,
- clientCertPresent,
- reason: certResult.reason,
- });
- }
+ const certResult = verifyClientCertificate(headers["x-amzn-mtls-clientcert"]);
+
+ if (certResult.valid) {
+ logger.info("mTLS client certificate verified", {
+ fingerprint: headers["x-amzn-mtls-clientcert-fingerprint"] ?? "",
+ isMtls: true,
+ });
+ return true;
}
- logger.info("Mock webhook invoked", {
- path,
- method: event.httpMethod,
- hasBody: Boolean(event.body),
- isMtls,
+ logger.info("Mock webhook invoked without mTLS", {
+ isMtls: false,
clientCertPresent,
- "x-api-key": headers["x-api-key"],
- "x-hmac-sha256-signature": headers["x-hmac-sha256-signature"],
- payload: event.body,
+ reason: certResult.reason,
});
+ return false;
+}
+function authenticateApiKey(headers: Record): {
+ error: APIGatewayProxyResult | undefined;
+} {
const expectedApiKey = process.env.API_KEY;
const providedApiKey = headers["x-api-key"];
if (!expectedApiKey || providedApiKey !== expectedApiKey) {
logger.error("Unauthorized: invalid or missing x-api-key");
return {
- statusCode: 401,
- body: JSON.stringify({ message: "Unauthorized" }),
+ error: {
+ statusCode: 401,
+ body: JSON.stringify({ message: "Unauthorized" }),
+ },
};
}
- if (!event.body) {
- logger.error("No event body received");
+ return { error: undefined };
+}
- return {
+type ParseResult = {
+ payload: ClientCallbackPayload | undefined;
+ error: APIGatewayProxyResult | undefined;
+};
+
+function parseError(response: APIGatewayProxyResult): ParseResult {
+ return { payload: undefined, error: response };
+}
+
+function parseAndValidateBody(body: string | null): ParseResult {
+ if (!body) {
+ logger.error("No event body received");
+ return parseError({
statusCode: 400,
body: JSON.stringify({ message: "No body" }),
- };
+ });
}
try {
- const parsed = JSON.parse(event.body) as unknown;
-
+ const parsed = JSON.parse(body) as unknown;
logger.info("Mock webhook parsed payload", { parsedPayload: parsed });
if (!isClientCallbackPayload(parsed)) {
logger.error("Invalid message structure - missing or invalid data array");
-
- return {
+ return parseError({
statusCode: 400,
body: JSON.stringify({ message: "Invalid message structure" }),
- };
+ });
}
if (parsed.data.length !== 1) {
logger.error("Expected exactly 1 callback item in data array", {
receivedCount: parsed.data.length,
});
-
- return {
+ return parseError({
statusCode: 400,
body: JSON.stringify({
message: `Expected exactly 1 callback item, got ${parsed.data.length}`,
}),
- };
+ });
+ }
+
+ return { payload: parsed, error: undefined };
+ } catch (error) {
+ if (error instanceof SyntaxError) {
+ logger.error("Invalid JSON body", { error: error.message });
+ return parseError({
+ statusCode: 400,
+ body: JSON.stringify({ message: "Invalid JSON body" }),
+ });
}
- const [item] = parsed.data;
- const correlationId = item.meta.idempotencyKey;
- const { messageId } = item.attributes;
- const forcedStatusMatch = /^force-(\d{3})-/.exec(messageId);
- if (forcedStatusMatch) {
- const statusCode = Number(forcedStatusMatch[1]);
- logger.info("Forced status code response", {
+ logger.error("Failed to process callback", {
+ error: error instanceof Error ? error.message : String(error),
+ });
+ return parseError({
+ statusCode: 500,
+ body: JSON.stringify({ message: "Internal server error" }),
+ });
+ }
+}
+
+function checkForcedStatusResponse(
+ messageId: string,
+ correlationId: string,
+): { response: APIGatewayProxyResult | undefined } {
+ const timedMatch = /^force-(\d{3})-until-(\d+)-/.exec(messageId);
+ if (timedMatch) {
+ const statusCode = Number(timedMatch[1]);
+ const until = Number(timedMatch[2]);
+ if (Date.now() < until) {
+ logger.info("Timed forced status code response", {
correlationId,
messageId,
statusCode,
+ until,
});
return {
- statusCode,
- body: JSON.stringify({ message: `Forced status ${statusCode}` }),
+ response: {
+ statusCode,
+ body: JSON.stringify({ message: `Forced status ${statusCode}` }),
+ },
};
}
+ return { response: undefined };
+ }
- logger.info("Callback received", {
+ const permanentMatch = /^force-(\d{3})-/.exec(messageId);
+ if (permanentMatch) {
+ const statusCode = Number(permanentMatch[1]);
+ logger.info("Forced status code response", {
correlationId,
messageId,
- callbackType: item.type,
- path,
- isMtls,
- apiKey: providedApiKey,
- signature: headers["x-hmac-sha256-signature"] ?? "",
- payload: JSON.stringify(item),
+ statusCode,
});
-
return {
- statusCode: 200,
- body: JSON.stringify({ message: "Callback received" }),
+ response: {
+ statusCode,
+ body: JSON.stringify({ message: `Forced status ${statusCode}` }),
+ },
};
- } catch (error) {
- if (error instanceof SyntaxError) {
- logger.error("Invalid JSON body", { error: error.message });
+ }
- return {
- statusCode: 400,
- body: JSON.stringify({ message: "Invalid JSON body" }),
- };
- }
+ return { response: undefined };
+}
- logger.error("Failed to process callback", {
- error: error instanceof Error ? error.message : String(error),
- });
+async function buildResponse(
+ event: APIGatewayProxyEvent,
+): Promise {
+ const eventWithContextFields = event as EventWithContextFields;
+ const headers = normalizeHeaders(event);
+ const path = event.path ?? eventWithContextFields.rawPath;
+ const isAlbInvocation = Boolean(eventWithContextFields.requestContext?.elb);
+ const clientCertPresent = Boolean(headers["x-amzn-mtls-clientcert"]);
+ const isMtls = resolveMtlsStatus(headers, isAlbInvocation);
- return {
- statusCode: 500,
- body: JSON.stringify({ message: "Internal server error" }),
- };
+ logger.info("Mock webhook invoked", {
+ path,
+ method: event.httpMethod,
+ hasBody: Boolean(event.body),
+ isMtls,
+ clientCertPresent,
+ "x-api-key": headers["x-api-key"],
+ "x-hmac-sha256-signature": headers["x-hmac-sha256-signature"],
+ payload: event.body,
+ });
+
+ const authResult = authenticateApiKey(headers);
+ if (authResult.error) {
+ return authResult.error;
+ }
+
+ const bodyResult = parseAndValidateBody(event.body);
+ if (bodyResult.error) {
+ return bodyResult.error;
+ }
+
+ const [item] = bodyResult.payload!.data;
+ const correlationId = item.meta.idempotencyKey;
+ const { messageId } = item.attributes;
+
+ const { response: forcedResponse } = checkForcedStatusResponse(
+ messageId,
+ correlationId,
+ );
+ if (forcedResponse) {
+ return forcedResponse;
}
+
+ logger.info("Callback received", {
+ correlationId,
+ messageId,
+ callbackType: item.type,
+ path,
+ isMtls,
+ apiKey: headers["x-api-key"],
+ signature: headers["x-hmac-sha256-signature"] ?? "",
+ payload: JSON.stringify(item),
+ });
+
+ return {
+ statusCode: 200,
+ body: JSON.stringify({ message: "Callback received" }),
+ };
}
export async function handler(
diff --git a/lambdas/perf-runner-lambda/src/__tests__/event-factories.test.ts b/lambdas/perf-runner-lambda/src/__tests__/event-factories.test.ts
index 1c877a17..dcecd707 100644
--- a/lambdas/perf-runner-lambda/src/__tests__/event-factories.test.ts
+++ b/lambdas/perf-runner-lambda/src/__tests__/event-factories.test.ts
@@ -25,6 +25,25 @@ describe("createMessageStatusEvent", () => {
expect(a.id).not.toBe(b.id);
expect(a.data.messageId).not.toBe(b.data.messageId);
});
+
+ it("prefixes messageId with force-{code}- when forcedStatusCode is set", () => {
+ const event = createMessageStatusEvent("perf-client-1", "DELIVERED", 500);
+
+ expect(event.data.messageId).toMatch(/^force-500-[0-9a-f-]+$/);
+ });
+
+ it("prefixes messageId with force-{code}-until-{timestamp}- when both forced fields are set", () => {
+ const until = Date.now() + 60_000;
+ const event = createMessageStatusEvent(
+ "perf-client-1",
+ "DELIVERED",
+ 500,
+ until,
+ );
+
+ const prefix = `force-500-until-${until}-`;
+ expect(event.data.messageId.startsWith(prefix)).toBe(true);
+ });
});
describe("createChannelStatusEvent", () => {
@@ -39,6 +58,25 @@ describe("createChannelStatusEvent", () => {
expect(event.data.messageId).toBeTruthy();
expect(event.id).toBeTruthy();
});
+
+ it("prefixes messageId with force-{code}- when forcedStatusCode is set", () => {
+ const event = createChannelStatusEvent("perf-client-2", "DELIVERED", 503);
+
+ expect(event.data.messageId).toMatch(/^force-503-[0-9a-f-]+$/);
+ });
+
+ it("prefixes messageId with force-{code}-until-{timestamp}- when both forced fields are set", () => {
+ const until = Date.now() + 60_000;
+ const event = createChannelStatusEvent(
+ "perf-client-2",
+ "DELIVERED",
+ 503,
+ until,
+ );
+
+ const prefix = `force-503-until-${until}-`;
+ expect(event.data.messageId.startsWith(prefix)).toBe(true);
+ });
});
describe("createEvent", () => {
@@ -65,4 +103,19 @@ describe("createEvent", () => {
expect(event.type).toBe(EventTypes.CHANNEL_STATUS_PUBLISHED);
expect(event.data.clientId).toBe("perf-client-2");
});
+
+ it("forwards forcedStatusCode and forcedStatusCodeUntilMs from the mix entry", () => {
+ const until = Date.now() + 60_000;
+ const event = createEvent({
+ weight: 1,
+ factory: "messageStatus",
+ clientId: "perf-client-1",
+ messageStatus: "DELIVERED",
+ forcedStatusCode: 500,
+ forcedStatusCodeUntilMs: until,
+ });
+
+ const prefix = `force-500-until-${until}-`;
+ expect(event.data.messageId.startsWith(prefix)).toBe(true);
+ });
});
diff --git a/lambdas/perf-runner-lambda/src/__tests__/index.test.ts b/lambdas/perf-runner-lambda/src/__tests__/index.test.ts
index 3c33bfd6..feebbe46 100644
--- a/lambdas/perf-runner-lambda/src/__tests__/index.test.ts
+++ b/lambdas/perf-runner-lambda/src/__tests__/index.test.ts
@@ -72,6 +72,8 @@ describe("handler", () => {
iamUsername: "test-user",
region: "eu-west-2",
}),
+ undefined,
+ undefined,
);
});
@@ -89,6 +91,8 @@ describe("handler", () => {
"custom-test",
undefined,
expect.anything(),
+ undefined,
+ undefined,
);
});
@@ -135,6 +139,8 @@ describe("handler", () => {
"no-prefix-test",
undefined,
expect.anything(),
+ undefined,
+ undefined,
);
});
@@ -151,6 +157,8 @@ describe("handler", () => {
"no-cache-test",
undefined,
undefined,
+ undefined,
+ undefined,
);
});
@@ -165,6 +173,36 @@ describe("handler", () => {
"webhook-test",
undefined,
expect.anything(),
+ undefined,
+ undefined,
+ );
+ });
+
+ it("passes cloudWatchSettlingMs when provided in the event", async () => {
+ await handler({ testId: "settling-test", cloudWatchSettlingMs: 5000 });
+
+ expect(mockRunPerformanceTest).toHaveBeenCalledWith(
+ expect.anything(),
+ expect.anything(),
+ "settling-test",
+ undefined,
+ expect.anything(),
+ 5000,
+ undefined,
+ );
+ });
+
+ it("passes skipPurge when provided in the event", async () => {
+ await handler({ testId: "skip-purge-test", skipPurge: true });
+
+ expect(mockRunPerformanceTest).toHaveBeenCalledWith(
+ expect.anything(),
+ expect.anything(),
+ "skip-purge-test",
+ undefined,
+ expect.anything(),
+ undefined,
+ true,
);
});
});
diff --git a/lambdas/perf-runner-lambda/src/__tests__/purge.test.ts b/lambdas/perf-runner-lambda/src/__tests__/purge.test.ts
index 14bcf247..52832910 100644
--- a/lambdas/perf-runner-lambda/src/__tests__/purge.test.ts
+++ b/lambdas/perf-runner-lambda/src/__tests__/purge.test.ts
@@ -30,7 +30,7 @@ describe("deriveQueueUrls", () => {
expect(urls).toEqual([
"https://sqs.eu-west-2.amazonaws.com/123456789/nhs-dev-callbacks-inbound-event-queue",
- "https://sqs.eu-west-2.amazonaws.com/123456789/nhs-dev-callbacks-inbound-event-dlq-queue",
+ "https://sqs.eu-west-2.amazonaws.com/123456789/nhs-dev-callbacks-inbound-event-dlq",
"https://sqs.eu-west-2.amazonaws.com/123456789/nhs-dev-callbacks-perf-client-1-delivery-queue",
"https://sqs.eu-west-2.amazonaws.com/123456789/nhs-dev-callbacks-perf-client-1-delivery-dlq-queue",
"https://sqs.eu-west-2.amazonaws.com/123456789/nhs-dev-callbacks-perf-client-2-delivery-queue",
@@ -61,7 +61,7 @@ describe("deriveQueueUrls", () => {
expect(urls).toEqual([
"https://sqs.eu-west-2.amazonaws.com/123456789/nhs-dev-callbacks-inbound-event-queue",
- "https://sqs.eu-west-2.amazonaws.com/123456789/nhs-dev-callbacks-inbound-event-dlq-queue",
+ "https://sqs.eu-west-2.amazonaws.com/123456789/nhs-dev-callbacks-inbound-event-dlq",
"https://sqs.eu-west-2.amazonaws.com/123456789/nhs-dev-callbacks-perf-client-1-delivery-queue",
"https://sqs.eu-west-2.amazonaws.com/123456789/nhs-dev-callbacks-perf-client-1-delivery-dlq-queue",
]);
@@ -88,20 +88,8 @@ describe("purgeQueues", () => {
expect(mockSend).toHaveBeenCalledTimes(2);
});
- it("ignores QueueDoesNotExist errors gracefully", async () => {
- const nonExistentError = Object.assign(new Error("Queue does not exist"), {
- name: "QueueDoesNotExist",
- });
- mockSend.mockRejectedValueOnce(nonExistentError);
-
- await expect(
- purgeQueues(mockSqsClient, ["https://sqs.example.invalid/missing"]),
- ).resolves.toBeUndefined();
- });
-
- it("rethrows non-QueueDoesNotExist errors", async () => {
- const otherError = new Error("Access denied");
- mockSend.mockRejectedValueOnce(otherError);
+ it("throws when a purge fails", async () => {
+ mockSend.mockRejectedValueOnce(new Error("Access denied"));
await expect(
purgeQueues(mockSqsClient, ["https://sqs.example.invalid/queue"]),
diff --git a/lambdas/perf-runner-lambda/src/__tests__/runner.test.ts b/lambdas/perf-runner-lambda/src/__tests__/runner.test.ts
index 622e98a4..9d7acfe5 100644
--- a/lambdas/perf-runner-lambda/src/__tests__/runner.test.ts
+++ b/lambdas/perf-runner-lambda/src/__tests__/runner.test.ts
@@ -12,6 +12,7 @@ import { defaultSleep, runPerformanceTest } from "runner";
import { generatePhaseLoad } from "sqs";
import { deriveQueueUrls, purgeQueues } from "purge";
+import { getQueueDepths } from "sqs-stats";
import { dumpRateLimitState, flushElastiCache } from "elasticache";
import { verifyMockWebhook } from "webhook-verify";
import {
@@ -26,6 +27,7 @@ jest.mock("cloudwatch");
jest.mock("purge");
jest.mock("elasticache");
jest.mock("webhook-verify");
+jest.mock("sqs-stats");
const mockGeneratePhaseLoad = jest.mocked(generatePhaseLoad);
const mockQueryMetricsSnapshot = jest.mocked(queryMetricsSnapshot);
@@ -41,6 +43,7 @@ const mockPurgeQueues = jest.mocked(purgeQueues);
const mockFlushElastiCache = jest.mocked(flushElastiCache);
const mockDumpRateLimitState = jest.mocked(dumpRateLimitState);
const mockVerifyMockWebhook = jest.mocked(verifyMockWebhook);
+const mockGetQueueDepths = jest.mocked(getQueueDepths);
const immediateSleep = jest.fn().mockResolvedValue(undefined);
@@ -118,6 +121,16 @@ beforeEach(() => {
receivedCallbacks: 0,
verified: false,
});
+ mockGetQueueDepths.mockResolvedValue({
+ timestampMs: Date.now(),
+ queues: [
+ {
+ queueUrl: "https://sqs.example.invalid/inbound-event-queue",
+ visible: 100,
+ notVisible: 10,
+ },
+ ],
+ });
immediateSleep.mockResolvedValue(undefined);
});
@@ -557,6 +570,22 @@ describe("runPerformanceTest", () => {
expect(mockPurgeQueues).toHaveBeenCalledTimes(2);
});
+  it("skips both purges when skipPurge is true", async () => {
+    mockQueryMetricsSnapshot.mockResolvedValue(null);
+    const skipPurge = true;
+
+    await runPerformanceTest(
+      deps,
+      scenario,
+      "test-skip-purge",
+      immediateSleep,
+      undefined, // elastiCacheDeps
+      undefined, // cloudWatchSettlingMs: fall back to the default
+      skipPurge,
+    );
+
+    // Neither the initial nor the final purge may run.
+    expect(mockPurgeQueues).toHaveBeenCalledTimes(0);
+  });
+
it("flushes ElastiCache before and after when deps are provided", async () => {
mockQueryMetricsSnapshot.mockResolvedValue(null);
const elastiCacheDeps = {
@@ -574,7 +603,7 @@ describe("runPerformanceTest", () => {
elastiCacheDeps,
);
- expect(mockFlushElastiCache).toHaveBeenCalledTimes(2);
+ expect(mockFlushElastiCache).toHaveBeenCalledTimes(1);
expect(mockFlushElastiCache).toHaveBeenCalledWith(elastiCacheDeps);
});
@@ -682,6 +711,37 @@ describe("runPerformanceTest", () => {
expect(mockVerifyMockWebhook).not.toHaveBeenCalled();
expect(result.webhookVerification).toBeUndefined();
});
+
+  it("samples queue depths during polling and at final snapshot", async () => {
+    mockQueryMetricsSnapshot.mockResolvedValue(null);
+    const expectedQueueUrls = [
+      "https://sqs.example.invalid/inbound-event-queue",
+    ];
+
+    await runPerformanceTest(
+      deps,
+      scenario,
+      "test-queue-depths",
+      immediateSleep,
+    );
+
+    // One sample is taken mid-test by the poll loop and one with the final snapshot.
+    expect(mockGetQueueDepths).toHaveBeenCalledTimes(2);
+    expect(mockGetQueueDepths).toHaveBeenCalledWith(
+      deps.sqsClient,
+      expectedQueueUrls,
+    );
+  });
+
+  it("uses the provided cloudWatchSettlingMs instead of the default", async () => {
+    mockQueryMetricsSnapshot.mockResolvedValue(null);
+    const settlingMs = 5000;
+
+    await runPerformanceTest(
+      deps,
+      scenario,
+      "test-settling",
+      immediateSleep,
+      undefined, // elastiCacheDeps
+      settlingMs,
+    );
+
+    // The injected sleep function must be asked to wait for the override.
+    expect(immediateSleep).toHaveBeenCalledWith(settlingMs);
+  });
});
describe("defaultSleep", () => {
diff --git a/lambdas/perf-runner-lambda/src/__tests__/sqs-stats.test.ts b/lambdas/perf-runner-lambda/src/__tests__/sqs-stats.test.ts
new file mode 100644
index 00000000..8d2900b8
--- /dev/null
+++ b/lambdas/perf-runner-lambda/src/__tests__/sqs-stats.test.ts
@@ -0,0 +1,75 @@
+import type { SQSClient } from "@aws-sdk/client-sqs";
+import { getQueueDepths } from "sqs-stats";
+
+// Unit tests for getQueueDepths, run against a stubbed SQS client whose
+// send() is a jest mock.
+describe("getQueueDepths", () => {
+ const mockSend = jest.fn();
+ // Only send() is stubbed; the double cast lets the partial object stand in
+ // for a real SQSClient.
+ const mockSqsClient = { send: mockSend } as unknown as SQSClient;
+
+ beforeEach(() => {
+ // Clear recorded calls so each test inspects only its own send() invocations.
+ jest.clearAllMocks();
+ });
+
+ it("returns visible and notVisible counts for each queue URL", async () => {
+ // Responses are queued in call order: the first send() gets the first
+ // payload, the second send() gets the next. Counts are supplied as strings
+ // and expected back as numbers.
+ mockSend
+ .mockResolvedValueOnce({
+ Attributes: {
+ ApproximateNumberOfMessages: "42",
+ ApproximateNumberOfMessagesNotVisible: "8",
+ },
+ })
+ .mockResolvedValueOnce({
+ Attributes: {
+ ApproximateNumberOfMessages: "10",
+ ApproximateNumberOfMessagesNotVisible: "2",
+ },
+ });
+
+ const result = await getQueueDepths(mockSqsClient, [
+ "https://sqs.example.invalid/queue-a",
+ "https://sqs.example.invalid/queue-b",
+ ]);
+
+ // Results must come back in the same order as the input URLs.
+ expect(result.queues).toHaveLength(2);
+ expect(result.queues[0]).toEqual({
+ queueUrl: "https://sqs.example.invalid/queue-a",
+ visible: 42,
+ notVisible: 8,
+ });
+ expect(result.queues[1]).toEqual({
+ queueUrl: "https://sqs.example.invalid/queue-b",
+ visible: 10,
+ notVisible: 2,
+ });
+ expect(result.timestampMs).toBeGreaterThan(0);
+ });
+
+ it("defaults to 0 when attributes are missing", async () => {
+ // A response without an Attributes map must yield zero for both counts.
+ mockSend.mockResolvedValueOnce({ Attributes: undefined });
+
+ const result = await getQueueDepths(mockSqsClient, [
+ "https://sqs.example.invalid/queue-a",
+ ]);
+
+ expect(result.queues[0].visible).toBe(0);
+ expect(result.queues[0].notVisible).toBe(0);
+ });
+
+ it("sends GetQueueAttributesCommand with correct attributes requested", async () => {
+ mockSend.mockResolvedValueOnce({ Attributes: {} });
+
+ await getQueueDepths(mockSqsClient, [
+ "https://sqs.example.invalid/queue-a",
+ ]);
+
+ // The first argument of the first recorded send() call is the command
+ // object; only its input is inspected here.
+ const command = mockSend.mock.calls[0][0] as {
+ input: { QueueUrl: string; AttributeNames: string[] };
+ };
+ expect(command.input.QueueUrl).toBe("https://sqs.example.invalid/queue-a");
+ expect(command.input.AttributeNames).toContain(
+ "ApproximateNumberOfMessages",
+ );
+ expect(command.input.AttributeNames).toContain(
+ "ApproximateNumberOfMessagesNotVisible",
+ );
+ });
+});
diff --git a/lambdas/perf-runner-lambda/src/event-factories.ts b/lambdas/perf-runner-lambda/src/event-factories.ts
index 6f39add9..c32cb9f7 100644
--- a/lambdas/perf-runner-lambda/src/event-factories.ts
+++ b/lambdas/perf-runner-lambda/src/event-factories.ts
@@ -8,11 +8,32 @@ import type {
import { EventTypes } from "@nhs-notify-client-callbacks/models";
import type { EventMixEntry } from "types";
+/**
+ * Builds the message ID for a generated event.
+ *
+ * - No forced status code: the UUID is returned unchanged.
+ * - Forced status code only: `force-<code>-<uuid>`.
+ * - Forced status code with an expiry: `force-<code>-until-<untilMs>-<uuid>`.
+ *
+ * `forcedStatusCodeUntilMs` has no effect when `forcedStatusCode` is undefined.
+ *
+ * @param uuid - Base identifier, placed last in the result.
+ * @param forcedStatusCode - Optional status code embedded after `force-`.
+ * @param forcedStatusCodeUntilMs - Optional value embedded after `until-`.
+ * @returns The plain or prefixed message ID.
+ */
+function buildMessageId(
+  uuid: string,
+  forcedStatusCode?: number,
+  forcedStatusCodeUntilMs?: number,
+): string {
+  if (forcedStatusCode === undefined) {
+    return uuid;
+  }
+  return forcedStatusCodeUntilMs === undefined
+    ? `force-${forcedStatusCode}-${uuid}`
+    : `force-${forcedStatusCode}-until-${forcedStatusCodeUntilMs}-${uuid}`;
+}
+
export function createMessageStatusEvent(
clientId: string,
messageStatus: MessageStatus,
+ forcedStatusCode?: number,
+ forcedStatusCodeUntilMs?: number,
): StatusPublishEvent {
- const messageId = crypto.randomUUID();
+ const uuid = crypto.randomUUID();
+ const messageId = buildMessageId(
+ uuid,
+ forcedStatusCode,
+ forcedStatusCodeUntilMs,
+ );
const data: MessageStatusData = {
clientId,
@@ -47,8 +68,15 @@ export function createMessageStatusEvent(
export function createChannelStatusEvent(
clientId: string,
channelStatus: ChannelStatus,
+ forcedStatusCode?: number,
+ forcedStatusCodeUntilMs?: number,
): StatusPublishEvent {
- const messageId = crypto.randomUUID();
+ const uuid = crypto.randomUUID();
+ const messageId = buildMessageId(
+ uuid,
+ forcedStatusCode,
+ forcedStatusCodeUntilMs,
+ );
const data: ChannelStatusData = {
clientId,
@@ -80,8 +108,18 @@ export function createChannelStatusEvent(
export function createEvent(entry: EventMixEntry): StatusPublishEvent {
if (entry.factory === "messageStatus") {
- return createMessageStatusEvent(entry.clientId, entry.messageStatus);
+ return createMessageStatusEvent(
+ entry.clientId,
+ entry.messageStatus,
+ entry.forcedStatusCode,
+ entry.forcedStatusCodeUntilMs,
+ );
}
- return createChannelStatusEvent(entry.clientId, entry.channelStatus);
+ return createChannelStatusEvent(
+ entry.clientId,
+ entry.channelStatus,
+ entry.forcedStatusCode,
+ entry.forcedStatusCodeUntilMs,
+ );
}
diff --git a/lambdas/perf-runner-lambda/src/index.ts b/lambdas/perf-runner-lambda/src/index.ts
index 5974627b..51054b3a 100644
--- a/lambdas/perf-runner-lambda/src/index.ts
+++ b/lambdas/perf-runner-lambda/src/index.ts
@@ -14,7 +14,12 @@ const logger = new Logger();
export async function handler(
event: PerfRunnerPayload,
): Promise {
- const { scenario = DEFAULT_SCENARIO, testId } = event;
+ const {
+ cloudWatchSettlingMs,
+ scenario = DEFAULT_SCENARIO,
+ skipPurge,
+ testId,
+ } = event;
const region = process.env.AWS_REGION ?? "eu-west-2";
const queueUrl = process.env.INBOUND_QUEUE_URL;
@@ -64,6 +69,8 @@ export async function handler(
testId,
undefined,
elastiCacheDeps,
+ cloudWatchSettlingMs,
+ skipPurge,
);
logger.info("Performance test completed", { testId });
diff --git a/lambdas/perf-runner-lambda/src/purge.ts b/lambdas/perf-runner-lambda/src/purge.ts
index e363e706..3f7cb097 100644
--- a/lambdas/perf-runner-lambda/src/purge.ts
+++ b/lambdas/perf-runner-lambda/src/purge.ts
@@ -11,7 +11,7 @@ export function deriveQueueUrls(
return [
inboundQueueUrl,
- `${baseUrl}inbound-event-dlq-queue`,
+ `${baseUrl}inbound-event-dlq`,
...clientIds.flatMap((id) => [
`${baseUrl}${id}-delivery-queue`,
`${baseUrl}${id}-delivery-dlq-queue`,
@@ -23,18 +23,9 @@ export async function purgeQueues(
client: SQSClient,
queueUrls: string[],
): Promise {
- const results = await Promise.allSettled(
+ await Promise.all(
queueUrls.map((url) =>
client.send(new PurgeQueueCommand({ QueueUrl: url })),
),
);
-
- for (const result of results) {
- if (result.status === "rejected") {
- const error = result.reason as { name?: string };
- if (error.name !== "QueueDoesNotExist") {
- throw result.reason as Error;
- }
- }
- }
}
diff --git a/lambdas/perf-runner-lambda/src/runner.ts b/lambdas/perf-runner-lambda/src/runner.ts
index 7a5b5ee6..86ccd435 100644
--- a/lambdas/perf-runner-lambda/src/runner.ts
+++ b/lambdas/perf-runner-lambda/src/runner.ts
@@ -11,8 +11,10 @@ import type {
Scenario,
WebhookVerificationResult,
} from "types";
+import { Logger } from "@nhs-notify-client-callbacks/logger";
import { generatePhaseLoad } from "sqs";
import { deriveQueueUrls, purgeQueues } from "purge";
+import { getQueueDepths } from "sqs-stats";
import { dumpRateLimitState, flushElastiCache } from "elasticache";
import { verifyMockWebhook } from "webhook-verify";
import {
@@ -22,6 +24,8 @@ import {
queryPerClientRateTimeline,
} from "cloudwatch";
+const logger = new Logger();
+
const CLOUDWATCH_SETTLING_MS = 60_000;
export const defaultSleep = (ms: number): Promise =>
@@ -82,12 +86,55 @@ async function collectSnapshots(
return cbStartSec;
}
+/**
+ * Collects the per-client delivery rate timeline for every distinct client
+ * referenced by the scenario.
+ *
+ * Client IDs are gathered from the scenario-level event mix and from any
+ * phase-level `eventMix` override, so a client that only appears in a single
+ * phase is still queried. Each client's log group name is the configured
+ * delivery log group prefix followed by the client ID. All queries are
+ * started together and awaited with `Promise.all`, so one failing query
+ * rejects the whole call.
+ *
+ * @param deps - Runner dependencies; `cloudWatchClient` and
+ *   `deliveryLogGroupPrefix` are read.
+ * @param scenario - Scenario whose event mixes determine the client IDs.
+ * @param startSec - Start of the query window, passed through to the query.
+ * @param endSec - End of the query window, passed through to the query.
+ * @returns One timeline per client that produced at least one entry, or an
+ *   empty array when no delivery log group prefix is configured.
+ */
+async function collectPerClientRateTimelines(
+  deps: RunnerDeps,
+  scenario: Scenario,
+  startSec: number,
+  endSec: number,
+): Promise<PerClientRateTimeline[]> {
+  const { cloudWatchClient, deliveryLogGroupPrefix } = deps;
+  if (!deliveryLogGroupPrefix) {
+    return [];
+  }
+
+  // Phases may override the scenario-level event mix, so include their
+  // clients as well; the Set removes duplicates.
+  const mixEntries = [
+    ...scenario.eventMix,
+    ...scenario.phases.flatMap((phase) => phase.eventMix ?? []),
+  ];
+  const clientIds = [...new Set(mixEntries.map((entry) => entry.clientId))];
+
+  const timelines = await Promise.all(
+    clientIds.map(async (clientId) => ({
+      clientId,
+      entries: await queryPerClientRateTimeline(
+        cloudWatchClient,
+        `${deliveryLogGroupPrefix}${clientId}`,
+        startSec,
+        endSec,
+      ),
+    })),
+  );
+
+  // Drop clients for which the query returned no entries.
+  return timelines.filter((timeline) => timeline.entries.length > 0);
+}
+
+/**
+ * Runs mock-webhook verification over the given window when a mock webhook
+ * log group is configured.
+ *
+ * @param deps - Runner dependencies; `cloudWatchClient` and
+ *   `mockWebhookLogGroup` are read.
+ * @param startSec - Start of the window, passed through to the verifier.
+ * @param endSec - End of the window, passed through to the verifier.
+ * @returns The result of `verifyMockWebhook`, or `undefined` when
+ *   `deps.mockWebhookLogGroup` is not set.
+ */
+async function collectWebhookVerification(
+  deps: RunnerDeps,
+  startSec: number,
+  endSec: number,
+): Promise<WebhookVerificationResult | undefined> {
+  const { cloudWatchClient, mockWebhookLogGroup } = deps;
+  if (!mockWebhookLogGroup) {
+    return undefined;
+  }
+  return verifyMockWebhook(
+    cloudWatchClient,
+    mockWebhookLogGroup,
+    startSec,
+    endSec,
+  );
+}
+
export async function runPerformanceTest(
deps: RunnerDeps,
scenario: Scenario,
testId: string,
sleepFn: (ms: number) => Promise = defaultSleep,
elastiCacheDeps?: ElastiCacheDeps,
+ cloudWatchSettlingMs: number = CLOUDWATCH_SETTLING_MS,
+ skipPurge = false,
): Promise {
if (scenario.eventMix.length === 0) {
throw new Error("scenario.eventMix must contain at least one entry");
@@ -109,8 +156,15 @@ export async function runPerformanceTest(
const testStartMs = Date.now();
const queueUrls = deriveQueueUrls(deps.queueUrl, scenario);
- await purgeQueues(deps.sqsClient, queueUrls);
+
+ if (skipPurge) {
+ logger.info("Skipping queue purge", { queueUrls });
+ } else {
+ logger.info("Purging queues", { queueUrls });
+ await purgeQueues(deps.sqsClient, queueUrls);
+ }
if (elastiCacheDeps) {
+ logger.info("Clearing rate limit and circuit breaker state");
await flushElastiCache(elastiCacheDeps);
}
@@ -148,6 +202,9 @@ export async function runPerformanceTest(
lastCbSnapshotSec,
out,
);
+ logger.info("Sampling queue depths", { queueUrls });
+ const depthSample = await getQueueDepths(deps.sqsClient, queueUrls);
+ logger.info("Queue depth sample", { queues: depthSample.queues });
if (!stopPolling) {
await sleepFn(scenario.metricsIntervalSecs * 1000);
@@ -157,20 +214,35 @@ export async function runPerformanceTest(
const pollPromise = pollLoop();
- for (const phase of scenario.phases) {
+ for (const [index, phase] of scenario.phases.entries()) {
+ logger.info("Starting phase", {
+ index,
+ targetEps: phase.targetEps,
+ durationSecs: phase.durationSecs,
+ });
const result = await generatePhaseLoad(
deps.sqsClient,
deps.queueUrl,
phase,
- scenario.eventMix,
+ phase.eventMix ?? scenario.eventMix,
);
+ logger.info("Phase complete", {
+ index,
+ targetEps: result.targetEps,
+ achievedEps: result.achievedEps,
+ sent: result.sent,
+ durationMs: result.durationMs,
+ });
phaseResults.push(result);
}
stopPolling = true;
await pollPromise;
- await sleepFn(CLOUDWATCH_SETTLING_MS);
+ logger.info("Waiting for CloudWatch logs to settle", {
+ settlingMs: cloudWatchSettlingMs,
+ });
+ await sleepFn(cloudWatchSettlingMs);
const finalStartSec = Math.floor(testStartMs / 1000);
const finalEndSec = Math.floor(Date.now() / 1000);
@@ -183,45 +255,33 @@ export async function runPerformanceTest(
lastCbSnapshotSec,
out,
);
+ logger.info("Sampling queue depths", { queueUrls });
+ const finalDepthSample = await getQueueDepths(deps.sqsClient, queueUrls);
+ logger.info("Final queue depth sample", { queues: finalDepthSample.queues });
- const perClientRateTimelines: PerClientRateTimeline[] = [];
-
- if (deps.deliveryLogGroupPrefix) {
- const clientIds = [...new Set(scenario.eventMix.map((e) => e.clientId))];
- const timelinePromises = clientIds.map(async (clientId) => {
- const logGroupName = `${deps.deliveryLogGroupPrefix}${clientId}`;
- const entries = await queryPerClientRateTimeline(
- deps.cloudWatchClient,
- logGroupName,
- finalStartSec,
- finalEndSec,
- );
- return { clientId, entries };
- });
- const timelines = await Promise.all(timelinePromises);
- perClientRateTimelines.push(
- ...timelines.filter((t) => t.entries.length > 0),
- );
- }
+ const perClientRateTimelines = await collectPerClientRateTimelines(
+ deps,
+ scenario,
+ finalStartSec,
+ finalEndSec,
+ );
- let webhookVerification: WebhookVerificationResult | undefined;
- if (deps.mockWebhookLogGroup) {
- webhookVerification = await verifyMockWebhook(
- deps.cloudWatchClient,
- deps.mockWebhookLogGroup,
- finalStartSec,
- finalEndSec,
- );
- }
+ const webhookVerification = await collectWebhookVerification(
+ deps,
+ finalStartSec,
+ finalEndSec,
+ );
let rateLimitStateAfter: EndpointRateLimitState[] | undefined;
if (elastiCacheDeps) {
rateLimitStateAfter = await dumpRateLimitState(elastiCacheDeps);
}
- await purgeQueues(deps.sqsClient, queueUrls);
- if (elastiCacheDeps) {
- await flushElastiCache(elastiCacheDeps);
+ if (skipPurge) {
+ logger.info("Skipping final queue purge", { queueUrls });
+ } else {
+ await purgeQueues(deps.sqsClient, queueUrls);
+ logger.info("Final queue purge complete", { queueUrls });
}
return {
diff --git a/lambdas/perf-runner-lambda/src/sqs-stats.ts b/lambdas/perf-runner-lambda/src/sqs-stats.ts
new file mode 100644
index 00000000..5d573793
--- /dev/null
+++ b/lambdas/perf-runner-lambda/src/sqs-stats.ts
@@ -0,0 +1,29 @@
+import { GetQueueAttributesCommand, type SQSClient } from "@aws-sdk/client-sqs";
+import type { QueueDepthSample } from "types";
+
+/**
+ * Samples the approximate depth of each given queue.
+ *
+ * One `GetQueueAttributesCommand` is sent per queue URL, requesting
+ * `ApproximateNumberOfMessages` and `ApproximateNumberOfMessagesNotVisible`.
+ * The requests are started together and awaited with `Promise.all`, so a
+ * single failing request rejects the whole call. Attribute values are
+ * converted with `Number`, and a missing attribute (or missing attribute map)
+ * counts as 0.
+ *
+ * @param client - SQS client used to send the commands.
+ * @param queueUrls - URLs of the queues to sample.
+ * @returns The per-queue counts in the same order as `queueUrls`, plus the
+ *   `Date.now()` timestamp taken after all responses arrived.
+ */
+export async function getQueueDepths(
+  client: SQSClient,
+  queueUrls: string[],
+): Promise<QueueDepthSample> {
+  const queues = await Promise.all(
+    queueUrls.map(async (queueUrl) => {
+      const command = new GetQueueAttributesCommand({
+        QueueUrl: queueUrl,
+        AttributeNames: [
+          "ApproximateNumberOfMessages",
+          "ApproximateNumberOfMessagesNotVisible",
+        ],
+      });
+      const { Attributes } = await client.send(command);
+      return {
+        queueUrl,
+        visible: Number(Attributes?.ApproximateNumberOfMessages ?? "0"),
+        notVisible: Number(
+          Attributes?.ApproximateNumberOfMessagesNotVisible ?? "0",
+        ),
+      };
+    }),
+  );
+
+  return { timestampMs: Date.now(), queues };
+}
diff --git a/lambdas/perf-runner-lambda/src/types.ts b/lambdas/perf-runner-lambda/src/types.ts
index 4415ef63..29ccfb0c 100644
--- a/lambdas/perf-runner-lambda/src/types.ts
+++ b/lambdas/perf-runner-lambda/src/types.ts
@@ -10,6 +10,8 @@ export type MessageStatusMixEntry = {
factory: "messageStatus";
clientId: string;
messageStatus: MessageStatus;
+ forcedStatusCode?: number;
+ forcedStatusCodeUntilMs?: number;
};
export type ChannelStatusMixEntry = {
@@ -17,6 +19,8 @@ export type ChannelStatusMixEntry = {
factory: "channelStatus";
clientId: string;
channelStatus: ChannelStatus;
+ forcedStatusCode?: number;
+ forcedStatusCodeUntilMs?: number;
};
export type EventMixEntry = MessageStatusMixEntry | ChannelStatusMixEntry;
@@ -24,6 +28,7 @@ export type EventMixEntry = MessageStatusMixEntry | ChannelStatusMixEntry;
export type Phase = {
durationSecs: number;
targetEps: number;
+ eventMix?: EventMixEntry[];
};
export type Scenario = {
@@ -97,6 +102,15 @@ export type WebhookVerificationResult = {
verified: boolean;
};
+/** Point-in-time sample of the depth of one or more SQS queues. */
+export type QueueDepthSample = {
+  /** Millisecond timestamp recorded when the sample was assembled. */
+  timestampMs: number;
+  /** One entry per sampled queue. */
+  queues: Array<{
+    /** URL of the sampled queue. */
+    queueUrl: string;
+    /** Count taken from `ApproximateNumberOfMessages`. */
+    visible: number;
+    /** Count taken from `ApproximateNumberOfMessagesNotVisible`. */
+    notVisible: number;
+  }>;
+};
+
export type PerformanceResult = {
testId: string;
scenario: Scenario;
@@ -115,6 +129,8 @@ export type PerformanceResult = {
export type PerfRunnerPayload = {
testId: string;
scenario?: Scenario;
+ cloudWatchSettlingMs?: number;
+ skipPurge?: boolean;
};
export type RunnerDeps = {
diff --git a/tests/performance/fixtures/subscriptions/perf-client-1.json b/tests/performance/fixtures/subscriptions/perf-client-1.json
index 1c730b8a..161e28b1 100644
--- a/tests/performance/fixtures/subscriptions/perf-client-1.json
+++ b/tests/performance/fixtures/subscriptions/perf-client-1.json
@@ -36,7 +36,7 @@
},
"invocationEndpoint": "https://REPLACED_BY_TERRAFORM",
"invocationMethod": "POST",
- "invocationRateLimit": 300,
+ "invocationRateLimit": 100,
"targetId": "target-39dbd795-5909-40ab-95b2-4e88b11a2813",
"type": "API"
}
diff --git a/tests/performance/fixtures/subscriptions/perf-client-2.json b/tests/performance/fixtures/subscriptions/perf-client-2.json
index d3c58a93..d519da1e 100644
--- a/tests/performance/fixtures/subscriptions/perf-client-2.json
+++ b/tests/performance/fixtures/subscriptions/perf-client-2.json
@@ -46,7 +46,7 @@
},
"invocationEndpoint": "https://REPLACED_BY_TERRAFORM",
"invocationMethod": "POST",
- "invocationRateLimit": 300,
+ "invocationRateLimit": 5000,
"targetId": "target-e3ccc2c2-7b19-4475-80d5-51a1182d239a",
"type": "API"
}