Skip to content

Commit

Permalink
all: add max message size to client calls
Browse files Browse the repository at this point in the history
  • Loading branch information
carl-mastrangelo authored Jan 6, 2017
1 parent 6ed3cbb commit 8d49df2
Show file tree
Hide file tree
Showing 18 changed files with 334 additions and 34 deletions.
74 changes: 63 additions & 11 deletions core/src/main/java/io/grpc/CallOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@

package io.grpc;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;

Expand Down Expand Up @@ -77,6 +79,12 @@ public final class CallOptions {
*/
private boolean waitForReady;

@Nullable
private Integer maxInboundMessageSize;
@Nullable
private Integer maxOutboundMessageSize;


/**
* Override the HTTP/2 authority the channel claims to be connecting to. <em>This is not
* generally safe.</em> Overriding allows advanced users to re-use a single Channel for multiple
Expand Down Expand Up @@ -362,6 +370,47 @@ public boolean isWaitForReady() {
return waitForReady;
}

/**
* Sets the maximum allowed message size acceptable from the remote peer. If unset, this will
* default to the value set on the {@link ManagedChannelBuilder#maxInboundMessageSize(int)}.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/2563")
public CallOptions withMaxInboundMessageSize(int maxSize) {
checkArgument(maxSize >= 0, "invalid maxsize %s", maxSize);
CallOptions newOptions = new CallOptions(this);
newOptions.maxInboundMessageSize = maxSize;
return newOptions;
}

/**
* Sets the maximum allowed message size acceptable sent to the remote peer.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/2563")
public CallOptions withMaxOutboundMessageSize(int maxSize) {
checkArgument(maxSize >= 0, "invalid maxsize %s", maxSize);
CallOptions newOptions = new CallOptions(this);
newOptions.maxOutboundMessageSize = maxSize;
return newOptions;
}

/**
* Gets the maximum allowed message size acceptable from the remote peer.
*/
@Nullable
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/2563")
public Integer getMaxInboundMessageSize() {
return maxInboundMessageSize;
}

/**
* Gets the maximum allowed message size acceptable to send the remote peer.
*/
@Nullable
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/2563")
public Integer getMaxOutboundMessageSize() {
return maxOutboundMessageSize;
}

/**
* Copy constructor.
*/
Expand All @@ -374,20 +423,23 @@ private CallOptions(CallOptions other) {
compressorName = other.compressorName;
customOptions = other.customOptions;
waitForReady = other.waitForReady;
maxInboundMessageSize = other.maxInboundMessageSize;
maxOutboundMessageSize = other.maxOutboundMessageSize;
}

@Override
public String toString() {
MoreObjects.ToStringHelper toStringHelper = MoreObjects.toStringHelper(this);
toStringHelper.add("deadline", deadline);
toStringHelper.add("authority", authority);
toStringHelper.add("callCredentials", credentials);
toStringHelper.add("affinity", affinity);
toStringHelper.add("executor", executor != null ? executor.getClass() : null);
toStringHelper.add("compressorName", compressorName);
toStringHelper.add("customOptions", Arrays.deepToString(customOptions));
toStringHelper.add("waitForReady", isWaitForReady());

return toStringHelper.toString();
return MoreObjects.toStringHelper(this)
.add("deadline", deadline)
.add("authority", authority)
.add("callCredentials", credentials)
.add("affinity", affinity)
.add("executor", executor != null ? executor.getClass() : null)
.add("compressorName", compressorName)
.add("customOptions", Arrays.deepToString(customOptions))
.add("waitForReady", isWaitForReady())
.add("maxInboundMessageSize", maxInboundMessageSize)
.add("maxOutboundMessageSize", maxOutboundMessageSize)
.toString();
}
}
6 changes: 6 additions & 0 deletions core/src/main/java/io/grpc/inprocess/InProcessTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,12 @@ public void setCompressor(Compressor compressor) {}

@Override
public void setDecompressor(Decompressor decompressor) {}

@Override
public void setMaxInboundMessageSize(int maxSize) {}

@Override
public void setMaxOutboundMessageSize(int maxSize) {}
}
}

Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/io/grpc/internal/AbstractClientStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ protected AbstractClientStream(WritableBufferAllocator bufferAllocator, int maxM
super(bufferAllocator, maxMessageSize, statsTraceCtx);
}

@Override
public void setMaxInboundMessageSize(int maxSize) {
setMaxInboundMessageSizeProtected(maxSize);
}

@Override
public void setMaxOutboundMessageSize(int maxSize) {
setMaxOutboundMessageSizeProtected(maxSize);
}

