fix: recover from invalid segments (#2909)
resolves #2509.

Recovers from invalid logs caused by incorrect batch encoding by setting the log size back to the end of the last correctly decoded batch.
Tested with data from corrupted segments.
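The recovery is essentially a truncation pass: scan the segment batch by batch, remember where the last cleanly decoded batch ends, and cut the log back to that position. A minimal sketch of the idea (the `BatchScanner` and `ValidBatch` names are hypothetical stand-ins, not the actual fluvio-storage types):

```rust
use std::io::Result;

/// A successfully decoded batch plus where it sits in the file (hypothetical type).
struct ValidBatch {
    pos: u64,       // file offset where the batch starts
    total_len: u64, // bytes the batch occupies on disk (header + content)
}

/// Hypothetical stand-in for the batch iterator: `Ok(None)` is clean EOF,
/// `Err` means the next batch failed to decode (corrupt tail).
trait BatchScanner {
    fn next_batch(&mut self) -> Result<Option<ValidBatch>>;
}

/// Size the log should be truncated to: one byte past the last good batch.
fn recoverable_len(scanner: &mut impl BatchScanner) -> u64 {
    let mut valid_end = 0;
    loop {
        match scanner.next_batch() {
            Ok(Some(batch)) => valid_end = batch.pos + batch.total_len, // advance past good batch
            Ok(None) => break, // clean end of log: nothing to repair
            Err(_) => break,   // decode failure: stop at last good batch
        }
    }
    valid_end
}
```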

- More cleanup and simplification of the log validator.
- Migrated more APIs to use `anyhow` (see the sketch after this list).
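The `anyhow` migration follows the pattern visible in `produce_handler.rs` below: storage APIs return `anyhow::Result`, and a caller that still needs to react to a specific error recovers the concrete type with `downcast_ref`. A simplified, self-contained sketch (the one-variant `StorageError` and the size limit here are illustrative, not the real definitions):

```rust
use anyhow::Result;

#[derive(Debug, thiserror::Error)]
enum StorageError {
    #[error("batch exceeds max size: {0}")]
    BatchTooBig(usize),
}

// Typed error gets wrapped into anyhow::Error at the API boundary.
fn write_batch(len: usize) -> Result<()> {
    if len > 1024 {
        return Err(StorageError::BatchTooBig(len).into());
    }
    Ok(())
}

// Caller maps the specific storage error to a client-facing message,
// everything else to a generic failure.
fn handle(len: usize) {
    if let Err(err) = write_batch(len) {
        match err.downcast_ref::<StorageError>() {
            Some(StorageError::BatchTooBig(_)) => eprintln!("message too large: {err}"),
            _ => eprintln!("storage error: {err:#}"),
        }
    }
}
```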
sehz committed Dec 30, 2022
1 parent c0011a8 commit e68dcca
Showing 18 changed files with 529 additions and 356 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -740,7 +740,7 @@ jobs:
UNINSTALL: noclean
SERVER_LOG: fluvio=debug
strategy:
fail-fast: false
# fail-fast: true
matrix:
os: [ubuntu-latest]
rust-target: [x86_64-unknown-linux-musl]
9 changes: 3 additions & 6 deletions crates/fluvio-spu/src/replication/follower/state.rs
@@ -12,7 +12,7 @@ use fluvio_storage::config::ReplicaConfig;
use fluvio_controlplane_metadata::partition::{Replica, ReplicaKey};
use fluvio_protocol::record::RecordSet;
use fluvio_protocol::record::Offset;
use fluvio_storage::{FileReplica, StorageError, ReplicaStorage, ReplicaStorageConfig};
use fluvio_storage::{FileReplica, ReplicaStorage, ReplicaStorageConfig};
use fluvio_types::SpuId;

use crate::replication::leader::ReplicaOffsetRequest;
@@ -209,7 +209,7 @@ where
&self,
records: &mut RecordSet<R>,
leader_hw: Offset,
) -> Result<bool, StorageError> {
) -> Result<bool> {
let mut changes = false;

if records.total_records() > 0 {
@@ -245,10 +245,7 @@

/// try to write records
/// ensure records has correct baseoffset
async fn write_recordsets<R: BatchRecords>(
&self,
records: &mut RecordSet<R>,
) -> Result<bool, StorageError> {
async fn write_recordsets<R: BatchRecords>(&self, records: &mut RecordSet<R>) -> Result<bool> {
let storage_leo = self.leo();
if records.base_offset() != storage_leo {
// this could happen if records were sent from leader before hw was synced
6 changes: 3 additions & 3 deletions crates/fluvio-spu/src/replication/leader/replica_state.rs
@@ -15,7 +15,7 @@ use anyhow::Result;
use fluvio_protocol::record::{RecordSet, Offset, ReplicaKey, BatchRecords};
use fluvio_controlplane_metadata::partition::{Replica, ReplicaStatus, PartitionStatus};
use fluvio_controlplane::LrsRequest;
use fluvio_storage::{FileReplica, StorageError, ReplicaStorage, OffsetInfo, ReplicaStorageConfig};
use fluvio_storage::{FileReplica, ReplicaStorage, OffsetInfo, ReplicaStorageConfig};
use fluvio_types::{SpuId};
use fluvio_spu_schema::Isolation;

@@ -315,7 +315,7 @@ where
&self,
records: &mut RecordSet<R>,
notifiers: &FollowerNotifier,
) -> Result<(Offset, Offset, usize), StorageError> {
) -> Result<(Offset, Offset, usize)> {
let offsets = self
.storage
.write_record_set(records, self.in_sync_replica == 1)
@@ -742,7 +742,7 @@ mod test_leader {
&mut self,
records: &mut fluvio_protocol::record::RecordSet<R>,
update_highwatermark: bool,
) -> Result<usize, fluvio_storage::StorageError> {
) -> Result<usize> {
self.pos.leo = records.last_offset().unwrap();
if update_highwatermark {
self.pos.hw = self.pos.leo;
18 changes: 10 additions & 8 deletions crates/fluvio-spu/src/services/public/produce_handler.rs
@@ -144,14 +144,16 @@ async fn handle_produce_partition<R: BatchRecords>(

PartitionWriteResult::ok(replica_id, base_offset, leo)
}
Err(err @ StorageError::BatchTooBig(_)) => {
error!(%replica_id, "Batch is too big: {:#?}", err);
PartitionWriteResult::error(replica_id, ErrorCode::MessageTooLarge)
}
Err(err) => {
error!(%replica_id, "Error writing to replica: {:#?}", err);
PartitionWriteResult::error(replica_id, ErrorCode::StorageError)
}
Err(err) => match err.downcast_ref::<StorageError>() {
Some(StorageError::BatchTooBig(_)) => {
error!(%replica_id, "Batch is too big: {:#?}", err);
PartitionWriteResult::error(replica_id, ErrorCode::MessageTooLarge)
}
_ => {
error!(%replica_id, "Error writing to replica: {:#?}", err);
PartitionWriteResult::error(replica_id, ErrorCode::StorageError)
}
},
}
}

2 changes: 1 addition & 1 deletion crates/fluvio-spu/src/storage/mod.rs
@@ -133,7 +133,7 @@ where
&self,
records: &mut RecordSet<R>,
hw_update: bool,
) -> Result<(Offset, Offset, usize), StorageError> {
) -> Result<(Offset, Offset, usize)> {
debug!(
replica = %self.id,
leo = self.leo(),
43 changes: 27 additions & 16 deletions crates/fluvio-storage/src/batch.rs
@@ -4,9 +4,11 @@ use std::fmt::Debug;
use std::marker::PhantomData;
use std::path::Path;

use fluvio_protocol::record::BatchHeader;
use fluvio_protocol::record::Offset;
use tracing::error;
use tracing::instrument;
use tracing::trace;
use tracing::debug;
use anyhow::{Result, anyhow};
use async_trait::async_trait;
use bytes::Bytes;
@@ -19,20 +21,21 @@ use fluvio_protocol::record::Size;

use crate::file::FileBytesIterator;

#[derive(Debug, thiserror::Error)]
#[derive(Debug, thiserror::Error, Clone)]
/// Outer batch representation
/// It's either successfully decoded into actual batch or not enough bytes to decode
pub enum BatchHeaderError<R> {
#[error(transparent)]
Io(#[from] IoError),
#[error("Not Enough Header{actual_len} {expected_len}")]
pub enum BatchHeaderError {
#[error("Not Enough Header {pos}, {actual_len} {expected_len}")]
NotEnoughHeader {
pos: u32,
actual_len: usize,
expected_len: usize,
},
#[error("Not Enough Content {actual_len} {expected_len}")]
#[error("Not Enough Content {pos} {base_offset} {actual_len} {expected_len}")]
NotEnoughContent {
header: Batch<R>, // decoded header
header: BatchHeader,
base_offset: Offset,
pos: u32,
actual_len: usize,
expected_len: usize,
},
@@ -57,7 +60,7 @@
#[instrument(skip(file))]
pub(crate) async fn read_from<S: StorageBytesIterator>(
file: &mut S,
) -> Result<Option<FileBatchPos<R>>, BatchHeaderError<R>> {
) -> Result<Option<FileBatchPos<R>>> {
let pos = file.get_pos();
trace!(pos, "reading from pos");
let bytes = match file.read_bytes(BATCH_FILE_HEADER_SIZE as u32).await? {
@@ -81,7 +84,9 @@
return Err(BatchHeaderError::NotEnoughHeader {
actual_len: read_len,
expected_len: BATCH_FILE_HEADER_SIZE,
});
pos,
}
.into());
}

let mut cursor = Cursor::new(bytes);
@@ -110,10 +115,13 @@
Some(bytes) => bytes,
None => {
return Err(BatchHeaderError::NotEnoughContent {
header: batch,
base_offset: batch.get_base_offset(),
header: batch.header,
actual_len: 0,
expected_len: content_len,
})
pos,
}
.into())
}
};

@@ -128,10 +136,13 @@

if read_len < content_len {
return Err(BatchHeaderError::NotEnoughContent {
header: batch,
base_offset: batch.get_base_offset(),
header: batch.header,
actual_len: read_len,
expected_len: content_len,
});
pos,
}
.into());
}

let mut cursor = Cursor::new(bytes);
@@ -226,9 +237,9 @@ where
match FileBatchPos::read_from(&mut self.byte_iterator).await {
Ok(batch_res) => Ok(batch_res),
Err(err) => {
debug!("error getting batch: {}", err);
error!("error getting batch: {}, invalidating", err);
self.invalid = true;
Err(anyhow!("error decoding batch: {}", err))
Err(err)
}
}
}
107 changes: 75 additions & 32 deletions crates/fluvio-storage/src/bin/cli.rs
@@ -3,13 +3,15 @@ use std::{path::PathBuf};
use clap::Parser;
use anyhow::{Result, anyhow};

use fluvio_controlplane_metadata::partition::ReplicaKey;
use fluvio_protocol::record::Offset;
use fluvio_future::task::run_block_on;
use fluvio_storage::{
LogIndex, OffsetPosition,
batch_header::BatchHeaderStream,
segment::{MutableSegment},
config::{ReplicaConfig},
FileReplica, ReplicaStorage,
};
use fluvio_storage::records::FileRecords;

@@ -29,18 +31,23 @@ enum Main {

#[clap(name = "validate")]
ValidateSegment(SegmentValidateOpt),

/// show information about replica
#[clap(name = "replica")]
Replica(ReplicaOpt),
}

fn main() {
fluvio_future::subscriber::init_logger();

let opt = Main::parse();
let main_opt = Main::parse();

let result = run_block_on(async {
match opt {
match main_opt {
Main::Log(opt) => dump_log(opt).await,
Main::Index(opt) => dump_index(opt).await,
Main::ValidateSegment(opt) => validate_segment(opt).await,
Main::Replica(opt) => replica_info(opt).await,
}
});
if let Err(err) = result {
@@ -86,31 +93,45 @@ async fn dump_log(opt: LogOpt) -> Result<()> {

let mut count: usize = 0;
let time = std::time::Instant::now();
while let Some(batch_pos) = header_stream.try_next().await? {
let pos = batch_pos.get_pos();
let batch = batch_pos.inner();

let base_offset = batch.get_base_offset();

if let Some(min) = opt.min {
if (base_offset as usize) < min {
continue;
let mut last_batch_offset = 0;
loop {
match header_stream.try_next().await {
Ok(Some(batch_pos)) => {
let pos = batch_pos.get_pos();
let batch = batch_pos.inner();

let base_offset = batch.get_base_offset();

if let Some(min) = opt.min {
if (base_offset as usize) < min {
continue;
}
}
if let Some(max) = opt.max {
if (base_offset as usize) > max {
break;
}
}

if opt.print {
println!(
"batch offset: {}, pos: {}, len: {}, ",
base_offset, pos, batch.batch_len,
);
}

count += 1;
last_batch_offset = base_offset;
}
}
if let Some(max) = opt.max {
if (base_offset as usize) > max {
Ok(None) => {
break;
}
Err(err) => {
println!("encountered error: {:#?}", err);
println!("last batch offset: {}", last_batch_offset);
break;
}
}

if opt.print {
println!(
"batch offset: {}, pos: {}, len: {}, ",
base_offset, pos, batch.batch_len,
);
}

count += 1;
}

println!(
@@ -171,12 +192,6 @@ pub(crate) struct SegmentValidateOpt {

#[clap(long, default_value = "0")]
base_offset: Offset,

#[clap(long)]
skip_errors: bool,

#[clap(long)]
verbose: bool,
}

pub(crate) async fn validate_segment(opt: SegmentValidateOpt) -> Result<()> {
@@ -196,13 +211,41 @@ pub(crate) async fn validate_segment(opt: SegmentValidateOpt) -> Result<()> {
);

let start = std::time::Instant::now();
let last_offset = active_segment
.validate(opt.skip_errors, opt.verbose)
.await?;
let last_offset = active_segment.validate_and_repair().await?;

let duration = start.elapsed().as_secs_f32();

println!("completed, last offset = {last_offset}, took: {duration} seconds");

Ok(())
}

#[derive(Debug, Parser)]
pub(crate) struct ReplicaOpt {
/// base data directory
#[clap(value_parser)]
replica_dir: PathBuf,

#[clap(long)]
topic: String,

#[clap(long, default_value = "0")]
partition: u32,
}

pub(crate) async fn replica_info(opt: ReplicaOpt) -> Result<()> {
let replica_dir = opt.replica_dir;

println!("opening replica dir: {:#?}", replica_dir);
let option = ReplicaConfig::builder()
.base_dir(replica_dir.clone())
.build();

let replica = ReplicaKey::new(opt.topic, opt.partition);
let replica = FileReplica::create_or_load(&replica, option).await?;

println!("hw: {:#?}", replica.get_hw());
println!("leo: {:#?}", replica.get_leo());

Ok(())
}