
Commit

Merge branch 'histo' of github.com:vjpai/grpc into delayed-write
ctiller committed Jul 15, 2016
2 parents 13d3e3b + 5fde20d commit b23239b
Showing 9 changed files with 286 additions and 155 deletions.
3 changes: 3 additions & 0 deletions src/proto/grpc/testing/control.proto
@@ -229,4 +229,7 @@ message ScenarioResult {
repeated int32 server_cores = 5;
// An after-the-fact computed summary
ScenarioResultSummary summary = 6;
// Information on success or failure of each worker
repeated bool client_success = 7;
repeated bool server_success = 8;
}
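
For context, a minimal sketch (not part of this commit) of how a driver might consume the new per-worker flags, assuming the usual proto3-generated C++ accessors for repeated bool fields and the grpc.testing package; the ScenarioSucceeded helper and the include path are illustrative assumptions.

#include "src/proto/grpc/testing/control.pb.h"

// Hypothetical helper: treat the scenario as successful only if every client
// and every server worker reported success.
bool ScenarioSucceeded(const grpc::testing::ScenarioResult& result) {
  for (int i = 0; i < result.client_success_size(); i++) {
    if (!result.client_success(i)) return false;
  }
  for (int i = 0; i < result.server_success_size(); i++) {
    if (!result.server_success(i)) return false;
  }
  return true;
}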
68 changes: 49 additions & 19 deletions test/cpp/qps/client.h
@@ -112,6 +112,21 @@ class ClientRequestCreator<ByteBuffer> {
}
};

class HistogramEntry GRPC_FINAL {
public:
HistogramEntry() : used_(false) {}
bool used() const { return used_; }
double value() const { return value_; }
void set_value(double v) {
used_ = true;
value_ = v;
}

private:
bool used_;
double value_;
};

class Client {
public:
Client() : timer_(new UsageTimer), interarrival_timer_() {}
@@ -151,18 +166,30 @@ class Client {
return stats;
}

// Must call AwaitThreadsCompletion before destructor to avoid a race
// between destructor and invocation of virtual ThreadFunc
void AwaitThreadsCompletion() {
DestroyMultithreading();
std::unique_lock<std::mutex> g(thread_completion_mu_);
while (threads_remaining_ != 0) {
threads_complete_.wait(g);
}
}

protected:
bool closed_loop_;

void StartThreads(size_t num_threads) {
threads_remaining_ = num_threads;
for (size_t i = 0; i < num_threads; i++) {
threads_.emplace_back(new Thread(this, i));
}
}

void EndThreads() { threads_.clear(); }

virtual bool ThreadFunc(Histogram* histogram, size_t thread_idx) = 0;
virtual void DestroyMultithreading() = 0;
virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0;

void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
// Set up the load distribution based on the number of threads
Expand Down Expand Up @@ -215,7 +242,6 @@ class Client {
public:
Thread(Client* client, size_t idx)
: done_(false),
new_stats_(nullptr),
client_(client),
idx_(idx),
impl_(&Thread::ThreadFunc, this) {}
@@ -230,15 +256,10 @@ class Client {

void BeginSwap(Histogram* n) {
std::lock_guard<std::mutex> g(mu_);
new_stats_ = n;
n->Swap(&histogram_);
}

void EndSwap() {
std::unique_lock<std::mutex> g(mu_);
while (new_stats_ != nullptr) {
cv_.wait(g);
};
}
void EndSwap() {}

void MergeStatsInto(Histogram* hist) {
std::unique_lock<std::mutex> g(mu_);
@@ -252,29 +273,26 @@ class Client {
void ThreadFunc() {
for (;;) {
// run the loop body
const bool thread_still_ok = client_->ThreadFunc(&histogram_, idx_);
// lock, see if we're done
HistogramEntry entry;
const bool thread_still_ok = client_->ThreadFunc(&entry, idx_);
// lock, update histogram if needed and see if we're done
std::lock_guard<std::mutex> g(mu_);
if (entry.used()) {
histogram_.Add(entry.value());
}
if (!thread_still_ok) {
gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
done_ = true;
}
if (done_) {
client_->CompleteThread();
return;
}
// check if we're resetting stats, swap out the histogram if so
if (new_stats_) {
new_stats_->Swap(&histogram_);
new_stats_ = nullptr;
cv_.notify_one();
}
}
}

std::mutex mu_;
std::condition_variable cv_;
bool done_;
Histogram* new_stats_;
Histogram histogram_;
Client* client_;
const size_t idx_;
@@ -286,6 +304,18 @@ class Client {

InterarrivalTimer interarrival_timer_;
std::vector<gpr_timespec> next_time_;

std::mutex thread_completion_mu_;
size_t threads_remaining_;
std::condition_variable threads_complete_;

void CompleteThread() {
std::lock_guard<std::mutex> g(thread_completion_mu_);
threads_remaining_--;
if (threads_remaining_ == 0) {
threads_complete_.notify_all();
}
}
};

template <class StubType, class RequestType>
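To make the revised threading contract in client.h concrete, here is an illustrative sketch (not code from this commit): ThreadFunc now reports at most one latency sample per iteration through a HistogramEntry that the owning Thread folds into its histogram under its own lock, DestroyMultithreading is the hook that stops the worker threads, and callers must invoke AwaitThreadsCompletion before destroying the client. The SketchClient name and the IssueOneRpc helper are hypothetical.

class SketchClient : public Client {
 public:
  SketchClient() { StartThreads(1); }

 protected:
  // Invoked from AwaitThreadsCompletion(); joining the threads here keeps the
  // virtual ThreadFunc from racing with destruction.
  void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL { EndThreads(); }

  bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE {
    double start = UsageTimer::Now();
    bool ok = IssueOneRpc(thread_idx);  // hypothetical blocking RPC
    // One sample per loop iteration; no histogram is shared across threads.
    entry->set_value((UsageTimer::Now() - start) * 1e9);
    return ok;  // false ends this worker thread
  }

 private:
  bool IssueOneRpc(size_t thread_idx);  // not defined in this sketch
};

// Usage sketch: join the workers before the destructor runs.
//   SketchClient client;
//   ... run the benchmark ...
//   client.AwaitThreadsCompletion();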
91 changes: 57 additions & 34 deletions test/cpp/qps/client_async.cc
@@ -31,7 +31,6 @@
*
*/

#include <cassert>
#include <forward_list>
#include <functional>
#include <list>
@@ -48,7 +47,6 @@
#include <grpc++/generic/generic_stub.h>
#include <grpc/grpc.h>
#include <grpc/support/cpu.h>
#include <grpc/support/histogram.h>
#include <grpc/support/log.h>

#include "src/proto/grpc/testing/services.grpc.pb.h"
@@ -64,7 +62,7 @@ class ClientRpcContext {
ClientRpcContext() {}
virtual ~ClientRpcContext() {}
// next state, return false if done. Collect stats when appropriate
virtual bool RunNextState(bool, Histogram* hist) = 0;
virtual bool RunNextState(bool, HistogramEntry* entry) = 0;
virtual ClientRpcContext* StartNewClone() = 0;
static void* tag(ClientRpcContext* c) { return reinterpret_cast<void*>(c); }
static ClientRpcContext* detag(void* t) {
@@ -104,7 +102,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
}
}
bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE {
switch (next_state_) {
case State::READY:
start_ = UsageTimer::Now();
@@ -114,7 +112,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
next_state_ = State::RESP_DONE;
return true;
case State::RESP_DONE:
hist->Add((UsageTimer::Now() - start_) * 1e9);
entry->set_value((UsageTimer::Now() - start_) * 1e9);
callback_(status_, &response_);
next_state_ = State::INVALID;
return false;
@@ -176,6 +174,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
for (int i = 0; i < num_async_threads_; i++) {
cli_cqs_.emplace_back(new CompletionQueue);
next_issuers_.emplace_back(NextIssuer(i));
shutdown_state_.emplace_back(new PerThreadShutdownState());
}

using namespace std::placeholders;
@@ -192,7 +191,6 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
}
virtual ~AsyncClient() {
for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
(*cq)->Shutdown();
void* got_tag;
bool ok;
while ((*cq)->Next(&got_tag, &ok)) {
@@ -201,20 +199,52 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
}
}

bool ThreadFunc(Histogram* histogram,
protected:
const int num_async_threads_;

private:
struct PerThreadShutdownState {
mutable std::mutex mutex;
bool shutdown;
PerThreadShutdownState() : shutdown(false) {}
};

int NumThreads(const ClientConfig& config) {
int num_threads = config.async_client_threads();
if (num_threads <= 0) { // Use dynamic sizing
num_threads = cores_;
gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads);
}
return num_threads;
}
void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL {
for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
std::lock_guard<std::mutex> lock((*ss)->mutex);
(*ss)->shutdown = true;
}
for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
(*cq)->Shutdown();
}
this->EndThreads(); // this-> needed so EndThreads resolves in the dependent base class
}

bool ThreadFunc(HistogramEntry* entry,
size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL {
void* got_tag;
bool ok;

switch (cli_cqs_[thread_idx]->AsyncNext(
&got_tag, &ok,
std::chrono::system_clock::now() + std::chrono::milliseconds(10))) {
case CompletionQueue::SHUTDOWN:
return false;
case CompletionQueue::GOT_EVENT: {
// Got a regular event, so process it
ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
if (!ctx->RunNextState(ok, histogram)) {
// Proceed while holding a lock to make sure that
// this thread isn't supposed to shut down
std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
if (shutdown_state_[thread_idx]->shutdown) {
return true;
} else if (!ctx->RunNextState(ok, entry)) {
// The RPC and callback are done, so clone the ctx
// and kickstart the new one
auto clone = ctx->StartNewClone();
@@ -224,29 +254,22 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
}
return true;
}
case CompletionQueue::TIMEOUT:
// TODO(ctiller): do something here to track how frequently we pass
// through this codepath.
case CompletionQueue::TIMEOUT: {
std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
if (shutdown_state_[thread_idx]->shutdown) {
return true;
}
return true;
}
case CompletionQueue::SHUTDOWN: // queue is shutting down, so we must be
// done
return true;
}
GPR_UNREACHABLE_CODE(return false);
}

protected:
const int num_async_threads_;

private:
int NumThreads(const ClientConfig& config) {
int num_threads = config.async_client_threads();
if (num_threads <= 0) { // Use dynamic sizing
num_threads = cores_;
gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads);
}
return num_threads;
}

std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
std::vector<std::function<gpr_timespec()>> next_issuers_;
std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
};

static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
@@ -262,7 +285,7 @@ class AsyncUnaryClient GRPC_FINAL
config, SetupCtx, BenchmarkStubCreator) {
StartThreads(num_async_threads_);
}
~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); }
~AsyncUnaryClient() GRPC_OVERRIDE {}

private:
static void CheckDone(grpc::Status s, SimpleResponse* response) {}
@@ -307,7 +330,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this));
next_state_ = State::STREAM_IDLE;
}
bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE {
while (true) {
switch (next_state_) {
case State::STREAM_IDLE:
@@ -339,7 +362,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
return true;
break;
case State::READ_DONE:
hist->Add((UsageTimer::Now() - start_) * 1e9);
entry->set_value((UsageTimer::Now() - start_) * 1e9);
callback_(status_, &response_);
next_state_ = State::STREAM_IDLE;
break; // loop around
@@ -391,7 +414,7 @@ class AsyncStreamingClient GRPC_FINAL
StartThreads(num_async_threads_);
}

~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
~AsyncStreamingClient() GRPC_OVERRIDE {}

private:
static void CheckDone(grpc::Status s, SimpleResponse* response) {}
@@ -439,7 +462,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
ClientRpcContext::tag(this));
next_state_ = State::STREAM_IDLE;
}
bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE {
while (true) {
switch (next_state_) {
case State::STREAM_IDLE:
@@ -471,7 +494,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
return true;
break;
case State::READ_DONE:
hist->Add((UsageTimer::Now() - start_) * 1e9);
entry->set_value((UsageTimer::Now() - start_) * 1e9);
callback_(status_, &response_);
next_state_ = State::STREAM_IDLE;
break; // loop around
@@ -527,7 +550,7 @@ class GenericAsyncStreamingClient GRPC_FINAL
StartThreads(num_async_threads_);
}

~GenericAsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
~GenericAsyncStreamingClient() GRPC_OVERRIDE {}

private:
static void CheckDone(grpc::Status s, ByteBuffer* response) {}
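Distilled from the hunks above, an illustrative sketch (not code from this commit) of the per-thread shutdown handshake: DestroyMultithreading sets every per-thread flag under its mutex before shutting the completion queues down, and each polling thread re-checks under that same mutex after every AsyncNext wakeup, so it keeps draining the queue without starting new RPCs. The PollOnce helper and the <grpc++/completion_queue.h> include are assumptions.

#include <chrono>
#include <mutex>

#include <grpc++/completion_queue.h>

struct PerThreadShutdownState {
  std::mutex mutex;
  bool shutdown;
  PerThreadShutdownState() : shutdown(false) {}
};

// One polling step of a worker thread: returns false only once the
// completion queue has fully drained after Shutdown().
bool PollOnce(grpc::CompletionQueue* cq, PerThreadShutdownState* state) {
  void* tag;
  bool ok;
  switch (cq->AsyncNext(&tag, &ok, std::chrono::system_clock::now() +
                                       std::chrono::milliseconds(10))) {
    case grpc::CompletionQueue::SHUTDOWN:
      return false;  // queue fully drained; the thread can exit
    case grpc::CompletionQueue::GOT_EVENT: {
      // Hold the per-thread lock while deciding what to do with the event so
      // DestroyMultithreading() cannot flip the flag mid-decision.
      std::lock_guard<std::mutex> lock(state->mutex);
      return true;  // process the tag here; skip new work if state->shutdown
    }
    case grpc::CompletionQueue::TIMEOUT:
      return true;  // periodic wakeup; loop again and re-check next time
  }
  return true;
}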
