Skip to content

Commit

Permalink
fix: update gRPC ReadObject retry to avoid double retry (#2765)
Browse files Browse the repository at this point in the history
Prevent adding an unavailable error to the queue if the retrying loop is still active.

Also, cleanup unused retry config for gRPC ReadObject.
  • Loading branch information
BenWhitehead authored Oct 10, 2024
1 parent 6829eb4 commit 1fc57b9
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

final class GapicUnbufferedReadableByteChannel
implements UnbufferedReadableByteChannel, ScatteringByteChannel {
Expand Down Expand Up @@ -258,6 +259,11 @@ ApiFuture<Object> getResult() {

private void ensureStreamOpen() {
if (readObjectObserver == null) {
java.lang.Object peek = queue.peek();
if (peek instanceof Throwable || peek == EOF_MARKER) {
// If our queue has an error or EOF, do not send another request
return;
}
readObjectObserver =
Retrying.run(
retryingDeps,
Expand Down Expand Up @@ -326,13 +332,15 @@ protected void onResponseImpl(ReadObjectResponse response) {

@Override
protected void onErrorImpl(Throwable t) {
open.setException(t);
if (!alg.shouldRetry(t, null)) {
result.setException(StorageException.coalesce(t));
}
if (t instanceof CancellationException) {
cancellation.set(t);
}
if (!open.isDone()) {
open.setException(t);
if (!alg.shouldRetry(t, null)) {
result.setException(StorageException.coalesce(t));
}
}
try {
queue.offer(t);
} catch (InterruptedException e) {
Expand Down Expand Up @@ -369,6 +377,11 @@ public boolean nonEmpty() {
return !queue.isEmpty();
}

@Nullable
public T peek() {
return queue.peek();
}

@NonNull
public T poll() throws InterruptedException {
return queue.take();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -723,8 +723,7 @@ public GrpcBlobReadChannel reader(String bucket, String blob, BlobSourceOption..
public GrpcBlobReadChannel reader(BlobId blob, BlobSourceOption... options) {
Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob).prepend(defaultOpts);
ReadObjectRequest request = getReadObjectRequest(blob, opts);
Set<StatusCode.Code> codes = resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(request));
GrpcCallContext grpcCallContext = Retrying.newCallContext().withRetryableCodes(codes);
GrpcCallContext grpcCallContext = Retrying.newCallContext();

return new GrpcBlobReadChannel(
storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext),
Expand Down Expand Up @@ -1708,10 +1707,7 @@ private UnbufferedReadableByteChannelSession<Object> unbufferedReadSession(

Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob).prepend(defaultOpts);
ReadObjectRequest readObjectRequest = getReadObjectRequest(blob, opts);
Set<StatusCode.Code> codes =
resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(readObjectRequest));
GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(Retrying.newCallContext().withRetryableCodes(codes));
GrpcCallContext grpcCallContext = opts.grpcMetadataMapper().apply(Retrying.newCallContext());
return ResumableMedia.gapic()
.read()
.byteChannel(
Expand Down

0 comments on commit 1fc57b9

Please sign in to comment.