[Merged by Bors] - fix: recover from invalid segments #2909

more cleanup in validator
migrate storage api to use anyhow
sehz committed Dec 28, 2022
commit e965b1f09a14bad92893752ad1580f5204c3446b
9 changes: 3 additions & 6 deletions crates/fluvio-spu/src/replication/follower/state.rs
@@ -12,7 +12,7 @@ use fluvio_storage::config::ReplicaConfig;
use fluvio_controlplane_metadata::partition::{Replica, ReplicaKey};
use fluvio_protocol::record::RecordSet;
use fluvio_protocol::record::Offset;
-use fluvio_storage::{FileReplica, StorageError, ReplicaStorage, ReplicaStorageConfig};
+use fluvio_storage::{FileReplica, ReplicaStorage, ReplicaStorageConfig};
use fluvio_types::SpuId;

use crate::replication::leader::ReplicaOffsetRequest;
@@ -209,7 +209,7 @@ where
&self,
records: &mut RecordSet<R>,
leader_hw: Offset,
-) -> Result<bool, StorageError> {
+) -> Result<bool> {
let mut changes = false;

if records.total_records() > 0 {
@@ -245,10 +245,7 @@

/// try to write records
/// ensure records has correct baseoffset
-async fn write_recordsets<R: BatchRecords>(
-&self,
-records: &mut RecordSet<R>,
-) -> Result<bool, StorageError> {
+async fn write_recordsets<R: BatchRecords>(&self, records: &mut RecordSet<R>) -> Result<bool> {
let storage_leo = self.leo();
if records.base_offset() != storage_leo {
// this could happened if records were sent from leader before hw was sync
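A note on the migrated signatures: the bare `Result<bool>` above is anyhow's alias, i.e. `Result<bool, anyhow::Error>`, rather than a typed result — the enabling `use anyhow::Result;` import is visible as hunk context in the next file's diff.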
6 changes: 3 additions & 3 deletions crates/fluvio-spu/src/replication/leader/replica_state.rs
@@ -15,7 +15,7 @@ use anyhow::Result;
use fluvio_protocol::record::{RecordSet, Offset, ReplicaKey, BatchRecords};
use fluvio_controlplane_metadata::partition::{Replica, ReplicaStatus, PartitionStatus};
use fluvio_controlplane::LrsRequest;
-use fluvio_storage::{FileReplica, StorageError, ReplicaStorage, OffsetInfo, ReplicaStorageConfig};
+use fluvio_storage::{FileReplica, ReplicaStorage, OffsetInfo, ReplicaStorageConfig};
use fluvio_types::{SpuId};
use fluvio_spu_schema::Isolation;

@@ -315,7 +315,7 @@ where
&self,
records: &mut RecordSet<R>,
notifiers: &FollowerNotifier,
-) -> Result<(Offset, Offset, usize), StorageError> {
+) -> Result<(Offset, Offset, usize)> {
let offsets = self
.storage
.write_record_set(records, self.in_sync_replica == 1)
@@ -742,7 +742,7 @@ mod test_leader {
&mut self,
records: &mut fluvio_protocol::record::RecordSet<R>,
update_highwatermark: bool,
-) -> Result<usize, fluvio_storage::StorageError> {
+) -> Result<usize> {
self.pos.leo = records.last_offset().unwrap();
if update_highwatermark {
self.pos.hw = self.pos.leo;
18 changes: 10 additions & 8 deletions crates/fluvio-spu/src/services/public/produce_handler.rs
@@ -144,14 +144,16 @@ async fn handle_produce_partition<R: BatchRecords>(

PartitionWriteResult::ok(replica_id, base_offset, leo)
}
-Err(err @ StorageError::BatchTooBig(_)) => {
-error!(%replica_id, "Batch is too big: {:#?}", err);
-PartitionWriteResult::error(replica_id, ErrorCode::MessageTooLarge)
-}
-Err(err) => {
-error!(%replica_id, "Error writing to replica: {:#?}", err);
-PartitionWriteResult::error(replica_id, ErrorCode::StorageError)
-}
+Err(err) => match err.downcast_ref::<StorageError>() {
+Some(StorageError::BatchTooBig(_)) => {
+error!(%replica_id, "Batch is too big: {:#?}", err);
+PartitionWriteResult::error(replica_id, ErrorCode::MessageTooLarge)
+}
+_ => {
+error!(%replica_id, "Error writing to replica: {:#?}", err);
+PartitionWriteResult::error(replica_id, ErrorCode::StorageError)
+}
+},
}
}

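The rewritten match above is the crux of the anyhow migration: the handler now receives an opaque `anyhow::Error` and recovers the concrete `StorageError` with `downcast_ref` only when it must map a specific failure to a protocol error code. A minimal, self-contained sketch of the pattern (the `write_batch`/`handle` helpers and the 1024-byte limit are illustrative, not from the PR):

    use anyhow::Result;
    use thiserror::Error;

    #[derive(Debug, Error)]
    enum StorageError {
        #[error("batch exceeded max size: {0}")]
        BatchTooBig(usize),
    }

    // A fallible write that lifts the typed error into anyhow::Error via `.into()`.
    fn write_batch(size: usize, max: usize) -> Result<()> {
        if size > max {
            return Err(StorageError::BatchTooBig(max).into());
        }
        Ok(())
    }

    // The caller downcasts only for the variant it handles specially.
    fn handle(size: usize) {
        if let Err(err) = write_batch(size, 1024) {
            match err.downcast_ref::<StorageError>() {
                Some(StorageError::BatchTooBig(max)) => eprintln!("rejecting batch over {max} bytes"),
                _ => eprintln!("storage error: {err:#}"),
            }
        }
    }

`downcast_ref` only borrows the error, so the full `anyhow::Error` remains available for logging afterwards, which is exactly what the handler does with `{:#?}` in both arms.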
2 changes: 1 addition & 1 deletion crates/fluvio-spu/src/storage/mod.rs
@@ -133,7 +133,7 @@ where
&self,
records: &mut RecordSet<R>,
hw_update: bool,
-) -> Result<(Offset, Offset, usize), StorageError> {
+) -> Result<(Offset, Offset, usize)> {
debug!(
replica = %self.id,
leo = self.leo(),
5 changes: 3 additions & 2 deletions crates/fluvio-storage/src/cleaner.rs
@@ -140,12 +140,13 @@ mod tests {
use std::sync::Arc;
use std::time::Duration;

+use anyhow::Result;
+
use fluvio_future::timer::sleep;
use flv_util::fixture::ensure_new_dir;
use fluvio_protocol::fixture::create_batch;
use fluvio_protocol::record::Offset;

-use crate::StorageError;
use crate::config::SharedReplicaConfig;
use crate::segment::MutableSegment;
use crate::segment::ReadSegment;
@@ -253,7 +254,7 @@
option: Arc<SharedReplicaConfig>,
start: Offset,
end_offset: Offset,
-) -> Result<ReadSegment, StorageError> {
+) -> Result<ReadSegment> {
let mut mut_segment = MutableSegment::create(start, option).await?;
mut_segment.append_batch(&mut create_batch()).await?;
mut_segment.set_end_offset(end_offset);
2 changes: 1 addition & 1 deletion crates/fluvio-storage/src/lib.rs
@@ -159,7 +159,7 @@ mod inner {
&mut self,
records: &mut RecordSet<R>,
update_highwatermark: bool,
-) -> Result<usize, StorageError>;
+) -> Result<usize>;

async fn update_high_watermark(&mut self, offset: Offset) -> Result<bool, StorageError>;

14 changes: 8 additions & 6 deletions crates/fluvio-storage/src/mut_records.rs
@@ -26,10 +26,9 @@ use fluvio_protocol::Encoder;
use crate::config::SharedReplicaConfig;
use crate::mut_index::MutLogIndex;
use crate::util::generate_file_name;
-use crate::validator::validate;
use crate::validator::LogValidationError;
-use crate::StorageError;
use crate::records::FileRecords;
+use crate::validator::LogValidator;

pub const MESSAGE_LOG_EXTENSION: &str = "log";

@@ -112,8 +111,8 @@ impl MutFileRecords {
index: &MutLogIndex,
skip_errors: bool,
verbose: bool,
-) -> Result<Offset> {
-validate(&self.path, Some(index), skip_errors, verbose).await
+) -> Result<LogValidator> {
+LogValidator::validate(&self.path, Some(index), skip_errors, verbose).await
}

/// get current file position
@@ -128,10 +127,13 @@
pub async fn write_batch<R: BatchRecords>(
&mut self,
batch: &Batch<R>,
-) -> Result<(bool, usize, u32), StorageError> {
+) -> Result<(bool, usize, u32)> {
trace!("start sending using batch {:#?}", batch.get_header());
if batch.base_offset < self.base_offset {
-return Err(StorageError::LogValidation(LogValidationError::BaseOff));
+return Err(LogValidationError::InvalidBaseOffsetMinimum {
+invalid_batch_offset: batch.base_offset,
+}
+.into());
}

let batch_len = batch.write_size(0);
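Where the old code wrapped a unit-like `LogValidationError::BaseOff` inside `StorageError::LogValidation`, the new code returns a structured variant directly and lets `.into()` lift it into `anyhow::Error`, so the offending offset travels with the error. A sketch of how such a variant is declared and raised, assuming the crate's usual `thiserror` derive (the variant and field names come from the diff; the `#[error]` message and the standalone `check_base_offset` helper are my own):

    use anyhow::Result;
    use thiserror::Error;

    #[derive(Debug, Error)]
    pub enum LogValidationError {
        #[error("batch base offset {invalid_batch_offset} is below the segment's base offset")]
        InvalidBaseOffsetMinimum { invalid_batch_offset: i64 },
    }

    // Mirrors the guard in write_batch above: reject any batch that starts
    // before the segment's own base offset. Offset is an i64 alias in fluvio.
    pub fn check_base_offset(batch_base_offset: i64, segment_base_offset: i64) -> Result<()> {
        if batch_base_offset < segment_base_offset {
            return Err(LogValidationError::InvalidBaseOffsetMinimum {
                invalid_batch_offset: batch_base_offset,
            }
            .into());
        }
        Ok(())
    }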
6 changes: 3 additions & 3 deletions crates/fluvio-storage/src/records.rs
@@ -24,7 +24,7 @@ use fluvio_protocol::record::{Offset, Size, Size64};
use crate::LogIndex;
use crate::config::SharedReplicaConfig;
use crate::util::generate_file_name;
-use crate::validator::validate;
+use crate::validator::LogValidator;
use crate::StorageError;

pub const MESSAGE_LOG_EXTENSION: &str = "log";
@@ -93,8 +93,8 @@ impl FileRecordsSlice {
index: &LogIndex,
skip_errors: bool,
verbose: bool,
-) -> Result<Offset> {
-validate(&self.path, Some(index), skip_errors, verbose).await
+) -> Result<LogValidator> {
+LogValidator::validate(&self.path, Some(index), skip_errors, verbose).await
}

pub fn modified_time_elapsed(&self) -> Result<Duration, SystemTimeError> {
18 changes: 8 additions & 10 deletions crates/fluvio-storage/src/replica.rs
@@ -131,15 +131,15 @@ impl ReplicaStorage for FileReplica {
&mut self,
records: &mut RecordSet<R>,
update_highwatermark: bool,
-) -> Result<usize, StorageError> {
+) -> Result<usize> {
let max_batch_size = self.option.max_batch_size.get() as usize;
let mut total_size = 0;
// check if any of the records's batch exceed max length
for batch in &records.batches {
let batch_size = batch.write_size(0);
total_size += batch_size;
if batch_size > max_batch_size {
-return Err(StorageError::BatchTooBig(max_batch_size));
+return Err(StorageError::BatchTooBig(max_batch_size).into());
}
}

@@ -369,10 +369,7 @@ impl FileReplica {
}

#[instrument(skip(self, item))]
-async fn write_batch<R: BatchRecords>(
-&mut self,
-item: &mut Batch<R>,
-) -> Result<(), StorageError> {
+async fn write_batch<R: BatchRecords>(&mut self, item: &mut Batch<R>) -> Result<()> {
if !(self.active_segment.append_batch(item).await?) {
info!(
partition = self.partition,
@@ -850,11 +847,12 @@ mod tests {
.expect("batch")
.records();
assert!(largest_batch.write_size(0) > 100); // ensure we are writing more than 100
+let err = replica
+.write_recordset(&mut largest_batch, true)
+.await
+.unwrap_err();
assert!(matches!(
-replica
-.write_recordset(&mut largest_batch, true)
-.await
-.unwrap_err(),
+err.downcast_ref::<StorageError>().expect("downcast"),
StorageError::BatchTooBig(_)
));
}
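The test change shows the assertion idiom that falls out of the migration: bind the `anyhow::Error` first, then `downcast_ref` to the typed error before matching on the variant. In isolation, reusing the hypothetical `write_batch`/`StorageError` pair from the earlier sketch:

    #[test]
    fn rejects_oversized_batch() {
        // write_batch returns anyhow::Result, so unwrap_err yields anyhow::Error
        let err = write_batch(2048, 1024).unwrap_err();
        assert!(matches!(
            err.downcast_ref::<StorageError>().expect("should be a StorageError"),
            StorageError::BatchTooBig(_)
        ));
    }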
53 changes: 18 additions & 35 deletions crates/fluvio-storage/src/segment.rs
@@ -21,7 +21,6 @@ use crate::records::FileRecords;
use crate::mut_records::MutFileRecords;
use crate::records::FileRecordsSlice;
use crate::config::{SharedReplicaConfig};
-use crate::validator::{InvalidIndexError};
use crate::StorageError;
use crate::batch::{FileBatchStream};
use crate::index::OffsetPosition;
@@ -233,7 +232,7 @@ impl Segment<LogIndex, FileRecordsSlice> {
base_offset: Offset,
end_offset: Offset,
option: Arc<SharedReplicaConfig>,
-) -> Result<Self, StorageError> {
+) -> Result<Self> {
debug!(base_offset, end_offset, ?option, "open for read");
let msg_log = FileRecordsSlice::open(base_offset, option.clone()).await?;
let base_offset = msg_log.get_base_offset();
@@ -258,40 +257,26 @@
let msg_log = FileRecordsSlice::open(base_offset, option.clone()).await?;
let index = LogIndex::open_from_offset(base_offset, option.clone()).await?;
let base_offset = msg_log.get_base_offset();
-let end_offset = msg_log.validate(&index, false, false).await?;
match msg_log.validate(&index, false, false).await {
-Ok(end_offset) => {
-debug!(end_offset, base_offset, "base offset from msg_log");
+Ok(val) => {
+// check if validation is successful
+if let Some(err) = val.error {
+error!(err = ?err, "segment validation failed");
+return Err(err.into());
+}
+
+info!(end_offset = val.last_valid_offset, base_offset = val.base_offset, time_ms = %val.duration.as_millis(), "segment validated");
Ok(Segment {
msg_log,
index,
option,
base_offset,
-end_offset,
+end_offset: val.last_valid_offset,
})
}
Err(err) => {
-// try to downcast
-if let Some(index_error) = err.downcast_ref::<InvalidIndexError>() {
-error!(
-offset = index_error.offset,
-batch_file_pos = index_error.batch_file_pos,
-index_position = index_error.index_position,
-diff_position = index_error.diff_position,
-"invalid index, rebuilding"
-);
-// let index = msg_log.generate_index().await?;
-Ok(Segment {
-msg_log,
-index,
-option,
-base_offset,
-end_offset,
-})
-} else {
-error!(?err, "validation error");
-Err(err)
-}
+error!(?err, "segment validation encountered fail error");
+Err(err)
}
}
}
@@ -364,7 +349,8 @@ impl Segment<MutLogIndex, MutFileRecords> {
self.end_offset = self
.msg_log
.validate(&self.index, skip_errors, verbose)
-.await?;
+.await?
+.last_valid_offset;
Ok(self.end_offset)
}

@@ -381,13 +367,13 @@

/// convert to immutable segment
#[allow(clippy::wrong_self_convention)]
-pub async fn as_segment(self) -> Result<ReadSegment, StorageError> {
+pub async fn as_segment(self) -> Result<ReadSegment> {
Segment::open_for_read(self.get_base_offset(), self.end_offset, self.option.clone()).await
}

/// use only in test
#[cfg(test)]
-pub async fn convert_to_segment(mut self) -> Result<ReadSegment, StorageError> {
+pub async fn convert_to_segment(mut self) -> Result<ReadSegment> {
self.shrink_index().await?;
Segment::open_for_read(self.get_base_offset(), self.end_offset, self.option.clone()).await
}
@@ -398,14 +384,11 @@
/// 2. Append batch to msg log
/// 3. Write batch location to index
#[instrument(skip(batch))]
-pub async fn append_batch<R: BatchRecords>(
-&mut self,
-batch: &mut Batch<R>,
-) -> Result<bool, StorageError> {
+pub async fn append_batch<R: BatchRecords>(&mut self, batch: &mut Batch<R>) -> Result<bool> {
// adjust base offset and offset delta
// reject if batch len is 0
if batch.records_len() == 0 {
-return Err(StorageError::EmptyBatch);
+return Err(StorageError::EmptyBatch.into());
}

batch.set_base_offset(self.end_offset);
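`validate` now returns a `LogValidator` report instead of a bare `Offset`. Its definition is not part of this commit's visible hunks, but from the fields the new segment-open code reads — `val.error`, `val.last_valid_offset`, `val.base_offset`, `val.duration` — its shape is roughly the following (an inference, not the crate's actual definition):

    use std::time::Duration;

    // Stand-in for the crate's validation error; the enum sketched earlier
    // would slot in here.
    #[derive(Debug)]
    pub struct LogValidationError;

    // Inferred validation report: field names come from how the diff consumes
    // the value; the types are educated guesses (Offset = i64 in fluvio).
    pub struct LogValidator {
        pub base_offset: i64,                  // first offset of the segment
        pub last_valid_offset: i64,            // end offset recovered by the scan
        pub duration: Duration,                // how long validation took
        pub error: Option<LogValidationError>, // fatal error, if any
    }

Carrying the error inside the report is what lets the open path above treat `Some(err)` as fatal while otherwise adopting `val.last_valid_offset` as the segment's end offset — the recovery behavior the PR title promises.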
5 changes: 3 additions & 2 deletions crates/fluvio-storage/src/segments.rs
@@ -274,11 +274,12 @@ mod tests {
use std::path::PathBuf;
use std::sync::Arc;

+use anyhow::Result;
+
use flv_util::fixture::ensure_new_dir;
use fluvio_protocol::fixture::create_batch;
use fluvio_protocol::record::Offset;

-use crate::StorageError;
use crate::config::SharedReplicaConfig;
use crate::segment::MutableSegment;
use crate::segment::ReadSegment;
@@ -291,7 +292,7 @@
option: Arc<SharedReplicaConfig>,
start: Offset,
end_offset: Offset,
-) -> Result<ReadSegment, StorageError> {
+) -> Result<ReadSegment> {
let mut mut_segment = MutableSegment::create(start, option).await?;
mut_segment.append_batch(&mut create_batch()).await?;
mut_segment.set_end_offset(end_offset); // only used for testing