Skip to content

Commit

Permalink
YT-17689: Move TFreeList to library/cpp/yt/memory
Browse files Browse the repository at this point in the history
Iteration no. 2. First one reverted due to YT-18997
  • Loading branch information
aleexf committed May 10, 2023
1 parent c0f7a1f commit 7918b11
Show file tree
Hide file tree
Showing 22 changed files with 312 additions and 200 deletions.
113 changes: 113 additions & 0 deletions contrib/libs/cxxsupp/libcxx/include/latch
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// -*- C++ -*-
//===----------------------------------------------------------------------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//

#ifndef _LIBCPP_LATCH
#define _LIBCPP_LATCH

/*
latch synopsis
namespace std
{
class latch
{
public:
static constexpr ptrdiff_t max() noexcept;
constexpr explicit latch(ptrdiff_t __expected);
~latch();
latch(const latch&) = delete;
latch& operator=(const latch&) = delete;
void count_down(ptrdiff_t __update = 1);
bool try_wait() const noexcept;
void wait() const;
void arrive_and_wait(ptrdiff_t __update = 1);
private:
ptrdiff_t __counter; // exposition only
};
}
*/

#include <__availability>
#include <__config>
#include <atomic>
#include <version>

#if !defined(_LIBCPP_HAS_NO_PRAGMA_SYSTEM_HEADER)
# pragma GCC system_header
#endif

#ifdef _LIBCPP_HAS_NO_THREADS
# error <latch> is not supported on this single threaded system
#endif

_LIBCPP_PUSH_MACROS
#include <__undef_macros>

#if _LIBCPP_STD_VER >= 14

_LIBCPP_BEGIN_NAMESPACE_STD

// Single-use downward counter: threads decrement it with count_down() and
// block in wait() until the counter is observed to be zero.
class latch
{
    // Remaining count; waiters are released once it reaches zero.
    __atomic_base<ptrdiff_t> __a;

public:
    // Largest value representable by the counter type.
    static constexpr ptrdiff_t max() noexcept {
        return numeric_limits<ptrdiff_t>::max();
    }

    // Initializes the counter with __expected.
    inline _LIBCPP_INLINE_VISIBILITY
    constexpr explicit latch(ptrdiff_t __expected) : __a(__expected) { }

    ~latch() = default;
    latch(const latch&) = delete;
    latch& operator=(const latch&) = delete;

    // Subtracts __update from the counter (release ordering) and wakes all
    // waiters when this call is the one that brings the counter to zero.
    inline _LIBCPP_AVAILABILITY_SYNC _LIBCPP_INLINE_VISIBILITY
    void count_down(ptrdiff_t __update = 1)
    {
        auto const __old = __a.fetch_sub(__update, memory_order_release);
        if(__old == __update)
            __a.notify_all();
    }

    // Returns true iff the counter currently reads zero (acquire ordering).
    inline _LIBCPP_INLINE_VISIBILITY
    bool try_wait() const noexcept
    {
        return 0 == __a.load(memory_order_acquire);
    }

    // Blocks until try_wait() reports that the counter is zero.
    inline _LIBCPP_AVAILABILITY_SYNC _LIBCPP_INLINE_VISIBILITY
    void wait() const
    {
        // Capture `this` explicitly: implicit capture of `this` through [=]
        // is deprecated since C++20, and nothing else is captured here.
        auto const __test_fn = [this]() -> bool {
            return try_wait();
        };
        __cxx_atomic_wait(&__a.__a_, __test_fn);
    }

    // Equivalent to count_down(__update) followed by wait().
    inline _LIBCPP_AVAILABILITY_SYNC _LIBCPP_INLINE_VISIBILITY
    void arrive_and_wait(ptrdiff_t __update = 1)
    {
        count_down(__update);
        wait();
    }
};

_LIBCPP_END_NAMESPACE_STD

#endif // _LIBCPP_STD_VER >= 14

_LIBCPP_POP_MACROS