@Override
protected final ClientStreamListener listener() {
return listener;
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/io/grpc/internal/AbstractClientStream2.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,16 @@ protected AbstractClientStream2(WritableBufferAllocator bufferAllocator,
framer = new MessageFramer(this, bufferAllocator, statsTraceCtx);
}

@Override
public void setMaxOutboundMessageSize(int maxSize) {
framer.setMaxOutboundMessageSize(maxSize);
}

@Override
public void setMaxInboundMessageSize(int maxSize) {
transportState().setMaxInboundMessageSize(maxSize);
}

/** {@inheritDoc} */
@Override
protected abstract TransportState transportState();
Expand Down
8 changes: 8 additions & 0 deletions core/src/main/java/io/grpc/internal/AbstractStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,14 @@ public void endOfStream() {
statsTraceCtx);
}

protected final void setMaxInboundMessageSizeProtected(int maxSize) {
deframer.setMaxInboundMessageSize(maxSize);
}

protected final void setMaxOutboundMessageSizeProtected(int maxSize) {
framer.setMaxOutboundMessageSize(maxSize);
}

@VisibleForTesting
AbstractStream(MessageFramer framer, MessageDeframer deframer) {
this.framer = framer;
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/io/grpc/internal/AbstractStream2.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ protected TransportState(int maxMessageSize, StatsTraceContext statsTraceCtx) {
this.deframer = deframer;
}

final void setMaxInboundMessageSize(int maxSize) {
deframer.setMaxInboundMessageSize(maxSize);
}

/**
* Override this method to provide a stream listener.
*/
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/io/grpc/internal/ClientCallImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,12 @@ public void runInContext() {
if (callOptions.getAuthority() != null) {
stream.setAuthority(callOptions.getAuthority());
}
if (callOptions.getMaxInboundMessageSize() != null) {
stream.setMaxInboundMessageSize(callOptions.getMaxInboundMessageSize());
}
if (callOptions.getMaxOutboundMessageSize() != null) {
stream.setMaxOutboundMessageSize(callOptions.getMaxOutboundMessageSize());
}
stream.setCompressor(compressor);
stream.start(new ClientStreamListenerImpl(observer));

Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/io/grpc/internal/ClientStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,14 @@ public interface ClientStream extends Stream {
* @param listener non-{@code null} listener of stream events
*/
void start(ClientStreamListener listener);

/**
* Sets the max size accepted from the remote endpoint.
*/
void setMaxInboundMessageSize(int maxSize);

/**
* Sets the max size sent to the remote endpoint.
*/
void setMaxOutboundMessageSize(int maxSize);
}
28 changes: 28 additions & 0 deletions core/src/main/java/io/grpc/internal/DelayedStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,34 @@ class DelayedStream implements ClientStream {
@GuardedBy("this")
private DelayedStreamListener delayedListener;

@Override
public void setMaxInboundMessageSize(final int maxSize) {
if (passThrough) {
realStream.setMaxInboundMessageSize(maxSize);
} else {
delayOrExecute(new Runnable() {
@Override
public void run() {
realStream.setMaxInboundMessageSize(maxSize);
}
});
}
}

@Override
public void setMaxOutboundMessageSize(final int maxSize) {
if (passThrough) {
realStream.setMaxOutboundMessageSize(maxSize);
} else {
delayOrExecute(new Runnable() {
@Override
public void run() {
realStream.setMaxOutboundMessageSize(maxSize);
}
});
}
}

/**
* Transfers all pending and future requests and mutations to the given stream.
*
Expand Down
20 changes: 11 additions & 9 deletions core/src/main/java/io/grpc/internal/MessageDeframer.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private enum State {
}

private final Listener listener;
private final int maxMessageSize;
private int maxInboundMessageSize;
private final StatsTraceContext statsTraceCtx;
private Decompressor decompressor;
private State state = State.HEADER;
Expand All @@ -122,10 +122,14 @@ public MessageDeframer(Listener listener, Decompressor decompressor, int maxMess
StatsTraceContext statsTraceCtx) {
this.listener = Preconditions.checkNotNull(listener, "sink");
this.decompressor = Preconditions.checkNotNull(decompressor, "decompressor");
this.maxMessageSize = maxMessageSize;
this.maxInboundMessageSize = maxMessageSize;
this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
}

void setMaxInboundMessageSize(int messageSize) {
maxInboundMessageSize = messageSize;
}

/**
* Sets the decompressor available to use. The message encoding for the stream comes later in
* time, and thus will not be available at the time of construction. This should only be set
Expand Down Expand Up @@ -338,10 +342,9 @@ private void processHeader() {

// Update the required length to include the length of the frame.
requiredLength = nextFrame.readInt();
if (requiredLength < 0 || requiredLength > maxMessageSize) {
throw Status.INTERNAL.withDescription(String.format("Frame size %d exceeds maximum: %d. "
+ "If this is normal, increase the maxMessageSize in the channel/server builder",
requiredLength, maxMessageSize)).asRuntimeException();
if (requiredLength < 0 || requiredLength > maxInboundMessageSize) {
throw Status.INTERNAL.withDescription(String.format("Frame size %d exceeds maximum: %d. ",
requiredLength, maxInboundMessageSize)).asRuntimeException();
}

// Continue reading the frame body.
Expand Down Expand Up @@ -377,7 +380,7 @@ private InputStream getCompressedBody() {
// Enforce the maxMessageSize limit on the returned stream.
InputStream unlimitedStream =
decompressor.decompress(ReadableBuffers.openStream(nextFrame, true));
return new SizeEnforcingInputStream(unlimitedStream, maxMessageSize, statsTraceCtx);
return new SizeEnforcingInputStream(unlimitedStream, maxInboundMessageSize, statsTraceCtx);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -461,8 +464,7 @@ private void reportCount() {
private void verifySize() {
if (count > maxMessageSize) {
throw Status.INTERNAL.withDescription(String.format(
"Compressed frame exceeds maximum frame size: %d. Bytes read: %d. "
+ "If this is normal, increase the maxMessageSize in the channel/server builder",
"Compressed frame exceeds maximum frame size: %d. Bytes read: %d. ",
maxMessageSize, count)).asRuntimeException();
}
}
Expand Down
29 changes: 29 additions & 0 deletions core/src/main/java/io/grpc/internal/MessageFramer.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.lang.Math.min;

import com.google.common.io.ByteStreams;
Expand All @@ -58,6 +59,9 @@
* MessageFramer.Sink}.
*/
public class MessageFramer {

private static final int NO_MAX_OUTBOUND_MESSAGE_SIZE = -1;

/**
* Sink implemented by the transport layer to receive frames and forward them to their
* destination.
Expand All @@ -79,6 +83,8 @@ public interface Sink {
private static final byte COMPRESSED = 1;

private final Sink sink;
// effectively final. Can only be set once.
private int maxOutboundMessageSize = NO_MAX_OUTBOUND_MESSAGE_SIZE;
private WritableBuffer buffer;
private Compressor compressor = Codec.Identity.NONE;
private boolean messageCompression = true;
Expand Down Expand Up @@ -111,6 +117,11 @@ MessageFramer setMessageCompression(boolean enable) {
return this;
}

void setMaxOutboundMessageSize(int maxSize) {
checkState(maxOutboundMessageSize == NO_MAX_OUTBOUND_MESSAGE_SIZE, "max size already set");
maxOutboundMessageSize = maxSize;
}

/**
* Writes out a payload message.
*
Expand Down Expand Up @@ -155,6 +166,12 @@ private int writeUncompressed(InputStream message, int messageLength) throws IOE
}
BufferChainOutputStream bufferChain = new BufferChainOutputStream();
int written = writeToOutputStream(message, bufferChain);
if (maxOutboundMessageSize >= 0 && written > maxOutboundMessageSize) {
throw Status.INTERNAL
.withDescription(
String.format("message too large %d > %d", written , maxOutboundMessageSize))
.asRuntimeException();
}
writeBufferChain(bufferChain, false);
return written;
}
Expand All @@ -169,6 +186,12 @@ private int writeCompressed(InputStream message, int messageLength) throws IOExc
} finally {
compressingStream.close();
}
if (maxOutboundMessageSize >= 0 && written > maxOutboundMessageSize) {
throw Status.CANCELLED
.withDescription(
String.format("message too large %d > %d", written , maxOutboundMessageSize))
.asRuntimeException();
}

writeBufferChain(bufferChain, true);
return written;
Expand All @@ -186,6 +209,12 @@ private int getKnownLength(InputStream inputStream) throws IOException {
*/
private int writeKnownLengthUncompressed(InputStream message, int messageLength)
throws IOException {
if (maxOutboundMessageSize >= 0 && messageLength > maxOutboundMessageSize) {
throw Status.CANCELLED
.withDescription(
String.format("message too large %d > %d", messageLength , maxOutboundMessageSize))
.asRuntimeException();
}
ByteBuffer header = ByteBuffer.wrap(headerScratch);
header.put(UNCOMPRESSED);
header.putInt(messageLength);
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/io/grpc/internal/NoopClientStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,10 @@ public void setCompressor(Compressor compressor) {}

@Override
public void setDecompressor(Decompressor decompressor) {}

@Override
public void setMaxInboundMessageSize(int maxSize) {}

@Override
public void setMaxOutboundMessageSize(int maxSize) {}
}
Loading

0 comments on commit 8d49df2

Please sign in to comment.