Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(memory): Implement shared memory state across Relay #3821

Merged
merged 40 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
90fb599
feat(memory): Implement shared memory state across Relay
iambriccardo Jul 15, 2024
b0ca49d
feat(memory): Add a way to track memory usage
iambriccardo Jul 16, 2024
7c8d357
Fix
iambriccardo Jul 16, 2024
cc4b577
Fix
iambriccardo Jul 16, 2024
c449dfd
Fix
iambriccardo Jul 16, 2024
2dc4e13
Fix
iambriccardo Jul 16, 2024
6bfd213
Fix
iambriccardo Jul 16, 2024
584a868
Fix
iambriccardo Jul 16, 2024
e7608a8
Remove usage of floats
iambriccardo Jul 16, 2024
75a3ffc
Improve
iambriccardo Jul 16, 2024
1069847
Improve
iambriccardo Jul 16, 2024
719bea7
Improve
iambriccardo Jul 16, 2024
45b35ee
Improve
iambriccardo Jul 16, 2024
6a8be96
Improve
iambriccardo Jul 17, 2024
f4a7d39
fi
iambriccardo Jul 17, 2024
8ceac2e
Fix
iambriccardo Jul 17, 2024
cc0596d
Fix
iambriccardo Jul 18, 2024
1d3b3e8
Fix
iambriccardo Jul 18, 2024
fa67244
Fix
iambriccardo Jul 18, 2024
08a6e85
Fix
iambriccardo Jul 18, 2024
bd9e159
Fix
iambriccardo Jul 18, 2024
450311d
Fix
iambriccardo Jul 18, 2024
707cb07
Fix
iambriccardo Jul 18, 2024
8670f0d
Fix
iambriccardo Jul 18, 2024
dfc5a9b
Fix
iambriccardo Jul 18, 2024
464ca43
Fix
iambriccardo Jul 18, 2024
e2ca3d4
Merge
iambriccardo Jul 18, 2024
05b1317
Fix
iambriccardo Jul 18, 2024
6823d7d
Update relay-server/src/endpoints/common.rs
iambriccardo Jul 19, 2024
78abada
Update relay-server/src/utils/memory.rs
iambriccardo Jul 19, 2024
8755d87
Update relay-server/src/utils/memory.rs
iambriccardo Jul 19, 2024
f0f2c9f
Update relay-server/src/utils/memory.rs
iambriccardo Jul 19, 2024
aafba46
Update relay-server/src/utils/memory.rs
iambriccardo Jul 19, 2024
998fdf2
Update relay-server/src/utils/memory.rs
iambriccardo Jul 19, 2024
aa1f39e
Update relay-server/src/utils/memory.rs
iambriccardo Jul 19, 2024
735d52c
Improve
iambriccardo Jul 19, 2024
f434135
Merge
iambriccardo Jul 19, 2024
d22dfa6
Merge
iambriccardo Jul 19, 2024
9204a7f
Merge
iambriccardo Jul 19, 2024
8a70bf2
Changelog
iambriccardo Jul 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix
  • Loading branch information
iambriccardo committed Jul 18, 2024
commit bd9e1596814243253c5d52d247d952aa89d714f5
81 changes: 3 additions & 78 deletions relay-server/src/services/project_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1326,6 +1326,9 @@ mod tests {
"envelopes": {
"path": std::env::temp_dir().join(Uuid::new_v4().to_string()),
"max_memory_size": 0, // 0 bytes, to force to spool to disk all the envelopes.
},
"health": {
"max_memory_percent": 0.0
}
}
}))
Expand Down Expand Up @@ -1382,84 +1385,6 @@ mod tests {
)
}

