Skip to content

Commit

Permalink
fix(pubsub): correctly trace modacks in the unary pull (#14049)
Browse files Browse the repository at this point in the history
* refactor(pubsub): create pull manager impl to wrap the modack rpc

* test(pubsub): use the mock pull lease manager impl in the tracing unittest

* refactor(pubsub): remove tracing lease manager class

* test(pubsub): add a test for the default pull lease manager impl

* fix clang-tidy-pubsub by removing unused
  • Loading branch information
alevenberg authored Apr 24, 2024
1 parent bbfacb5 commit 4dad770
Show file tree
Hide file tree
Showing 9 changed files with 221 additions and 118 deletions.
9 changes: 5 additions & 4 deletions google/cloud/pubsub/internal/default_pull_lease_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,14 @@ std::chrono::seconds DefaultLeaseExtension(Options const& options) {
DefaultPullLeaseManager::DefaultPullLeaseManager(
CompletionQueue cq, std::weak_ptr<SubscriberStub> w, Options options,
pubsub::Subscription subscription, std::string ack_id,
std::shared_ptr<Clock> clock)
std::shared_ptr<PullLeaseManagerImpl> impl, std::shared_ptr<Clock> clock)
: cq_(std::move(cq)),
stub_(std::move(w)),
options_(
google::cloud::internal::MakeImmutableOptions(std::move(options))),
subscription_(std::move(subscription)),
ack_id_(std::move(ack_id)),
impl_(std::move(impl)),
clock_(std::move(clock)),
lease_deadline_(DefaultLeaseDeadline(clock_->Now(), *options_)),
lease_extension_(DefaultLeaseExtension(*options_)),
Expand Down Expand Up @@ -94,14 +95,14 @@ future<Status> DefaultPullLeaseManager::ExtendLease(
options_->get<pubsub::RetryPolicyOption>()->clone(),
options_->get<pubsub::BackoffPolicyOption>()->clone(),
google::cloud::Idempotency::kIdempotent, cq_,
[stub = std::move(stub), deadline = now + extension, clock = clock_](
auto cq, auto context, auto options, auto const& request) {
[stub = std::move(stub), deadline = now + extension, clock = clock_,
impl = impl_](auto cq, auto context, auto options, auto const& request) {
if (deadline < clock->Now()) {
return make_ready_future(
Status(StatusCode::kDeadlineExceeded, "lease already expired"));
}
context->set_deadline((std::min)(deadline, context->deadline()));
return stub->AsyncModifyAckDeadline(cq, std::move(context),
return impl->AsyncModifyAckDeadline(stub, cq, std::move(context),
std::move(options), request);
},
options_, request, __func__);
Expand Down
16 changes: 16 additions & 0 deletions google/cloud/pubsub/internal/default_pull_lease_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ namespace cloud {
namespace pubsub_internal {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN

class DefaultPullLeaseManagerImpl : public PullLeaseManagerImpl {
public:
DefaultPullLeaseManagerImpl() = default;

future<Status> AsyncModifyAckDeadline(
std::shared_ptr<SubscriberStub> stub, google::cloud::CompletionQueue& cq,
std::shared_ptr<grpc::ClientContext> context,
google::cloud::internal::ImmutableOptions options,
google::pubsub::v1::ModifyAckDeadlineRequest const& request) override {
return stub->AsyncModifyAckDeadline(cq, std::move(context),
std::move(options), request);
}
};

/**
* Maintains the lease for a single message.
*/
Expand All @@ -45,6 +59,7 @@ class DefaultPullLeaseManager
DefaultPullLeaseManager(
CompletionQueue cq, std::weak_ptr<SubscriberStub> w, Options options,
pubsub::Subscription subscription, std::string ack_id,
std::shared_ptr<PullLeaseManagerImpl> impl,
std::shared_ptr<Clock> clock = std::make_shared<Clock>());
~DefaultPullLeaseManager() override;

Expand Down Expand Up @@ -76,6 +91,7 @@ class DefaultPullLeaseManager
google::cloud::internal::ImmutableOptions options_;
pubsub::Subscription subscription_;
std::string ack_id_;
std::shared_ptr<PullLeaseManagerImpl> impl_;
std::shared_ptr<Clock> clock_;
// The absolute deadline to complete processing the message.
// The application can configure this value using
Expand Down
66 changes: 53 additions & 13 deletions google/cloud/pubsub/internal/default_pull_lease_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ TEST(DefaultPullLeaseManager, SimpleLeaseLoop) {
});
});
auto manager = std::make_shared<DefaultPullLeaseManager>(
cq, mock, options, subscription, "test-ack-id", clock);
cq, mock, options, subscription, "test-ack-id",
std::make_shared<DefaultPullLeaseManagerImpl>(), clock);
manager->StartLeaseLoop();
auto pending = aseq.PopFrontWithName();
EXPECT_EQ(pending.second, "AsyncModifyAckDeadline");
Expand Down Expand Up @@ -174,7 +175,7 @@ TEST(DefaultPullLeaseManager, StartLeaseLoopAlreadyReleased) {
EXPECT_CALL(*mock, AsyncModifyAckDeadline).Times(0);
auto manager = std::make_shared<DefaultPullLeaseManager>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id",
std::move(clock));
std::make_shared<DefaultPullLeaseManagerImpl>(), std::move(clock));
// This can happen if the subscriber is shutdown, but the application manages
// to hold to a `AckHandler` reference. In this case, we expect the loop to
// stop (or have no effect).
Expand All @@ -193,7 +194,8 @@ TEST(DefaultPullLeaseManager, StartLeaseLoopAlreadyPastMaxExtension) {
auto mock = std::make_shared<MockSubscriberStub>();
EXPECT_CALL(*mock, AsyncModifyAckDeadline).Times(0);
auto manager = std::make_shared<DefaultPullLeaseManager>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id", clock);
cq, mock, MakeTestOptions(), subscription, "test-ack-id",
std::make_shared<DefaultPullLeaseManagerImpl>(), clock);
EXPECT_THAT(manager->lease_deadline(),
Eq(current_time + std::chrono::seconds(300)));
// See the MakeTestOptions() for the magic number.
Expand All @@ -213,7 +215,8 @@ TEST(DefaultPullLeaseManager, StartLeaseLoopTooCloseMaxExtension) {
auto mock = std::make_shared<MockSubscriberStub>();
EXPECT_CALL(*mock, AsyncModifyAckDeadline).Times(0);
auto manager = std::make_shared<DefaultPullLeaseManager>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id", clock);
cq, mock, MakeTestOptions(), subscription, "test-ack-id",
std::make_shared<DefaultPullLeaseManagerImpl>(), clock);
EXPECT_THAT(manager->lease_deadline(),
Eq(current_time + std::chrono::seconds(300)));
// See the MakeTestOptions() for the magic number.
Expand All @@ -234,7 +237,8 @@ TEST(DefaultPullLeaseManager, StartLeaseLoopAlreadyPastCurrentExtension) {
auto mock = std::make_shared<MockSubscriberStub>();
EXPECT_CALL(*mock, AsyncModifyAckDeadline).Times(0);
auto manager = std::make_shared<DefaultPullLeaseManager>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id", clock);
cq, mock, MakeTestOptions(), subscription, "test-ack-id",
std::make_shared<DefaultPullLeaseManagerImpl>(), clock);
EXPECT_GT(manager->current_lease(), current_time);
clock->SetTime(manager->current_lease());
manager->StartLeaseLoop();
Expand All @@ -258,7 +262,8 @@ TEST(DefaultPullLeaseManager, InitializeDeadlines) {
.set<pubsub::MaxDeadlineTimeOption>(std::chrono::seconds(300))
.set<pubsub::MinDeadlineExtensionOption>(
std::chrono::seconds(10))),
subscription, "test-ack-id", clock);
subscription, "test-ack-id",
std::make_shared<DefaultPullLeaseManagerImpl>(), clock);
EXPECT_EQ(manager->lease_deadline(),
current_time + std::chrono::seconds(300));
EXPECT_EQ(manager->LeaseRefreshPeriod(), std::chrono::seconds(9));
Expand All @@ -270,7 +275,8 @@ TEST(DefaultPullLeaseManager, InitializeDeadlines) {
.set<pubsub::MaxDeadlineTimeOption>(std::chrono::seconds(300))
.set<pubsub::MaxDeadlineExtensionOption>(
std::chrono::seconds(30))),
subscription, "test-ack-id", clock);
subscription, "test-ack-id",
std::make_shared<DefaultPullLeaseManagerImpl>(), clock);
EXPECT_EQ(manager->lease_deadline(),
current_time + std::chrono::seconds(300));
EXPECT_EQ(manager->LeaseRefreshPeriod(), std::chrono::seconds(29));
Expand All @@ -283,7 +289,8 @@ TEST(DefaultPullLeaseManager, InitializeDeadlines) {
.set<pubsub::MinDeadlineExtensionOption>(std::chrono::seconds(10))
.set<pubsub::MaxDeadlineExtensionOption>(
std::chrono::seconds(30))),
subscription, "test-ack-id", clock);
subscription, "test-ack-id",
std::make_shared<DefaultPullLeaseManagerImpl>(), clock);
EXPECT_EQ(manager->lease_deadline(),
current_time + std::chrono::seconds(300));
EXPECT_EQ(manager->LeaseRefreshPeriod(), std::chrono::seconds(9));
Expand All @@ -310,7 +317,8 @@ TEST(DefaultPullLeaseManager, ExtendLeaseDeadlineSimple) {
auto clock = std::make_shared<FakeSystemClock>();
clock->SetTime(current_time);
auto manager = std::make_shared<DefaultPullLeaseManager>(
cq, mock, options, subscription, "test-ack-id", std::move(clock));
cq, mock, options, subscription, "test-ack-id",
std::make_shared<DefaultPullLeaseManagerImpl>(), std::move(clock));

