Skip to content

Commit

Permalink
Remove legacy plasma unlimited and pull manager pinning flag (ray-pro…
Browse files Browse the repository at this point in the history
  • Loading branch information
ericl authored Aug 12, 2021
1 parent 63f9ba2 commit ce171f1
Show file tree
Hide file tree
Showing 25 changed files with 48 additions and 404 deletions.
14 changes: 0 additions & 14 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -962,20 +962,6 @@ cc_test(
],
)

cc_test(
name = "allocator_test",
srcs = [
"src/ray/object_manager/plasma/test/allocator_test.cc",
],
copts = COPTS,
deps = [
":plasma_store_server_lib",
"@boost//:filesystem",
"@com_google_absl//absl/strings:str_format",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "fallback_allocator_test",
srcs = [
Expand Down
3 changes: 0 additions & 3 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1529,9 +1529,6 @@ cdef class CoreWorker:

return resources_dict

def plasma_unlimited(self):
return RayConfig.instance().plasma_unlimited()

def profile_event(self, c_string event_type, object extra_data=None):
if RayConfig.instance().enable_timeline():
return ProfileEvent.make(
Expand Down
2 changes: 0 additions & 2 deletions python/ray/includes/ray_config.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,4 @@ cdef extern from "ray/common/ray_config.h" nogil:

c_bool enable_timeline() const

c_bool plasma_unlimited() const

uint32_t max_grpc_message_size() const
4 changes: 0 additions & 4 deletions python/ray/includes/ray_config.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,6 @@ cdef class Config:
def enable_timeline():
return RayConfig.instance().enable_timeline()

@staticmethod
def plasma_unlimited():
return RayConfig.instance().plasma_unlimited()

@staticmethod
def max_grpc_message_size():
return RayConfig.instance().max_grpc_message_size()
37 changes: 0 additions & 37 deletions python/ray/tests/test_failure_4.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import ray

import numpy as np
import pytest

from ray.cluster_utils import Cluster
Expand All @@ -11,42 +10,6 @@
run_string_as_driver)


def test_fill_object_store_exception(shutdown_only):
ray.init(
num_cpus=2,
object_store_memory=10**8,
_system_config={"automatic_object_spilling_enabled": False})

if ray.worker.global_worker.core_worker.plasma_unlimited():
return # No exception is raised.

@ray.remote
def expensive_task():
return np.zeros((10**8) // 10, dtype=np.uint8)

with pytest.raises(ray.exceptions.RayTaskError) as e:
ray.get([expensive_task.remote() for _ in range(20)])
with pytest.raises(ray.exceptions.ObjectStoreFullError):
raise e.as_instanceof_cause()

@ray.remote
class LargeMemoryActor:
def some_expensive_task(self):
return np.zeros(10**8 + 2, dtype=np.uint8)

def test(self):
return 1

actor = LargeMemoryActor.remote()
with pytest.raises(ray.exceptions.RayTaskError):
ray.get(actor.some_expensive_task.remote())
# Make sure actor does not die
ray.get(actor.test.remote())

with pytest.raises(ray.exceptions.ObjectStoreFullError):
ray.put(np.zeros(10**8 + 2, dtype=np.uint8))


def test_connect_with_disconnected_node(shutdown_only):
config = {
"num_heartbeats_timeout": 50,
Expand Down
8 changes: 1 addition & 7 deletions python/ray/tests/test_object_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,12 +283,7 @@ def driver():
remote_ref = put.remote()

ready, _ = ray.wait([remote_ref], timeout=30)

if ray.worker.global_worker.core_worker.plasma_unlimited():
# Sadly, the test cannot work in this mode.
assert len(ready) == 1
else:
assert len(ready) == 0
assert len(ready) == 1

del local_ref

Expand Down Expand Up @@ -336,7 +331,6 @@ def foo(*args):
ray.get(tasks)


# Will hang if RAY_pull_manager_pin_active_objects=0 due to an eviction loop.
def test_pull_bundles_pinning(shutdown_only):
cluster = Cluster()
object_size = int(50e6)
Expand Down
13 changes: 2 additions & 11 deletions python/ray/tests/test_object_spilling.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,7 @@ def test_default_config(shutdown_only):
"object_store_full_delay_ms": 100
})
assert "object_spilling_config" not in ray.worker._global_node._config
if ray.worker.global_worker.core_worker.plasma_unlimited():
run_basic_workload()
else:
with pytest.raises(ray.exceptions.ObjectStoreFullError):
run_basic_workload()
run_basic_workload()
ray.shutdown()

# Make sure when we use a different config, it is reflected.
Expand Down Expand Up @@ -170,12 +166,7 @@ def test_spilling_not_done_for_pinned_object(object_spilling_config,
})
arr = np.random.rand(5 * 1024 * 1024) # 40 MB
ref = ray.get(ray.put(arr)) # noqa
# Since the ref exists, it should raise OOM.
if ray.worker.global_worker.core_worker.plasma_unlimited():
ref2 = ray.put(arr) # noqa
else:
with pytest.raises(ray.exceptions.ObjectStoreFullError):
ref2 = ray.put(arr) # noqa
ref2 = ray.put(arr) # noqa

wait_for_condition(lambda: is_dir_empty(temp_folder))
assert_no_thrashing(address["redis_address"])
Expand Down
5 changes: 1 addition & 4 deletions python/ray/tests/test_plasma_unlimited.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@


def _init_ray():
return ray.init(
num_cpus=2,
object_store_memory=700e6,
_system_config={"plasma_unlimited": True})
return ray.init(num_cpus=2, object_store_memory=700e6)


def _check_spilled_mb(address, spilled=None, restored=None, fallback=None):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
base_image: "anyscale/ray-ml:pinned-nightly-py37"
env_vars: {"RAY_plasma_unlimited": "1", "RAY_oom_grace_period_s": "2"}
debian_packages: []

python:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
base_image: "anyscale/ray-ml:pinned-nightly-py37"
env_vars: {"RAY_scheduler_hybrid_threshold": "0", "RAY_plasma_unlimited": "1", "RAY_oom_grace_period_s": "2"}
env_vars: {}
debian_packages: []

python:
Expand Down
16 changes: 1 addition & 15 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,20 +95,6 @@ RAY_CONFIG(bool, lineage_pinning_enabled, false)
/// See also: https://github.com/ray-project/ray/issues/14182
RAY_CONFIG(bool, preallocate_plasma_memory, false)

/// Whether to never raise OOM. Instead, we fall back to allocating from the filesystem
/// in /tmp, creating a new file per object. This degrades performance since filesystem
/// backed objects are written to disk, but allows Ray to operate with degraded
/// performance instead of crashing. Note that memory admission control is still in play,
/// so Ray will still do its best to avoid running out of memory (i.e., via throttling and
/// spilling).
RAY_CONFIG(bool, plasma_unlimited, true)

/// DEBUG-ONLY: Min number of pulls to keep active. Only supports values {0, 1}.
RAY_CONFIG(int, pull_manager_min_active_pulls, 1)

/// DEBUG-ONLY: Whether to exclude actively pulled objects from spilling and eviction.
RAY_CONFIG(bool, pull_manager_pin_active_objects, true)

/// Whether to use the hybrid scheduling policy, or one of the legacy spillback
/// strategies. In the hybrid scheduling strategy, leases are packed until a threshold,
/// then spread via weighted (by critical resource usage).
Expand Down Expand Up @@ -469,4 +455,4 @@ RAY_CONFIG(uint64_t, resource_broadcast_batch_size, 512);
RAY_CONFIG(bool, worker_resource_limits_enabled, false)

/// ServerCall instance number of each RPC service handler
RAY_CONFIG(int64_t, gcs_max_active_rpcs_per_handler, 100)
RAY_CONFIG(int64_t, gcs_max_active_rpcs_per_handler, 100)
14 changes: 4 additions & 10 deletions src/ray/object_manager/object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,10 @@ ObjectManager::ObjectManager(
if (available_memory < 0) {
available_memory = 0;
}
pull_manager_.reset(new PullManager(
self_node_id_, object_is_local, send_pull_request, cancel_pull_request,
restore_spilled_object_, get_time, config.pull_timeout_ms, available_memory,
[spill_objects_callback, object_store_full_callback]() {
// TODO(swang): This copies the out-of-memory handling in the
// CreateRequestQueue. It would be nice to unify these.
object_store_full_callback();
static_cast<void>(spill_objects_callback());
},
pin_object));
pull_manager_.reset(new PullManager(self_node_id_, object_is_local, send_pull_request,
cancel_pull_request, restore_spilled_object_,
get_time, config.pull_timeout_ms, available_memory,
pin_object));
// Start object manager rpc server and send & receive request threads
StartRpcService();
}
Expand Down
33 changes: 5 additions & 28 deletions src/ray/object_manager/plasma/create_request_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,29 +64,8 @@ std::pair<PlasmaObject, PlasmaError> CreateRequestQueue::TryRequestImmediately(
PlasmaObject result = {};

// Immediately fulfill it using the fallback allocator.
if (RayConfig::instance().plasma_unlimited()) {
PlasmaError error = create_callback(/*fallback_allocator=*/true, &result,
/*spilling_required=*/nullptr);
return {result, error};
}

if (!queue_.empty()) {
// There are other requests queued. Return an out-of-memory error
// immediately because this request cannot be served.
return {result, PlasmaError::OutOfMemory};
}

auto req_id = AddRequest(object_id, client, create_callback, object_size);
if (!ProcessRequests().ok()) {
// If the request was not immediately fulfillable, finish it.
if (!queue_.empty()) {
// Some errors, such as a transient OOM error, don't finish the request, so we
// should finish it here.
FinishRequest(queue_.begin());
}
}
PlasmaError error;
RAY_CHECK(GetRequestResult(req_id, &result, &error));
PlasmaError error = create_callback(/*fallback_allocator=*/true, &result,
/*spilling_required=*/nullptr);
return {result, error};
}

Expand Down Expand Up @@ -139,11 +118,9 @@ Status CreateRequestQueue::ProcessRequests() {
RAY_LOG(DEBUG) << "In grace period before fallback allocation / oom.";
return Status::ObjectStoreFull("Waiting for grace period.");
} else {
if (plasma_unlimited_) {
// Trigger the fallback allocator.
status = ProcessRequest(/*fallback_allocator=*/true, *request_it,
/*spilling_required=*/nullptr);
}
// Trigger the fallback allocator.
status = ProcessRequest(/*fallback_allocator=*/true, *request_it,
/*spilling_required=*/nullptr);
if (!status.ok()) {
std::string dump = "";
if (dump_debug_info_callback_ && !logged_oom) {
Expand Down
7 changes: 1 addition & 6 deletions src/ray/object_manager/plasma/create_request_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,11 @@ class CreateRequestQueue {
ray::SpillObjectsCallback spill_objects_callback,
std::function<void()> trigger_global_gc,
std::function<int64_t()> get_time,
std::function<std::string()> dump_debug_info_callback = nullptr,
bool plasma_unlimited = RayConfig::instance().plasma_unlimited())
std::function<std::string()> dump_debug_info_callback = nullptr)
: oom_grace_period_ns_(oom_grace_period_s * 1e9),
spill_objects_callback_(spill_objects_callback),
trigger_global_gc_(trigger_global_gc),
get_time_(get_time),
plasma_unlimited_(plasma_unlimited),
dump_debug_info_callback_(dump_debug_info_callback) {}

/// Add a request to the queue. The caller should use the returned request ID
Expand Down Expand Up @@ -177,9 +175,6 @@ class CreateRequestQueue {
/// A callback to return the current time.
const std::function<int64_t()> get_time_;

/// Whether to use the fallback allocator when out of memory.
bool plasma_unlimited_;

/// Sink for debug info.
const std::function<std::string()> dump_debug_info_callback_;

Expand Down
21 changes: 3 additions & 18 deletions src/ray/object_manager/plasma/plasma_allocator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,13 @@ absl::optional<Allocation> BuildAllocation(void *addr, size_t size) {

PlasmaAllocator::PlasmaAllocator(const std::string &plasma_directory,
const std::string &fallback_directory,
bool hugepage_enabled, int64_t footprint_limit,
bool fallback_enabled)
bool hugepage_enabled, int64_t footprint_limit)
: kFootprintLimit(footprint_limit),
kAlignment(kAllocationAlignment),
kFallbackEnabled(fallback_enabled),
allocated_(0),
fallback_allocated_(0) {
internal::SetDLMallocConfig(plasma_directory, fallback_directory, hugepage_enabled,
fallback_enabled);
/*fallback_enabled=*/true);
RAY_CHECK(kFootprintLimit > kDlMallocReserved)
<< "Footprint limit has to be greater than " << kDlMallocReserved;
auto allocation = Allocate(kFootprintLimit - kDlMallocReserved);
Expand All @@ -94,15 +92,6 @@ PlasmaAllocator::PlasmaAllocator(const std::string &plasma_directory,
}

absl::optional<Allocation> PlasmaAllocator::Allocate(size_t bytes) {
if (!kFallbackEnabled) {
// We only check against the footprint limit when fallback allocation is disabled.
// In fallback disabled mode: the check is done here; dlmemalign never returns
// nullptr. In fallback enabled mode: dlmemalign returns nullptr once the initial
// /dev/shm block fills.
if (allocated_ + static_cast<int64_t>(bytes) > kFootprintLimit) {
return absl::nullopt;
}
}
RAY_LOG(DEBUG) << "allocating " << bytes;
void *mem = dlmemalign(kAlignment, bytes);
RAY_LOG(DEBUG) << "allocated " << bytes << " at " << mem;
Expand All @@ -114,9 +103,6 @@ absl::optional<Allocation> PlasmaAllocator::Allocate(size_t bytes) {
}

absl::optional<Allocation> PlasmaAllocator::FallbackAllocate(size_t bytes) {
if (!kFallbackEnabled) {
return absl::nullopt;
}
// Forces allocation as a separate file.
RAY_CHECK(dlmallopt(M_MMAP_THRESHOLD, 0));
RAY_LOG(DEBUG) << "fallback allocating " << bytes;
Expand All @@ -142,8 +128,7 @@ void PlasmaAllocator::Free(Allocation allocation) {
RAY_LOG(DEBUG) << "deallocating " << allocation.size << " at " << allocation.address;
dlfree(allocation.address);
allocated_ -= allocation.size;
if (RayConfig::instance().plasma_unlimited() &&
internal::IsOutsideInitialAllocation(allocation.address)) {
if (internal::IsOutsideInitialAllocation(allocation.address)) {
fallback_allocated_ -= allocation.size;
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/ray/object_manager/plasma/plasma_allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class PlasmaAllocator : public IAllocator {
public:
PlasmaAllocator(const std::string &plasma_directory,
const std::string &fallback_directory, bool hugepage_enabled,
int64_t footprint_limit, bool fallback_enabled);
int64_t footprint_limit);

/// On linux, it allocates memory from a pre-mmapped file from /dev/shm.
/// On other system, it allocates memory from a pre-mmapped file on disk.
Expand Down Expand Up @@ -82,7 +82,6 @@ class PlasmaAllocator : public IAllocator {
private:
const int64_t kFootprintLimit;
const size_t kAlignment;
const bool kFallbackEnabled;
int64_t allocated_;
// TODO(scv119): once we refactor object_manager this no longer
// needs to be atomic.
Expand Down
4 changes: 0 additions & 4 deletions src/ray/object_manager/plasma/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,6 @@ const LocalObject *PlasmaStore::CreateObjectInternal(const ray::ObjectInfo &obje
}
}

if (!RayConfig::instance().plasma_unlimited()) {
RAY_LOG(DEBUG) << "Fallback allocation is not enabled.";
return nullptr;
}
if (!allow_fallback_allocation) {
RAY_LOG(DEBUG) << "Fallback allocation not enabled for this request.";
return nullptr;
Expand Down
3 changes: 0 additions & 3 deletions src/ray/object_manager/plasma/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,6 @@ class PlasmaStore {
// created by the object manager.
int64_t num_bytes_in_use =
static_cast<int64_t>(num_bytes_in_use_ - object_store_.GetNumBytesUnsealed());
if (!RayConfig::instance().plasma_unlimited()) {
RAY_CHECK(allocator_.GetFootprintLimit() >= num_bytes_in_use);
}
size_t available = 0;
if (num_bytes_in_use < allocator_.GetFootprintLimit()) {
available = allocator_.GetFootprintLimit() - num_bytes_in_use;
Expand Down
5 changes: 2 additions & 3 deletions src/ray/object_manager/plasma/store_runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,8 @@ void PlasmaStoreRunner::Start(ray::SpillObjectsCallback spill_objects_callback,
RAY_LOG(DEBUG) << "starting server listening on " << socket_name_;
{
absl::MutexLock lock(&store_runner_mutex_);
allocator_ = std::make_unique<PlasmaAllocator>(
plasma_directory_, fallback_directory_, hugepages_enabled_, system_memory_,
RayConfig::instance().plasma_unlimited());
allocator_ = std::make_unique<PlasmaAllocator>(plasma_directory_, fallback_directory_,
hugepages_enabled_, system_memory_);
store_.reset(new PlasmaStore(main_service_, *allocator_, socket_name_,
RayConfig::instance().object_store_full_delay_ms(),
RayConfig::instance().object_spilling_threshold(),
Expand Down
Loading

0 comments on commit ce171f1

Please sign in to comment.