Skip to content

Commit

Permalink
Add logcabinctl commands to control snapshotting
Browse files Browse the repository at this point in the history
The new commands are:
  snapshot inhibit get         Print the remaining time for which the server
                               was prevented from taking snapshots.
  snapshot inhibit set [<time>]  Abort the server's current snapshot if one is
                                 in progress, and disallow the server from
                                 starting automated snapshots for the given
                                 duration [default: 1week].
  snapshot inhibit clear       Allow the server to take snapshots normally.
  snapshot start               Begin taking a snapshot if none is in progress.
  snapshot stop                Abort the current snapshot if one is in
                               progress.
  snapshot restart             Abort the current snapshot if one is in
                               progress, then begin taking a new snapshot.

Close logcabin#167: logcabinctl to disable snapshotting
  • Loading branch information
ongardie committed Jul 6, 2015
1 parent b08d420 commit a0fa053
Show file tree
Hide file tree
Showing 9 changed files with 676 additions and 34 deletions.
102 changes: 90 additions & 12 deletions Client/ServerControl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ namespace LogCabin {
namespace Client {
namespace {

using Client::Util::parseNonNegativeDuration;

/**
* Parses argv for the main function.
*/
Expand All @@ -42,7 +44,7 @@ class OptionParser {
, lastIndex(0)
, logPolicy("")
, server("localhost:5254")
, timeout(Client::Util::parseNonNegativeDuration("0s"))
, timeout(parseNonNegativeDuration("0s"))
{
while (true) {
static struct option longOptions[] = {
Expand All @@ -67,7 +69,7 @@ class OptionParser {
server = optarg;
break;
case 't':
timeout = Client::Util::parseNonNegativeDuration(optarg);
timeout = parseNonNegativeDuration(optarg);
break;
case 'v':
logPolicy = "VERBOSE";
Expand Down Expand Up @@ -97,6 +99,23 @@ class OptionParser {
return args.at(index);
}

/**
 * Return all arguments at position index and after it, joined into one
 * string with a single space between consecutive arguments.
 *
 * As a side effect, lastIndex is set to args.size().
 *
 * \param index
 *      Position in args of the first argument to include.
 * \return
 *      The joined arguments, or an empty string if index is at or past
 *      the end of args.
 */
std::string remaining(uint64_t index) {
    lastIndex = args.size();
    std::string r;
    while (index < args.size()) {
        r += args.at(index);
        // Separate arguments with a space, but do not append one after
        // the final argument.
        if (index < args.size() - 1)
            r += " ";
        ++index;
    }
    return r;
}

/**
* Panic if any unused arguments remain.
*/
Expand Down Expand Up @@ -163,23 +182,26 @@ class OptionParser {
<< "Rotate the server's debug log file."
<< std::endl

// TODO(ongaro): implement snapshot inhibit
#if 0
<< ospace("snapshot inhibit get")
<< "Print the remaining time for which the server"
<< std::endl << space
<< "was asked to not snapshot."
<< "was prevented from taking snapshots."
<< std::endl

<< ospace("snapshot inhibit set <time>")
<< "Stop the snapshot from snapshotting for the"
<< ospace("snapshot inhibit set [<time>]")
<< " Abort the server's current snapshot if one is"
<< std::endl << space
<< " in progress, and disallow the server from"
<< std::endl << space
<< " starting automated snapshots for the given"
<< std::endl << space
<< "given time duration."
<< " duration [default: 1week]."
<< std::endl

<< ospace("snapshot inhibit clear")
<< "Allow the server to take snapshots normally."
<< std::endl
#endif

// TODO(ongaro): implement snapshot commands
#if 0
<< ospace("snapshot start")
<< "Begin taking a snapshot if none is in progress."
<< std::endl
Expand All @@ -195,7 +217,6 @@ class OptionParser {
<< std::endl << space
<< "progress, then begin taking a new snapshot."
<< std::endl
#endif

<< ospace("stats get")
<< "Print detailed server metrics."
Expand Down Expand Up @@ -304,9 +325,21 @@ class ServerControl {
DEFINE_RPC(ServerInfoGet, SERVER_INFO_GET)
DEFINE_RPC(ServerStatsDump, SERVER_STATS_DUMP)
DEFINE_RPC(ServerStatsGet, SERVER_STATS_GET)
DEFINE_RPC(SnapshotControl, SNAPSHOT_CONTROL)
DEFINE_RPC(SnapshotInhibitGet, SNAPSHOT_INHIBIT_GET)
DEFINE_RPC(SnapshotInhibitSet, SNAPSHOT_INHIBIT_SET)

#undef DEFINE_RPC

/**
 * Issue a SnapshotControl RPC carrying the given command. If the response
 * has its error field set, pass that message to error().
 */
void snapshotControl(Proto::SnapshotCommand command) {
    Proto::SnapshotControl::Request req;
    req.set_command(command);

    Proto::SnapshotControl::Response resp;
    SnapshotControl(req, resp);

    if (!resp.has_error())
        return;
    error(resp.error());
}

ClientImpl clientImpl;
std::string server;
ClientImpl::TimePoint timeout;
Expand Down Expand Up @@ -386,6 +419,51 @@ main(int argc, char** argv)
error(response.error());
return 0;
}
} else if (options.at(0) == "snapshot") {
using Proto::SnapshotCommand;
if (options.at(1) == "start") {
options.done();
server.snapshotControl(SnapshotCommand::START_SNAPSHOT);
return 0;
} else if (options.at(1) == "stop") {
options.done();
server.snapshotControl(SnapshotCommand::STOP_SNAPSHOT);
return 0;
} else if (options.at(1) == "restart") {
options.done();
server.snapshotControl(SnapshotCommand::RESTART_SNAPSHOT);
return 0;
} else if (options.at(1) == "inhibit") {
if (options.at(2) == "get") {
options.done();
Proto::SnapshotInhibitGet::Request request;
Proto::SnapshotInhibitGet::Response response;
server.SnapshotInhibitGet(request, response);
std::chrono::nanoseconds ns(response.nanoseconds());
std::cout << ns << std::endl;
return 0;
} else if (options.at(2) == "set") {
Proto::SnapshotInhibitSet::Request request;
std::string time = options.remaining(3);
if (time.empty())
time = "1week";
request.set_nanoseconds(parseNonNegativeDuration(time));
Proto::SnapshotInhibitSet::Response response;
server.SnapshotInhibitSet(request, response);
if (response.has_error())
error(response.error());
return 0;
} else if (options.at(2) == "clear") {
options.done();
Proto::SnapshotInhibitSet::Request request;
request.set_nanoseconds(0);
Proto::SnapshotInhibitSet::Response response;
server.SnapshotInhibitSet(request, response);
if (response.has_error())
error(response.error());
return 0;
}
}
} else if (options.at(0) == "stats") {
if (options.at(1) == "get") {
options.done();
Expand Down
89 changes: 89 additions & 0 deletions Protocol/ServerControl.proto
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ enum OpCode {
SERVER_STATS_DUMP = 7;
SERVER_STATS_GET = 8;
SNAPSHOT_CONTROL = 9;
SNAPSHOT_INHIBIT_GET = 10;
SNAPSHOT_INHIBIT_SET = 11;
};

/**
Expand Down Expand Up @@ -158,3 +160,90 @@ message ServerStatsGet {
optional LogCabin.Protocol.ServerStats server_stats = 1;
}
}

/**
* Operation requested in a SnapshotControl RPC (carried in the 'command'
* field of SnapshotControl.Request).
*/
enum SnapshotCommand {
/**
* This must be the first value in the enum and is never assigned
* explicitly. If new values are added, old code will see them as set
* and equal to this value (though they will still serialize to the
* correct value).
*/
UNKNOWN_SNAPSHOT_COMMAND = 0;
/**
* Begin taking a snapshot if none is in progress.
*/
START_SNAPSHOT = 1;
/**
* Abort the current snapshot if one is in progress.
*/
STOP_SNAPSHOT = 2;
/**
* Abort the current snapshot if one is in progress,
* then begin taking a new snapshot.
*/
RESTART_SNAPSHOT = 3;
};

/**
* SnapshotControl RPC: Tell the server's state machine to start or abort
* taking a snapshot.
*/
message SnapshotControl {
message Request {
/**
* Which snapshot operation to perform: one of the SnapshotCommand
* values (start, stop, or restart).
*/
required SnapshotCommand command = 1;
}
message Response {
/**
* This field will be present if any error occurred and not present
* otherwise.
*/
optional string error = 1;
}
}

/**
* SnapshotInhibitGet RPC: Query the server's state machine to see if it has
* inhibited snapshotting.
*/
message SnapshotInhibitGet {
message Request {
}
message Response {
/**
* The number of nanoseconds for which the server will not be taking an
* automated snapshot (due to a previous call to SnapshotInhibitSet).
*/
optional uint64 nanoseconds = 1;
}
}

/**
* SnapshotInhibitSet RPC: Tell the server's state machine to abort the
* current snapshot and not take any more (for the given amount of time).
*/
message SnapshotInhibitSet {
message Request {
/**
* If set to nonzero, abort the current snapshot, and do not take
* any more automated snapshots for the given time period.
* If set to zero, do not abort the current snapshot, and take
* automated snapshots occasionally as normally configured.
* If unset, abort the current snapshot, and do not take any more
* automated snapshots ever.
*/
optional uint64 nanoseconds = 1;
}
message Response {
/**
* This field will be present if any error occurred and not present
* otherwise.
*/
optional string error = 1;
}
}
1 change: 1 addition & 0 deletions Protocol/ServerStats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ message ServerStats {
optional uint32 running_version = 12;
optional Tree tree = 13;
optional uint64 num_unknown_requests = 14;
optional uint64 may_snapshot_at = 15;
};

/**
Expand Down
67 changes: 67 additions & 0 deletions Server/ControlService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "Server/ControlService.h"
#include "Server/Globals.h"
#include "Server/RaftConsensus.h"
#include "Server/StateMachine.h"

namespace LogCabin {
namespace Server {
Expand Down Expand Up @@ -65,6 +66,15 @@ ControlService::handleRPC(RPC::ServerRPC rpc)
case OpCode::SERVER_STATS_GET:
serverStatsGet(std::move(rpc));
break;
case OpCode::SNAPSHOT_CONTROL:
snapshotControl(std::move(rpc));
break;
case OpCode::SNAPSHOT_INHIBIT_GET:
snapshotInhibitGet(std::move(rpc));
break;
case OpCode::SNAPSHOT_INHIBIT_SET:
snapshotInhibitSet(std::move(rpc));
break;
default:
WARNING("Client sent request with bad op code (%u) to "
"ControlService", rpc.getOpCode());
Expand Down Expand Up @@ -185,6 +195,63 @@ ControlService::serverStatsGet(RPC::ServerRPC rpc)
rpc.reply(response);
}

/**
 * Handle a SnapshotControl request: start, stop, or restart (stop, then
 * start) a snapshot on the state machine, according to the requested
 * command. Any other command value sets the response's error field.
 * A reply is sent in every case.
 */
void
ControlService::snapshotControl(RPC::ServerRPC rpc)
{
    PRELUDE(SnapshotControl);
    using Protocol::ServerControl::SnapshotCommand;

    const SnapshotCommand command = request.command();
    const bool shouldStop = (command == SnapshotCommand::STOP_SNAPSHOT ||
                             command == SnapshotCommand::RESTART_SNAPSHOT);
    const bool shouldStart = (command == SnapshotCommand::START_SNAPSHOT ||
                              command == SnapshotCommand::RESTART_SNAPSHOT);

    if (!shouldStop && !shouldStart) {
        // Covers UNKNOWN_SNAPSHOT_COMMAND and any value this code does
        // not recognize.
        response.set_error("Unknown SnapshotControl command");
    } else {
        // For a restart, the stop must happen before the start.
        if (shouldStop)
            globals.stateMachine->stopTakingSnapshot();
        if (shouldStart)
            globals.stateMachine->startTakingSnapshot();
    }
    rpc.reply(response);
}

/**
 * Handle a SnapshotInhibitGet request: reply with the value returned by
 * the state machine's getInhibit(), expressed in nanoseconds.
 */
void
ControlService::snapshotInhibitGet(RPC::ServerRPC rpc)
{
    PRELUDE(SnapshotInhibitGet);
    const std::chrono::nanoseconds inhibitedFor =
        globals.stateMachine->getInhibit();
    // The response field is unsigned, so a negative duration would be
    // misreported.
    assert(inhibitedFor.count() >= 0);
    response.set_nanoseconds(inhibitedFor.count());
    rpc.reply(response);
}

/**
 * Handle a SnapshotInhibitSet request.
 *
 * - nanoseconds set to nonzero: inhibit snapshots for that duration and
 *   abort the current snapshot.
 * - nanoseconds set to zero: clear the inhibit and leave the current
 *   snapshot alone.
 * - nanoseconds unset: inhibit snapshots for the maximum representable
 *   duration and abort the current snapshot.
 */
void
ControlService::snapshotInhibitSet(RPC::ServerRPC rpc)
{
    PRELUDE(SnapshotInhibitSet);
    // Abort the current snapshot unless the client explicitly asked to
    // clear the inhibit (nanoseconds == 0). This must start out true:
    // otherwise the snapshot is never aborted.
    bool abort = true;
    std::chrono::nanoseconds duration;
    if (request.has_nanoseconds()) {
        duration = std::chrono::nanoseconds(request.nanoseconds());
        // The request field is unsigned; a value too large for the signed
        // duration representation shows up as negative. Clamp it.
        if (request.nanoseconds() > 0 &&
            duration < std::chrono::nanoseconds::zero()) { // overflow
            duration = std::chrono::nanoseconds::max();
        }
        if (request.nanoseconds() == 0)
            abort = false;
    } else {
        duration = std::chrono::nanoseconds::max();
    }
    globals.stateMachine->setInhibit(duration);
    if (abort) {
        globals.stateMachine->stopTakingSnapshot();
    }
    rpc.reply(response);
}


} // namespace LogCabin::Server
} // namespace LogCabin
3 changes: 3 additions & 0 deletions Server/ControlService.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class ControlService : public RPC::Service {
void serverInfoGet(RPC::ServerRPC rpc);
void serverStatsDump(RPC::ServerRPC rpc);
void serverStatsGet(RPC::ServerRPC rpc);
void snapshotControl(RPC::ServerRPC rpc);
void snapshotInhibitGet(RPC::ServerRPC rpc);
void snapshotInhibitSet(RPC::ServerRPC rpc);

/**
* The LogCabin daemon's top-level objects.
Expand Down
Loading

0 comments on commit a0fa053

Please sign in to comment.