Skip to content

Commit

Permalink
Merge pull request sharathvemula#2 from sharathvemula/newWaffle
Browse files Browse the repository at this point in the history
Fully functional new Waffle
  • Loading branch information
sharathvemula authored Feb 8, 2023
2 parents 54fdfe6 + 07194d8 commit 4593967
Show file tree
Hide file tree
Showing 17 changed files with 736,963 additions and 31 deletions.
Binary file modified waffleClient/bin/proxy_benchmark
Binary file not shown.
Binary file modified waffleClient/bin/proxy_server
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
99,443 changes: 99,443 additions & 0 deletions waffleClient/data/TueFeb716:54/1

Large diffs are not rendered by default.

218,661 changes: 218,661 additions & 0 deletions waffleClient/data/TueFeb717:22/1

Large diffs are not rendered by default.

202,871 changes: 202,871 additions & 0 deletions waffleClient/data/TueFeb717:39/1

Large diffs are not rendered by default.

215,941 changes: 215,941 additions & 0 deletions waffleClient/data/TueFeb717:43/1

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions waffleClient/libclient/src/async_proxy_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ std::vector<std::string> async_proxy_client::get_batch(const std::vector<std::st
}
seq_id_.__set_client_seq_no(sequence_num++);
//std::cout << "Entering async_proxy_client.cpp line " << __LINE__ << std::endl;
std::cout << "client get_batch client ID is " << seq_id_.client_id << std::endl;
// std::cout << "client get_batch client ID is " << seq_id_.client_id << std::endl;
client_->async_get_batch(seq_id_, keys);
requests_->push(GET_BATCH);
while (requests_->size() > 63){
Expand All @@ -86,7 +86,7 @@ void async_proxy_client::put_batch(const std::vector<std::string> &keys, const s
m_cond_->wait(mlock);
}
seq_id_.__set_client_seq_no(sequence_num++);
std::cout << "client put_batch client ID is " << seq_id_.client_id << std::endl;
// std::cout << "client put_batch client ID is " << seq_id_.client_id << std::endl;
client_->async_put_batch(seq_id_, keys, values);
requests_->push(PUT_BATCH);
}
Expand Down
5 changes: 5 additions & 0 deletions waffleClient/proxy/src/Cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ std::string Cache::getValue(std::string key){
return val;
}

std::string Cache::getValueWithoutPositionChange(std::string key){
if (checkIfKeyExists(key) == false) return "";
return cacheMap[key]->second;
}

