diff --git a/infrastructure/terraform/components/callbacks/README.md b/infrastructure/terraform/components/callbacks/README.md index 9889ab22..c2060977 100644 --- a/infrastructure/terraform/components/callbacks/README.md +++ b/infrastructure/terraform/components/callbacks/README.md @@ -17,6 +17,8 @@ |------|-------------|------|---------|:--------:| | [applications\_map\_parameter\_name](#input\_applications\_map\_parameter\_name) | SSM Parameter Store path for the clientId-to-applicationData map, where applicationData is currently only the applicationId | `string` | `null` | no | | [aws\_account\_id](#input\_aws\_account\_id) | The AWS Account ID (numeric) | `string` | n/a | yes | +| [cb\_cooldown\_period\_ms](#input\_cb\_cooldown\_period\_ms) | Full block duration after circuit opens, before half-open probes begin (ms) | `number` | `120000` | no | +| [cb\_recovery\_period\_ms](#input\_cb\_recovery\_period\_ms) | Linear ramp-up duration after circuit closes (ms) | `number` | `600000` | no | | [component](#input\_component) | The variable encapsulating the name of this component | `string` | `"callbacks"` | no | | [default\_tags](#input\_default\_tags) | A map of default tags to apply to all taggable resources within the component | `map(string)` | `{}` | no | | [deploy\_mock\_clients](#input\_deploy\_mock\_clients) | Flag to deploy mock webhook lambda for integration testing (test/dev environments only) | `bool` | `false` | no | @@ -40,7 +42,7 @@ | [parent\_acct\_environment](#input\_parent\_acct\_environment) | Name of the environment responsible for the acct resources used, affects things like DNS zone. Useful for named dev environments | `string` | `"main"` | no | | [pipe\_event\_patterns](#input\_pipe\_event\_patterns) | value | `list(string)` | `[]` | no | | [pipe\_log\_level](#input\_pipe\_log\_level) | Log level for the EventBridge Pipe. | `string` | `"ERROR"` | no | -| [pipe\_sqs\_input\_batch\_size](#input\_pipe\_sqs\_input\_batch\_size) | n/a | `number` | `1` | no | +| [pipe\_sqs\_input\_batch\_size](#input\_pipe\_sqs\_input\_batch\_size) | n/a | `number` | `10` | no | | [pipe\_sqs\_max\_batch\_window](#input\_pipe\_sqs\_max\_batch\_window) | n/a | `number` | `2` | no | | [project](#input\_project) | The name of the tfscaffold project | `string` | n/a | yes | | [region](#input\_region) | The AWS Region | `string` | n/a | yes | diff --git a/infrastructure/terraform/components/callbacks/module_client_delivery.tf b/infrastructure/terraform/components/callbacks/module_client_delivery.tf index cce31bd5..e2d0f55a 100644 --- a/infrastructure/terraform/components/callbacks/module_client_delivery.tf +++ b/infrastructure/terraform/components/callbacks/module_client_delivery.tf @@ -41,6 +41,8 @@ module "client_delivery" { mtls_ca_s3_key = local.mtls_ca_s3_key # gitleaks:allow token_bucket_burst_capacity = var.token_bucket_burst_capacity + cb_cooldown_period_ms = var.cb_cooldown_period_ms + cb_recovery_period_ms = var.cb_recovery_period_ms vpc_subnet_ids = try(local.acct.private_subnets[local.bc_name], []) lambda_security_group_id = aws_security_group.https_client_lambda.id diff --git a/infrastructure/terraform/components/callbacks/module_perf_runner_lambda.tf b/infrastructure/terraform/components/callbacks/module_perf_runner_lambda.tf index 7a77c40c..a7bf92db 100644 --- a/infrastructure/terraform/components/callbacks/module_perf_runner_lambda.tf +++ b/infrastructure/terraform/components/callbacks/module_perf_runner_lambda.tf @@ -93,7 +93,23 @@ data "aws_iam_policy_document" "perf_runner_lambda" { resources = [ module.sqs_inbound_event.sqs_queue_arn, - "${module.sqs_inbound_event.sqs_queue_arn}-dlq", + module.sqs_inbound_event.sqs_dlq_arn, + "arn:aws:sqs:${var.region}:${var.aws_account_id}:${local.csi}-*-delivery-queue", + "arn:aws:sqs:${var.region}:${var.aws_account_id}:${local.csi}-*-delivery-dlq-queue", + ] + } + + statement { + sid = "SQSGetQueueAttributes" + effect = "Allow" + + actions = [ + "sqs:GetQueueAttributes", + ] + + resources = [ + module.sqs_inbound_event.sqs_queue_arn, + module.sqs_inbound_event.sqs_dlq_arn, "arn:aws:sqs:${var.region}:${var.aws_account_id}:${local.csi}-*-delivery-queue", "arn:aws:sqs:${var.region}:${var.aws_account_id}:${local.csi}-*-delivery-dlq-queue", ] diff --git a/infrastructure/terraform/components/callbacks/pre.sh b/infrastructure/terraform/components/callbacks/pre.sh index 39eb0817..8ae910e8 100755 --- a/infrastructure/terraform/components/callbacks/pre.sh +++ b/infrastructure/terraform/components/callbacks/pre.sh @@ -12,10 +12,10 @@ deploy_perf_runner="false" for _tfvar_file in \ "${base_path}/etc/group_${group}.tfvars" \ "${base_path}/etc/env_${region}_${environment}.tfvars"; do - if [[ -f "${_tfvar_file}" ]]; then - _val=$(grep -E '^\s*deploy_mock_clients\s*=' "${_tfvar_file}" | tail -1 | sed 's/.*=\s*//;s/\s*$//') + if [ -f "${_tfvar_file}" ]; then + _val=$(grep -E '^\s*deploy_mock_clients\s*=' "${_tfvar_file}" | tail -1 | sed 's/.*=\s*//;s/\s*$//;s/^"//;s/"$//') [ -n "${_val}" ] && deploy_mock_clients="${_val}" - _val=$(grep -E '^\s*deploy_perf_runner\s*=' "${_tfvar_file}" | tail -1 | sed 's/.*=\s*//;s/\s*$//') + _val=$(grep -E '^\s*deploy_perf_runner\s*=' "${_tfvar_file}" | tail -1 | sed 's/.*=\s*//;s/\s*$//;s/^"//;s/"$//') [ -n "${_val}" ] && deploy_perf_runner="${_val}" fi done diff --git a/infrastructure/terraform/components/callbacks/variables.tf b/infrastructure/terraform/components/callbacks/variables.tf index 68e4eafd..7d8bb9ed 100644 --- a/infrastructure/terraform/components/callbacks/variables.tf +++ b/infrastructure/terraform/components/callbacks/variables.tf @@ -102,7 +102,7 @@ variable "pipe_log_level" { variable "pipe_sqs_input_batch_size" { type = number - default = 1 + default = 10 } variable "pipe_sqs_max_batch_window" { @@ -213,3 +213,15 @@ variable "token_bucket_burst_capacity" { description = "Token bucket burst capacity used by the rate limiter" default = 2250 } + +variable "cb_cooldown_period_ms" { + type = number + description = "Full block duration after circuit opens, before half-open probes begin (ms)" + default = 120000 +} + +variable "cb_recovery_period_ms" { + type = number + description = "Linear ramp-up duration after circuit closes (ms)" + default = 600000 +} diff --git a/infrastructure/terraform/modules/client-delivery/README.md b/infrastructure/terraform/modules/client-delivery/README.md index 22b98e26..c2920f71 100644 --- a/infrastructure/terraform/modules/client-delivery/README.md +++ b/infrastructure/terraform/modules/client-delivery/README.md @@ -11,6 +11,8 @@ No requirements. |------|-------------|------|---------|:--------:| | [applications\_map\_parameter\_name](#input\_applications\_map\_parameter\_name) | SSM Parameter Store path for the clientId-to-applicationData map | `string` | n/a | yes | | [aws\_account\_id](#input\_aws\_account\_id) | Account ID | `string` | n/a | yes | +| [cb\_cooldown\_period\_ms](#input\_cb\_cooldown\_period\_ms) | Full block duration after circuit opens, before half-open probes begin (ms) | `number` | `120000` | no | +| [cb\_recovery\_period\_ms](#input\_cb\_recovery\_period\_ms) | Linear ramp-up duration after circuit closes (ms) | `number` | `600000` | no | | [client\_bus\_name](#input\_client\_bus\_name) | EventBridge bus name for subscription rules | `string` | n/a | yes | | [client\_config\_bucket](#input\_client\_config\_bucket) | S3 bucket name containing client subscription configuration | `string` | n/a | yes | | [client\_config\_bucket\_arn](#input\_client\_config\_bucket\_arn) | S3 bucket ARN containing client subscription configuration | `string` | n/a | yes | @@ -24,7 +26,8 @@ No requirements. | [force\_lambda\_code\_deploy](#input\_force\_lambda\_code\_deploy) | Force Lambda code redeployment even when commit tag matches | `bool` | `false` | no | | [group](#input\_group) | The name of the tfscaffold group | `string` | `null` | no | | [kms\_key\_arn](#input\_kms\_key\_arn) | KMS Key ARN for encryption at rest | `string` | n/a | yes | -| [lambda\_batch\_size](#input\_lambda\_batch\_size) | Number of SQS messages per Lambda invocation | `number` | `10` | no | +| [lambda\_batch\_size](#input\_lambda\_batch\_size) | Number of SQS messages per Lambda invocation | `number` | `100` | no | +| [lambda\_batching\_window\_in\_seconds](#input\_lambda\_batching\_window\_in\_seconds) | Maximum time in seconds to wait for a full batch before invoking Lambda. Allows the delivery queue to fill to batch\_size, improving Lambda concurrency utilisation. | `number` | `1` | no | | [lambda\_code\_base\_path](#input\_lambda\_code\_base\_path) | Base path to Lambda source code directories | `string` | n/a | yes | | [lambda\_memory](#input\_lambda\_memory) | Lambda memory allocation in MB | `number` | `256` | no | | [lambda\_s3\_bucket](#input\_lambda\_s3\_bucket) | S3 bucket for Lambda function artefacts | `string` | n/a | yes | diff --git a/infrastructure/terraform/modules/client-delivery/module_https_client_lambda.tf b/infrastructure/terraform/modules/client-delivery/module_https_client_lambda.tf index a1bb48f2..4e26c3a2 100644 --- a/infrastructure/terraform/modules/client-delivery/module_https_client_lambda.tf +++ b/infrastructure/terraform/modules/client-delivery/module_https_client_lambda.tf @@ -53,6 +53,8 @@ module "https_client_lambda" { MTLS_CERT_S3_KEY = var.mtls_cert_s3_key # gitleaks:allow QUEUE_URL = module.sqs_delivery.sqs_queue_url TOKEN_BUCKET_BURST_CAPACITY = tostring(var.token_bucket_burst_capacity) + CB_COOLDOWN_PERIOD_MS = tostring(var.cb_cooldown_period_ms) + CB_RECOVERY_PERIOD_MS = tostring(var.cb_recovery_period_ms) } vpc_config = var.lambda_security_group_id != "" ? { @@ -62,10 +64,11 @@ module "https_client_lambda" { } resource "aws_lambda_event_source_mapping" "sqs_delivery" { - event_source_arn = module.sqs_delivery.sqs_queue_arn - function_name = module.https_client_lambda.function_arn - batch_size = var.lambda_batch_size - enabled = true + event_source_arn = module.sqs_delivery.sqs_queue_arn + function_name = module.https_client_lambda.function_arn + batch_size = var.lambda_batch_size + maximum_batching_window_in_seconds = var.lambda_batching_window_in_seconds + enabled = true function_response_types = ["ReportBatchItemFailures"] } diff --git a/infrastructure/terraform/modules/client-delivery/variables.tf b/infrastructure/terraform/modules/client-delivery/variables.tf index 46f66f45..a8762b13 100644 --- a/infrastructure/terraform/modules/client-delivery/variables.tf +++ b/infrastructure/terraform/modules/client-delivery/variables.tf @@ -118,7 +118,13 @@ variable "log_subscription_role_arn" { variable "lambda_batch_size" { type = number description = "Number of SQS messages per Lambda invocation" - default = 10 + default = 100 +} + +variable "lambda_batching_window_in_seconds" { + type = number + description = "Maximum time in seconds to wait for a full batch before invoking Lambda. Allows the delivery queue to fill to batch_size, improving Lambda concurrency utilisation." + default = 1 } variable "lambda_memory" { @@ -210,3 +216,15 @@ variable "lambda_security_group_id" { description = "Security group ID for the Lambda function" default = "" } + +variable "cb_cooldown_period_ms" { + type = number + description = "Full block duration after circuit opens, before half-open probes begin (ms)" + default = 120000 +} + +variable "cb_recovery_period_ms" { + type = number + description = "Linear ramp-up duration after circuit closes (ms)" + default = 600000 +} diff --git a/lambdas/https-client-lambda/src/__tests__/handler.test.ts b/lambdas/https-client-lambda/src/__tests__/handler.test.ts index 14da4bd2..f337f7c7 100644 --- a/lambdas/https-client-lambda/src/__tests__/handler.test.ts +++ b/lambdas/https-client-lambda/src/__tests__/handler.test.ts @@ -288,6 +288,28 @@ describe("processRecords", () => { expect(mockChangeVisibility).toHaveBeenCalledTimes(1); }); + it("caps visibility delay at SQS maximum (12 hours) for admission-denied batch", async () => { + mockAdmit.mockResolvedValue({ + allowed: false, + reason: "rate_limited", + retryAfterMs: 60_000, + effectiveRate: 10, + }); + + const record = makeRecord({ + attributes: { + ApproximateReceiveCount: "1000", + SentTimestamp: "0", + SenderId: "sender", + ApproximateFirstReceiveTimestamp: "0", + }, + }); + + await processRecords([record]); + + expect(mockChangeVisibility).toHaveBeenCalledWith("receipt-1", 43_200); + }); + it("changes visibility once for transient failure", async () => { mockDeliverPayload.mockResolvedValue({ outcome: "transient_failure", @@ -380,8 +402,7 @@ describe("processRecords", () => { expect(failures).toEqual([{ itemIdentifier: "msg-1" }]); const visibilityDelay = mockChangeVisibility.mock.calls[0]![1] as number; - expect(visibilityDelay).toBeGreaterThanOrEqual(2); - expect(visibilityDelay).toBeLessThanOrEqual(6); + expect(visibilityDelay).toBe(2); expect(mockSendToDlq).not.toHaveBeenCalled(); expect(mockDeliverPayload).not.toHaveBeenCalled(); }); @@ -398,8 +419,7 @@ describe("processRecords", () => { expect(failures).toEqual([{ itemIdentifier: "msg-1" }]); const visibilityDelay = mockChangeVisibility.mock.calls[0]![1] as number; - expect(visibilityDelay).toBeGreaterThanOrEqual(30); - expect(visibilityDelay).toBeLessThanOrEqual(34); + expect(visibilityDelay).toBe(30); expect(mockSendToDlq).not.toHaveBeenCalled(); expect(mockDeliverPayload).not.toHaveBeenCalled(); }); diff --git a/lambdas/https-client-lambda/src/handler.ts b/lambdas/https-client-lambda/src/handler.ts index 782777f2..4c195ed5 100644 --- a/lambdas/https-client-lambda/src/handler.ts +++ b/lambdas/https-client-lambda/src/handler.ts @@ -43,11 +43,12 @@ import { flushMetrics, resetMetrics } from "services/delivery-metrics"; type RedisClientType = Awaited>; const DEFAULT_MAX_RETRY_DURATION_MS = 7_200_000; // 2 hours -const DEFAULT_CONCURRENCY_LIMIT = 5; +const DEFAULT_CONCURRENCY_LIMIT = 10; const BURST_MULTIPLIER = 5; const MAX_BURST_CAPACITY = Number( process.env.TOKEN_BUCKET_BURST_CAPACITY ?? "2250", ); +const SQS_MAX_VISIBILITY_TIMEOUT_SEC = 43_200; // 12 hours const gateConfig: EndpointGateConfig = { // Max tokens the bucket can hold — absorbs short traffic bursts without throttling @@ -118,6 +119,15 @@ async function deliverRecord( clientId: string, ): Promise<{ success: boolean; dlq: boolean }> { const correlationId = extractCorrelationId(message); + const receiveCount = Number(record.attributes.ApproximateReceiveCount); + + logger.info("Processing delivery record", { + correlationId, + receiveCount, + firstReceivedAt: new Date( + Number(record.attributes.ApproximateFirstReceiveTimestamp), + ).toISOString(), + }); const maxRetryDurationMs = target.delivery?.maxRetryDurationSeconds === undefined @@ -147,7 +157,7 @@ async function deliverRecord( message.targetId, correlationId, record.messageId, - Number(record.attributes.ApproximateReceiveCount), + receiveCount, ); const deliveryStart = Date.now(); const result = await deliverPayload(target, payloadJson, signature, agent); @@ -171,7 +181,6 @@ async function deliverRecord( } if (result.outcome === OUTCOME_RATE_LIMITED) { - const receiveCount = Number(record.attributes.ApproximateReceiveCount); recordDeliveryRateLimited(clientId, message.targetId, correlationId); await handleRateLimitedRecord( record, @@ -183,7 +192,6 @@ async function deliverRecord( return { success: true, dlq: false }; } - const receiveCount = Number(record.attributes.ApproximateReceiveCount); const backoffSec = jitteredBackoffSeconds(receiveCount); recordDeliveryFailure( clientId, @@ -209,14 +217,17 @@ async function handleBatchDenied( reason: string, retryAfterMs: number, ): Promise { - const delaySec = Math.ceil(retryAfterMs / 1000); + const baseDelaySec = Math.max(1, Math.ceil(retryAfterMs / 1000)); const correlationIds = batch.messages.map((m) => extractCorrelationId(m)); recordAdmissionDenied(clientId, batch.targetId, reason, correlationIds); const failures: SQSBatchItemFailure[] = []; for (const record of batch.records) { - // eslint-disable-next-line sonarjs/pseudo-random -- jitter for backoff, not security-sensitive - const jitterSec = Math.floor(Math.random() * 5); - await changeVisibility(record.receiptHandle, delaySec + jitterSec); + const receiveCount = Number(record.attributes.ApproximateReceiveCount); + const delaySec = Math.min( + receiveCount * baseDelaySec, + SQS_MAX_VISIBILITY_TIMEOUT_SEC, + ); + await changeVisibility(record.receiptHandle, delaySec); failures.push({ itemIdentifier: record.messageId }); } return { failures, deliveredCount: 0, dlqCount: 0 }; @@ -264,23 +275,30 @@ async function processTargetBatch( const failures: SQSBatchItemFailure[] = []; let processingFailures = 0; + const admittedPairs = admitted.map( + (record, i): { record: SQSRecord; message: CallbackDeliveryMessage } => ({ + record, + message: admittedMessages[i], // eslint-disable-line security/detect-object-injection -- i is the numeric index from .map(), not user input + }), + ); + const deliveryResults = await pMap( - admitted, - async ( + admittedPairs, + async ({ + message, record, - index, - ): Promise<{ record: SQSRecord; success: boolean; dlq: boolean }> => { + }): Promise<{ record: SQSRecord; success: boolean; dlq: boolean }> => { try { const outcome = await deliverRecord( record, - admittedMessages[index], + message, target, applicationId, clientId, ); return { record, success: outcome.success, dlq: outcome.dlq }; } catch (error) { - const correlationId = extractCorrelationId(admittedMessages[index]); + const correlationId = extractCorrelationId(message); logger.error("Failed to process record", { messageId: record.messageId, correlationId, @@ -348,7 +366,9 @@ async function processTargetBatch( rejectedCorrelationIds, ); for (const record of rejected) { - await changeVisibility(record.receiptHandle, 1); + const receiveCount = Number(record.attributes.ApproximateReceiveCount); + const delaySec = receiveCount * 1; + await changeVisibility(record.receiptHandle, delaySec); failures.push({ itemIdentifier: record.messageId }); } } diff --git a/lambdas/mock-webhook-lambda/src/__tests__/index.test.ts b/lambdas/mock-webhook-lambda/src/__tests__/index.test.ts index d7463722..fefa87ed 100644 --- a/lambdas/mock-webhook-lambda/src/__tests__/index.test.ts +++ b/lambdas/mock-webhook-lambda/src/__tests__/index.test.ts @@ -367,6 +367,54 @@ describe("Mock Webhook Lambda", () => { const body = JSON.parse(result.body); expect(body.message).toBe("Forced status 500"); }); + + it("should return forced status code when messageId uses timed format and deadline is in the future", async () => { + const futureMs = Date.now() + 60_000; + const callback = { + data: [ + { + type: "MessageStatus", + attributes: { + messageId: `force-500-until-${futureMs}-some-uuid`, + messageStatus: "delivered", + }, + links: { message: "some-message-link" }, + meta: { idempotencyKey: "some-idempotency-key" }, + }, + ], + }; + + const event = createMockEvent(JSON.stringify(callback)); + const result = await handler(event); + + expect(result.statusCode).toBe(500); + const body = JSON.parse(result.body); + expect(body.message).toBe("Forced status 500"); + }); + + it("should return 200 when messageId uses timed format and deadline has passed", async () => { + const pastMs = Date.now() - 60_000; + const callback = { + data: [ + { + type: "MessageStatus", + attributes: { + messageId: `force-500-until-${pastMs}-some-uuid`, + messageStatus: "delivered", + }, + links: { message: "some-message-link" }, + meta: { idempotencyKey: "some-idempotency-key" }, + }, + ], + }; + + const event = createMockEvent(JSON.stringify(callback)); + const result = await handler(event); + + expect(result.statusCode).toBe(200); + const body = JSON.parse(result.body); + expect(body.message).toBe("Callback received"); + }); }); describe("Logging", () => { diff --git a/lambdas/mock-webhook-lambda/src/index.ts b/lambdas/mock-webhook-lambda/src/index.ts index d0bf582d..ea30c0e6 100644 --- a/lambdas/mock-webhook-lambda/src/index.ts +++ b/lambdas/mock-webhook-lambda/src/index.ts @@ -57,153 +57,232 @@ function isClientCallbackPayload( }); } -async function buildResponse( - event: APIGatewayProxyEvent, -): Promise { - const eventWithContextFields = event as APIGatewayProxyEvent & { - rawPath?: string; - requestContext?: { - http?: { method?: string }; - elb?: { targetGroupArn: string }; - }; +type EventWithContextFields = APIGatewayProxyEvent & { + rawPath?: string; + requestContext?: { + http?: { method?: string }; + elb?: { targetGroupArn: string }; }; - const headers = Object.fromEntries( +}; + +function normalizeHeaders( + event: APIGatewayProxyEvent, +): Record { + return Object.fromEntries( Object.entries(event.headers).map(([k, v]) => [String(k).toLowerCase(), v]), ) as Record; +} - const path = event.path ?? eventWithContextFields.rawPath; +function resolveMtlsStatus( + headers: Record, + isAlbInvocation: boolean, +): boolean { + if (!isAlbInvocation) { + return false; + } - const isAlbInvocation = Boolean(eventWithContextFields.requestContext?.elb); const clientCertPresent = Boolean(headers["x-amzn-mtls-clientcert"]); - let isMtls = false; - if (isAlbInvocation) { - const certResult = verifyClientCertificate( - headers["x-amzn-mtls-clientcert"], - ); - isMtls = certResult.valid; - if (isMtls) { - logger.info("mTLS client certificate verified", { - fingerprint: headers["x-amzn-mtls-clientcert-fingerprint"] ?? "", - isMtls: true, - }); - } else { - logger.info("Mock webhook invoked without mTLS", { - isMtls: false, - clientCertPresent, - reason: certResult.reason, - }); - } + const certResult = verifyClientCertificate(headers["x-amzn-mtls-clientcert"]); + + if (certResult.valid) { + logger.info("mTLS client certificate verified", { + fingerprint: headers["x-amzn-mtls-clientcert-fingerprint"] ?? "", + isMtls: true, + }); + return true; } - logger.info("Mock webhook invoked", { - path, - method: event.httpMethod, - hasBody: Boolean(event.body), - isMtls, + logger.info("Mock webhook invoked without mTLS", { + isMtls: false, clientCertPresent, - "x-api-key": headers["x-api-key"], - "x-hmac-sha256-signature": headers["x-hmac-sha256-signature"], - payload: event.body, + reason: certResult.reason, }); + return false; +} +function authenticateApiKey(headers: Record): { + error: APIGatewayProxyResult | undefined; +} { const expectedApiKey = process.env.API_KEY; const providedApiKey = headers["x-api-key"]; if (!expectedApiKey || providedApiKey !== expectedApiKey) { logger.error("Unauthorized: invalid or missing x-api-key"); return { - statusCode: 401, - body: JSON.stringify({ message: "Unauthorized" }), + error: { + statusCode: 401, + body: JSON.stringify({ message: "Unauthorized" }), + }, }; } - if (!event.body) { - logger.error("No event body received"); + return { error: undefined }; +} - return { +type ParseResult = { + payload: ClientCallbackPayload | undefined; + error: APIGatewayProxyResult | undefined; +}; + +function parseError(response: APIGatewayProxyResult): ParseResult { + return { payload: undefined, error: response }; +} + +function parseAndValidateBody(body: string | null): ParseResult { + if (!body) { + logger.error("No event body received"); + return parseError({ statusCode: 400, body: JSON.stringify({ message: "No body" }), - }; + }); } try { - const parsed = JSON.parse(event.body) as unknown; - + const parsed = JSON.parse(body) as unknown; logger.info("Mock webhook parsed payload", { parsedPayload: parsed }); if (!isClientCallbackPayload(parsed)) { logger.error("Invalid message structure - missing or invalid data array"); - - return { + return parseError({ statusCode: 400, body: JSON.stringify({ message: "Invalid message structure" }), - }; + }); } if (parsed.data.length !== 1) { logger.error("Expected exactly 1 callback item in data array", { receivedCount: parsed.data.length, }); - - return { + return parseError({ statusCode: 400, body: JSON.stringify({ message: `Expected exactly 1 callback item, got ${parsed.data.length}`, }), - }; + }); + } + + return { payload: parsed, error: undefined }; + } catch (error) { + if (error instanceof SyntaxError) { + logger.error("Invalid JSON body", { error: error.message }); + return parseError({ + statusCode: 400, + body: JSON.stringify({ message: "Invalid JSON body" }), + }); } - const [item] = parsed.data; - const correlationId = item.meta.idempotencyKey; - const { messageId } = item.attributes; - const forcedStatusMatch = /^force-(\d{3})-/.exec(messageId); - if (forcedStatusMatch) { - const statusCode = Number(forcedStatusMatch[1]); - logger.info("Forced status code response", { + logger.error("Failed to process callback", { + error: error instanceof Error ? error.message : String(error), + }); + return parseError({ + statusCode: 500, + body: JSON.stringify({ message: "Internal server error" }), + }); + } +} + +function checkForcedStatusResponse( + messageId: string, + correlationId: string, +): { response: APIGatewayProxyResult | undefined } { + const timedMatch = /^force-(\d{3})-until-(\d+)-/.exec(messageId); + if (timedMatch) { + const statusCode = Number(timedMatch[1]); + const until = Number(timedMatch[2]); + if (Date.now() < until) { + logger.info("Timed forced status code response", { correlationId, messageId, statusCode, + until, }); return { - statusCode, - body: JSON.stringify({ message: `Forced status ${statusCode}` }), + response: { + statusCode, + body: JSON.stringify({ message: `Forced status ${statusCode}` }), + }, }; } + return { response: undefined }; + } - logger.info("Callback received", { + const permanentMatch = /^force-(\d{3})-/.exec(messageId); + if (permanentMatch) { + const statusCode = Number(permanentMatch[1]); + logger.info("Forced status code response", { correlationId, messageId, - callbackType: item.type, - path, - isMtls, - apiKey: providedApiKey, - signature: headers["x-hmac-sha256-signature"] ?? "", - payload: JSON.stringify(item), + statusCode, }); - return { - statusCode: 200, - body: JSON.stringify({ message: "Callback received" }), + response: { + statusCode, + body: JSON.stringify({ message: `Forced status ${statusCode}` }), + }, }; - } catch (error) { - if (error instanceof SyntaxError) { - logger.error("Invalid JSON body", { error: error.message }); + } - return { - statusCode: 400, - body: JSON.stringify({ message: "Invalid JSON body" }), - }; - } + return { response: undefined }; +} - logger.error("Failed to process callback", { - error: error instanceof Error ? error.message : String(error), - }); +async function buildResponse( + event: APIGatewayProxyEvent, +): Promise { + const eventWithContextFields = event as EventWithContextFields; + const headers = normalizeHeaders(event); + const path = event.path ?? eventWithContextFields.rawPath; + const isAlbInvocation = Boolean(eventWithContextFields.requestContext?.elb); + const clientCertPresent = Boolean(headers["x-amzn-mtls-clientcert"]); + const isMtls = resolveMtlsStatus(headers, isAlbInvocation); - return { - statusCode: 500, - body: JSON.stringify({ message: "Internal server error" }), - }; + logger.info("Mock webhook invoked", { + path, + method: event.httpMethod, + hasBody: Boolean(event.body), + isMtls, + clientCertPresent, + "x-api-key": headers["x-api-key"], + "x-hmac-sha256-signature": headers["x-hmac-sha256-signature"], + payload: event.body, + }); + + const authResult = authenticateApiKey(headers); + if (authResult.error) { + return authResult.error; + } + + const bodyResult = parseAndValidateBody(event.body); + if (bodyResult.error) { + return bodyResult.error; + } + + const [item] = bodyResult.payload!.data; + const correlationId = item.meta.idempotencyKey; + const { messageId } = item.attributes; + + const { response: forcedResponse } = checkForcedStatusResponse( + messageId, + correlationId, + ); + if (forcedResponse) { + return forcedResponse; } + + logger.info("Callback received", { + correlationId, + messageId, + callbackType: item.type, + path, + isMtls, + apiKey: headers["x-api-key"], + signature: headers["x-hmac-sha256-signature"] ?? "", + payload: JSON.stringify(item), + }); + + return { + statusCode: 200, + body: JSON.stringify({ message: "Callback received" }), + }; } export async function handler( diff --git a/lambdas/perf-runner-lambda/src/__tests__/event-factories.test.ts b/lambdas/perf-runner-lambda/src/__tests__/event-factories.test.ts index 1c877a17..dcecd707 100644 --- a/lambdas/perf-runner-lambda/src/__tests__/event-factories.test.ts +++ b/lambdas/perf-runner-lambda/src/__tests__/event-factories.test.ts @@ -25,6 +25,25 @@ describe("createMessageStatusEvent", () => { expect(a.id).not.toBe(b.id); expect(a.data.messageId).not.toBe(b.data.messageId); }); + + it("prefixes messageId with force-{code}- when forcedStatusCode is set", () => { + const event = createMessageStatusEvent("perf-client-1", "DELIVERED", 500); + + expect(event.data.messageId).toMatch(/^force-500-[0-9a-f-]+$/); + }); + + it("prefixes messageId with force-{code}-until-{timestamp}- when both forced fields are set", () => { + const until = Date.now() + 60_000; + const event = createMessageStatusEvent( + "perf-client-1", + "DELIVERED", + 500, + until, + ); + + const prefix = `force-500-until-${until}-`; + expect(event.data.messageId.startsWith(prefix)).toBe(true); + }); }); describe("createChannelStatusEvent", () => { @@ -39,6 +58,25 @@ describe("createChannelStatusEvent", () => { expect(event.data.messageId).toBeTruthy(); expect(event.id).toBeTruthy(); }); + + it("prefixes messageId with force-{code}- when forcedStatusCode is set", () => { + const event = createChannelStatusEvent("perf-client-2", "DELIVERED", 503); + + expect(event.data.messageId).toMatch(/^force-503-[0-9a-f-]+$/); + }); + + it("prefixes messageId with force-{code}-until-{timestamp}- when both forced fields are set", () => { + const until = Date.now() + 60_000; + const event = createChannelStatusEvent( + "perf-client-2", + "DELIVERED", + 503, + until, + ); + + const prefix = `force-503-until-${until}-`; + expect(event.data.messageId.startsWith(prefix)).toBe(true); + }); }); describe("createEvent", () => { @@ -65,4 +103,19 @@ describe("createEvent", () => { expect(event.type).toBe(EventTypes.CHANNEL_STATUS_PUBLISHED); expect(event.data.clientId).toBe("perf-client-2"); }); + + it("forwards forcedStatusCode and forcedStatusCodeUntilMs from the mix entry", () => { + const until = Date.now() + 60_000; + const event = createEvent({ + weight: 1, + factory: "messageStatus", + clientId: "perf-client-1", + messageStatus: "DELIVERED", + forcedStatusCode: 500, + forcedStatusCodeUntilMs: until, + }); + + const prefix = `force-500-until-${until}-`; + expect(event.data.messageId.startsWith(prefix)).toBe(true); + }); }); diff --git a/lambdas/perf-runner-lambda/src/__tests__/index.test.ts b/lambdas/perf-runner-lambda/src/__tests__/index.test.ts index 3c33bfd6..feebbe46 100644 --- a/lambdas/perf-runner-lambda/src/__tests__/index.test.ts +++ b/lambdas/perf-runner-lambda/src/__tests__/index.test.ts @@ -72,6 +72,8 @@ describe("handler", () => { iamUsername: "test-user", region: "eu-west-2", }), + undefined, + undefined, ); }); @@ -89,6 +91,8 @@ describe("handler", () => { "custom-test", undefined, expect.anything(), + undefined, + undefined, ); }); @@ -135,6 +139,8 @@ describe("handler", () => { "no-prefix-test", undefined, expect.anything(), + undefined, + undefined, ); }); @@ -151,6 +157,8 @@ describe("handler", () => { "no-cache-test", undefined, undefined, + undefined, + undefined, ); }); @@ -165,6 +173,36 @@ describe("handler", () => { "webhook-test", undefined, expect.anything(), + undefined, + undefined, + ); + }); + + it("passes cloudWatchSettlingMs when provided in the event", async () => { + await handler({ testId: "settling-test", cloudWatchSettlingMs: 5000 }); + + expect(mockRunPerformanceTest).toHaveBeenCalledWith( + expect.anything(), + expect.anything(), + "settling-test", + undefined, + expect.anything(), + 5000, + undefined, + ); + }); + + it("passes skipPurge when provided in the event", async () => { + await handler({ testId: "skip-purge-test", skipPurge: true }); + + expect(mockRunPerformanceTest).toHaveBeenCalledWith( + expect.anything(), + expect.anything(), + "skip-purge-test", + undefined, + expect.anything(), + undefined, + true, ); }); }); diff --git a/lambdas/perf-runner-lambda/src/__tests__/purge.test.ts b/lambdas/perf-runner-lambda/src/__tests__/purge.test.ts index 14bcf247..52832910 100644 --- a/lambdas/perf-runner-lambda/src/__tests__/purge.test.ts +++ b/lambdas/perf-runner-lambda/src/__tests__/purge.test.ts @@ -30,7 +30,7 @@ describe("deriveQueueUrls", () => { expect(urls).toEqual([ "https://sqs.eu-west-2.amazonaws.com/123456789/nhs-dev-callbacks-inbound-event-queue", - "https://sqs.eu-west-2.amazonaws.com/123456789/nhs-dev-callbacks-inbound-event-dlq-queue", + "https://sqs.eu-west-2.amazonaws.com/123456789/nhs-dev-callbacks-inbound-event-dlq", "https://sqs.eu-west-2.amazonaws.com/123456789/nhs-dev-callbacks-perf-client-1-delivery-queue", "https://sqs.eu-west-2.amazonaws.com/123456789/nhs-dev-callbacks-perf-client-1-delivery-dlq-queue", "https://sqs.eu-west-2.amazonaws.com/123456789/nhs-dev-callbacks-perf-client-2-delivery-queue", @@ -61,7 +61,7 @@ describe("deriveQueueUrls", () => { expect(urls).toEqual([ "https://sqs.eu-west-2.amazonaws.com/123456789/nhs-dev-callbacks-inbound-event-queue", - "https://sqs.eu-west-2.amazonaws.com/123456789/nhs-dev-callbacks-inbound-event-dlq-queue", + "https://sqs.eu-west-2.amazonaws.com/123456789/nhs-dev-callbacks-inbound-event-dlq", "https://sqs.eu-west-2.amazonaws.com/123456789/nhs-dev-callbacks-perf-client-1-delivery-queue", "https://sqs.eu-west-2.amazonaws.com/123456789/nhs-dev-callbacks-perf-client-1-delivery-dlq-queue", ]); @@ -88,20 +88,8 @@ describe("purgeQueues", () => { expect(mockSend).toHaveBeenCalledTimes(2); }); - it("ignores QueueDoesNotExist errors gracefully", async () => { - const nonExistentError = Object.assign(new Error("Queue does not exist"), { - name: "QueueDoesNotExist", - }); - mockSend.mockRejectedValueOnce(nonExistentError); - - await expect( - purgeQueues(mockSqsClient, ["https://sqs.example.invalid/missing"]), - ).resolves.toBeUndefined(); - }); - - it("rethrows non-QueueDoesNotExist errors", async () => { - const otherError = new Error("Access denied"); - mockSend.mockRejectedValueOnce(otherError); + it("throws when a purge fails", async () => { + mockSend.mockRejectedValueOnce(new Error("Access denied")); await expect( purgeQueues(mockSqsClient, ["https://sqs.example.invalid/queue"]), diff --git a/lambdas/perf-runner-lambda/src/__tests__/runner.test.ts b/lambdas/perf-runner-lambda/src/__tests__/runner.test.ts index 622e98a4..9d7acfe5 100644 --- a/lambdas/perf-runner-lambda/src/__tests__/runner.test.ts +++ b/lambdas/perf-runner-lambda/src/__tests__/runner.test.ts @@ -12,6 +12,7 @@ import { defaultSleep, runPerformanceTest } from "runner"; import { generatePhaseLoad } from "sqs"; import { deriveQueueUrls, purgeQueues } from "purge"; +import { getQueueDepths } from "sqs-stats"; import { dumpRateLimitState, flushElastiCache } from "elasticache"; import { verifyMockWebhook } from "webhook-verify"; import { @@ -26,6 +27,7 @@ jest.mock("cloudwatch"); jest.mock("purge"); jest.mock("elasticache"); jest.mock("webhook-verify"); +jest.mock("sqs-stats"); const mockGeneratePhaseLoad = jest.mocked(generatePhaseLoad); const mockQueryMetricsSnapshot = jest.mocked(queryMetricsSnapshot); @@ -41,6 +43,7 @@ const mockPurgeQueues = jest.mocked(purgeQueues); const mockFlushElastiCache = jest.mocked(flushElastiCache); const mockDumpRateLimitState = jest.mocked(dumpRateLimitState); const mockVerifyMockWebhook = jest.mocked(verifyMockWebhook); +const mockGetQueueDepths = jest.mocked(getQueueDepths); const immediateSleep = jest.fn().mockResolvedValue(undefined); @@ -118,6 +121,16 @@ beforeEach(() => { receivedCallbacks: 0, verified: false, }); + mockGetQueueDepths.mockResolvedValue({ + timestampMs: Date.now(), + queues: [ + { + queueUrl: "https://sqs.example.invalid/inbound-event-queue", + visible: 100, + notVisible: 10, + }, + ], + }); immediateSleep.mockResolvedValue(undefined); }); @@ -557,6 +570,22 @@ describe("runPerformanceTest", () => { expect(mockPurgeQueues).toHaveBeenCalledTimes(2); }); + it("skips both purges when skipPurge is true", async () => { + mockQueryMetricsSnapshot.mockResolvedValue(null); + + await runPerformanceTest( + deps, + scenario, + "test-skip-purge", + immediateSleep, + undefined, + undefined, + true, + ); + + expect(mockPurgeQueues).not.toHaveBeenCalled(); + }); + it("flushes ElastiCache before and after when deps are provided", async () => { mockQueryMetricsSnapshot.mockResolvedValue(null); const elastiCacheDeps = { @@ -574,7 +603,7 @@ describe("runPerformanceTest", () => { elastiCacheDeps, ); - expect(mockFlushElastiCache).toHaveBeenCalledTimes(2); + expect(mockFlushElastiCache).toHaveBeenCalledTimes(1); expect(mockFlushElastiCache).toHaveBeenCalledWith(elastiCacheDeps); }); @@ -682,6 +711,37 @@ describe("runPerformanceTest", () => { expect(mockVerifyMockWebhook).not.toHaveBeenCalled(); expect(result.webhookVerification).toBeUndefined(); }); + + it("samples queue depths during polling and at final snapshot", async () => { + mockQueryMetricsSnapshot.mockResolvedValue(null); + + await runPerformanceTest( + deps, + scenario, + "test-queue-depths", + immediateSleep, + ); + + expect(mockGetQueueDepths).toHaveBeenCalledTimes(2); // one mid-test, one final + expect(mockGetQueueDepths).toHaveBeenCalledWith(deps.sqsClient, [ + "https://sqs.example.invalid/inbound-event-queue", + ]); + }); + + it("uses the provided cloudWatchSettlingMs instead of the default", async () => { + mockQueryMetricsSnapshot.mockResolvedValue(null); + + await runPerformanceTest( + deps, + scenario, + "test-settling", + immediateSleep, + undefined, + 5000, + ); + + expect(immediateSleep).toHaveBeenCalledWith(5000); + }); }); describe("defaultSleep", () => { diff --git a/lambdas/perf-runner-lambda/src/__tests__/sqs-stats.test.ts b/lambdas/perf-runner-lambda/src/__tests__/sqs-stats.test.ts new file mode 100644 index 00000000..8d2900b8 --- /dev/null +++ b/lambdas/perf-runner-lambda/src/__tests__/sqs-stats.test.ts @@ -0,0 +1,75 @@ +import type { SQSClient } from "@aws-sdk/client-sqs"; +import { getQueueDepths } from "sqs-stats"; + +describe("getQueueDepths", () => { + const mockSend = jest.fn(); + const mockSqsClient = { send: mockSend } as unknown as SQSClient; + + beforeEach(() => { + jest.clearAllMocks(); + }); + + it("returns visible and notVisible counts for each queue URL", async () => { + mockSend + .mockResolvedValueOnce({ + Attributes: { + ApproximateNumberOfMessages: "42", + ApproximateNumberOfMessagesNotVisible: "8", + }, + }) + .mockResolvedValueOnce({ + Attributes: { + ApproximateNumberOfMessages: "10", + ApproximateNumberOfMessagesNotVisible: "2", + }, + }); + + const result = await getQueueDepths(mockSqsClient, [ + "https://sqs.example.invalid/queue-a", + "https://sqs.example.invalid/queue-b", + ]); + + expect(result.queues).toHaveLength(2); + expect(result.queues[0]).toEqual({ + queueUrl: "https://sqs.example.invalid/queue-a", + visible: 42, + notVisible: 8, + }); + expect(result.queues[1]).toEqual({ + queueUrl: "https://sqs.example.invalid/queue-b", + visible: 10, + notVisible: 2, + }); + expect(result.timestampMs).toBeGreaterThan(0); + }); + + it("defaults to 0 when attributes are missing", async () => { + mockSend.mockResolvedValueOnce({ Attributes: undefined }); + + const result = await getQueueDepths(mockSqsClient, [ + "https://sqs.example.invalid/queue-a", + ]); + + expect(result.queues[0].visible).toBe(0); + expect(result.queues[0].notVisible).toBe(0); + }); + + it("sends GetQueueAttributesCommand with correct attributes requested", async () => { + mockSend.mockResolvedValueOnce({ Attributes: {} }); + + await getQueueDepths(mockSqsClient, [ + "https://sqs.example.invalid/queue-a", + ]); + + const command = mockSend.mock.calls[0][0] as { + input: { QueueUrl: string; AttributeNames: string[] }; + }; + expect(command.input.QueueUrl).toBe("https://sqs.example.invalid/queue-a"); + expect(command.input.AttributeNames).toContain( + "ApproximateNumberOfMessages", + ); + expect(command.input.AttributeNames).toContain( + "ApproximateNumberOfMessagesNotVisible", + ); + }); +}); diff --git a/lambdas/perf-runner-lambda/src/event-factories.ts b/lambdas/perf-runner-lambda/src/event-factories.ts index 6f39add9..c32cb9f7 100644 --- a/lambdas/perf-runner-lambda/src/event-factories.ts +++ b/lambdas/perf-runner-lambda/src/event-factories.ts @@ -8,11 +8,32 @@ import type { import { EventTypes } from "@nhs-notify-client-callbacks/models"; import type { EventMixEntry } from "types"; +function buildMessageId( + uuid: string, + forcedStatusCode?: number, + forcedStatusCodeUntilMs?: number, +): string { + if (forcedStatusCode === undefined) { + return uuid; + } + if (forcedStatusCodeUntilMs === undefined) { + return `force-${forcedStatusCode}-${uuid}`; + } + return `force-${forcedStatusCode}-until-${forcedStatusCodeUntilMs}-${uuid}`; +} + export function createMessageStatusEvent( clientId: string, messageStatus: MessageStatus, + forcedStatusCode?: number, + forcedStatusCodeUntilMs?: number, ): StatusPublishEvent { - const messageId = crypto.randomUUID(); + const uuid = crypto.randomUUID(); + const messageId = buildMessageId( + uuid, + forcedStatusCode, + forcedStatusCodeUntilMs, + ); const data: MessageStatusData = { clientId, @@ -47,8 +68,15 @@ export function createMessageStatusEvent( export function createChannelStatusEvent( clientId: string, channelStatus: ChannelStatus, + forcedStatusCode?: number, + forcedStatusCodeUntilMs?: number, ): StatusPublishEvent { - const messageId = crypto.randomUUID(); + const uuid = crypto.randomUUID(); + const messageId = buildMessageId( + uuid, + forcedStatusCode, + forcedStatusCodeUntilMs, + ); const data: ChannelStatusData = { clientId, @@ -80,8 +108,18 @@ export function createChannelStatusEvent( export function createEvent(entry: EventMixEntry): StatusPublishEvent { if (entry.factory === "messageStatus") { - return createMessageStatusEvent(entry.clientId, entry.messageStatus); + return createMessageStatusEvent( + entry.clientId, + entry.messageStatus, + entry.forcedStatusCode, + entry.forcedStatusCodeUntilMs, + ); } - return createChannelStatusEvent(entry.clientId, entry.channelStatus); + return createChannelStatusEvent( + entry.clientId, + entry.channelStatus, + entry.forcedStatusCode, + entry.forcedStatusCodeUntilMs, + ); } diff --git a/lambdas/perf-runner-lambda/src/index.ts b/lambdas/perf-runner-lambda/src/index.ts index 5974627b..51054b3a 100644 --- a/lambdas/perf-runner-lambda/src/index.ts +++ b/lambdas/perf-runner-lambda/src/index.ts @@ -14,7 +14,12 @@ const logger = new Logger(); export async function handler( event: PerfRunnerPayload, ): Promise { - const { scenario = DEFAULT_SCENARIO, testId } = event; + const { + cloudWatchSettlingMs, + scenario = DEFAULT_SCENARIO, + skipPurge, + testId, + } = event; const region = process.env.AWS_REGION ?? "eu-west-2"; const queueUrl = process.env.INBOUND_QUEUE_URL; @@ -64,6 +69,8 @@ export async function handler( testId, undefined, elastiCacheDeps, + cloudWatchSettlingMs, + skipPurge, ); logger.info("Performance test completed", { testId }); diff --git a/lambdas/perf-runner-lambda/src/purge.ts b/lambdas/perf-runner-lambda/src/purge.ts index e363e706..3f7cb097 100644 --- a/lambdas/perf-runner-lambda/src/purge.ts +++ b/lambdas/perf-runner-lambda/src/purge.ts @@ -11,7 +11,7 @@ export function deriveQueueUrls( return [ inboundQueueUrl, - `${baseUrl}inbound-event-dlq-queue`, + `${baseUrl}inbound-event-dlq`, ...clientIds.flatMap((id) => [ `${baseUrl}${id}-delivery-queue`, `${baseUrl}${id}-delivery-dlq-queue`, @@ -23,18 +23,9 @@ export async function purgeQueues( client: SQSClient, queueUrls: string[], ): Promise { - const results = await Promise.allSettled( + await Promise.all( queueUrls.map((url) => client.send(new PurgeQueueCommand({ QueueUrl: url })), ), ); - - for (const result of results) { - if (result.status === "rejected") { - const error = result.reason as { name?: string }; - if (error.name !== "QueueDoesNotExist") { - throw result.reason as Error; - } - } - } } diff --git a/lambdas/perf-runner-lambda/src/runner.ts b/lambdas/perf-runner-lambda/src/runner.ts index 7a5b5ee6..86ccd435 100644 --- a/lambdas/perf-runner-lambda/src/runner.ts +++ b/lambdas/perf-runner-lambda/src/runner.ts @@ -11,8 +11,10 @@ import type { Scenario, WebhookVerificationResult, } from "types"; +import { Logger } from "@nhs-notify-client-callbacks/logger"; import { generatePhaseLoad } from "sqs"; import { deriveQueueUrls, purgeQueues } from "purge"; +import { getQueueDepths } from "sqs-stats"; import { dumpRateLimitState, flushElastiCache } from "elasticache"; import { verifyMockWebhook } from "webhook-verify"; import { @@ -22,6 +24,8 @@ import { queryPerClientRateTimeline, } from "cloudwatch"; +const logger = new Logger(); + const CLOUDWATCH_SETTLING_MS = 60_000; export const defaultSleep = (ms: number): Promise => @@ -82,12 +86,55 @@ async function collectSnapshots( return cbStartSec; } +async function collectPerClientRateTimelines( + deps: RunnerDeps, + scenario: Scenario, + startSec: number, + endSec: number, +): Promise { + if (!deps.deliveryLogGroupPrefix) { + return []; + } + + const clientIds = [...new Set(scenario.eventMix.map((e) => e.clientId))]; + const timelinePromises = clientIds.map(async (clientId) => { + const logGroupName = `${deps.deliveryLogGroupPrefix}${clientId}`; + const entries = await queryPerClientRateTimeline( + deps.cloudWatchClient, + logGroupName, + startSec, + endSec, + ); + return { clientId, entries }; + }); + const timelines = await Promise.all(timelinePromises); + return timelines.filter((t) => t.entries.length > 0); +} + +async function collectWebhookVerification( + deps: RunnerDeps, + startSec: number, + endSec: number, +): Promise { + if (!deps.mockWebhookLogGroup) { + return undefined; + } + return verifyMockWebhook( + deps.cloudWatchClient, + deps.mockWebhookLogGroup, + startSec, + endSec, + ); +} + export async function runPerformanceTest( deps: RunnerDeps, scenario: Scenario, testId: string, sleepFn: (ms: number) => Promise = defaultSleep, elastiCacheDeps?: ElastiCacheDeps, + cloudWatchSettlingMs: number = CLOUDWATCH_SETTLING_MS, + skipPurge = false, ): Promise { if (scenario.eventMix.length === 0) { throw new Error("scenario.eventMix must contain at least one entry"); @@ -109,8 +156,15 @@ export async function runPerformanceTest( const testStartMs = Date.now(); const queueUrls = deriveQueueUrls(deps.queueUrl, scenario); - await purgeQueues(deps.sqsClient, queueUrls); + + if (skipPurge) { + logger.info("Skipping queue purge", { queueUrls }); + } else { + logger.info("Purging queues", { queueUrls }); + await purgeQueues(deps.sqsClient, queueUrls); + } if (elastiCacheDeps) { + logger.info("Clearing rate limit and circuit breaker state"); await flushElastiCache(elastiCacheDeps); } @@ -148,6 +202,9 @@ export async function runPerformanceTest( lastCbSnapshotSec, out, ); + logger.info("Sampling queue depths", { queueUrls }); + const depthSample = await getQueueDepths(deps.sqsClient, queueUrls); + logger.info("Queue depth sample", { queues: depthSample.queues }); if (!stopPolling) { await sleepFn(scenario.metricsIntervalSecs * 1000); @@ -157,20 +214,35 @@ export async function runPerformanceTest( const pollPromise = pollLoop(); - for (const phase of scenario.phases) { + for (const [index, phase] of scenario.phases.entries()) { + logger.info("Starting phase", { + index, + targetEps: phase.targetEps, + durationSecs: phase.durationSecs, + }); const result = await generatePhaseLoad( deps.sqsClient, deps.queueUrl, phase, - scenario.eventMix, + phase.eventMix ?? scenario.eventMix, ); + logger.info("Phase complete", { + index, + targetEps: result.targetEps, + achievedEps: result.achievedEps, + sent: result.sent, + durationMs: result.durationMs, + }); phaseResults.push(result); } stopPolling = true; await pollPromise; - await sleepFn(CLOUDWATCH_SETTLING_MS); + logger.info("Waiting for CloudWatch logs to settle", { + settlingMs: cloudWatchSettlingMs, + }); + await sleepFn(cloudWatchSettlingMs); const finalStartSec = Math.floor(testStartMs / 1000); const finalEndSec = Math.floor(Date.now() / 1000); @@ -183,45 +255,33 @@ export async function runPerformanceTest( lastCbSnapshotSec, out, ); + logger.info("Sampling queue depths", { queueUrls }); + const finalDepthSample = await getQueueDepths(deps.sqsClient, queueUrls); + logger.info("Final queue depth sample", { queues: finalDepthSample.queues }); - const perClientRateTimelines: PerClientRateTimeline[] = []; - - if (deps.deliveryLogGroupPrefix) { - const clientIds = [...new Set(scenario.eventMix.map((e) => e.clientId))]; - const timelinePromises = clientIds.map(async (clientId) => { - const logGroupName = `${deps.deliveryLogGroupPrefix}${clientId}`; - const entries = await queryPerClientRateTimeline( - deps.cloudWatchClient, - logGroupName, - finalStartSec, - finalEndSec, - ); - return { clientId, entries }; - }); - const timelines = await Promise.all(timelinePromises); - perClientRateTimelines.push( - ...timelines.filter((t) => t.entries.length > 0), - ); - } + const perClientRateTimelines = await collectPerClientRateTimelines( + deps, + scenario, + finalStartSec, + finalEndSec, + ); - let webhookVerification: WebhookVerificationResult | undefined; - if (deps.mockWebhookLogGroup) { - webhookVerification = await verifyMockWebhook( - deps.cloudWatchClient, - deps.mockWebhookLogGroup, - finalStartSec, - finalEndSec, - ); - } + const webhookVerification = await collectWebhookVerification( + deps, + finalStartSec, + finalEndSec, + ); let rateLimitStateAfter: EndpointRateLimitState[] | undefined; if (elastiCacheDeps) { rateLimitStateAfter = await dumpRateLimitState(elastiCacheDeps); } - await purgeQueues(deps.sqsClient, queueUrls); - if (elastiCacheDeps) { - await flushElastiCache(elastiCacheDeps); + if (skipPurge) { + logger.info("Skipping final queue purge", { queueUrls }); + } else { + await purgeQueues(deps.sqsClient, queueUrls); + logger.info("Final queue purge complete", { queueUrls }); } return { diff --git a/lambdas/perf-runner-lambda/src/sqs-stats.ts b/lambdas/perf-runner-lambda/src/sqs-stats.ts new file mode 100644 index 00000000..5d573793 --- /dev/null +++ b/lambdas/perf-runner-lambda/src/sqs-stats.ts @@ -0,0 +1,29 @@ +import { GetQueueAttributesCommand, type SQSClient } from "@aws-sdk/client-sqs"; +import type { QueueDepthSample } from "types"; + +export async function getQueueDepths( + client: SQSClient, + queueUrls: string[], +): Promise { + const queues = await Promise.all( + queueUrls.map(async (url) => { + const response = await client.send( + new GetQueueAttributesCommand({ + QueueUrl: url, + AttributeNames: [ + "ApproximateNumberOfMessages", + "ApproximateNumberOfMessagesNotVisible", + ], + }), + ); + const attrs = response.Attributes ?? {}; + return { + queueUrl: url, + visible: Number(attrs.ApproximateNumberOfMessages ?? "0"), + notVisible: Number(attrs.ApproximateNumberOfMessagesNotVisible ?? "0"), + }; + }), + ); + + return { timestampMs: Date.now(), queues }; +} diff --git a/lambdas/perf-runner-lambda/src/types.ts b/lambdas/perf-runner-lambda/src/types.ts index 4415ef63..29ccfb0c 100644 --- a/lambdas/perf-runner-lambda/src/types.ts +++ b/lambdas/perf-runner-lambda/src/types.ts @@ -10,6 +10,8 @@ export type MessageStatusMixEntry = { factory: "messageStatus"; clientId: string; messageStatus: MessageStatus; + forcedStatusCode?: number; + forcedStatusCodeUntilMs?: number; }; export type ChannelStatusMixEntry = { @@ -17,6 +19,8 @@ export type ChannelStatusMixEntry = { factory: "channelStatus"; clientId: string; channelStatus: ChannelStatus; + forcedStatusCode?: number; + forcedStatusCodeUntilMs?: number; }; export type EventMixEntry = MessageStatusMixEntry | ChannelStatusMixEntry; @@ -24,6 +28,7 @@ export type EventMixEntry = MessageStatusMixEntry | ChannelStatusMixEntry; export type Phase = { durationSecs: number; targetEps: number; + eventMix?: EventMixEntry[]; }; export type Scenario = { @@ -97,6 +102,15 @@ export type WebhookVerificationResult = { verified: boolean; }; +export type QueueDepthSample = { + timestampMs: number; + queues: { + queueUrl: string; + visible: number; + notVisible: number; + }[]; +}; + export type PerformanceResult = { testId: string; scenario: Scenario; @@ -115,6 +129,8 @@ export type PerformanceResult = { export type PerfRunnerPayload = { testId: string; scenario?: Scenario; + cloudWatchSettlingMs?: number; + skipPurge?: boolean; }; export type RunnerDeps = { diff --git a/tests/performance/fixtures/subscriptions/perf-client-1.json b/tests/performance/fixtures/subscriptions/perf-client-1.json index 1c730b8a..161e28b1 100644 --- a/tests/performance/fixtures/subscriptions/perf-client-1.json +++ b/tests/performance/fixtures/subscriptions/perf-client-1.json @@ -36,7 +36,7 @@ }, "invocationEndpoint": "https://REPLACED_BY_TERRAFORM", "invocationMethod": "POST", - "invocationRateLimit": 300, + "invocationRateLimit": 100, "targetId": "target-39dbd795-5909-40ab-95b2-4e88b11a2813", "type": "API" } diff --git a/tests/performance/fixtures/subscriptions/perf-client-2.json b/tests/performance/fixtures/subscriptions/perf-client-2.json index d3c58a93..d519da1e 100644 --- a/tests/performance/fixtures/subscriptions/perf-client-2.json +++ b/tests/performance/fixtures/subscriptions/perf-client-2.json @@ -46,7 +46,7 @@ }, "invocationEndpoint": "https://REPLACED_BY_TERRAFORM", "invocationMethod": "POST", - "invocationRateLimit": 300, + "invocationRateLimit": 5000, "targetId": "target-e3ccc2c2-7b19-4475-80d5-51a1182d239a", "type": "API" }