Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable overriding per-query working-memory target. #9544

Merged
merged 3 commits into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/common/sort/sort_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "duckdb/common/row_operations/row_operations.hpp"
#include "duckdb/common/sort/sort.hpp"
#include "duckdb/common/sort/sorted_block.hpp"
#include "duckdb/storage/buffer/buffer_pool.hpp"

#include <algorithm>
#include <numeric>
Expand Down Expand Up @@ -400,7 +401,7 @@ void GlobalSortState::PrepareMergePhase() {
idx_t total_heap_size =
std::accumulate(sorted_blocks.begin(), sorted_blocks.end(), (idx_t)0,
[](idx_t a, const unique_ptr<SortedBlock> &b) { return a + b->HeapSize(); });
if (external || (pinned_blocks.empty() && total_heap_size > 0.25 * buffer_manager.GetMaxMemory())) {
if (external || (pinned_blocks.empty() && total_heap_size > 0.25 * buffer_manager.GetQueryMaxMemory())) {
external = true;
}
// Use the data that we have to determine which partition size to use during the merge
Expand Down
2 changes: 1 addition & 1 deletion src/execution/operator/join/physical_hash_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class HashJoinLocalSinkState : public LocalSinkState {
unique_ptr<JoinHashTable> PhysicalHashJoin::InitializeHashTable(ClientContext &context) const {
auto result =
make_uniq<JoinHashTable>(BufferManager::GetBufferManager(context), conditions, build_types, join_type);
result->max_ht_size = double(0.6) * BufferManager::GetBufferManager(context).GetMaxMemory();
result->max_ht_size = double(0.6) * BufferManager::GetBufferManager(context).GetQueryMaxMemory();
if (!delim_types.empty() && join_type == JoinType::MARK) {
// correlated MARK join
if (delim_types.size() + 1 == conditions.size()) {
Expand Down
3 changes: 2 additions & 1 deletion src/execution/physical_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "duckdb/parallel/pipeline.hpp"
#include "duckdb/parallel/thread_context.hpp"
#include "duckdb/storage/buffer_manager.hpp"
#include "duckdb/storage/buffer/buffer_pool.hpp"

namespace duckdb {

Expand Down Expand Up @@ -121,7 +122,7 @@ unique_ptr<GlobalSinkState> PhysicalOperator::GetGlobalSinkState(ClientContext &
idx_t PhysicalOperator::GetMaxThreadMemory(ClientContext &context) {
// Memory usage per thread should scale with max mem / num threads
// We take 1/4th of this, to be conservative
idx_t max_memory = BufferManager::GetBufferManager(context).GetMaxMemory();
idx_t max_memory = BufferManager::GetBufferManager(context).GetQueryMaxMemory();
idx_t num_threads = TaskScheduler::GetScheduler(context).NumberOfThreads();
return (max_memory / num_threads) / 4;
}
Expand Down
3 changes: 2 additions & 1 deletion src/execution/radix_partitioned_hashtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "duckdb/main/config.hpp"
#include "duckdb/parallel/event.hpp"
#include "duckdb/planner/expression/bound_reference_expression.hpp"
#include "duckdb/storage/buffer/buffer_pool.hpp"

namespace duckdb {

Expand Down Expand Up @@ -331,7 +332,7 @@ bool MaybeRepartition(ClientContext &context, RadixHTGlobalSinkState &gstate, Ra

// Check if we're approaching the memory limit
const idx_t n_threads = TaskScheduler::GetScheduler(context).NumberOfThreads();
const idx_t limit = BufferManager::GetBufferManager(context).GetMaxMemory();
const idx_t limit = BufferManager::GetBufferManager(context).GetQueryMaxMemory();
const idx_t thread_limit = 0.6 * limit / n_threads;
if (ht.GetPartitionedData()->SizeInBytes() > thread_limit || context.config.force_external) {
if (gstate.config.SetRadixBitsToExternal()) {
Expand Down
2 changes: 1 addition & 1 deletion src/include/duckdb/main/database.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class DatabaseInstance : public std::enable_shared_from_this<DatabaseInstance> {
DBConfig config;

public:
BufferPool &GetBufferPool();
BufferPool &GetBufferPool() const;
DUCKDB_API SecretManager &GetSecretManager();
DUCKDB_API BufferManager &GetBufferManager();
DUCKDB_API DatabaseManager &GetDatabaseManager();
Expand Down
2 changes: 2 additions & 0 deletions src/include/duckdb/storage/buffer/buffer_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class BufferPool {

idx_t GetMaxMemory() const;

virtual idx_t GetQueryMaxMemory() const;

protected:
//! Evict blocks until the currently used memory + extra_memory fit, returns false if this was not possible
//! (i.e. not enough blocks could be evicted)
Expand Down
4 changes: 3 additions & 1 deletion src/include/duckdb/storage/buffer_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class BufferManager {
virtual unique_ptr<FileBuffer> ConstructManagedBuffer(idx_t size, unique_ptr<FileBuffer> &&source,
FileBufferType type = FileBufferType::MANAGED_BUFFER);
//! Get the underlying buffer pool responsible for managing the buffers
virtual BufferPool &GetBufferPool();
virtual BufferPool &GetBufferPool() const;

// Static methods
DUCKDB_API static BufferManager &GetBufferManager(DatabaseInstance &db);
Expand All @@ -69,6 +69,8 @@ class BufferManager {
static idx_t GetAllocSize(idx_t block_size) {
return AlignValue<idx_t, Storage::SECTOR_SIZE>(block_size + Storage::BLOCK_HEADER_SIZE);
}
//! Returns the maximum available memory for a given query
idx_t GetQueryMaxMemory() const;

protected:
virtual void PurgeQueue() = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/include/duckdb/storage/standard_buffer_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class StandardBufferManager : public BufferManager {
//! Garbage collect eviction queue
void PurgeQueue() final override;

BufferPool &GetBufferPool() final override;
BufferPool &GetBufferPool() const final override;

//! Write a temporary buffer to disk
void WriteTemporaryBuffer(block_id_t block_id, FileBuffer &buffer) final override;
Expand Down
2 changes: 1 addition & 1 deletion src/main/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ BufferManager &DatabaseInstance::GetBufferManager() {
return *buffer_manager;
}

BufferPool &DatabaseInstance::GetBufferPool() {
BufferPool &DatabaseInstance::GetBufferPool() const {
return *config.buffer_pool;
}

Expand Down
3 changes: 3 additions & 0 deletions src/storage/buffer/buffer_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ idx_t BufferPool::GetUsedMemory() const {
idx_t BufferPool::GetMaxMemory() const {
return maximum_memory;
}
idx_t BufferPool::GetQueryMaxMemory() const {
return GetMaxMemory();
}

BufferPool::EvictionResult BufferPool::EvictBlocks(idx_t extra_memory, idx_t memory_limit,
unique_ptr<FileBuffer> *buffer) {
Expand Down
8 changes: 7 additions & 1 deletion src/storage/buffer_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "duckdb/common/allocator.hpp"
#include "duckdb/common/exception.hpp"
#include "duckdb/common/file_buffer.hpp"
#include "duckdb/storage/buffer/buffer_pool.hpp"
#include "duckdb/storage/standard_buffer_manager.hpp"

namespace duckdb {
Expand Down Expand Up @@ -37,7 +38,7 @@ const string &BufferManager::GetTemporaryDirectory() {
throw InternalException("This type of BufferManager does not allow a temporary directory");
}

BufferPool &BufferManager::GetBufferPool() {
BufferPool &BufferManager::GetBufferPool() const {
throw InternalException("This type of BufferManager does not have a buffer pool");
}

Expand All @@ -53,6 +54,11 @@ bool BufferManager::HasTemporaryDirectory() const {
return false;
}

//! Returns the maximum available memory for a given query
idx_t BufferManager::GetQueryMaxMemory() const {
return GetBufferPool().GetQueryMaxMemory();
}

unique_ptr<FileBuffer> BufferManager::ConstructManagedBuffer(idx_t size, unique_ptr<FileBuffer> &&source,
FileBufferType type) {
throw NotImplementedException("This type of BufferManager can not construct managed buffers");
Expand Down
2 changes: 1 addition & 1 deletion src/storage/standard_buffer_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ StandardBufferManager::StandardBufferManager(DatabaseInstance &db, string tmp)
StandardBufferManager::~StandardBufferManager() {
}

BufferPool &StandardBufferManager::GetBufferPool() {
BufferPool &StandardBufferManager::GetBufferPool() const {
return buffer_pool;
}

Expand Down
Loading