Skip to content

Commit

Permalink
fix: zenoh and iox on serializatio_length (AimRT#116)
Browse files Browse the repository at this point in the history
* fix zenoh and iox on serializatio_length

* fix

---------

Co-authored-by: hanjun <hanjun@agibot.com>
  • Loading branch information
owny990312 and hanjun authored Dec 3, 2024
1 parent 4bfe31f commit 356ddd7
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 50 deletions.
16 changes: 10 additions & 6 deletions src/plugins/iceoryx_plugin/iceoryx_channel_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ bool IceoryxChannelBackend::Subscribe(
msg = static_cast<const char*>(payload);

// fetch a data packet of a specified length
util::ConstBufferOperator buf_oper(msg + kFixedLen, std::stoi(std::string(msg, kFixedLen)));
util::ConstBufferOperator buf_oper_tmp(msg, 4);
uint32_t pkg_size_with_len = buf_oper_tmp.GetUint32();

util::ConstBufferOperator buf_oper(msg + 4, pkg_size_with_len);

// get serialization type
std::string serialization_type(buf_oper.GetString(util::BufferLenType::kUInt8));
Expand Down Expand Up @@ -234,7 +237,7 @@ void IceoryxChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrap
iox_pub_loaned_shm_ptr = loan_result.value();

// write info pkg on loaned shm : the first FIXED_LEN bytes needs to write the length of pkg
util::BufferOperator buf_oper(reinterpret_cast<char*>(iox_pub_loaned_shm_ptr) + kFixedLen, loan_size - kFixedLen);
util::BufferOperator buf_oper(reinterpret_cast<char*>(iox_pub_loaned_shm_ptr) + 4, loan_size - 4);

// write serialization type on loaned shm
buf_oper.SetString(serialization_type, util::BufferLenType::kUInt8);
Expand All @@ -248,7 +251,7 @@ void IceoryxChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrap
auto type_and_ctx_len = 1 + serialization_type.size() + context_meta_kv_size;

// write msg on loaned shm: should start at the (FIXED_LEN + type_and_ctx_len)-th byte
aimrt::util::IceoryxBufferArrayAllocator iox_allocator(buf_oper.GetRemainingSize(), static_cast<char*>(iox_pub_loaned_shm_ptr) + type_and_ctx_len + kFixedLen);
aimrt::util::IceoryxBufferArrayAllocator iox_allocator(buf_oper.GetRemainingSize(), static_cast<char*>(iox_pub_loaned_shm_ptr) + type_and_ctx_len + 4);
if (buffer_array_cache_ptr == nullptr) {
try {
auto result = SerializeMsgSupportedIceoryx(msg_wrapper, serialization_type, aimrt::util::BufferArrayAllocatorRef(iox_allocator.NativeHandle()));
Expand All @@ -262,7 +265,7 @@ void IceoryxChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrap
if (msg_size > buf_oper.GetRemainingSize()) {
// in this case means the msg has serialization cache but the size is too large, then expand suitable size
is_shm_enough = false;
iox_pub_shm_size_map_[iceoryx_pub_topic] = msg_size + type_and_ctx_len + kFixedLen;
iox_pub_shm_size_map_[iceoryx_pub_topic] = msg_size + type_and_ctx_len + 4;

} else {
// in this case means the msg has serialization cache and the size is suitable, then use cachema
Expand All @@ -287,7 +290,7 @@ void IceoryxChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrap

// if has cache, the copy it to shm to replace the serialization
if (buffer_array_cache_ptr != nullptr) {
char* strat_pos = static_cast<char*>(iox_pub_loaned_shm_ptr) + kFixedLen + context_meta_kv_size + serialization_type.size() + 1;
char* strat_pos = static_cast<char*>(iox_pub_loaned_shm_ptr) + 4 + context_meta_kv_size + serialization_type.size() + 1;
for (size_t ii = 0; ii < buffer_array_cache_ptr->Size(); ++ii) {
std::memcpy(strat_pos, buffer_array_cache_ptr.get()[ii].Data()->data, buffer_array_cache_ptr.get()[ii].Data()->len);
strat_pos += buffer_array_cache_ptr.get()[ii].Data()->len;
Expand All @@ -297,7 +300,8 @@ void IceoryxChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrap
}

// write info pkg length on loaned shm
std::memcpy(static_cast<char*>(iox_pub_loaned_shm_ptr), IntToFixedLengthString(1 + serialization_type.size() + context_meta_kv_size + msg_size, kFixedLen).c_str(), kFixedLen);
uint32_t data_size = 1 + serialization_type.size() + context_meta_kv_size + msg_size;
util::SetBufFromUint32(reinterpret_cast<char*>(iox_pub_loaned_shm_ptr), data_size);

iox_pub->publish(iox_pub_loaned_shm_ptr);
}
Expand Down
6 changes: 0 additions & 6 deletions src/plugins/iceoryx_plugin/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,6 @@ std::string GetPid() {
return process_id_str;
}

std::string IntToFixedLengthString(int number, int length) {
std::ostringstream oss;
oss << std::setw(length) << number;
return oss.str();
}

std::pair<std::shared_ptr<aimrt::util::BufferArrayView>, size_t> SerializeMsgSupportedIceoryx(
MsgWrapper& msg_wrapper, std::string_view serialization_type, aimrt::util::BufferArrayAllocatorRef allocator) {
auto& serialization_cache = msg_wrapper.serialization_cache;
Expand Down
3 changes: 0 additions & 3 deletions src/plugins/iceoryx_plugin/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,12 @@ namespace aimrt::plugins::iceoryx_plugin {

using IdString_t = iox::capro::IdString_t;

constexpr unsigned int kFixedLen = 20; // FIXED_LEN represents the length of the pkg_size's string, which is enough to the max value of uint64_t
constexpr uint64_t kIoxShmInitSize = 1024; // default vaule of shm_init_size for iceoryx

iox::capro::ServiceDescription Url2ServiceDescription(std::string& url);

std::string GetPid();

std::string IntToFixedLengthString(int number, int length);

using namespace aimrt::runtime::core::channel;
std::pair<std::shared_ptr<aimrt::util::BufferArrayView>, size_t> SerializeMsgSupportedIceoryx(
MsgWrapper& msg_wrapper, std::string_view serialization_type, aimrt::util::BufferArrayAllocatorRef allocator);
Expand Down
7 changes: 0 additions & 7 deletions src/plugins/zenoh_plugin/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include "core/rpc/rpc_invoke_wrapper.h"

namespace aimrt::plugins::zenoh_plugin {
constexpr unsigned int kFixedLen = 20; // FIXED_LEN represents the length of the pkg_size's string, which is enough to the max value of uint64_t

inline std::pair<std::shared_ptr<aimrt::util::BufferArrayView>, size_t> SerializeMsgSupportedZenoh(
runtime::core::channel::MsgWrapper& msg_wrapper, std::string_view serialization_type, aimrt::util::BufferArrayAllocatorRef allocator) {
Expand Down Expand Up @@ -71,10 +70,4 @@ inline std::pair<std::shared_ptr<aimrt::util::BufferArrayView>, size_t> Serializ
return {nullptr, buffer_array_ptr->BufferSize()};
}

inline std::string IntToFixedLengthString(int number, int length) {
std::ostringstream oss;
oss << std::setw(length) << number;
return oss.str();
}

} // namespace aimrt::plugins::zenoh_plugin
21 changes: 13 additions & 8 deletions src/plugins/zenoh_plugin/zenoh_channel_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,10 @@ bool ZenohChannelBackend::Subscribe(
// read data from payload
auto ret = z_bytes_reader_read(&reader, reinterpret_cast<uint8_t*>(serialized_data.data()), serialized_size);
if (ret >= 0) {
util::ConstBufferOperator buf_oper(serialized_data.data() + kFixedLen, std::stoi(std::string(serialized_data.data(), kFixedLen)));
util::ConstBufferOperator buf_oper_tmp(serialized_data.data(), 4);
uint32_t serialized_size_with_len = buf_oper_tmp.GetUint32();

util::ConstBufferOperator buf_oper(serialized_data.data() + 4, serialized_size_with_len);

// get serialization type
std::string serialization_type(buf_oper.GetString(util::BufferLenType::kUInt8));
Expand Down Expand Up @@ -268,7 +271,7 @@ void ZenohChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrappe
z_pub_loaned_shm_ptr = z_shm_mut_data_mut(z_loan_mut(loan_result.buf));

// write info pkg on loaned shm : the first FIXED_LEN bytes needs to write the length of pkg
util::BufferOperator buf_oper(reinterpret_cast<char*>(z_pub_loaned_shm_ptr) + kFixedLen, loan_size - kFixedLen);
util::BufferOperator buf_oper(reinterpret_cast<char*>(z_pub_loaned_shm_ptr) + 4, loan_size - 4);

// write serialization type on loaned shm
buf_oper.SetString(serialization_type, util::BufferLenType::kUInt8);
Expand All @@ -281,7 +284,7 @@ void ZenohChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrappe
auto type_and_ctx_len = 1 + serialization_type.size() + context_meta_kv_size;

// write msg on loaned shm: should start at the (FIXED_LEN + type_and_ctx_len)-th byte
aimrt::util::ZenohBufferArrayAllocator z_allocator(buf_oper.GetRemainingSize(), z_pub_loaned_shm_ptr + type_and_ctx_len + kFixedLen);
aimrt::util::ZenohBufferArrayAllocator z_allocator(buf_oper.GetRemainingSize(), z_pub_loaned_shm_ptr + type_and_ctx_len + 4);
if (buffer_array_cache_ptr == nullptr) {
try {
auto result = SerializeMsgSupportedZenoh(msg_wrapper, serialization_type, aimrt::util::BufferArrayAllocatorRef(z_allocator.NativeHandle()));
Expand All @@ -294,7 +297,7 @@ void ZenohChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrappe
if (msg_size > buf_oper.GetRemainingSize()) {
// in this case means the msg has serialization cache but the size is too large, then expand suitable size
is_shm_loan_size_enough = false;
z_pub_shm_size_map_[zenoh_pub_topic] = msg_size + type_and_ctx_len + kFixedLen;
z_pub_shm_size_map_[zenoh_pub_topic] = msg_size + type_and_ctx_len + 4;
} else {
// in this case means the msg has serialization cache and the size is suitable, then use cachema
is_shm_loan_size_enough = true;
Expand All @@ -320,7 +323,7 @@ void ZenohChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrappe
if (is_shm_pool_size_enough) {
// if has cache, the copy it to shm to replace the serialization
if (buffer_array_cache_ptr != nullptr) {
unsigned char* strat_pos = z_pub_loaned_shm_ptr + kFixedLen + context_meta_kv_size + serialization_type.size() + 1;
unsigned char* strat_pos = z_pub_loaned_shm_ptr + 4 + context_meta_kv_size + serialization_type.size() + 1;
for (size_t ii = 0; ii < buffer_array_cache_ptr->Size(); ++ii) {
std::memcpy(strat_pos, buffer_array_cache_ptr.get()[ii].Data()->data, buffer_array_cache_ptr.get()[ii].Data()->len);
strat_pos += buffer_array_cache_ptr.get()[ii].Data()->len;
Expand All @@ -329,7 +332,9 @@ void ZenohChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrappe
buffer_array_cache_ptr = nullptr;
}
// write info pkg length on loaned shm
std::memcpy(z_pub_loaned_shm_ptr, IntToFixedLengthString(1 + serialization_type.size() + context_meta_kv_size + msg_size, kFixedLen).c_str(), kFixedLen);
uint32_t data_size = 1 + serialization_type.size() + context_meta_kv_size + msg_size;
util::SetBufFromUint32(reinterpret_cast<char*>(z_pub_loaned_shm_ptr), data_size);

z_owned_bytes_t z_payload;
if (loan_result.status == ZC_BUF_LAYOUT_ALLOC_STATUS_OK) {
z_bytes_from_shm_mut(&z_payload, z_move(loan_result.buf));
Expand Down Expand Up @@ -359,14 +364,14 @@ void ZenohChannelBackend::Publish(runtime::core::channel::MsgWrapper& msg_wrappe
size_t msg_size = buffer_array_view_ptr->BufferSize();

int32_t data_size = 1 + serialization_type.size() + context_meta_kv_size + msg_size;
int32_t pkg_size = data_size + kFixedLen;
int32_t pkg_size = data_size + 4;

// create buffer for serialization
std::vector<char> serialized_data(pkg_size);
util::BufferOperator buf_oper(serialized_data.data(), pkg_size);

// full data_size
buf_oper.SetBuffer(IntToFixedLengthString(data_size, kFixedLen).c_str(), kFixedLen);
buf_oper.SetUint32(data_size);

// full serialization_type
buf_oper.SetString(serialization_type, util::BufferLenType::kUInt8);
Expand Down
Loading

0 comments on commit 356ddd7

Please sign in to comment.