Batched background indexer.
Enables write parallelism across collections.
kishorenc committed Jul 31, 2021
1 parent dedb666 commit 35409f8
Showing 13 changed files with 348 additions and 105 deletions.
2 changes: 0 additions & 2 deletions TODO.md
@@ -1,7 +1,5 @@
# Typesense: TODO

- Writes on dedicated threads
- Single field aggregation short circuit
- Test for group + multiple fields
- Intersect with single posting list
- Test for erase dropping elements below compressed list threshold
8 changes: 4 additions & 4 deletions docker/patches/braft_cmakelists.txt
@@ -1,9 +1,9 @@
 if(BUILD_UNIT_TESTS)
-    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DUNIT_TEST")
-    set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DUNIT_TEST")
+    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DUNIT_TEST -Wno-deprecated-copy -Wno-sign-compare -Wno-implicit-fallthrough")
+    set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DUNIT_TEST -Wno-deprecated-copy -Wno-sign-compare -Wno-implicit-fallthrough")
 elseif(NOT DEBUG)
-    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DNDEBUG")
-    set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DNDEBUG")
+    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DNDEBUG -Wno-deprecated-copy -Wno-sign-compare -Wno-implicit-fallthrough")
+    set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DNDEBUG -Wno-deprecated-copy -Wno-sign-compare -Wno-implicit-fallthrough")
 endif()

include_directories(${CMAKE_CURRENT_BINARY_DIR})
63 changes: 63 additions & 0 deletions include/batched_indexer.h
@@ -0,0 +1,63 @@
#pragma once

#include <unordered_map>
#include <deque>
#include "store.h"
#include "http_data.h"
#include "threadpool.h"
#include "http_server.h"

class BatchedIndexer {
private:
    static const constexpr char* RAFT_REQ_LOG_PREFIX = "$RL_";

    struct req_res_t {
        std::shared_ptr<http_req> req;
        std::shared_ptr<http_res> res;
        uint64_t batch_begin_ts;

        req_res_t(const std::shared_ptr<http_req>& req,
                  const std::shared_ptr<http_res>& res, uint64_t batch_begin_ts):
                req(req), res(res), batch_begin_ts(batch_begin_ts) {

        }

        req_res_t() {

        }
    };

    HttpServer* server;
    Store* store;

    ThreadPool* thread_pool;
    const size_t num_threads;

    std::vector<std::deque<req_res_t>> queues;
    std::mutex* qmutuxes;

    std::mutex mutex;
    std::unordered_map<uint64_t, uint32_t> request_to_chunk;
    std::unordered_map<uint64_t, req_res_t> req_res_map;

    std::chrono::high_resolution_clock::time_point last_gc_run;

    std::atomic<bool> exit;

    static const size_t GC_INTERVAL_SECONDS = 60;
    static const size_t GC_PRUNE_MAX_SECONDS = 3600;

    static std::string get_req_prefix_key(uint64_t req_id);

public:

    BatchedIndexer(HttpServer* server, Store* store, size_t num_threads);

    ~BatchedIndexer();

    void enqueue(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res);

    void run();

    void stop();
};
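
The commit message's promise of write parallelism across collections comes from this header's layout: one queue and one mutex per worker thread, with the queue selected by a hash of the collection name. The routing rule can be sketched standalone; the sketch below is illustrative only (std::hash stands in for StringUtils::hash_wy, and none of these names are Typesense APIs):

#include <cstddef>
#include <deque>
#include <functional>
#include <iostream>
#include <mutex>
#include <string>
#include <vector>

int main() {
    const size_t num_threads = 4;
    std::vector<std::deque<std::string>> queues(num_threads);
    std::vector<std::mutex> qmutexes(num_threads);

    // Same collection -> same queue -> same worker thread, so writes to a
    // single collection stay ordered while different collections proceed
    // in parallel.
    auto enqueue = [&](const std::string& coll_name, const std::string& doc) {
        size_t queue_id = std::hash<std::string>{}(coll_name) % num_threads;
        std::lock_guard<std::mutex> lk(qmutexes[queue_id]);
        queues[queue_id].push_back(coll_name + ": " + doc);
    };

    enqueue("products", "doc1");
    enqueue("users", "doc2");
    enqueue("products", "doc3");  // lands behind doc1 in the same queue

    for(size_t i = 0; i < num_threads; i++) {
        for(const std::string& item : queues[i]) {
            std::cout << "queue " << i << " <- " << item << "\n";
        }
    }
    return 0;
}

Two collections index in parallel exactly when their names hash to different queues.
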
12 changes: 7 additions & 5 deletions include/http_data.h
@@ -243,7 +243,7 @@ struct http_req {
first_chunk_aggregate(true), last_chunk_aggregate(false),
chunk_len(0), body_index(0), data(nullptr), deserialized_request(true), ready(false) {

-        start_ts = std::chrono::duration_cast<std::chrono::milliseconds>(
+        start_ts = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();

}
@@ -254,7 +254,7 @@ struct http_req {
params(params), first_chunk_aggregate(true), last_chunk_aggregate(false),
chunk_len(0), body(body), body_index(0), data(nullptr), deserialized_request(false), ready(false) {

-        start_ts = std::chrono::duration_cast<std::chrono::milliseconds>(
+        start_ts = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
}

@@ -263,10 +263,10 @@
//LOG(INFO) << "~http_req " << this;

if(!deserialized_request) {
-            uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
+            uint64_t now = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();

-            uint64_t ms_since_start = (now - start_ts);
+            uint64_t ms_since_start = (now - start_ts) / 1000;
std::string metric_identifier = http_method + " " + path_without_query;

AppMetrics::get_instance().increment_duration(metric_identifier, ms_since_start);
@@ -309,7 +309,7 @@ struct http_req {
void deserialize(const std::string& serialized_content) {
nlohmann::json content = nlohmann::json::parse(serialized_content);
route_hash = content["route_hash"];
-        body = content["body"];
+        body += content["body"];

for (nlohmann::json::iterator it = content["params"].begin(); it != content["params"].end(); ++it) {
params.emplace(it.key(), it.value());
@@ -318,6 +318,7 @@
metadata = content.count("metadata") != 0 ? content["metadata"] : "";
first_chunk_aggregate = content.count("first_chunk_aggregate") != 0 ? content["first_chunk_aggregate"].get<bool>() : true;
last_chunk_aggregate = content.count("last_chunk_aggregate") != 0 ? content["last_chunk_aggregate"].get<bool>() : false;
+        start_ts = content.count("start_ts") != 0 ? content["start_ts"].get<uint64_t>() : 0;
_req = nullptr;

deserialized_request = true;
@@ -331,6 +332,7 @@
content["last_chunk_aggregate"] = last_chunk_aggregate;
content["body"] = body;
content["metadata"] = metadata;
+        content["start_ts"] = start_ts;

return content.dump(-1, ' ', false, nlohmann::detail::error_handler_t::ignore);
}
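
These changes travel together: start_ts gains microsecond resolution because the rest of this commit also uses it as the request's identity (both the in-memory req_res_map key and the on-disk chunk keys embed it), the duration metric divides by 1000 to stay in milliseconds, body += lets a replayed chunk append to the unconsumed tail of the previous one, and start_ts is now round-tripped through serialize()/deserialize() so a replayed request keeps its identity. A minimal, illustrative sketch of the timestamp arithmetic, assuming nothing beyond the standard library:

#include <chrono>
#include <cstdint>
#include <iostream>
#include <thread>

int main() {
    using std::chrono::duration_cast;
    using std::chrono::microseconds;
    using std::chrono::system_clock;

    // Microsecond resolution makes the timestamp usable as a (near-)unique
    // request id; two requests a few milliseconds apart no longer collide.
    uint64_t start_ts = duration_cast<microseconds>(
            system_clock::now().time_since_epoch()).count();

    std::this_thread::sleep_for(std::chrono::milliseconds(5));

    uint64_t now = duration_cast<microseconds>(
            system_clock::now().time_since_epoch()).count();

    // Metrics still want milliseconds, hence the / 1000 in the destructor.
    uint64_t ms_since_start = (now - start_ts) / 1000;
    std::cout << "took ~" << ms_since_start << " ms\n";
    return 0;
}
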
4 changes: 3 additions & 1 deletion include/raft_server.h
@@ -12,6 +12,7 @@
#include "http_data.h"
#include "threadpool.h"
#include "http_server.h"
+#include "batched_indexer.h"

class Store;
class ReplicationState;
@@ -93,6 +94,7 @@ class ReplicationState : public braft::StateMachine {
butil::atomic<int64_t> leader_term;

HttpServer* server;
+    BatchedIndexer* batched_indexer;

Store* store;
Store* meta_store;
@@ -135,7 +137,7 @@
static constexpr const char* meta_dir_name = "meta";
static constexpr const char* snapshot_dir_name = "snapshot";

-    ReplicationState(HttpServer* server, Store* store, Store* meta_store,
+    ReplicationState(HttpServer* server, BatchedIndexer* batched_indexer, Store* store, Store* meta_store,
ThreadPool* thread_pool, http_message_dispatcher* message_dispatcher,
bool api_uses_ssl, int64_t healthy_read_lag, int64_t healthy_write_lag,
size_t num_collections_parallel_load, size_t num_documents_parallel_load);
4 changes: 4 additions & 0 deletions include/store.h
@@ -333,6 +333,10 @@ class Store {
return status;
}

+    rocksdb::Status delete_range(const std::string& begin_key, const std::string& end_key) {
+        return db->DeleteRange(rocksdb::WriteOptions(), db->DefaultColumnFamily(), begin_key, end_key);
+    }

// Only for internal tests
rocksdb::DB* _get_db_unsafe() const {
return db;
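
delete_range is a thin wrapper over RocksDB's DeleteRange, used later in src/batched_indexer.cpp to drop every buffered chunk of a request in one call: all chunks of a request share a key prefix, and the end key is exclusive. A self-contained sketch under those assumptions; the path and keys are made up for illustration (the indexer's real end key, serialize_uint32_t(UINT32_MAX), is just the four 0xFF bytes spelled literally below):

#include <rocksdb/db.h>
#include <cassert>
#include <string>

int main() {
    rocksdb::DB* db;
    rocksdb::Options options;
    options.create_if_missing = true;
    rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/delete_range_demo", &db);
    assert(s.ok());

    // Two chunks of the same request, sharing the "$RL_<req_id>_" prefix.
    db->Put(rocksdb::WriteOptions(), "$RL_0001_chunk0", "...");
    db->Put(rocksdb::WriteOptions(), "$RL_0001_chunk1", "...");

    // End key is exclusive, so everything under the request prefix goes away.
    std::string prefix = "$RL_0001_";
    s = db->DeleteRange(rocksdb::WriteOptions(), db->DefaultColumnFamily(),
                        prefix, prefix + "\xff\xff\xff\xff");
    assert(s.ok());

    delete db;
    return 0;
}
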
14 changes: 14 additions & 0 deletions include/string_utils.h
@@ -269,6 +269,20 @@ struct StringUtils {
return std::string(bytes, bytes+4);
}

+    static std::string serialize_uint64_t(uint64_t num) {
+        unsigned char bytes[8];
+        bytes[0] = (unsigned char) ((num >> 56) & 0xFF);
+        bytes[1] = (unsigned char) ((num >> 48) & 0xFF);
+        bytes[2] = (unsigned char) ((num >> 40) & 0xFF);
+        bytes[3] = (unsigned char) ((num >> 32) & 0xFF);
+        bytes[4] = (unsigned char) ((num >> 24) & 0xFF);
+        bytes[5] = (unsigned char) ((num >> 16) & 0xFF);
+        bytes[6] = (unsigned char) ((num >> 8) & 0xFF);
+        bytes[7] = (unsigned char) ((num & 0xFF));
+
+        return std::string(bytes, bytes+8);
+    }

static uint32_t deserialize_uint32_t(std::string serialized_num) {
uint32_t seq_id = ((serialized_num[0] & 0xFF) << 24) | ((serialized_num[1] & 0xFF) << 16) |
((serialized_num[2] & 0xFF) << 8) | (serialized_num[3] & 0xFF);
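
serialize_uint64_t emits the most significant byte first, so bytewise (lexicographic) comparison of the encoded ids matches numeric comparison, which is what keeps a request's chunks contiguous and in order when the indexer scans RocksDB by prefix. This hunk adds no 64-bit counterpart to deserialize_uint32_t; the decoder below is hypothetical, written only to show the round trip:

#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical inverse of serialize_uint64_t; not part of this commit.
static uint64_t deserialize_uint64_t(const std::string& s) {
    uint64_t num = 0;
    for(size_t i = 0; i < 8; i++) {
        num = (num << 8) | (unsigned char) s[i];
    }
    return num;
}

// The serializer above, restated compactly so this sketch compiles standalone.
static std::string serialize_uint64_t(uint64_t num) {
    unsigned char bytes[8];
    for(size_t i = 0; i < 8; i++) {
        bytes[i] = (unsigned char) ((num >> (56 - 8 * i)) & 0xFF);
    }
    return std::string(bytes, bytes + 8);
}

int main() {
    uint64_t req_id = 1627725000123456ULL;  // a microsecond timestamp
    assert(deserialize_uint64_t(serialize_uint64_t(req_id)) == req_id);

    // Big-endian encoding preserves ordering under bytewise comparison.
    assert(serialize_uint64_t(100) < serialize_uint64_t(200));
    return 0;
}
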
188 changes: 188 additions & 0 deletions src/batched_indexer.cpp
@@ -0,0 +1,188 @@
#include "batched_indexer.h"

BatchedIndexer::BatchedIndexer(HttpServer* server, Store* store, const size_t num_threads):
        server(server), store(store), num_threads(num_threads),
        last_gc_run(std::chrono::high_resolution_clock::now()), exit(false) {
    thread_pool = new ThreadPool(num_threads);
    queues.reserve(num_threads);
    qmutuxes = new std::mutex[num_threads];
}

void BatchedIndexer::enqueue(const std::shared_ptr<http_req>& req, const std::shared_ptr<http_res>& res) {
    const std::string& coll_name = req->params["collection"];
    uint64_t queue_id = StringUtils::hash_wy(coll_name.c_str(), coll_name.size()) % num_threads;

    uint32_t chunk_sequence = 0;

    {
        std::unique_lock lk(mutex);
        chunk_sequence = request_to_chunk[req->start_ts];
        request_to_chunk[req->start_ts] += 1;
    }

    const std::string& req_key_prefix = get_req_prefix_key(req->start_ts);
    const std::string& request_chunk_key = req_key_prefix + StringUtils::serialize_uint32_t(chunk_sequence);

    LOG(INFO) << "req_id: " << req->start_ts << ", chunk_sequence: " << chunk_sequence;

    store->insert(request_chunk_key, req->serialize());
    req->body = "";

    uint64_t batch_begin_ts;

    {
        std::unique_lock lk(mutex);
        auto req_res_map_it = req_res_map.find(req->start_ts);
        if(req_res_map_it == req_res_map.end()) {
            batch_begin_ts = std::chrono::duration_cast<std::chrono::seconds>(
                    std::chrono::system_clock::now().time_since_epoch()).count();

            req_res_t req_res{req, res, batch_begin_ts};
            req_res_map[req->start_ts] = req_res;
        } else {
            batch_begin_ts = req_res_map_it->second.batch_begin_ts;
        }
    }

    if(req->last_chunk_aggregate) {
        {
            std::unique_lock lk(qmutuxes[queue_id]);
            req_res_t req_res(req, res, batch_begin_ts);
            queues[queue_id].emplace_back(req_res);
        }

        std::unique_lock lk(mutex);
        request_to_chunk.erase(req->start_ts);
    }

    if(req->_req != nullptr) {
        deferred_req_res_t* req_res = new deferred_req_res_t(req, res, server, true);
        server->get_message_dispatcher()->send_message(HttpServer::REQUEST_PROCEED_MESSAGE, req_res);
    }
}

void BatchedIndexer::run() {
    LOG(INFO) << "Starting batch indexer with " << num_threads << " threads.";

    for(size_t i = 0; i < num_threads; i++) {
        std::deque<req_res_t>& queue = queues[i];
        std::mutex& queue_mutex = qmutuxes[i];

        thread_pool->enqueue([&queue, &queue_mutex, this]() {
            while(!exit) {
                std::unique_lock<std::mutex> qlk(queue_mutex);

                if(queue.empty()) {
                    qlk.unlock();
                } else {
                    req_res_t req_res = queue.front();
                    queue.pop_front();
                    qlk.unlock();

                    std::unique_lock mlk(mutex);
                    req_res_t orig_req_res = req_res_map[req_res.req->start_ts];
                    mlk.unlock();

                    // scan db for all logs associated with request
                    const std::string& req_key_prefix = get_req_prefix_key(req_res.req->start_ts);

                    rocksdb::Iterator* iter = store->scan(req_key_prefix);
                    std::string prev_body = ""; // used to handle partial JSON documents caused by chunking

                    while(iter->Valid() && iter->key().starts_with(req_key_prefix)) {
                        std::shared_ptr<http_req> saved_req = std::make_shared<http_req>();
                        saved_req->body = prev_body;
                        saved_req->deserialize(iter->value().ToString());
                        saved_req->_req = orig_req_res.req->_req;

                        route_path* found_rpath = nullptr;
                        bool route_found = server->get_route(saved_req->route_hash, &found_rpath);
                        bool async_res = false;

                        if(route_found) {
                            async_res = found_rpath->async_res;
                            found_rpath->handler(saved_req, req_res.res);
                            prev_body = saved_req->body;
                        } else {
                            req_res.res->set_404();
                            prev_body = "";
                        }

                        if(!async_res && orig_req_res.req->_req != nullptr) {
                            deferred_req_res_t* deferred_req_res = new deferred_req_res_t(saved_req, req_res.res,
                                                                                          server, true);
                            server->get_message_dispatcher()->send_message(HttpServer::STREAM_RESPONSE_MESSAGE,
                                                                           deferred_req_res);
                        }

                        iter->Next();
                    }

                    LOG(INFO) << "Erasing request data from disk and memory for request " << req_res.req->start_ts;

                    // we can delete the buffered request content
                    store->delete_range(req_key_prefix, req_key_prefix + StringUtils::serialize_uint32_t(UINT32_MAX));

                    std::unique_lock lk(mutex);
                    req_res_map.erase(req_res.req->start_ts);
                }

                std::this_thread::sleep_for(std::chrono::milliseconds(10));
            }
        });
    }

    while(!exit) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));

        //LOG(INFO) << "Batch indexer main thread";

        // do gc, if we are due for one
        uint64_t seconds_elapsed = std::chrono::duration_cast<std::chrono::seconds>(
                std::chrono::high_resolution_clock::now() - last_gc_run).count();

        if(seconds_elapsed > GC_INTERVAL_SECONDS) {

            std::unique_lock lk(mutex);
            LOG(INFO) << "Running GC for aborted requests, req map size: " << req_res_map.size();

            // iterate through all map entries and delete ones which are > GC_PRUNE_MAX_SECONDS
            for (auto it = req_res_map.cbegin(); it != req_res_map.cend();) {
                uint64_t seconds_since_batch_start = std::chrono::duration_cast<std::chrono::seconds>(
                        std::chrono::system_clock::now().time_since_epoch()).count() - it->second.batch_begin_ts;

                //LOG(INFO) << "Seconds since batch start: " << seconds_since_batch_start;

                if(seconds_since_batch_start > GC_PRUNE_MAX_SECONDS) {
                    LOG(INFO) << "Deleting partial upload for req id " << it->second.req->start_ts;
                    const std::string& req_key_prefix = get_req_prefix_key(it->second.req->start_ts);
                    store->delete_range(req_key_prefix, req_key_prefix + StringUtils::serialize_uint32_t(UINT32_MAX));
                    request_to_chunk.erase(it->second.req->start_ts);
                    it = req_res_map.erase(it);
                } else {
                    it++;
                }
            }

            last_gc_run = std::chrono::high_resolution_clock::now();
        }
    }

    LOG(INFO) << "Batched indexer threadpool shutdown...";
    thread_pool->shutdown();
}

std::string BatchedIndexer::get_req_prefix_key(uint64_t req_id) {
    const std::string& req_key_prefix =
            RAFT_REQ_LOG_PREFIX + StringUtils::serialize_uint64_t(req_id) + "_";

    return req_key_prefix;
}

BatchedIndexer::~BatchedIndexer() {
    delete [] qmutuxes;
}

void BatchedIndexer::stop() {
    exit = true;
}
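
The prev_body handoff in run() (seed saved_req->body with prev_body, then read it back after the handler) pairs with the body += content["body"] change in http_data.h: an HTTP chunk boundary can split a JSONL document, so whatever a handler leaves unconsumed is prepended to the next chunk before parsing resumes. A self-contained mirror of that mechanism (illustrative; in the commit the actual consumption happens inside the route handler):

#include <iostream>
#include <string>
#include <vector>

int main() {
    // A document split across a chunk boundary: {"id":2} arrives in halves.
    std::vector<std::string> chunks = {"{\"id\":1}\n{\"id\"", ":2}\n{\"id\":3}\n"};
    std::string prev_body;

    for(const std::string& chunk : chunks) {
        std::string body = prev_body + chunk;   // the += in deserialize()

        // "Handle" every complete, newline-terminated document...
        size_t last_nl = body.find_last_of('\n');
        if(last_nl == std::string::npos) {
            prev_body = body;                   // nothing complete yet
            continue;
        }
        std::cout << "indexing:\n" << body.substr(0, last_nl + 1);

        // ...and carry the partial tail into the next iteration.
        prev_body = body.substr(last_nl + 1);
    }
    return 0;
}
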
