Skip to content

Commit

Permalink
Merge github.com:grpc/grpc into cpparena
Browse files Browse the repository at this point in the history
  • Loading branch information
ctiller committed Apr 19, 2017
2 parents 0dd38b5 + feaee85 commit 06a30ee
Show file tree
Hide file tree
Showing 76 changed files with 732 additions and 766 deletions.
1 change: 0 additions & 1 deletion BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -1362,7 +1362,6 @@ grpc_cc_library(
"include/grpc++/impl/codegen/slice.h",
"include/grpc++/impl/codegen/status.h",
"include/grpc++/impl/codegen/status_code_enum.h",
"include/grpc++/impl/codegen/status_helper.h",
"include/grpc++/impl/codegen/string_ref.h",
"include/grpc++/impl/codegen/stub_options.h",
"include/grpc++/impl/codegen/sync_stream.h",
Expand Down
4 changes: 0 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2456,7 +2456,6 @@ foreach(_hdr
include/grpc++/impl/codegen/slice.h
include/grpc++/impl/codegen/status.h
include/grpc++/impl/codegen/status_code_enum.h
include/grpc++/impl/codegen/status_helper.h
include/grpc++/impl/codegen/string_ref.h
include/grpc++/impl/codegen/stub_options.h
include/grpc++/impl/codegen/sync_stream.h
Expand Down Expand Up @@ -2849,7 +2848,6 @@ foreach(_hdr
include/grpc++/impl/codegen/slice.h
include/grpc++/impl/codegen/status.h
include/grpc++/impl/codegen/status_code_enum.h
include/grpc++/impl/codegen/status_helper.h
include/grpc++/impl/codegen/string_ref.h
include/grpc++/impl/codegen/stub_options.h
include/grpc++/impl/codegen/sync_stream.h
Expand Down Expand Up @@ -3239,7 +3237,6 @@ foreach(_hdr
include/grpc++/impl/codegen/slice.h
include/grpc++/impl/codegen/status.h
include/grpc++/impl/codegen/status_code_enum.h
include/grpc++/impl/codegen/status_helper.h
include/grpc++/impl/codegen/string_ref.h
include/grpc++/impl/codegen/stub_options.h
include/grpc++/impl/codegen/sync_stream.h
Expand Down Expand Up @@ -3547,7 +3544,6 @@ foreach(_hdr
include/grpc++/impl/codegen/slice.h
include/grpc++/impl/codegen/status.h
include/grpc++/impl/codegen/status_code_enum.h
include/grpc++/impl/codegen/status_helper.h
include/grpc++/impl/codegen/string_ref.h
include/grpc++/impl/codegen/stub_options.h
include/grpc++/impl/codegen/sync_stream.h
Expand Down
4 changes: 0 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4310,7 +4310,6 @@ PUBLIC_HEADERS_CXX += \
include/grpc++/impl/codegen/slice.h \
include/grpc++/impl/codegen/status.h \
include/grpc++/impl/codegen/status_code_enum.h \
include/grpc++/impl/codegen/status_helper.h \
include/grpc++/impl/codegen/string_ref.h \
include/grpc++/impl/codegen/stub_options.h \
include/grpc++/impl/codegen/sync_stream.h \
Expand Down Expand Up @@ -4711,7 +4710,6 @@ PUBLIC_HEADERS_CXX += \
include/grpc++/impl/codegen/slice.h \
include/grpc++/impl/codegen/status.h \
include/grpc++/impl/codegen/status_code_enum.h \
include/grpc++/impl/codegen/status_helper.h \
include/grpc++/impl/codegen/string_ref.h \
include/grpc++/impl/codegen/stub_options.h \
include/grpc++/impl/codegen/sync_stream.h \
Expand Down Expand Up @@ -5094,7 +5092,6 @@ PUBLIC_HEADERS_CXX += \
include/grpc++/impl/codegen/slice.h \
include/grpc++/impl/codegen/status.h \
include/grpc++/impl/codegen/status_code_enum.h \
include/grpc++/impl/codegen/status_helper.h \
include/grpc++/impl/codegen/string_ref.h \
include/grpc++/impl/codegen/stub_options.h \
include/grpc++/impl/codegen/sync_stream.h \
Expand Down Expand Up @@ -5407,7 +5404,6 @@ PUBLIC_HEADERS_CXX += \
include/grpc++/impl/codegen/slice.h \
include/grpc++/impl/codegen/status.h \
include/grpc++/impl/codegen/status_code_enum.h \
include/grpc++/impl/codegen/status_helper.h \
include/grpc++/impl/codegen/string_ref.h \
include/grpc++/impl/codegen/stub_options.h \
include/grpc++/impl/codegen/sync_stream.h \
Expand Down
1 change: 0 additions & 1 deletion build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,6 @@ filegroups:
- include/grpc++/impl/codegen/slice.h
- include/grpc++/impl/codegen/status.h
- include/grpc++/impl/codegen/status_code_enum.h
- include/grpc++/impl/codegen/status_helper.h
- include/grpc++/impl/codegen/string_ref.h
- include/grpc++/impl/codegen/stub_options.h
- include/grpc++/impl/codegen/sync_stream.h
Expand Down
3 changes: 1 addition & 2 deletions include/grpc++/impl/codegen/call.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
#include <grpc++/impl/codegen/serialization_traits.h>
#include <grpc++/impl/codegen/slice.h>
#include <grpc++/impl/codegen/status.h>
#include <grpc++/impl/codegen/status_helper.h>
#include <grpc++/impl/codegen/string_ref.h>

#include <grpc/impl/codegen/atm.h>
Expand Down Expand Up @@ -470,7 +469,7 @@ class CallOpServerSendStatus {
trailing_metadata_ = FillMetadataArray(
trailing_metadata, &trailing_metadata_count_, send_error_details_);
send_status_available_ = true;
send_status_code_ = static_cast<grpc_status_code>(GetCanonicalCode(status));
send_status_code_ = static_cast<grpc_status_code>(status.error_code());
send_error_message_ = status.error_message();
}

Expand Down
47 changes: 0 additions & 47 deletions include/grpc++/impl/codegen/status_helper.h

This file was deleted.

5 changes: 1 addition & 4 deletions include/grpc++/server_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,7 @@ class ServerBuilder {

struct SyncServerSettings {
SyncServerSettings()
: num_cqs(1),
min_pollers(1),
max_pollers(INT_MAX),
cq_timeout_msec(1000) {}
: num_cqs(1), min_pollers(1), max_pollers(2), cq_timeout_msec(10000) {}

// Number of server completion queues to create to listen to incoming RPCs.
int num_cqs;
Expand Down
2 changes: 2 additions & 0 deletions src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
Original file line number Diff line number Diff line change
Expand Up @@ -1122,6 +1122,7 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx,
glb_policy->base.interested_parties,
GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
&host, glb_policy->deadline, NULL);
grpc_slice_unref_internal(exec_ctx, host);

grpc_metadata_array_init(&glb_policy->lb_initial_metadata_recv);
grpc_metadata_array_init(&glb_policy->lb_trailing_metadata_recv);
Expand Down Expand Up @@ -1293,6 +1294,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
"Received empty server list. Picks will stay pending until a "
"response with > 0 servers is received");
}
grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
}
} else { /* serverlist == NULL */
gpr_log(GPR_ERROR, "Invalid LB response received: '%s'. Ignoring.",
Expand Down
1 change: 1 addition & 0 deletions src/core/tsi/ssl_transport_security.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include <ws2tcpip.h>
#else
#include <arpa/inet.h>
#include <sys/socket.h>
#endif

#include <grpc/support/alloc.h>
Expand Down
11 changes: 7 additions & 4 deletions src/cpp/server/server_cc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -328,14 +328,18 @@ class Server::SyncRequestThreadManager : public ThreadManager {
}
}

void ShutdownAndDrainCompletionQueue() {
void Shutdown() override {
server_cq_->Shutdown();
ThreadManager::Shutdown();
}

void Wait() override {
ThreadManager::Wait();
// Drain any pending items from the queue
void* tag;
bool ok;
while (server_cq_->Next(&tag, &ok)) {
// Nothing to be done here
// Do nothing
}
}

Expand Down Expand Up @@ -415,7 +419,7 @@ Server::~Server() {
} else if (!started_) {
// Shutdown the completion queues
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->ShutdownAndDrainCompletionQueue();
(*it)->Shutdown();
}
}
}
Expand Down Expand Up @@ -579,7 +583,6 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
// Wait for threads in all ThreadManagers to terminate
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Wait();
(*it)->ShutdownAndDrainCompletionQueue();
}

// Drain the shutdown queue (if the previous call to AsyncNext() timed out
Expand Down
114 changes: 56 additions & 58 deletions src/cpp/thread_manager/thread_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,80 +98,78 @@ void ThreadManager::MarkAsCompleted(WorkerThread* thd) {
}

void ThreadManager::CleanupCompletedThreads() {
std::unique_lock<std::mutex> lock(list_mu_);
for (auto thd = completed_threads_.begin(); thd != completed_threads_.end();
thd = completed_threads_.erase(thd)) {
delete *thd;
std::list<WorkerThread*> completed_threads;
{
// swap out the completed threads list: allows other threads to clean up
// more quickly
std::unique_lock<std::mutex> lock(list_mu_);
completed_threads.swap(completed_threads_);
}
for (auto thd : completed_threads) delete thd;
}

void ThreadManager::Initialize() {
for (int i = 0; i < min_pollers_; i++) {
MaybeCreatePoller();
}
}

// If the number of pollers (i.e threads currently blocked in PollForWork()) is
// less than max threshold (i.e max_pollers_) and the total number of threads is
// below the maximum threshold, we can let the current thread continue as poller
bool ThreadManager::MaybeContinueAsPoller() {
std::unique_lock<std::mutex> lock(mu_);
if (shutdown_ || num_pollers_ > max_pollers_) {
return false;
{
std::unique_lock<std::mutex> lock(mu_);
num_pollers_ = min_pollers_;
num_threads_ = min_pollers_;
}

num_pollers_++;
return true;
}

// Create a new poller if the current number of pollers i.e num_pollers_ (i.e
// threads currently blocked in PollForWork()) is below the threshold (i.e
// min_pollers_) and the total number of threads is below the maximum threshold
void ThreadManager::MaybeCreatePoller() {
std::unique_lock<std::mutex> lock(mu_);
if (!shutdown_ && num_pollers_ < min_pollers_) {
num_pollers_++;
num_threads_++;

for (int i = 0; i < min_pollers_; i++) {
// Create a new thread (which ends up calling the MainWorkLoop() function
new WorkerThread(this);
}
}

void ThreadManager::MainWorkLoop() {
void* tag;
bool ok;

/*
1. Poll for work (i.e PollForWork())
2. After returning from PollForWork, reduce the number of pollers by 1. If
PollForWork() returned a TIMEOUT, then it may indicate that we have more
polling threads than needed. Check if the number of pollers is greater
than min_pollers and if so, terminate the thread.
3. Since we are short of one poller now, see if a new poller has to be
created (i.e see MaybeCreatePoller() for more details)
4. Do the actual work (DoWork())
5. After doing the work, see it this thread can resume polling work (i.e
see MaybeContinueAsPoller() for more details) */
do {
while (true) {
void* tag;
bool ok;
WorkStatus work_status = PollForWork(&tag, &ok);

{
std::unique_lock<std::mutex> lock(mu_);
num_pollers_--;

if (work_status == TIMEOUT && num_pollers_ > min_pollers_) {
std::unique_lock<std::mutex> lock(mu_);
// Reduce the number of pollers by 1 and check what happened with the poll
num_pollers_--;
bool done = false;
switch (work_status) {
case TIMEOUT:
// If we timed out and we have more pollers than we need (or we are
// shutdown), finish this thread
if (shutdown_ || num_pollers_ > max_pollers_) done = true;
break;
case SHUTDOWN:
// If the thread manager is shutdown, finish this thread
done = true;
break;
case WORK_FOUND:
// If we got work and there are now insufficient pollers, start a new
// one
if (!shutdown_ && num_pollers_ < min_pollers_) {
num_pollers_++;
num_threads_++;
// Drop lock before spawning thread to avoid contention
lock.unlock();
new WorkerThread(this);
} else {
// Drop lock for consistency with above branch
lock.unlock();
}
// Lock is always released at this point - do the application work
DoWork(tag, ok);
// Take the lock again to check post conditions
lock.lock();
// If we're shutdown, we should finish at this point.
if (shutdown_) done = true;
break;
}
}

// Note that MaybeCreatePoller does check for shutdown and creates a new
// thread only if ThreadManager is not shutdown
if (work_status == WORK_FOUND) {
MaybeCreatePoller();
DoWork(tag, ok);
}
} while (MaybeContinueAsPoller());
// If we decided to finish the thread, break out of the while loop
if (done) break;
// ... otherwise increase poller count and continue
// There's a chance that we'll exceed the max poller count: that is
// explicitly ok - we'll decrease after one poll timeout, and prevent
// some thrashing starting up and shutting down threads
num_pollers_++;
};

CleanupCompletedThreads();

Expand Down
12 changes: 2 additions & 10 deletions src/cpp/thread_manager/thread_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,14 @@ class ThreadManager {
// Mark the ThreadManager as shutdown and begin draining the work. This is a
// non-blocking call and the caller should call Wait(), a blocking call which
// returns only once the shutdown is complete
void Shutdown();
virtual void Shutdown();

// Has Shutdown() been called
bool IsShutdown();

// A blocking call that returns only after the ThreadManager has shutdown and
// all the threads have drained all the outstanding work
void Wait();
virtual void Wait();

private:
// Helper wrapper class around std::thread. This takes a ThreadManager object
Expand All @@ -122,14 +122,6 @@ class ThreadManager {
// The main funtion in ThreadManager
void MainWorkLoop();

// Create a new poller if the number of current pollers is less than the
// minimum number of pollers needed (i.e min_pollers).
void MaybeCreatePoller();

// Returns true if the current thread can resume as a poller. i.e if the
// current number of pollers is less than the max_pollers.
bool MaybeContinueAsPoller();

void MarkAsCompleted(WorkerThread* thd);
void CleanupCompletedThreads();

Expand Down
Loading

0 comments on commit 06a30ee

Please sign in to comment.