Skip to content

Commit

Permalink
Fix VST / HTTP2 idle connection keep alive behaviour (#12549) (#12584)
Browse files Browse the repository at this point in the history
* Fix VST / HTTP2 idle connection keep alive behaviour (#12549)

(cherry picked from commit 8dfe68f)

* remove wrong merge

Co-authored-by: KVS85 <vadim@arangodb.com>
  • Loading branch information
graetzer and KVS85 authored Sep 7, 2020
1 parent 8731685 commit d7888a3
Show file tree
Hide file tree
Showing 11 changed files with 522 additions and 460 deletions.
4 changes: 2 additions & 2 deletions 3rdParty/fuerte/src/H2Connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ int on_invalid_frame_recv(nghttp2_session *session, const nghttp2_frame *frame,

constexpr uint32_t window_size = 512 * 1024 * 1024;
void populateSettings(std::array<nghttp2_settings_entry, 4>& iv) {
// 64 streams matches the queue capacity
iv[0] = {NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 64};
// 32 streams matches the queue capacity
iv[0] = {NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 32};
// typically client is just a *sink* and just process data as
// much as possible. Use large window size by default.
iv[1] = {NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, window_size};
Expand Down
2 changes: 1 addition & 1 deletion 3rdParty/fuerte/src/loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ EventLoopService::EventLoopService(unsigned int threadCount, char const* name)
_ioContexts.emplace_back(std::make_shared<asio_ns::io_context>(1));
_guards.emplace_back(asio_ns::make_work_guard(*_ioContexts.back()));
asio_ns::io_context* ctx = _ioContexts.back().get();
_threads.emplace_back([ctx, name]() {
_threads.emplace_back([=]() {
#ifdef __linux__
// set name of threadpool thread, so threads can be distinguished from each other
if (name != nullptr && *name != '\0') {
Expand Down
7 changes: 0 additions & 7 deletions arangod/GeneralServer/CommTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,6 @@ class CommTask : public std::enable_shared_from_this<CommTask> {
virtual void start() = 0;
virtual void stop() = 0;

// returns the number of scheduled requests
std::size_t getRequestCount() const { return _requestCount; }

void setKeepAliveTimeoutReached() { _keepAliveTimeoutReached = true; }

protected:

virtual std::unique_ptr<GeneralResponse> createResponse(rest::ResponseCode,
Expand Down Expand Up @@ -163,8 +158,6 @@ class CommTask : public std::enable_shared_from_this<CommTask> {

ConnectionStatistics::Item _connectionStatistics;
std::chrono::milliseconds _keepAliveTimeout;
std::size_t _requestCount = 0;
bool _keepAliveTimeoutReached = false;
AuthenticationFeature* _auth;

mutable std::mutex _statisticsMutex;
Expand Down
44 changes: 9 additions & 35 deletions arangod/GeneralServer/GeneralCommTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@
using namespace arangodb;
using namespace arangodb::rest;

// -----------------------------------------------------------------------------
// --SECTION-- constructors and destructors
// -----------------------------------------------------------------------------

template <SocketType T>
GeneralCommTask<T>::GeneralCommTask(GeneralServer& server,
ConnectionInfo info,
std::unique_ptr<AsioSocket<T>> socket)
: CommTask(server, std::move(info)), _protocol(std::move(socket)), _stopped(false) {
: CommTask(server, std::move(info)),
_protocol(std::move(socket)),
_reading(false),
_writing(false),
_stopped(false) {
if (AsioSocket<T>::supportsMixedIO()) {
_protocol->setNonBlocking(true);
}
Expand Down Expand Up @@ -77,27 +77,6 @@ void GeneralCommTask<T>::close(asio_ns::error_code const& ec) {
}
}

/// set / reset connection timeout
template <SocketType T>
void GeneralCommTask<T>::setTimeout(std::chrono::milliseconds millis) {
_protocol->timer.expires_after(millis);
_protocol->timer.async_wait([self = CommTask::weak_from_this(), oldRequestCount = getRequestCount()](asio_ns::error_code ec) {
std::shared_ptr<CommTask> s;
if (ec || !(s = self.lock())) { // was canceled / deallocated
return;
}

if (s->getRequestCount() != oldRequestCount) {
return;
}

s->setKeepAliveTimeoutReached();
LOG_TOPIC("5c1e0", INFO, Logger::REQUESTS)
<< "keep alive timeout, closing stream!";
static_cast<GeneralCommTask<T>&>(*s).close(ec);
});
}

template <SocketType T>
void GeneralCommTask<T>::asyncReadSome() try {
asio_ns::error_code ec;
Expand Down Expand Up @@ -128,20 +107,15 @@ void GeneralCommTask<T>::asyncReadSome() try {
return;
}

// VST and H2 get a simple timeout here
if (enableReadTimeout()) {
setTimeout(DefaultTimeout);
}

auto mutableBuff = _protocol->buffer.prepare(ReadBlockSize);

_reading = true;
setIOTimeout();
_protocol->socket.async_read_some(
mutableBuff, [self = shared_from_this()](asio_ns::error_code const& ec, size_t nread) {
auto& me = static_cast<GeneralCommTask<T>&>(*self);
me._reading = false;
me._protocol->buffer.commit(nread);

if (me.enableReadTimeout()) {
me._protocol->timer.cancel();
}

try {
if (me.readCallback(ec)) {
Expand Down
25 changes: 14 additions & 11 deletions arangod/GeneralServer/GeneralCommTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,31 +44,34 @@ class GeneralCommTask : public CommTask {
void stop() override;

void close(asio_ns::error_code const& err = asio_ns::error_code());

protected:

/// set / reset connection timeout
void setTimeout(std::chrono::milliseconds millis);
protected:

/// read from socket
void asyncReadSome();

bool stopped() const { return _stopped.load(std::memory_order_acquire); }

protected:

// set a read timeout in asyncReadSome
virtual bool enableReadTimeout() const = 0;


/// called to process data in _readBuffer, return false to stop
virtual bool readCallback(asio_ns::error_code ec) = 0;


/// set / reset connection timeout
virtual void setIOTimeout() = 0;

protected:

/// default max chunksize is 30kb in arangodb (each read fits)
static constexpr size_t ReadBlockSize = 1024 * 32;
static constexpr double WriteTimeout = 300.0;

std::unique_ptr<AsioSocket<T>> _protocol;

static constexpr std::chrono::seconds DefaultTimeout{120};
bool _reading;
bool _writing;

std::unique_ptr<AsioSocket<T>> _protocol;
private:

std::atomic<bool> _stopped;
};
Expand Down
Loading

0 comments on commit d7888a3

Please sign in to comment.