merge updates to refactor branch #2

Open. Wants to merge 1 commit into base: refactor.
updates to refactor branch
bhetherman-bbn committed Mar 5, 2022
commit 540f3b7c4aa2386ba034a2c8f1618aa9be369632
Empty file modified: build.sh (mode 100644 → 100755)
34 changes: 24 additions & 10 deletions cluster.py
@@ -28,6 +28,7 @@
import grpc
import runtime_pb2
import runtime_pb2_grpc
import os

# import examples.vgg as vgg # TODO: this is used for debugging. Remove this later.

@@ -79,7 +80,7 @@ def initCommNCCL(self, message, msgType, groupId, members):
response = self.stub.InitCommNCCL(runtime_pb2.InitCommNCCLMsg(
message=message, msg_type=msgType, group_id=groupId, members=members))
print("received: " + response.message)
return response.group_id;
return response.group_id

def initCommGRPC(self, rankToIpMap):
rankToIpMapInJson = json.dumps(rankToIpMap)
@@ -103,10 +104,10 @@ def __init__(self, address: str, port: int, device: int, userId: str, sshKeyPath
self.serverId = None
self.proxy = None
self.isCpp = isCpp
self.is_local = address == "127.0.0.1"
self.is_local = (address == "127.0.0.1" or address == "localhost")
self.process = None

def getProxy(self, maxRetry = 180):
def getProxy(self, maxRetry = 360):
if self.proxy != None:
# print("getProxy() returned from cached proxy value.")
return self.proxy
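Note on the is_local change above: the check now accepts both "127.0.0.1" and "localhost". If other loopback spellings (for example "::1" or other 127.x addresses) can appear in cluster configs, a helper along these lines would cover them; this is only an illustrative sketch, not part of the commit:

```python
import ipaddress

def is_local_address(addr: str) -> bool:
    """Treat 'localhost' and any loopback IP (127.0.0.0/8, ::1) as local."""
    if addr == "localhost":
        return True
    try:
        return ipaddress.ip_address(addr).is_loopback
    except ValueError:
        # Other hostnames are assumed to be remote.
        return False
```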
@@ -200,11 +201,13 @@ def upSync(self, localPath, remotePath):
class ClusterCoordinator(xmlrpc.server.SimpleXMLRPCServer):
""" GPU cluster coordinator. It accepts training jobs from clients and schedule them to runtimes. """

def __init__(self, addrToBind: str, portToBind: int, locations: List[Location], workDir: str, be_batch_size: int):
def __init__(self, addrToBind: str, portToBind: int, locations: List[Location], workDir: str, be_batch_size: int, localhost:bool):
super(ClusterCoordinator, self).__init__((addrToBind, portToBind))
self.myAddr = addrToBind
self.myPort = portToBind
self.locations = locations
self.worldSize = len(locations)
self.localhost = localhost
self.workDir = workDir
self.processes = [] # from subprocess calls used for launching runtime.
self.nextTagStartOffset = 1
@@ -233,7 +236,7 @@ def export_poke(self):
return 'Returned from poke at %s' % self.myAddr

def export_scheduleTraining(self, jobName: str, trainingJobInJSON: str, runbe):
job = TrainingJob("test", None, None, 0, 0, "")
job = TrainingJob("test", None, None, 0, 0, 0, "")
job.loadJSON(trainingJobInJSON)
print("received job")

@@ -257,11 +260,15 @@ def export_scheduleTraining(self, jobName: str, trainingJobInJSON: str, runbe):
if len(self.locations) < gpusUsed:
return "Not enough servers available. %d gpus available while %d needed" % (len(self.locations), gpusUsed)

lfn = "NLL"
if "gpt2" in jobName:
lfn = "CrossEntropyLoss"
jobParams = {
"run_with_be": runbe,
"nr_gpus": gpusUsed,
"cifar_training": "cifar" in jobName,
"lossfn": "CrossEntropyLoss" if "gpt2" in jobName else "NLL",
"lossfn": lfn,
"epochsToTrain": 100
}

jobParamsInJson = json.dumps(jobParams)
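The lossfn selection above special-cases gpt2 and hard-codes epochsToTrain to 100. If more model families show up later, a small lookup table keeps the selection in one place; a hypothetical sketch (helper name and table are illustrative only, not from this repo):

```python
# Pick the loss function from a substring of the job name, falling back to NLL.
LOSS_BY_JOB_SUBSTRING = {
    "gpt2": "CrossEntropyLoss",
}

def pick_lossfn(job_name: str, default: str = "NLL") -> str:
    for key, loss_name in LOSS_BY_JOB_SUBSTRING.items():
        if key in job_name:
            return loss_name
    return default

assert pick_lossfn("gpt2-medium") == "CrossEntropyLoss"
assert pick_lossfn("cifar-vgg16") == "NLL"
```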
@@ -277,7 +284,7 @@ def requestScheduleTraining(proxy, jobInJson):
thread.start()
waitthreads(threadList)

self.ongoingJobs[jobName] = {"iterTime": 0, "gpuMsec": 0, "gpusUsed": gpusUsed, "gpusFinished": 0, "globalBatchSize": job.globalBatchSize}
self.ongoingJobs[jobName] = {"iterTime": 0, "gpuMsec": 0, "gpusUsed": gpusUsed, "gpusFinished": 0, "globalBatchSize": job.globalBatchSize, "lossfn": job.lossfn}
self.ongoingJobs[jobName].update({"beImagesPerIter": 0.0, "idleMsPerIter": 0.0})

# for rank in range(gpusUsed):
@@ -518,7 +525,11 @@ def parse_args():
help="To launch CPP version runtimes.")
parser.add_argument('--manualLaunch', default=False, action='store_true',
help="Do not launch runtimes automatically. Primarily for using gdb on runtime processes.")
parser.add_argument("--logdir", type=str, default="", help="Full path of log directory")
parser.add_argument("--localhost", type=str, default=True,
help="Run cluster on local host only")
parser.add_argument("--logdir", type=str, default=os.getcwd(),
help="Full path of log directory")
# parser.add_argument("--logdir", type=str, default="", help="Full path of log directory")
# For installing nsys.. (with other cuda toolkit..)
# wget https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64/cuda-ubuntu1804.pin
# sudo mv cuda-ubuntu1804.pin /etc/apt/preferences.d/cuda-repository-pin-600
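One thing to flag in the parse_args() hunk above: --localhost is declared with type=str and default=True, so the default is a real bool but any value passed on the command line (including the string "False") is truthy. If the flag is meant to be a simple on/off switch, the usual argparse patterns look like this (a sketch, not the project's code):

```python
import argparse

parser = argparse.ArgumentParser()
# Off by default; passing '--localhost' on the command line turns it on.
parser.add_argument("--localhost", action="store_true",
                    help="Run cluster on local host only")

args = parser.parse_args([])
print(args.localhost)   # False
args = parser.parse_args(["--localhost"])
print(args.localhost)   # True

# Alternative, if explicit values such as '--localhost false' are desired:
def str2bool(v: str) -> bool:
    return str(v).strip().lower() in ("1", "true", "yes", "y")
```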
@@ -542,12 +553,15 @@ def main():
for deviceConfig in serverConfig["deviceList"]:
rankToIpMap[str(len(locations))] = serverConfig["addr"] + ":" + str(deviceConfig["port"])
commGrpRanksWorld.append(len(locations))
locations.append(Location(serverConfig["addr"], deviceConfig["port"], deviceConfig["device"], serverConfig["userId"], serverConfig["sshKeyPath"], args.cpp))
if args.localhost:
locations.append(Location(serverConfig["addr"], deviceConfig["port"], deviceConfig["device"], serverConfig["userId"], None, args.cpp))
else:
locations.append(Location(serverConfig["addr"], deviceConfig["port"], deviceConfig["device"], serverConfig["userId"], None, args.cpp))
addrToBindCombo = re.split('[-:]', args.addrToBind)
addrToBind = addrToBindCombo[0]
portToBind = int(addrToBindCombo[1])

coordinator = ClusterCoordinator(addrToBind, portToBind, locations, clusterConfig["workDir"], args.be_batch_size)
coordinator = ClusterCoordinator(addrToBind, portToBind, locations, clusterConfig["workDir"], args.be_batch_size, args.localhost)
if args.install:
coordinator.installPackages()

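In the main() hunk above, the if args.localhost branch and its else branch construct the Location identically (both pass None where sshKeyPath used to go), so the condition currently has no effect. If the intent was to skip SSH keys only for local runs, the variant below sketches that (assuming serverConfig still carries "sshKeyPath", as in the removed line):

```python
# Sketch of the presumed intent: keep the configured SSH key for remote runs
# and drop it only when everything runs on localhost.
ssh_key = None if args.localhost else serverConfig["sshKeyPath"]
locations.append(Location(serverConfig["addr"], deviceConfig["port"],
                          deviceConfig["device"], serverConfig["userId"],
                          ssh_key, args.cpp))
```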
2 changes: 1 addition & 1 deletion csrc/CUDAGraph.cpp
@@ -1,4 +1,4 @@
#include <ATen/cuda/CUDAGeneratorImpl.h>
#include <ATen/CUDAGeneratorImpl.h>
#include <ATen/cuda/Exceptions.h>
#include <ATen/Functions.h>
#include <c10/cuda/CUDACachingAllocator.h>
70 changes: 54 additions & 16 deletions csrc/JobContext.cpp
@@ -67,13 +67,13 @@ JobContext::JobContext(std::unique_ptr<RunnableModule> modelIn,
if (job_params.contains("epochs_to_train"))
epochsToTrain = job_params["epochs_to_train"].get<size_t>();

train_dataset_.reset(
Dataset::fromName(dset, rtctx->rank, model->globalBatchSize,
model->initialBatchSizes, model->sampleIndices, 2000));
eval_dataset_.reset(
Dataset::fromName(dset + "_eval", rtctx->rank, model->globalBatchSize,
model->initialBatchSizes, model->sampleIndices, 10));
dataset_pipeline_.reset(new DatasetPipelineWrapper(train_dataset_));
train_dataset_ = Dataset::fromName(dset, rtctx->rank, model->globalBatchSize,
model->initialBatchSizes, model->sampleIndices, 2000, rtctx->worldSize);

eval_dataset_ = Dataset::fromName(dset + "_eval", rtctx->rank, model->globalBatchSize,
model->initialBatchSizes, model->sampleIndices, 10, rtctx->worldSize);

dataset_pipeline_ = std::make_shared<DatasetPipelineWrapper>(train_dataset_);

if (!rtctx->use_fg_graph)
iters_before_graph_capture = itersToTrain * epochsToTrain;
@@ -158,11 +158,12 @@ void JobContext::StepOne(bool *iter_done) {
job_done_ = true;
return;
}
++totiters;
if (!graphCapture)
++totiters;
}
}

void JobContext::Test() {
void JobContext::Test(int64_t curEpoch) {
double total = 0.0;
torch::Tensor correct = torch::zeros({1}).to(at::kLong).to(rtctx->c10dev);

@@ -176,12 +177,12 @@ void JobContext::Test() {
total += model->GetGlobalBatchSize();

auto batch = eval_dataset_->getNextThisRank();
torch::Tensor input = batch.data;
torch::Tensor input = batch["data"];
if (input.defined()) input = input.to(rtctx->c10dev);
auto output = Infer(input);
if (output.defined() && output.nbytes() > 0) {
auto pred = output.argmax(1);
correct += pred.eq(batch.target.to(rtctx->c10dev)).sum();
correct += pred.eq(batch["target"].to(rtctx->c10dev)).sum();
}
DP_LOG(DEBUG, "Evaluate iteration %lu/%lu\n", ++i,
eval_dataset_->GetItersPerEpoch());
@@ -204,29 +205,62 @@
}

torch::Tensor JobContext::Infer(torch::Tensor input) {
bool will_do_graph_capture = false;
if(totiters == iters_before_graph_capture && !model->has_graph)
will_do_graph_capture = true;

torch::NoGradGuard guard;
model->SetEval();
model->SetInputsTargets(input, {});
model->SetInputsTargets(input);
FinishIteration();

if(will_do_graph_capture){
// model doesn't actually run when the graph is captured
model->SetInputsTargets(input);
FinishIteration();
}
return model->getOutput();
}

void JobContext::Train(torch::Tensor input, torch::Tensor target) {
void JobContext::Train(torch::Tensor input, torch::Tensor target, torch::Tensor weights) {
bool will_do_graph_capture = false;
if(totiters == iters_before_graph_capture && !model->has_graph)
will_do_graph_capture = true;

model->SetTrain();
model->SetInputsTargets(input, target);
model->SetInputsTargets(input, target, weights);
FinishIteration();

if(will_do_graph_capture){
// model doesn't actually run when the graph is captured
model->SetInputsTargets(input, target, weights);
FinishIteration();
}
}

void JobContext::TrainOneEpoch() {
void JobContext::TrainOneEpoch(int64_t curEpoch) {
dataset_pipeline_->Reset();
model->ResetAvgLoss();
size_t i = 0;
if (iters_before_graph_capture < totiters && rtctx->use_fg_graph)
iters_before_graph_capture = totiters + 5;
while (!dataset_pipeline_->IsDone() && !job_done_) {
auto batch = dataset_pipeline_->getNextThisRank();
Train(batch.data, batch.target);
if(batch.find("data") == batch.end())
continue;

Train(batch["data"], batch["target"], batch["weight"]);
DP_LOG(DEBUG, "Training iteration %lu/%lu\n", ++i,
dataset_pipeline_->GetItersPerEpoch());

if (nr_gpus_ > 1) {
rtctx->torch_stream.synchronize(); // sync before calling into NCCL
commHandler->comm_start();
auto tmp_sync = torch::zeros({1}).to(rtctx->c10dev);
commHandler->all_reduce(tmp_sync, c10d::ReduceOp::SUM);
commHandler->comm_end();
commHandler->sync();
}
}
double loss = model->GetAvgLoss();
DP_LOG(DEBUG, "Epoch done. Loss %.2f", loss);
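The will_do_graph_capture logic in Infer()/Train() above resubmits the same inputs once more because, on the iteration that captures the CUDA graph, the kernels are recorded rather than executed, so no usable output or parameter update is produced. The same warm-up / capture / replay structure exists in PyTorch's Python API; a self-contained sketch (toy model and tensors, not this project's classes):

```python
import torch

device = torch.device("cuda")
model = torch.nn.Linear(16, 4).to(device)
loss_fn = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)

static_input = torch.randn(8, 16, device=device)
static_target = torch.randint(0, 4, (8,), device=device)

# Warm up a few iterations on a side stream before capturing.
s = torch.cuda.Stream()
s.wait_stream(torch.cuda.current_stream())
with torch.cuda.stream(s):
    for _ in range(3):
        optimizer.zero_grad(set_to_none=True)
        loss_fn(model(static_input), static_target).backward()
        optimizer.step()
torch.cuda.current_stream().wait_stream(s)

# Capture: kernels are recorded but not executed, so no real work happens here.
g = torch.cuda.CUDAGraph()
optimizer.zero_grad(set_to_none=True)
with torch.cuda.graph(g):
    static_loss = loss_fn(model(static_input), static_target)
    static_loss.backward()
    optimizer.step()

# Replay: copy fresh data into the static tensors, then rerun the recorded kernels.
static_input.copy_(torch.randn(8, 16, device=device))
g.replay()
```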
@@ -251,3 +285,7 @@ void JobContext::FinishIteration() {
StepOne(&iter_done);
} while (!iter_done && !job_done_);
}

size_t JobContext::getTrainItersPerEpoch(){
return train_dataset_->GetItersPerEpoch();
};
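The per-iteration all_reduce on a single-element tensor added to TrainOneEpoch() above is effectively a cross-rank synchronization point: every GPU must reach it before any of them proceeds, which keeps ranks in step even when a rank skips a batch. The same idea expressed with torch.distributed (a standalone sketch, not this project's CommunicationHandler):

```python
import os
import torch
import torch.distributed as dist

def barrier_via_allreduce(device: torch.device) -> None:
    """Tiny all-reduce used purely as a synchronization point across ranks."""
    tmp = torch.zeros(1, device=device)
    torch.cuda.synchronize()   # finish queued compute before calling into NCCL
    dist.all_reduce(tmp, op=dist.ReduceOp.SUM)
    torch.cuda.synchronize()   # wait for the collective to complete

if __name__ == "__main__":
    # Assumes the usual torchrun environment variables (RANK, WORLD_SIZE, ...).
    dist.init_process_group(backend="nccl")
    device = torch.device("cuda", int(os.environ.get("LOCAL_RANK", 0)))
    torch.cuda.set_device(device)
    barrier_via_allreduce(device)
    dist.destroy_process_group()
```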
13 changes: 9 additions & 4 deletions csrc/JobContext.h
@@ -49,10 +49,12 @@ class JobContext {
torch::Tensor Infer(torch::Tensor input);

/* Run one training iteration with this input/target */
void Train(torch::Tensor input, torch::Tensor target);
void Train(torch::Tensor input,
torch::Tensor target,
torch::Tensor weights = {});

/* Test the model on the test dataset */
void Test();
void Test(int64_t curEpoch = -1);

/* Advance one step through the model */
void StepOne(bool *iter_done);
@@ -61,16 +63,19 @@
void FinishIteration();

/* Train one full epoch */
void TrainOneEpoch();
void TrainOneEpoch(int64_t curEpoch = -1);

void printJobStatistics();

std::unique_ptr<RunnableModule> model;
std::shared_ptr<RunnableModule> model;
std::string name;
std::shared_ptr<CommunicationHandler> commHandler;
std::chrono::time_point<std::chrono::steady_clock> start, end;
uint64_t be_img_start, be_img_end;

size_t getTrainItersPerEpoch();
size_t getWarmupIters(){return warmupIters;};

private:
// Params
bool run_with_be_{false};
4 changes: 3 additions & 1 deletion csrc/communication.h
@@ -29,6 +29,8 @@
#include "rpcService.h"
#include "utils.h"

#define SIZE_16_MiB (16 * 1024 * 1024)

using json = nlohmann::json;

class CommunicationHandler {
@@ -130,4 +132,4 @@
std::unordered_map<int, std::unique_ptr<RuntimeClient> > clientPool;
};

#endif
#endif