Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use InstanceProfileCredentialsProvider for S3 clients #71

Open
wants to merge 2 commits into
base: s3-encryption
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
removed unnecessary code
  • Loading branch information
smiklosovic committed May 20, 2023
commit 5b96a283e8c1f34688eda3c57c4174dab5a18f7d
72 changes: 15 additions & 57 deletions src/main/java/com/instaclustr/esop/s3/v2/BaseS3Backuper.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,10 @@
import com.instaclustr.esop.s3.v2.S3ClientsFactory.S3Clients;
import com.instaclustr.threading.Executors;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.core.waiters.WaiterOverrideConfiguration;
import software.amazon.awssdk.core.waiters.WaiterResponse;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectAttributesRequest;
import software.amazon.awssdk.services.s3.model.GetObjectAttributesResponse;
import software.amazon.awssdk.services.s3.model.GetObjectTaggingRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.MetadataDirective;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.ObjectAttributes;
Expand All @@ -36,62 +32,52 @@
import software.amazon.awssdk.services.s3.model.Tagging;

import static java.nio.charset.StandardCharsets.UTF_8;
import static software.amazon.awssdk.core.retry.RetryMode.STANDARD;
import static software.amazon.awssdk.core.retry.backoff.BackoffStrategy.defaultStrategy;

public class BaseS3Backuper extends Backuper
{
public class BaseS3Backuper extends Backuper {
private static final Logger logger = LoggerFactory.getLogger(BaseS3Backuper.class);
private ExecutorService executorService;

public final S3Clients s3Clients;
public final BucketService s3BucketService;

public BaseS3Backuper(final S3Clients s3Clients,
final BackupOperationRequest request)
{
final BackupOperationRequest request) {
super(request);
this.s3Clients = s3Clients;
this.executorService = new Executors.FixedTasksExecutorSupplier().get(100);
this.s3BucketService = new BaseS3BucketService(s3Clients);
}

public BaseS3Backuper(final S3Clients s3Clients,
final BackupCommitLogsOperationRequest request)
{
final BackupCommitLogsOperationRequest request) {
super(request);
this.s3Clients = s3Clients;
this.executorService = new Executors.FixedTasksExecutorSupplier().get(100);
this.s3BucketService = new BaseS3BucketService(s3Clients);
}

@Override
public RemoteObjectReference objectKeyToRemoteReference(final Path objectKey)
{
public RemoteObjectReference objectKeyToRemoteReference(final Path objectKey) {
return new S3RemoteObjectReference(objectKey, objectKey.toString());
}

@Override
public RemoteObjectReference objectKeyToNodeAwareRemoteReference(final Path objectKey)
{
public RemoteObjectReference objectKeyToNodeAwareRemoteReference(final Path objectKey) {
return new S3RemoteObjectReference(objectKey, resolveNodeAwareRemotePath(objectKey));
}


@Override
protected void cleanup() throws Exception
{
protected void cleanup() throws Exception {
s3Clients.close();
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
}

@Override
public FreshenResult freshenRemoteObject(ManifestEntry manifestEntry, RemoteObjectReference object) throws Exception
{
public FreshenResult freshenRemoteObject(ManifestEntry manifestEntry, RemoteObjectReference object) {
List<Tag> tags;
try
{
try {
tags = s3Clients.getNonEncryptingClient()
.getObjectTagging(GetObjectTaggingRequest.builder()
.bucket(request.storageLocation.bucket)
Expand Down Expand Up @@ -133,8 +119,7 @@ public FreshenResult freshenRemoteObject(ManifestEntry manifestEntry, RemoteObje
// we want to preserve whatever tags it had
Tagging tagging = taggingBuilder.tagSet(tags).build();

if (!request.skipRefreshing)
{
if (!request.skipRefreshing) {
CopyObjectRequest copyObjectRequest = CopyObjectRequest.builder()
.sourceBucket(request.storageLocation.bucket)
.destinationBucket(request.storageLocation.bucket)
Expand All @@ -161,19 +146,16 @@ public FreshenResult freshenRemoteObject(ManifestEntry manifestEntry, RemoteObje
}

@Override
public void uploadFile(ManifestEntry manifestEntry, InputStream localFileStream, RemoteObjectReference objectReference) throws Exception
{
public void uploadFile(ManifestEntry manifestEntry, InputStream localFileStream, RemoteObjectReference objectReference) {
logger.info("Uploading {}", objectReference.canonicalPath);
s3Clients.getNonEncryptingClient()
.putObject(getPutObjectRequest(objectReference, manifestEntry.size),
RequestBody.fromInputStream(localFileStream, manifestEntry.size));
}

@Override
public void uploadEncryptedFile(ManifestEntry manifestEntry, InputStream localFileStream, RemoteObjectReference objectReference) throws Exception
{
if (!s3Clients.getEncryptingClient().isPresent())
{
public void uploadEncryptedFile(ManifestEntry manifestEntry, InputStream localFileStream, RemoteObjectReference objectReference) {
if (!s3Clients.getEncryptingClient().isPresent()) {
uploadFile(manifestEntry, localFileStream, objectReference);
return;
}
Expand Down Expand Up @@ -204,8 +186,7 @@ public void uploadEncryptedFile(ManifestEntry manifestEntry, InputStream localFi
}

@Override
public void uploadText(String text, RemoteObjectReference objectReference) throws Exception
{
public void uploadText(String text, RemoteObjectReference objectReference) throws Exception {
logger.info("Uploading {}", objectReference.canonicalPath);
byte[] bytes = text.getBytes(UTF_8);

Expand All @@ -215,8 +196,7 @@ public void uploadText(String text, RemoteObjectReference objectReference) throw
}

@Override
public void uploadEncryptedText(String plainText, RemoteObjectReference objectReference) throws Exception
{
public void uploadEncryptedText(String plainText, RemoteObjectReference objectReference) throws Exception {
if (!s3Clients.getEncryptingClient().isPresent()) {
uploadText(plainText, objectReference);
return;
Expand All @@ -230,31 +210,9 @@ public void uploadEncryptedText(String plainText, RemoteObjectReference objectRe
RequestBody.fromBytes(bytes));
}

private void waitForCompletion(RemoteObjectReference objectReference) throws Exception
{
WaiterResponse<HeadObjectResponse> response = s3Clients.getClient()
.waiter()
.waitUntilObjectExists(HeadObjectRequest.builder()
.bucket(request.storageLocation.bucket)
.key(objectReference.canonicalPath)
.build(),
WaiterOverrideConfiguration.builder()
.backoffStrategy(defaultStrategy(STANDARD))
.build());

if (response.matched().exception().isPresent())
{
logger.debug("Failed to upload {}.", objectReference.canonicalPath);
throw new RuntimeException(response.matched().exception().get());
}

logger.info("Finished uploading {}.", objectReference.canonicalPath);
}

private PutObjectRequest getPutObjectRequest(RemoteObjectReference s3RemoteObjectReference,
long unencryptedSize,
Tag... tags)
{
Tag... tags) {
return PutObjectRequest.builder()
.bucket(request.storageLocation.bucket)
.key(s3RemoteObjectReference.canonicalPath)
Expand Down