Skip to content

Commit

Permalink
add: validation for analytics rules and events (typesense#1927)
Browse files: browse the repository at this point in the history
* add: validation for analytics rules

* fix: tests for analytics rule validation

* add: check for log_to_store and mandatory collection in rule for log type

* fix: missing events

* fix: status codes in event manager

* fix: validation condition and tests for the same
  • Loading branch information
harisarang authored and kishorenc committed Sep 14, 2024
1 parent f96c43e commit cccba10
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 141 deletions.
11 changes: 6 additions & 5 deletions include/analytics_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@

struct event_type_collection {
std::string event_type;
std::string collection;
std::string destination_collection;
std::vector<std::string> src_collections;
bool log_to_store = false;
std::string analytic_rule;
QueryAnalytics* queries_ptr = nullptr;
Expand Down Expand Up @@ -90,8 +91,8 @@ class AnalyticsManager {

struct suggestion_config_t {
std::string name;
std::string suggestion_collection;
std::vector<std::string> query_collections;
std::string destination_collection;
std::vector<std::string> src_collections;
size_t limit;
std::string rule_type;
bool expand_query = false;
Expand All @@ -103,8 +104,8 @@ class AnalyticsManager {
obj["type"] = rule_type;
obj["params"] = nlohmann::json::object();
obj["params"]["limit"] = limit;
obj["params"]["source"]["collections"] = query_collections;
obj["params"]["destination"]["collection"] = suggestion_collection;
obj["params"]["source"]["collections"] = src_collections;
obj["params"]["destination"]["collection"] = destination_collection;

if(rule_type == POPULAR_QUERIES_TYPE) {
obj["params"]["expand_query"] = expand_query;
Expand Down
116 changes: 68 additions & 48 deletions src/analytics_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,33 +59,32 @@ Option<bool> AnalyticsManager::create_index(nlohmann::json &payload, bool upsert
}

std::string counter_field;
std::string suggestion_collection = "generic";
std::string destination_collection;
std::vector<std::string> src_collections;

suggestion_config_t suggestion_config;
suggestion_config.name = suggestion_config_name;
suggestion_config.limit = limit;
suggestion_config.expand_query = expand_query;
suggestion_config.rule_type = payload["type"];

//for counter events source collections are not needed
if(params["source"].contains("collections")) {
if(!params["source"]["collections"].is_array()) {
return Option<bool>(400, "Must contain a valid list of source collections.");
}

//for all types source collection is needed.
if(!params["source"].contains("collections") || !params["source"]["collections"].is_array()) {
return Option<bool>(400, "Must contain a valid list of source collections.");
} else {
for(const auto& coll: params["source"]["collections"]) {
if (!coll.is_string()) {
return Option<bool>(400, "Must contain a valid list of source collection names.");
return Option<bool>(400, "Source collections value should be a string.");
}
auto collection = CollectionManager::get_instance().get_collection(coll.get<std::string>());
if (collection == nullptr) {
return Option<bool>(404, "Collection `" + coll.get<std::string>() + "` is not found");
}

const std::string &src_collection = coll.get<std::string>();
suggestion_config.query_collections.push_back(src_collection);

suggestion_collection = src_collection;
src_collections.push_back(src_collection);
destination_collection = src_collection;
}
} else if(payload["type"] == POPULAR_QUERIES_TYPE || payload["type"] == NOHITS_QUERIES_TYPE) {
//for popular and nohits queries, source collection is mandatory
return Option<bool>(400, "Must contain a valid list of source collections.");
}

if((payload["type"] == POPULAR_QUERIES_TYPE || payload["type"] == NOHITS_QUERIES_TYPE)
Expand Down Expand Up @@ -126,30 +125,30 @@ Option<bool> AnalyticsManager::create_index(nlohmann::json &payload, bool upsert
suggestion_config.counter_field = counter_field;
}

suggestion_collection = params["destination"]["collection"].get<std::string>();
destination_collection = params["destination"]["collection"].get<std::string>();
}

if(payload["type"] == POPULAR_QUERIES_TYPE) {
if(!upsert && popular_queries.count(suggestion_collection) != 0) {
if(!upsert && popular_queries.count(destination_collection) != 0) {
return Option<bool>(400, "There's already another configuration for this destination collection.");
}
} else if(payload["type"] == NOHITS_QUERIES_TYPE) {
if(!upsert && nohits_queries.count(suggestion_collection) != 0) {
if(!upsert && nohits_queries.count(destination_collection) != 0) {
return Option<bool>(400, "There's already another configuration for this destination collection.");
}
} else if(payload["type"] == COUNTER_TYPE) {
if(!upsert && counter_events.count(suggestion_collection) != 0) {
if(!upsert && counter_events.count(destination_collection) != 0) {
return Option<bool>(400, "There's already another configuration for this destination collection.");
}

auto coll = CollectionManager::get_instance().get_collection(suggestion_collection).get();
auto coll = CollectionManager::get_instance().get_collection(destination_collection).get();
if(coll != nullptr) {
if(!coll->contains_field(counter_field)) {
return Option<bool>(404,
"counter_field `" + counter_field + "` not found in destination collection.");
}
} else {
return Option<bool>(404, "Collection `" + suggestion_collection + "` not found.");
return Option<bool>(404, "Collection `" + destination_collection + "` not found.");
}
}

Expand All @@ -163,21 +162,28 @@ Option<bool> AnalyticsManager::create_index(nlohmann::json &payload, bool upsert
}
}

if(query_collection_events.count(suggestion_collection) == 0) {
if(query_collection_events.count(destination_collection) == 0) {
std::vector<event_t> vec;
query_collection_events.emplace(suggestion_collection, vec);
query_collection_events.emplace(destination_collection, vec);
}

std::map<std::string, uint16_t> event_weight_map;
bool log_to_store = payload["type"] == LOG_TYPE ? true : false;
bool log_to_store = payload["type"] == LOG_TYPE;

for (const std::string coll: src_collections) {
if(query_collection_events.count(coll) == 0) {
std::vector<event_t> vec;
query_collection_events.emplace(coll, vec);
}
}

if(payload["type"] == POPULAR_QUERIES_TYPE) {
QueryAnalytics* popularQueries = new QueryAnalytics(limit, enable_auto_aggregation);
popularQueries->set_expand_query(suggestion_config.expand_query);
popular_queries.emplace(suggestion_collection, popularQueries);
popular_queries.emplace(destination_collection, popularQueries);
} else if(payload["type"] == NOHITS_QUERIES_TYPE) {
QueryAnalytics* noresultsQueries = new QueryAnalytics(limit, enable_auto_aggregation);
nohits_queries.emplace(suggestion_collection, noresultsQueries);
nohits_queries.emplace(destination_collection, noresultsQueries);
}

if(valid_events_found) {
Expand All @@ -186,47 +192,46 @@ Option<bool> AnalyticsManager::create_index(nlohmann::json &payload, bool upsert
return Option<bool>(400, "Events must contain a unique name.");
}

bool event_log_to_store = false;
if(payload["type"] == COUNTER_TYPE) {
if(!event.contains("weight") || !event["weight"].is_number()) {
return Option<bool>(400, "Counter events must contain a weight value.");
}
event_weight_map[event["name"]] = event["weight"];
}

//store event name to their weights
//which can be used to keep counter events separate from non counter events
if(event.contains("log_to_store")) {
log_to_store = event["log_to_store"].get<bool>();

if(log_to_store && !analytics_store) {
return Option<bool>(400, "Event can't be logged when analytics-db is not defined.");
}
if(event.contains("log_to_store")) {
event_log_to_store = event["log_to_store"].get<bool>();
if(event_log_to_store && !analytics_store) {
return Option<bool>(400, "Event can't be logged when analytics-db is not defined.");
}
event_weight_map[event["name"]] = event["weight"];
}

event_type_collection ec{event["type"], suggestion_collection, log_to_store, suggestion_config_name};
event_type_collection ec{event["type"], destination_collection, src_collections, event_log_to_store || log_to_store, suggestion_config_name};

//keep pointer for /events API
if(payload["type"] == POPULAR_QUERIES_TYPE) {
ec.queries_ptr = popular_queries.at(suggestion_collection);
ec.queries_ptr = popular_queries.at(destination_collection);
} else if(payload["type"] == NOHITS_QUERIES_TYPE) {
ec.queries_ptr = nohits_queries.at(suggestion_collection);
ec.queries_ptr = nohits_queries.at(destination_collection);
}

event_collection_map.emplace(event["name"], ec);
}

//store counter events data
if(payload["type"] == COUNTER_TYPE) {
counter_events.emplace(suggestion_collection, counter_event_t{counter_field, {}, event_weight_map});
counter_events.emplace(destination_collection, counter_event_t{counter_field, {}, event_weight_map});
}
}

suggestion_config.suggestion_collection = suggestion_collection;
suggestion_config.destination_collection = destination_collection;
suggestion_config.src_collections = src_collections;

suggestion_configs.emplace(suggestion_config_name, suggestion_config);

for(const auto& query_coll: suggestion_config.query_collections) {
query_collection_mapping[query_coll].push_back(suggestion_collection);
for(const auto& query_coll: suggestion_config.src_collections) {
query_collection_mapping[query_coll].push_back(destination_collection);
}

if(write_to_disk) {
Expand Down Expand Up @@ -315,9 +320,9 @@ Option<bool> AnalyticsManager::remove_index(const std::string &name) {
return Option<bool>(404, "Rule not found.");
}

const auto& suggestion_collection = suggestion_configs_it->second.suggestion_collection;
const auto& suggestion_collection = suggestion_configs_it->second.destination_collection;

for(const auto& query_collection: suggestion_configs_it->second.query_collections) {
for(const auto& query_collection: suggestion_configs_it->second.src_collections) {
query_collection_mapping.erase(query_collection);
}

Expand Down Expand Up @@ -388,9 +393,22 @@ Option<bool> AnalyticsManager::add_event(const std::string& client_ip, const std
return Option<bool>(400, "event_type mismatch in analytic rules.");
}

const auto& query_collection = event_collection_map_it->second.collection;
std::string destination_collection = event_collection_map_it->second.destination_collection;
std::vector<std::string> src_collections = event_collection_map_it->second.src_collections;

const auto& query_collection_events_it = query_collection_events.find(query_collection);
std::string src_collection;
if (!event_json.contains("collection") && src_collections.size() == 1) {
src_collection = src_collections[0];
} else if(!event_json.contains("collection") && src_collections.size() > 1) {
return Option<bool>(400, "Multiple source collections. 'collection' should be specified");
} else if (event_json.contains("collection")) {
if(std::find(src_collections.begin(), src_collections.end(), event_json["collection"]) == src_collections.end()) {
return Option<bool>(400, event_json["collection"].get<std::string>() + " not found in the rule " + event_name);
}
src_collection = event_json["collection"];
}

const auto& query_collection_events_it = query_collection_events.find(src_collection);
if(query_collection_events_it != query_collection_events.end()) {
auto &events_vec = query_collection_events_it->second;
#ifdef TEST_BUILD
Expand Down Expand Up @@ -467,7 +485,7 @@ Option<bool> AnalyticsManager::add_event(const std::string& client_ip, const std
}

if (!counter_events.empty()) {
auto counter_events_it = counter_events.find(query_collection);
auto counter_events_it = counter_events.find(destination_collection);
if (counter_events_it != counter_events.end()) {
auto event_weight_map_it = counter_events_it->second.event_weight_map.find(event_name);
if (event_weight_map_it != counter_events_it->second.event_weight_map.end()) {
Expand All @@ -478,9 +496,11 @@ Option<bool> AnalyticsManager::add_event(const std::string& client_ip, const std
<< " not defined in analytic rule for counter events.";
}
} else {
LOG(ERROR) << "collection " << query_collection << " not found in analytics rule.";
LOG(ERROR) << "collection " << destination_collection << " not found in analytics rule.";
}
}
} else {
return Option<bool>(500, "Failure in adding an event.");
}
return Option<bool>(true);
}
Expand Down Expand Up @@ -581,7 +601,7 @@ void AnalyticsManager::persist_query_events(ReplicationState *raft_server, uint6

for(const auto& suggestion_config: suggestion_configs) {
const std::string& sink_name = suggestion_config.first;
const std::string& suggestion_coll = suggestion_config.second.suggestion_collection;
const std::string& suggestion_coll = suggestion_config.second.destination_collection;

auto popular_queries_it = popular_queries.find(suggestion_coll);
auto nohits_queries_it = nohits_queries.find(suggestion_coll);
Expand Down
20 changes: 12 additions & 8 deletions src/event_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,30 +37,34 @@ Option<bool> EventManager::add_event(const nlohmann::json& event, const std::str
}
const auto& event_name = event[EVENT_NAME];
if(!event_data_val.is_object()) {
return Option<bool>(500, "event_data_val is not object.");
return Option<bool>(400, "data is not object.");
}

if(event_type == AnalyticsManager::SEARCH_EVENT) {
if(!event_data_val.contains("user_id") || !event_data_val["user_id"].is_string()) {
return Option<bool>(500,
return Option<bool>(400,
"search event json data fields should contain `user_id` as string value.");
}

if(!event_data_val.contains("q") || !event_data_val["q"].is_string()) {
return Option<bool>(500,
return Option<bool>(400,
"search event json data fields should contain `q` as string value.");
}
} else {
if(!event_data_val.contains("doc_id") || !event_data_val["doc_id"].is_string()) {
return Option<bool>(500, "event should have 'doc_id' as string value.");
return Option<bool>(400, "event should have 'doc_id' as string value.");
}

if(event_data_val.contains("user_id") && !event_data_val["user_id"].is_string()) {
return Option<bool>(500, "'user_id' should be a string value.");
if(event_data_val.contains("collection") && !event_data_val["collection"].is_string()) {
return Option<bool>(400, "'collection' should be a string value.");
}

if(!event_data_val.contains("user_id") || !event_data_val["user_id"].is_string()) {
return Option<bool>(400, "event should have 'user_id' as string value.");
}

if(event_data_val.contains("q") && !event_data_val["q"].is_string()) {
return Option<bool>(500, "'q' should be a string value.");
return Option<bool>(400, "'q' should be a string value.");
}
}

Expand All @@ -72,7 +76,7 @@ Option<bool> EventManager::add_event(const nlohmann::json& event, const std::str
return Option<bool>(404, "event_type " + event_type + " not found.");
}
} else {
return Option<bool>(500, "`event_type` value should be string.");
return Option<bool>(400, "`event_type` value should be string.");
}

return Option(true);
Expand Down
6 changes: 6 additions & 0 deletions src/typesense_server_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,12 @@ int run_server(const Config & config, const std::string & version, void (*master
return 1;
}

if (config.get_enable_search_analytics() && !config.get_analytics_dir().empty() && !directory_exists(config.get_analytics_dir())) {
LOG(ERROR) << "Typesense failed to start. " << "Analytics directory " << config.get_analytics_dir()
<< " does not exist.";
return 1;
}

if(!config.get_master().empty()) {
LOG(ERROR) << "The --master option has been deprecated. Please use clustering for high availability. "
<< "Look for the --nodes configuration in the documentation.";
Expand Down
Loading

0 comments on commit cccba10

Please sign in to comment.