[Embedding] Initialize multi-tier storage of EmbeddingVariable based on frequency when restore model. (DeepRec-AI#421)
lixy9474 authored Sep 29, 2022
1 parent 8b99d64 commit 6176b9d
Showing 16 changed files with 492 additions and 134 deletions.
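
As a rough illustration of the idea behind this change (a standalone sketch, not DeepRec code — the container names and the two-tier split are invented for the example), restoring an EmbeddingVariable can use the per-key frequencies saved in the checkpoint to place hot ids in the fast storage tier immediately, instead of letting the cache warm up from scratch:

// Illustrative only: rank restored ids by checkpointed frequency and keep the
// hottest ones in the fast tier, spilling the rest to the next tier.
#include <cstdint>
#include <iostream>
#include <map>
#include <vector>

int main() {
  // Keys and access frequencies as they might be read back from a checkpoint.
  std::vector<int64_t> keys  = {11, 22, 33, 44};
  std::vector<int64_t> freqs = { 5,  1,  9,  2};
  const size_t fast_tier_capacity = 2;  // e.g. available DRAM slots

  // Order restored ids by frequency, highest first.
  std::multimap<int64_t, int64_t, std::greater<int64_t>> ranked;
  for (size_t i = 0; i < keys.size(); ++i) ranked.emplace(freqs[i], keys[i]);

  std::vector<int64_t> fast_tier, slow_tier;
  for (const auto& entry : ranked) {
    (fast_tier.size() < fast_tier_capacity ? fast_tier : slow_tier)
        .push_back(entry.second);
  }

  std::cout << "fast tier:";
  for (int64_t k : fast_tier) std::cout << ' ' << k;   // 33 11
  std::cout << "\nslow tier:";
  for (int64_t k : slow_tier) std::cout << ' ' << k;   // 44 22
  std::cout << '\n';
}

In the actual change, this ranking is handled by the new frequency-aware add_to_rank() overloads of the cache classes and the UpdateCache() hook in EmbeddingFilter, as shown in the file diffs below.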
85 changes: 76 additions & 9 deletions tensorflow/core/framework/embedding/cache.h
@@ -5,6 +5,7 @@
#include <unordered_map>
#include <set>
#include <list>
#include <limits>
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/core/platform/mutex.h"
@@ -24,6 +25,9 @@ class BatchCache {
}
virtual size_t get_evic_ids(K* evic_ids, size_t k_size) = 0;
virtual void add_to_rank(const K* batch_ids, size_t batch_size) = 0;
virtual void add_to_rank(const K* batch_ids, size_t batch_size,
const int64* batch_versions,
const int64* batch_freqs) = 0;
virtual size_t size() = 0;
virtual void reset_status() {
num_hit = 0;
@@ -105,6 +109,13 @@ class LRUCache : public BatchCache<K> {
}
}
}

void add_to_rank(const K* batch_ids, size_t batch_size,
const int64* batch_version,
const int64* batch_freqs) {
// TODO: add to rank according to the version of ids
add_to_rank(batch_ids, batch_size);
}
private:
class LRUNode {
public:
@@ -121,7 +132,7 @@ template <class K>
class LFUCache : public BatchCache<K> {
public:
LFUCache() {
- min_freq = 0;
+ min_freq = std::numeric_limits<size_t>::max();
max_freq = 0;
freq_table.emplace_back(std::pair<std::list<LFUNode>*, int64>(
new std::list<LFUNode>, 0));
@@ -137,18 +148,19 @@ class LFUCache : public BatchCache<K> {
size_t get_evic_ids(K *evic_ids, size_t k_size) {
mutex_lock l(mu_);
size_t true_size = 0;
size_t st_freq = min_freq;
for (size_t i = 0; i < k_size && key_table.size() > 0; ++i) {
- auto rm_it = freq_table[min_freq-1].first->back();
+ auto rm_it = freq_table[st_freq-1].first->back();
key_table.erase(rm_it.key);
evic_ids[i] = rm_it.key;
++true_size;
- freq_table[min_freq-1].first->pop_back();
- freq_table[min_freq-1].second--;
- if (freq_table[min_freq-1].second == 0) {
- ++min_freq;
- while (min_freq <= max_freq) {
- if (freq_table[min_freq-1].second == 0) {
- ++min_freq;
+ freq_table[st_freq-1].first->pop_back();
+ freq_table[st_freq-1].second--;
+ if (freq_table[st_freq-1].second == 0) {
+ ++st_freq;
+ while (st_freq <= max_freq) {
+ if (freq_table[st_freq-1].second == 0) {
+ ++st_freq;
} else {
break;
}
@@ -191,6 +203,61 @@ class LFUCache : public BatchCache<K> {
}
}
}

void add_to_rank(const K *batch_ids, const size_t batch_size,
const int64* batch_versions,
const int64* batch_freqs) {
mutex_lock l(mu_);
for (size_t i = 0; i < batch_size; ++i) {
K id = batch_ids[i];
auto it = key_table.find(id);
size_t freq = batch_freqs[i];
if (it == key_table.end()) {
if (freq < min_freq) {
min_freq = freq;
}

if (freq >= max_freq) {
max_freq = freq;
freq_table.resize(max_freq, std::pair<std::list<LFUNode>*, int64>(
new std::list<LFUNode>, 0));
}
freq_table[freq-1].first->emplace_front(LFUNode(id, freq));
freq_table[freq-1].second++;
key_table[id] = freq_table[freq-1].first->begin();
BatchCache<K>::num_miss++;
} else {
typename std::list<LFUNode>::iterator node = it->second;
size_t last_freq = node->freq;
size_t curr_freq = last_freq + freq;
freq_table[last_freq-1].first->erase(node);
freq_table[last_freq-1].second--;

if (curr_freq > max_freq) {
max_freq = curr_freq;
freq_table.resize(max_freq, std::pair<std::list<LFUNode>*, int64>(
new std::list<LFUNode>, 0));
}

if (freq_table[last_freq-1].second == 0) {
if (min_freq == last_freq){
for (size_t j = last_freq + 1; j < max_freq; j++) {
if(freq_table[j-1].second != 0) {
min_freq = j;
}
}
}
}

freq_table[curr_freq-1].first->emplace_front(LFUNode(id, curr_freq));
freq_table[curr_freq-1].second++;
key_table[id] = freq_table[curr_freq-1].first->begin();
BatchCache<K>::num_hit++;
}
}
}


private:
class LFUNode {
public:
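
For reference, here is a standalone sketch of the frequency-bucket layout LFUCache uses above (simplified and hypothetical — the class name TinyLFU is invented, and locking plus min/max-frequency bookkeeping are omitted): bucket f-1 holds the ids whose current frequency is f, so restored ids can be inserted directly at their checkpointed frequency and eviction drains the lowest non-empty bucket first.

// Simplified illustration, not the DeepRec class.
#include <cstdint>
#include <iostream>
#include <list>
#include <unordered_map>
#include <vector>

class TinyLFU {
 public:
  // Insert an id with an already-known frequency, e.g. one restored from a checkpoint.
  void Add(int64_t id, size_t freq) {
    if (freq == 0) freq = 1;
    if (freq > buckets_.size()) buckets_.resize(freq);
    buckets_[freq - 1].push_front(id);
    pos_[id] = buckets_[freq - 1].begin();
  }

  // Evict one id from the lowest non-empty frequency bucket.
  bool EvictOne(int64_t* evicted) {
    for (auto& bucket : buckets_) {
      if (!bucket.empty()) {
        *evicted = bucket.back();
        pos_.erase(*evicted);
        bucket.pop_back();
        return true;
      }
    }
    return false;  // cache is empty
  }

 private:
  std::vector<std::list<int64_t>> buckets_;  // index = frequency - 1
  std::unordered_map<int64_t, std::list<int64_t>::iterator> pos_;
};

int main() {
  TinyLFU cache;
  cache.Add(/*id=*/7, /*freq=*/3);
  cache.Add(/*id=*/8, /*freq=*/1);
  int64_t victim = 0;
  if (cache.EvictOne(&victim)) std::cout << "evicted id " << victim << '\n';  // prints: evicted id 8
}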
5 changes: 5 additions & 0 deletions tensorflow/core/framework/embedding/config.proto
@@ -29,3 +29,8 @@ enum SlotType {
EMBEDDING_VARIABLE = 0;
VARIABLE = 1;
}

enum CacheStrategy {
LRU = 0;
LFU = 1;
}
29 changes: 18 additions & 11 deletions tensorflow/core/framework/embedding/embedding_filter.h
@@ -20,14 +20,6 @@ const static std::vector<int64> default_seeds = {
2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41,
43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97
};

- template<typename K, typename EV>
- void UpdateCache(K* key_buff, int64 key_num, EV* ev) {
- embedding::BatchCache<K>* cache = ev->Cache();
- if (cache) {
- cache->add_to_rank(key_buff, key_num);
- }
- }
}

struct RestoreBuffer {
@@ -63,6 +55,21 @@ class EmbeddingFilter {
}
}

virtual void UpdateCache(const K* key_buff, int64 key_num, EV* ev,
const int64* version_buff, const int64* freq_buff) {
embedding::BatchCache<K>* cache = ev->Cache();
if (cache) {
cache->add_to_rank(key_buff, key_num, version_buff, freq_buff);
if (cache->size() > ev->CacheSize()) {
int64 evict_size = cache->size() - ev->CacheSize();
K* evict_ids = new K[evict_size];
size_t true_size = cache->get_evic_ids(evict_ids, evict_size);
ev->Eviction(evict_ids, true_size);
delete []evict_ids;
}
}
}

virtual Status LookupOrCreateKey(K key, ValuePtr<V>** val, bool* is_filter) = 0;
virtual void CreateGPUBatch(V* val_base, V** default_values, int64 size,
int64 slice_elems, int64 value_len_, bool* init_flags, V** memcpy_address) = 0;
@@ -281,7 +288,7 @@ class BloomFilter : public EmbeddingFilter<K, V, EV> {
TF_CHECK_OK(ev_->storage_manager()->Commit(key_buff[i], value_ptr));
}
}
- UpdateCache(key_buff, key_num, ev_);
+ this->UpdateCache(key_buff, key_num, ev_, version_buff, freq_buff);
return Status::OK();
}

@@ -470,7 +477,7 @@ class CounterFilter : public EmbeddingFilter<K, V, EV> {
TF_CHECK_OK(ev_->storage_manager()->Commit(key_buff[i], value_ptr));
}
}
- UpdateCache(key_buff, key_num, ev_);
+ this->UpdateCache(key_buff, key_num, ev_, version_buff, freq_buff);
return Status::OK();
}

@@ -588,7 +595,7 @@ class NullableFilter : public EmbeddingFilter<K, V, EV> {
TF_CHECK_OK(ev_->storage_manager()->Commit(key_buff[i], value_ptr));
}
}
- UpdateCache(key_buff, key_num, ev_);
+ this->UpdateCache(key_buff, key_num, ev_, version_buff, freq_buff);
return Status::OK();
}

12 changes: 11 additions & 1 deletion tensorflow/core/framework/embedding/embedding_var.h
@@ -106,6 +106,7 @@ class EmbeddingVar : public ResourceBase {
default_tensor.NumElements() / emb_config_.default_value_dim;
default_value_ = TypedAllocator::Allocate<V>(alloc_,
default_tensor.NumElements(), AllocationAttributes());

auto default_tensor_flat = default_tensor.flat<V>();
memcpy(default_value_, &default_tensor_flat(0),
default_tensor.TotalBytes());
@@ -170,6 +171,10 @@ class EmbeddingVar : public ResourceBase {
TF_CHECK_OK(storage_manager_->BatchCommit(keys, value_ptrs));
}

void Eviction(K* evict_ids, int64 evict_size) {
TF_CHECK_OK(storage_manager_->Eviction(evict_ids, evict_size));
}

int64 GetVersion(K key) {
ValuePtr<V>* value_ptr = nullptr;
TF_CHECK_OK(LookupOrCreateKey(key, &value_ptr));
@@ -382,6 +387,10 @@ class EmbeddingVar : public ResourceBase {
storage_manager_->GetStorageType();
}

void InitStorageCacheStrategy(embedding::CacheStrategy cache_strategy) {
storage_manager_->InitCacheStrategy(cache_strategy);
}

std::string DebugString() const {
return emb_config_.DebugString();
}
@@ -535,7 +544,8 @@ class EmbeddingVar : public ResourceBase {
buffer3 = nullptr;
}
}
- TypedAllocator::Deallocate(alloc_, default_value_, value_len_);
+ TypedAllocator::Deallocate(alloc_, default_value_,
+ value_len_ * emb_config_.default_value_dim);
if (default_value_no_permission_) {
TypedAllocator::Deallocate(alloc_, default_value_no_permission_,
value_len_);
56 changes: 44 additions & 12 deletions tensorflow/core/framework/embedding/multilevel_embedding.h
@@ -27,14 +27,20 @@ namespace embedding {

struct StorageConfig {
StorageConfig() : type(StorageType::INVALID),
path(""), layout_type(LayoutType::NORMAL) {
path(""),
layout_type(LayoutType::NORMAL),
cache_strategy(CacheStrategy::LFU) {
size = {1<<30,1<<30,1<<30,1<<30};
}

StorageConfig(StorageType t,
const std::string& p,
const std::vector<int64>& s,
- const std::string& layout) : type(t), path(p) {
+ const std::string& layout,
+ const CacheStrategy cache_strategy_ = CacheStrategy::LFU)
+ : type(t),
+ path(p),
+ cache_strategy(cache_strategy_) {
if ("normal" == layout) {
layout_type = LayoutType::NORMAL;
} else if ("light" == layout) {
@@ -52,6 +58,7 @@ struct StorageConfig {
LayoutType layout_type;
std::string path;
std::vector<int64> size;
CacheStrategy cache_strategy;
};

template <class K, class V>
@@ -174,17 +181,7 @@ class StorageManager {
sc_.type == embedding::DRAM_LEVELDB) {
is_multi_level_ = true;
}

hash_table_count_ = kvs_.size();
- if (hash_table_count_ > 1) {
- cache_ = new LRUCache<K>();
- eviction_thread_ = Env::Default()->StartThread(
- ThreadOptions(), "EV_Eviction", [this]() { BatchEviction(); });
- thread_pool_.reset(
- new thread::ThreadPool(Env::Default(), ThreadOptions(),
- "MultiLevel_Embedding_Cache", 2, /*low_latency_hint=*/false));
- }
- // DebugString();
CHECK(2 >= hash_table_count_)
<< "Not support multi-level(>2) embedding.";

@@ -218,6 +215,24 @@ class StorageManager {
flag_.clear(std::memory_order_release);
}

void InitCacheStrategy(embedding::CacheStrategy cache_strategy) {
sc_.cache_strategy = cache_strategy;
if (hash_table_count_ > 1) {
if (sc_.cache_strategy == CacheStrategy::LRU) {
LOG(INFO)<<" Use StorageManager::LRU in multi-tier EV "<< name_;
cache_ = new LRUCache<K>();
} else {
LOG(INFO) << "Use StorageManager::LFU in multi-tier EV " << name_;
cache_ = new LFUCache<K>();
}
eviction_thread_ = Env::Default()->StartThread(
ThreadOptions(), "EV_Eviction", [this]() { BatchEviction(); });
thread_pool_.reset(
new thread::ThreadPool(Env::Default(), ThreadOptions(),
"MultiLevel_Embedding_Cache", 2, /*low_latency_hint=*/false));
}
}

int64 GetAllocLen(){
return alloc_len_;
}
@@ -242,6 +257,10 @@ class StorageManager {
return sc_.path;
}

int64 Size(int level){
return kvs_[level].first->Size();
}

bool IsMultiLevel() {
return is_multi_level_;
}
@@ -546,6 +565,19 @@ class StorageManager {
return Status::OK();
}

Status Eviction(K* evict_ids, int64 evict_size) {
ValuePtr<V>* value_ptr;
for (int64 i = 0; i < evict_size; ++i) {
if (kvs_[0].first->Lookup(evict_ids[i], &value_ptr).ok()) {
TF_CHECK_OK(kvs_[1].first->Commit(evict_ids[i], value_ptr));
TF_CHECK_OK(kvs_[0].first->Remove(evict_ids[i]));
value_ptr->Destroy(kvs_[0].second);
delete value_ptr;
}
}
return Status::OK();
}

void FreeValuePtr(ValuePtr<V>* value_ptr) {
for (auto kv : kvs_) {
kv.first->FreeValuePtr(value_ptr);
4 changes: 1 addition & 3 deletions tensorflow/core/framework/embedding/ssd_hashkv.h
@@ -385,9 +385,7 @@ class SSDHashKV : public KVInterface<K, V> {
return Status::OK();
}

- int64 Size() const {
- return hash_map_.size();
- }
+ int64 Size() const { return hash_map_.size_lockless(); }

void FreeValuePtr(ValuePtr<V>* value_ptr) {
delete value_ptr;