abortion of orphaned multipart uploads and fixed restore
smiklosovic committed May 31, 2023
1 parent 66b4876 commit a4790d1
Showing 5 changed files with 192 additions and 69 deletions.
@@ -80,6 +80,8 @@ public void update(final StorageLocation storageLocation,

private boolean isClosed = false;

public void init(List<ManifestEntry> manifestEntries) {}

public void close() throws IOException {
if (isClosed) {
return;
@@ -147,6 +147,7 @@ public void coordinate(final Operation<BackupOperationRequest> operation) {

try (final Backuper backuper = backuperFactoryMap.get(request.storageLocation.storageProvider).createBackuper(request)) {

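// abort any orphaned multipart uploads left behind for these entries before uploading starts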
backuper.init(manifest.getManifestEntries(true));
performUpload(manifest.getManifestEntries(false), backuper, operation, request);

// upload the manifest last, possibly with updated file sizes as the files were encrypted
197 changes: 156 additions & 41 deletions src/main/java/com/instaclustr/esop/s3/v2/BaseS3Backuper.java
@@ -6,8 +6,7 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -18,9 +17,9 @@
import com.instaclustr.esop.impl.backup.BackupCommitLogsOperationRequest;
import com.instaclustr.esop.impl.backup.BackupOperationRequest;
import com.instaclustr.esop.impl.backup.Backuper;
import com.instaclustr.esop.impl.backup.BaseBackupOperationRequest;
import com.instaclustr.esop.s3.S3RemoteObjectReference;
import com.instaclustr.esop.s3.v2.S3ClientsFactory.S3Clients;
import com.instaclustr.threading.Executors;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
@@ -34,14 +33,22 @@
import software.amazon.awssdk.services.s3.model.GetObjectAttributesRequest;
import software.amazon.awssdk.services.s3.model.GetObjectAttributesResponse;
import software.amazon.awssdk.services.s3.model.GetObjectTaggingRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.ListMultipartUploadsRequest;
import software.amazon.awssdk.services.s3.model.ListMultipartUploadsResponse;
import software.amazon.awssdk.services.s3.model.MetadataDirective;
import software.amazon.awssdk.services.s3.model.MultipartUpload;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.NoSuchUploadException;
import software.amazon.awssdk.services.s3.model.ObjectAttributes;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.SdkPartType;
import software.amazon.awssdk.services.s3.model.StorageClass;
import software.amazon.awssdk.services.s3.model.Tag;
import software.amazon.awssdk.services.s3.model.Tagging;
import software.amazon.awssdk.services.s3.model.UploadPartCopyRequest;
import software.amazon.awssdk.services.s3.model.UploadPartCopyResponse;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.encryption.s3.S3EncryptionClient;
@@ -50,25 +57,25 @@

public class BaseS3Backuper extends Backuper {
private static final Logger logger = LoggerFactory.getLogger(BaseS3Backuper.class);
private ExecutorService executorService;

public final S3Clients s3Clients;
public final BucketService s3BucketService;
public final MultipartAbortionService multipartAbortionService;

public BaseS3Backuper(final S3Clients s3Clients,
final BackupOperationRequest request) {
super(request);
this.s3Clients = s3Clients;
this.executorService = new Executors.FixedTasksExecutorSupplier().get(100);
this.s3BucketService = new BaseS3BucketService(s3Clients);
this.multipartAbortionService = new MultipartAbortionService(s3Clients.getClient(), this);
}

public BaseS3Backuper(final S3Clients s3Clients,
final BackupCommitLogsOperationRequest request) {
super(request);
this.s3Clients = s3Clients;
this.executorService = new Executors.FixedTasksExecutorSupplier().get(100);
this.s3BucketService = new BaseS3BucketService(s3Clients);
this.multipartAbortionService = new MultipartAbortionService(s3Clients.getClient(), this);
}

@Override
@@ -85,8 +92,12 @@ public RemoteObjectReference objectKeyToNodeAwareRemoteReference(final Path obje
@Override
protected void cleanup() throws Exception {
s3Clients.close();
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
}

@Override
public void init(List<ManifestEntry> manifestEntries) {
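// clean up multipart uploads that a previous run left behind for the keys in this backup's manifest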
multipartAbortionService.abortOrphanedMultiparts(manifestEntries, request);

}

@Override
@@ -135,26 +146,77 @@ public FreshenResult freshenRemoteObject(ManifestEntry manifestEntry, RemoteObje
Tagging tagging = taggingBuilder.tagSet(tags).build();

if (!request.skipRefreshing) {
CopyObjectRequest copyObjectRequest = CopyObjectRequest.builder()
.sourceBucket(request.storageLocation.bucket)
.destinationBucket(request.storageLocation.bucket)
.sourceKey(object.canonicalPath)
.destinationKey(object.canonicalPath)
.tagging(tagging)
// we need to translate this because request is still working with api v1
.metadataDirective(MetadataDirective.fromValue(request.metadataDirective.toString()))
.storageClass(StorageClass.STANDARD)
.build();
s3Clients.getClient().copyObject(copyObjectRequest);

GetObjectAttributesResponse objectAttributes = s3Clients.getNonEncryptingClient()
.getObjectAttributes(GetObjectAttributesRequest
.builder()
.bucket(request.storageLocation.bucket)
.key(object.canonicalPath)
.objectAttributes(ObjectAttributes.OBJECT_SIZE)
.build());
manifestEntry.size = objectAttributes.objectSize();
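// Refresh the object by copying it onto itself with a server-side multipart copy, re-applying the tags, and record its size in the manifest entry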
// Get the source object's metadata to determine its size and the number of parts to copy
HeadObjectRequest headRequest = HeadObjectRequest.builder()
.bucket(request.storageLocation.bucket)
.key(object.canonicalPath)
.build();
HeadObjectResponse headResponse = s3Clients.getClient().headObject(headRequest);
long objectSize = headResponse.contentLength();
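// part size comes from the upload.max.part.size system property, defaulting to 100 MiB (104857600 bytes)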
long partSize = Long.parseLong(System.getProperty("upload.max.part.size", Long.toString(100L * 1024 * 1024)));

CreateMultipartUploadRequest createRequest = CreateMultipartUploadRequest.builder()
.bucket(request.storageLocation.bucket)
.key(object.canonicalPath)
.tagging(tagging)
.build();
CreateMultipartUploadResponse createResponse = s3Clients.getClient().createMultipartUpload(createRequest);
String uploadId = createResponse.uploadId();
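// the uploadId returned here is reused for every part copy and for the final complete or abort call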

try
{
int totalParts = (int) Math.ceil((double) objectSize / partSize);

List<CompletedPart> completedParts = new ArrayList<>();

for (int partNumber = 1; partNumber <= totalParts; partNumber++)
{
// Calculate the range for the current part
long startOffset = (partNumber - 1) * partSize;
long endOffset = Math.min(partNumber * partSize - 1, objectSize - 1);
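// e.g. a 250 MiB object with 100 MiB parts copies bytes 0-104857599, 104857600-209715199 and 209715200-262143999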

UploadPartCopyResponse copyResponse = s3Clients.getClient()
.uploadPartCopy(UploadPartCopyRequest.builder()
.sourceBucket(request.storageLocation.bucket)
.sourceKey(object.canonicalPath)
.destinationBucket(request.storageLocation.bucket)
.destinationKey(object.canonicalPath)
.uploadId(uploadId)
.partNumber(partNumber)
.copySourceRange("bytes=" + startOffset + "-" + endOffset)
.build());
completedParts.add(CompletedPart.builder().partNumber(partNumber).eTag(copyResponse.copyPartResult().eTag()).build());
logger.info("Part {} of {} copied. ETag: {}", partNumber, object.canonicalPath, copyResponse.copyPartResult().eTag());
}

CompleteMultipartUploadRequest completeRequest = CompleteMultipartUploadRequest.builder()
.bucket(request.storageLocation.bucket)
.key(object.canonicalPath)
.uploadId(uploadId)
.multipartUpload(CompletedMultipartUpload.builder()
.parts(completedParts.toArray(new CompletedPart[0]))
.build()).build();
CompleteMultipartUploadResponse completeMultipartUploadResponse = s3Clients.getNonEncryptingClient().completeMultipartUpload(completeRequest);

if (!completeMultipartUploadResponse.sdkHttpResponse().isSuccessful()) {
throw new RuntimeException(String.format("Unsuccessful multipart copying of %s, upload id %s", object.canonicalPath, uploadId));
} else {
logger.info("Completed multipart copying of {}, upload id {}", object.canonicalPath, uploadId);
}

GetObjectAttributesResponse objectAttributes = s3Clients.getClient()
.getObjectAttributes(GetObjectAttributesRequest
.builder()
.bucket(request.storageLocation.bucket)
.key(object.canonicalPath)
.objectAttributes(ObjectAttributes.OBJECT_SIZE)
.build());
manifestEntry.size = objectAttributes.objectSize();
} catch (Throwable t) {
t.printStackTrace();
multipartAbortionService.abortMultipartUpload(uploadId, request, object);
throw new RuntimeException(t);
}
}

return FreshenResult.FRESHENED;
@@ -300,24 +362,77 @@ private void uploadFile(S3Client s3Client,
.build();

CompleteMultipartUploadResponse completeResponse = s3Client.completeMultipartUpload(completeRequest);
logger.info("Multipart upload of {} completed. ETag: {}", objectReference.canonicalPath, completeResponse.eTag());

if (!completeResponse.sdkHttpResponse().isSuccessful()) {
throw new RuntimeException(String.format("Unsuccessful multipart upload of %s, upload id %s", objectReference.canonicalPath, uploadId));
} else {
logger.info("Completed multipart upload of {}, upload id {}, etag {}", objectReference.canonicalPath, uploadId, completeResponse.eTag());
}
} catch (Throwable t) {
t.printStackTrace();
abortMultipartUpload(s3Client, uploadId, objectReference);
multipartAbortionService.abortMultipartUpload(uploadId, request, objectReference);
throw new RuntimeException(t);
}
}

private void abortMultipartUpload(S3Client s3Client, String uploadId, RemoteObjectReference objectReference) {
// Abort the multipart upload if an exception occurs or if it's not completed
if (uploadId != null) {
AbortMultipartUploadRequest abortRequest = AbortMultipartUploadRequest.builder()
.bucket(request.storageLocation.bucket)
.key(objectReference.canonicalPath)
.uploadId(uploadId)
.build();
s3Client.abortMultipartUpload(abortRequest);
logger.info("Aborted multipart upload of {}, uploadId: {}", objectReference.canonicalPath, uploadId);
public static class MultipartAbortionService {

private final S3Client s3Client;
private final BaseS3Backuper backuper;

public MultipartAbortionService(S3Client s3Client,
BaseS3Backuper backuper) {
this.s3Client = s3Client;
this.backuper = backuper;
}
}

public void abortOrphanedMultiparts(List<ManifestEntry> manifestEntries, BaseBackupOperationRequest request) {
ListMultipartUploadsRequest listRequest = ListMultipartUploadsRequest.builder()
.bucket(request.storageLocation.bucket)
.build();

ListMultipartUploadsResponse listResponse = s3Client.listMultipartUploads(listRequest);

List<String> entriesKeys = manifestEntries.stream()
.map(me -> backuper.objectKeyToNodeAwareRemoteReference(me.objectKey).canonicalPath)
.collect(Collectors.toList());

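// abort only the in-flight uploads whose keys belong to this backup's manifest; unrelated uploads in the bucket are left alone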
for (MultipartUpload upload : listResponse.uploads()
.stream()
.filter(upload -> entriesKeys.contains(upload.key()))
.collect(Collectors.toList())) {
AbortMultipartUploadRequest abortRequest = AbortMultipartUploadRequest.builder()
.bucket(request.storageLocation.bucket)
.key(upload.key())
.uploadId(upload.uploadId())
.build();

logger.info("Aborting orphaned multipart upload of id {} for key {} in bucket {}",
abortRequest.uploadId(),
abortRequest.key(),
abortRequest.bucket());

s3Client.abortMultipartUpload(abortRequest);
}
}

public void abortMultipartUpload(String uploadId,
BaseBackupOperationRequest request,
RemoteObjectReference objectReference) {
// Abort the multipart upload if an exception occurs or if it's not completed
if (uploadId != null) {
AbortMultipartUploadRequest abortRequest = AbortMultipartUploadRequest.builder()
.bucket(request.storageLocation.bucket)
.key(objectReference.canonicalPath)
.uploadId(uploadId)
.build();
try {
s3Client.abortMultipartUpload(abortRequest);
logger.info("Aborted multipart upload of {}, uploadId: {}", objectReference.canonicalPath, uploadId);
} catch (NoSuchUploadException ex) {
logger.info("There is no such multipart upload of {}, uploadId: {} to delete", objectReference.canonicalPath, uploadId);
}
}
}
}
}