Skip to content

Commit

Permalink
Use wsong::perf timestamp logger.
Browse files Browse the repository at this point in the history
  • Loading branch information
songweijia committed Mar 28, 2024
1 parent de3150b commit 9ee1379
Show file tree
Hide file tree
Showing 14 changed files with 99 additions and 131 deletions.
8 changes: 4 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,9 @@ endif()
find_program(DOTNET_CMD dotnet)

# detect libwsong library, which is required for the mproc feature
find_package(libwsong QUIET)
if (libwsong_FOUND)
set(ENABLE_MPROC 1)
endif()
find_package(libwsong REQUIRED)

set(ENABLE_MPROC 1)

# Doxygen, optional to generate documentation HTML
find_package(Doxygen)
Expand Down Expand Up @@ -114,6 +113,7 @@ target_link_libraries(cascade
mutils::mutils
OpenSSL::Crypto
rpclib::rpc
libwsong::perf
${Hyperscan_LIBRARIES})
set_target_properties(cascade PROPERTIES
SOVERSION ${cascade_VERSION}
Expand Down
37 changes: 18 additions & 19 deletions include/cascade/detail/debug_util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,30 +67,29 @@ void make_workload(uint32_t payload_size, uint32_t num_distinct_objects, const K

#if __cplusplus > 201703L
// C++ 20
#define LOG_TIMESTAMP_BY_TAG(t, g, v, ...) \
if constexpr(std::is_base_of<IHasMessageID, std::decay_t<decltype(v)>>::value) { \
TimestampLogger::log(t, \
g->get_my_id(), \
dynamic_cast<const IHasMessageID*>(&(v))->get_message_id(), \
get_walltime() \
__VA_OPT__(, ) __VA_ARGS__); \
#define LOG_TIMESTAMP_BY_TAG(t, g, v, ...) \
if constexpr(std::is_base_of<IHasMessageID, std::decay_t<decltype(v)>>::value) { \
TimestampLogger::log(t, \
g->get_my_id(), \
dynamic_cast<const IHasMessageID*>(&(v))->get_message_id() \
__VA_OPT__(, ) __VA_ARGS__); \
}
#else
// C++ 17
#define LOG_TIMESTAMP_BY_TAG(t, g, v) \
if constexpr(std::is_base_of<IHasMessageID, std::decay_t<decltype(v)>>::value) { \
TimestampLogger::log(t, \
g->get_my_id(), \
dynamic_cast<const IHasMessageID*>(&(v))->get_message_id(), \
get_walltime()); \
#define LOG_TIMESTAMP_BY_TAG(t, g, v) \
if constexpr(std::is_base_of<IHasMessageID, std::decay_t<decltype(v)>>::value) { \
TimestampLogger::log(t, \
g->get_my_id(), \
dynamic_cast<const IHasMessageID*>(&(v))->get_message_id() \
); \
}

#define LOG_TIMESTAMP_BY_TAG_EXTRA(t, g, v, e) \
if constexpr(std::is_base_of<IHasMessageID, std::decay_t<decltype(v)>>::value) { \
TimestampLogger::log(t, \
g->get_my_id(), \
dynamic_cast<const IHasMessageID*>(&(v))->get_message_id(), \
get_walltime(), e); \
#define LOG_TIMESTAMP_BY_TAG_EXTRA(t, g, v, e) \
if constexpr(std::is_base_of<IHasMessageID, std::decay_t<decltype(v)>>::value) { \
TimestampLogger::log(t, \
g->get_my_id(), \
dynamic_cast<const IHasMessageID*>(&(v))->get_message_id(),\
e); \
}

#endif //__cplusplus > 201703L
Expand Down
10 changes: 10 additions & 0 deletions include/cascade/detail/persistent_store_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,17 @@ const VT PersistentCascadeStore<KT, VT, IK, IV, ST>::get(const KT& key, const pe
derecho::Replicated<PersistentCascadeStore>& subgroup_handle = group->template get_subgroup<PersistentCascadeStore>(this->subgroup_index);
requested_version = ver;
if(requested_version == CURRENT_VERSION) {
#if __cplusplus > 201703L
LOG_TIMESTAMP_BY_TAG(100001, group,*IV,ver);
#else
LOG_TIMESTAMP_BY_TAG_EXTRA(100001, group,*IV,ver);
#endif
requested_version = subgroup_handle.get_global_persistence_frontier();
#if __cplusplus > 201703L
LOG_TIMESTAMP_BY_TAG(100002, group,*IV,ver);
#else
LOG_TIMESTAMP_BY_TAG_EXTRA(100002, group,*IV,ver);
#endif
} else {
// The first condition test if requested_version is beyond the active latest atomic broadcast version.
// However, that could be true for a valid requested version for a new started setup, where the active
Expand Down
4 changes: 2 additions & 2 deletions include/cascade/detail/service_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ Service<CascadeTypes...>::Service(const std::vector<DeserializationContext*>& ds
nullptr,
// persistent
[this](subgroup_id_t sgid, persistent::version_t ver){
TimestampLogger::log(TLT_PERSISTED,group->get_my_id(),0,get_walltime(),ver);
TimestampLogger::log(TLT_PERSISTED,group->get_my_id(),0,ver);
},
nullptr
#endif
Expand Down Expand Up @@ -147,7 +147,7 @@ std::unique_ptr<CascadeType> client_stub_factory() {

#ifdef ENABLE_EVALUATION
#define LOG_SERVICE_CLIENT_TIMESTAMP(tag,msgid) \
TimestampLogger::log(tag,this->get_my_id(),msgid,get_walltime());
TimestampLogger::log(tag,this->get_my_id(),msgid);
#else
#define LOG_SERVICE_CLIENT_TIMESTAMP(tag,msgid)
#endif
Expand Down
2 changes: 1 addition & 1 deletion include/cascade/service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ namespace cascade {
TimestampLogger::log(TLT_ACTION_FIRE_START,
0,
dynamic_cast<const IHasMessageID*>(value_ptr.get())->get_message_id(),
get_time_ns(), 0);
0);
dbg_default_trace("In {}: [worker_id={}] action is fired.", __PRETTY_FUNCTION__, worker_id);
(*ocdpo_ptr)(sender,key_string,prefix_length,version,value_ptr.get(),outputs,ctxt,worker_id);
}
Expand Down
74 changes: 38 additions & 36 deletions include/cascade/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <unordered_set>
#include <derecho/utils/time.h>
#include <cascade/config.h>
#include <wsong/perf/timing.h>

namespace derecho {
namespace cascade {
Expand Down Expand Up @@ -437,61 +438,62 @@ typedef union __attribute__((packed,aligned(8))) action_fire_extra_info {

#define CASCADE_TIMESTAMP_TAG_FILTER "CASCADE/timestamp_tag_enabler"

/**
* @class TimestampLogger utils.hpp "cascade/utils.hpp"
* @brief The timestamp logger tool.
*
* A wrapper class over the thread-safe timestamp logger in libwsong
*/
class TimestampLogger {
private:
std::vector<std::tuple<uint64_t,uint64_t,uint64_t,uint64_t,uint64_t>> _log;
pthread_spinlock_t lck;
std::unordered_set<uint64_t> tag_enabler;
/**
* Constructor
*/
TimestampLogger();
/**
* Log the timestamp
* @param tag timestamp tag
* @param node_id node id
* @param msg_id message id
* @param ts_ns timestamp in nanoseconds
* Only events with a tag included tag_enabler will be logged. Other events are dropped silently.
*/
void instance_log(uint64_t tag, uint64_t node_id, uint64_t msg_id, uint64_t ts_ns, uint64_t extra=0ull);
std::unordered_set<uint64_t> tag_enabler;
/**
* Flush log to file
* @param filename filename
* @param clear True for clear the log after flush
* @fn void instance_log(uint64_t, uint64_t, uint64_t, uint64_t)
* @brief Log an event
* @param[in] tag The event tag
* @param[in] node_id Node id
* @param[in] msg_id Message id
* @param[in] extra Optional extra information
*/
void instance_flush(const std::string& filename, bool clear = true);
/**
* Clear the log
*/
void instance_clear();
void instance_log(uint64_t tag, uint64_t node_id, uint64_t msg_id, uint64_t extra=0ull);

/** singleton */
/** The singleton logger */
static TimestampLogger _tl;

public:
/**
* Log the timestamp
* @param tag timestamp tag
* @param node_id node id
* @param msg_id message id
* @param ts_ns timestamp in nanoseconds
* @fn TimestampLogger()
* @brief Constructor
*/
TimestampLogger();
/**
* @fn void log(uint64_t, uint64_t, uint64_t, uint64_t)
* @brief Log an event
* @param[in] tag The event tag
* @param[in] node_id Node id
* @param[in] msg_id Message id
* @param[in] extra Optional extra information
*/
static inline void log(uint64_t tag, uint64_t node_id, uint64_t msg_id, uint64_t ts_ns=get_time_ns(), uint64_t extra=0ull) {
_tl.instance_log(tag,node_id,msg_id,ts_ns,extra);
static inline void log(uint64_t tag, uint64_t node_id, uint64_t msg_id, uint64_t extra=0ull) {
_tl.instance_log(tag,node_id,msg_id,extra);
}
/**
* Flush log to file
* @param filename filename
* @param clear True for clear the log after flush
* @fn void flush(const std::string&,bool)
* @brief Flush log to file.
* @param[in] filename The file name
*/
static inline void flush(const std::string& filename, bool clear = true) {
_tl.instance_flush(filename,clear);
static inline void flush(const std::string& filename) {
ws_timing_save(filename.c_str());
}
/**
* Clear the log
* @fn void clear()
* @brief Drop all event logs.
*/
static inline void clear() {
_tl.instance_clear();
ws_timing_clear();
}
};

Expand Down
6 changes: 3 additions & 3 deletions src/applications/standalone/dds/src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ static bool run_perftest(
}
}
#if !defined(USE_DDS_TIMESTAMP_LOG)
TimestampLogger::flush(topic+".publisher.log",true);
TimestampLogger::flush(topic+".publisher.log");
#endif

} else {
Expand All @@ -265,7 +265,7 @@ static bool run_perftest(
// ts_log.emplace_back(std::tuple{msg.seqno,msg.sending_ts_us,get_walltime()/1000});
ts_log.emplace_back(std::tuple{header->seqno,header->sending_ts_us,get_walltime()/1000});
#else
TimestampLogger::log(TLT_DDS_SUBSCRIBER_CALLED,-1,0,get_time_ns(),received);
TimestampLogger::log(TLT_DDS_SUBSCRIBER_CALLED,-1,0,received);
received ++;
#endif
// end of test
Expand Down Expand Up @@ -299,7 +299,7 @@ static bool run_perftest(
}
ofile.close();
#else
TimestampLogger::flush(topic+".subscriber.log",true);
TimestampLogger::flush(topic+".subscriber.log");
#endif
}

Expand Down
1 change: 0 additions & 1 deletion src/applications/tests/pipeline/pipeline_udl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ class PipelineOCDPO: public OffCriticalDataPathObserver {
TimestampLogger::log(TLT_PIPELINE(stage),
typed_ctxt->get_service_client_ref().get_my_id(),
value->get_message_id(),
get_walltime(),
worker_id+stage*10000);
#endif//ENABLE_EVALUATION
for (auto& okv:outputs) {
Expand Down
6 changes: 3 additions & 3 deletions src/service/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ target_include_directories(service PRIVATE
$<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}/include>
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
)
target_link_libraries(service derecho::derecho)
target_link_libraries(service derecho::derecho libwsong::perf)
add_dependencies(service udl_signature)

if (ENABLE_MPROC)
Expand All @@ -70,7 +70,7 @@ target_include_directories(server PRIVATE
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
$<BUILD_INTERFACE:${CMAKE_BINARY_DIR}>
)
target_link_libraries(server cascade dl pthread)
target_link_libraries(server cascade dl pthread libwsong::perf)
target_link_options(server PUBLIC -rdynamic)
set_target_properties(server PROPERTIES OUTPUT_NAME cascade_server)
add_custom_command(TARGET server POST_BUILD
Expand Down Expand Up @@ -103,7 +103,7 @@ target_include_directories(client PRIVATE
target_include_directories(client PUBLIC
$<BUILD_INTERFACE:${Readline_INCLUDE_DIRS}>
)
target_link_libraries(client cascade ${Readline_LIBRARIES} pthread)
target_link_libraries(client cascade ${Readline_LIBRARIES} pthread libwsong::perf)
if(ENABLE_EVALUATION)
target_link_libraries(client rpclib::rpc)
endif()
Expand Down
24 changes: 12 additions & 12 deletions src/service/perftest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ bool PerfTestServer::eval_put(uint64_t max_operation_per_second,
} else {
throw derecho_exception{"Evaluation requests an object to support IHasMessageID interface."};
}
TimestampLogger::log(TLT_READY_TO_SEND,this->capi.get_my_id(),message_id,get_walltime());
TimestampLogger::log(TLT_READY_TO_SEND,this->capi.get_my_id(),message_id);
if (subgroup_index == INVALID_SUBGROUP_INDEX ||
shard_index == INVALID_SHARD_INDEX) {
future_appender(this->capi.put(objects.at(now_ns%num_distinct_objects)));
Expand All @@ -142,7 +142,7 @@ bool PerfTestServer::eval_put(uint64_t max_operation_per_second,
future_appender,
this->capi.template put, objects.at(now_ns%num_distinct_objects), subgroup_index, shard_index);
}
TimestampLogger::log(TLT_EC_SENT,this->capi.get_my_id(),message_id,get_walltime());
TimestampLogger::log(TLT_EC_SENT,this->capi.get_my_id(),message_id);
message_id ++;
}
// wait for all pending futures.
Expand Down Expand Up @@ -179,7 +179,7 @@ bool PerfTestServer::eval_put_and_forget(uint64_t max_operation_per_second,
throw derecho_exception{"Evaluation requests an object to support IHasMessageID interface."};
}
// log time.
TimestampLogger::log(TLT_READY_TO_SEND,this->capi.get_my_id(),message_id,get_walltime());
TimestampLogger::log(TLT_READY_TO_SEND,this->capi.get_my_id(),message_id);
// send it
if (subgroup_index == INVALID_SUBGROUP_INDEX || shard_index == INVALID_SHARD_INDEX) {
this->capi.put_and_forget(objects.at(now_ns%num_distinct_objects));
Expand All @@ -188,7 +188,7 @@ bool PerfTestServer::eval_put_and_forget(uint64_t max_operation_per_second,
this->capi.template put_and_forget, objects.at(now_ns%num_distinct_objects), subgroup_index, shard_index);
}
// log time.
TimestampLogger::log(TLT_EC_SENT,this->capi.get_my_id(),message_id,get_walltime());
TimestampLogger::log(TLT_EC_SENT,this->capi.get_my_id(),message_id);
message_id ++;
}
return true;
Expand Down Expand Up @@ -223,14 +223,14 @@ bool PerfTestServer::eval_trigger_put(uint64_t max_operation_per_second,
throw derecho_exception{"Evaluation requests an object to support IHasMessageID interface."};
}
// log time.
TimestampLogger::log(TLT_READY_TO_SEND,this->capi.get_my_id(),message_id,get_walltime());
TimestampLogger::log(TLT_READY_TO_SEND,this->capi.get_my_id(),message_id);
if (subgroup_index == INVALID_SUBGROUP_INDEX || shard_index == INVALID_SHARD_INDEX) {
this->capi.trigger_put(objects.at(now_ns%num_distinct_objects));
} else {
on_subgroup_type_index(std::decay_t<decltype(capi)>::subgroup_type_order.at(subgroup_type_index),
this->capi.template trigger_put, objects.at(now_ns%num_distinct_objects), subgroup_index, shard_index);
}
TimestampLogger::log(TLT_EC_SENT,this->capi.get_my_id(),message_id,get_walltime());
TimestampLogger::log(TLT_EC_SENT,this->capi.get_my_id(),message_id);
message_id ++;
}

Expand Down Expand Up @@ -283,7 +283,7 @@ bool PerfTestServer::eval_get(int32_t log_depth,
for(auto& reply : replies) {
reply.second.get();
// This might not be an accurate time for when the query completed, depending on how long the thread waited to acquire the queue lock
TimestampLogger::log(TLT_EC_GET_FINISHED, my_node_id, message_id, get_walltime());
TimestampLogger::log(TLT_EC_GET_FINISHED, my_node_id, message_id);
break;
}
pending_futures.pop();
Expand Down Expand Up @@ -377,7 +377,7 @@ bool PerfTestServer::eval_get(int32_t log_depth,
};
std::size_t cur_object_index = now_ns % num_distinct_objects;
// NOTE: Setting the message ID on the object won't do anything because we're doing a Get, not a Put
TimestampLogger::log(TLT_READY_TO_SEND, my_node_id, message_id, get_walltime());
TimestampLogger::log(TLT_READY_TO_SEND, my_node_id, message_id);
// With either the object pool interface or the shard interface, further decide whether to request the current version or an old version
if(subgroup_index == INVALID_SUBGROUP_INDEX || shard_index == INVALID_SHARD_INDEX) {
if(log_depth == -1) {
Expand Down Expand Up @@ -405,7 +405,7 @@ bool PerfTestServer::eval_get(int32_t log_depth,
this->capi.template get, objects.at(cur_object_index).get_key_ref(), oldest_object_versions.at(cur_object_index), true, subgroup_index, shard_index);
}
}
TimestampLogger::log(TLT_EC_SENT, my_node_id, message_id, get_walltime());
TimestampLogger::log(TLT_EC_SENT, my_node_id, message_id);
message_id++;
}
dbg_default_info("eval_get: All messages sent, waiting for queries to complete");
Expand Down Expand Up @@ -459,7 +459,7 @@ bool PerfTestServer::eval_get_by_time(uint64_t ms_in_past,
// Get only the first reply
for(auto& reply : replies) {
reply.second.get();
TimestampLogger::log(TLT_EC_GET_FINISHED, my_node_id, message_id, get_walltime());
TimestampLogger::log(TLT_EC_GET_FINISHED, my_node_id, message_id);
break;
}
pending_futures.pop();
Expand Down Expand Up @@ -597,7 +597,7 @@ bool PerfTestServer::eval_get_by_time(uint64_t ms_in_past,
futures_cv.notify_one();
};
// NOTE: Setting the message ID on the object won't do anything because we're doing a Get, not a Put
TimestampLogger::log(TLT_READY_TO_SEND, my_node_id, message_id, get_walltime());
TimestampLogger::log(TLT_READY_TO_SEND, my_node_id, message_id);
if(subgroup_index == INVALID_SUBGROUP_INDEX || shard_index == INVALID_SHARD_INDEX) {
future_appender(this->capi.get_by_time(objects.at(object_to_request).get_key_ref(), timestamp_to_request));
} else {
Expand All @@ -606,7 +606,7 @@ bool PerfTestServer::eval_get_by_time(uint64_t ms_in_past,
future_appender,
this->capi.template get_by_time, objects.at(object_to_request).get_key_ref(), timestamp_to_request, true, subgroup_index, shard_index);
}
TimestampLogger::log(TLT_EC_SENT, my_node_id, message_id, get_walltime());
TimestampLogger::log(TLT_EC_SENT, my_node_id, message_id);
message_id++;
}
dbg_default_info("eval_get: All messages sent, waiting for queries to complete");
Expand Down
Loading

0 comments on commit 9ee1379

Please sign in to comment.