Skip to content

Commit

Permalink
Add support for dynamic bucket and default bucket in S3 sink (opensea…
Browse files Browse the repository at this point in the history
…rch-project#4402)

Signed-off-by: Taylor Gray <tylgry@amazon.com>
  • Loading branch information
graytaylor0 authored Apr 11, 2024
1 parent 036bd50 commit 02aef54
Show file tree
Hide file tree
Showing 34 changed files with 574 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,19 @@ public interface Event extends Serializable {
* @throws RuntimeException if the input string is not properly formatted
* @since 2.1
*/
String formatString(String format, ExpressionEvaluator expressionEvaluator);
String formatString(final String format, final ExpressionEvaluator expressionEvaluator);

/**
* Returns formatted parts of the input string replaced by their values in the event or the values from the result
* of a Data Prepper expression
* @param format input format
* @param expressionEvaluator - The expression evaluator that will support formatting from Data Prepper expressions
* @param defaultValue - The String to use as a replacement for when keys in Events can't be found
* @return returns a string with no formatted parts, returns null if no value is found
* @throws RuntimeException if the input string is not properly formatted
* @since 2.1
*/
String formatString(final String format, final ExpressionEvaluator expressionEvaluator, final String defaultValue);

/**
* Returns event handle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ public String getAsJsonString(final String key) {
*/
@Override
public String formatString(final String format) {
return formatStringInternal(format, null);
return formatStringInternal(format, null, null);
}

/**
Expand All @@ -334,10 +334,16 @@ public String formatString(final String format) {
*/
@Override
public String formatString(final String format, final ExpressionEvaluator expressionEvaluator) {
return formatStringInternal(format, expressionEvaluator);
return formatStringInternal(format, expressionEvaluator, null);
}

private String formatStringInternal(final String format, final ExpressionEvaluator expressionEvaluator) {
@Override
public String formatString(final String format, final ExpressionEvaluator expressionEvaluator, final String defaultValue) {
return formatStringInternal(format, expressionEvaluator, defaultValue);
}


private String formatStringInternal(final String format, final ExpressionEvaluator expressionEvaluator, final String defaultValue) {
int fromIndex = 0;
String result = "";
int position = 0;
Expand All @@ -361,7 +367,11 @@ private String formatStringInternal(final String format, final ExpressionEvaluat
if (expressionEvaluator != null && expressionEvaluator.isValidExpressionStatement(name)) {
val = expressionEvaluator.evaluate(name, this);
} else {
throw new EventKeyNotFoundException(String.format("The key %s could not be found in the Event when formatting", name));
if (defaultValue == null) {
throw new EventKeyNotFoundException(String.format("The key %s could not be found in the Event when formatting", name));
}

val = defaultValue;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,21 @@ public void testBuild_withFormatStringWithValueNotFound() {
assertThrows(EventKeyNotFoundException.class, () -> event.formatString("test-${boo}-string"));
}

@Test
public void testBuild_withFormatStringWithValueNotFound_and_defaultValue_for_missing_keys() {

final String defaultValueForMissingKey = UUID.randomUUID().toString();
final String jsonString = "{\"foo\": \"bar\", \"info\": {\"ids\": {\"id\":\"idx\"}}}";
final ExpressionEvaluator expressionEvaluator = mock(ExpressionEvaluator.class);
event = JacksonEvent.builder()
.withEventType(eventType)
.withData(jsonString)
.getThis()
.build();
final String result = event.formatString("test-${boo}-string", expressionEvaluator, defaultValueForMissingKey);
assertThat(result, equalTo("test-" + defaultValueForMissingKey + "-string"));
}

@Test
public void testBuild_withFormatStringWithInvalidFormat() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public class S3SinkIT {

@Mock
private PluginSetting pluginSetting;
@Mock
@Mock(stubOnly = true)
private S3SinkConfig s3SinkConfig;
@Mock
private PluginFactory pluginFactory;
Expand Down Expand Up @@ -166,6 +166,8 @@ void setUp() {
.build();

when(expressionEvaluator.isValidFormatExpression(anyString())).thenReturn(true);

when(s3SinkConfig.getDefaultBucket()).thenReturn(null);
}

private S3Sink createObjectUnderTest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;

Expand All @@ -27,6 +29,8 @@
public class S3OutputStream extends PositionOutputStream {
private static final Logger LOG = LoggerFactory.getLogger(S3OutputStream.class);

static final String ACCESS_DENIED = "Access Denied";

/**
* Default chunk size is 10MB
*/
Expand All @@ -35,7 +39,7 @@ public class S3OutputStream extends PositionOutputStream {
/**
* The bucket-name on Amazon S3
*/
private final String bucket;
private String bucket;

/**
* The key (path) name within the bucket
Expand Down Expand Up @@ -65,21 +69,30 @@ public class S3OutputStream extends PositionOutputStream {
*/
private boolean open;

/**
* The default bucket to send to when upload fails with dynamic bucket
*/
private final String defaultBucket;

/**
* Creates a new S3 OutputStream
*
* @param s3Client the AmazonS3 client
* @param bucketSupplier name of the bucket
* @param keySupplier path within the bucket
*/
public S3OutputStream(final S3Client s3Client, Supplier<String> bucketSupplier, Supplier<String> keySupplier) {
public S3OutputStream(final S3Client s3Client,
final Supplier<String> bucketSupplier,
final Supplier<String> keySupplier,
final String defaultBucket) {
this.s3Client = s3Client;
this.bucket = bucketSupplier.get();
this.key = keySupplier.get();
buf = new byte[BUFFER_SIZE];
position = 0;
etags = new ArrayList<>();
open = true;
this.defaultBucket = defaultBucket;
}

@Override
Expand Down Expand Up @@ -184,12 +197,18 @@ private void flushBufferAndRewind() {

private void possiblyStartMultipartUpload() {
if (uploadId == null) {
CreateMultipartUploadRequest uploadRequest = CreateMultipartUploadRequest.builder()
.bucket(bucket)
.key(key)
.build();
CreateMultipartUploadResponse multipartUpload = s3Client.createMultipartUpload(uploadRequest);
uploadId = multipartUpload.uploadId();

try {
createMultipartUpload();
} catch (final S3Exception e) {
if (defaultBucket != null && (e instanceof NoSuchBucketException || e.getMessage().contains(ACCESS_DENIED))) {
bucket = defaultBucket;
LOG.warn("Bucket {} could not be accessed to create multi-part upload, attempting to create multi-part upload to default_bucket {}", bucket, defaultBucket);
createMultipartUpload();
} else {
throw e;
}
}

LOG.debug("Created multipart upload {} bucket='{}',key='{}'.", uploadId, bucket, key);
}
Expand Down Expand Up @@ -217,5 +236,14 @@ private void uploadPart() {
public long getPos() throws IOException {
return position + (long) etags.size() * (long) BUFFER_SIZE;
}

private void createMultipartUpload() {
CreateMultipartUploadRequest uploadRequest = CreateMultipartUploadRequest.builder()
.bucket(bucket)
.key(key)
.build();
CreateMultipartUploadResponse multipartUpload = s3Client.createMultipartUpload(uploadRequest);
uploadId = multipartUpload.uploadId();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.ObjectKey;


public class KeyGenerator {
private final S3SinkConfig s3SinkConfig;
private final ExtensionProvider extensionProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ public S3Sink(final PluginSetting pluginSetting,
throw new InvalidPluginConfigurationException("name_pattern is not a valid format expression");
}

if (s3SinkConfig.getBucketName() != null &&
!expressionEvaluator.isValidFormatExpression(s3SinkConfig.getBucketName())) {
throw new InvalidPluginConfigurationException("bucket name is not a valid format expression");
}

S3OutputCodecContext s3OutputCodecContext = new S3OutputCodecContext(OutputCodecContext.fromSinkContext(sinkContext), compressionOption);

testCodec.validateAgainstCodecContext(s3OutputCodecContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ public class S3SinkConfig {
@Size(min = 3, max = 500, message = "bucket length should be at least 3 characters")
private String bucketName;

/**
* The default bucket to send to if using a dynamic bucket name and failures occur
* for any reason when sending to a dynamic bucket
*/
@JsonProperty("default_bucket")
@Size(min = 3, max = 500, message = "default_bucket length should be at least 3 characters")
private String defaultBucket;


@JsonProperty("object_key")
@Valid
private ObjectKeyOptions objectKeyOptions = new ObjectKeyOptions();
Expand Down Expand Up @@ -143,4 +152,6 @@ public int getMaxUploadRetries() {
public CompressionOption getCompression() {
return compression;
}

public String getDefaultBucket() { return defaultBucket; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@
import java.util.function.Supplier;

public interface BufferFactory {
Buffer getBuffer(S3Client s3Client, Supplier<String> bucketSupplier, Supplier<String> keySupplier);
Buffer getBuffer(S3Client s3Client, Supplier<String> bucketSupplier, Supplier<String> keySupplier, String defaultBucket);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.s3.accumulator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;

class BufferUtilities {

private static final Logger LOG = LoggerFactory.getLogger(BufferUtilities.class);

static final String ACCESS_DENIED = "Access Denied";

static void putObjectOrSendToDefaultBucket(final S3Client s3Client,
final RequestBody requestBody,
final String objectKey,
final String targetBucket,
final String defaultBucket) {
try {
s3Client.putObject(
PutObjectRequest.builder().bucket(targetBucket).key(objectKey).build(),
requestBody);
} catch (final S3Exception e) {
if (defaultBucket != null && (e instanceof NoSuchBucketException || e.getMessage().contains(ACCESS_DENIED))) {
LOG.warn("Bucket {} could not be accessed, attempting to send to default_bucket {}", targetBucket, defaultBucket);
s3Client.putObject(
PutObjectRequest.builder().bucket(defaultBucket).key(objectKey).build(),
requestBody);
} else {
throw e;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ public CodecBufferFactory(BufferFactory innerBufferFactory, BufferedCodec codec)
}

@Override
public Buffer getBuffer(S3Client s3Client, Supplier<String> bucketSupplier, Supplier<String> keySupplier) {
Buffer innerBuffer = innerBufferFactory.getBuffer(s3Client, bucketSupplier, keySupplier);
public Buffer getBuffer(final S3Client s3Client,
final Supplier<String> bucketSupplier,
final Supplier<String> keySupplier,
final String defaultBucket) {
Buffer innerBuffer = innerBufferFactory.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket);
return new CodecBuffer(innerBuffer, bufferedCodec);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ public CompressionBufferFactory(final BufferFactory innerBufferFactory, final Co
}

@Override
public Buffer getBuffer(S3Client s3Client, Supplier<String> bucketSupplier, Supplier<String> keySupplier) {
final Buffer internalBuffer = innerBufferFactory.getBuffer(s3Client, bucketSupplier, keySupplier);
public Buffer getBuffer(final S3Client s3Client,
final Supplier<String> bucketSupplier,
final Supplier<String> keySupplier,
final String defaultBucket) {
final Buffer internalBuffer = innerBufferFactory.getBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket);
if(compressionInternal)
return internalBuffer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import org.apache.commons.lang3.time.StopWatch;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.time.Duration;
Expand All @@ -31,7 +31,12 @@ public class InMemoryBuffer implements Buffer {
private String bucket;
private String key;

InMemoryBuffer(S3Client s3Client, Supplier<String> bucketSupplier, Supplier<String> keySupplier) {
private String defaultBucket;

InMemoryBuffer(final S3Client s3Client,
final Supplier<String> bucketSupplier,
final Supplier<String> keySupplier,
final String defaultBucket) {
this.s3Client = s3Client;
this.bucketSupplier = bucketSupplier;
this.keySupplier = keySupplier;
Expand All @@ -40,6 +45,7 @@ public class InMemoryBuffer implements Buffer {
watch = new StopWatch();
watch.start();
isCodecStarted = false;
this.defaultBucket = defaultBucket;
}

@Override
Expand All @@ -62,9 +68,7 @@ public Duration getDuration() {
@Override
public void flushToS3() {
final byte[] byteArray = byteArrayOutputStream.toByteArray();
s3Client.putObject(
PutObjectRequest.builder().bucket(getBucket()).key(getKey()).build(),
RequestBody.fromBytes(byteArray));
BufferUtilities.putObjectOrSendToDefaultBucket(s3Client, RequestBody.fromBytes(byteArray), getKey(), getBucket(), defaultBucket);
}

private String getBucket() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@

public class InMemoryBufferFactory implements BufferFactory {
@Override
public Buffer getBuffer(S3Client s3Client, Supplier<String> bucketSupplier, Supplier<String> keySupplier) {
return new InMemoryBuffer(s3Client, bucketSupplier, keySupplier);
public Buffer getBuffer(final S3Client s3Client,
final Supplier<String> bucketSupplier,
final Supplier<String> keySupplier,
final String defaultBucket) {
return new InMemoryBuffer(s3Client, bucketSupplier, keySupplier, defaultBucket);
}
}
Loading

0 comments on commit 02aef54

Please sign in to comment.