/// Verifies that with a zero-size in-memory buffer every incoming envelope is
/// spooled to disk, and that unspooling from the buffer is capped by the
/// number of available buffer-guard permits (5 here — TODO confirm against
/// the mocked services' guard configuration).
#[tokio::test]
async fn always_spools() {
    relay_log::init_test!();

    let services = mocked_services();
    let (state_tx, _) = mpsc::unbounded_channel();
    let (buffer_tx, mut buffer_rx) = mpsc::unbounded_channel();
    let (mut broker, buffer_svc) =
        project_cache_broker_setup(services.clone(), state_tx, buffer_tx).await;

    for _ in 0..8 {
        let envelope = ManagedEnvelope::new(
            empty_envelope(),
            services.outcome_aggregator.clone(),
            services.test_store.clone(),
            ProcessingGroup::Ungrouped,
        );

        let message = ValidateEnvelope { envelope };

        broker.handle_validate_envelope(message);
        tokio::time::sleep(Duration::from_millis(200)).await;
        // Nothing will be dequeued.
        assert!(buffer_rx.try_recv().is_err())
    }

    // All the messages should have been spooled to disk.
    assert_eq!(broker.index.len(), 1);

    let project_key = ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap();
    let key = QueueKey {
        own_key: project_key,
        sampling_key: project_key,
    };
    let (tx, mut rx) = mpsc::unbounded_channel();

    // Check if we can also dequeue from the buffer directly.
    buffer_svc.send(spooler::DequeueMany::new([key].into(), tx.clone()));
    tokio::time::sleep(Duration::from_millis(100)).await;

    // We should be able to unspool 5 envelopes since we have 5 permits.
    let mut envelopes = vec![];
    while let Ok(envelope) = rx.try_recv() {
        envelopes.push(envelope)
    }

    // We can unspool only 5 envelopes.
    assert_eq!(envelopes.len(), 5);

    // Drop one.
    envelopes.pop().unwrap();

    // So far we have enqueued 5 envelopes and dequeued only 1, meaning the
    // index is still populated with the same keys and values.
    assert_eq!(broker.index.len(), 1);

    // Check if we can also dequeue from the buffer directly.
    buffer_svc.send(spooler::DequeueMany::new([key].into(), tx));
    tokio::time::sleep(Duration::from_millis(100)).await;
    // Cannot dequeue anymore, no more available permits.
    assert!(rx.try_recv().is_err());

    // The remaining envelopes are spooled immediately, since we are at 80%
    // buffer guard usage.
    for _ in 0..10 {
        let envelope = ManagedEnvelope::untracked(
            empty_envelope(),
            services.outcome_aggregator.clone(),
            services.test_store.clone(),
        );
        let message = ValidateEnvelope { envelope };

        broker.handle_validate_envelope(message);
        tokio::time::sleep(Duration::from_millis(100)).await;
        // Nothing will be dequeued.
        assert!(buffer_rx.try_recv().is_err())
    }
}

#[tokio::test]
async fn periodic_unspool() {
relay_log::init_test!();
Expand Down
166 changes: 8 additions & 158 deletions relay-server/src/services/spooler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1437,89 +1437,6 @@ mod tests {
}
}

/// Verifies that `DequeueMany` only yields envelopes while enough buffer-guard
/// permits are free, and that dequeueing resumes once a permit is released
/// (here by dropping a `ManagedEnvelope`).
#[tokio::test]
async fn dequeue_waits_for_permits() {
    relay_test::setup();
    let config: Arc<_> = Config::from_json_value(serde_json::json!({
        "spool": {
            "envelopes": {
                "path": std::env::temp_dir().join(Uuid::new_v4().to_string()),
                "max_memory_size": 0, // 0 bytes, to force all envelopes to spool to disk.
            }
        }
    }))
    .unwrap()
    .into();
    let memory_stat_config = MemoryStat::new().with_config(config.clone());

    let services = services();

    let service = BufferService::create(memory_stat_config.clone(), services.clone(), config)
        .await
        .unwrap();
    let addr = service.start();
    let (tx, mut rx) = mpsc::unbounded_channel();

    let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
    let key = QueueKey {
        own_key: project_key,
        sampling_key: project_key,
    };

    // Enqueue an envelope:
    addr.send(Enqueue {
        key,
        value: empty_managed_envelope(),
    });

    // Nothing dequeued yet:
    assert!(rx.try_recv().is_err());

    // Dequeue an envelope:
    addr.send(DequeueMany {
        keys: [key].into(),
        sender: tx.clone(),
    });

    // There are enough permits, so get an envelope:
    let res = rx.recv().await;
    assert!(res.is_some(), "{res:?}");

    // Simulate a new envelope coming in via a web request:
    let new_envelope = ManagedEnvelope::new(
        empty_envelope(),
        services.outcome_aggregator,
        services.test_store,
        ProcessingGroup::Ungrouped,
    );

    // Enqueue & dequeue another envelope:
    addr.send(Enqueue {
        key,
        value: empty_managed_envelope(),
    });
    // Request to dequeue:
    addr.send(DequeueMany {
        keys: [key].into(),
        sender: tx.clone(),
    });
    tokio::time::sleep(Duration::from_millis(50)).await;

    // There is one permit left, but we only dequeue if we have >= 50% capacity:
    assert!(rx.try_recv().is_err());

    // Freeing one permit gives us enough capacity:
    drop(new_envelope);

    // Dequeue an envelope:
    addr.send(DequeueMany {
        keys: [key].into(),
        sender: tx.clone(),
    });
    tokio::time::sleep(Duration::from_millis(100)).await; // give time to flush
    assert!(rx.try_recv().is_ok());
}