void Cache::insertIntoCache(std::string key, std::string value) {
if (cacheMap.count(key) == 0)
{
Expand Down
1 change: 1 addition & 0 deletions waffleClient/proxy/src/Cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class Cache {
int size();
bool checkIfKeyExists(std::string key);
std::string getValue(std::string key);
std::string getValueWithoutPositionChange(std::string key);
void insertIntoCache(std::string key, std::string value);
std::vector<std::string> evictLRElementFromCache();
};
Expand Down
60 changes: 35 additions & 25 deletions waffleClient/proxy/src/waffle_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ std::string gen_random(const int len) {

void waffle_proxy::init(const std::vector<std::string> &keys, void ** args){
std::unordered_set<std::string> allKeys;
std::unordered_set<std::string> tempFakeKeys;
realBst = FrequencySmoother();
fakeBst = FrequencySmoother();
std::vector<std::string> keysCache;
std::vector<std::string> keysCacheUnencrypted;

if (server_type_ == "redis") {
std::cout << "Storage interface is starting with Redis DB " << std::endl;
Expand Down Expand Up @@ -64,6 +66,7 @@ void waffle_proxy::init(const std::vector<std::string> &keys, void ** args){
threads_.push_back(std::thread(&waffle_proxy::responder_thread, this));

//Adding the data to Database
std::cout << "Keys size in init() is " << keys.size() << std::endl;
for(auto& it : keys) {
allKeys.insert(it);
realBst.insert(it);
Expand All @@ -73,39 +76,39 @@ void waffle_proxy::init(const std::vector<std::string> &keys, void ** args){

// Initialising Cache
size_t cacheCapacity = R+s;
// std::sample(keys.begin(), keys.end(), std::back_inserter(keysCache), cacheCapacity, std::mt19937{std::random_device{}()});
//std::mt19937_64 random_engine{std::random_device{}()};
//std::ranges::sample(keys, std::back_inserter(keysCache), cacheCapacity, random_engine);
std::unordered_set<std::string> temp;
while(keysCache.size() < cacheCapacity) {
while(keysCacheUnencrypted.size() < cacheCapacity) {
auto index = rand()%keys.size();
if(temp.find(keys[index]) == temp.end()) {
temp.insert(keys[index]);
keysCacheUnencrypted.push_back(keys[index]);
keysCache.push_back(encryption_engine_.encrypt(keys[index] + "#" + std::to_string(realBst.getFrequency(keys[index]))));
}
}
auto valuesCache = storage_interface_->get_batch(keysCache);
for(int i=0;i<valuesCache.size(); ++i) {
valuesCache[i] = encryption_engine_.decrypt(valuesCache[i]);
}
cache = Cache(keysCache, valuesCache, cacheCapacity);
cache = Cache(keysCache, valuesCache, cacheCapacity+R);



std::vector<std::string> fakeValues;
std::vector<std::string> fakeKeys;
for(int i = 0; i < m; ) {
for(int i=0; i < m; ) {
auto fakeKey = gen_random(rand()%15);
if(allKeys.find(fakeKey) == allKeys.end()) {
if(allKeys.find(fakeKey) == allKeys.end() && tempFakeKeys.find(fakeKey)==tempFakeKeys.end()) {
++i;
fakeBst.insert(fakeKey);
fakeKeys.push_back(encryption_engine_.encrypt(fakeKey + '#' + std::to_string(fakeBst.getFrequency(fakeKey))));
fakeValues.push_back(encryption_engine_.encrypt(gen_random(rand()%15)));
tempFakeKeys.insert(fakeKey);
auto tempFakeKey = encryption_engine_.encrypt(fakeKey + '#' + std::to_string(fakeBst.getFrequency(fakeKey)));
auto fakeKeyValue = encryption_engine_.encrypt(gen_random(rand()%15));
fakeKeys.push_back(tempFakeKey);
fakeValues.push_back(fakeKeyValue);
}
}

storage_interface_->put_batch(fakeKeys, fakeValues);
std::cout << "Successfully initialized waffle with keys size " << keys.size() << " and cache with " << cache.size() << std::endl;
std::cout << "Successfully initialized waffle with keys size " << keys.size() << " and cache with " << cache.size() << " Fake keys size is " << m << " D is " << D << " R is" << R << " s is " << s << " FakeBST size is " << fakeBst.size() << std::endl;
}

void waffle_proxy::create_security_batch(std::shared_ptr<queue <std::pair<operation, std::shared_ptr<std::promise<std::string>>>>> &op_queue,
Expand All @@ -121,7 +124,7 @@ void waffle_proxy::create_security_batch(std::shared_ptr<queue <std::pair<operat
if(operation_promise_pair.first.value == "") {
// It's a GET request
if(cache.checkIfKeyExists(currentKey) == true) {
operation_promise_pair.second->set_value(cache.getValue(currentKey));
operation_promise_pair.second->set_value(cache.getValueWithoutPositionChange(currentKey));
} else {
if(keyToPromiseMap.find(currentKey) == keyToPromiseMap.end()) {
storage_batch.push_back(operation_promise_pair.first);
Expand All @@ -136,13 +139,13 @@ void waffle_proxy::create_security_batch(std::shared_ptr<queue <std::pair<operat
}
}
cache.insertIntoCache(currentKey, operation_promise_pair.first.value);
operation_promise_pair.second->set_value(cache.getValue(currentKey));
operation_promise_pair.second->set_value(cache.getValueWithoutPositionChange(currentKey));
}
}
};

void waffle_proxy::execute_batch(const std::vector<operation> &operations, std::unordered_map<std::string, std::vector<std::shared_ptr<std::promise<std::string>>>> &keyToPromiseMap, std::shared_ptr<storage_interface> storage_interface, encryption_engine *enc_engine) {
std::cout << "Storage batch size is " << operations.size() << " and R is " << R << std::endl; // Both need not be the same
// std::cout << "Storage batch size is " << operations.size() << " and R is " << R << std::endl; // Both need not be the same
// Storage_keys is same as readBatch
std::vector<std::string> storage_keys;
std::vector<std::string> writeBatchKeys;
Expand All @@ -164,6 +167,8 @@ void waffle_proxy::execute_batch(const std::vector<operation> &operations, std::
++it;
}

// std::cout << "realKeysNotInCache size is " << realKeysNotInCache.size() << std::endl;

for(auto& iter: realKeysNotInCache) {
storage_keys.push_back(enc_engine->encrypt(iter + "#" + std::to_string(realBst.getFrequency(iter))));
realBst.incrementFrequency(iter);
Expand All @@ -176,35 +181,40 @@ void waffle_proxy::execute_batch(const std::vector<operation> &operations, std::
}

auto responses = storage_interface->get_batch(storage_keys);
std::cout << "Got key value pairs" << std::endl;
for(int i = 0, j = 0; i < storage_keys.size(); i++){

if(i < (operations.size() + realKeysNotInCache.size())) {
// std::cout << "Got key value pairs" << std::endl;
for(int i = 0 ; i < storage_keys.size(); i++){
// if(i < (operations.size() + realKeysNotInCache.size())) {
if(i < operations.size()) {
// This means ith request is for real key
auto kv_pair = cache.evictLRElementFromCache();
writeBatchKeys.push_back(enc_engine->encrypt(kv_pair[0] + "#" + std::to_string(realBst.getFrequency(kv_pair[0]))));
writeBatchValues.push_back(enc_engine->encrypt(kv_pair[1]));

auto keyAboutToGoToCache = extractKey(enc_engine->decrypt(storage_keys[i]));
std::cout << "Extracted key which is about to go to Cache is " << keyAboutToGoToCache << std::endl;
// std::cout << "Extracted key which is about to go to Cache is " << keyAboutToGoToCache << std::endl;
std::string valueAboutToGoToCache = enc_engine->decrypt(responses[i]);
if(keyToPromiseMap.find(keyAboutToGoToCache) != keyToPromiseMap.end()) {
for(auto it: keyToPromiseMap[keyAboutToGoToCache]) {
for(auto& it: keyToPromiseMap[keyAboutToGoToCache]) {
it->set_value(valueAboutToGoToCache);
}
}
if(cache.checkIfKeyExists(keyAboutToGoToCache)) {
valueAboutToGoToCache = cache.getValue(keyAboutToGoToCache);
if(cache.checkIfKeyExists(keyAboutToGoToCache) == true) {
valueAboutToGoToCache = cache.getValueWithoutPositionChange(keyAboutToGoToCache);
}
cache.insertIntoCache(keyAboutToGoToCache, valueAboutToGoToCache);
} else {
// Writing fake key values to DB
auto fakeWriteKey = extractKey(enc_engine->decrypt(storage_keys[i]));
writeBatchKeys.push_back(enc_engine->encrypt(fakeWriteKey + "#" + to_string(realBst.getFrequency(fakeWriteKey))));
writeBatchKeys.push_back(enc_engine->encrypt(fakeWriteKey + "#" + std::to_string(fakeBst.getFrequency(fakeWriteKey))));
writeBatchValues.push_back(enc_engine->encrypt("fakeValue"));

}
}
storage_interface_->put_batch(writeBatchKeys, writeBatchValues);

if(cache.size() != (R+s)) {
std::cout << "WARNING: This should never happen: Cache size is less than R+s" << std::endl;
}
};

std::string waffle_proxy::get(const std::string &key) {
Expand Down Expand Up @@ -281,7 +291,7 @@ void waffle_proxy::async_get_batch(const sequence_id &seq_id, int queue_id, cons
for (const auto &key: keys) {
waiters.push_back((get_future(queue_id, key)));
}
std::cout << "async_get_batch client ID is " << seq_id.client_id << std::endl;
// std::cout << "async_get_batch client ID is " << seq_id.client_id << std::endl;
respond_queue_.push(std::make_pair(GET_BATCH, std::make_pair(seq_id, std::move(waiters))));
sequence_queue_.push(seq_id);
};
Expand All @@ -302,7 +312,7 @@ void waffle_proxy::put_batch(int queue_id, const std::vector<std::string> &keys,
void waffle_proxy::async_put_batch(const sequence_id &seq_id, int queue_id, const std::vector<std::string> &keys, const std::vector<std::string> &values) {
// Send waiters to responder thread
std::vector<std::future<std::string>> waiters;
std::cout << "async_put_batch client ID is " << seq_id.client_id << std::endl;
// std::cout << "async_put_batch client ID is " << seq_id.client_id << std::endl;
int i = 0;
for (const auto &key: keys) {
waiters.push_back((put_future(queue_id, key, values[i])));
Expand Down
8 changes: 4 additions & 4 deletions waffleClient/proxy/src/waffle_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,10 @@ class waffle_proxy : public proxy {
queue<std::pair<int, std::pair<const sequence_id&, std::vector<std::future<std::string>>>>> respond_queue_;
queue<sequence_id> sequence_queue_;
// System parameters
int R = 10;
int D = 11;
int s = 20;
int m = 1000;
int R = 100;
int D = 110;
int s = 10;
int m = 5000;
};

#endif //WAFFLE_PROXY_H

0 comments on commit 4593967

Please sign in to comment.