Skip to content

Commit

Permalink
a new class address_book will takes all nodes information
Browse files Browse the repository at this point in the history
mli committed Dec 24, 2013
1 parent 31de0d5 commit cd94994
Showing 9 changed files with 1,251 additions and 1,057 deletions.
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@ GTEST = -lgtest_main -lgtest
OBJECTS = \
./proto/nodemgt.pb.o \
./proto/header.pb.o \
./proto/command.pb.o \
./proto/express.pb.o \
./util/crc32c.o \
./util/key.o \
./util/mail.o \
6 changes: 3 additions & 3 deletions src/proto/command.proto → src/proto/express.proto
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// commands between nodes

message Command {
enum CommandId {
message Express {
enum Command {
ASSIGN_OBJ_ID = 1;
}

required CommandId command_id = 1;
required Command command = 1;
required int32 sender = 2;
required int32 recver = 3;
required int32 seq_id = 4;
118 changes: 118 additions & 0 deletions src/system/address_book.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
#include "system/address_book.h"

namespace PS {

DEFINE_int32(my_rank, 0, "my rank id, continous integer from 0");
DEFINE_string(my_type, "server", "type of my node, client, or server");
DEFINE_int32(num_server, 1, "number of servers");
DEFINE_int32(num_client, 1, "number of clients");
DEFINE_string(server_address,
"tcp://localhost:7100,tcp://localhost:9102,tcp://localhost:7004,tcp://localhost:7006,tcp://localhost:7008,tcp://localhost:7010",
"address of servers");
DEFINE_string(client_address,
"tcp://localhost:6050,tcp://localhost:6012,tcp://localhost:6004,tcp://localhost:6006,tcp://localhost:6008,tcp://localhost:6010",
"address of clients");

string AddressBook::DebugString() {
std::stringstream ss;
ss << "#client: " << FLAGS_num_client
<< ", #server: " << FLAGS_num_server
<< ", my rank: " << FLAGS_my_rank
<< ", my type: " << FLAGS_my_type;
return ss.str();
}

void AddressBook::InitNodes() {
num_server_ = FLAGS_num_server;
num_client_ = FLAGS_num_client;
std::vector<string> s_addr = split(FLAGS_server_address, ',');
std::vector<string> c_addr = split(FLAGS_client_address, ',');
CHECK_GE(s_addr.size(), num_server_)
<< "#address in " << FLAGS_server_address << " is less than num_server";
CHECK_GE(c_addr.size(), num_client_)
<< "#address in " << FLAGS_client_address << " is less than num_client";
Node node;
for (size_t i = 0; i < num_server_; ++i) {
// if (IamBackupProcess()) {
// if (i == FLAGS_failed_node_id)
// continue;
// }
// network address
string mail_addr = s_addr[i];
std::vector<string> part = split(mail_addr, ':');
// use data_port + 1 to send cmd
int port = std::stoi(part.back());
port ++;
string cmd_addr;
for (size_t j = 0; j < part.size(); j++) {
if (j != part.size() - 1 )
cmd_addr += (part[j] + ':');
else
cmd_addr += std::to_string(port);
}
node.Init(Node::kTypeServer, i, mail_addr, cmd_addr);
// insert into node groups
uid_t uid = node.uid();
all_[uid] = node;
group_.servers()->push_back(uid);
group_.all()->push_back(uid);
if (i==0) group_.set_root(uid);
}
// client nodes
for (size_t i = 0; i < num_client_; ++i) {
string mail_addr = c_addr[i];
std::vector<string> part = split(mail_addr, ':');
// use data_port + 1 to send cmd
int port = std::stoi(part.back());
port ++;
string cmd_addr;
for (size_t j = 0; j < part.size(); j++) {
if (j != part.size() - 1)
cmd_addr += (part[j] + ':');
else
cmd_addr += std::to_string(port);
}
node.Init(Node::kTypeClient, i, mail_addr, cmd_addr);
// insert into node groups
uid_t uid = node.uid();
all_[uid] = node;
group_.clients()->push_back(uid);
group_.all()->push_back(uid);
}

my_uid_ = Node::GetUid(FLAGS_my_type, FLAGS_my_rank);
CHECK(all_.find(my_uid_) != all_.end())
<< "there is no my_node [" << my_uid_ << "] info. "
<< DebugString();
}

void AddressBook::InitVans() {
// data connections
package_van_ = new Van();
CHECK(package_van_->Init());
CHECK(package_van_->Bind(my_node(), 0));

if (my_node().is_client()) {
for (auto id : *group_.servers())
CHECK(package_van_->Connect(all_[id], 0));
} else {
// connect to all
for (auto id : *group_.all()) {
if (id != my_uid()) // TODO no if
CHECK(package_van_->Connect(all_[id], 0));
}
}
// control connections
express_van_ = new Van();
CHECK(express_van_->Init());
CHECK(express_van_->Bind(my_node(), 1));
if (IamRoot()) {
for (auto id : *group_.all()) {
CHECK(express_van_->Connect(all_[id], 1));
}
} else {
CHECK(express_van_->Connect(root(), 1));
}
}

} // namespace PS
48 changes: 48 additions & 0 deletions src/system/address_book.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#pragma once

#include "util/common.h"
#include "system/node_group.h"
#include "system/van.h"

DECLARE_int32(num_server);
DECLARE_int32(num_client);
DECLARE_int32(my_rank);
DECLARE_string(my_type);

namespace PS {

class AddressBook {
public:
AddressBook() { }
void Init() {
InitNodes();
InitVans();
}

Node& my_node() { return all_[my_uid_]; };
uid_t my_uid() { return my_uid_; }

bool IamRoot() { return my_uid_ == 0; }
Node& root() { return all_[0]; }

private:
string DebugString();
void InitNodes();
void InitVans();
size_t num_server_;
size_t num_client_;
// all availabe clients and servers
NodeGroup group_;
// this is the ground true of all nodes in this
// system. if a node dies, or a new node comming, modify the information
// stored here.
map<uid_t, Node> all_;

uid_t my_uid_;

Van* package_van_;
Van* express_van_;
};


} // namespace PS
Loading
Oops, something went wrong.

0 comments on commit cd94994

Please sign in to comment.