#endif //_LIBCPP_LATCH
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,96 @@ namespace NYT {

////////////////////////////////////////////////////////////////////////////////

// DCAS is supported in Clang with the -mcx16 option but is not supported in GCC; see the following links.
// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=84522
// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=80878

// Double-width (128-bit) compare-and-swap on |*atomic|.
//
// The 128-bit word is treated as a pair of 64-bit halves: (expected1, expected2)
// is the value the caller believes is stored, (new1, new2) is the replacement.
//
// Returns true if the stored value matched the expected pair and was replaced.
// Otherwise returns false and overwrites |expected1| / |expected2| with the
// value that was actually observed, so the caller can retry.
//
// T1 and T2 must be 64-bit types (they are placed directly into 64-bit
// registers by the inline assembly below).
template <class T1, class T2>
Y_FORCE_INLINE bool CompareAndSet(
    TAtomicUint128* atomic,
    T1& expected1,
    T2& expected2,
    T1 new1,
    T2 new2)
{
#if defined(__x86_64__)
    bool success;
    __asm__ __volatile__
    (
        "lock cmpxchg16b %1\n"
        "setz %0"
        : "=q"(success)
        , "+m"(*atomic)
        , "+a"(expected1)
        , "+d"(expected2)
        : "b"(new1)
        , "c"(new2)
        // "memory" makes the statement a compiler barrier, matching the AArch64
        // paths below: without it the compiler may move loads/stores of other
        // objects (e.g. an item's link field) across the CAS.
        : "cc", "memory"
    );
    return success;
#elif defined(__arm64__) || (defined(__aarch64__) && defined(RTE_ARM_FEATURE_ATOMICS))
    // CASP operates on consecutive register pairs, hence the explicit
    // pinning to x0/x1 (compare value) and x2/x3 (new value).
    register ui64 x0 __asm("x0") = (ui64)expected1;
    register ui64 x1 __asm("x1") = (ui64)expected2;
    register ui64 x2 __asm("x2") = (ui64)new1;
    register ui64 x3 __asm("x3") = (ui64)new2;
    ui64 old1 = (ui64)expected1;
    ui64 old2 = (ui64)expected2;
    asm volatile
    (
#if defined(RTE_CC_CLANG)
        ".arch armv8-a+lse\n"
#endif
        "caspal %[old0], %[old1], %[upd0], %[upd1], [%[dst]]"
        : [old0] "+r" (x0)
        , [old1] "+r" (x1)
        : [upd0] "r" (x2)
        , [upd1] "r" (x3)
        , [dst] "r" (atomic)
        : "memory"
    );
    // x0/x1 now hold the value that was in memory; the swap happened iff it
    // equals the value we expected.
    expected1 = (T1)x0;
    expected2 = (T2)x1;
    return x0 == old1 && x1 == old2;
#elif defined(__aarch64__)
    // Fallback without LSE atomics: load-exclusive / store-exclusive loop.
    ui64 exp1 = reinterpret_cast<ui64>(expected1);
    ui64 exp2 = reinterpret_cast<ui64>(expected2);
    ui32 fail = 0;

    do {
        ui64 current1 = 0;
        ui64 current2 = 0;
        asm volatile (
            "ldaxp %[cur1], %[cur2], [%[src]]"
            : [cur1] "=r" (current1)
            , [cur2] "=r" (current2)
            : [src] "r" (atomic)
            : "memory"
        );

        if (current1 != exp1 || current2 != exp2) {
            // Mismatch: report the observed value back to the caller.
            expected1 = reinterpret_cast<T1>(current1);
            expected2 = reinterpret_cast<T2>(current2);
            return false;
        }

        asm volatile (
            "stlxp %w[fail], %[new1], %[new2], [%[dst]]"
            : [fail] "=&r" (fail)
            : [new1] "r" (new1)
            , [new2] "r" (new2)
            , [dst] "r" (atomic)
            : "memory"
        );

    // A non-zero status means the exclusive store did not succeed; retry.
    } while (Y_UNLIKELY(fail));
    return true;
#else
#    error Unsupported platform
#endif
}

////////////////////////////////////////////////////////////////////////////////

template <class TItem>
TFreeList<TItem>::THead::THead(TItem* pointer)
: Pointer(pointer)
Expand Down
72 changes: 72 additions & 0 deletions library/cpp/yt/memory/free_list.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#pragma once

#include "public.h"

#include <atomic>

namespace NYT {

////////////////////////////////////////////////////////////////////////////////

// Link field for list items: an atomic pointer to the next item of type T,
// null until set. Presumably meant to be inherited by items stored in
// TFreeList — confirm against the users of this header.
template <class T>
struct TFreeListItemBase
{
    std::atomic<T*> Next{nullptr};
};

// 128-bit unsigned integer, volatile-qualified and aligned to 16 bytes.
// Used below as the single-word view of the free list head.
using TAtomicUint128 = volatile unsigned __int128 __attribute__((aligned(16)));

// List of TItem objects whose head is a (pointer, epoch) pair overlaid with a
// single 128-bit word.
// NOTE(review): the member functions are defined in free_list-inl.h, which is
// not visible here; the per-method notes below are inferred from names and
// signatures only — confirm against the definitions.
template <class TItem>
class TFreeList
{
private:
// Field-wise view of the list head.
struct THead
{
std::atomic<TItem*> Pointer = nullptr;
// Presumably a version counter that guards against ABA — TODO confirm.
std::atomic<size_t> Epoch = 0;

THead() = default;

explicit THead(TItem* pointer);

};

// Two views of the same storage: as separate fields (Head_) and as one
// 128-bit word (AtomicHead_).
union
{
THead Head_;
TAtomicUint128 AtomicHead_;
};

// Avoid false sharing.
// Sized so that the head plus this padding occupy CacheLineSize bytes.
char Padding[CacheLineSize - sizeof(TAtomicUint128)];

public:
TFreeList();

TFreeList(TFreeList&& other);

~TFreeList();

// Presumably inserts the chain [head..tail] only if |predicate| allows it and
// returns whether the insertion happened — TODO confirm.
template <class TPredicate>
bool PutIf(TItem* head, TItem* tail, TPredicate predicate);

// Presumably inserts the chain [head..tail] — TODO confirm.
void Put(TItem* head, TItem* tail);

// Presumably inserts a single item — TODO confirm.
void Put(TItem* item);

// Presumably removes and returns one item (null when empty) — TODO confirm.
TItem* Extract();

// Presumably detaches and returns the whole chain — TODO confirm.
TItem* ExtractAll();

bool IsEmpty() const;

// Presumably moves the contents of |other| into this list — TODO confirm.
void Append(TFreeList& other);
};

////////////////////////////////////////////////////////////////////////////////

} // namespace NYT

#define FREE_LIST_INL_H_
#include "free_list-inl.h"
#undef FREE_LIST_INL_H_
3 changes: 3 additions & 0 deletions library/cpp/yt/memory/public.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ namespace NYT {

////////////////////////////////////////////////////////////////////////////////

// Assumed CPU cache line size in bytes; used to size padding that avoids false sharing.
// TODO(babenko): consider increasing to 128 due to cache line pairing in L2 prefetcher.
constexpr size_t CacheLineSize = 64;

class TChunkedMemoryPool;

DECLARE_REFCOUNTED_STRUCT(IMemoryChunkProvider)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
#include <yt/yt/core/test_framework/framework.h>
#include <library/cpp/testing/gtest/gtest.h>

#include <yt/yt/core/concurrency/count_down_latch.h>

#include <yt/yt/core/misc/free_list.h>
#include <library/cpp/yt/memory/free_list.h>

#include <util/random/random.h>

#include <thread>
#include <stack>
#include <latch>

namespace NYT {
namespace {
Expand Down Expand Up @@ -50,7 +49,7 @@ struct TTestItem
ui64 Value = 0;
ui64 IndexInSet = 0;
// Avoid false sharing.
char Padding[NThreading::CacheLineSize - 2 * sizeof(ui64)];
char Padding[CacheLineSize - 2 * sizeof(ui64)];
};

class TTestItemSet
Expand Down Expand Up @@ -100,7 +99,7 @@ TEST_P(TFreeListStressTest, Stress)

TFreeList<TTestItem> list;

NConcurrency::TCountDownLatch start(params.Threads);
std::latch start(params.Threads);

std::atomic<bool> running{true};
std::atomic<ui64> put{0};
Expand All @@ -110,8 +109,7 @@ TEST_P(TFreeListStressTest, Stress)
for (ui64 i = 0; i < params.Threads; ++i) {
auto itemSet = TTestItemSet::Allocate(params.MaxBatchSize);
workers.emplace_back([&, params, itemSet = std::move(itemSet)]() mutable {
start.CountDown();
start.Wait();
start.arrive_and_wait();

while (running.load(std::memory_order::relaxed)) {
// Push batch of items.
Expand Down Expand Up @@ -150,7 +148,6 @@ INSTANTIATE_TEST_SUITE_P(
TFreeListTest,
TFreeListStressTest,
testing::Values(
TTestConfig{2, 2, 15s},
TTestConfig{4, 1, 15s},
TTestConfig{4, 3, 15s},
TTestConfig{4, 5, 15s}));
Expand Down
4 changes: 0 additions & 4 deletions library/cpp/yt/threading/public.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,9 @@ namespace NYT::NThreading {

////////////////////////////////////////////////////////////////////////////////

// TODO(babenko): consider increasing to 128 due to cache line pairing in L2 prefetcher.
constexpr size_t CacheLineSize = 64;

// Declares a variable |name| of type |type|, brace-initialized with __LOCATION__.
#define YT_DECLARE_SPIN_LOCK(type, name) \
type name{__LOCATION__}

////////////////////////////////////////////////////////////////////////////////

} // namespace NYT::NThreading

2 changes: 2 additions & 0 deletions library/cpp/yt/threading/rw_spin_lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include "public.h"
#include "spin_lock_base.h"

#include <library/cpp/yt/memory/public.h>

#include <util/system/rwlock.h>

#include <atomic>
Expand Down
3 changes: 2 additions & 1 deletion library/cpp/yt/threading/spin_lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

#include <library/cpp/yt/system/thread_id.h>

#include <library/cpp/yt/memory/public.h>

#include <util/system/src_location.h>
#include <util/system/types.h>

Expand Down Expand Up @@ -78,4 +80,3 @@ class TPaddedSpinLock
#define SPIN_LOCK_INL_H_
#include "spin_lock-inl.h"
#undef SPIN_LOCK_INL_H_

Loading

0 comments on commit 7918b11

Please sign in to comment.