Skip to content

Commit

Permalink
Fix for promise based helper (grpc#29009)
Browse files Browse the repository at this point in the history
  • Loading branch information
ctiller authored Mar 3, 2022
1 parent cc44f7c commit 564f7d6
Showing 1 changed file with 10 additions and 3 deletions.
13 changes: 10 additions & 3 deletions src/core/lib/channel/promise_based_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ void ClientCallData::StartBatch(grpc_transport_stream_op_batch* batch) {
// of this filter.
if (batch->send_initial_metadata) {
// If we're already cancelled, just terminate the batch.
if (send_initial_state_ == SendInitialState::kCancelled) {
if (send_initial_state_ == SendInitialState::kCancelled ||
recv_trailing_state_ == RecvTrailingState::kCancelled) {
grpc_transport_stream_op_batch_finish_with_failure(
batch, GRPC_ERROR_REF(cancelled_error_), call_combiner());
return;
Expand All @@ -116,6 +117,11 @@ void ClientCallData::StartBatch(grpc_transport_stream_op_batch* batch) {
// recv_trailing_metadata *without* send_initial_metadata: hook it so we can
// respond to it, and push it down.
if (batch->recv_trailing_metadata) {
if (recv_trailing_state_ == RecvTrailingState::kCancelled) {
grpc_transport_stream_op_batch_finish_with_failure(
batch, GRPC_ERROR_REF(cancelled_error_), call_combiner());
return;
}
GPR_ASSERT(recv_trailing_state_ == RecvTrailingState::kInitial);
recv_trailing_state_ = RecvTrailingState::kForwarded;
HookRecvTrailingMetadata(batch);
Expand Down Expand Up @@ -312,6 +318,7 @@ void ClientCallData::WakeInsideCombiner() {
call_closure =
absl::exchange(original_recv_trailing_metadata_ready_, nullptr);
break;
case RecvTrailingState::kInitial:
case RecvTrailingState::kQueued:
case RecvTrailingState::kForwarded: {
GPR_ASSERT(*md->get_pointer(GrpcStatusMetadata()) !=
Expand All @@ -325,6 +332,8 @@ void ClientCallData::WakeInsideCombiner() {
error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE,
message->as_string_view());
}
GRPC_ERROR_UNREF(cancelled_error_);
cancelled_error_ = GRPC_ERROR_REF(error);
if (recv_trailing_state_ == RecvTrailingState::kQueued) {
GPR_ASSERT(send_initial_state_ == SendInitialState::kQueued);
send_initial_state_ = SendInitialState::kCancelled;
Expand All @@ -338,8 +347,6 @@ void ClientCallData::WakeInsideCombiner() {
}
recv_trailing_state_ = RecvTrailingState::kCancelled;
} break;
case RecvTrailingState::kInitial:
abort(); // unimplemented
case RecvTrailingState::kResponded:
case RecvTrailingState::kCancelled:
abort(); // unreachable
Expand Down

0 comments on commit 564f7d6

Please sign in to comment.