diff --git a/docs/utilities/large_messages.md b/docs/utilities/large_messages.md index 9d14c8228..510ef99e9 100644 --- a/docs/utilities/large_messages.md +++ b/docs/utilities/large_messages.md @@ -174,6 +174,40 @@ on the S3 bucket used for the large messages offloading: - `s3:GetObject` - `s3:DeleteObject` +### Security: Restrict which buckets the utility accesses + +???+ warning "The message sender controls the `s3BucketName` in the payload pointer" + The large message body contains a pointer of the form + `["software.amazon.payloadoffloading.PayloadS3Pointer",{"s3BucketName":"...","s3Key":"..."}]`. + The utility reads `s3BucketName` from this pointer and fetches the object using your Lambda function's + own IAM credentials. Any sender that can write to your queue, or publish to a topic that fans out to it, + can name any bucket your execution role can reach. Your function then reads from that bucket. Delete-after-read + is enabled by default (`deleteS3Object=true`), so your function also deletes the object it read. + +Apply both of the following controls: + +1. **Configure an allowlist** with `LargeMessageConfig.init().withAllowedBuckets(...)`. The utility rejects any + message whose pointer names a bucket outside the allowlist before it reads or deletes from S3. See + [Restricting allowed buckets](#restricting-allowed-buckets). +2. **Scope your IAM policy** to the offload bucket. Grant `s3:GetObject` and `s3:DeleteObject` on the bucket + ARN instead of `*`: + + ```json + { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "s3:GetObject", + "s3:DeleteObject" + ], + "Resource": "arn:aws:s3:::/*" + } + ] + } + ``` + ## Usage You can use the Large Messages utility with either the `@LargeMessage` annotation or the functional API. @@ -481,6 +515,66 @@ If you need to customize this `S3Client`, you can leverage the `LargeMessageConf } ``` +## Restricting allowed buckets + +The message sender controls the bucket named in the message pointer (see +[the security note in Permissions](#security-restrict-which-buckets-the-utility-accesses)). Use the +`LargeMessageConfig` singleton to restrict which S3 buckets the utility reads from and deletes. This works with +both the annotation and the functional API. When you configure a non-empty allowlist, the utility rejects any +message whose pointer names a bucket outside the allowlist. It throws a `LargeMessageProcessingException` before +it reads or deletes from S3. An empty allowlist, the default, applies no restriction. + +=== "@LargeMessage annotation" + ```java hl_lines="6" + import software.amazon.lambda.powertools.largemessages.LargeMessage; + + public class SqsMessageHandler implements RequestHandler { + + public SqsMessageHandler() { + LargeMessageConfig.init().withAllowedBuckets(Collections.singleton("my-offload-bucket")); + } + + @Override + public SQSBatchResponse handleRequest(SQSEvent event, Context context) { + for (SQSMessage message: event.getRecords()) { + // throws LargeMessageProcessingException if the pointer's bucket is not in the allowlist + processRawMessage(message, context); + } + return SQSBatchResponse.builder().build(); + } + + @LargeMessage + private void processRawMessage(SQSEvent.SQSMessage sqsMessage, Context context) { + // sqsMessage.getBody() will contain the content of the S3 object + } + } + ``` + +=== "Functional API" + ```java hl_lines="6" + import software.amazon.lambda.powertools.largemessages.LargeMessages; + + public class SqsMessageHandler implements RequestHandler { + + public SqsMessageHandler() { + LargeMessageConfig.init().withAllowedBuckets(Collections.singleton("my-offload-bucket")); + } + + @Override + public SQSBatchResponse handleRequest(SQSEvent event, Context context) { + for (SQSMessage message: event.getRecords()) { + // throws LargeMessageProcessingException if the pointer's bucket is not in the allowlist + LargeMessages.processLargeMessage(message, this::processRawMessage); + } + return SQSBatchResponse.builder().build(); + } + + private void processRawMessage(SQSEvent.SQSMessage sqsMessage) { + // sqsMessage.getBody() will contain the content of the S3 object + } + } + ``` + ## Migration from the SQS Large Message utility - Replace the dependency in maven / gradle: `powertools-sqs` ==> `powertools-large-messages` diff --git a/examples/powertools-examples-kafka/src/main/java/org/demo/kafka/protobuf/ProtobufProduct.java b/examples/powertools-examples-kafka/src/main/java/org/demo/kafka/protobuf/ProtobufProduct.java index 99c490869..8168f7801 100644 --- a/examples/powertools-examples-kafka/src/main/java/org/demo/kafka/protobuf/ProtobufProduct.java +++ b/examples/powertools-examples-kafka/src/main/java/org/demo/kafka/protobuf/ProtobufProduct.java @@ -1,7 +1,7 @@ // Generated by the protocol buffer compiler. DO NOT EDIT! // NO CHECKED-IN PROTOBUF GENCODE // source: ProtobufProduct.proto -// Protobuf Java Version: 4.33.4 +// Protobuf Java Version: 4.35.0 package org.demo.kafka.protobuf; @@ -18,8 +18,8 @@ public final class ProtobufProduct extends com.google.protobuf.RuntimeVersion.validateProtobufGencodeVersion( com.google.protobuf.RuntimeVersion.RuntimeDomain.PUBLIC, /* major= */ 4, - /* minor= */ 33, - /* patch= */ 4, + /* minor= */ 35, + /* patch= */ 0, /* suffix= */ "", "ProtobufProduct"); } @@ -36,6 +36,11 @@ private ProtobufProduct() { return org.demo.kafka.protobuf.ProtobufProductOuterClass.internal_static_org_demo_kafka_protobuf_ProtobufProduct_descriptor; } + @java.lang.Override + public com.google.protobuf.Descriptors.Descriptor getDescriptorForType() { + return org.demo.kafka.protobuf.ProtobufProductOuterClass.internal_static_org_demo_kafka_protobuf_ProtobufProduct_descriptor; + } + @java.lang.Override protected com.google.protobuf.GeneratedMessage.FieldAccessorTable internalGetFieldAccessorTable() { @@ -130,13 +135,8 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) } getUnknownFields().writeTo(output); } - - @java.lang.Override - public int getSerializedSize() { - int size = memoizedSize; - if (size != -1) return size; - - size = 0; + private int computeSerializedSize_0() { + int size = 0; if (id_ != 0) { size += com.google.protobuf.CodedOutputStream .computeInt32Size(1, id_); @@ -148,6 +148,15 @@ public int getSerializedSize() { size += com.google.protobuf.CodedOutputStream .computeDoubleSize(3, price_); } + return size; + } + @java.lang.Override + public int getSerializedSize() { + int size = memoizedSize; + if (size != -1) return size; + + size = 0; + size += computeSerializedSize_0(); size += getUnknownFields().getSerializedSize(); memoizedSize = size; return size; diff --git a/examples/powertools-examples-kafka/src/main/java/org/demo/kafka/protobuf/ProtobufProductOrBuilder.java b/examples/powertools-examples-kafka/src/main/java/org/demo/kafka/protobuf/ProtobufProductOrBuilder.java index 462d3a66d..c84b8810e 100644 --- a/examples/powertools-examples-kafka/src/main/java/org/demo/kafka/protobuf/ProtobufProductOrBuilder.java +++ b/examples/powertools-examples-kafka/src/main/java/org/demo/kafka/protobuf/ProtobufProductOrBuilder.java @@ -1,7 +1,7 @@ // Generated by the protocol buffer compiler. DO NOT EDIT! // NO CHECKED-IN PROTOBUF GENCODE // source: ProtobufProduct.proto -// Protobuf Java Version: 4.33.4 +// Protobuf Java Version: 4.35.0 package org.demo.kafka.protobuf; diff --git a/examples/powertools-examples-kafka/src/main/java/org/demo/kafka/protobuf/ProtobufProductOuterClass.java b/examples/powertools-examples-kafka/src/main/java/org/demo/kafka/protobuf/ProtobufProductOuterClass.java index 3edc97e12..9c40cefed 100644 --- a/examples/powertools-examples-kafka/src/main/java/org/demo/kafka/protobuf/ProtobufProductOuterClass.java +++ b/examples/powertools-examples-kafka/src/main/java/org/demo/kafka/protobuf/ProtobufProductOuterClass.java @@ -1,7 +1,7 @@ // Generated by the protocol buffer compiler. DO NOT EDIT! // NO CHECKED-IN PROTOBUF GENCODE // source: ProtobufProduct.proto -// Protobuf Java Version: 4.33.4 +// Protobuf Java Version: 4.35.0 package org.demo.kafka.protobuf; @@ -12,8 +12,8 @@ private ProtobufProductOuterClass() {} com.google.protobuf.RuntimeVersion.validateProtobufGencodeVersion( com.google.protobuf.RuntimeVersion.RuntimeDomain.PUBLIC, /* major= */ 4, - /* minor= */ 33, - /* patch= */ 4, + /* minor= */ 35, + /* patch= */ 0, /* suffix= */ "", "ProtobufProductOuterClass"); } @@ -36,7 +36,7 @@ public static void registerAllExtensions( getDescriptor() { return descriptor; } - private static com.google.protobuf.Descriptors.FileDescriptor + private static final com.google.protobuf.Descriptors.FileDescriptor descriptor; static { java.lang.String[] descriptorData = { diff --git a/powertools-e2e-tests/handlers/largemessage-restricted/pom.xml b/powertools-e2e-tests/handlers/largemessage-restricted/pom.xml new file mode 100644 index 000000000..b3c338928 --- /dev/null +++ b/powertools-e2e-tests/handlers/largemessage-restricted/pom.xml @@ -0,0 +1,72 @@ + + 4.0.0 + + + software.amazon.lambda + e2e-test-handlers-parent + 2.10.0 + + + e2e-test-handler-largemessage-restricted + jar + E2E test handler – Large message (allowedBuckets restricted) + + + + software.amazon.awssdk + dynamodb + + + software.amazon.lambda + powertools-large-messages + + + software.amazon.lambda + powertools-logging-log4j + + + com.amazonaws + aws-lambda-java-events + + + org.aspectj + aspectjrt + + + + + + + dev.aspectj + aspectj-maven-plugin + + ${maven.compiler.source} + ${maven.compiler.target} + ${maven.compiler.target} + + + software.amazon.lambda + powertools-large-messages + + + software.amazon.lambda + powertools-logging + + + + + + + compile + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + + diff --git a/powertools-e2e-tests/handlers/largemessage-restricted/src/main/java/software/amazon/lambda/powertools/e2e/Function.java b/powertools-e2e-tests/handlers/largemessage-restricted/src/main/java/software/amazon/lambda/powertools/e2e/Function.java new file mode 100644 index 000000000..36108a647 --- /dev/null +++ b/powertools-e2e-tests/handlers/largemessage-restricted/src/main/java/software/amazon/lambda/powertools/e2e/Function.java @@ -0,0 +1,88 @@ +/* + * Copyright 2023 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package software.amazon.lambda.powertools.e2e; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; +import software.amazon.awssdk.utils.BinaryUtils; +import software.amazon.awssdk.utils.Md5Utils; +import software.amazon.lambda.powertools.largemessages.LargeMessage; +import software.amazon.lambda.powertools.largemessages.LargeMessageConfig; +import software.amazon.lambda.powertools.logging.Logging; + +public class Function implements RequestHandler { + + // The real offload bucket is created per test run with a random name (largemessagebucket). + // By pinning the allowlist to a fixed, non-matching bucket name, every offloaded message must be + // rejected before any S3 read or delete, exercising the bucket-allowlist security check. + private static final String DISALLOWED_BUCKET = "powertools-e2e-disallowed-bucket"; + + private static final String TABLE_FOR_ASYNC_TESTS = System.getenv("TABLE_FOR_ASYNC_TESTS"); + private DynamoDbClient client; + + public Function() { + LargeMessageConfig.init().withAllowedBuckets(Collections.singleton(DISALLOWED_BUCKET)); + if (client == null) { + client = DynamoDbClient.builder() + .httpClient(UrlConnectionHttpClient.builder().build()) + .region(Region.of(System.getenv("AWS_REGION"))) + .build(); + } + } + + @Logging(logEvent = true) + public SQSBatchResponse handleRequest(SQSEvent event, Context context) { + for (SQSMessage message : event.getRecords()) { + processRawMessage(message, context); + } + return SQSBatchResponse.builder().build(); + } + + @LargeMessage + private void processRawMessage(SQSMessage sqsMessage, Context context) { + String bodyMD5 = md5(sqsMessage.getBody()); + if (!sqsMessage.getMd5OfBody().equals(bodyMD5)) { + throw new SecurityException( + String.format("message digest does not match, expected %s, got %s", sqsMessage.getMd5OfBody(), + bodyMD5)); + } + + Map item = new HashMap<>(); + item.put("functionName", AttributeValue.builder().s(context.getFunctionName()).build()); + item.put("id", AttributeValue.builder().s(sqsMessage.getMessageId()).build()); + item.put("bodyMD5", AttributeValue.builder().s(bodyMD5).build()); + item.put("bodySize", + AttributeValue.builder().n(String.valueOf(sqsMessage.getBody().getBytes(StandardCharsets.UTF_8).length)) + .build()); + + client.putItem(PutItemRequest.builder().tableName(TABLE_FOR_ASYNC_TESTS).item(item).build()); + } + + private String md5(String message) { + return BinaryUtils.toHex(Md5Utils.computeMD5Hash(message.getBytes(StandardCharsets.UTF_8))); + } +} diff --git a/powertools-e2e-tests/handlers/largemessage-restricted/src/main/resources/log4j2.xml b/powertools-e2e-tests/handlers/largemessage-restricted/src/main/resources/log4j2.xml new file mode 100644 index 000000000..c717a7153 --- /dev/null +++ b/powertools-e2e-tests/handlers/largemessage-restricted/src/main/resources/log4j2.xml @@ -0,0 +1,16 @@ + + + + + + + + + + + + + + + + diff --git a/powertools-e2e-tests/handlers/pom.xml b/powertools-e2e-tests/handlers/pom.xml index 0c3d0128c..c22687e77 100644 --- a/powertools-e2e-tests/handlers/pom.xml +++ b/powertools-e2e-tests/handlers/pom.xml @@ -28,6 +28,7 @@ batch largemessage largemessage-functional + largemessage-restricted largemessage_idempotent logging-log4j logging-log4j-fluent-api diff --git a/powertools-e2e-tests/src/test/java/software/amazon/lambda/powertools/LargeMessageE2ET.java b/powertools-e2e-tests/src/test/java/software/amazon/lambda/powertools/LargeMessageE2ET.java index 0de2dca60..ea748d8c4 100644 --- a/powertools-e2e-tests/src/test/java/software/amazon/lambda/powertools/LargeMessageE2ET.java +++ b/powertools-e2e-tests/src/test/java/software/amazon/lambda/powertools/LargeMessageE2ET.java @@ -16,6 +16,7 @@ import org.apache.commons.io.IOUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; @@ -35,6 +36,7 @@ import software.amazon.awssdk.services.dynamodb.model.QueryRequest; import software.amazon.awssdk.services.dynamodb.model.QueryResponse; import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.SendMessageRequest; import software.amazon.lambda.powertools.testutils.DataNotReadyException; @@ -166,6 +168,60 @@ void bigSQSMessageOffloadedToS3_shouldLoadFromS3(String pathToFunction) throws I assertThat(items.get(0).get("bodyMD5").s()).isEqualTo("22bde5e7b05fa80bc7be45bdd4bc6c75"); } + @Test + @Timeout(value = 5, unit = TimeUnit.MINUTES) + // Sonar java:S2925 (Thread.sleep in tests): suppressed intentionally. This is an end-to-end test of the real + // asynchronous SQS -> Lambda delivery path, asserting a side effect that does NOT happen (the S3 object is not + // deleted). There is no positive completion signal to await on, so we must give the asynchronous flow time to run + // before asserting the absence. Replacing the wait with a synchronous invoke would degrade this into a near + // unit-test that no longer exercises the event source nor the delete-after-read side effect. + @SuppressWarnings("java:S2925") + void bigSQSMessage_withDisallowedBucket_shouldNotProcessNorDelete() throws IOException, InterruptedException { + // The largemessage-restricted handler pins allowedBuckets to a fixed, non-matching bucket name, so the + // utility must reject the offloaded message before reading from or deleting the actual offload bucket. + setupInfrastructure("largemessage-restricted"); + + // GIVEN a large message offloaded to the real (random) offload bucket + final ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration() + .withPayloadSupportEnabled(s3Client, bucketName); + try (AmazonSQSExtendedClient client = new AmazonSQSExtendedClient( + SqsClient.builder().region(region).httpClient(httpClient).build(), extendedClientConfig); + InputStream inputStream = this.getClass().getResourceAsStream("/large_sqs_message.txt");) { + String bigMessage = IOUtils.toString(inputStream, StandardCharsets.UTF_8); + + // WHEN the message is sent and the function is invoked + client.sendMessage(SendMessageRequest + .builder() + .queueUrl(queueUrl) + .messageBody(bigMessage) + .build()); + } + + // Give the function time to be invoked, fail the bucket-allowlist check, and the message to be dead-lettered + // (the queue uses maxReceiveCount=1, so there is a single attempt and no retries). + TimeUnit.SECONDS.sleep(90); + + // THEN the handler never processed the message (no item written to DynamoDB) + QueryRequest request = QueryRequest + .builder() + .tableName(tableName) + .keyConditionExpression("functionName = :func") + .expressionAttributeValues( + Collections.singletonMap(":func", AttributeValue.builder().s(functionName).build())) + .build(); + QueryResponse response = dynamoDbClient.query(request); + assertThat(response.items()).isEmpty(); + + // AND the S3 object was never deleted (delete-after-read is on by default, but the disallowed bucket is + // rejected before any S3 interaction) + long objectCount = s3Client.listObjectsV2( + ListObjectsV2Request.builder() + .bucket(bucketName) + .build()) + .keyCount(); + assertThat(objectCount).isGreaterThanOrEqualTo(1); + } + @ParameterizedTest @ValueSource(strings = { "largemessage", "largemessage-functional" }) @Timeout(value = 5, unit = TimeUnit.MINUTES) diff --git a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfig.java b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfig.java index 6ad529496..de64184ba 100644 --- a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfig.java +++ b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfig.java @@ -16,6 +16,10 @@ import static software.amazon.lambda.powertools.common.internal.LambdaConstants.AWS_REGION_ENV; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; @@ -37,6 +41,7 @@ public class LargeMessageConfig { private static final LargeMessageConfig INSTANCE = new LargeMessageConfig(); private S3Client s3Client; + private Set allowedBuckets = Collections.emptySet(); private LargeMessageConfig() { } @@ -65,9 +70,44 @@ public void withS3Client(S3Client s3Client) { } } + /** + * Restrict the S3 buckets the utility is allowed to read from and delete. + *

