Skip to content


[Classes] Add buffersync class
Browse files Browse the repository at this point in the history
  • Loading branch information
JuanCarlosgg committed May 9, 2024
1 parent 0d70af6 commit bd4d7c1
Show file tree
Hide file tree
Showing 4 changed files with 310 additions and 55 deletions.
305 changes: 305 additions & 0 deletions classes/buffersync/buffersync.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,305 @@
// Created by juancarlos on 09/05/24.
// This class is a generic container for a thread-safe synced producer-consumer
// buffer (queue like but with random access support) used to transfer data
// between threads. It can be used to synchronize multiple data sources in one
// consumer using a timestamp. For example, between the main thread of a
// component and the (threaded) middleware stubs Example of Buffer creation with
// default converters between input and output types:
// decl: BufferSync<InOut<RoboCompLaser::TLaserData,
// RoboCompLaser::TLaserData>, InOut<std::string, std::string>> buffer;
// Internally it creates a queue for each data source.
// use:
// auto timestamp = get_timestamp();
// buffer.put<0>(std::move(laserData), timestamp); // inserts the
// laser data value to the queue 0. buffer.put<1>("foo", timestamp);
// // inserts the laser data value to the queue 1.
// use: auto [laser, str] = buffer.take(timestamp) // consume the nearest
// values to a timestamp.
// auto [laser, str] = buffer.take(timestamp, max_diff) // consume
// the nearest values to a timestamp given a max_difference. auto
// [laser, str] = buffer.take_first() // consumes the first elements
// of the queues. It doesn't check difference
// //Advanced use: can return unexpected values if not handled
// carefully.
// //between timestamps, so results may be wrong if there are missing
// values in some of the queues. auto [laser, str] =
// buffer.take_last() // consumes the first elements of the queues.
// It can return inconsistent values if some
// //queue was not populated with the last timestamp.
// auto [laser, str] = buffer.take_last(max_diff) // consumes the
// first elements of the queues. It only consumes the last element
// //from queues when the difference between the last timestamp and
// the last element of the queues in less than `max_diff`.
// //It is also possible to use the functions to retrieve elements
// from specific queues only. auto str = buffer.take<0>(timestamp);
// //Only returns the element from the InOut<std::string, std::string>
// queue. The other queues
// //(InOut<RoboCompLaser::TLaserData, RoboCompLaser::TLaserData>)
// still has a value.
// Every take operation consumes the value from the queue, returns an optional
// and allows passing a max_time_diff to consider two values part of the same
// time group. Example of Buffer creation with user-defined converter from input
// to output types
// decl: BufferSync<InOut<RoboCompLaser::TLaserData,
// RoboCompLaser::TLaserData>> laser_buffer; use:
// laser_buffer.put<0>(std::move(laserData), [](auto &&I, auto &T){
// for(auto &&i , I){ T.append(i/2);}});

#pragma once

#include <atomic>
#include <functional>
#include <future>
#include <iomanip>
#include <iostream>
#include <limits>
#include <mutex>
#include <optional>
#include <queue>
#include <ranges>
#include <shared_mutex>
#include <string>
#include <type_traits>
#include <vector>

#include "threadpool.h"

using namespace std::chrono_literals;

// Función utilizada como argumento por defecto.
constexpr auto empty_fn = [](auto &&I, auto &T) {};

template <typename T, typename = void> struct is_iterable : std::false_type {};
template <typename T>
// especialización del template si tiene función begin y end.
struct is_iterable<T, std::void_t<decltype(std::declval<T>().begin()),
: std::true_type {};

