Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api,stub: make param types in StreamObserver and Listener explicit #8544

Merged
merged 1 commit into from
Sep 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
api,stub: clarify StreamObserver and Listener param type
  • Loading branch information
dapengzhang0 committed Sep 21, 2021
commit 03d4a7cef02a6670fc70a7728f01ee4d178af494
2 changes: 2 additions & 0 deletions api/src/main/java/io/grpc/ClientCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ public abstract class ClientCall<ReqT, RespT> {
* an instance from multiple threads, but only one call simultaneously. A single thread may
* interleave calls to multiple instances, so implementations using ThreadLocals must be careful
* to avoid leaking inappropriate state (e.g., clearing the ThreadLocal before returning).
*
* @param <T> type of message received.
*/
public abstract static class Listener<T> {

Expand Down
2 changes: 2 additions & 0 deletions stub/src/main/java/io/grpc/stub/CallStreamObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
*
* <p>DO NOT MOCK: The API is too complex to reliably mock. Use InProcessChannelBuilder to create
* "real" RPCs suitable for testing.
*
* @param <V> type of outbound message.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8499")
public abstract class CallStreamObserver<V> implements StreamObserver<V> {
Expand Down
6 changes: 2 additions & 4 deletions stub/src/main/java/io/grpc/stub/ClientCallStreamObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* <p>DO NOT MOCK: The API is too complex to reliably mock. Use InProcessChannelBuilder to create
* "real" RPCs suitable for testing and make a fake for the server-side.
*/
public abstract class ClientCallStreamObserver<V> extends CallStreamObserver<V> {
public abstract class ClientCallStreamObserver<ReqT> extends CallStreamObserver<ReqT> {
/**
* Prevent any further processing for this {@code ClientCallStreamObserver}. No further messages
* will be received. The server is informed of cancellations, but may not stop processing the
Expand Down Expand Up @@ -78,9 +78,7 @@ public void disableAutoRequestWithInitial(int request) {
* thread will always be used to execute the {@link Runnable}, it is guaranteed that executions
* are serialized with calls to the 'inbound' {@link StreamObserver}.
*
* <p>On client-side this method may only be called during {@link
* ClientResponseObserver#beforeStart}. On server-side it may only be called during the initial
* call to the application, before the service returns its {@code StreamObserver}.
* <p>May only be called during {@link ClientResponseObserver#beforeStart}.
*
* <p>Because there is a processing delay to deliver this notification, it is possible for
* concurrent writes to cause {@code isReady() == false} within this callback. Handle "spurious"
Expand Down
9 changes: 5 additions & 4 deletions stub/src/main/java/io/grpc/stub/ClientCalls.java
Original file line number Diff line number Diff line change
Expand Up @@ -337,9 +337,10 @@ private abstract static class StartableListener<T> extends ClientCall.Listener<T
abstract void onStart();
}

private static final class CallToStreamObserverAdapter<T> extends ClientCallStreamObserver<T> {
private static final class CallToStreamObserverAdapter<ReqT>
extends ClientCallStreamObserver<ReqT> {
private boolean frozen;
private final ClientCall<T, ?> call;
private final ClientCall<ReqT, ?> call;
private final boolean streamingResponse;
private Runnable onReadyHandler;
private int initialRequest = 1;
Expand All @@ -348,7 +349,7 @@ private static final class CallToStreamObserverAdapter<T> extends ClientCallStre
private boolean completed = false;

// Non private to avoid synthetic class
CallToStreamObserverAdapter(ClientCall<T, ?> call, boolean streamingResponse) {
CallToStreamObserverAdapter(ClientCall<ReqT, ?> call, boolean streamingResponse) {
this.call = call;
this.streamingResponse = streamingResponse;
}
Expand All @@ -358,7 +359,7 @@ private void freeze() {
}

@Override
public void onNext(T value) {
public void onNext(ReqT value) {
checkState(!aborted, "Stream was terminated by error, no further calls are allowed");
checkState(!completed, "Stream is already completed, no further calls are allowed");
call.sendMessage(value);
Expand Down
7 changes: 3 additions & 4 deletions stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* <p>DO NOT MOCK: The API is too complex to reliably mock. Use InProcessChannelBuilder to create
* "real" RPCs suitable for testing and interact with the server using a normal client stub.
*/
public abstract class ServerCallStreamObserver<V> extends CallStreamObserver<V> {
public abstract class ServerCallStreamObserver<RespT> extends CallStreamObserver<RespT> {

/**
* Returns {@code true} when the call is cancelled and the server is encouraged to abort
Expand Down Expand Up @@ -113,9 +113,8 @@ public void disableAutoRequest() {
* thread will always be used to execute the {@link Runnable}, it is guaranteed that executions
* are serialized with calls to the 'inbound' {@link StreamObserver}.
*
* <p>On client-side this method may only be called during {@link
* ClientResponseObserver#beforeStart}. On server-side it may only be called during the initial
* call to the application, before the service returns its {@code StreamObserver}.
* <p>May only be called during the initial call to the application, before the service returns
* its {@code StreamObserver}.
*
* <p>Because there is a processing delay to deliver this notification, it is possible for
* concurrent writes to cause {@code isReady() == false} within this callback. Handle "spurious"
Expand Down
2 changes: 1 addition & 1 deletion stub/src/main/java/io/grpc/stub/ServerCalls.java
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ public static void asyncUnimplementedUnaryCall(
* @param methodDescriptor of method for which error will be thrown.
* @param responseObserver on which error will be set.
*/
public static <T> StreamObserver<T> asyncUnimplementedStreamingCall(
public static <ReqT> StreamObserver<ReqT> asyncUnimplementedStreamingCall(
MethodDescriptor<?, ?> methodDescriptor, StreamObserver<?> responseObserver) {
// NB: For streaming call we want to do the same as for unary call. Fail-fast by setting error
// on responseObserver and then return no-op observer.
Expand Down