[Merged by Bors] - feat: add record counters to spu #2731

Closed · wants to merge 4 commits
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

25 changes: 18 additions & 7 deletions crates/fluvio-socket/src/sink.rs
@@ -114,7 +114,7 @@ mod file {
             &mut self,
             msg: &T,
             version: Version,
-        ) -> Result<(), SocketError>
+        ) -> Result<usize, SocketError>
         where
             T: FileWrite,
         {
@@ -129,9 +129,14 @@
         }
 
         /// write store values to socket
-        async fn write_store_values(&mut self, values: Vec<StoreValue>) -> Result<(), SocketError> {
+        async fn write_store_values(
+            &mut self,
+            values: Vec<StoreValue>,
+        ) -> Result<usize, SocketError> {
             trace!("writing store values to socket values: {}", values.len());
 
+            let mut total_bytes_written = 0usize;
+
             for value in values {
                 match value {
                     StoreValue::Bytes(bytes) => {
@@ -143,6 +148,7 @@
                             .get_mut()
                             .write_all(&bytes)
                             .await?;
+                        total_bytes_written += bytes.len();
                     }
                     StoreValue::FileSlice(f_slice) => {
                         if f_slice.is_empty() {
@@ -154,16 +160,21 @@
                                 f_slice.len()
                             );
                             let writer = ZeroCopy::raw(self.fd);
-                            writer.copy_slice(&f_slice).await.map_err(|err| {
-                                IoError::new(ErrorKind::Other, format!("zero copy failed: {}", err))
-                            })?;
-                            trace!("finish writing file slice");
+                            let bytes_written =
+                                writer.copy_slice(&f_slice).await.map_err(|err| {
+                                    IoError::new(
+                                        ErrorKind::Other,
+                                        format!("zero copy failed: {}", err),
+                                    )
+                                })?;
+                            trace!("finish writing file slice with {bytes_written} bytes");
+                            total_bytes_written += bytes_written;
                         }
                     }
                 }
             }
 
-            Ok(())
+            Ok(total_bytes_written)
         }
     }
 }
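Both write paths in write_store_values now report their byte counts: the buffered path adds bytes.len() after write_all completes, and the zero-copy path adds the count returned by copy_slice, so the function returns the total number of bytes pushed to the socket instead of ().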
2 changes: 1 addition & 1 deletion crates/fluvio-spu/Cargo.toml
@@ -40,7 +40,7 @@ thiserror = "1"
 once_cell = "1.5"
 sysinfo = "0.26.0"
 nix = { version = "0.25"}
-
+opentelemetry = { version = "0.18.0", features = ["metrics", "rt-async-std"] }
 
 # Fluvio dependencies
 fluvio = { path = "../fluvio" }
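Note: the counters added in metrics.rs below are created from opentelemetry::global::meter, and the OpenTelemetry global provider defaults to a no-op implementation, so until a metrics pipeline/exporter is installed in the SPU the add calls are cheap no-ops. The rt-async-std feature presumably matches the async-std-based runtime used elsewhere in the crate.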
8 changes: 8 additions & 0 deletions crates/fluvio-spu/src/core/global_context.rs
@@ -20,6 +20,7 @@ use crate::replication::leader::{
     SharedReplicaLeadersState, ReplicaLeadersState, FollowerNotifier, SharedSpuUpdates,
 };
 use crate::control_plane::{StatusMessageSink, SharedStatusUpdate};
+use crate::core::metrics::SpuMetrics;
 
 use super::leader_client::LeaderConnections;
 use super::smartmodule::SmartModuleLocalStore;
@@ -47,6 +48,7 @@ pub struct GlobalContext<S> {
     status_update: SharedStatusUpdate,
     sm_engine: SmartEngine,
     leaders: Arc<LeaderConnections>,
+    metrics: SpuMetrics,
 }
 
 // -----------------------------------
@@ -64,6 +66,7 @@
     pub fn new(spu_config: SpuConfig) -> Self {
         let spus = SpuLocalStore::new_shared();
         let replicas = ReplicaStore::new_shared();
+        let metrics = SpuMetrics::new();
 
         GlobalContext {
             spu_localstore: spus.clone(),
@@ -77,6 +80,7 @@
             status_update: StatusMessageSink::shared(),
             sm_engine: SmartEngine::new(),
             leaders: LeaderConnections::shared(spus, replicas),
+            metrics,
         }
     }
 
@@ -154,6 +158,10 @@
     pub fn leaders(&self) -> Arc<LeaderConnections> {
         self.leaders.clone()
     }
+
+    pub(crate) fn metrics(&self) -> &SpuMetrics {
+        &self.metrics
+    }
 }
 
 mod file_replica {
94 changes: 94 additions & 0 deletions crates/fluvio-spu/src/core/metrics.rs
@@ -0,0 +1,94 @@
pub(crate) struct SpuMetrics {
    context: opentelemetry::Context,
    records: opentelemetry::metrics::Counter<u64>,
    bytes: opentelemetry::metrics::Counter<u64>,
}

impl SpuMetrics {
    pub(crate) fn new() -> Self {
        let context = opentelemetry::Context::new();
        let meter = opentelemetry::global::meter("storage");
        let records = meter.u64_counter("fluvio.storage.records").init();
        let bytes = meter.u64_counter("fluvio.storage.io").init();

        Self {
            context,
            records,
            bytes,
        }
    }

    pub(crate) fn with_topic_partition<'a>(
        &'a self,
        topic: &'a str,
        partition: i32,
    ) -> SpuMetricsTopicPartition {
        SpuMetricsTopicPartition {
            metrics: self,
            topic,
            partition,
        }
    }
}

impl std::fmt::Debug for SpuMetrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SpuMetrics").finish()
    }
}

pub(crate) struct SpuMetricsTopicPartition<'a> {
    metrics: &'a SpuMetrics,
    topic: &'a str,
    partition: i32,
}

impl<'a> SpuMetricsTopicPartition<'a> {
    pub(crate) fn add_records_read(&self, value: u64) {
        self.metrics.records.add(
            &self.metrics.context,
            value,
            &[
                opentelemetry::KeyValue::new("direction", "read"),
                opentelemetry::KeyValue::new("topic", self.topic.to_owned()),
                opentelemetry::KeyValue::new("partition", self.partition as i64),
            ],
        );
    }

    pub(crate) fn add_bytes_read(&self, value: u64) {
        self.metrics.bytes.add(
            &self.metrics.context,
            value,
            &[
                opentelemetry::KeyValue::new("direction", "read"),
                opentelemetry::KeyValue::new("topic", self.topic.to_owned()),
                opentelemetry::KeyValue::new("partition", self.partition as i64),
            ],
        );
    }

    pub(crate) fn add_records_written(&self, value: u64) {
        self.metrics.records.add(
            &self.metrics.context,
            value,
            &[
                opentelemetry::KeyValue::new("direction", "write"),
                opentelemetry::KeyValue::new("topic", self.topic.to_owned()),
                opentelemetry::KeyValue::new("partition", self.partition as i64),
            ],
        );
    }

    pub(crate) fn add_bytes_written(&self, value: u64) {
        self.metrics.bytes.add(
            &self.metrics.context,
            value,
            &[
                opentelemetry::KeyValue::new("direction", "write"),
                opentelemetry::KeyValue::new("topic", self.topic.to_owned()),
                opentelemetry::KeyValue::new("partition", self.partition as i64),
            ],
        );
    }
}
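
A minimal usage sketch of the new counters (not part of the diff; the topic name and values are illustrative):

    fn record_partition_activity() {
        // In the PR, this instance lives in GlobalContext and is created once.
        let metrics = SpuMetrics::new();

        // Bind the counters to a topic/partition before recording.
        let partition_metrics = metrics.with_topic_partition("my-topic", 0);

        // Fetch path: records and bytes served to a consumer.
        partition_metrics.add_records_read(50);
        partition_metrics.add_bytes_read(4096);

        // Produce path: records and bytes appended by a producer.
        partition_metrics.add_records_written(50);
        partition_metrics.add_bytes_written(4096);
    }

The produce and fetch handlers below reach these counters through ctx.metrics(), while StreamFetchHandler constructs a fresh SpuMetrics::new() per stream; every instance resolves the same global "storage" meter and instrument names.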
1 change: 1 addition & 0 deletions crates/fluvio-spu/src/core/mod.rs
@@ -6,6 +6,7 @@ pub mod spus;
 pub mod replica;
 pub mod smartmodule;
 pub mod derivedstream;
+pub mod metrics;
 
 pub use self::global_context::{GlobalContext, ReplicaChange};
 pub use self::store::Spec;
7 changes: 4 additions & 3 deletions crates/fluvio-spu/src/replication/leader/replica_state.rs
@@ -314,7 +314,7 @@ where
         &self,
         records: &mut RecordSet<R>,
         notifiers: &FollowerNotifier,
-    ) -> Result<(Offset, Offset), StorageError> {
+    ) -> Result<(Offset, Offset, usize), StorageError> {
         let offsets = self
             .storage
             .write_record_set(records, self.in_sync_replica == 1)
@@ -753,12 +753,13 @@ mod test_leader {
             &mut self,
             records: &mut fluvio_protocol::record::RecordSet<R>,
             update_highwatermark: bool,
-        ) -> Result<(), fluvio_storage::StorageError> {
+        ) -> Result<usize, fluvio_storage::StorageError> {
             self.pos.leo = records.last_offset().unwrap();
             if update_highwatermark {
                 self.pos.hw = self.pos.leo;
             }
-            Ok(())
+            // assume 1 byte records
+            Ok((self.pos.hw - self.pos.leo) as usize)
         }
 
         // just return hw multiplied by 100.
8 changes: 8 additions & 0 deletions crates/fluvio-spu/src/services/public/fetch_handler.rs
@@ -41,6 +41,7 @@ pub async fn handle_fetch_request(
     inner
         .encode_file_slices(&response, header.api_version())
         .await?;
+
     drop(inner);
 
     trace!("Finished sending FileFetchResponse");
@@ -100,6 +101,10 @@
         }
     };
 
+    let partition_metrics = ctx
+        .metrics()
+        .with_topic_partition(&replica_id.topic, partition_request.partition_index);
+
     match leader_state
         .read_records(
             fetch_offset,
@@ -112,7 +117,10 @@
             partition_response.high_watermark = slice.end.hw;
             partition_response.log_start_offset = slice.start;
 
+            partition_metrics.add_records_read((slice.end.hw - slice.start) as u64);
+
             if let Some(file_slice) = slice.file_slice {
+                partition_metrics.add_bytes_read(file_slice.len());
                 partition_response.records = file_slice.into();
            }
        }
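For example, a slice with slice.start = 100 and slice.end.hw = 150 increments the records counter by 50 (the offset range exposed to the consumer), while the bytes counter is incremented by the length of the zero-copy file slice actually handed to the socket.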
23 changes: 16 additions & 7 deletions crates/fluvio-spu/src/services/public/produce_handler.rs
@@ -79,17 +79,17 @@ async fn handle_produce_topic(
     ctx: &DefaultSharedGlobalContext,
     topic_request: DefaultTopicRequest,
 ) -> TopicWriteResult {
-    trace!("Handling produce request for topic:");
+    let topic = &topic_request.name;
+
+    trace!("Handling produce request for topic: {topic}");
 
     let mut topic_result = TopicWriteResult {
-        topic: topic_request.name,
+        topic: topic.clone(),
         partitions: vec![],
     };
 
     for partition_request in topic_request.partitions.into_iter() {
-        let replica_id = ReplicaKey::new(
-            topic_result.topic.clone(),
-            partition_request.partition_index,
-        );
+        let replica_id = ReplicaKey::new(topic.clone(), partition_request.partition_index);
         let partition_response = handle_produce_partition(ctx, replica_id, partition_request).await;
         topic_result.partitions.push(partition_response);
     }
@@ -124,6 +124,7 @@
     };
 
     let mut records = partition_request.records;
+
     if validate_records(&records, replica_metadata.compression_type).is_err() {
         error!(%replica_id, "Compression in batch not supported by this topic");
         return PartitionWriteResult::error(replica_id, ErrorCode::CompressionError);
@@ -134,7 +135,15 @@
         .await;
 
     match write_result {
-        Ok((base_offset, leo)) => PartitionWriteResult::ok(replica_id, base_offset, leo),
+        Ok((base_offset, leo, bytes)) => {
+            let partition_metrics = ctx
+                .metrics()
+                .with_topic_partition(&replica_id.topic, partition_request.partition_index);
+            partition_metrics.add_records_written((leo - base_offset) as u64);
+            partition_metrics.add_bytes_written(bytes as u64);
+
+            PartitionWriteResult::ok(replica_id, base_offset, leo)
+        }
         Err(err @ StorageError::BatchTooBig(_)) => {
             error!(%replica_id, "Batch is too big: {:#?}", err);
             PartitionWriteResult::error(replica_id, ErrorCode::MessageTooLarge)
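For example, if the storage layer returns (base_offset, leo, bytes) = (100, 105, 2048), the handler records 5 records written and 2048 bytes written for that topic/partition.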
14 changes: 14 additions & 0 deletions crates/fluvio-spu/src/services/public/stream_fetch.rs
@@ -34,6 +34,7 @@ use crate::services::public::stream_fetch::publishers::INIT_OFFSET;
 use crate::smartengine::context::SmartModuleContext;
 use crate::smartengine::batch::BatchSmartEngine;
 use crate::smartengine::file_batch::FileBatchIterator;
+use crate::core::metrics::SpuMetrics;
 
 /// Fetch records as stream
 pub struct StreamFetchHandler {
@@ -47,6 +48,7 @@ pub struct StreamFetchHandler {
     consumer_offset_listener: OffsetChangeListener,
     leader_state: SharedFileLeaderState,
     stream_id: u32,
+    metrics: SpuMetrics,
 }
 
 impl StreamFetchHandler {
@@ -170,6 +172,8 @@
            starting_offset,
            "stream fetch");
 
+        let metrics = SpuMetrics::new();
+
         let handler = Self {
             isolation,
             replica: replica.clone(),
@@ -181,6 +185,7 @@
             stream_id,
             leader_state,
             max_fetch_bytes,
+            metrics,
         };
 
         if let Err(err) = handler.process(starting_offset, derivedstream_ctx).await {
@@ -394,6 +399,10 @@
             ..Default::default()
         };
 
+        let partition_metrics = self
+            .metrics
+            .with_topic_partition(&self.replica.topic, self.replica.partition);
+
         // Read records from the leader starting from `offset`
         // Returns with the HW/LEO of the latest records available in the leader
         // This describes the range of records that can be read in this request
@@ -405,7 +414,12 @@
             Ok(slice) => {
                 file_partition_response.high_watermark = slice.end.hw;
                 file_partition_response.log_start_offset = slice.start;
+
+                partition_metrics.add_records_read((slice.end.hw - slice.start) as u64);
+
                 if let Some(file_slice) = slice.file_slice {
+                    partition_metrics.add_bytes_read(file_slice.len());
+
                     file_partition_response.records = file_slice.into();
                 }
                 slice.end