
feat(memory): Implement shared memory state across Relay #3821

Merged (40 commits, Jul 22, 2024)

Commits

- 90fb599 feat(memory): Implement shared memory state across Relay (iambriccardo, Jul 15, 2024)
- b0ca49d feat(memory): Add a way to track memory usage (iambriccardo, Jul 16, 2024)
- 7c8d357 Fix (iambriccardo, Jul 16, 2024)
- cc4b577 Fix (iambriccardo, Jul 16, 2024)
- c449dfd Fix (iambriccardo, Jul 16, 2024)
- 2dc4e13 Fix (iambriccardo, Jul 16, 2024)
- 6bfd213 Fix (iambriccardo, Jul 16, 2024)
- 584a868 Fix (iambriccardo, Jul 16, 2024)
- e7608a8 Remove usage of floats (iambriccardo, Jul 16, 2024)
- 75a3ffc Improve (iambriccardo, Jul 16, 2024)
- 1069847 Improve (iambriccardo, Jul 16, 2024)
- 719bea7 Improve (iambriccardo, Jul 16, 2024)
- 45b35ee Improve (iambriccardo, Jul 16, 2024)
- 6a8be96 Improve (iambriccardo, Jul 17, 2024)
- f4a7d39 fi (iambriccardo, Jul 17, 2024)
- 8ceac2e Fix (iambriccardo, Jul 17, 2024)
- cc0596d Fix (iambriccardo, Jul 18, 2024)
- 1d3b3e8 Fix (iambriccardo, Jul 18, 2024)
- fa67244 Fix (iambriccardo, Jul 18, 2024)
- 08a6e85 Fix (iambriccardo, Jul 18, 2024)
- bd9e159 Fix (iambriccardo, Jul 18, 2024)
- 450311d Fix (iambriccardo, Jul 18, 2024)
- 707cb07 Fix (iambriccardo, Jul 18, 2024)
- 8670f0d Fix (iambriccardo, Jul 18, 2024)
- dfc5a9b Fix (iambriccardo, Jul 18, 2024)
- 464ca43 Fix (iambriccardo, Jul 18, 2024)
- e2ca3d4 Merge (iambriccardo, Jul 18, 2024)
- 05b1317 Fix (iambriccardo, Jul 18, 2024)
- 6823d7d Update relay-server/src/endpoints/common.rs (iambriccardo, Jul 19, 2024)
- 78abada Update relay-server/src/utils/memory.rs (iambriccardo, Jul 19, 2024)
- 8755d87 Update relay-server/src/utils/memory.rs (iambriccardo, Jul 19, 2024)
- f0f2c9f Update relay-server/src/utils/memory.rs (iambriccardo, Jul 19, 2024)
- aafba46 Update relay-server/src/utils/memory.rs (iambriccardo, Jul 19, 2024)
- 998fdf2 Update relay-server/src/utils/memory.rs (iambriccardo, Jul 19, 2024)
- aa1f39e Update relay-server/src/utils/memory.rs (iambriccardo, Jul 19, 2024)
- 735d52c Improve (iambriccardo, Jul 19, 2024)
- f434135 Merge (iambriccardo, Jul 19, 2024)
- d22dfa6 Merge (iambriccardo, Jul 19, 2024)
- 9204a7f Merge (iambriccardo, Jul 19, 2024)
- 8a70bf2 Changelog (iambriccardo, Jul 19, 2024)

1 change: 1 addition & 0 deletions CHANGELOG.md
```diff
@@ -9,6 +9,7 @@
 **Internal**:
 
 - Use a dedicated thread pool for CPU intensive workloads. ([#3833](https://github.com/getsentry/relay/pull/3833))
+- Remove `BufferGuard` in favor of memory checks via `MemoryStat`. ([#3821](https://github.com/getsentry/relay/pull/3821))
 
 ## 24.7.0
```

1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
```diff
@@ -64,6 +64,7 @@ anyhow = "1.0.66"
 axum = "0.6.20"
 axum-extra = "0.7.7"
 axum-server = "0.4.7"
+arc-swap = "1.7.1"
 backoff = "0.4.0"
 bindgen = "0.64.0"
 brotli = "3.3.4"
```

14 changes: 13 additions & 1 deletion relay-config/src/config.rs
```diff
@@ -1350,7 +1350,7 @@ pub struct Health {
     pub refresh_interval_ms: u64,
     /// Maximum memory watermark in bytes.
     ///
-    /// By default there is no absolute limit set and the watermark
+    /// By default, there is no absolute limit set and the watermark
     /// is only controlled by setting [`Self::max_memory_percent`].
     pub max_memory_bytes: Option<ByteSize>,
     /// Maximum memory watermark as a percentage of maximum system memory.
@@ -1364,6 +1364,12 @@ pub struct Health {
     ///
     /// Defaults to 900 milliseconds.
     pub probe_timeout_ms: u64,
+    /// The refresh frequency of memory stats which are used to poll memory
+    /// usage of Relay.
+    ///
+    /// The implementation of memory stats guarantees that the refresh will happen at
+    /// least every `x` ms since memory readings are lazy and are updated only if needed.
+    pub memory_stat_refresh_frequency_ms: u64,
 }
 
 impl Default for Health {
@@ -1373,6 +1379,7 @@ impl Default for Health {
             max_memory_bytes: None,
             max_memory_percent: 0.95,
             probe_timeout_ms: 900,
+            memory_stat_refresh_frequency_ms: 100,
         }
     }
 }
@@ -2346,6 +2353,11 @@ impl Config {
         Duration::from_millis(self.values.health.probe_timeout_ms)
     }
 
+    /// Refresh frequency for polling new memory stats.
+    pub fn memory_stat_refresh_frequency_ms(&self) -> u64 {
+        self.values.health.memory_stat_refresh_frequency_ms
+    }
+
     /// Whether COGS measurements are enabled.
     pub fn cogs_enabled(&self) -> bool {
         self.values.cogs.enabled
```
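
The new `memory_stat_refresh_frequency_ms` option bounds how stale a reading can get: per the doc comment above, readings are recomputed lazily, on access, and only when the cached value is older than the configured frequency. Below is a minimal, self-contained sketch of that pattern; `LazyReading` and `read_memory_used` are illustrative stand-ins, not the PR's actual implementation.

```rust
use std::sync::Mutex;
use std::time::{Duration, Instant};

/// Sketch of a lazily refreshed reading: callers always go through `get`,
/// and the value is re-read only when the cached copy is older than
/// `refresh_frequency`.
struct LazyReading {
    refresh_frequency: Duration,
    inner: Mutex<(Instant, u64)>, // (last refresh, cached bytes used)
}

impl LazyReading {
    fn new(refresh_frequency_ms: u64) -> Self {
        Self {
            refresh_frequency: Duration::from_millis(refresh_frequency_ms),
            inner: Mutex::new((Instant::now(), read_memory_used())),
        }
    }

    /// Returns the cached value, refreshing it first if it is stale. While
    /// polled, readings never lag behind by more than `refresh_frequency`.
    fn get(&self) -> u64 {
        let mut guard = self.inner.lock().unwrap();
        if guard.0.elapsed() >= self.refresh_frequency {
            *guard = (Instant::now(), read_memory_used());
        }
        guard.1
    }
}

/// Stand-in for a real measurement (e.g. via the `sysinfo` crate).
fn read_memory_used() -> u64 {
    0
}

fn main() {
    let reading = LazyReading::new(100);
    assert_eq!(reading.get(), 0);
}
```
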
1 change: 1 addition & 0 deletions relay-server/Cargo.toml
```diff
@@ -46,6 +46,7 @@ axum = { workspace = true, features = [
     "tracing",
 ] }
 axum-server = { workspace = true }
+arc-swap = { workspace = true }
 backoff = { workspace = true }
 brotli = { workspace = true }
 bytes = { workspace = true, features = ["serde"] }
```

51 changes: 25 additions & 26 deletions relay-server/src/endpoints/common.rs
```diff
@@ -14,9 +14,7 @@ use crate::services::outcome::{DiscardReason, Outcome};
 use crate::services::processor::{ProcessMetricMeta, ProcessMetrics, ProcessingGroup};
 use crate::services::project_cache::{CheckEnvelope, ValidateEnvelope};
 use crate::statsd::{RelayCounters, RelayHistograms};
-use crate::utils::{
-    self, ApiErrorResponse, BufferError, BufferGuard, FormDataIter, ManagedEnvelope, MultipartError,
-};
+use crate::utils::{self, ApiErrorResponse, FormDataIter, ManagedEnvelope, MultipartError};
 
 #[derive(Clone, Copy, Debug, thiserror::Error)]
 #[error("the service is overloaded")]
@@ -75,7 +73,7 @@ pub enum BadStoreRequest {
     InvalidEventId,
 
     #[error("failed to queue envelope")]
-    QueueFailed(#[from] BufferError),
+    QueueFailed,
 
     #[error(
         "envelope exceeded size limits for type '{0}' (https://develop.sentry.dev/sdk/envelopes/#size-limits)"
@@ -114,7 +112,7 @@ impl IntoResponse for BadStoreRequest {
 
                 (StatusCode::TOO_MANY_REQUESTS, headers, body).into_response()
             }
-            BadStoreRequest::ScheduleFailed | BadStoreRequest::QueueFailed(_) => {
+            BadStoreRequest::ScheduleFailed | BadStoreRequest::QueueFailed => {
                 // These errors indicate that something's wrong with our service system, most likely
                 // mailbox congestion or a faulty shutdown. Indicate an unavailable service to the
                 // client. It might retry event submission at a later time.
@@ -264,7 +262,6 @@ pub fn event_id_from_items(items: &Items) -> Result<Option<EventId>, BadStoreRequest>
 fn queue_envelope(
     state: &ServiceState,
     mut managed_envelope: ManagedEnvelope,
-    buffer_guard: &BufferGuard,
 ) -> Result<(), BadStoreRequest> {
     let envelope = managed_envelope.envelope_mut();
 
@@ -299,14 +296,13 @@ fn queue_envelope(
     // Split off the envelopes by item type.
     let envelopes = ProcessingGroup::split_envelope(*managed_envelope.take_envelope());
     for (group, envelope) in envelopes {
-        let envelope = buffer_guard
-            .enter(
-                envelope,
-                state.outcome_aggregator().clone(),
-                state.test_store().clone(),
-                group,
-            )
-            .map_err(BadStoreRequest::QueueFailed)?;
+        let envelope = ManagedEnvelope::new(
+            envelope,
+            state.outcome_aggregator().clone(),
+            state.test_store().clone(),
+            group,
+        );
+
         state.project_cache().send(ValidateEnvelope::new(envelope));
     }
     // The entire envelope is taken for a split above, and it's empty at this point, we can just
@@ -335,17 +331,20 @@ pub async fn handle_envelope(
         )
     }
 
-    let buffer_guard = state.buffer_guard();
-    let mut managed_envelope = buffer_guard
-        .enter(
-            envelope,
-            state.outcome_aggregator().clone(),
-            state.test_store().clone(),
-            // It's not clear at this point which group this envelope belongs to.
-            // The decission will be made while queueing in `queue_envelope` function.
-            ProcessingGroup::Ungrouped,
-        )
-        .map_err(BadStoreRequest::QueueFailed)?;
+    if state.memory_checker().check_memory().is_exceeded() {
+        // NOTE: Long-term, we should not reject the envelope here, but spool it to disk instead.
+        // This will be fixed with the new spool implementation.
+        return Err(BadStoreRequest::QueueFailed);
+    };
+
+    let mut managed_envelope = ManagedEnvelope::new(
+        envelope,
+        state.outcome_aggregator().clone(),
+        state.test_store().clone(),
+        // It's not clear at this point which group this envelope belongs to.
+        // The decision will be made while queueing in `queue_envelope` function.
+        ProcessingGroup::Ungrouped,
+    );
 
     // If configured, remove unknown items at the very beginning. If the envelope is
     // empty, we fail the request with a special control flow error to skip checks and
@@ -377,7 +376,7 @@ pub async fn handle_envelope(
         return Err(BadStoreRequest::Overflow(offender));
     }
 
-    queue_envelope(state, managed_envelope, buffer_guard)?;
+    queue_envelope(state, managed_envelope)?;
 
     if checked.rate_limits.is_limited() {
         // Even if some envelope items have been queued, there might be active rate limits on
```
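
With `BufferGuard` gone, the endpoint no longer reserves a slot per envelope; it performs one watermark check up front and fails fast with a retryable error. A condensed sketch of that control flow, using hypothetical stand-ins for the crate's types:

```rust
/// Hypothetical stand-ins for `ServiceState`, `MemoryChecker`, and friends,
/// to illustrate the fail-fast admission flow in `handle_envelope` above.
#[derive(Debug, PartialEq)]
enum BadStoreRequest {
    QueueFailed,
}

struct MemoryChecker {
    used: u64,
    watermark: u64,
}

impl MemoryChecker {
    /// True when current usage is at or above the configured watermark.
    fn is_exceeded(&self) -> bool {
        self.used >= self.watermark
    }
}

fn handle_envelope(checker: &MemoryChecker) -> Result<(), BadStoreRequest> {
    // Reject before doing any buffering work. Per the NOTE in the diff, a
    // future spool implementation would write the envelope to disk instead.
    if checker.is_exceeded() {
        return Err(BadStoreRequest::QueueFailed);
    }
    // ... build a ManagedEnvelope and forward it to the project cache ...
    Ok(())
}

fn main() {
    let checker = MemoryChecker { used: 950, watermark: 900 };
    assert_eq!(handle_envelope(&checker), Err(BadStoreRequest::QueueFailed));
}
```
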
26 changes: 14 additions & 12 deletions relay-server/src/service.rs
```diff
@@ -27,7 +27,7 @@ use crate::services::relays::{RelayCache, RelayCacheService};
 use crate::services::store::StoreService;
 use crate::services::test_store::{TestStore, TestStoreService};
 use crate::services::upstream::{UpstreamRelay, UpstreamRelayService};
-use crate::utils::BufferGuard;
+use crate::utils::{MemoryChecker, MemoryStat};
 
 /// Indicates the type of failure of the server.
 #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, thiserror::Error)]
@@ -103,7 +103,7 @@ fn create_processor_pool(config: &Config) -> Result<ThreadPool> {
 #[derive(Debug)]
 struct StateInner {
     config: Arc<Config>,
-    buffer_guard: Arc<BufferGuard>,
+    memory_checker: MemoryChecker,
     registry: Registry,
 }
 
@@ -131,7 +131,9 @@ impl ServiceState {
             .transpose()
             .context(ServiceError::Redis)?;
 
-        let buffer_guard = Arc::new(BufferGuard::new(config.envelope_buffer_size()));
+        // We create an instance of `MemoryStat`, which can be composed with any arbitrary
+        // configuration object down the line.
+        let memory_stat = MemoryStat::new(config.memory_stat_refresh_frequency_ms());
 
         // Create an address for the `EnvelopeProcessor`, which can be injected into the
         // other services.
@@ -220,7 +222,7 @@
         );
         ProjectCacheService::new(
            config.clone(),
-           buffer_guard.clone(),
+           MemoryChecker::new(memory_stat.clone(), config.clone()),
```

> **Reviewer (Member):** Instead of creating new memory checkers, we could just clone a single one and pass that around, creating the `MemoryStat` internally in the constructor.
>
> **Author (iambriccardo):** We could also do that, yes; we will likely not use `MemoryStat` in isolation, so it's fair to implement it this way.

```diff
            project_cache_services,
            metric_outcomes,
            redis_pool.clone(),
@@ -229,6 +231,7 @@
 
        let health_check = HealthCheckService::new(
            config.clone(),
+           MemoryChecker::new(memory_stat.clone(), config.clone()),
            aggregator.clone(),
            upstream_relay.clone(),
            project_cache.clone(),
@@ -259,8 +262,8 @@
        };
 
        let state = StateInner {
-           buffer_guard,
-           config,
+           config: config.clone(),
+           memory_checker: MemoryChecker::new(memory_stat, config),
            registry,
        };
 
@@ -274,12 +277,11 @@ impl ServiceState {
        &self.inner.config
    }
 
-   /// Returns a reference to the guard of the envelope buffer.
-   ///
-   /// This can be used to enter new envelopes into the processing queue and reserve a slot in the
-   /// buffer. See [`BufferGuard`] for more information.
-   pub fn buffer_guard(&self) -> &BufferGuard {
-       &self.inner.buffer_guard
+   /// Returns a reference to the [`MemoryChecker`], a [`Config`]-aware wrapper around
+   /// [`MemoryStat`] that offers utility methods to determine whether memory usage
+   /// is above the thresholds set in the [`Config`].
+   pub fn memory_checker(&self) -> &MemoryChecker {
+       &self.inner.memory_checker
    }
 
    /// Returns the address of the [`ProjectCache`] service.
```
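
The same `memory_stat` is cloned into the project cache, the health check, and the service state, so every consumer observes one shared measurement. A sketch of why that sharing is cheap, assuming (as the new `arc-swap` workspace dependency hints) that the readings sit behind an `ArcSwap`; the type shape and field names here are guesses, not the PR's actual code:

```rust
use std::sync::Arc;

use arc_swap::ArcSwap; // the dependency this PR adds to the workspace

#[derive(Debug)]
struct Memory {
    used: u64,
    total: u64,
}

/// Cloning only bumps a reference count; all clones observe the same
/// underlying measurement, and a refresh can swap in a new reading
/// without blocking readers.
#[derive(Clone)]
struct MemoryStat(Arc<ArcSwap<Memory>>);

impl MemoryStat {
    fn new() -> Self {
        Self(Arc::new(ArcSwap::from_pointee(Memory { used: 0, total: 0 })))
    }

    /// Lock-free read of the latest measurement.
    fn memory(&self) -> Arc<Memory> {
        self.0.load_full()
    }

    /// Called by whichever component refreshes the readings.
    fn update(&self, memory: Memory) {
        self.0.store(Arc::new(memory));
    }
}

fn main() {
    let stat = MemoryStat::new();
    let for_health_check = stat.clone(); // shared state across services
    stat.update(Memory { used: 512, total: 1024 });
    assert_eq!(for_health_check.memory().used, 512);
}
```

This also matches the review exchange above: `MemoryChecker` stays a thin, `Config`-aware wrapper that can be constructed wherever it is needed, while the shared state lives entirely in `MemoryStat`.
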
84 changes: 8 additions & 76 deletions relay-server/src/services/health_check.rs
```diff
@@ -1,17 +1,16 @@
 use std::sync::Arc;
 
 use relay_config::Config;
 use relay_statsd::metric;
 use relay_system::{Addr, AsyncResponse, Controller, FromMessage, Interface, Sender, Service};
-use std::future::Future;
-use sysinfo::{MemoryRefreshKind, System};
 use tokio::sync::watch;
 use tokio::time::{timeout, Instant};
 
 use crate::services::metrics::{AcceptsMetrics, Aggregator};
 use crate::services::project_cache::{ProjectCache, SpoolHealth};
 use crate::services::upstream::{IsAuthenticated, UpstreamRelay};
-use crate::statsd::{RelayGauges, RelayTimers};
+use crate::statsd::RelayTimers;
+use crate::utils::{MemoryCheck, MemoryChecker};
 
 /// Checks whether Relay is alive and healthy based on its variant.
 #[derive(Clone, Copy, Debug, serde::Deserialize)]
@@ -84,10 +83,10 @@ impl StatusUpdate {
 #[derive(Debug)]
 pub struct HealthCheckService {
     config: Arc<Config>,
+    memory_checker: MemoryChecker,
     aggregator: Addr<Aggregator>,
     upstream_relay: Addr<UpstreamRelay>,
     project_cache: Addr<ProjectCache>,
-    system: System,
 }
 
 impl HealthCheckService {
@@ -96,44 +95,22 @@ impl HealthCheckService {
     /// The service does not run. To run the service, use [`start`](Self::start).
     pub fn new(
         config: Arc<Config>,
+        memory_checker: MemoryChecker,
         aggregator: Addr<Aggregator>,
         upstream_relay: Addr<UpstreamRelay>,
         project_cache: Addr<ProjectCache>,
     ) -> Self {
         Self {
-            system: System::new(),
+            config,
+            memory_checker,
             aggregator,
             upstream_relay,
             project_cache,
-            config,
         }
     }
 
     fn system_memory_probe(&mut self) -> Status {
-        self.system
-            .refresh_memory_specifics(MemoryRefreshKind::new().with_ram());
-
-        // Use the cgroup if available in case Relay is running in a container.
-        // TODO: once we measured the new rss metric, we will remove `rss` and just used cgroup.rss
-        // `used`.
-        let memory = match self.system.cgroup_limits() {
-            Some(cgroup) => Memory {
-                used: cgroup.total_memory.saturating_sub(cgroup.free_memory),
-                total: cgroup.total_memory,
-                rss: cgroup.rss,
-            },
-            None => Memory {
-                used: self.system.used_memory(),
-                total: self.system.total_memory(),
-                rss: self.system.used_memory(),
-            },
-        };
-
-        metric!(gauge(RelayGauges::SystemMemoryUsed) = memory.used);
-        metric!(gauge(RelayGauges::SystemMemoryTotal) = memory.total);
-        metric!(gauge(RelayGauges::SystemMemoryRss) = memory.rss);
-
-        if memory.used_percent() >= self.config.health_max_memory_watermark_percent() {
+        if let MemoryCheck::Exceeded(memory) = self.memory_checker.check_memory_percent() {
             relay_log::error!(
                 "Not enough memory, {} / {} ({:.2}% >= {:.2}%)",
                 memory.used,
@@ -144,7 +121,7 @@ impl HealthCheckService {
             return Status::Unhealthy;
         }
 
-        if memory.used > self.config.health_max_memory_watermark_bytes() {
+        if let MemoryCheck::Exceeded(memory) = self.memory_checker.check_memory_bytes() {
             relay_log::error!(
                 "Not enough memory, {} / {} ({} >= {})",
                 memory.used,
@@ -254,21 +231,6 @@ impl Service for HealthCheckService {
     }
 }
 
-/// A memory measurement.
-#[derive(Debug)]
-struct Memory {
-    pub used: u64,
-    pub total: u64,
-    pub rss: u64,
-}
-
-impl Memory {
-    /// Amount of used RAM in percent `0.0` to `1.0`.
-    pub fn used_percent(&self) -> f32 {
-        (self.used as f32 / self.total as f32).clamp(0.0, 1.0)
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -303,34 +265,4 @@ mod tests {
         let s = [].into_iter().collect();
         assert!(matches!(s, Status::Healthy));
     }
-
-    #[test]
-    fn test_memory_used_percent_total_0() {
-        let memory = Memory {
-            used: 100,
-            total: 0,
-            rss: 0,
-        };
-        assert_eq!(memory.used_percent(), 1.0);
-    }
-
-    #[test]
-    fn test_memory_used_percent_zero() {
-        let memory = Memory {
-            used: 0,
-            total: 100,
-            rss: 0,
-        };
-        assert_eq!(memory.used_percent(), 0.0);
-    }
-
-    #[test]
-    fn test_memory_used_percent_half() {
-        let memory = Memory {
-            used: 50,
-            total: 100,
-            rss: 0,
-        };
-        assert_eq!(memory.used_percent(), 0.5);
-    }
 }
```
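
The probes above pattern-match on a `MemoryCheck` value returned by the checker instead of computing percentages inline, so the offending measurement is available for logging. A plausible shape for that type, inferred from how the diff uses it; the `Ok` variant name and the exact fields are assumptions:

```rust
#[derive(Debug)]
struct Memory {
    used: u64,
    total: u64,
}

/// Result of comparing a measurement against a configured watermark.
#[derive(Debug)]
enum MemoryCheck {
    Ok(Memory),
    Exceeded(Memory),
}

impl MemoryCheck {
    /// True when the measurement crossed the watermark.
    fn is_exceeded(&self) -> bool {
        matches!(self, MemoryCheck::Exceeded(_))
    }
}

fn main() {
    let healthy = MemoryCheck::Ok(Memory { used: 100, total: 1000 });
    assert!(!healthy.is_exceeded());

    let check = MemoryCheck::Exceeded(Memory { used: 950, total: 1000 });
    if let MemoryCheck::Exceeded(memory) = check {
        eprintln!("Not enough memory, {} / {}", memory.used, memory.total);
    }
}
```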