auto status = manager->ExtendLease(mock, current_time, kLeaseExtension);
EXPECT_STATUS_OK(status.get());
Expand All @@ -331,7 +339,8 @@ TEST(DefaultPullLeaseManager, ExtendLeaseDeadlineExceeded) {
// extension.
clock->SetTime(current_time + std::chrono::seconds(11));
auto manager = std::make_shared<DefaultPullLeaseManager>(
cq, mock, options, subscription, "test-ack-id", std::move(clock));
cq, mock, options, subscription, "test-ack-id",
std::make_shared<DefaultPullLeaseManagerImpl>(), std::move(clock));

auto status = manager->ExtendLease(mock, current_time, kLeaseExtension);
EXPECT_THAT(status.get(),
Expand Down Expand Up @@ -359,7 +368,8 @@ TEST(DefaultPullLeaseManager, ExtendLeasePermanentError) {
auto clock = std::make_shared<FakeSystemClock>();
clock->SetTime(current_time);
auto manager = std::make_shared<DefaultPullLeaseManager>(
cq, mock, options, subscription, "test-ack-id", std::move(clock));
cq, mock, options, subscription, "test-ack-id",
std::make_shared<DefaultPullLeaseManagerImpl>(), std::move(clock));

auto status = manager->ExtendLease(mock, current_time, kLeaseExtension);
EXPECT_THAT(status.get(),
Expand All @@ -374,7 +384,8 @@ TEST(DefaultPullLeaseManager, Subscription) {
auto clock = std::make_shared<FakeSystemClock>();
clock->SetTime(std::chrono::system_clock::now());
auto manager = std::make_shared<DefaultPullLeaseManager>(
cq, mock, Options{}, subscription, "test-ack-id", std::move(clock));
cq, mock, Options{}, subscription, "test-ack-id",
std::make_shared<DefaultPullLeaseManagerImpl>(), std::move(clock));

EXPECT_EQ(manager->subscription(), subscription);
}
Expand All @@ -388,11 +399,40 @@ TEST(DefaultPullLeaseManager, AckId) {
auto manager = std::make_shared<DefaultPullLeaseManager>(
cq, mock, Options{},
pubsub::Subscription("test-project", "test-subscription"), "test-ack-id",
std::move(clock));
std::make_shared<DefaultPullLeaseManagerImpl>(), std::move(clock));

EXPECT_EQ(manager->ack_id(), "test-ack-id");
}

