4 changes: 3 additions & 1 deletion infrastructure/terraform/components/callbacks/README.md
@@ -17,6 +17,8 @@
|------|-------------|------|---------|:--------:|
| <a name="input_applications_map_parameter_name"></a> [applications\_map\_parameter\_name](#input\_applications\_map\_parameter\_name) | SSM Parameter Store path for the clientId-to-applicationData map, where applicationData is currently only the applicationId | `string` | `null` | no |
| <a name="input_aws_account_id"></a> [aws\_account\_id](#input\_aws\_account\_id) | The AWS Account ID (numeric) | `string` | n/a | yes |
| <a name="input_cb_cooldown_period_ms"></a> [cb\_cooldown\_period\_ms](#input\_cb\_cooldown\_period\_ms) | Full block duration after circuit opens, before half-open probes begin (ms) | `number` | `120000` | no |
| <a name="input_cb_recovery_period_ms"></a> [cb\_recovery\_period\_ms](#input\_cb\_recovery\_period\_ms) | Linear ramp-up duration after circuit closes (ms) | `number` | `600000` | no |
| <a name="input_component"></a> [component](#input\_component) | The variable encapsulating the name of this component | `string` | `"callbacks"` | no |
| <a name="input_default_tags"></a> [default\_tags](#input\_default\_tags) | A map of default tags to apply to all taggable resources within the component | `map(string)` | `{}` | no |
| <a name="input_deploy_mock_clients"></a> [deploy\_mock\_clients](#input\_deploy\_mock\_clients) | Flag to deploy mock webhook lambda for integration testing (test/dev environments only) | `bool` | `false` | no |
@@ -40,7 +42,7 @@
| <a name="input_parent_acct_environment"></a> [parent\_acct\_environment](#input\_parent\_acct\_environment) | Name of the environment responsible for the acct resources used, affects things like DNS zone. Useful for named dev environments | `string` | `"main"` | no |
| <a name="input_pipe_event_patterns"></a> [pipe\_event\_patterns](#input\_pipe\_event\_patterns) | value | `list(string)` | `[]` | no |
| <a name="input_pipe_log_level"></a> [pipe\_log\_level](#input\_pipe\_log\_level) | Log level for the EventBridge Pipe. | `string` | `"ERROR"` | no |
| <a name="input_pipe_sqs_input_batch_size"></a> [pipe\_sqs\_input\_batch\_size](#input\_pipe\_sqs\_input\_batch\_size) | n/a | `number` | `1` | no |
| <a name="input_pipe_sqs_input_batch_size"></a> [pipe\_sqs\_input\_batch\_size](#input\_pipe\_sqs\_input\_batch\_size) | n/a | `number` | `10` | no |
| <a name="input_pipe_sqs_max_batch_window"></a> [pipe\_sqs\_max\_batch\_window](#input\_pipe\_sqs\_max\_batch\_window) | n/a | `number` | `2` | no |
| <a name="input_project"></a> [project](#input\_project) | The name of the tfscaffold project | `string` | n/a | yes |
| <a name="input_region"></a> [region](#input\_region) | The AWS Region | `string` | n/a | yes |
---
@@ -41,6 +41,8 @@ module "client_delivery" {
mtls_ca_s3_key = local.mtls_ca_s3_key # gitleaks:allow

token_bucket_burst_capacity = var.token_bucket_burst_capacity
cb_cooldown_period_ms = var.cb_cooldown_period_ms
cb_recovery_period_ms = var.cb_recovery_period_ms

vpc_subnet_ids = try(local.acct.private_subnets[local.bc_name], [])
lambda_security_group_id = aws_security_group.https_client_lambda.id
---
@@ -93,7 +93,23 @@ data "aws_iam_policy_document" "perf_runner_lambda" {

resources = [
module.sqs_inbound_event.sqs_queue_arn,
"${module.sqs_inbound_event.sqs_queue_arn}-dlq",
module.sqs_inbound_event.sqs_dlq_arn,
"arn:aws:sqs:${var.region}:${var.aws_account_id}:${local.csi}-*-delivery-queue",
"arn:aws:sqs:${var.region}:${var.aws_account_id}:${local.csi}-*-delivery-dlq-queue",
]
}

statement {
sid = "SQSGetQueueAttributes"
effect = "Allow"

actions = [
"sqs:GetQueueAttributes",
]

resources = [
module.sqs_inbound_event.sqs_queue_arn,
module.sqs_inbound_event.sqs_dlq_arn,
"arn:aws:sqs:${var.region}:${var.aws_account_id}:${local.csi}-*-delivery-queue",
"arn:aws:sqs:${var.region}:${var.aws_account_id}:${local.csi}-*-delivery-dlq-queue",
]
6 changes: 3 additions & 3 deletions infrastructure/terraform/components/callbacks/pre.sh
@@ -12,10 +12,10 @@ deploy_perf_runner="false"
for _tfvar_file in \
"${base_path}/etc/group_${group}.tfvars" \
"${base_path}/etc/env_${region}_${environment}.tfvars"; do
if [[ -f "${_tfvar_file}" ]]; then
_val=$(grep -E '^\s*deploy_mock_clients\s*=' "${_tfvar_file}" | tail -1 | sed 's/.*=\s*//;s/\s*$//')
if [ -f "${_tfvar_file}" ]; then
_val=$(grep -E '^\s*deploy_mock_clients\s*=' "${_tfvar_file}" | tail -1 | sed 's/.*=\s*//;s/\s*$//;s/^"//;s/"$//')
[ -n "${_val}" ] && deploy_mock_clients="${_val}"
_val=$(grep -E '^\s*deploy_perf_runner\s*=' "${_tfvar_file}" | tail -1 | sed 's/.*=\s*//;s/\s*$//')
_val=$(grep -E '^\s*deploy_perf_runner\s*=' "${_tfvar_file}" | tail -1 | sed 's/.*=\s*//;s/\s*$//;s/^"//;s/"$//')
[ -n "${_val}" ] && deploy_perf_runner="${_val}"
fi
done
14 changes: 13 additions & 1 deletion infrastructure/terraform/components/callbacks/variables.tf
@@ -102,7 +102,7 @@ variable "pipe_log_level" {

variable "pipe_sqs_input_batch_size" {
type = number
default = 1
default = 10
}

variable "pipe_sqs_max_batch_window" {
@@ -213,3 +213,15 @@ variable "token_bucket_burst_capacity" {
description = "Token bucket burst capacity used by the rate limiter"
default = 2250
}

variable "cb_cooldown_period_ms" {
type = number
description = "Full block duration after circuit opens, before half-open probes begin (ms)"
default = 120000
}

variable "cb_recovery_period_ms" {
type = number
description = "Linear ramp-up duration after circuit closes (ms)"
default = 600000
}
---
@@ -11,6 +11,8 @@ No requirements.
|------|-------------|------|---------|:--------:|
| <a name="input_applications_map_parameter_name"></a> [applications\_map\_parameter\_name](#input\_applications\_map\_parameter\_name) | SSM Parameter Store path for the clientId-to-applicationData map | `string` | n/a | yes |
| <a name="input_aws_account_id"></a> [aws\_account\_id](#input\_aws\_account\_id) | Account ID | `string` | n/a | yes |
| <a name="input_cb_cooldown_period_ms"></a> [cb\_cooldown\_period\_ms](#input\_cb\_cooldown\_period\_ms) | Full block duration after circuit opens, before half-open probes begin (ms) | `number` | `120000` | no |
| <a name="input_cb_recovery_period_ms"></a> [cb\_recovery\_period\_ms](#input\_cb\_recovery\_period\_ms) | Linear ramp-up duration after circuit closes (ms) | `number` | `600000` | no |
| <a name="input_client_bus_name"></a> [client\_bus\_name](#input\_client\_bus\_name) | EventBridge bus name for subscription rules | `string` | n/a | yes |
| <a name="input_client_config_bucket"></a> [client\_config\_bucket](#input\_client\_config\_bucket) | S3 bucket name containing client subscription configuration | `string` | n/a | yes |
| <a name="input_client_config_bucket_arn"></a> [client\_config\_bucket\_arn](#input\_client\_config\_bucket\_arn) | S3 bucket ARN containing client subscription configuration | `string` | n/a | yes |
@@ -24,7 +26,8 @@ No requirements.
| <a name="input_force_lambda_code_deploy"></a> [force\_lambda\_code\_deploy](#input\_force\_lambda\_code\_deploy) | Force Lambda code redeployment even when commit tag matches | `bool` | `false` | no |
| <a name="input_group"></a> [group](#input\_group) | The name of the tfscaffold group | `string` | `null` | no |
| <a name="input_kms_key_arn"></a> [kms\_key\_arn](#input\_kms\_key\_arn) | KMS Key ARN for encryption at rest | `string` | n/a | yes |
| <a name="input_lambda_batch_size"></a> [lambda\_batch\_size](#input\_lambda\_batch\_size) | Number of SQS messages per Lambda invocation | `number` | `10` | no |
| <a name="input_lambda_batch_size"></a> [lambda\_batch\_size](#input\_lambda\_batch\_size) | Number of SQS messages per Lambda invocation | `number` | `100` | no |
| <a name="input_lambda_batching_window_in_seconds"></a> [lambda\_batching\_window\_in\_seconds](#input\_lambda\_batching\_window\_in\_seconds) | Maximum time in seconds to wait for a full batch before invoking Lambda. Allows the delivery queue to fill to batch\_size, improving Lambda concurrency utilisation. | `number` | `1` | no |
| <a name="input_lambda_code_base_path"></a> [lambda\_code\_base\_path](#input\_lambda\_code\_base\_path) | Base path to Lambda source code directories | `string` | n/a | yes |
| <a name="input_lambda_memory"></a> [lambda\_memory](#input\_lambda\_memory) | Lambda memory allocation in MB | `number` | `256` | no |
| <a name="input_lambda_s3_bucket"></a> [lambda\_s3\_bucket](#input\_lambda\_s3\_bucket) | S3 bucket for Lambda function artefacts | `string` | n/a | yes |
---
@@ -53,6 +53,8 @@ module "https_client_lambda" {
MTLS_CERT_S3_KEY = var.mtls_cert_s3_key # gitleaks:allow
QUEUE_URL = module.sqs_delivery.sqs_queue_url
TOKEN_BUCKET_BURST_CAPACITY = tostring(var.token_bucket_burst_capacity)
CB_COOLDOWN_PERIOD_MS = tostring(var.cb_cooldown_period_ms)
CB_RECOVERY_PERIOD_MS = tostring(var.cb_recovery_period_ms)
}

vpc_config = var.lambda_security_group_id != "" ? {
@@ -62,10 +64,11 @@
}

resource "aws_lambda_event_source_mapping" "sqs_delivery" {
event_source_arn = module.sqs_delivery.sqs_queue_arn
function_name = module.https_client_lambda.function_arn
batch_size = var.lambda_batch_size
enabled = true
event_source_arn = module.sqs_delivery.sqs_queue_arn
function_name = module.https_client_lambda.function_arn
batch_size = var.lambda_batch_size
maximum_batching_window_in_seconds = var.lambda_batching_window_in_seconds
enabled = true

function_response_types = ["ReportBatchItemFailures"]
}
20 changes: 19 additions & 1 deletion infrastructure/terraform/modules/client-delivery/variables.tf
@@ -118,7 +118,13 @@ variable "log_subscription_role_arn" {
variable "lambda_batch_size" {
type = number
description = "Number of SQS messages per Lambda invocation"
default = 10
default = 100
}

variable "lambda_batching_window_in_seconds" {
type = number
description = "Maximum time in seconds to wait for a full batch before invoking Lambda. Allows the delivery queue to fill to batch_size, improving Lambda concurrency utilisation."
default = 1
}

variable "lambda_memory" {
@@ -210,3 +216,15 @@ variable "lambda_security_group_id" {
description = "Security group ID for the Lambda function"
default = ""
}

variable "cb_cooldown_period_ms" {
type = number
description = "Full block duration after circuit opens, before half-open probes begin (ms)"
default = 120000
}

variable "cb_recovery_period_ms" {
type = number
description = "Linear ramp-up duration after circuit closes (ms)"
default = 600000
}
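The two new variables describe a circuit-breaker lifecycle: a full block while the circuit is open (cooldown), then a ramp back to full throughput once it closes (recovery). The sketch below shows one plausible admission curve under that model — the function name and the linear-ramp shape are assumptions for illustration, not the https-client's actual implementation:

```typescript
// Hypothetical sketch: fraction of normal traffic admitted as a function of
// time since the circuit opened. Assumes a full block during the cooldown,
// then a linear ramp from 0% to 100% over the recovery period.
const CB_COOLDOWN_PERIOD_MS = 120_000; // Terraform default: 2 minutes
const CB_RECOVERY_PERIOD_MS = 600_000; // Terraform default: 10 minutes

function admittedFraction(msSinceCircuitOpened: number): number {
  if (msSinceCircuitOpened < CB_COOLDOWN_PERIOD_MS) {
    return 0; // circuit open: block everything, no probes yet
  }
  // Linear ramp-up after the cooldown elapses, clamped at full rate.
  const ramp = (msSinceCircuitOpened - CB_COOLDOWN_PERIOD_MS) / CB_RECOVERY_PERIOD_MS;
  return Math.min(1, ramp);
}
```

With the defaults, traffic is fully blocked for 2 minutes and reaches 100% again 12 minutes after the circuit opened.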
28 changes: 24 additions & 4 deletions lambdas/https-client-lambda/src/__tests__/handler.test.ts
@@ -288,6 +288,28 @@ describe("processRecords", () => {
expect(mockChangeVisibility).toHaveBeenCalledTimes(1);
});

it("caps visibility delay at SQS maximum (12 hours) for admission-denied batch", async () => {
mockAdmit.mockResolvedValue({
allowed: false,
reason: "rate_limited",
retryAfterMs: 60_000,
effectiveRate: 10,
});

const record = makeRecord({
attributes: {
ApproximateReceiveCount: "1000",
SentTimestamp: "0",
SenderId: "sender",
ApproximateFirstReceiveTimestamp: "0",
},
});

await processRecords([record]);

expect(mockChangeVisibility).toHaveBeenCalledWith("receipt-1", 43_200);
});

it("changes visibility once for transient failure", async () => {
mockDeliverPayload.mockResolvedValue({
outcome: "transient_failure",
@@ -380,8 +402,7 @@

expect(failures).toEqual([{ itemIdentifier: "msg-1" }]);
const visibilityDelay = mockChangeVisibility.mock.calls[0]![1] as number;
expect(visibilityDelay).toBeGreaterThanOrEqual(2);
expect(visibilityDelay).toBeLessThanOrEqual(6);
expect(visibilityDelay).toBe(2);
expect(mockSendToDlq).not.toHaveBeenCalled();
expect(mockDeliverPayload).not.toHaveBeenCalled();
});
@@ -398,8 +419,7 @@

expect(failures).toEqual([{ itemIdentifier: "msg-1" }]);
const visibilityDelay = mockChangeVisibility.mock.calls[0]![1] as number;
expect(visibilityDelay).toBeGreaterThanOrEqual(30);
expect(visibilityDelay).toBeLessThanOrEqual(34);
expect(visibilityDelay).toBe(30);
expect(mockSendToDlq).not.toHaveBeenCalled();
expect(mockDeliverPayload).not.toHaveBeenCalled();
});
50 changes: 35 additions & 15 deletions lambdas/https-client-lambda/src/handler.ts
@@ -43,11 +43,12 @@ import { flushMetrics, resetMetrics } from "services/delivery-metrics";
type RedisClientType = Awaited<ReturnType<typeof getRedisClient>>;

const DEFAULT_MAX_RETRY_DURATION_MS = 7_200_000; // 2 hours
const DEFAULT_CONCURRENCY_LIMIT = 5;
const DEFAULT_CONCURRENCY_LIMIT = 10;
const BURST_MULTIPLIER = 5;
const MAX_BURST_CAPACITY = Number(
process.env.TOKEN_BUCKET_BURST_CAPACITY ?? "2250",
);
const SQS_MAX_VISIBILITY_TIMEOUT_SEC = 43_200; // 12 hours

const gateConfig: EndpointGateConfig = {
// Max tokens the bucket can hold — absorbs short traffic bursts without throttling
@@ -118,6 +119,15 @@
clientId: string,
): Promise<{ success: boolean; dlq: boolean }> {
const correlationId = extractCorrelationId(message);
const receiveCount = Number(record.attributes.ApproximateReceiveCount);

logger.info("Processing delivery record", {
correlationId,
receiveCount,
firstReceivedAt: new Date(
Number(record.attributes.ApproximateFirstReceiveTimestamp),
).toISOString(),
});

const maxRetryDurationMs =
target.delivery?.maxRetryDurationSeconds === undefined
Expand Down Expand Up @@ -147,7 +157,7 @@ async function deliverRecord(
message.targetId,
correlationId,
record.messageId,
Number(record.attributes.ApproximateReceiveCount),
receiveCount,
);
const deliveryStart = Date.now();
const result = await deliverPayload(target, payloadJson, signature, agent);
Expand All @@ -171,7 +181,6 @@ async function deliverRecord(
}

if (result.outcome === OUTCOME_RATE_LIMITED) {
const receiveCount = Number(record.attributes.ApproximateReceiveCount);
recordDeliveryRateLimited(clientId, message.targetId, correlationId);
await handleRateLimitedRecord(
record,
Expand All @@ -183,7 +192,6 @@ async function deliverRecord(
return { success: true, dlq: false };
}

const receiveCount = Number(record.attributes.ApproximateReceiveCount);
const backoffSec = jitteredBackoffSeconds(receiveCount);
recordDeliveryFailure(
clientId,
Expand All @@ -209,14 +217,17 @@ async function handleBatchDenied(
reason: string,
retryAfterMs: number,
): Promise<TargetBatchResult> {
const delaySec = Math.ceil(retryAfterMs / 1000);
const baseDelaySec = Math.max(1, Math.ceil(retryAfterMs / 1000));
const correlationIds = batch.messages.map((m) => extractCorrelationId(m));
recordAdmissionDenied(clientId, batch.targetId, reason, correlationIds);
const failures: SQSBatchItemFailure[] = [];
for (const record of batch.records) {
// eslint-disable-next-line sonarjs/pseudo-random -- jitter for backoff, not security-sensitive
const jitterSec = Math.floor(Math.random() * 5);
await changeVisibility(record.receiptHandle, delaySec + jitterSec);
const receiveCount = Number(record.attributes.ApproximateReceiveCount);
const delaySec = Math.min(
receiveCount * baseDelaySec,
SQS_MAX_VISIBILITY_TIMEOUT_SEC,
);
await changeVisibility(record.receiptHandle, delaySec);
failures.push({ itemIdentifier: record.messageId });
}
return { failures, deliveredCount: 0, dlqCount: 0 };
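The denial path above replaces the old fixed-plus-jitter delay with a delay that scales linearly with the receive count and is capped at the 12-hour SQS visibility-timeout maximum. A minimal sketch of that arithmetic (function name hypothetical):

```typescript
// Mirrors the capped, receive-count-scaled backoff used for admission-denied
// records: at least 1 second per receive, capped at the SQS ceiling.
const SQS_MAX_VISIBILITY_TIMEOUT_SEC = 43_200; // 12 hours

function admissionDeniedDelaySec(retryAfterMs: number, receiveCount: number): number {
  // Round the limiter's suggested retry delay up to whole seconds, floor of 1s.
  const baseDelaySec = Math.max(1, Math.ceil(retryAfterMs / 1000));
  // Scale linearly with redelivery attempts so repeatedly throttled messages
  // back off further, but never exceed what ChangeMessageVisibility accepts.
  return Math.min(receiveCount * baseDelaySec, SQS_MAX_VISIBILITY_TIMEOUT_SEC);
}
```

For example, a first receive with a 60s retry hint yields a 60s delay, while a message on its 1000th receive is pinned to the 43,200s cap — the behaviour asserted by the new "caps visibility delay" test.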
@@ -264,23 +275,30 @@
const failures: SQSBatchItemFailure[] = [];
let processingFailures = 0;

const admittedPairs = admitted.map(
(record, i): { record: SQSRecord; message: CallbackDeliveryMessage } => ({
record,
message: admittedMessages[i], // eslint-disable-line security/detect-object-injection -- i is the numeric index from .map(), not user input
}),
);

const deliveryResults = await pMap(
admitted,
async (
admittedPairs,
async ({
message,
record,
index,
): Promise<{ record: SQSRecord; success: boolean; dlq: boolean }> => {
}): Promise<{ record: SQSRecord; success: boolean; dlq: boolean }> => {
try {
const outcome = await deliverRecord(
record,
admittedMessages[index],
message,
target,
applicationId,
clientId,
);
return { record, success: outcome.success, dlq: outcome.dlq };
} catch (error) {
const correlationId = extractCorrelationId(admittedMessages[index]);
const correlationId = extractCorrelationId(message);
logger.error("Failed to process record", {
messageId: record.messageId,
correlationId,
Expand Down Expand Up @@ -348,7 +366,9 @@ async function processTargetBatch(
rejectedCorrelationIds,
);
for (const record of rejected) {
await changeVisibility(record.receiptHandle, 1);
const receiveCount = Number(record.attributes.ApproximateReceiveCount);
const delaySec = receiveCount * 1;
await changeVisibility(record.receiptHandle, delaySec);
failures.push({ itemIdentifier: record.messageId });
}
}
48 changes: 48 additions & 0 deletions lambdas/mock-webhook-lambda/src/__tests__/index.test.ts
@@ -367,6 +367,54 @@ describe("Mock Webhook Lambda", () => {
const body = JSON.parse(result.body);
expect(body.message).toBe("Forced status 500");
});

it("should return forced status code when messageId uses timed format and deadline is in the future", async () => {
const futureMs = Date.now() + 60_000;
const callback = {
data: [
{
type: "MessageStatus",
attributes: {
messageId: `force-500-until-${futureMs}-some-uuid`,
messageStatus: "delivered",
},
links: { message: "some-message-link" },
meta: { idempotencyKey: "some-idempotency-key" },
},
],
};

const event = createMockEvent(JSON.stringify(callback));
const result = await handler(event);

expect(result.statusCode).toBe(500);
const body = JSON.parse(result.body);
expect(body.message).toBe("Forced status 500");
});

it("should return 200 when messageId uses timed format and deadline has passed", async () => {
const pastMs = Date.now() - 60_000;
const callback = {
data: [
{
type: "MessageStatus",
attributes: {
messageId: `force-500-until-${pastMs}-some-uuid`,
messageStatus: "delivered",
},
links: { message: "some-message-link" },
meta: { idempotencyKey: "some-idempotency-key" },
},
],
};

const event = createMockEvent(JSON.stringify(callback));
const result = await handler(event);

expect(result.statusCode).toBe(200);
const body = JSON.parse(result.body);
expect(body.message).toBe("Callback received");
});
});
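The `force-<status>-until-<epochMs>-<uuid>` messageId convention exercised by these two tests can be parsed as follows — a sketch only, assuming the deadline is an epoch-milliseconds timestamp; the real mock-webhook handler may structure this differently:

```typescript
// Returns the HTTP status to force while the embedded deadline is still in
// the future, or null once it has passed (or if the id doesn't match the
// timed format at all).
function forcedStatusFor(messageId: string, nowMs: number = Date.now()): number | null {
  const match = /^force-(\d{3})-until-(\d+)-/.exec(messageId);
  if (!match) return null; // not a timed force-status id
  const status = Number(match[1]);
  const deadlineMs = Number(match[2]);
  return nowMs < deadlineMs ? status : null; // expires when the deadline passes
}
```

This lets an integration test force failures for a bounded window (e.g. to open the circuit breaker) and then observe recovery once the deadline lapses, without redeploying the mock.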

describe("Logging", () => {