diff --git a/src/core/lib/channel/promise_based_filter.cc b/src/core/lib/channel/promise_based_filter.cc index 536a1287451ca..93fe16a909ba3 100644 --- a/src/core/lib/channel/promise_based_filter.cc +++ b/src/core/lib/channel/promise_based_filter.cc @@ -92,7 +92,8 @@ void ClientCallData::StartBatch(grpc_transport_stream_op_batch* batch) { // of this filter. if (batch->send_initial_metadata) { // If we're already cancelled, just terminate the batch. - if (send_initial_state_ == SendInitialState::kCancelled) { + if (send_initial_state_ == SendInitialState::kCancelled || + recv_trailing_state_ == RecvTrailingState::kCancelled) { grpc_transport_stream_op_batch_finish_with_failure( batch, GRPC_ERROR_REF(cancelled_error_), call_combiner()); return; @@ -116,6 +117,11 @@ void ClientCallData::StartBatch(grpc_transport_stream_op_batch* batch) { // recv_trailing_metadata *without* send_initial_metadata: hook it so we can // respond to it, and push it down. if (batch->recv_trailing_metadata) { + if (recv_trailing_state_ == RecvTrailingState::kCancelled) { + grpc_transport_stream_op_batch_finish_with_failure( + batch, GRPC_ERROR_REF(cancelled_error_), call_combiner()); + return; + } GPR_ASSERT(recv_trailing_state_ == RecvTrailingState::kInitial); recv_trailing_state_ = RecvTrailingState::kForwarded; HookRecvTrailingMetadata(batch); @@ -312,6 +318,7 @@ void ClientCallData::WakeInsideCombiner() { call_closure = absl::exchange(original_recv_trailing_metadata_ready_, nullptr); break; + case RecvTrailingState::kInitial: case RecvTrailingState::kQueued: case RecvTrailingState::kForwarded: { GPR_ASSERT(*md->get_pointer(GrpcStatusMetadata()) != @@ -325,6 +332,8 @@ void ClientCallData::WakeInsideCombiner() { error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, message->as_string_view()); } + GRPC_ERROR_UNREF(cancelled_error_); + cancelled_error_ = GRPC_ERROR_REF(error); if (recv_trailing_state_ == RecvTrailingState::kQueued) { GPR_ASSERT(send_initial_state_ == SendInitialState::kQueued); send_initial_state_ = SendInitialState::kCancelled; @@ -338,8 +347,6 @@ void ClientCallData::WakeInsideCombiner() { } recv_trailing_state_ = RecvTrailingState::kCancelled; } break; - case RecvTrailingState::kInitial: - abort(); // unimplemented case RecvTrailingState::kResponded: case RecvTrailingState::kCancelled: abort(); // unreachable