From 09bfb382ed1daee151a224b5164c08164315c4a8 Mon Sep 17 00:00:00 2001 From: Philipp Page Date: Tue, 30 Jun 2026 15:53:42 +0200 Subject: [PATCH 1/2] feat(large-messages): add optional allowedBuckets allowlist Add an opt-in bucket allowlist to the v2 large messages utility. The S3 bucket in the message pointer is controlled by the sender, so this gives consumers fine-grained, in-application control over which buckets the utility may read from and delete, complementing IAM permissions. Configure via LargeMessageConfig.withAllowedBuckets(Set). When a non-empty allowlist is set, a message whose pointer names a bucket outside the allowlist is rejected with LargeMessageProcessingException before any S3 interaction. An empty allowlist (default) preserves the existing behavior, so the change is backward compatible and applies to both the @LargeMessage annotation and the functional API. Includes unit tests, an e2e handler and negative e2e test, and docs with a security note, resource-scoped IAM example, and usage examples. --- docs/utilities/large_messages.md | 94 +++++++++++++++++++ .../demo/kafka/protobuf/ProtobufProduct.java | 29 ++++-- .../protobuf/ProtobufProductOrBuilder.java | 2 +- .../protobuf/ProtobufProductOuterClass.java | 8 +- .../handlers/largemessage-restricted/pom.xml | 72 ++++++++++++++ .../lambda/powertools/e2e/Function.java | 88 +++++++++++++++++ .../src/main/resources/log4j2.xml | 16 ++++ powertools-e2e-tests/handlers/pom.xml | 1 + .../lambda/powertools/LargeMessageE2ET.java | 56 +++++++++++ .../largemessages/LargeMessageConfig.java | 40 ++++++++ .../internal/LargeMessageProcessor.java | 21 +++++ .../largemessages/LargeMessageConfigTest.java | 47 +++++++++- .../largemessages/LargeMessagesTest.java | 35 +++++++ .../internal/LargeMessageAspectTest.java | 31 ++++++ .../LargeMessageProcessorFactoryTest.java | 8 +- 15 files changed, 524 insertions(+), 24 deletions(-) create mode 100644 powertools-e2e-tests/handlers/largemessage-restricted/pom.xml create mode 100644 powertools-e2e-tests/handlers/largemessage-restricted/src/main/java/software/amazon/lambda/powertools/e2e/Function.java create mode 100644 powertools-e2e-tests/handlers/largemessage-restricted/src/main/resources/log4j2.xml diff --git a/docs/utilities/large_messages.md b/docs/utilities/large_messages.md index 9d14c8228..510ef99e9 100644 --- a/docs/utilities/large_messages.md +++ b/docs/utilities/large_messages.md @@ -174,6 +174,40 @@ on the S3 bucket used for the large messages offloading: - `s3:GetObject` - `s3:DeleteObject` +### Security: Restrict which buckets the utility accesses + +???+ warning "The message sender controls the `s3BucketName` in the payload pointer" + The large message body contains a pointer of the form + `["software.amazon.payloadoffloading.PayloadS3Pointer",{"s3BucketName":"...","s3Key":"..."}]`. + The utility reads `s3BucketName` from this pointer and fetches the object using your Lambda function's + own IAM credentials. Any sender that can write to your queue, or publish to a topic that fans out to it, + can name any bucket your execution role can reach. Your function then reads from that bucket. Delete-after-read + is enabled by default (`deleteS3Object=true`), so your function also deletes the object it read. + +Apply both of the following controls: + +1. **Configure an allowlist** with `LargeMessageConfig.init().withAllowedBuckets(...)`. The utility rejects any + message whose pointer names a bucket outside the allowlist before it reads or deletes from S3. See + [Restricting allowed buckets](#restricting-allowed-buckets). +2. **Scope your IAM policy** to the offload bucket. Grant `s3:GetObject` and `s3:DeleteObject` on the bucket + ARN instead of `*`: + + ```json + { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "s3:GetObject", + "s3:DeleteObject" + ], + "Resource": "arn:aws:s3:::/*" + } + ] + } + ``` + ## Usage You can use the Large Messages utility with either the `@LargeMessage` annotation or the functional API. @@ -481,6 +515,66 @@ If you need to customize this `S3Client`, you can leverage the `LargeMessageConf } ``` +## Restricting allowed buckets + +The message sender controls the bucket named in the message pointer (see +[the security note in Permissions](#security-restrict-which-buckets-the-utility-accesses)). Use the +`LargeMessageConfig` singleton to restrict which S3 buckets the utility reads from and deletes. This works with +both the annotation and the functional API. When you configure a non-empty allowlist, the utility rejects any +message whose pointer names a bucket outside the allowlist. It throws a `LargeMessageProcessingException` before +it reads or deletes from S3. An empty allowlist, the default, applies no restriction. + +=== "@LargeMessage annotation" + ```java hl_lines="6" + import software.amazon.lambda.powertools.largemessages.LargeMessage; + + public class SqsMessageHandler implements RequestHandler { + + public SqsMessageHandler() { + LargeMessageConfig.init().withAllowedBuckets(Collections.singleton("my-offload-bucket")); + } + + @Override + public SQSBatchResponse handleRequest(SQSEvent event, Context context) { + for (SQSMessage message: event.getRecords()) { + // throws LargeMessageProcessingException if the pointer's bucket is not in the allowlist + processRawMessage(message, context); + } + return SQSBatchResponse.builder().build(); + } + + @LargeMessage + private void processRawMessage(SQSEvent.SQSMessage sqsMessage, Context context) { + // sqsMessage.getBody() will contain the content of the S3 object + } + } + ``` + +=== "Functional API" + ```java hl_lines="6" + import software.amazon.lambda.powertools.largemessages.LargeMessages; + + public class SqsMessageHandler implements RequestHandler { + + public SqsMessageHandler() { + LargeMessageConfig.init().withAllowedBuckets(Collections.singleton("my-offload-bucket")); + } + + @Override + public SQSBatchResponse handleRequest(SQSEvent event, Context context) { + for (SQSMessage message: event.getRecords()) { + // throws LargeMessageProcessingException if the pointer's bucket is not in the allowlist + LargeMessages.processLargeMessage(message, this::processRawMessage); + } + return SQSBatchResponse.builder().build(); + } + + private void processRawMessage(SQSEvent.SQSMessage sqsMessage) { + // sqsMessage.getBody() will contain the content of the S3 object + } + } + ``` + ## Migration from the SQS Large Message utility - Replace the dependency in maven / gradle: `powertools-sqs` ==> `powertools-large-messages` diff --git a/examples/powertools-examples-kafka/src/main/java/org/demo/kafka/protobuf/ProtobufProduct.java b/examples/powertools-examples-kafka/src/main/java/org/demo/kafka/protobuf/ProtobufProduct.java index 99c490869..8168f7801 100644 --- a/examples/powertools-examples-kafka/src/main/java/org/demo/kafka/protobuf/ProtobufProduct.java +++ b/examples/powertools-examples-kafka/src/main/java/org/demo/kafka/protobuf/ProtobufProduct.java @@ -1,7 +1,7 @@ // Generated by the protocol buffer compiler. DO NOT EDIT! // NO CHECKED-IN PROTOBUF GENCODE // source: ProtobufProduct.proto -// Protobuf Java Version: 4.33.4 +// Protobuf Java Version: 4.35.0 package org.demo.kafka.protobuf; @@ -18,8 +18,8 @@ public final class ProtobufProduct extends com.google.protobuf.RuntimeVersion.validateProtobufGencodeVersion( com.google.protobuf.RuntimeVersion.RuntimeDomain.PUBLIC, /* major= */ 4, - /* minor= */ 33, - /* patch= */ 4, + /* minor= */ 35, + /* patch= */ 0, /* suffix= */ "", "ProtobufProduct"); } @@ -36,6 +36,11 @@ private ProtobufProduct() { return org.demo.kafka.protobuf.ProtobufProductOuterClass.internal_static_org_demo_kafka_protobuf_ProtobufProduct_descriptor; } + @java.lang.Override + public com.google.protobuf.Descriptors.Descriptor getDescriptorForType() { + return org.demo.kafka.protobuf.ProtobufProductOuterClass.internal_static_org_demo_kafka_protobuf_ProtobufProduct_descriptor; + } + @java.lang.Override protected com.google.protobuf.GeneratedMessage.FieldAccessorTable internalGetFieldAccessorTable() { @@ -130,13 +135,8 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) } getUnknownFields().writeTo(output); } - - @java.lang.Override - public int getSerializedSize() { - int size = memoizedSize; - if (size != -1) return size; - - size = 0; + private int computeSerializedSize_0() { + int size = 0; if (id_ != 0) { size += com.google.protobuf.CodedOutputStream .computeInt32Size(1, id_); @@ -148,6 +148,15 @@ public int getSerializedSize() { size += com.google.protobuf.CodedOutputStream .computeDoubleSize(3, price_); } + return size; + } + @java.lang.Override + public int getSerializedSize() { + int size = memoizedSize; + if (size != -1) return size; + + size = 0; + size += computeSerializedSize_0(); size += getUnknownFields().getSerializedSize(); memoizedSize = size; return size; diff --git a/examples/powertools-examples-kafka/src/main/java/org/demo/kafka/protobuf/ProtobufProductOrBuilder.java b/examples/powertools-examples-kafka/src/main/java/org/demo/kafka/protobuf/ProtobufProductOrBuilder.java index 462d3a66d..c84b8810e 100644 --- a/examples/powertools-examples-kafka/src/main/java/org/demo/kafka/protobuf/ProtobufProductOrBuilder.java +++ b/examples/powertools-examples-kafka/src/main/java/org/demo/kafka/protobuf/ProtobufProductOrBuilder.java @@ -1,7 +1,7 @@ // Generated by the protocol buffer compiler. DO NOT EDIT! // NO CHECKED-IN PROTOBUF GENCODE // source: ProtobufProduct.proto -// Protobuf Java Version: 4.33.4 +// Protobuf Java Version: 4.35.0 package org.demo.kafka.protobuf; diff --git a/examples/powertools-examples-kafka/src/main/java/org/demo/kafka/protobuf/ProtobufProductOuterClass.java b/examples/powertools-examples-kafka/src/main/java/org/demo/kafka/protobuf/ProtobufProductOuterClass.java index 3edc97e12..9c40cefed 100644 --- a/examples/powertools-examples-kafka/src/main/java/org/demo/kafka/protobuf/ProtobufProductOuterClass.java +++ b/examples/powertools-examples-kafka/src/main/java/org/demo/kafka/protobuf/ProtobufProductOuterClass.java @@ -1,7 +1,7 @@ // Generated by the protocol buffer compiler. DO NOT EDIT! // NO CHECKED-IN PROTOBUF GENCODE // source: ProtobufProduct.proto -// Protobuf Java Version: 4.33.4 +// Protobuf Java Version: 4.35.0 package org.demo.kafka.protobuf; @@ -12,8 +12,8 @@ private ProtobufProductOuterClass() {} com.google.protobuf.RuntimeVersion.validateProtobufGencodeVersion( com.google.protobuf.RuntimeVersion.RuntimeDomain.PUBLIC, /* major= */ 4, - /* minor= */ 33, - /* patch= */ 4, + /* minor= */ 35, + /* patch= */ 0, /* suffix= */ "", "ProtobufProductOuterClass"); } @@ -36,7 +36,7 @@ public static void registerAllExtensions( getDescriptor() { return descriptor; } - private static com.google.protobuf.Descriptors.FileDescriptor + private static final com.google.protobuf.Descriptors.FileDescriptor descriptor; static { java.lang.String[] descriptorData = { diff --git a/powertools-e2e-tests/handlers/largemessage-restricted/pom.xml b/powertools-e2e-tests/handlers/largemessage-restricted/pom.xml new file mode 100644 index 000000000..b3c338928 --- /dev/null +++ b/powertools-e2e-tests/handlers/largemessage-restricted/pom.xml @@ -0,0 +1,72 @@ + + 4.0.0 + + + software.amazon.lambda + e2e-test-handlers-parent + 2.10.0 + + + e2e-test-handler-largemessage-restricted + jar + E2E test handler – Large message (allowedBuckets restricted) + + + + software.amazon.awssdk + dynamodb + + + software.amazon.lambda + powertools-large-messages + + + software.amazon.lambda + powertools-logging-log4j + + + com.amazonaws + aws-lambda-java-events + + + org.aspectj + aspectjrt + + + + + + + dev.aspectj + aspectj-maven-plugin + + ${maven.compiler.source} + ${maven.compiler.target} + ${maven.compiler.target} + + + software.amazon.lambda + powertools-large-messages + + + software.amazon.lambda + powertools-logging + + + + + + + compile + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + + diff --git a/powertools-e2e-tests/handlers/largemessage-restricted/src/main/java/software/amazon/lambda/powertools/e2e/Function.java b/powertools-e2e-tests/handlers/largemessage-restricted/src/main/java/software/amazon/lambda/powertools/e2e/Function.java new file mode 100644 index 000000000..36108a647 --- /dev/null +++ b/powertools-e2e-tests/handlers/largemessage-restricted/src/main/java/software/amazon/lambda/powertools/e2e/Function.java @@ -0,0 +1,88 @@ +/* + * Copyright 2023 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package software.amazon.lambda.powertools.e2e; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; +import software.amazon.awssdk.utils.BinaryUtils; +import software.amazon.awssdk.utils.Md5Utils; +import software.amazon.lambda.powertools.largemessages.LargeMessage; +import software.amazon.lambda.powertools.largemessages.LargeMessageConfig; +import software.amazon.lambda.powertools.logging.Logging; + +public class Function implements RequestHandler { + + // The real offload bucket is created per test run with a random name (largemessagebucket). + // By pinning the allowlist to a fixed, non-matching bucket name, every offloaded message must be + // rejected before any S3 read or delete, exercising the bucket-allowlist security check. + private static final String DISALLOWED_BUCKET = "powertools-e2e-disallowed-bucket"; + + private static final String TABLE_FOR_ASYNC_TESTS = System.getenv("TABLE_FOR_ASYNC_TESTS"); + private DynamoDbClient client; + + public Function() { + LargeMessageConfig.init().withAllowedBuckets(Collections.singleton(DISALLOWED_BUCKET)); + if (client == null) { + client = DynamoDbClient.builder() + .httpClient(UrlConnectionHttpClient.builder().build()) + .region(Region.of(System.getenv("AWS_REGION"))) + .build(); + } + } + + @Logging(logEvent = true) + public SQSBatchResponse handleRequest(SQSEvent event, Context context) { + for (SQSMessage message : event.getRecords()) { + processRawMessage(message, context); + } + return SQSBatchResponse.builder().build(); + } + + @LargeMessage + private void processRawMessage(SQSMessage sqsMessage, Context context) { + String bodyMD5 = md5(sqsMessage.getBody()); + if (!sqsMessage.getMd5OfBody().equals(bodyMD5)) { + throw new SecurityException( + String.format("message digest does not match, expected %s, got %s", sqsMessage.getMd5OfBody(), + bodyMD5)); + } + + Map item = new HashMap<>(); + item.put("functionName", AttributeValue.builder().s(context.getFunctionName()).build()); + item.put("id", AttributeValue.builder().s(sqsMessage.getMessageId()).build()); + item.put("bodyMD5", AttributeValue.builder().s(bodyMD5).build()); + item.put("bodySize", + AttributeValue.builder().n(String.valueOf(sqsMessage.getBody().getBytes(StandardCharsets.UTF_8).length)) + .build()); + + client.putItem(PutItemRequest.builder().tableName(TABLE_FOR_ASYNC_TESTS).item(item).build()); + } + + private String md5(String message) { + return BinaryUtils.toHex(Md5Utils.computeMD5Hash(message.getBytes(StandardCharsets.UTF_8))); + } +} diff --git a/powertools-e2e-tests/handlers/largemessage-restricted/src/main/resources/log4j2.xml b/powertools-e2e-tests/handlers/largemessage-restricted/src/main/resources/log4j2.xml new file mode 100644 index 000000000..c717a7153 --- /dev/null +++ b/powertools-e2e-tests/handlers/largemessage-restricted/src/main/resources/log4j2.xml @@ -0,0 +1,16 @@ + + + + + + + + + + + + + + + + diff --git a/powertools-e2e-tests/handlers/pom.xml b/powertools-e2e-tests/handlers/pom.xml index 0c3d0128c..c22687e77 100644 --- a/powertools-e2e-tests/handlers/pom.xml +++ b/powertools-e2e-tests/handlers/pom.xml @@ -28,6 +28,7 @@ batch largemessage largemessage-functional + largemessage-restricted largemessage_idempotent logging-log4j logging-log4j-fluent-api diff --git a/powertools-e2e-tests/src/test/java/software/amazon/lambda/powertools/LargeMessageE2ET.java b/powertools-e2e-tests/src/test/java/software/amazon/lambda/powertools/LargeMessageE2ET.java index 0de2dca60..ea748d8c4 100644 --- a/powertools-e2e-tests/src/test/java/software/amazon/lambda/powertools/LargeMessageE2ET.java +++ b/powertools-e2e-tests/src/test/java/software/amazon/lambda/powertools/LargeMessageE2ET.java @@ -16,6 +16,7 @@ import org.apache.commons.io.IOUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; @@ -35,6 +36,7 @@ import software.amazon.awssdk.services.dynamodb.model.QueryRequest; import software.amazon.awssdk.services.dynamodb.model.QueryResponse; import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.SendMessageRequest; import software.amazon.lambda.powertools.testutils.DataNotReadyException; @@ -166,6 +168,60 @@ void bigSQSMessageOffloadedToS3_shouldLoadFromS3(String pathToFunction) throws I assertThat(items.get(0).get("bodyMD5").s()).isEqualTo("22bde5e7b05fa80bc7be45bdd4bc6c75"); } + @Test + @Timeout(value = 5, unit = TimeUnit.MINUTES) + // Sonar java:S2925 (Thread.sleep in tests): suppressed intentionally. This is an end-to-end test of the real + // asynchronous SQS -> Lambda delivery path, asserting a side effect that does NOT happen (the S3 object is not + // deleted). There is no positive completion signal to await on, so we must give the asynchronous flow time to run + // before asserting the absence. Replacing the wait with a synchronous invoke would degrade this into a near + // unit-test that no longer exercises the event source nor the delete-after-read side effect. + @SuppressWarnings("java:S2925") + void bigSQSMessage_withDisallowedBucket_shouldNotProcessNorDelete() throws IOException, InterruptedException { + // The largemessage-restricted handler pins allowedBuckets to a fixed, non-matching bucket name, so the + // utility must reject the offloaded message before reading from or deleting the actual offload bucket. + setupInfrastructure("largemessage-restricted"); + + // GIVEN a large message offloaded to the real (random) offload bucket + final ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration() + .withPayloadSupportEnabled(s3Client, bucketName); + try (AmazonSQSExtendedClient client = new AmazonSQSExtendedClient( + SqsClient.builder().region(region).httpClient(httpClient).build(), extendedClientConfig); + InputStream inputStream = this.getClass().getResourceAsStream("/large_sqs_message.txt");) { + String bigMessage = IOUtils.toString(inputStream, StandardCharsets.UTF_8); + + // WHEN the message is sent and the function is invoked + client.sendMessage(SendMessageRequest + .builder() + .queueUrl(queueUrl) + .messageBody(bigMessage) + .build()); + } + + // Give the function time to be invoked, fail the bucket-allowlist check, and the message to be dead-lettered + // (the queue uses maxReceiveCount=1, so there is a single attempt and no retries). + TimeUnit.SECONDS.sleep(90); + + // THEN the handler never processed the message (no item written to DynamoDB) + QueryRequest request = QueryRequest + .builder() + .tableName(tableName) + .keyConditionExpression("functionName = :func") + .expressionAttributeValues( + Collections.singletonMap(":func", AttributeValue.builder().s(functionName).build())) + .build(); + QueryResponse response = dynamoDbClient.query(request); + assertThat(response.items()).isEmpty(); + + // AND the S3 object was never deleted (delete-after-read is on by default, but the disallowed bucket is + // rejected before any S3 interaction) + long objectCount = s3Client.listObjectsV2( + ListObjectsV2Request.builder() + .bucket(bucketName) + .build()) + .keyCount(); + assertThat(objectCount).isGreaterThanOrEqualTo(1); + } + @ParameterizedTest @ValueSource(strings = { "largemessage", "largemessage-functional" }) @Timeout(value = 5, unit = TimeUnit.MINUTES) diff --git a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfig.java b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfig.java index 6ad529496..97d867c4b 100644 --- a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfig.java +++ b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfig.java @@ -16,6 +16,10 @@ import static software.amazon.lambda.powertools.common.internal.LambdaConstants.AWS_REGION_ENV; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; @@ -37,6 +41,7 @@ public class LargeMessageConfig { private static final LargeMessageConfig INSTANCE = new LargeMessageConfig(); private S3Client s3Client; + private Set allowedBuckets = Collections.emptySet(); private LargeMessageConfig() { } @@ -65,9 +70,44 @@ public void withS3Client(S3Client s3Client) { } } + /** + * Restrict the S3 buckets the utility is allowed to read from and delete. + *

