From 9f90e6d906589ad4cc2c34e1ebb4abe35f0e40cf Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 16 Feb 2022 13:32:21 -0800 Subject: [PATCH] Revert "Revert "Implement transparent retries (#28548)" (#28888)" (#28890) * Revert "Revert "Implement transparent retries (#28548)" (#28888)" This reverts commit 35708ff6b4bbc2f8a5042fbe8761374fee5c9395. * disable retries for ALTS handshaker channel --- CMakeLists.txt | 6 + build_autogenerated.yaml | 6 + gRPC-Core.podspec | 3 + grpc.gyp | 6 + .../filters/client_channel/retry_filter.cc | 178 +++++--- .../alts/handshaker/alts_shared_resource.cc | 8 +- .../alts/handshaker/alts_tsi_handshaker.cc | 7 +- test/core/end2end/end2end_nosec_tests.cc | 24 ++ test/core/end2end/end2end_tests.cc | 24 ++ test/core/end2end/fuzzers/api_fuzzer.cc | 34 +- test/core/end2end/generate_tests.bzl | 11 + ...retry_cancel_with_multiple_send_batches.cc | 2 +- .../retry_recv_trailing_metadata_error.cc | 2 +- .../core/end2end/tests/retry_send_op_fails.cc | 28 +- .../end2end/tests/retry_transparent_goaway.cc | 379 ++++++++++++++++++ ...etry_transparent_max_concurrent_streams.cc | 368 +++++++++++++++++ .../retry_transparent_not_sent_on_wire.cc | 378 +++++++++++++++++ .../transport/chttp2/streams_not_seen_test.cc | 4 +- 18 files changed, 1383 insertions(+), 85 deletions(-) create mode 100644 test/core/end2end/tests/retry_transparent_goaway.cc create mode 100644 test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc create mode 100644 test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index a2b3c36eb40da..ac98c0594c2e2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1191,6 +1191,9 @@ add_library(end2end_nosec_tests test/core/end2end/tests/retry_streaming_succeeds_before_replay_finished.cc test/core/end2end/tests/retry_throttled.cc test/core/end2end/tests/retry_too_many_attempts.cc + test/core/end2end/tests/retry_transparent_goaway.cc + 
test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc + test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc test/core/end2end/tests/server_finishes_request.cc test/core/end2end/tests/server_streaming.cc test/core/end2end/tests/shutdown_finishes_calls.cc @@ -1334,6 +1337,9 @@ add_library(end2end_tests test/core/end2end/tests/retry_streaming_succeeds_before_replay_finished.cc test/core/end2end/tests/retry_throttled.cc test/core/end2end/tests/retry_too_many_attempts.cc + test/core/end2end/tests/retry_transparent_goaway.cc + test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc + test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc test/core/end2end/tests/server_finishes_request.cc test/core/end2end/tests/server_streaming.cc test/core/end2end/tests/shutdown_finishes_calls.cc diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index c786fd3288ed3..b579701d06dda 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -112,6 +112,9 @@ libs: - test/core/end2end/tests/retry_streaming_succeeds_before_replay_finished.cc - test/core/end2end/tests/retry_throttled.cc - test/core/end2end/tests/retry_too_many_attempts.cc + - test/core/end2end/tests/retry_transparent_goaway.cc + - test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc + - test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc - test/core/end2end/tests/server_finishes_request.cc - test/core/end2end/tests/server_streaming.cc - test/core/end2end/tests/shutdown_finishes_calls.cc @@ -246,6 +249,9 @@ libs: - test/core/end2end/tests/retry_streaming_succeeds_before_replay_finished.cc - test/core/end2end/tests/retry_throttled.cc - test/core/end2end/tests/retry_too_many_attempts.cc + - test/core/end2end/tests/retry_transparent_goaway.cc + - test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc + - test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc - test/core/end2end/tests/server_finishes_request.cc - 
test/core/end2end/tests/server_streaming.cc - test/core/end2end/tests/shutdown_finishes_calls.cc diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 6c0f0e7b6ca1c..c73f7b5e57842 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -2480,6 +2480,9 @@ Pod::Spec.new do |s| 'test/core/end2end/tests/retry_streaming_succeeds_before_replay_finished.cc', 'test/core/end2end/tests/retry_throttled.cc', 'test/core/end2end/tests/retry_too_many_attempts.cc', + 'test/core/end2end/tests/retry_transparent_goaway.cc', + 'test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc', + 'test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc', 'test/core/end2end/tests/server_finishes_request.cc', 'test/core/end2end/tests/server_streaming.cc', 'test/core/end2end/tests/shutdown_finishes_calls.cc', diff --git a/grpc.gyp b/grpc.gyp index cac65544f7758..d350502e7f390 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -265,6 +265,9 @@ 'test/core/end2end/tests/retry_streaming_succeeds_before_replay_finished.cc', 'test/core/end2end/tests/retry_throttled.cc', 'test/core/end2end/tests/retry_too_many_attempts.cc', + 'test/core/end2end/tests/retry_transparent_goaway.cc', + 'test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc', + 'test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc', 'test/core/end2end/tests/server_finishes_request.cc', 'test/core/end2end/tests/server_streaming.cc', 'test/core/end2end/tests/shutdown_finishes_calls.cc', @@ -376,6 +379,9 @@ 'test/core/end2end/tests/retry_streaming_succeeds_before_replay_finished.cc', 'test/core/end2end/tests/retry_throttled.cc', 'test/core/end2end/tests/retry_too_many_attempts.cc', + 'test/core/end2end/tests/retry_transparent_goaway.cc', + 'test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc', + 'test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc', 'test/core/end2end/tests/server_finishes_request.cc', 'test/core/end2end/tests/server_streaming.cc', 
'test/core/end2end/tests/shutdown_finishes_calls.cc', diff --git a/src/core/ext/filters/client_channel/retry_filter.cc b/src/core/ext/filters/client_channel/retry_filter.cc index d34390ed1e12d..2b20a0890aaca 100644 --- a/src/core/ext/filters/client_channel/retry_filter.cc +++ b/src/core/ext/filters/client_channel/retry_filter.cc @@ -85,7 +85,6 @@ // which batches need to be sent on the LB call for a given attempt. // TODO(roth): In subsequent PRs: -// - add support for transparent retries (including initial metadata) // - implement hedging // By default, we buffer 256 KiB per RPC for retries. @@ -209,7 +208,7 @@ class RetryFilter::CallData { // State associated with each call attempt. class CallAttempt : public RefCounted { public: - explicit CallAttempt(CallData* calld); + CallAttempt(CallData* calld, bool is_transparent_retry); ~CallAttempt() override; bool lb_call_committed() const { return lb_call_committed_; } @@ -395,7 +394,7 @@ class RetryFilter::CallData { void MaybeSwitchToFastPath(); // Returns true if the call should be retried. - bool ShouldRetry(absl::optional status, bool is_lb_drop, + bool ShouldRetry(absl::optional status, absl::optional server_pushback_ms); // Abandons the call attempt. Unrefs any deferred batches. @@ -512,10 +511,15 @@ class RetryFilter::CallData { static void OnRetryTimer(void* arg, grpc_error_handle error); static void OnRetryTimerLocked(void* arg, grpc_error_handle error); + // Adds a closure to closures to start a transparent retry. 
+ void AddClosureToStartTransparentRetry(CallCombinerClosureList* closures); + static void StartTransparentRetry(void* arg, grpc_error_handle error); + OrphanablePtr CreateLoadBalancedCall( - ConfigSelector::CallDispatchController* call_dispatch_controller); + ConfigSelector::CallDispatchController* call_dispatch_controller, + bool is_transparent_retry); - void CreateCallAttempt(); + void CreateCallAttempt(bool is_transparent_retry); RetryFilter* chand_; grpc_polling_entity* pollent_; @@ -558,6 +562,8 @@ class RetryFilter::CallData { // Retry state. bool retry_committed_ : 1; bool retry_timer_pending_ : 1; + bool retry_codepath_started_ : 1; + bool sent_transparent_retry_not_seen_by_server_ : 1; int num_attempts_completed_ = 0; grpc_timer retry_timer_; grpc_closure retry_closure_; @@ -650,7 +656,8 @@ class RetryFilter::CallData::CallStackDestructionBarrier // RetryFilter::CallData::CallAttempt // -RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld) +RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld, + bool is_transparent_retry) : RefCounted(GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace) ? "CallAttempt" : nullptr), calld_(calld), @@ -667,7 +674,8 @@ RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld) sent_cancel_stream_(false), seen_recv_trailing_metadata_from_surface_(false), abandoned_(false) { - lb_call_ = calld->CreateLoadBalancedCall(&attempt_dispatch_controller_); + lb_call_ = calld->CreateLoadBalancedCall(&attempt_dispatch_controller_, + is_transparent_retry); if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { gpr_log(GPR_INFO, "chand=%p calld=%p attempt=%p: create lb_call=%p", calld->chand_, calld, this, lb_call_.get()); @@ -1070,11 +1078,8 @@ void RetryFilter::CallData::CallAttempt::CancelFromSurface( } bool RetryFilter::CallData::CallAttempt::ShouldRetry( - absl::optional status, bool is_lb_drop, + absl::optional status, absl::optional server_pushback_ms) { - // LB drops always inhibit retries. 
- if (is_lb_drop) return false; - // TODO(roth): Handle transparent retries here. // If no retry policy, don't retry. if (calld_->retry_policy_ == nullptr) return false; // Check status. @@ -1238,9 +1243,8 @@ void RetryFilter::CallData::CallAttempt::OnPerAttemptRecvTimerLocked( GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED), &closures); // Check whether we should retry. - if (call_attempt->ShouldRetry( - /*status=*/absl::nullopt, /*is_lb_drop=*/false, - /*server_pushback_ms=*/absl::nullopt)) { + if (call_attempt->ShouldRetry(/*status=*/absl::nullopt, + /*server_pushback_ms=*/absl::nullopt)) { // Mark current attempt as abandoned. call_attempt->Abandon(); // We are retrying. Start backoff timer. @@ -1542,10 +1546,11 @@ namespace { // Sets *status, *server_pushback_ms, and *is_lb_drop based on md_batch // and error. -void GetCallStatus(grpc_millis deadline, grpc_metadata_batch* md_batch, - grpc_error_handle error, grpc_status_code* status, - absl::optional* server_pushback_ms, - bool* is_lb_drop) { +void GetCallStatus( + grpc_millis deadline, grpc_metadata_batch* md_batch, + grpc_error_handle error, grpc_status_code* status, + absl::optional* server_pushback_ms, bool* is_lb_drop, + absl::optional* stream_network_state) { if (error != GRPC_ERROR_NONE) { grpc_error_get_status(error, deadline, status, nullptr, nullptr, nullptr); intptr_t value = 0; @@ -1555,8 +1560,9 @@ void GetCallStatus(grpc_millis deadline, grpc_metadata_batch* md_batch, } } else { *status = *md_batch->get(GrpcStatusMetadata()); - *server_pushback_ms = md_batch->get(GrpcRetryPushbackMsMetadata()); } + *server_pushback_ms = md_batch->get(GrpcRetryPushbackMsMetadata()); + *stream_network_state = md_batch->get(GrpcStreamNetworkState()); GRPC_ERROR_UNREF(error); } @@ -1688,36 +1694,72 @@ void RetryFilter::CallData::CallAttempt::BatchData::RecvTrailingMetadataReady( // Get the call's status and check for server pushback metadata. 
grpc_status_code status = GRPC_STATUS_OK; absl::optional server_pushback_ms; + bool is_lb_drop = false; + absl::optional stream_network_state; grpc_metadata_batch* md_batch = batch_data->batch_.payload->recv_trailing_metadata.recv_trailing_metadata; - bool is_lb_drop = false; GetCallStatus(calld->deadline_, md_batch, GRPC_ERROR_REF(error), &status, - &server_pushback_ms, &is_lb_drop); + &server_pushback_ms, &is_lb_drop, &stream_network_state); if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { - gpr_log( - GPR_INFO, - "chand=%p calld=%p attempt=%p: call finished, status=%s is_lb_drop=%d", - calld->chand_, calld, call_attempt, grpc_status_code_to_string(status), - is_lb_drop); + gpr_log(GPR_INFO, + "chand=%p calld=%p attempt=%p: call finished, status=%s " + "server_pushback_ms=%s is_lb_drop=%d stream_network_state=%s", + calld->chand_, calld, call_attempt, + grpc_status_code_to_string(status), + server_pushback_ms.has_value() + ? absl::StrCat(*server_pushback_ms).c_str() + : "N/A", + is_lb_drop, + stream_network_state.has_value() + ? absl::StrCat(*stream_network_state).c_str() + : "N/A"); } // Check if we should retry. - if (call_attempt->ShouldRetry(status, is_lb_drop, server_pushback_ms)) { - // Start retry timer. - calld->StartRetryTimer(server_pushback_ms); - // Cancel call attempt. - CallCombinerClosureList closures; - call_attempt->MaybeAddBatchForCancelOp( - error == GRPC_ERROR_NONE - ? grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("call attempt failed"), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED) - : GRPC_ERROR_REF(error), - &closures); - // Record that this attempt has been abandoned. - call_attempt->Abandon(); - // Yields call combiner. - closures.RunClosures(calld->call_combiner_); - return; + if (!is_lb_drop) { // Never retry on LB drops. + enum { kNoRetry, kTransparentRetry, kConfigurableRetry } retry = kNoRetry; + // Handle transparent retries. 
+ if (stream_network_state.has_value() && !calld->retry_committed_) { + // If not sent on wire, then always retry. + // If sent on wire but not seen by server, retry exactly once. + if (*stream_network_state == GrpcStreamNetworkState::kNotSentOnWire) { + retry = kTransparentRetry; + } else if (*stream_network_state == + GrpcStreamNetworkState::kNotSeenByServer && + !calld->sent_transparent_retry_not_seen_by_server_) { + calld->sent_transparent_retry_not_seen_by_server_ = true; + retry = kTransparentRetry; + } + } + // If not transparently retrying, check for configurable retry. + if (retry == kNoRetry && + call_attempt->ShouldRetry(status, server_pushback_ms)) { + retry = kConfigurableRetry; + } + // If we're retrying, do so. + if (retry != kNoRetry) { + CallCombinerClosureList closures; + // Cancel call attempt. + call_attempt->MaybeAddBatchForCancelOp( + error == GRPC_ERROR_NONE + ? grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("call attempt failed"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED) + : GRPC_ERROR_REF(error), + &closures); + // For transparent retries, add a closure to immediately start a new + // call attempt. + // For configurable retries, start retry timer. + if (retry == kTransparentRetry) { + calld->AddClosureToStartTransparentRetry(&closures); + } else { + calld->StartRetryTimer(server_pushback_ms); + } + // Record that this attempt has been abandoned. + call_attempt->Abandon(); + // Yields call combiner. + closures.RunClosures(calld->call_combiner_); + return; + } } // Not retrying, so commit the call. 
calld->RetryCommit(call_attempt); @@ -2087,7 +2129,9 @@ RetryFilter::CallData::CallData(RetryFilter* chand, pending_send_message_(false), pending_send_trailing_metadata_(false), retry_committed_(false), - retry_timer_pending_(false) {} + retry_timer_pending_(false), + retry_codepath_started_(false), + sent_transparent_retry_not_seen_by_server_(false) {} RetryFilter::CallData::~CallData() { grpc_slice_unref_internal(path_); @@ -2100,6 +2144,10 @@ RetryFilter::CallData::~CallData() { void RetryFilter::CallData::StartTransportStreamOpBatch( grpc_transport_stream_op_batch* batch) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, "chand=%p calld=%p: batch started from surface: %s", + chand_, this, grpc_transport_stream_op_batch_string(batch).c_str()); + } // If we have an LB call, delegate to the LB call. if (committed_call_ != nullptr) { // Note: This will release the call combiner. @@ -2168,11 +2216,6 @@ void RetryFilter::CallData::StartTransportStreamOpBatch( batch, GRPC_ERROR_REF(cancelled_from_surface_), call_combiner_); return; } - // If there is no retry policy, then commit retries immediately. - // This ensures that the code below will always jump to the fast path. - // TODO(roth): Remove this special case when we implement - // transparent retries. - if (retry_policy_ == nullptr) retry_committed_ = true; // If this is the first batch and retries are already committed // (e.g., if this batch put the call above the buffer size limit), then // immediately create an LB call and delegate the batch to it. This @@ -2188,7 +2231,7 @@ void RetryFilter::CallData::StartTransportStreamOpBatch( // We also skip this optimization if perAttemptRecvTimeout is set in the // retry policy, because we need the code in CallAttempt to handle // the associated timer. 
- if (num_attempts_completed_ == 0 && retry_committed_ && + if (!retry_codepath_started_ && retry_committed_ && (retry_policy_ == nullptr || !retry_policy_->per_attempt_recv_timeout().has_value())) { if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { @@ -2202,7 +2245,8 @@ void RetryFilter::CallData::StartTransportStreamOpBatch( static_cast( call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value); committed_call_ = CreateLoadBalancedCall( - service_config_call_data->call_dispatch_controller()); + service_config_call_data->call_dispatch_controller(), + /*is_transparent_retry=*/false); committed_call_->StartTransportStreamOpBatch(batch); return; } @@ -2213,7 +2257,8 @@ void RetryFilter::CallData::StartTransportStreamOpBatch( gpr_log(GPR_INFO, "chand=%p calld=%p: creating call attempt", chand_, this); } - CreateCallAttempt(); + retry_codepath_started_ = true; + CreateCallAttempt(/*is_transparent_retry=*/false); return; } // Send batches to call attempt. @@ -2226,7 +2271,8 @@ void RetryFilter::CallData::StartTransportStreamOpBatch( OrphanablePtr RetryFilter::CallData::CreateLoadBalancedCall( - ConfigSelector::CallDispatchController* call_dispatch_controller) { + ConfigSelector::CallDispatchController* call_dispatch_controller, + bool is_transparent_retry) { grpc_call_element_args args = {owning_call_, nullptr, call_context_, path_, /*start_time=*/0, deadline_, arena_, call_combiner_}; @@ -2235,13 +2281,11 @@ RetryFilter::CallData::CreateLoadBalancedCall( // This callback holds a ref to the CallStackDestructionBarrier // object until the LB call is destroyed. call_stack_destruction_barrier_->MakeLbCallDestructionClosure(this), - call_dispatch_controller, - // TODO(roth): Change this when we support transparent retries. 
- /*is_transparent_retry=*/false); + call_dispatch_controller, is_transparent_retry); } -void RetryFilter::CallData::CreateCallAttempt() { - call_attempt_ = MakeRefCounted(this); +void RetryFilter::CallData::CreateCallAttempt(bool is_transparent_retry) { + call_attempt_ = MakeRefCounted(this, is_transparent_retry); call_attempt_->StartRetriableBatches(); } @@ -2533,13 +2577,29 @@ void RetryFilter::CallData::OnRetryTimerLocked(void* arg, auto* calld = static_cast(arg); if (error == GRPC_ERROR_NONE && calld->retry_timer_pending_) { calld->retry_timer_pending_ = false; - calld->CreateCallAttempt(); + calld->CreateCallAttempt(/*is_transparent_retry=*/false); } else { GRPC_CALL_COMBINER_STOP(calld->call_combiner_, "retry timer cancelled"); } GRPC_CALL_STACK_UNREF(calld->owning_call_, "OnRetryTimer"); } +void RetryFilter::CallData::AddClosureToStartTransparentRetry( + CallCombinerClosureList* closures) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) { + gpr_log(GPR_INFO, "chand=%p calld=%p: scheduling transparent retry", chand_, + this); + } + GRPC_CLOSURE_INIT(&retry_closure_, StartTransparentRetry, this, nullptr); + closures->Add(&retry_closure_, GRPC_ERROR_NONE, "start transparent retry"); +} + +void RetryFilter::CallData::StartTransparentRetry(void* arg, + grpc_error_handle /*error*/) { + auto* calld = static_cast(arg); + calld->CreateCallAttempt(/*is_transparent_retry=*/true); +} + } // namespace const grpc_channel_filter kRetryFilterVtable = { diff --git a/src/core/tsi/alts/handshaker/alts_shared_resource.cc b/src/core/tsi/alts/handshaker/alts_shared_resource.cc index 33f6fc0073d6c..a4a1fb202338a 100644 --- a/src/core/tsi/alts/handshaker/alts_shared_resource.cc +++ b/src/core/tsi/alts/handshaker/alts_shared_resource.cc @@ -22,6 +22,7 @@ #include +#include "src/core/lib/channel/channel_args.h" #include "src/core/tsi/alts/handshaker/alts_handshaker_client.h" static alts_shared_resource_dedicated g_alts_resource_dedicated; @@ -56,8 +57,13 @@ void 
grpc_alts_shared_resource_dedicated_start( gpr_mu_lock(&g_alts_resource_dedicated.mu); if (g_alts_resource_dedicated.cq == nullptr) { grpc_channel_credentials* creds = grpc_insecure_credentials_create(); + // Disable retries so that we quickly get a signal when the + // handshake server is not reachable. + grpc_arg disable_retries_arg = grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_ENABLE_RETRIES), 0); + grpc_channel_args args = {1, &disable_retries_arg}; g_alts_resource_dedicated.channel = - grpc_channel_create(handshaker_service_url, creds, nullptr); + grpc_channel_create(handshaker_service_url, creds, &args); grpc_channel_credentials_release(creds); g_alts_resource_dedicated.cq = grpc_completion_queue_create_for_next(nullptr); diff --git a/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc b/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc index f6e2451606ac5..97b3bdb3f5cc0 100644 --- a/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc +++ b/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc @@ -499,8 +499,13 @@ static void alts_tsi_handshaker_create_channel( alts_tsi_handshaker* handshaker = next_args->handshaker; GPR_ASSERT(handshaker->channel == nullptr); grpc_channel_credentials* creds = grpc_insecure_credentials_create(); + // Disable retries so that we quickly get a signal when the + // handshake server is not reachable. 
+ grpc_arg disable_retries_arg = grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_ENABLE_RETRIES), 0); + grpc_channel_args args = {1, &disable_retries_arg}; handshaker->channel = grpc_channel_create( - next_args->handshaker->handshaker_service_url, creds, nullptr); + next_args->handshaker->handshaker_service_url, creds, &args); grpc_channel_credentials_release(creds); tsi_result continue_next_result = alts_tsi_handshaker_continue_handshaker_next( diff --git a/test/core/end2end/end2end_nosec_tests.cc b/test/core/end2end/end2end_nosec_tests.cc index 1f0ab7d8366f9..841cc343606ad 100644 --- a/test/core/end2end/end2end_nosec_tests.cc +++ b/test/core/end2end/end2end_nosec_tests.cc @@ -177,6 +177,12 @@ extern void retry_throttled(grpc_end2end_test_config config); extern void retry_throttled_pre_init(void); extern void retry_too_many_attempts(grpc_end2end_test_config config); extern void retry_too_many_attempts_pre_init(void); +extern void retry_transparent_goaway(grpc_end2end_test_config config); +extern void retry_transparent_goaway_pre_init(void); +extern void retry_transparent_max_concurrent_streams(grpc_end2end_test_config config); +extern void retry_transparent_max_concurrent_streams_pre_init(void); +extern void retry_transparent_not_sent_on_wire(grpc_end2end_test_config config); +extern void retry_transparent_not_sent_on_wire_pre_init(void); extern void server_finishes_request(grpc_end2end_test_config config); extern void server_finishes_request_pre_init(void); extern void server_streaming(grpc_end2end_test_config config); @@ -279,6 +285,9 @@ void grpc_end2end_tests_pre_init(void) { retry_streaming_succeeds_before_replay_finished_pre_init(); retry_throttled_pre_init(); retry_too_many_attempts_pre_init(); + retry_transparent_goaway_pre_init(); + retry_transparent_max_concurrent_streams_pre_init(); + retry_transparent_not_sent_on_wire_pre_init(); server_finishes_request_pre_init(); server_streaming_pre_init(); shutdown_finishes_calls_pre_init(); @@ -375,6 +384,9 
@@ void grpc_end2end_tests(int argc, char **argv, retry_streaming_succeeds_before_replay_finished(config); retry_throttled(config); retry_too_many_attempts(config); + retry_transparent_goaway(config); + retry_transparent_max_concurrent_streams(config); + retry_transparent_not_sent_on_wire(config); server_finishes_request(config); server_streaming(config); shutdown_finishes_calls(config); @@ -687,6 +699,18 @@ void grpc_end2end_tests(int argc, char **argv, retry_too_many_attempts(config); continue; } + if (0 == strcmp("retry_transparent_goaway", argv[i])) { + retry_transparent_goaway(config); + continue; + } + if (0 == strcmp("retry_transparent_max_concurrent_streams", argv[i])) { + retry_transparent_max_concurrent_streams(config); + continue; + } + if (0 == strcmp("retry_transparent_not_sent_on_wire", argv[i])) { + retry_transparent_not_sent_on_wire(config); + continue; + } if (0 == strcmp("server_finishes_request", argv[i])) { server_finishes_request(config); continue; diff --git a/test/core/end2end/end2end_tests.cc b/test/core/end2end/end2end_tests.cc index adb637255c4a8..6ae489877de37 100644 --- a/test/core/end2end/end2end_tests.cc +++ b/test/core/end2end/end2end_tests.cc @@ -181,6 +181,12 @@ extern void retry_throttled(grpc_end2end_test_config config); extern void retry_throttled_pre_init(void); extern void retry_too_many_attempts(grpc_end2end_test_config config); extern void retry_too_many_attempts_pre_init(void); +extern void retry_transparent_goaway(grpc_end2end_test_config config); +extern void retry_transparent_goaway_pre_init(void); +extern void retry_transparent_max_concurrent_streams(grpc_end2end_test_config config); +extern void retry_transparent_max_concurrent_streams_pre_init(void); +extern void retry_transparent_not_sent_on_wire(grpc_end2end_test_config config); +extern void retry_transparent_not_sent_on_wire_pre_init(void); extern void server_finishes_request(grpc_end2end_test_config config); extern void server_finishes_request_pre_init(void); 
extern void server_streaming(grpc_end2end_test_config config); @@ -285,6 +291,9 @@ void grpc_end2end_tests_pre_init(void) { retry_streaming_succeeds_before_replay_finished_pre_init(); retry_throttled_pre_init(); retry_too_many_attempts_pre_init(); + retry_transparent_goaway_pre_init(); + retry_transparent_max_concurrent_streams_pre_init(); + retry_transparent_not_sent_on_wire_pre_init(); server_finishes_request_pre_init(); server_streaming_pre_init(); shutdown_finishes_calls_pre_init(); @@ -383,6 +392,9 @@ void grpc_end2end_tests(int argc, char **argv, retry_streaming_succeeds_before_replay_finished(config); retry_throttled(config); retry_too_many_attempts(config); + retry_transparent_goaway(config); + retry_transparent_max_concurrent_streams(config); + retry_transparent_not_sent_on_wire(config); server_finishes_request(config); server_streaming(config); shutdown_finishes_calls(config); @@ -703,6 +715,18 @@ void grpc_end2end_tests(int argc, char **argv, retry_too_many_attempts(config); continue; } + if (0 == strcmp("retry_transparent_goaway", argv[i])) { + retry_transparent_goaway(config); + continue; + } + if (0 == strcmp("retry_transparent_max_concurrent_streams", argv[i])) { + retry_transparent_max_concurrent_streams(config); + continue; + } + if (0 == strcmp("retry_transparent_not_sent_on_wire", argv[i])) { + retry_transparent_not_sent_on_wire(config); + continue; + } if (0 == strcmp("server_finishes_request", argv[i])) { server_finishes_request(config); continue; diff --git a/test/core/end2end/fuzzers/api_fuzzer.cc b/test/core/end2end/fuzzers/api_fuzzer.cc index c3ddd23f4c402..6797d962ac1e3 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.cc +++ b/test/core/end2end/fuzzers/api_fuzzer.cc @@ -340,6 +340,16 @@ class Call : public std::enable_shared_from_this { void Shutdown() { if (call_ != nullptr) { grpc_call_cancel(call_, nullptr); + if (type_ == CallType::CLIENT && !started_recv_status_on_client_) { + uint8_t has_ops = 0; + grpc_op op = 
MakeRecvStatusOnClientOp(&has_ops); + auto* v = FinishedBatchValidator(has_ops); + grpc_call_error error = + grpc_call_start_batch(call_, &op, 1, v, nullptr); + if (error != GRPC_CALL_OK) { + v->Run(false); + } + } type_ = CallType::TOMBSTONED; } } @@ -481,12 +491,9 @@ class Call : public std::enable_shared_from_this { } break; case api_fuzzer::BatchOp::kReceiveStatusOnClient: - op.op = GRPC_OP_RECV_STATUS_ON_CLIENT; - op.data.recv_status_on_client.status = &status_; - op.data.recv_status_on_client.trailing_metadata = - &recv_trailing_metadata_; - op.data.recv_status_on_client.status_details = &recv_status_details_; - *batch_ops |= 1 << GRPC_OP_RECV_STATUS_ON_CLIENT; + op = MakeRecvStatusOnClientOp(batch_ops); + unwinders->push_back( + [this]() { started_recv_status_on_client_ = false; }); break; case api_fuzzer::BatchOp::kReceiveCloseOnServer: op.op = GRPC_OP_RECV_CLOSE_ON_SERVER; @@ -538,6 +545,18 @@ class Call : public std::enable_shared_from_this { } private: + grpc_op MakeRecvStatusOnClientOp(uint8_t* batch_ops) { + grpc_op op; + memset(&op, 0, sizeof(op)); + op.op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op.data.recv_status_on_client.status = &status_; + op.data.recv_status_on_client.trailing_metadata = &recv_trailing_metadata_; + op.data.recv_status_on_client.status_details = &recv_status_details_; + *batch_ops |= 1 << GRPC_OP_RECV_STATUS_ON_CLIENT; + started_recv_status_on_client_ = true; + return op; + } + CallType type_; grpc_call* call_ = nullptr; grpc_byte_buffer* recv_message_ = nullptr; @@ -553,8 +572,9 @@ class Call : public std::enable_shared_from_this { bool enqueued_recv_initial_metadata_ = false; grpc_call_details call_details_{}; grpc_byte_buffer* send_message_ = nullptr; - bool call_closed_ = false; bool pending_recv_message_op_ = false; + bool started_recv_status_on_client_ = false; + bool call_closed_ = false; std::vector free_pointers_; std::vector unref_slices_; diff --git a/test/core/end2end/generate_tests.bzl 
b/test/core/end2end/generate_tests.bzl index abc3faac57d2d..a85158c02e9c8 100755 --- a/test/core/end2end/generate_tests.bzl +++ b/test/core/end2end/generate_tests.bzl @@ -356,6 +356,17 @@ END2END_TESTS = { ), "retry_throttled": _test_options(needs_client_channel = True), "retry_too_many_attempts": _test_options(needs_client_channel = True), + "retry_transparent_goaway": _test_options(needs_client_channel = True), + "retry_transparent_not_sent_on_wire": _test_options( + needs_client_channel = True, + ), + "retry_transparent_max_concurrent_streams": _test_options( + needs_client_channel = True, + proxyable = False, + # TODO(jtattermusch): too long bazel test name makes the test flaky on Windows RBE + # See b/151617965 + short_name = "retry_transparent_mcs", + ), "server_finishes_request": _test_options(), "server_streaming": _test_options(needs_http2 = True), "shutdown_finishes_calls": _test_options(), diff --git a/test/core/end2end/tests/retry_cancel_with_multiple_send_batches.cc b/test/core/end2end/tests/retry_cancel_with_multiple_send_batches.cc index bf0d02d6234b3..3b929afc4607d 100644 --- a/test/core/end2end/tests/retry_cancel_with_multiple_send_batches.cc +++ b/test/core/end2end/tests/retry_cancel_with_multiple_send_batches.cc @@ -240,7 +240,7 @@ class FailSendOpsFilter { public: static grpc_channel_filter kFilterVtable; - public: + private: class CallData { public: static grpc_error_handle Init(grpc_call_element* elem, diff --git a/test/core/end2end/tests/retry_recv_trailing_metadata_error.cc b/test/core/end2end/tests/retry_recv_trailing_metadata_error.cc index 01bd449fbb6ab..afa7bcecf8da3 100644 --- a/test/core/end2end/tests/retry_recv_trailing_metadata_error.cc +++ b/test/core/end2end/tests/retry_recv_trailing_metadata_error.cc @@ -267,7 +267,7 @@ class InjectStatusFilter { public: static grpc_channel_filter kFilterVtable; - public: + private: class CallData { public: static grpc_error_handle Init(grpc_call_element* elem, diff --git 
a/test/core/end2end/tests/retry_send_op_fails.cc b/test/core/end2end/tests/retry_send_op_fails.cc index 92dc8859ff031..57d7b53c098dc 100644 --- a/test/core/end2end/tests/retry_send_op_fails.cc +++ b/test/core/end2end/tests/retry_send_op_fails.cc @@ -277,14 +277,14 @@ static void test_retry_send_op_fails(grpc_end2end_test_config config) { namespace { -// A filter that, for the first call it sees, will fail the batch -// containing send_initial_metadata and then fail the call with status -// ABORTED. All subsequent calls are allowed through without failures. -class FailFirstSendOpFilter { +// A filter that, for the first call it sees, will fail all batches except +// for cancellations, so that the call fails with status ABORTED. +// All subsequent calls are allowed through without failures. +class FailFirstCallFilter { public: static grpc_channel_filter kFilterVtable; - public: + private: class CallData { public: static grpc_error_handle Init(grpc_call_element* elem, @@ -302,7 +302,7 @@ class FailFirstSendOpFilter { static void StartTransportStreamOpBatch( grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { - auto* chand = static_cast(elem->channel_data); + auto* chand = static_cast(elem->channel_data); auto* calld = static_cast(elem->call_data); if (!chand->seen_first_) { chand->seen_first_ = true; @@ -312,7 +312,7 @@ class FailFirstSendOpFilter { grpc_transport_stream_op_batch_finish_with_failure( batch, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "FailFirstSendOpFilter failing batch"), + "FailFirstCallFilter failing batch"), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_ABORTED), calld->call_combiner_); return; @@ -330,19 +330,19 @@ class FailFirstSendOpFilter { static grpc_error_handle Init(grpc_channel_element* elem, grpc_channel_element_args* /*args*/) { - new (elem->channel_data) FailFirstSendOpFilter(); + new (elem->channel_data) FailFirstCallFilter(); return GRPC_ERROR_NONE; } static void Destroy(grpc_channel_element* elem) { - auto* 
chand = static_cast(elem->channel_data); - chand->~FailFirstSendOpFilter(); + auto* chand = static_cast(elem->channel_data); + chand->~FailFirstCallFilter(); } bool seen_first_ = false; }; -grpc_channel_filter FailFirstSendOpFilter::kFilterVtable = { +grpc_channel_filter FailFirstCallFilter::kFilterVtable = { CallData::StartTransportStreamOpBatch, nullptr, grpc_channel_next_op, @@ -350,11 +350,11 @@ grpc_channel_filter FailFirstSendOpFilter::kFilterVtable = { CallData::Init, grpc_call_stack_ignore_set_pollset_or_pollset_set, CallData::Destroy, - sizeof(FailFirstSendOpFilter), + sizeof(FailFirstCallFilter), Init, Destroy, grpc_channel_next_get_info, - "FailFirstSendOpFilter", + "FailFirstCallFilter", }; } // namespace @@ -374,7 +374,7 @@ void retry_send_op_fails(grpc_end2end_test_config config) { return true; } // Install filter. - builder->PrependFilter(&FailFirstSendOpFilter::kFilterVtable, + builder->PrependFilter(&FailFirstCallFilter::kFilterVtable, nullptr); return true; }); diff --git a/test/core/end2end/tests/retry_transparent_goaway.cc b/test/core/end2end/tests/retry_transparent_goaway.cc new file mode 100644 index 0000000000000..ab5b4b4c2aced --- /dev/null +++ b/test/core/end2end/tests/retry_transparent_goaway.cc @@ -0,0 +1,379 @@ +// +// Copyright 2017 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/channel/channel_stack_builder.h" +#include "src/core/lib/config/core_configuration.h" +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/gpr/useful.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/surface/channel_init.h" +#include "src/core/lib/transport/error_utils.h" +#include "test/core/end2end/cq_verifier.h" +#include "test/core/end2end/end2end_tests.h" +#include "test/core/end2end/tests/cancel_test_helpers.h" + +static void* tag(intptr_t t) { return reinterpret_cast(t); } + +static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, + const char* test_name, + grpc_channel_args* client_args, + grpc_channel_args* server_args) { + grpc_end2end_test_fixture f; + gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name); + f = config.create_fixture(client_args, server_args); + config.init_server(&f, server_args); + config.init_client(&f, client_args); + return f; +} + +static gpr_timespec n_seconds_from_now(int n) { + return grpc_timeout_seconds_to_deadline(n); +} + +static gpr_timespec five_seconds_from_now(void) { + return n_seconds_from_now(5); +} + +static void drain_cq(grpc_completion_queue* cq) { + grpc_event ev; + do { + ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr); + } while (ev.type != GRPC_QUEUE_SHUTDOWN); +} + +static void shutdown_server(grpc_end2end_test_fixture* f) { + if (!f->server) return; + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + nullptr) + .type == GRPC_OP_COMPLETE); + grpc_server_destroy(f->server); + f->server = nullptr; +} + +static void shutdown_client(grpc_end2end_test_fixture* f) { + if (!f->client) return; + 
grpc_channel_destroy(f->client); + f->client = nullptr; +} + +static void end_test(grpc_end2end_test_fixture* f) { + shutdown_server(f); + shutdown_client(f); + + grpc_completion_queue_shutdown(f->cq); + drain_cq(f->cq); + grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); +} + +// Tests transparent retries when the call was never sent out on the wire. +static void test_retry_transparent_goaway(grpc_end2end_test_config config) { + grpc_call* c; + grpc_call* s; + grpc_op ops[6]; + grpc_op* op; + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array trailing_metadata_recv; + grpc_metadata_array request_metadata_recv; + grpc_call_details call_details; + grpc_slice request_payload_slice = grpc_slice_from_static_string("foo"); + grpc_slice response_payload_slice = grpc_slice_from_static_string("bar"); + grpc_byte_buffer* request_payload = + grpc_raw_byte_buffer_create(&request_payload_slice, 1); + grpc_byte_buffer* response_payload = + grpc_raw_byte_buffer_create(&response_payload_slice, 1); + grpc_byte_buffer* request_payload_recv = nullptr; + grpc_byte_buffer* response_payload_recv = nullptr; + grpc_status_code status; + grpc_call_error error; + grpc_slice details; + int was_cancelled = 2; + char* peer; + + grpc_end2end_test_fixture f = + begin_test(config, "retry_transparent_goaway", nullptr, nullptr); + + cq_verifier* cqv = cq_verifier_create(f.cq); + + gpr_timespec deadline = five_seconds_from_now(); + c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq, + grpc_slice_from_static_string("/service/method"), + nullptr, deadline, nullptr); + GPR_ASSERT(c); + + peer = grpc_call_get_peer(c); + GPR_ASSERT(peer != nullptr); + gpr_log(GPR_DEBUG, "client_peer_before_call=%s", peer); + gpr_free(peer); + + grpc_metadata_array_init(&initial_metadata_recv); + grpc_metadata_array_init(&trailing_metadata_recv); + grpc_metadata_array_init(&request_metadata_recv); + grpc_call_details_init(&call_details); + 
grpc_slice status_details = grpc_slice_from_static_string("xyz"); + + // Start a batch containing send ops. + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op++; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message.send_message = request_payload; + op++; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op++; + error = grpc_call_start_batch(c, ops, static_cast(op - ops), tag(1), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + + // Start a batch containing recv ops. + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &response_payload_recv; + op++; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv; + op++; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + op->data.recv_status_on_client.status = &status; + op->data.recv_status_on_client.status_details = &details; + op++; + error = grpc_call_start_batch(c, ops, static_cast(op - ops), tag(2), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + + // Client send ops should now complete. + CQ_EXPECT_COMPLETION(cqv, tag(1), true); + cq_verify(cqv); + + // Server should get a call. + error = + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.cq, f.cq, tag(101)); + GPR_ASSERT(GRPC_CALL_OK == error); + CQ_EXPECT_COMPLETION(cqv, tag(101), true); + cq_verify(cqv); + + // Server receives the request. + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &request_payload_recv; + op++; + error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(102), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + CQ_EXPECT_COMPLETION(cqv, tag(102), true); + cq_verify(cqv); + + // Server sends a response with status OK. 
+ memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op->data.recv_close_on_server.cancelled = &was_cancelled; + op++; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op++; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message.send_message = response_payload; + op++; + op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op->data.send_status_from_server.trailing_metadata_count = 0; + op->data.send_status_from_server.status = GRPC_STATUS_OK; + op->data.send_status_from_server.status_details = &status_details; + op++; + error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(103), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + + // In principle, the server batch should complete before the client + // recv ops batch, but in the proxy fixtures, there are multiple threads + // involved, so the completion order tends to be a little racy. + CQ_EXPECT_COMPLETION(cqv, tag(103), true); + CQ_EXPECT_COMPLETION(cqv, tag(2), true); + cq_verify(cqv); + + GPR_ASSERT(status == GRPC_STATUS_OK); + GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz")); + GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method")); + GPR_ASSERT(0 == call_details.flags); + GPR_ASSERT(was_cancelled == 0); + GPR_ASSERT(byte_buffer_eq_slice(request_payload_recv, request_payload_slice)); + GPR_ASSERT( + byte_buffer_eq_slice(response_payload_recv, response_payload_slice)); + + // Make sure the "grpc-previous-rpc-attempts" header was NOT sent, since + // we don't do that for transparent retries. 
+ for (size_t i = 0; i < request_metadata_recv.count; ++i) { + GPR_ASSERT(!grpc_slice_eq( + request_metadata_recv.metadata[i].key, + grpc_slice_from_static_string("grpc-previous-rpc-attempts"))); + } + + grpc_slice_unref(details); + grpc_metadata_array_destroy(&initial_metadata_recv); + grpc_metadata_array_destroy(&trailing_metadata_recv); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); + grpc_byte_buffer_destroy(request_payload); + grpc_byte_buffer_destroy(response_payload); + grpc_byte_buffer_destroy(request_payload_recv); + grpc_byte_buffer_destroy(response_payload_recv); + + grpc_call_unref(c); + grpc_call_unref(s); + + cq_verifier_destroy(cqv); + + end_test(&f); + config.tear_down_data(&f); +} + +namespace { + +// A filter that, for the first call it sees, will fail all batches except +// for cancellations, so that the call fails with an error whose +// StreamNetworkState is kNotSeenByServer. +// All subsequent calls are allowed through without failures. 
+class FailFirstCallFilter { + public: + static grpc_channel_filter kFilterVtable; + + private: + class CallData { + public: + static grpc_error_handle Init(grpc_call_element* elem, + const grpc_call_element_args* args) { + new (elem->call_data) CallData(args); + return GRPC_ERROR_NONE; + } + + static void Destroy(grpc_call_element* elem, + const grpc_call_final_info* /*final_info*/, + grpc_closure* /*ignored*/) { + auto* calld = static_cast(elem->call_data); + calld->~CallData(); + } + + static void StartTransportStreamOpBatch( + grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { + auto* chand = static_cast(elem->channel_data); + auto* calld = static_cast(elem->call_data); + if (!chand->seen_call_) { + calld->fail_ = true; + chand->seen_call_ = true; + } + if (calld->fail_) { + if (batch->recv_trailing_metadata) { + batch->payload->recv_trailing_metadata.recv_trailing_metadata->Set( + grpc_core::GrpcStreamNetworkState(), + grpc_core::GrpcStreamNetworkState::kNotSeenByServer); + } + if (!batch->cancel_stream) { + grpc_transport_stream_op_batch_finish_with_failure( + batch, + grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "FailFirstCallFilter failing batch"), + GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNAVAILABLE), + calld->call_combiner_); + return; + } + } + grpc_call_next_op(elem, batch); + } + + private: + explicit CallData(const grpc_call_element_args* args) + : call_combiner_(args->call_combiner) {} + + grpc_core::CallCombiner* call_combiner_; + bool fail_ = false; + }; + + static grpc_error_handle Init(grpc_channel_element* elem, + grpc_channel_element_args* /*args*/) { + new (elem->channel_data) FailFirstCallFilter(); + return GRPC_ERROR_NONE; + } + + static void Destroy(grpc_channel_element* elem) { + auto* chand = static_cast(elem->channel_data); + chand->~FailFirstCallFilter(); + } + + bool seen_call_ = false; +}; + +grpc_channel_filter FailFirstCallFilter::kFilterVtable = { + CallData::StartTransportStreamOpBatch, + nullptr, + 
grpc_channel_next_op, + sizeof(CallData), + CallData::Init, + grpc_call_stack_ignore_set_pollset_or_pollset_set, + CallData::Destroy, + sizeof(FailFirstCallFilter), + Init, + Destroy, + grpc_channel_next_get_info, + "FailFirstCallFilter", +}; + +} // namespace + +void retry_transparent_goaway(grpc_end2end_test_config config) { + GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL); + grpc_core::CoreConfiguration::RunWithSpecialConfiguration( + [](grpc_core::CoreConfiguration::Builder* builder) { + grpc_core::BuildCoreConfiguration(builder); + builder->channel_init()->RegisterStage( + GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY + 1, + [](grpc_core::ChannelStackBuilder* builder) { + // Skip on proxy (which explicitly disables retries). + const grpc_channel_args* args = builder->channel_args(); + if (!grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_RETRIES, + true)) { + return true; + } + // Install filter. + builder->PrependFilter(&FailFirstCallFilter::kFilterVtable, + nullptr); + return true; + }); + }, + [config] { test_retry_transparent_goaway(config); }); +} + +void retry_transparent_goaway_pre_init(void) {} diff --git a/test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc b/test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc new file mode 100644 index 0000000000000..921e416dff092 --- /dev/null +++ b/test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc @@ -0,0 +1,368 @@ +// +// Copyright 2017 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/channel/channel_stack_builder.h" +#include "src/core/lib/config/core_configuration.h" +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/gpr/useful.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/surface/channel_init.h" +#include "src/core/lib/transport/error_utils.h" +#include "test/core/end2end/cq_verifier.h" +#include "test/core/end2end/end2end_tests.h" +#include "test/core/end2end/tests/cancel_test_helpers.h" + +static void* tag(intptr_t t) { return reinterpret_cast(t); } + +static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, + const char* test_name, + grpc_channel_args* client_args, + grpc_channel_args* server_args) { + grpc_end2end_test_fixture f; + gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name); + f = config.create_fixture(client_args, server_args); + config.init_server(&f, server_args); + config.init_client(&f, client_args); + return f; +} + +static gpr_timespec n_seconds_from_now(int n) { + return grpc_timeout_seconds_to_deadline(n); +} + +static gpr_timespec five_seconds_from_now(void) { + return n_seconds_from_now(5); +} + +static void drain_cq(grpc_completion_queue* cq) { + grpc_event ev; + do { + ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr); + } while (ev.type != GRPC_QUEUE_SHUTDOWN); +} + +static void shutdown_server(grpc_end2end_test_fixture* f) { + if (!f->server) return; + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + nullptr) + .type == GRPC_OP_COMPLETE); + grpc_server_destroy(f->server); + f->server = 
nullptr; +} + +static void shutdown_client(grpc_end2end_test_fixture* f) { + if (!f->client) return; + grpc_channel_destroy(f->client); + f->client = nullptr; +} + +static void end_test(grpc_end2end_test_fixture* f) { + shutdown_server(f); + shutdown_client(f); + + grpc_completion_queue_shutdown(f->cq); + drain_cq(f->cq); + grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); +} + +// Tests transparent retries when the call was never sent out on the wire. +// This is similar to retry_transparent_not_sent_on_wire, except that +// instead of simulating the response with a filter, we actually have +// the transport behave the right way. We create a server with +// MAX_CONCURRENT_STREAMS set to 1. We start a call on the server, and +// then start a second call, which will get queued in the transport. +// Then, before the first call finishes, the server is shut down and +// restarted. The second call will fail in that transport instance and +// will be transparently retried after the server starts up again. +static void test_retry_transparent_max_concurrent_streams( + grpc_end2end_test_config config) { + grpc_op ops[6]; + grpc_op* op; + grpc_slice request_payload_slice = grpc_slice_from_static_string("foo"); + grpc_slice response_payload_slice = grpc_slice_from_static_string("bar"); + grpc_slice status_details = grpc_slice_from_static_string("xyz"); + grpc_call_error error; + + grpc_arg arg = grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_MAX_CONCURRENT_STREAMS), 1); + grpc_channel_args server_args = {1, &arg}; + grpc_end2end_test_fixture f = + begin_test(config, "retry_transparent_max_concurrent_streams", nullptr, + &server_args); + + cq_verifier* cqv = cq_verifier_create(f.cq); + + gpr_timespec deadline = five_seconds_from_now(); + + // Client starts a call. 
+ grpc_call* c = + grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq, + grpc_slice_from_static_string("/service/method"), + nullptr, deadline, nullptr); + GPR_ASSERT(c); + grpc_byte_buffer* request_payload = + grpc_raw_byte_buffer_create(&request_payload_slice, 1); + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array_init(&initial_metadata_recv); + grpc_byte_buffer* response_payload_recv = nullptr; + grpc_metadata_array trailing_metadata_recv; + grpc_metadata_array_init(&trailing_metadata_recv); + grpc_status_code status; + grpc_slice details; + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op++; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message.send_message = request_payload; + op++; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op++; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv; + op++; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &response_payload_recv; + op++; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + op->data.recv_status_on_client.status = &status; + op->data.recv_status_on_client.status_details = &details; + op++; + error = grpc_call_start_batch(c, ops, static_cast(op - ops), tag(1), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + + // Server should get a call. 
+ grpc_call* s; + grpc_metadata_array request_metadata_recv; + grpc_metadata_array_init(&request_metadata_recv); + grpc_call_details call_details; + grpc_call_details_init(&call_details); + error = + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.cq, f.cq, tag(101)); + GPR_ASSERT(GRPC_CALL_OK == error); + CQ_EXPECT_COMPLETION(cqv, tag(101), true); + cq_verify(cqv); + GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method")); + GPR_ASSERT(0 == call_details.flags); + grpc_call_details_destroy(&call_details); + grpc_metadata_array_destroy(&request_metadata_recv); + + // Client starts a second call. + // We set wait_for_ready for this call, so that if it retries before + // the server comes back up, it stays pending. + grpc_call* c2 = + grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq, + grpc_slice_from_static_string("/service/method"), + nullptr, deadline, nullptr); + GPR_ASSERT(c2); + grpc_byte_buffer* request_payload2 = + grpc_raw_byte_buffer_create(&request_payload_slice, 1); + grpc_metadata_array initial_metadata_recv2; + grpc_metadata_array_init(&initial_metadata_recv2); + grpc_byte_buffer* response_payload_recv2 = nullptr; + grpc_metadata_array trailing_metadata_recv2; + grpc_metadata_array_init(&trailing_metadata_recv2); + grpc_status_code status2; + grpc_slice details2; + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY; + op++; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message.send_message = request_payload2; + op++; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op++; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata.recv_initial_metadata = + &initial_metadata_recv2; + op++; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &response_payload_recv2; + op++; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + 
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv2; + op->data.recv_status_on_client.status = &status2; + op->data.recv_status_on_client.status_details = &details2; + op++; + error = grpc_call_start_batch(c2, ops, static_cast(op - ops), tag(2), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + + // Start server shutdown. + grpc_server_shutdown_and_notify(f.server, f.cq, tag(102)); + + // Server handles the first call. + grpc_byte_buffer* request_payload_recv = nullptr; + int was_cancelled = 2; + grpc_byte_buffer* response_payload = + grpc_raw_byte_buffer_create(&response_payload_slice, 1); + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &request_payload_recv; + op++; + op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op->data.recv_close_on_server.cancelled = &was_cancelled; + op++; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op++; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message.send_message = response_payload; + op++; + op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op->data.send_status_from_server.trailing_metadata_count = 0; + op->data.send_status_from_server.status = GRPC_STATUS_OK; + op->data.send_status_from_server.status_details = &status_details; + op++; + error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(103), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + + // Server completes first call and shutdown. + // Client completes first call. + CQ_EXPECT_COMPLETION(cqv, tag(103), true); + CQ_EXPECT_COMPLETION(cqv, tag(102), true); + CQ_EXPECT_COMPLETION(cqv, tag(1), true); + cq_verify(cqv); + + // Clean up from first call. 
+ GPR_ASSERT(byte_buffer_eq_slice(request_payload_recv, request_payload_slice)); + grpc_byte_buffer_destroy(request_payload_recv); + GPR_ASSERT(was_cancelled == 0); + grpc_byte_buffer_destroy(response_payload); + grpc_call_unref(s); + grpc_byte_buffer_destroy(request_payload); + grpc_metadata_array_destroy(&initial_metadata_recv); + GPR_ASSERT( + byte_buffer_eq_slice(response_payload_recv, response_payload_slice)); + grpc_byte_buffer_destroy(response_payload_recv); + grpc_metadata_array_destroy(&trailing_metadata_recv); + GPR_ASSERT(status == GRPC_STATUS_OK); + GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz")); + grpc_slice_unref(details); + grpc_call_unref(c); + + // Destroy server and then restart it. + grpc_server_destroy(f.server); + f.server = nullptr; + config.init_server(&f, &server_args); + + // Server should get the second call. + grpc_metadata_array_init(&request_metadata_recv); + grpc_call_details_init(&call_details); + error = + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.cq, f.cq, tag(201)); + GPR_ASSERT(GRPC_CALL_OK == error); + CQ_EXPECT_COMPLETION(cqv, tag(201), true); + cq_verify(cqv); + GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method")); + GPR_ASSERT(0 == call_details.flags); + grpc_call_details_destroy(&call_details); + // Make sure the "grpc-previous-rpc-attempts" header was NOT sent, since + // we don't do that for transparent retries. + for (size_t i = 0; i < request_metadata_recv.count; ++i) { + GPR_ASSERT(!grpc_slice_eq( + request_metadata_recv.metadata[i].key, + grpc_slice_from_static_string("grpc-previous-rpc-attempts"))); + } + grpc_metadata_array_destroy(&request_metadata_recv); + + // Server handles the second call. 
+ request_payload_recv = nullptr; + was_cancelled = 2; + grpc_byte_buffer* response_payload2 = + grpc_raw_byte_buffer_create(&response_payload_slice, 1); + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &request_payload_recv; + op++; + op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op->data.recv_close_on_server.cancelled = &was_cancelled; + op++; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op++; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message.send_message = response_payload2; + op++; + op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op->data.send_status_from_server.trailing_metadata_count = 0; + op->data.send_status_from_server.status = GRPC_STATUS_OK; + op->data.send_status_from_server.status_details = &status_details; + op++; + error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(202), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + + // Second call completes. + CQ_EXPECT_COMPLETION(cqv, tag(202), true); + CQ_EXPECT_COMPLETION(cqv, tag(2), true); + cq_verify(cqv); + + // Clean up from second call. 
+ GPR_ASSERT(byte_buffer_eq_slice(request_payload_recv, request_payload_slice)); + grpc_byte_buffer_destroy(request_payload_recv); + GPR_ASSERT(was_cancelled == 0); + grpc_byte_buffer_destroy(response_payload2); + grpc_call_unref(s); + grpc_byte_buffer_destroy(request_payload2); + grpc_metadata_array_destroy(&initial_metadata_recv2); + GPR_ASSERT( + byte_buffer_eq_slice(response_payload_recv2, response_payload_slice)); + grpc_byte_buffer_destroy(response_payload_recv2); + grpc_metadata_array_destroy(&trailing_metadata_recv2); + GPR_ASSERT(status2 == GRPC_STATUS_OK); + GPR_ASSERT(0 == grpc_slice_str_cmp(details2, "xyz")); + grpc_slice_unref(details2); + grpc_call_unref(c2); + + cq_verifier_destroy(cqv); + + end_test(&f); + config.tear_down_data(&f); +} + +void retry_transparent_max_concurrent_streams(grpc_end2end_test_config config) { + GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL); + test_retry_transparent_max_concurrent_streams(config); +} + +void retry_transparent_max_concurrent_streams_pre_init(void) {} diff --git a/test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc b/test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc new file mode 100644 index 0000000000000..739601d58c677 --- /dev/null +++ b/test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc @@ -0,0 +1,378 @@ +// +// Copyright 2017 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/channel/channel_stack_builder.h" +#include "src/core/lib/config/core_configuration.h" +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/gpr/useful.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/surface/channel_init.h" +#include "src/core/lib/transport/error_utils.h" +#include "test/core/end2end/cq_verifier.h" +#include "test/core/end2end/end2end_tests.h" +#include "test/core/end2end/tests/cancel_test_helpers.h" + +static void* tag(intptr_t t) { return reinterpret_cast(t); } + +static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, + const char* test_name, + grpc_channel_args* client_args, + grpc_channel_args* server_args) { + grpc_end2end_test_fixture f; + gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name); + f = config.create_fixture(client_args, server_args); + config.init_server(&f, server_args); + config.init_client(&f, client_args); + return f; +} + +static gpr_timespec n_seconds_from_now(int n) { + return grpc_timeout_seconds_to_deadline(n); +} + +static gpr_timespec five_seconds_from_now(void) { + return n_seconds_from_now(5); +} + +static void drain_cq(grpc_completion_queue* cq) { + grpc_event ev; + do { + ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr); + } while (ev.type != GRPC_QUEUE_SHUTDOWN); +} + +static void shutdown_server(grpc_end2end_test_fixture* f) { + if (!f->server) return; + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + nullptr) + .type == GRPC_OP_COMPLETE); + grpc_server_destroy(f->server); + f->server = nullptr; +} + +static void shutdown_client(grpc_end2end_test_fixture* f) { + if (!f->client) return; + 
grpc_channel_destroy(f->client); + f->client = nullptr; +} + +static void end_test(grpc_end2end_test_fixture* f) { + shutdown_server(f); + shutdown_client(f); + + grpc_completion_queue_shutdown(f->cq); + drain_cq(f->cq); + grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); +} + +// Tests transparent retries when the call was never sent out on the wire. +static void test_retry_transparent_not_sent_on_wire( + grpc_end2end_test_config config) { + grpc_call* c; + grpc_call* s; + grpc_op ops[6]; + grpc_op* op; + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array trailing_metadata_recv; + grpc_metadata_array request_metadata_recv; + grpc_call_details call_details; + grpc_slice request_payload_slice = grpc_slice_from_static_string("foo"); + grpc_slice response_payload_slice = grpc_slice_from_static_string("bar"); + grpc_byte_buffer* request_payload = + grpc_raw_byte_buffer_create(&request_payload_slice, 1); + grpc_byte_buffer* response_payload = + grpc_raw_byte_buffer_create(&response_payload_slice, 1); + grpc_byte_buffer* request_payload_recv = nullptr; + grpc_byte_buffer* response_payload_recv = nullptr; + grpc_status_code status; + grpc_call_error error; + grpc_slice details; + int was_cancelled = 2; + char* peer; + + grpc_end2end_test_fixture f = begin_test( + config, "retry_transparent_not_sent_on_wire", nullptr, nullptr); + + cq_verifier* cqv = cq_verifier_create(f.cq); + + gpr_timespec deadline = five_seconds_from_now(); + c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq, + grpc_slice_from_static_string("/service/method"), + nullptr, deadline, nullptr); + GPR_ASSERT(c); + + peer = grpc_call_get_peer(c); + GPR_ASSERT(peer != nullptr); + gpr_log(GPR_DEBUG, "client_peer_before_call=%s", peer); + gpr_free(peer); + + grpc_metadata_array_init(&initial_metadata_recv); + grpc_metadata_array_init(&trailing_metadata_recv); + grpc_metadata_array_init(&request_metadata_recv); + 
  grpc_call_details_init(&call_details);