+ * The {@code s3BucketName} in the payload pointer is controlled by whoever sent the message. Without an + * allowlist, any sender can redirect the Lambda function to fetch (and, when {@code deleteS3Object} is enabled, + * delete) objects from an arbitrary bucket using the function's own credentials. When a non-empty allowlist is + * configured, the utility rejects any message whose pointer references a bucket that is not in the allowlist, + * before any S3 interaction. + *

+ * An empty (or null) allowlist means no restriction is applied (default), preserving backward compatibility. + * Configuring an allowlist is strongly recommended for security. + *

+     * public MyLambdaHandler() {
+     *     LargeMessageConfig.init().withAllowedBuckets(Collections.singleton("my-offload-bucket"));
+     * }
+     * 
+ * + * @param allowedBuckets the set of bucket names the utility is allowed to access (null is treated as empty) + */ + public void withAllowedBuckets(Set allowedBuckets) { + this.allowedBuckets = allowedBuckets == null + ? Collections.emptySet() + : Collections.unmodifiableSet(new HashSet<>(allowedBuckets)); + } + + /** + * Retrieve the configured set of allowed buckets. An empty set means no restriction is applied. + * + * @return the (unmodifiable) set of allowed bucket names, never null + */ + public Set getAllowedBuckets() { + return Collections.unmodifiableSet(allowedBuckets); + } + // For tests purpose void resetS3Client() { this.s3Client = null; + this.allowedBuckets = Collections.emptySet(); } // Getter needs to initialize if not done with setter diff --git a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageProcessor.java b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageProcessor.java index c41af0cea..15b436ac7 100644 --- a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageProcessor.java +++ b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageProcessor.java @@ -17,6 +17,7 @@ import static java.lang.String.format; import java.nio.charset.StandardCharsets; +import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,6 +26,7 @@ import software.amazon.awssdk.services.s3.S3Client; import software.amazon.lambda.powertools.largemessages.LargeMessageConfig; import software.amazon.lambda.powertools.largemessages.LargeMessageProcessingException; +import software.amazon.payloadoffloading.PayloadS3Pointer; import software.amazon.payloadoffloading.S3BackedPayloadStore; import software.amazon.payloadoffloading.S3Dao; @@ -71,6 +73,11 @@ public R process(T message, LargeMessageFunction function, boolean del payloadPointer = payloadPointer.replace("com.amazon.sqs.javamessaging.MessageS3Pointer", "software.amazon.payloadoffloading.PayloadS3Pointer"); + // The bucket name in the pointer is controlled by the message sender. Validate it against the configured + // allowlist (if any) before any S3 interaction, to prevent reading from or deleting objects in arbitrary + // buckets using the function's own credentials. + validateBucket(payloadPointer); + if (LOG.isInfoEnabled()) { LOG.info("Large message [{}]: retrieving content from S3", getMessageId(message)); } @@ -136,6 +143,20 @@ public R process(T message, LargeMessageFunction function, boolean del */ protected abstract void removeLargeMessageAttributes(T message); + private void validateBucket(String payloadPointer) { + Set allowedBuckets = LargeMessageConfig.get().getAllowedBuckets(); + if (allowedBuckets == null || allowedBuckets.isEmpty()) { + // No allowlist configured: keep the existing (unrestricted) behavior for backward compatibility. + return; + } + + String bucketName = PayloadS3Pointer.fromJson(payloadPointer).getS3BucketName(); + if (!allowedBuckets.contains(bucketName)) { + throw new LargeMessageProcessingException( + format("S3 bucket [%s] is not in the configured allowedBuckets", bucketName)); + } + } + private String getS3ObjectContent(String payloadPointer) { try { return payloadStore.getOriginalPayload(payloadPointer); diff --git a/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfigTest.java b/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfigTest.java index b6bcaf6b5..43736baf2 100644 --- a/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfigTest.java +++ b/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfigTest.java @@ -16,26 +16,30 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; -public class LargeMessageConfigTest { +class LargeMessageConfigTest { @BeforeEach - public void setup() { + void setup() { LargeMessageConfig.get().resetS3Client(); } @AfterEach - public void tearDown() { + void tearDown() { LargeMessageConfig.get().resetS3Client(); } @Test - public void singleton_shouldNotChangeWhenCalledMultipleTimes() { + void singleton_shouldNotChangeWhenCalledMultipleTimes() { LargeMessageConfig.init().withS3Client(S3Client.builder().region(Region.US_EAST_1).build()); LargeMessageConfig config = LargeMessageConfig.get(); @@ -46,7 +50,7 @@ public void singleton_shouldNotChangeWhenCalledMultipleTimes() { } @Test - public void singletonWithDefaultClient_shouldNotChangeWhenCalledMultipleTimes() { + void singletonWithDefaultClient_shouldNotChangeWhenCalledMultipleTimes() { S3Client s3Client = LargeMessageConfig.get().getS3Client(); LargeMessageConfig.init().withS3Client(S3Client.create()); @@ -54,4 +58,37 @@ public void singletonWithDefaultClient_shouldNotChangeWhenCalledMultipleTimes() assertThat(s3Client2).isEqualTo(s3Client); } + + @Test + void allowedBuckets_shouldDefaultToEmpty() { + assertThat(LargeMessageConfig.get().getAllowedBuckets()).isEmpty(); + } + + @Test + void withAllowedBuckets_shouldStoreAndReturnTheSet() { + Set buckets = new HashSet<>(); + buckets.add("bucket-a"); + buckets.add("bucket-b"); + + LargeMessageConfig.init().withAllowedBuckets(buckets); + + assertThat(LargeMessageConfig.get().getAllowedBuckets()).containsExactlyInAnyOrder("bucket-a", "bucket-b"); + } + + @Test + void withAllowedBuckets_null_shouldResultInEmptySet() { + LargeMessageConfig.init().withAllowedBuckets(Collections.singleton("bucket-a")); + LargeMessageConfig.init().withAllowedBuckets(null); + + assertThat(LargeMessageConfig.get().getAllowedBuckets()).isEmpty(); + } + + @Test + void reset_shouldClearAllowedBuckets() { + LargeMessageConfig.init().withAllowedBuckets(Collections.singleton("bucket-a")); + + LargeMessageConfig.get().resetS3Client(); + + assertThat(LargeMessageConfig.get().getAllowedBuckets()).isEmpty(); + } } diff --git a/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/LargeMessagesTest.java b/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/LargeMessagesTest.java index 7bd3e80e2..e0c409e7e 100644 --- a/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/LargeMessagesTest.java +++ b/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/LargeMessagesTest.java @@ -69,6 +69,8 @@ void init() throws NoSuchFieldException, IllegalAccessException { Field client = LargeMessageConfig.class.getDeclaredField("s3Client"); client.setAccessible(true); client.set(LargeMessageConfig.get(), null); + // clear any allowlist configured by a previous test (singleton) + LargeMessageConfig.init().withAllowedBuckets(null); LargeMessageConfig.init().withS3Client(s3Client); } @@ -246,6 +248,39 @@ void testProcessLargeMessage_shouldModifyMessageInPlace() { assertThat(sqsMessage.getBody()).isNotEqualTo(originalBody); } + @Test + void testProcessLargeMessage_withAllowedBucketMatching_shouldRetrieveFromS3AndDelete() { + // given + when(s3Client.getObject(any(GetObjectRequest.class))).thenReturn(s3ObjectWithLargeMessage()); + LargeMessageConfig.init().withAllowedBuckets(Collections.singleton(BUCKET_NAME)); + SQSMessage sqsMessage = sqsMessageWithBody(BIG_MESSAGE_BODY, true); + + // when + String result = LargeMessages.processLargeMessage(sqsMessage, SQSMessage::getBody); + + // then + assertThat(result).isEqualTo(BIG_MSG); + ArgumentCaptor delete = ArgumentCaptor.forClass(DeleteObjectRequest.class); + verify(s3Client).deleteObject(delete.capture()); + assertThat(delete.getValue().bucket()).isEqualTo(BUCKET_NAME); + assertThat(delete.getValue().key()).isEqualTo(BUCKET_KEY); + } + + @Test + void testProcessLargeMessage_withAllowedBucketNotMatching_shouldThrowAndNotTouchS3() { + // given + LargeMessageConfig.init().withAllowedBuckets(Collections.singleton("some-other-bucket")); + SQSMessage sqsMessage = sqsMessageWithBody(BIG_MESSAGE_BODY, true); + + // when / then + assertThatThrownBy(() -> LargeMessages.processLargeMessage(sqsMessage, SQSMessage::getBody)) + .isInstanceOf(LargeMessageProcessingException.class) + .hasMessageContaining("is not in the configured allowedBuckets") + .hasMessageContaining(BUCKET_NAME); + // the disallowed bucket must never be read from nor deleted + verifyNoInteractions(s3Client); + } + private String processOrderSimple(SQSMessage message, String orderId) { assertThat(message.getBody()).isEqualTo(BIG_MSG); return orderId + "-processed"; diff --git a/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageAspectTest.java b/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageAspectTest.java index b84709ddc..b02dc8782 100644 --- a/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageAspectTest.java +++ b/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageAspectTest.java @@ -85,6 +85,8 @@ void init() throws NoSuchFieldException, IllegalAccessException { Field client = LargeMessageConfig.class.getDeclaredField("s3Client"); client.setAccessible(true); client.set(LargeMessageConfig.get(), null); + // clear any allowlist configured by a previous test (singleton) + LargeMessageConfig.init().withAllowedBuckets(null); LargeMessageConfig.init().withS3Client(s3Client); } @@ -132,6 +134,35 @@ private void verifyMessageObjectIsModified(SQSMessage sqsMessage) { assertThat(sqsMessage.getMd5OfBody()).isEqualTo(BIG_MSG_MD5); } + @Test + void testLargeSQSMessage_withAllowedBucketMatching_shouldRetrieveFromS3() { + // given + when(s3Client.getObject(any(GetObjectRequest.class))).thenReturn(s3ObjectWithLargeMessage()); + LargeMessageConfig.init().withAllowedBuckets(Collections.singleton(BUCKET_NAME)); + SQSMessage sqsMessage = sqsMessageWithBody(BIG_MESSAGE_BODY, true); + + // when + String message = processSQSMessage(sqsMessage, context); + + // then + assertThat(message).isEqualTo(BIG_MSG); + verify(s3Client).deleteObject(any(DeleteObjectRequest.class)); + } + + @Test + void testLargeSQSMessage_withAllowedBucketNotMatching_shouldThrowAndNotTouchS3() { + // given + LargeMessageConfig.init().withAllowedBuckets(Collections.singleton("some-other-bucket")); + SQSMessage sqsMessage = sqsMessageWithBody(BIG_MESSAGE_BODY, true); + + // when / then + assertThatThrownBy(() -> processSQSMessage(sqsMessage, context)) + .isInstanceOf(LargeMessageProcessingException.class) + .hasMessageContaining("is not in the configured allowedBuckets") + .hasMessageContaining(BUCKET_NAME); + verifyNoInteractions(s3Client); + } + @Test void testLargeSQSMessageWithDefaultDeletion() { // given diff --git a/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageProcessorFactoryTest.java b/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageProcessorFactoryTest.java index 3011c8189..276bc0fcf 100644 --- a/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageProcessorFactoryTest.java +++ b/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageProcessorFactoryTest.java @@ -21,10 +21,10 @@ import com.amazonaws.services.lambda.runtime.events.SQSEvent; import org.junit.jupiter.api.Test; -public class LargeMessageProcessorFactoryTest { +class LargeMessageProcessorFactoryTest { @Test - public void createLargeSQSMessageProcessor() { + void createLargeSQSMessageProcessor() { assertThat(LargeMessageProcessorFactory.get(new SQSEvent.SQSMessage())) .isPresent() .get() @@ -32,7 +32,7 @@ public void createLargeSQSMessageProcessor() { } @Test - public void createLargeSNSMessageProcessor() { + void createLargeSNSMessageProcessor() { assertThat(LargeMessageProcessorFactory.get(new SNSEvent.SNSRecord())) .isPresent() .get() @@ -40,7 +40,7 @@ public void createLargeSNSMessageProcessor() { } @Test - public void createUnknownMessageProcessor() { + void createUnknownMessageProcessor() { assertThat(LargeMessageProcessorFactory.get(new KinesisEvent.KinesisEventRecord())).isNotPresent(); } }