Skip to content

Commit

Permalink
feat(pubsub): add lease management for unary pull
Browse files Browse the repository at this point in the history
  • Loading branch information
alevenberg committed Jan 12, 2024
1 parent 08c1689 commit efa131d
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 7 deletions.
17 changes: 16 additions & 1 deletion google/cloud/pubsub/internal/default_pull_ack_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,22 @@ DefaultPullAckHandler::DefaultPullAckHandler(CompletionQueue cq,
ack_id_(std::move(ack_id)),
delivery_attempt_(delivery_attempt),
lease_manager_(
MakePullLeaseManager(cq_, stub_, subscription_, ack_id_, options)) {}
MakePullLeaseManager(cq_, stub_, subscription_, ack_id_, options)) {
initialize();
}

DefaultPullAckHandler::DefaultPullAckHandler(
CompletionQueue cq, std::weak_ptr<SubscriberStub> w, Options const& options,
pubsub::Subscription subscription, std::string ack_id,
std::int32_t delivery_attempt, std::shared_ptr<PullLeaseManager> manager)
: cq_(std::move(cq)),
stub_(std::move(w)),
subscription_(std::move(subscription)),
ack_id_(std::move(ack_id)),
delivery_attempt_(delivery_attempt),
lease_manager_(std::move(manager)) {
initialize();
}

DefaultPullAckHandler::~DefaultPullAckHandler() = default;

Expand Down
8 changes: 8 additions & 0 deletions google/cloud/pubsub/internal/default_pull_ack_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ class DefaultPullAckHandler : public pubsub::PullAckHandler::Impl {
Options const& options,
pubsub::Subscription subscription, std::string ack_id,
std::int32_t delivery_attempt);
// For testing.
DefaultPullAckHandler(CompletionQueue cq, std::weak_ptr<SubscriberStub> w,
Options const& options,
pubsub::Subscription subscription, std::string ack_id,
std::int32_t delivery_attempt,
std::shared_ptr<PullLeaseManager> manager);
~DefaultPullAckHandler() override;

future<Status> ack() override;
Expand All @@ -55,6 +61,8 @@ class DefaultPullAckHandler : public pubsub::PullAckHandler::Impl {
pubsub::Subscription subscription() const override;

private:
void initialize() { lease_manager_->StartLeaseLoop(); }

CompletionQueue cq_;
std::weak_ptr<SubscriberStub> stub_;
pubsub::Subscription subscription_;
Expand Down
31 changes: 25 additions & 6 deletions google/cloud/pubsub/internal/default_pull_ack_handler_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "google/cloud/pubsub/internal/default_pull_lease_manager.h"
#include "google/cloud/pubsub/options.h"
#include "google/cloud/pubsub/retry_policy.h"
#include "google/cloud/pubsub/testing/mock_pull_lease_manager.h"
#include "google/cloud/pubsub/testing/mock_subscriber_stub.h"
#include "google/cloud/pubsub/testing/test_retry_policies.h"
#include "google/cloud/testing_util/async_sequencer.h"
Expand All @@ -29,6 +30,7 @@ namespace pubsub_internal {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
namespace {

using ::google::cloud::pubsub_testing::MockPullLeaseManager;
using ::google::cloud::pubsub_testing::MockSubscriberStub;
using ::google::cloud::testing_util::AsyncSequencer;
using ::google::cloud::testing_util::StatusIs;
Expand Down Expand Up @@ -96,7 +98,8 @@ TEST(PullAckHandlerTest, AckSimple) {
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto handler = std::make_unique<DefaultPullAckHandler>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42);
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42,
std::make_shared<MockPullLeaseManager>());
EXPECT_EQ(handler->delivery_attempt(), 42);
auto status = handler->ack();
auto timer = aseq.PopFrontWithName();
Expand All @@ -120,7 +123,8 @@ TEST(PullAckHandlerTest, AckPermanentError) {
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto handler = std::make_unique<DefaultPullAckHandler>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42);
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42,
std::make_shared<MockPullLeaseManager>());
EXPECT_EQ(handler->delivery_attempt(), 42);
auto status = handler->ack();
EXPECT_THAT(status.get(),
Expand All @@ -144,7 +148,8 @@ TEST(PullAckHandlerTest, NackSimple) {
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto handler = std::make_unique<DefaultPullAckHandler>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42);
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42,
std::make_shared<MockPullLeaseManager>());
EXPECT_EQ(handler->delivery_attempt(), 42);
auto status = handler->nack();
auto timer = aseq.PopFrontWithName();
Expand All @@ -170,20 +175,34 @@ TEST(PullAckHandlerTest, NackPermanentError) {
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto handler = std::make_unique<DefaultPullAckHandler>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42);
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42,
std::make_shared<MockPullLeaseManager>());
EXPECT_EQ(handler->delivery_attempt(), 42);
auto status = handler->nack();
EXPECT_THAT(status.get(),
StatusIs(StatusCode::kPermissionDenied, HasSubstr("uh-oh")));
}

TEST(PullAckHandlerTest, StartsLeaseManager) {
auto subscription = pubsub::Subscription("test-project", "test-subscription");

auto mock = std::make_shared<MockSubscriberStub>();
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto lm = std::make_shared<MockPullLeaseManager>();
EXPECT_CALL(*lm, StartLeaseLoop()).Times(1);
auto handler = std::make_unique<DefaultPullAckHandler>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42, lm);
}

TEST(AckHandlerTest, Subscription) {
auto subscription = pubsub::Subscription("test-project", "test-subscription");
auto mock = std::make_shared<MockSubscriberStub>();
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto handler = std::make_unique<DefaultPullAckHandler>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42);
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42,
std::make_shared<MockPullLeaseManager>());

EXPECT_EQ(handler->subscription(), subscription);
}
Expand All @@ -195,7 +214,7 @@ TEST(AckHandlerTest, AckId) {
auto handler = std::make_unique<DefaultPullAckHandler>(
cq, mock, MakeTestOptions(),
pubsub::Subscription("test-project", "test-subscription"), "test-ack-id",
42);
42, std::make_shared<MockPullLeaseManager>());

EXPECT_EQ(handler->ack_id(), "test-ack-id");
}
Expand Down

0 comments on commit efa131d

Please sign in to comment.