Skip to content

Commit

Permalink
Call finalization for promises (grpc#29008)
Browse files Browse the repository at this point in the history
* Call finalization for promises

* comment

* split out and test

* dont use promise_detail:: directly

* fix

* Automated change: Fix sanity tests

* fix

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
  • Loading branch information
ctiller and ctiller authored Mar 3, 2022
1 parent a047390 commit 047642c
Show file tree
Hide file tree
Showing 19 changed files with 300 additions and 10 deletions.
1 change: 1 addition & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -2000,6 +2000,7 @@ grpc_cc_library(
"src/core/lib/transport/http2_errors.h",
"src/core/lib/address_utils/parse_address.h",
"src/core/lib/backoff/backoff.h",
"src/core/lib/channel/call_finalization.h",
"src/core/lib/channel/call_tracer.h",
"src/core/lib/channel/channel_stack.h",
"src/core/lib/channel/promise_based_filter.h",
Expand Down
36 changes: 36 additions & 0 deletions CMakeLists.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 15 additions & 1 deletion build_autogenerated.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions gRPC-C++.podspec

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions gRPC-Core.podspec

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions grpc.gemspec

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

86 changes: 86 additions & 0 deletions src/core/lib/channel/call_finalization.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef GRPC_CORE_LIB_CHANNEL_CALL_FINALIZATION_H
#define GRPC_CORE_LIB_CHANNEL_CALL_FINALIZATION_H

#include <grpc/support/port_platform.h>

#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/resource_quota/arena.h"

namespace grpc_core {

// Call finalization context.
// Sometimes a filter needs to perform some operation after the last byte of
// data is flushed to the wire. This context is used to perform that
// finalization.
// Filters can register a finalizer by calling Add().
// The finalizer will be called before the call is destroyed but after
// the top level promise is completed.
class CallFinalization {
public:
// Add a step to the finalization context.
// Takes a callable with a signature compatible with:
// (const grpc_call_final_info&) -> void.
// Finalizers are run in the reverse order they are added.
template <typename F>
void Add(F&& t) {
first_ =
GetContext<Arena>()->New<FuncFinalizer<F>>(std::forward<F>(t), first_);
}

void Run(const grpc_call_final_info* final_info) {
if (Finalizer* f = absl::exchange(first_, nullptr)) f->Run(final_info);
}

private:
// Base class for finalizer implementations.
class Finalizer {
public:
// Run the finalizer and call the destructor of this Finalizer.
virtual void Run(const grpc_call_final_info* final_info) = 0;

protected:
~Finalizer() {}
};
// Specialization for callable objects.
template <typename F>
class FuncFinalizer final : public Finalizer {
public:
FuncFinalizer(F&& f, Finalizer* next)
: next_(next), f_(std::forward<F>(f)) {}

void Run(const grpc_call_final_info* final_info) override {
f_(final_info);
Finalizer* next = next_;
this->~FuncFinalizer();
if (next != nullptr) next->Run(final_info);
}

private:
Finalizer* next_;
F f_;
};
// The first finalizer in the chain.
Finalizer* first_ = nullptr;
};

template <>
struct ContextType<CallFinalization> {};

} // namespace grpc_core

#endif // GRPC_CORE_LIB_CHANNEL_CALL_FINALIZATION_H
21 changes: 17 additions & 4 deletions src/core/lib/channel/promise_based_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
#include <grpc/status.h>
#include <grpc/support/log.h>

#include "src/core/lib/channel/call_finalization.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/transport/error_utils.h"

Expand Down Expand Up @@ -95,17 +97,24 @@ class BaseCallData : public Activity, private Wakeable {
Waker MakeNonOwningWaker() final;
Waker MakeOwningWaker() final;

void Finalize(const grpc_call_final_info* final_info) {
finalization_.Run(final_info);
}

protected:
class ScopedContext
: public promise_detail::Context<Arena>,
public promise_detail::Context<grpc_call_context_element>,
public promise_detail::Context<grpc_polling_entity> {
public promise_detail::Context<grpc_polling_entity>,
public promise_detail::Context<CallFinalization> {
public:
explicit ScopedContext(BaseCallData* call_data)
: promise_detail::Context<Arena>(call_data->arena_),
promise_detail::Context<grpc_call_context_element>(
call_data->context_),
promise_detail::Context<grpc_polling_entity>(call_data->pollent_) {}
promise_detail::Context<grpc_polling_entity>(call_data->pollent_),
promise_detail::Context<CallFinalization>(&call_data->finalization_) {
}
};

static MetadataHandle<grpc_metadata_batch> WrapMetadata(
Expand Down Expand Up @@ -135,6 +144,7 @@ class BaseCallData : public Activity, private Wakeable {
Arena* const arena_;
CallCombiner* const call_combiner_;
const Timestamp deadline_;
CallFinalization finalization_;
grpc_call_context_element* const context_;
grpc_polling_entity* pollent_ = nullptr;
};
Expand Down Expand Up @@ -380,8 +390,11 @@ MakePromiseBasedFilter(const char* name) {
static_cast<CallData*>(elem->call_data)->set_pollent(pollent);
},
// destroy_call_elem
[](grpc_call_element* elem, const grpc_call_final_info*, grpc_closure*) {
static_cast<CallData*>(elem->call_data)->~CallData();
[](grpc_call_element* elem, const grpc_call_final_info* final_info,
grpc_closure*) {
auto* cd = static_cast<CallData*>(elem->call_data);
cd->Finalize(final_info);
cd->~CallData();
},
// sizeof_channel_data
sizeof(F),
Expand Down
15 changes: 15 additions & 0 deletions test/core/channel/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,18 @@ grpc_cc_test(
"//test/core/util:grpc_test_util",
],
)

grpc_cc_test(
name = "call_finalization_test",
srcs = ["call_finalization_test.cc"],
external_deps = [
"gtest",
],
language = "C++",
deps = [
"//:gpr",
"//:grpc",
"//test/core/promise:test_context",
"//test/core/util:grpc_test_util",
],
)
50 changes: 50 additions & 0 deletions test/core/channel/call_finalization_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "src/core/lib/channel/call_finalization.h"

#include <gtest/gtest.h>

#include "src/core/lib/resource_quota/resource_quota.h"
#include "test/core/promise/test_context.h"

namespace grpc_core {

static auto* g_memory_allocator = new MemoryAllocator(
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator("test"));

TEST(CallFinalizationTest, Works) {
auto arena = MakeScopedArena(1024, g_memory_allocator);
std::string evidence;
TestContext<Arena> context(arena.get());
CallFinalization finalization;
auto p = std::make_shared<int>(42);
finalization.Add([&evidence, p](const grpc_call_final_info* final_info) {
evidence += absl::StrCat("FIRST", final_info->error_string, *p, "\n");
});
finalization.Add([&evidence, p](const grpc_call_final_info* final_info) {
evidence += absl::StrCat("SECOND", final_info->error_string, *p, "\n");
});
grpc_call_final_info final_info{};
final_info.error_string = "123";
finalization.Run(&final_info);
EXPECT_EQ(evidence, "SECOND12342\nFIRST12342\n");
}

} // namespace grpc_core

int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
1 change: 1 addition & 0 deletions test/core/filters/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ grpc_cc_test(
deps = [
"//:grpc",
"//:grpc_client_authority_filter",
"//test/core/promise:test_context",
"//test/core/util:grpc_suppressions",
],
)
Loading

0 comments on commit 047642c

Please sign in to comment.