feat(memory): Implement shared memory state across Relay #3821

Merged 40 commits on Jul 22, 2024

Changes from 1 commit

Commits (40)
90fb599
feat(memory): Implement shared memory state across Relay
iambriccardo Jul 15, 2024
b0ca49d
feat(memory): Add a way to track memory usage
iambriccardo Jul 16, 2024
7c8d357
Fix
iambriccardo Jul 16, 2024
cc4b577
Fix
iambriccardo Jul 16, 2024
c449dfd
Fix
iambriccardo Jul 16, 2024
2dc4e13
Fix
iambriccardo Jul 16, 2024
6bfd213
Fix
iambriccardo Jul 16, 2024
584a868
Fix
iambriccardo Jul 16, 2024
e7608a8
Remove usage of floats
iambriccardo Jul 16, 2024
75a3ffc
Improve
iambriccardo Jul 16, 2024
1069847
Improve
iambriccardo Jul 16, 2024
719bea7
Improve
iambriccardo Jul 16, 2024
45b35ee
Improve
iambriccardo Jul 16, 2024
6a8be96
Improve
iambriccardo Jul 17, 2024
f4a7d39
fi
iambriccardo Jul 17, 2024
8ceac2e
Fix
iambriccardo Jul 17, 2024
cc0596d
Fix
iambriccardo Jul 18, 2024
1d3b3e8
Fix
iambriccardo Jul 18, 2024
fa67244
Fix
iambriccardo Jul 18, 2024
08a6e85
Fix
iambriccardo Jul 18, 2024
bd9e159
Fix
iambriccardo Jul 18, 2024
450311d
Fix
iambriccardo Jul 18, 2024
707cb07
Fix
iambriccardo Jul 18, 2024
8670f0d
Fix
iambriccardo Jul 18, 2024
dfc5a9b
Fix
iambriccardo Jul 18, 2024
464ca43
Fix
iambriccardo Jul 18, 2024
e2ca3d4
Merge
iambriccardo Jul 18, 2024
05b1317
Fix
iambriccardo Jul 18, 2024
6823d7d
Update relay-server/src/endpoints/common.rs
iambriccardo Jul 19, 2024
78abada
Update relay-server/src/utils/memory.rs
iambriccardo Jul 19, 2024
8755d87
Update relay-server/src/utils/memory.rs
iambriccardo Jul 19, 2024
f0f2c9f
Update relay-server/src/utils/memory.rs
iambriccardo Jul 19, 2024
aafba46
Update relay-server/src/utils/memory.rs
iambriccardo Jul 19, 2024
998fdf2
Update relay-server/src/utils/memory.rs
iambriccardo Jul 19, 2024
aa1f39e
Update relay-server/src/utils/memory.rs
iambriccardo Jul 19, 2024
735d52c
Improve
iambriccardo Jul 19, 2024
f434135
Merge
iambriccardo Jul 19, 2024
d22dfa6
Merge
iambriccardo Jul 19, 2024
9204a7f
Merge
iambriccardo Jul 19, 2024
8a70bf2
Changelog
iambriccardo Jul 19, 2024

Viewing commit: Fix
iambriccardo committed Jul 18, 2024
commit 08a6e85584f7d56a74e1c65735d7911c6905ea24
10 changes: 6 additions & 4 deletions relay-server/src/services/spooler/mod.rs
@@ -1302,10 +1302,6 @@ impl Drop for BufferService {

#[cfg(test)]
mod tests {
use super::*;
use crate::services::project_cache::SpoolHealth;
use crate::testutils::empty_envelope;
use crate::utils::MemoryStat;
use insta::assert_debug_snapshot;
use rand::Rng;
use relay_system::AsyncResponse;
@@ -1316,6 +1312,12 @@ mod tests {
use std::time::{Duration, Instant};
use uuid::Uuid;

use crate::services::project_cache::SpoolHealth;
use crate::testutils::empty_envelope;
use crate::utils::MemoryStat;

use super::*;

fn services() -> Services {
let (project_cache, _) = mock_service("project_cache", (), |&mut (), _| {});
let (outcome_aggregator, _) = mock_service("outcome_aggregator", (), |&mut (), _| {});
4 changes: 3 additions & 1 deletion relay-server/src/utils/managed_envelope.rs
@@ -164,7 +164,9 @@ pub struct ManagedEnvelope {
}

impl ManagedEnvelope {
/// Computes a managed envelope from the given envelope.
/// Computes a managed envelope from the given envelope and binds it to the processing queue.
///
/// To provide additional scoping, use [`ManagedEnvelope::scope`].
pub fn new(
envelope: Box<Envelope>,
outcome_aggregator: Addr<TrackOutcome>,
58 changes: 57 additions & 1 deletion relay-server/src/utils/memory.rs
@@ -12,29 +12,62 @@ use sysinfo::{MemoryRefreshKind, System};
/// Interval in milliseconds after which the [`MemoryStat`] data will be refreshed.
const UPDATE_TIME_THRESHOLD_MS: u64 = 100;
Review comment (Member): Should we make this configurable?

Reply (Member Author): We could do that, I could put it under the health key.


/// The representation of the current memory state.
#[derive(Clone, Copy, Debug)]
pub struct Memory {
/// Used memory.
///
/// This measure of used memory is the Resident Set Size (RSS): the amount of physical memory
/// that a process holds in main memory and that does not correspond to anything on disk.
pub used: u64,
/// Total memory.
pub total: u64,
}

impl Memory {
/// Returns the percentage of used memory, expressed in the interval [0.0, 1.0].
///
/// The percentage measurement will return 1.0 in the following edge cases:
/// - When total is 0
/// - When used / total produces a NaN
pub fn used_percent(&self) -> f32 {
(self.used as f32 / self.total as f32).clamp(0.0, 1.0)
let used_percent = self.used as f32 / self.total as f32;
if used_percent.is_nan() {
return 1.0;
};

used_percent.clamp(0.0, 1.0)
}
}
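
// Editor's illustrative sketch (not part of this commit): how the edge cases above behave
// in practice. The `Memory` values are made up for demonstration purposes only.
fn used_percent_examples() {
    // Regular case: 512 of 1024 bytes used yields 0.5.
    assert_eq!(Memory { used: 512, total: 1024 }.used_percent(), 0.5);
    // Degenerate case: 0 / 0 is NaN, which is reported as 1.0 (no headroom).
    assert_eq!(Memory { used: 0, total: 0 }.used_percent(), 1.0);
    // Degenerate case: a non-zero numerator over a total of 0 is infinity, clamped to 1.0.
    assert_eq!(Memory { used: 10, total: 0 }.used_percent(), 1.0);
}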

/// Inner struct that holds the latest [`Memory`] state, which is refreshed at most once every 100ms.
///
/// The goal of this implementation is to offer lock-free reads to an arbitrary number of threads
/// while keeping the locking needed to update the memory stats to a minimum.
///
/// Because of this design, there is only a very small chance that multiple threads end up waiting
/// on the lock that guards [`System`]: a thread would have to hold the lock for longer than
/// [`UPDATE_TIME_THRESHOLD_MS`], so that another thread bumps `last_update` and then tries to
/// acquire the lock for a new memory reading. Since reading [`System`] is much faster than
/// [`UPDATE_TIME_THRESHOLD_MS`], this should not happen in practice.
struct Inner {
memory: ArcSwap<Memory>,
last_update: AtomicU64,
reference_time: Instant,
system: Mutex<System>,
}
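
// Editor's sketch (assumption, not part of this commit): the general shape of the lock-free
// refresh pattern described above. The real `try_update` body is truncated in this view, so
// names and ordering details here are illustrative only.
fn try_update_sketch(inner: &Inner) {
    let elapsed = inner.reference_time.elapsed().as_millis() as u64;
    let last = inner.last_update.load(Ordering::Relaxed);
    // Hot path: readers return without touching any lock.
    if elapsed.saturating_sub(last) < UPDATE_TIME_THRESHOLD_MS {
        return;
    }
    // Only the thread that wins the compare-exchange refreshes the readings, so at most one
    // thread holds the `System` lock under normal operation.
    if inner
        .last_update
        .compare_exchange(last, elapsed, Ordering::Relaxed, Ordering::Relaxed)
        .is_ok()
    {
        let mut system = inner.system.lock().unwrap();
        inner
            .memory
            .store(Arc::new(MemoryStat::refresh_memory(&mut system)));
    }
}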

/// Wrapper around [`Inner`] which hides the [`Arc`] and exposes utility methods to make working
/// with [`MemoryStat`] as opaque as possible.
#[derive(Clone)]
pub struct MemoryStat(Arc<Inner>);

impl MemoryStat {
/// Creates an instance of [`MemoryStat`] and obtains the current memory readings from
/// [`System`].
pub fn new() -> Self {
// sysinfo docs suggest using a single instance of `System` across the program.
let mut system = System::new();
@@ -46,18 +79,22 @@ impl MemoryStat {
}))
}

/// Returns a copy of the most up-to-date [`Memory`] data.
pub fn memory(&self) -> Memory {
self.try_update();
**self.0.memory.load()
}

/// Builds a [`MemoryStatConfig`] which holds a reference to the supplied [`Config`] and the
/// current [`MemoryStat`].
pub fn with_config(&self, config: Arc<Config>) -> MemoryStatConfig {
MemoryStatConfig {
memory_stat: self.clone(),
config,
}
}
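
// Editor's usage sketch (assumption, not part of this commit): a single `MemoryStat` is
// created once and cheaply cloned wherever memory checks are needed. The `config` argument
// is assumed to be an `Arc<Config>` available at the call site.
fn memory_stat_usage_sketch(config: Arc<Config>) {
    let memory_stat = MemoryStat::new();
    // Reads are lock-free; the underlying data is refreshed at most every
    // `UPDATE_TIME_THRESHOLD_MS` milliseconds.
    let memory = memory_stat.memory();
    println!("used {} of {} bytes", memory.used, memory.total);
    // Combine with the configured watermarks to answer health checks.
    let memory_config = memory_stat.with_config(config);
    if !memory_config.has_enough_memory() {
        println!("memory watermark exceeded");
    }
}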

/// Refreshes the memory readings.
fn refresh_memory(system: &mut System) -> Memory {
system.refresh_memory_specifics(MemoryRefreshKind::new().with_ram());
let memory = match system.cgroup_limits() {
@@ -77,6 +114,7 @@ impl MemoryStat {
memory
}

/// Updates the memory readings if at least [`UPDATE_TIME_THRESHOLD_MS`] milliseconds have passed since the last update.
fn try_update(&self) {
let last_update = self.0.last_update.load(Ordering::Relaxed);
let elapsed_time = self.0.reference_time.elapsed().as_millis() as u64;
@@ -117,21 +155,33 @@ impl Default for MemoryStat {
}
}

/// Struct that composes a [`Config`] and [`MemoryStat`] and provides utility methods to validate
/// whether memory is within limits.
///
/// The rationale behind this struct is to share the same logic for dealing with memory readings
/// across Relay. It's decoupled from [`MemoryStat`] because it's just a layer on top that decides
/// how memory readings are interpreted.
#[derive(Clone, Debug)]
pub struct MemoryStatConfig {
pub memory_stat: MemoryStat,
config: Arc<Config>,
}

impl MemoryStatConfig {
/// Returns `true` if the used percentage of memory is below the specified threshold.
pub fn has_enough_memory_percent(&self) -> bool {
self.memory_stat.memory().used_percent() < self.config.health_max_memory_watermark_percent()
}

/// Returns `true` if the used memory is below the specified threshold.
pub fn has_enough_memory_bytes(&self) -> bool {
self.memory_stat.memory().used < self.config.health_max_memory_watermark_bytes()
}

/// Returns `true` if the used memory is below both percentage and bytes thresholds.
///
/// This is the function that should mainly be used to check whether or not Relay has enough
/// memory.
pub fn has_enough_memory(&self) -> bool {
let memory = self.memory_stat.memory();
memory.used_percent() < self.config.health_max_memory_watermark_percent()
@@ -145,6 +195,12 @@ mod tests {
use relay_config::Config;
use std::sync::Arc;

#[test]
fn test_memory_used_percent_both_0() {
let memory = Memory { used: 0, total: 0 };
assert_eq!(memory.used_percent(), 1.0);
}

#[test]
fn test_memory_used_percent_total_0() {
let memory = Memory {