+  grpc_slice status_details = grpc_slice_from_static_string("xyz");
+
+  // Start a batch containing send ops.
+  memset(ops, 0, sizeof(ops));
+  op = ops;
+  op->op = GRPC_OP_SEND_INITIAL_METADATA;
+  op->data.send_initial_metadata.count = 0;
+  op++;
+  op->op = GRPC_OP_SEND_MESSAGE;
+  op->data.send_message.send_message = request_payload;
+  op++;
+  op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+  op++;
+  error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1),
+                                nullptr);
+  GPR_ASSERT(GRPC_CALL_OK == error);
+
+  // Start a batch containing recv ops.
+  memset(ops, 0, sizeof(ops));
+  op = ops;
+  op->op = GRPC_OP_RECV_MESSAGE;
+  op->data.recv_message.recv_message = &response_payload_recv;
+  op++;
+  op->op = GRPC_OP_RECV_INITIAL_METADATA;
+  op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
+  op++;
+  op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+  op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
+  op->data.recv_status_on_client.status = &status;
+  op->data.recv_status_on_client.status_details = &details;
+  op++;
+  error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(2),
+                                nullptr);
+  GPR_ASSERT(GRPC_CALL_OK == error);
+
+  // Client send ops should now complete.
+  CQ_EXPECT_COMPLETION(cqv, tag(1), true);
+  cq_verify(cqv);
+
+  // Server should get a call.
+  error =
+      grpc_server_request_call(f.server, &s, &call_details,
+                               &request_metadata_recv, f.cq, f.cq, tag(101));
+  GPR_ASSERT(GRPC_CALL_OK == error);
+  CQ_EXPECT_COMPLETION(cqv, tag(101), true);
+  cq_verify(cqv);
+
+  // Server receives the request.
+  memset(ops, 0, sizeof(ops));
+  op = ops;
+  op->op = GRPC_OP_RECV_MESSAGE;
+  op->data.recv_message.recv_message = &request_payload_recv;
+  op++;
+  error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(102),
+                                nullptr);
+  GPR_ASSERT(GRPC_CALL_OK == error);
+  CQ_EXPECT_COMPLETION(cqv, tag(102), true);
+  cq_verify(cqv);
+
+  // Server sends a response with status OK.
+  memset(ops, 0, sizeof(ops));
+  op = ops;
+  op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+  op->data.recv_close_on_server.cancelled = &was_cancelled;
+  op++;
+  op->op = GRPC_OP_SEND_INITIAL_METADATA;
+  op->data.send_initial_metadata.count = 0;
+  op++;
+  op->op = GRPC_OP_SEND_MESSAGE;
+  op->data.send_message.send_message = response_payload;
+  op++;
+  op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
+  op->data.send_status_from_server.trailing_metadata_count = 0;
+  op->data.send_status_from_server.status = GRPC_STATUS_OK;
+  op->data.send_status_from_server.status_details = &status_details;
+  op++;
+  error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(103),
+                                nullptr);
+  GPR_ASSERT(GRPC_CALL_OK == error);
+
+  // In principle, the server batch should complete before the client
+  // recv ops batch, but in the proxy fixtures, there are multiple threads
+  // involved, so the completion order tends to be a little racy.
+  CQ_EXPECT_COMPLETION(cqv, tag(103), true);
+  CQ_EXPECT_COMPLETION(cqv, tag(2), true);
+  cq_verify(cqv);
+
+  GPR_ASSERT(status == GRPC_STATUS_OK);
+  GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
+  GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
+  GPR_ASSERT(0 == call_details.flags);
+  GPR_ASSERT(was_cancelled == 0);
+  GPR_ASSERT(byte_buffer_eq_slice(request_payload_recv, request_payload_slice));
+  GPR_ASSERT(
+      byte_buffer_eq_slice(response_payload_recv, response_payload_slice));
+
+  // Make sure the "grpc-previous-rpc-attempts" header was NOT sent, since
+  // we don't do that for transparent retries.
+ for (size_t i = 0; i < request_metadata_recv.count; ++i) { + GPR_ASSERT(!grpc_slice_eq( + request_metadata_recv.metadata[i].key, + grpc_slice_from_static_string("grpc-previous-rpc-attempts"))); + } + + grpc_slice_unref(details); + grpc_metadata_array_destroy(&initial_metadata_recv); + grpc_metadata_array_destroy(&trailing_metadata_recv); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); + grpc_byte_buffer_destroy(request_payload); + grpc_byte_buffer_destroy(response_payload); + grpc_byte_buffer_destroy(request_payload_recv); + grpc_byte_buffer_destroy(response_payload_recv); + + grpc_call_unref(c); + grpc_call_unref(s); + + cq_verifier_destroy(cqv); + + end_test(&f); + config.tear_down_data(&f); +} + +namespace { + +// A filter that, for the first 10 calls it sees, will fail all batches except +// for cancellations, so that the call fails with an error whose +// StreamNetworkState is kNotSentOnWire. +// All subsequent calls are allowed through without failures. 
+class FailFirstTenCallsFilter {
+ public:
+  static grpc_channel_filter kFilterVtable;
+
+ private:
+  class CallData {
+   public:
+    static grpc_error_handle Init(grpc_call_element* elem,
+                                  const grpc_call_element_args* args) {
+      new (elem->call_data) CallData(args);
+      return GRPC_ERROR_NONE;
+    }
+
+    static void Destroy(grpc_call_element* elem,
+                        const grpc_call_final_info* /*final_info*/,
+                        grpc_closure* /*ignored*/) {
+      auto* calld = static_cast<CallData*>(elem->call_data);
+      calld->~CallData();
+    }
+
+    static void StartTransportStreamOpBatch(
+        grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
+      auto* chand = static_cast<FailFirstTenCallsFilter*>(elem->channel_data);
+      auto* calld = static_cast<CallData*>(elem->call_data);
+      if (chand->num_calls_ < 10) calld->fail_ = true;
+      if (batch->send_initial_metadata) ++chand->num_calls_;
+      if (calld->fail_) {
+        if (batch->recv_trailing_metadata) {
+          batch->payload->recv_trailing_metadata.recv_trailing_metadata->Set(
+              grpc_core::GrpcStreamNetworkState(),
+              grpc_core::GrpcStreamNetworkState::kNotSentOnWire);
+        }
+        if (!batch->cancel_stream) {
+          grpc_transport_stream_op_batch_finish_with_failure(
+              batch,
+              grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+                                     "FailFirstTenCallsFilter failing batch"),
+                                 GRPC_ERROR_INT_GRPC_STATUS,
+                                 GRPC_STATUS_UNAVAILABLE),
+              calld->call_combiner_);
+          return;
+        }
+      }
+      grpc_call_next_op(elem, batch);
+    }
+
+   private:
+    explicit CallData(const grpc_call_element_args* args)
+        : call_combiner_(args->call_combiner) {}
+
+    grpc_core::CallCombiner* call_combiner_;
+    bool fail_ = false;
+  };
+
+  static grpc_error_handle Init(grpc_channel_element* elem,
+                                grpc_channel_element_args* /*args*/) {
+    new (elem->channel_data) FailFirstTenCallsFilter();
+    return GRPC_ERROR_NONE;
+  }
+
+  static void Destroy(grpc_channel_element* elem) {
+    auto* chand = static_cast<FailFirstTenCallsFilter*>(elem->channel_data);
+    chand->~FailFirstTenCallsFilter();
+  }
+
+  size_t num_calls_ = 0;
+};
+
+grpc_channel_filter FailFirstTenCallsFilter::kFilterVtable = {
+
CallData::StartTransportStreamOpBatch, + nullptr, + grpc_channel_next_op, + sizeof(CallData), + CallData::Init, + grpc_call_stack_ignore_set_pollset_or_pollset_set, + CallData::Destroy, + sizeof(FailFirstTenCallsFilter), + Init, + Destroy, + grpc_channel_next_get_info, + "FailFirstTenCallsFilter", +}; + +} // namespace + +void retry_transparent_not_sent_on_wire(grpc_end2end_test_config config) { + GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL); + grpc_core::CoreConfiguration::RunWithSpecialConfiguration( + [](grpc_core::CoreConfiguration::Builder* builder) { + grpc_core::BuildCoreConfiguration(builder); + builder->channel_init()->RegisterStage( + GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY + 1, + [](grpc_core::ChannelStackBuilder* builder) { + // Skip on proxy (which explicitly disables retries). + const grpc_channel_args* args = builder->channel_args(); + if (!grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_RETRIES, + true)) { + return true; + } + // Install filter. 
+              builder->PrependFilter(&FailFirstTenCallsFilter::kFilterVtable,
+                                     nullptr);
+              return true;
+            });
+      },
+      [config] { test_retry_transparent_not_sent_on_wire(config); });
+}
+
+void retry_transparent_not_sent_on_wire_pre_init(void) {}
diff --git a/test/core/transport/chttp2/streams_not_seen_test.cc b/test/core/transport/chttp2/streams_not_seen_test.cc
index e6ec6cd35c6ac..3898de635ada7 100644
--- a/test/core/transport/chttp2/streams_not_seen_test.cc
+++ b/test/core/transport/chttp2/streams_not_seen_test.cc
@@ -207,7 +207,9 @@ class StreamsNotSeenTest : public ::testing::Test {
       grpc_channel_arg_integer_create(
           const_cast<char*>(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA), 0),
       grpc_channel_arg_integer_create(
-          const_cast<char*>(GRPC_ARG_HTTP2_BDP_PROBE), 0)};
+          const_cast<char*>(GRPC_ARG_HTTP2_BDP_PROBE), 0),
+      grpc_channel_arg_integer_create(
+          const_cast<char*>(GRPC_ARG_ENABLE_RETRIES), 0)};
   grpc_channel_args client_channel_args = {GPR_ARRAY_SIZE(client_args),
                                            client_args};
   grpc_channel_credentials* creds = grpc_insecure_credentials_create();