TEST(DefaultPullLeaseManagerImpl, AsyncModifyAckDeadline) {
auto impl = std::make_shared<DefaultPullLeaseManagerImpl>();
google::pubsub::v1::ModifyAckDeadlineRequest request;
auto constexpr kLeaseExtension = std::chrono::seconds(10);
auto subscription = pubsub::Subscription("test-project", "test-subscription");
request.set_subscription(subscription.FullName());
request.set_ack_deadline_seconds(
static_cast<std::int32_t>(kLeaseExtension.count()));
request.add_ack_ids("test-ack-id");
auto stub = std::make_shared<MockSubscriberStub>();
auto request_matcher = AllOf(
Property(&ModifyAckDeadlineRequest::ack_ids, ElementsAre("test-ack-id")),
Property(&ModifyAckDeadlineRequest::ack_deadline_seconds,
kLeaseExtension.count()),
Property(&ModifyAckDeadlineRequest::subscription,
subscription.FullName()));
EXPECT_CALL(*stub, AsyncModifyAckDeadline(_, _, _, request_matcher))
.WillOnce(Return(ByMove(make_ready_future(Status{}))));
auto mock_cq = std::make_shared<testing_util::MockCompletionQueueImpl>();
CompletionQueue cq = CompletionQueue(std::move(mock_cq));
std::shared_ptr<grpc::ClientContext> context;
auto options = google::cloud::internal::MakeImmutableOptions(
google::cloud::pubsub_testing::MakeTestOptions());

auto status =
impl->AsyncModifyAckDeadline(stub, cq, context, options, request);
EXPECT_STATUS_OK(status.get());
}

} // namespace
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace pubsub_internal
Expand Down
13 changes: 13 additions & 0 deletions google/cloud/pubsub/internal/pull_lease_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,19 @@ class PullLeaseManager {
}
};

