Skip to content

Commit

Permalink
submit local_delta memory usage when need to pass memory_tracker betw…
Browse files Browse the repository at this point in the history
…een threads (pingcap#4450)

close pingcap#4449
  • Loading branch information
windtalker authored Mar 31, 2022
1 parent 5727db7 commit 6ed39f5
Showing 5 changed files with 84 additions and 47 deletions.
115 changes: 71 additions & 44 deletions dbms/src/Common/MemoryTracker.cpp
Original file line number Diff line number Diff line change
@@ -61,7 +61,7 @@ void MemoryTracker::logPeakMemoryUsage() const
LOG_FMT_DEBUG(getLogger(), "Peak memory usage{}: {}.", (description ? " " + std::string(description) : ""), formatReadableSizeWithBinarySuffix(peak));
}

void MemoryTracker::alloc(Int64 size)
void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
{
/** Using memory_order_relaxed means that if allocations are done simultaneously,
* we allow exception about memory limit exceeded to be thrown only on next allocation.
@@ -72,48 +72,51 @@ void MemoryTracker::alloc(Int64 size)
if (!next.load(std::memory_order_relaxed))
CurrentMetrics::add(metric, size);

Int64 current_limit = limit.load(std::memory_order_relaxed);

/// Using non-thread-safe random number generator. Joint distribution in different threads would not be uniform.
/// In this case, it doesn't matter.
if (unlikely(fault_probability && drand48() < fault_probability))
if (check_memory_limit)
{
free(size);

DB::FmtBuffer fmt_buf;
fmt_buf.append("Memory tracker");
if (description)
fmt_buf.fmtAppend(" {}", description);
fmt_buf.fmtAppend(": fault injected. Would use {} (attempt to allocate chunk of {} bytes), maximum: {}",
formatReadableSizeWithBinarySuffix(will_be),
size,
formatReadableSizeWithBinarySuffix(current_limit));

throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded);
}
Int64 current_limit = limit.load(std::memory_order_relaxed);

if (unlikely(current_limit && will_be > current_limit))
{
free(size);
/// Using non-thread-safe random number generator. Joint distribution in different threads would not be uniform.
/// In this case, it doesn't matter.
if (unlikely(fault_probability && drand48() < fault_probability))
{
free(size);

DB::FmtBuffer fmt_buf;
fmt_buf.append("Memory tracker");
if (description)
fmt_buf.fmtAppend(" {}", description);
fmt_buf.fmtAppend(": fault injected. Would use {} (attempt to allocate chunk of {} bytes), maximum: {}",
formatReadableSizeWithBinarySuffix(will_be),
size,
formatReadableSizeWithBinarySuffix(current_limit));

throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded);
}

if (unlikely(current_limit && will_be > current_limit))
{
free(size);

DB::FmtBuffer fmt_buf;
fmt_buf.append("Memory limit");
if (description)
fmt_buf.fmtAppend(" {}", description);
DB::FmtBuffer fmt_buf;
fmt_buf.append("Memory limit");
if (description)
fmt_buf.fmtAppend(" {}", description);

fmt_buf.fmtAppend(" exceeded: would use {} (attempt to allocate chunk of {} bytes), maximum: {}",
formatReadableSizeWithBinarySuffix(will_be),
size,
formatReadableSizeWithBinarySuffix(current_limit));
fmt_buf.fmtAppend(" exceeded: would use {} (attempt to allocate chunk of {} bytes), maximum: {}",
formatReadableSizeWithBinarySuffix(will_be),
size,
formatReadableSizeWithBinarySuffix(current_limit));

throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded);
throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded);
}
}

if (will_be > peak.load(std::memory_order_relaxed)) /// Races doesn't matter. Could rewrite with CAS, but not worth.
peak.store(will_be, std::memory_order_relaxed);

if (auto * loaded_next = next.load(std::memory_order_relaxed))
loaded_next->alloc(size);
loaded_next->alloc(size, check_memory_limit);
}


@@ -176,21 +179,22 @@ static thread_local Int64 local_delta{};

__attribute__((always_inline)) inline void checkSubmitAndUpdateLocalDelta(Int64 updated_local_delta)
{
if (unlikely(updated_local_delta > MEMORY_TRACER_SUBMIT_THRESHOLD))
if (current_memory_tracker)
{
if (current_memory_tracker)
if (unlikely(updated_local_delta > MEMORY_TRACER_SUBMIT_THRESHOLD))
{
current_memory_tracker->alloc(updated_local_delta);
local_delta = 0;
}
else if (unlikely(updated_local_delta < -MEMORY_TRACER_SUBMIT_THRESHOLD))
{
if (current_memory_tracker)
local_delta = 0;
}
else if (unlikely(updated_local_delta < -MEMORY_TRACER_SUBMIT_THRESHOLD))
{
current_memory_tracker->free(-updated_local_delta);
local_delta = 0;
}
else
{
local_delta = updated_local_delta;
local_delta = 0;
}
else
{
local_delta = updated_local_delta;
}
}
}

@@ -199,6 +203,29 @@ void disableThreshold()
MEMORY_TRACER_SUBMIT_THRESHOLD = 0;
}

void submitLocalDeltaMemory()
{
if (current_memory_tracker)
{
try
{
if (local_delta > 0)
{
current_memory_tracker->alloc(local_delta, false);
}
else if (local_delta < 0)
{
current_memory_tracker->free(-local_delta);
}
}
catch (...)
{
DB::tryLogCurrentException("MemoryTracker", "Failed when try to submit local delta memory");
}
}
local_delta = 0;
}

void alloc(Int64 size)
{
checkSubmitAndUpdateLocalDelta(local_delta + size);
3 changes: 2 additions & 1 deletion dbms/src/Common/MemoryTracker.h
Original file line number Diff line number Diff line change
@@ -58,7 +58,7 @@ class MemoryTracker

/** Call the following functions before calling of corresponding operations with memory allocators.
*/
void alloc(Int64 size);
void alloc(Int64 size, bool check_memory_limit = true);

void realloc(Int64 old_size, Int64 new_size) { alloc(new_size - old_size); }

@@ -110,6 +110,7 @@ extern thread_local MemoryTracker * current_memory_tracker;
namespace CurrentMemoryTracker
{
void disableThreshold();
void submitLocalDeltaMemory();
void alloc(Int64 size);
void realloc(Int64 old_size, Int64 new_size);
void free(Int64 size);
3 changes: 3 additions & 0 deletions dbms/src/Common/MemoryTrackerSetter.h
Original file line number Diff line number Diff line change
@@ -30,12 +30,15 @@ class MemoryTrackerSetter : private boost::noncopyable
: enable(enable_)
, old_memory_tracker(current_memory_tracker)
{
CurrentMemoryTracker::submitLocalDeltaMemory();
if (enable)
current_memory_tracker = memory_tracker;
}

~MemoryTrackerSetter()
{
/// submit current local delta memory if the memory tracker is leaving current thread
CurrentMemoryTracker::submitLocalDeltaMemory();
current_memory_tracker = old_memory_tracker;
}

5 changes: 4 additions & 1 deletion dbms/src/Common/ThreadFactory.h
Original file line number Diff line number Diff line change
@@ -35,7 +35,10 @@ class ThreadFactory
template <typename F, typename... Args>
static std::thread newThread(bool propagate_memory_tracker, String thread_name, F && f, Args &&... args)
{
auto memory_tracker = current_memory_tracker;
/// submit current local delta memory if the memory tracker needs to be propagated to other threads
if (propagate_memory_tracker)
CurrentMemoryTracker::submitLocalDeltaMemory();
auto * memory_tracker = current_memory_tracker;
auto wrapped_func = [propagate_memory_tracker, memory_tracker, thread_name = std::move(thread_name), f = std::move(f)](auto &&... args) {
UPDATE_CUR_AND_MAX_METRIC(tiflash_thread_count, type_total_threads_of_raw, type_max_threads_of_raw);
MemoryTrackerSetter setter(propagate_memory_tracker, memory_tracker);
5 changes: 4 additions & 1 deletion dbms/src/Common/wrapInvocable.h
Original file line number Diff line number Diff line change
@@ -21,7 +21,10 @@ namespace DB
template <typename Func, typename... Args>
inline auto wrapInvocable(bool propagate_memory_tracker, Func && func, Args &&... args)
{
auto memory_tracker = current_memory_tracker;
/// submit current local delta memory if the memory tracker needs to be propagated to other threads
if (propagate_memory_tracker)
CurrentMemoryTracker::submitLocalDeltaMemory();
auto * memory_tracker = current_memory_tracker;

// capature our task into lambda with all its parameters
auto capture = [propagate_memory_tracker,

0 comments on commit 6ed39f5

Please sign in to comment.