+ * The {@code s3BucketName} in the payload pointer is controlled by whoever sent the message. Without an + * allowlist, any sender can redirect the Lambda function to fetch (and, when {@code deleteS3Object} is enabled, + * delete) objects from an arbitrary bucket using the function's own credentials. When a non-empty allowlist is + * configured, the utility rejects any message whose pointer references a bucket that is not in the allowlist, + * before any S3 interaction. + *

+ * An empty (or null) allowlist means no restriction is applied (default), preserving backward compatibility. + * Configuring an allowlist is strongly recommended for security. + *

+     * public MyLambdaHandler() {
+     *     LargeMessageConfig.init().withAllowedBuckets(Collections.singleton("my-offload-bucket"));
+     * }
+     * 
+ * + * @param allowedBuckets the set of bucket names the utility is allowed to access (null is treated as empty) + */ + public void withAllowedBuckets(Set allowedBuckets) { + this.allowedBuckets = allowedBuckets == null + ? Collections.emptySet() + : Collections.unmodifiableSet(new HashSet<>(allowedBuckets)); + } + + /** + * Retrieve the configured set of allowed buckets. An empty set means no restriction is applied. + * + * @return the (unmodifiable) set of allowed bucket names, never null + */ + public Set getAllowedBuckets() { + return allowedBuckets; + } + // For tests purpose void resetS3Client() { this.s3Client = null; + this.allowedBuckets = Collections.emptySet(); } // Getter needs to initialize if not done with setter diff --git a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageProcessor.java b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageProcessor.java index c41af0cea..15b436ac7 100644 --- a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageProcessor.java +++ b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageProcessor.java @@ -17,6 +17,7 @@ import static java.lang.String.format; import java.nio.charset.StandardCharsets; +import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,6 +26,7 @@ import software.amazon.awssdk.services.s3.S3Client; import software.amazon.lambda.powertools.largemessages.LargeMessageConfig; import software.amazon.lambda.powertools.largemessages.LargeMessageProcessingException; +import software.amazon.payloadoffloading.PayloadS3Pointer; import software.amazon.payloadoffloading.S3BackedPayloadStore; import software.amazon.payloadoffloading.S3Dao; @@ -71,6 +73,11 @@ public R process(T message, LargeMessageFunction function, boolean del payloadPointer = payloadPointer.replace("com.amazon.sqs.javamessaging.MessageS3Pointer", "software.amazon.payloadoffloading.PayloadS3Pointer"); + // The bucket name in the pointer is controlled by the message sender. Validate it against the configured + // allowlist (if any) before any S3 interaction, to prevent reading from or deleting objects in arbitrary + // buckets using the function's own credentials. + validateBucket(payloadPointer); + if (LOG.isInfoEnabled()) { LOG.info("Large message [{}]: retrieving content from S3", getMessageId(message)); } @@ -136,6 +143,20 @@ public R process(T message, LargeMessageFunction function, boolean del */ protected abstract void removeLargeMessageAttributes(T message); + private void validateBucket(String payloadPointer) { + Set allowedBuckets = LargeMessageConfig.get().getAllowedBuckets(); + if (allowedBuckets == null || allowedBuckets.isEmpty()) { + // No allowlist configured: keep the existing (unrestricted) behavior for backward compatibility. + return; + } + + String bucketName = PayloadS3Pointer.fromJson(payloadPointer).getS3BucketName(); + if (!allowedBuckets.contains(bucketName)) { + throw new LargeMessageProcessingException( + format("S3 bucket [%s] is not in the configured allowedBuckets", bucketName)); + } + } + private String getS3ObjectContent(String payloadPointer) { try { return payloadStore.getOriginalPayload(payloadPointer); diff --git a/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfigTest.java b/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfigTest.java index b6bcaf6b5..43736baf2 100644 --- a/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfigTest.java +++ b/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfigTest.java @@ -16,26 +16,30 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; -public class LargeMessageConfigTest { +class LargeMessageConfigTest { @BeforeEach - public void setup() { + void setup() { LargeMessageConfig.get().resetS3Client(); } @AfterEach - public void tearDown() { + void tearDown() { LargeMessageConfig.get().resetS3Client(); } @Test - public void singleton_shouldNotChangeWhenCalledMultipleTimes() { + void singleton_shouldNotChangeWhenCalledMultipleTimes() { LargeMessageConfig.init().withS3Client(S3Client.builder().region(Region.US_EAST_1).build()); LargeMessageConfig config = LargeMessageConfig.get(); @@ -46,7 +50,7 @@ public void singleton_shouldNotChangeWhenCalledMultipleTimes() { } @Test - public void singletonWithDefaultClient_shouldNotChangeWhenCalledMultipleTimes() { + void singletonWithDefaultClient_shouldNotChangeWhenCalledMultipleTimes() { S3Client s3Client = LargeMessageConfig.get().getS3Client(); LargeMessageConfig.init().withS3Client(S3Client.create()); @@ -54,4 +58,37 @@ public void singletonWithDefaultClient_shouldNotChangeWhenCalledMultipleTimes() assertThat(s3Client2).isEqualTo(s3Client); } + + @Test + void allowedBuckets_shouldDefaultToEmpty() { + assertThat(LargeMessageConfig.get().getAllowedBuckets()).isEmpty(); + } + + @Test + void withAllowedBuckets_shouldStoreAndReturnTheSet() { + Set buckets = new HashSet<>(); + buckets.add("bucket-a"); + buckets.add("bucket-b"); + + LargeMessageConfig.init().withAllowedBuckets(buckets); + + assertThat(LargeMessageConfig.get().getAllowedBuckets()).containsExactlyInAnyOrder("bucket-a", "bucket-b"); + } + + @Test + void withAllowedBuckets_null_shouldResultInEmptySet() { + LargeMessageConfig.init().withAllowedBuckets(Collections.singleton("bucket-a")); + LargeMessageConfig.init().withAllowedBuckets(null); + + assertThat(LargeMessageConfig.get().getAllowedBuckets()).isEmpty(); + } + + @Test + void reset_shouldClearAllowedBuckets() { + LargeMessageConfig.init().withAllowedBuckets(Collections.singleton("bucket-a")); + + LargeMessageConfig.get().resetS3Client(); + + assertThat(LargeMessageConfig.get().getAllowedBuckets()).isEmpty(); + } } diff --git a/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/LargeMessagesTest.java b/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/LargeMessagesTest.java index 7bd3e80e2..e0c409e7e 100644 --- a/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/LargeMessagesTest.java +++ b/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/LargeMessagesTest.java @@ -69,6 +69,8 @@ void init() throws NoSuchFieldException, IllegalAccessException { Field client = LargeMessageConfig.class.getDeclaredField("s3Client"); client.setAccessible(true); client.set(LargeMessageConfig.get(), null); + // clear any allowlist configured by a previous test (singleton) + LargeMessageConfig.init().withAllowedBuckets(null); LargeMessageConfig.init().withS3Client(s3Client); } @@ -246,6 +248,39 @@ void testProcessLargeMessage_shouldModifyMessageInPlace() { assertThat(sqsMessage.getBody()).isNotEqualTo(originalBody); } + @Test + void testProcessLargeMessage_withAllowedBucketMatching_shouldRetrieveFromS3AndDelete() { + // given + when(s3Client.getObject(any(GetObjectRequest.class))).thenReturn(s3ObjectWithLargeMessage()); + LargeMessageConfig.init().withAllowedBuckets(Collections.singleton(BUCKET_NAME)); + SQSMessage sqsMessage = sqsMessageWithBody(BIG_MESSAGE_BODY, true); + + // when + String result = LargeMessages.processLargeMessage(sqsMessage, SQSMessage::getBody); + + // then + assertThat(result).isEqualTo(BIG_MSG); + ArgumentCaptor delete = ArgumentCaptor.forClass(DeleteObjectRequest.class); + verify(s3Client).deleteObject(delete.capture()); + assertThat(delete.getValue().bucket()).isEqualTo(BUCKET_NAME); + assertThat(delete.getValue().key()).isEqualTo(BUCKET_KEY); + } + + @Test + void testProcessLargeMessage_withAllowedBucketNotMatching_shouldThrowAndNotTouchS3() { + // given + LargeMessageConfig.init().withAllowedBuckets(Collections.singleton("some-other-bucket")); + SQSMessage sqsMessage = sqsMessageWithBody(BIG_MESSAGE_BODY, true); + + // when / then + assertThatThrownBy(() -> LargeMessages.processLargeMessage(sqsMessage, SQSMessage::getBody)) + .isInstanceOf(LargeMessageProcessingException.class) + .hasMessageContaining("is not in the configured allowedBuckets") + .hasMessageContaining(BUCKET_NAME); + // the disallowed bucket must never be read from nor deleted + verifyNoInteractions(s3Client); + } + private String processOrderSimple(SQSMessage message, String orderId) { assertThat(message.getBody()).isEqualTo(BIG_MSG); return orderId + "-processed"; diff --git a/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageAspectTest.java b/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageAspectTest.java index b84709ddc..b02dc8782 100644 --- a/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageAspectTest.java +++ b/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageAspectTest.java @@ -85,6 +85,8 @@ void init() throws NoSuchFieldException, IllegalAccessException { Field client = LargeMessageConfig.class.getDeclaredField("s3Client"); client.setAccessible(true); client.set(LargeMessageConfig.get(), null); + // clear any allowlist configured by a previous test (singleton) + LargeMessageConfig.init().withAllowedBuckets(null); LargeMessageConfig.init().withS3Client(s3Client); } @@ -132,6 +134,35 @@ private void verifyMessageObjectIsModified(SQSMessage sqsMessage) { assertThat(sqsMessage.getMd5OfBody()).isEqualTo(BIG_MSG_MD5); } + @Test + void testLargeSQSMessage_withAllowedBucketMatching_shouldRetrieveFromS3() { + // given + when(s3Client.getObject(any(GetObjectRequest.class))).thenReturn(s3ObjectWithLargeMessage()); + LargeMessageConfig.init().withAllowedBuckets(Collections.singleton(BUCKET_NAME)); + SQSMessage sqsMessage = sqsMessageWithBody(BIG_MESSAGE_BODY, true); + + // when + String message = processSQSMessage(sqsMessage, context); + + // then + assertThat(message).isEqualTo(BIG_MSG); + verify(s3Client).deleteObject(any(DeleteObjectRequest.class)); + } + + @Test + void testLargeSQSMessage_withAllowedBucketNotMatching_shouldThrowAndNotTouchS3() { + // given + LargeMessageConfig.init().withAllowedBuckets(Collections.singleton("some-other-bucket")); + SQSMessage sqsMessage = sqsMessageWithBody(BIG_MESSAGE_BODY, true); + + // when / then + assertThatThrownBy(() -> processSQSMessage(sqsMessage, context)) + .isInstanceOf(LargeMessageProcessingException.class) + .hasMessageContaining("is not in the configured allowedBuckets") + .hasMessageContaining(BUCKET_NAME); + verifyNoInteractions(s3Client); + } + @Test void testLargeSQSMessageWithDefaultDeletion() { // given diff --git a/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageProcessorFactoryTest.java b/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageProcessorFactoryTest.java index 3011c8189..276bc0fcf 100644 --- a/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageProcessorFactoryTest.java +++ b/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageProcessorFactoryTest.java @@ -21,10 +21,10 @@ import com.amazonaws.services.lambda.runtime.events.SQSEvent; import org.junit.jupiter.api.Test; -public class LargeMessageProcessorFactoryTest { +class LargeMessageProcessorFactoryTest { @Test - public void createLargeSQSMessageProcessor() { + void createLargeSQSMessageProcessor() { assertThat(LargeMessageProcessorFactory.get(new SQSEvent.SQSMessage())) .isPresent() .get() @@ -32,7 +32,7 @@ public void createLargeSQSMessageProcessor() { } @Test - public void createLargeSNSMessageProcessor() { + void createLargeSNSMessageProcessor() { assertThat(LargeMessageProcessorFactory.get(new SNSEvent.SNSRecord())) .isPresent() .get() @@ -40,7 +40,7 @@ public void createLargeSNSMessageProcessor() { } @Test - public void createUnknownMessageProcessor() { + void createUnknownMessageProcessor() { assertThat(LargeMessageProcessorFactory.get(new KinesisEvent.KinesisEventRecord())).isNotPresent(); } } From 53e26ff15aa10a6f6108de94058196dd110c0af5 Mon Sep 17 00:00:00 2001 From: Philipp Page Date: Tue, 30 Jun 2026 16:32:59 +0200 Subject: [PATCH 2/2] fix(large-messages): wrap getAllowedBuckets() return in unmodifiableSet Avoid exposing the internal set reference (SpotBugs EI_EXPOSE_REP). --- .../lambda/powertools/largemessages/LargeMessageConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfig.java b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfig.java index 97d867c4b..de64184ba 100644 --- a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfig.java +++ b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfig.java @@ -101,7 +101,7 @@ public void withAllowedBuckets(Set allowedBuckets) { * @return the (unmodifiable) set of allowed bucket names, never null */ public Set getAllowedBuckets() { - return allowedBuckets; + return Collections.unmodifiableSet(allowedBuckets); } // For tests purpose