/**
* Interface to make a modify ack deadline rpc request.
*/
class PullLeaseManagerImpl {
public:
virtual ~PullLeaseManagerImpl() = default;
virtual future<Status> AsyncModifyAckDeadline(
std::shared_ptr<SubscriberStub> stub, google::cloud::CompletionQueue& cq,
std::shared_ptr<grpc::ClientContext> context,
google::cloud::internal::ImmutableOptions options,
google::pubsub::v1::ModifyAckDeadlineRequest const& request) = 0;
};

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace pubsub_internal
} // namespace cloud
Expand Down
11 changes: 7 additions & 4 deletions google/cloud/pubsub/internal/pull_lease_manager_factory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@ std::shared_ptr<PullLeaseManager> MakePullLeaseManager(
CompletionQueue cq, std::weak_ptr<SubscriberStub> stub,
pubsub::Subscription subscription, std::string ack_id,
Options const& options, std::shared_ptr<Clock> clock) {
std::shared_ptr<PullLeaseManagerImpl> manager_impl =
std::make_shared<pubsub_internal::DefaultPullLeaseManagerImpl>();
if (internal::TracingEnabled(options)) {
manager_impl = MakeTracingPullLeaseManagerImpl(std::move(manager_impl),
ack_id, subscription);
}
std::shared_ptr<PullLeaseManager> manager =
std::make_shared<pubsub_internal::DefaultPullLeaseManager>(
std::move(cq), std::move(stub), options, std::move(subscription),
std::move(ack_id), std::move(clock));
if (internal::TracingEnabled(options)) {
manager = MakeTracingPullLeaseManager(std::move(manager));
}
std::move(ack_id), std::move(manager_impl), std::move(clock));
return manager;
}

Expand Down
67 changes: 32 additions & 35 deletions google/cloud/pubsub/internal/tracing_pull_lease_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,44 +33,41 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN

#ifdef GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY

class TracingPullLeaseManager : public PullLeaseManager {
class TracingPullLeaseManagerImpl : public PullLeaseManagerImpl {
public:
explicit TracingPullLeaseManager(std::shared_ptr<PullLeaseManager> child)
explicit TracingPullLeaseManagerImpl(
std::shared_ptr<PullLeaseManagerImpl> child, std::string ack_id,
pubsub::Subscription subscription)
: child_(std::move(child)),
ack_id_(std::move(ack_id)),
subscription_(std::move(subscription)),
consumer_span_context_(
opentelemetry::trace::GetSpan(
opentelemetry::context::RuntimeContext::GetCurrent())
->GetContext()) {}

void StartLeaseLoop() override { child_->StartLeaseLoop(); };

std::chrono::milliseconds LeaseRefreshPeriod() const override {
return child_->LeaseRefreshPeriod();
}

future<Status> ExtendLease(std::shared_ptr<SubscriberStub> stub,
std::chrono::system_clock::time_point time_now,
std::chrono::seconds extension) override {
future<Status> AsyncModifyAckDeadline(
std::shared_ptr<SubscriberStub> stub, google::cloud::CompletionQueue& cq,
std::shared_ptr<grpc::ClientContext> context,
google::cloud::internal::ImmutableOptions options,
google::pubsub::v1::ModifyAckDeadlineRequest const& request) override {
namespace sc = opentelemetry::trace::SemanticConventions;
opentelemetry::trace::StartSpanOptions options;
options.kind = opentelemetry::trace::SpanKind::kClient;
opentelemetry::trace::StartSpanOptions start_span_options;
start_span_options.kind = opentelemetry::trace::SpanKind::kClient;
auto span = internal::MakeSpan(
child_->subscription().subscription_id() + " modack",
subscription_.subscription_id() + " modack",
{{sc::kMessagingSystem, "gcp_pubsub"},
{sc::kMessagingOperation, "modack"},
{sc::kCodeFunction, "pubsub::PullLeaseManager::ExtendLease"},
{"messaging.gcp_pubsub.message.ack_id", child_->ack_id()},
{"messaging.gcp_pubsub.message.ack_id", ack_id_},
{"messaging.gcp_pubsub.message.ack_deadline_seconds",
static_cast<std::int32_t>(extension.count())},
{"gcp.project_id", child_->subscription().project_id()},
{sc::kMessagingDestinationName,
child_->subscription().subscription_id()}},
CreateLinks(consumer_span_context_), options);
static_cast<std::int32_t>(request.ack_deadline_seconds())},
{"gcp.project_id", subscription_.project_id()},
{sc::kMessagingDestinationName, subscription_.subscription_id()}},
CreateLinks(consumer_span_context_), start_span_options);
auto scope = internal::OTelScope(span);
MaybeAddLinkAttributes(*span, consumer_span_context_, "receive");
return child_
->ExtendLease(std::move(stub), std::move(time_now),
std::move(extension))
return child_->AsyncModifyAckDeadline(stub, cq, context, options, request)
.then([oc = opentelemetry::context::RuntimeContext::GetCurrent(),
span = std::move(span)](auto f) {
auto result = f.get();
Expand All @@ -79,25 +76,25 @@ class TracingPullLeaseManager : public PullLeaseManager {
});
}

std::string ack_id() const override { return child_->ack_id(); }

pubsub::Subscription subscription() const override {
return child_->subscription();
}

std::shared_ptr<PullLeaseManager> child_;
private:
std::shared_ptr<PullLeaseManagerImpl> child_;
std::string ack_id_;
pubsub::Subscription subscription_;
opentelemetry::trace::SpanContext consumer_span_context_;
};

std::shared_ptr<PullLeaseManager> MakeTracingPullLeaseManager(
std::shared_ptr<PullLeaseManager> manager) {
return std::make_shared<TracingPullLeaseManager>(std::move(manager));
std::shared_ptr<PullLeaseManagerImpl> MakeTracingPullLeaseManagerImpl(
std::shared_ptr<PullLeaseManagerImpl> manager, std::string ack_id,
pubsub::Subscription subscription) {
return std::make_shared<TracingPullLeaseManagerImpl>(
std::move(manager), std::move(ack_id), std::move(subscription));
}

#else // GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY

std::shared_ptr<PullLeaseManager> MakeTracingPullLeaseManager(
std::shared_ptr<PullLeaseManager> manager) {
std::shared_ptr<PullLeaseManagerImpl> MakeTracingPullLeaseManagerImpl(
std::shared_ptr<PullLeaseManagerImpl> manager, std::string,
pubsub::Subscription) {
return manager;
}

Expand Down
5 changes: 3 additions & 2 deletions google/cloud/pubsub/internal/tracing_pull_lease_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ namespace cloud {
namespace pubsub_internal {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN

std::shared_ptr<PullLeaseManager> MakeTracingPullLeaseManager(
std::shared_ptr<PullLeaseManager> manager);
std::shared_ptr<PullLeaseManagerImpl> MakeTracingPullLeaseManagerImpl(
std::shared_ptr<PullLeaseManagerImpl> manager, std::string ack_id,
pubsub::Subscription subscription);

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace pubsub_internal
Expand Down
Loading

0 comments on commit 4dad770

Please sign in to comment.