template <typename T>
concept printable = requires(T t) {
{ std::cout << t } -> std::same_as<std::ostream &>;

template <typename _I, typename _O /*, void(*Fn)(_I&&, _O&) fn */>
struct InOut {
typedef _I I;
typedef _O O;

template <class... DBs> class BufferSync {
template <typename T> using pair_t_time = std::pair<T, size_t>;
template <typename T> using deque_db_t = std::deque<pair_t_time<T>>;

static constexpr size_t DBs_size = sizeof...(DBs);

std::tuple<deque_db_t<typename DBs::O>...> _out;
std::array<size_t, DBs_size> last_write;

mutable std::shared_mutex bufferMutex;
std::atomic_bool empty;
ThreadPool worker;
size_t queue_size;

BufferSync() : worker(1), queue_size(5) {};
BufferSync(size_t size) : worker(1), queue_size(size) {};

~BufferSync() {};

auto take_first() -> std::tuple<std::optional<typename DBs::O>...> {
constexpr auto seq = std::make_index_sequence<DBs_size>{};
return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
return take_first<Is...>();

template <size_t... idx> auto take_first() {

auto ret = subtuple<idx...>();

if (empty.load()) {
return ret;
std::unique_lock lock(bufferMutex);

[](auto &q, auto &r) {
if (!q.empty()) {
r = std::move(q.front().first);
}(std::get<idx>(_out), std::get<idx>(ret)),

if ((std::get<idx>(_out).empty() && ...)) {;

return ret;

auto take_last(size_t max_diff = std::numeric_limits<size_t>::max())
-> std::tuple<std::optional<typename DBs::O>...> {
constexpr auto seq = std::make_index_sequence<DBs_size>{};
return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
return take_last<Is...>(max_diff);

template <size_t... idx>
auto take_last(size_t max_diff = std::numeric_limits<size_t>::max()) {

auto ret = subtuple<idx...>();

if (empty.load()) {
return ret;
std::unique_lock lock(bufferMutex);

size_t max = *std::max_element(last_write.begin(), last_write.end());
[max, max_diff](auto &q, auto &r) {
if (!q.empty() && (max - q.back().second < max_diff)) {
r = std::move(q.back().first);
}(std::get<idx>(_out), std::get<idx>(ret)),

if ((std::get<idx>(_out).empty() && ...)) {;

return ret;

auto take(size_t timestamp,
size_t max_diff = std::numeric_limits<size_t>::max())
-> std::tuple<std::optional<typename DBs::O>...> {
constexpr auto seq = std::make_index_sequence<DBs_size>{};
return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
return take<Is...>(timestamp, max_diff);

template <size_t... idx>
auto take(size_t timestamp,
size_t max_diff = std::numeric_limits<size_t>::max()) {
auto ret = subtuple<idx...>();

if (empty.load()) {
return ret;

std::unique_lock lock(bufferMutex);

size_t dropped = 0;
[timestamp, max_diff, &dropped](auto &q, auto &r) {
std::vector<size_t> diffs;
std::transform(q.begin(), q.end(), std::back_inserter(diffs),
[timestamp](auto &val) {
return std::abs(static_cast<ssize_t>(val.second) -
auto it_idx = std::min(diffs.begin(), diffs.end()) - diffs.begin();
auto it = q.begin() + it_idx;
if (it != q.end() && timestamp - it->second <= max_diff) {
r = std::move(it->first);
}(std::get<idx>(_out), std::get<idx>(ret)),

if ((std::get<idx>(_out).empty() && ...)) {;

return ret;

template <size_t idx, typename InOut = std::remove_cvref_t<
bool put(typename InOut::I &&d, size_t timestamp,
std::function<void(typename InOut::I &&, typename InOut::O &)> t =
empty_fn) {

[this, d = std::move(d), t = std::move(t), timestamp]() mutable {
typename InOut::O temp;
this->ItoO(std::move(d), temp, t);
std::unique_lock lock(this->bufferMutex);
last_write[idx] =
std::get<idx>(_out).emplace_back(std::move(temp), timestamp);
if (std::get<idx>(_out).size() > queue_size) {
return true;

void show()
requires(printable<typename DBs::O> && ...)
std::cout << "--------------------------------------------------\n";
for (auto i : std::views::iota(0, (int)queue_size)) {
constexpr auto seq = std::make_index_sequence<DBs_size>{};
[&]<std::size_t... Is>(std::index_sequence<Is...>) {
std::cout << "Element: " << i << "\n";
std::cout << "idx: |" << std::setw(15) << "val:" << " | "
<< std::setw(12) << "timestamp:" << "\n";
std::cout << "--------------------------------------------------\n";
[&]<size_t idx>() {
if (i < std::get<idx>(_out).size()) {
auto &[f, s] = std::get<idx>(_out)[i];
std::cout << std::setw(4) << idx << " | " << std::setw(14) << f
<< " | " << std::setw(15) << s << "\n";
} else {
std::cout << std::setw(4) << idx << " | " << std::setw(14)
<< " empty" << " |\n";
}.template operator()<Is>(),
std::cout << "--------------------------------------------------\n";

template <std::size_t... Is> constexpr auto subtuple() {
constexpr std::tuple<std::optional<typename DBs::O>...> tuple{};
return std::make_tuple(std::get<Is>(tuple)...);

template <typename I, typename O>
void ItoO(I &&iTypeData, O &oTypeData,
const std::function<void(I &&, O &)> &t = empty_fn) {
if constexpr (std::is_same<I, O>::value ||
std::is_convertible<I, O>::value) {
oTypeData = std::move(iTypeData);
} else if constexpr (is_iterable<I>::value && is_iterable<O>::value) {
using I_T = typename std::decay<decltype(*iTypeData.begin())>::type;
using O_T = typename std::decay<decltype(*oTypeData.begin())>::type;
if constexpr (std::is_convertible<I_T, O_T>::value) {
oTypeData = O(std::make_move_iterator(iTypeData.begin()),
} else {
static_assert(!std::is_same<decltype(t), decltype(empty_fn)>::value,
"A function needs to be implemented to transform ItoO");
t(std::move(iTypeData), oTypeData);
} else {
static_assert(!std::is_same<decltype(t), decltype(empty_fn)>::value,
"A function needs to be implemented to transform ItoO");
t(std::move(iTypeData), oTypeData);
58 changes: 4 additions & 54 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ FROM ${base_image}:${base_tag} AS dependencies-version
ARG DEBIAN_FRONTEND=noninteractive
ARG branch
# DEFAULT LD_LIBRARY_PATH vaiables from the nvidia/opengl image confuses pyside2 about the qt5 Library paths
ENV LD_LIBRARY_PATH="/lib:/usr/lib"
RUN export LD_LIBRARY_PATH="/lib:/usr/lib"
RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates \
curl \
Expand Down Expand Up @@ -37,10 +37,7 @@ ENV PATH=$PATH:/opt/robocomp/bin/:'/home/robolab/.local/bin'
ENV ROBOCOMP=/home/robolab/robocomp
RUN export ROBOCOMP=/home/robolab/robocomp
RUN export PATH=$PATH:/opt/robocomp/bin/:'/home/robolab/.local/bin'
#RUN sudo pip3 install /home/robolab/robocomp/tools/cli/
#RUN cd robocomp && rcconfig init && mkdir build && cd build && cmake .. && make -j$(nproc) && sudo make install
#RUN sudo sh -c "echo '/opt/robocomp/lib/' >> /etc/"
#RUN sudo ldconfig

WORKDIR robocomp
RUN git annex get \
files/innermodel/simpleworld.xml \
Expand All @@ -56,59 +53,12 @@ USER root
RUN apt update && apt-get install --no-install-recommends --yes \
USER robolab
RUN cd /home/robolab/robocomp && mkdir -p build && cd build && cmake -DFCL_SUPPORT=True .. && make -j$(nproc)
RUN cd /home/robolab/robocomp && mkdir -p build && cd build && cmake -DFCL_SUPPORT=True .. && make
USER root
RUN cd /home/robolab/robocomp/build && make install
USER robolab

FROM fcl-version AS dsr-version
USER root
RUN apt-get update && apt-get install -y --no-install-recommends \
g++-10 \
gcc-10 \
libeigen3-dev \
python3-dev \
python3-pybind11 \
libqglviewer-dev-qt5 \
libasio-dev \
libtinyxml2-dev \
libqt5svg5-dev \
&& rm -rf /var/lib/apt/lists/*
RUN update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-10 1
RUN update-alternatives --install /usr/bin/g++ g++ /usr/bin/g++-10 1
RUN python3 -m pip install pyrep

# ICE fix
COPY Connection.h /usr/include/Ice/Connection.h

### Third party dependencies
RUN (curl -O && mkdir -p /usr/include/threadpool && mv threadpool.h /usr/include/threadpool )
RUN curl -S | bash -x
RUN rm -r /usr/local/include/cppitertools
RUN git clone /usr/local/include/cppitertools
USER robolab

FROM dsr-version AS dsr-components-version
USER root
RUN apt-get update && apt-get install -y --no-install-recommends \
libopencv-dev \
python3-zeroc-ice \
psmisc \
&& rm -rf /var/lib/apt/lists/*
#Compilation of DSR components
USER robolab
WORKDIR /home/robolab/robocomp/components/
RUN git clone --progress
WORKDIR /home/robolab/robocomp/components/dsr-graph/components/
RUN cd idserver && cmake . && make -j10
RUN cd pioneer_dsr && cmake . && make -j10
RUN cd path_follower && cmake . && make -j10
RUN cd path_planner_astar && cmake . && make -j10
RUN cd elastic_band && cmake . && make -j10

FROM ${robocomp_version}-version AS final
ARG robocomp_version
RUN echo "Built ${robocomp_version}"
Empty file modified docker/
100644 → 100755
Empty file.
2 changes: 1 addition & 1 deletion tools/rcnode/
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
yaku rcnode > /dev/null 2>&1
icebox --Ice.Config=/opt/robocomp/etc/rcnode.conf
icebox --Ice.Config=/home/robocomp/robocomp/tools/rcnode/rcnode.conf

0 comments on commit bd4d7c1

Please sign in to comment.