Skip to content

Commit

Permalink
[BTS-1407] Cancel Requests in retry (arangodb#19107)
Browse files Browse the repository at this point in the history
  • Loading branch information
Lars Maier authored May 26, 2023
1 parent 6c07655 commit 8d8e3c8
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 13 deletions.
25 changes: 13 additions & 12 deletions arangod/Network/Methods.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,8 @@ FutureRes sendRequest(ConnectionPool* pool, DestinationId dest, RestVerb type,

/// Stateful handler class with enough information to keep retrying
/// a request until an overall timeout is hit (or the request succeeds)
class RequestsState final : public std::enable_shared_from_this<RequestsState> {
class RequestsState final : public std::enable_shared_from_this<RequestsState>,
public RetryableRequest {
public:
RequestsState(ConnectionPool* pool, DestinationId&& destination,
RestVerb type, std::string&& path,
Expand Down Expand Up @@ -585,17 +586,17 @@ class RequestsState final : public std::enable_shared_from_this<RequestsState> {
return;
}

_workItem = sch->queueDelayed(
"request-retry", _options.continuationLane, tryAgainAfter,
[self = shared_from_this()](bool canceled) {
if (canceled) {
self->_promise.setValue(Response{std::move(self->_destination),
Error::ConnectionCanceled, nullptr,
nullptr});
} else {
self->startRequest();
}
});
auto& server = _pool->config().clusterInfo->server();
NetworkFeature& nf = server.getFeature<NetworkFeature>();
nf.retryRequest(shared_from_this(), _options.continuationLane,
tryAgainAfter);
}

public:
/// RetryableRequest hook: re-issues this request by starting it again.
void retry() override {
  startRequest();
}
void cancel() override {
_promise.setValue(Response{std::move(_destination),
Error::ConnectionCanceled, nullptr, nullptr});
}

private:
Expand Down
33 changes: 33 additions & 0 deletions arangod/Network/NetworkFeature.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,10 @@ void NetworkFeature::beginShutdown() {
{
std::lock_guard<std::mutex> guard(_workItemMutex);
_workItem.reset();
for (auto const& [req, item] : _retryRequests) {
item->cancel();
}
_retryRequests.clear();
}
_poolPtr.store(nullptr, std::memory_order_relaxed);
if (_pool) { // first cancel all connections
Expand All @@ -311,6 +315,10 @@ void NetworkFeature::stop() {
// we might have posted another workItem during shutdown.
std::lock_guard<std::mutex> guard(_workItemMutex);
_workItem.reset();
for (auto const& [req, item] : _retryRequests) {
item->cancel();
}
_retryRequests.clear();
}
if (_pool) {
_pool->shutdownConnections();
Expand Down Expand Up @@ -481,4 +489,29 @@ void NetworkFeature::finishRequest(network::ConnectionPool const& pool,
}
}

/// @brief schedules `req` for a delayed retry and keeps it alive until then
///
/// The request is stored in `_retryRequests` together with the work handle
/// returned by the scheduler. When the delayed work item fires, the entry is
/// removed again and either `retry()` or `cancel()` is invoked on the
/// request, depending on whether the scheduler reports cancellation.
///
/// If the server is already stopping, the request is cancelled right away
/// and is NOT queued. Queueing it anyway would invoke `cancel()` (or even
/// `retry()`) a second time on a request that has already been resolved.
///
/// @param req       request to retry; a null pointer is ignored
/// @param lane      scheduler lane on which the retry callback is queued
/// @param duration  delay before the retry callback is run
void NetworkFeature::retryRequest(
    std::shared_ptr<network::RetryableRequest> req, RequestLane lane,
    std::chrono::steady_clock::duration duration) {
  if (req == nullptr) {
    // nothing to retry and nothing to cancel
    return;
  }

  std::unique_lock guard(_workItemMutex);
  if (server().isStopping()) {
    // Do not call back into the request while holding the mutex; the
    // callback is foreign code and must not be able to block on this lock.
    guard.unlock();
    req->cancel();
    return;
  }

  // The callback only captures a weak reference: ownership of the request
  // lives in `_retryRequests`, so clearing that map releases the request.
  auto item = SchedulerFeature::SCHEDULER->queueDelayed(
      "retry-requests", lane, duration,
      [this, weak = std::weak_ptr(req)](bool cancelled) {
        if (auto self = weak.lock(); self) {
          {
            // drop our bookkeeping entry before notifying the request
            std::unique_lock guard(_workItemMutex);
            _retryRequests.erase(self);
          }
          if (cancelled) {
            self->cancel();
          } else {
            self->retry();
          }
        }
      });
  // still holding `_workItemMutex`, so the callback above cannot erase the
  // entry before it has been inserted
  _retryRequests.emplace(std::move(req), std::move(item));
}

} // namespace arangodb
15 changes: 14 additions & 1 deletion arangod/Network/NetworkFeature.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@
namespace arangodb {
namespace network {
struct RequestOptions;
}

/// Interface for a request that can be re-issued after a delay.
///
/// Implementations are handed to NetworkFeature::retryRequest, which later
/// calls exactly one of the two hooks below on the object.
struct RetryableRequest {
  /// Invoked when the retry delay elapsed and the request should be sent
  /// again.
  virtual void retry() = 0;

  /// Invoked instead of retry() when the pending retry was cancelled.
  virtual void cancel() = 0;

  /// Virtual so implementations can be destroyed through this interface.
  virtual ~RetryableRequest() = default;
};
} // namespace network

class NetworkFeature final : public ArangodFeature {
public:
Expand Down Expand Up @@ -79,6 +85,9 @@ class NetworkFeature final : public ArangodFeature {
std::unique_ptr<fuerte::Request>&& req,
RequestCallback&& cb);

void retryRequest(std::shared_ptr<network::RetryableRequest>, RequestLane,
std::chrono::steady_clock::duration);

protected:
void prepareRequest(network::ConnectionPool const& pool,
std::unique_ptr<fuerte::Request>& req);
Expand All @@ -102,6 +111,10 @@ class NetworkFeature final : public ArangodFeature {
std::unique_ptr<network::ConnectionPool> _pool;
std::atomic<network::ConnectionPool*> _poolPtr;

std::unordered_map<std::shared_ptr<network::RetryableRequest>,
Scheduler::WorkHandle>
_retryRequests;

/// @brief number of cluster-internal forwarded requests
/// (from one coordinator to another, in case load-balancing
/// is used)
Expand Down

0 comments on commit 8d8e3c8

Please sign in to comment.