#[test]
fn metrics_work() {
relay_log::init_test!();
Expand All @@ -1533,7 +1450,7 @@ mod tests {
},
"health": {
"max_memory_percent": 1.0
}
}
}
}))
.unwrap()
Expand Down Expand Up @@ -1711,7 +1628,7 @@ mod tests {
"max_disk_size": "100KB",
},
"health": {
"max_memory_percent": 1.0
"max_memory_percent": 1.0
}
}
}))
Expand Down Expand Up @@ -1743,9 +1660,9 @@ mod tests {
"max_memory_size": 0, // 0 bytes, to force to spool to disk all the envelopes.
"max_disk_size": "100KB",
},
"health": {
"max_memory_percent": 1.0
}
"health": {
"max_memory_percent": 1.0
}
}
}))
.unwrap()
Expand Down Expand Up @@ -1822,9 +1739,9 @@ mod tests {
"max_memory_size": "10KB",
"max_disk_size": "20MB",
},
"health": {
"max_memory_percent": 1.0
}
"health": {
"max_memory_percent": 1.0
}
}
}))
.unwrap()
Expand Down Expand Up @@ -1864,73 +1781,6 @@ mod tests {
assert_eq!(count, 300);
}

/// Verifies that unspooling stops once the low memory watermark is crossed:
/// only part of the spooled envelopes are dequeued, and the keys of the
/// remainder are reported back to the project cache via
/// `ProjectCache::UpdateSpoolIndex`.
#[tokio::test]
async fn over_the_low_watermark() {
    let db_path = std::env::temp_dir().join(Uuid::new_v4().to_string());
    let config: Arc<_> = Config::from_json_value(serde_json::json!({
        "spool": {
            "envelopes": {
                "path": db_path,
                "max_memory_size": "10KB",
                "max_disk_size": "20MB",
            },
            "health": {
                "max_memory_percent": 1.0
            }
        }
    }))
    .unwrap()
    .into();
    let memory_stat_config = MemoryStat::new().with_config(config.clone());

    let index = Arc::new(Mutex::new(HashSet::new()));
    let mut services = services();
    let index_in = index.clone();
    let (project_cache, _) = mock_service("project_cache", (), move |(), msg: ProjectCache| {
        // The first chunk of the unspool takes us over the low watermark, which
        // means we will get a small portion of the keys back.
        let ProjectCache::UpdateSpoolIndex(new_index) = msg else {
            return;
        };
        index_in.lock().unwrap().extend(new_index.0);
    });

    services.project_cache = project_cache;
    let buffer = BufferService::create(memory_stat_config, services, config)
        .await
        .unwrap();
    let addr = buffer.start();

    // Spool 300 envelopes, each under a distinct randomly-generated project key.
    let mut keys = HashSet::new();
    for _ in 1..=300 {
        let project_key = uuid::Uuid::new_v4().as_simple().to_string();
        let key = ProjectKey::parse(&project_key).unwrap();
        let index_key = QueueKey {
            own_key: key,
            sampling_key: key,
        };
        keys.insert(index_key);
        addr.send(Enqueue::new(index_key, empty_managed_envelope()))
    }

    let (tx, mut rx) = mpsc::unbounded_channel();
    // Dequeue all the keys at once.
    addr.send(DequeueMany {
        keys,
        sender: tx.clone(),
    });
    drop(tx);

    let mut envelopes = Vec::new();
    while let Some(envelope) = rx.recv().await {
        envelopes.push(envelope);
    }

    // NOTE(review): presumably the watermark splits the 300 envelopes into 200
    // dequeued and 100 keys reported back — confirm against the spooler's
    // unspool chunking logic.
    assert_eq!(envelopes.len(), 200);
    let index = index.lock().unwrap().clone();
    assert_eq!(index.len(), 100);
}

#[ignore] // Slow. Should probably be a criterion benchmark.
#[tokio::test]
async fn compare_counts() {
Expand Down
Loading