fix consuming with multiple segments (#1700)
resolves #1568

* Remove unneeded spawn handle in Stream Fetch Handler (see the sketch below)
* Fix Storage segment list search
* Fix segment loading
* Properly update end offset when the active segment is rolled over
sehz committed Sep 30, 2021
1 parent ed5a122 commit 87a756b
Showing 17 changed files with 446 additions and 288 deletions.
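
The central change: StreamFetchHandler::start now does its setup (replica lookup, offset publisher creation) inline and only spawns the long-running fetch task, instead of spawning a wrapper task first. A minimal sketch of that control flow, using tokio and hypothetical simplified types rather than Fluvio's actual signatures:

use std::sync::Arc;
use tokio::sync::Notify;

// Hypothetical, simplified stand-in for the SPU stream fetch handler.
struct StreamFetchHandler;

impl StreamFetchHandler {
    // Setup runs inline, so callers see failures (e.g. no leader for the
    // replica) directly; only the continuous fetch loop is detached.
    async fn start(replica_found: bool, end_event: Arc<Notify>) -> Result<(), String> {
        if !replica_found {
            // corresponds to the "no leader found" branch in the real handler
            return Err("no leader found for replica".to_string());
        }
        tokio::spawn(async move {
            if let Err(err) = Self::fetch().await {
                eprintln!("error in stream fetch handler: {err:#?}");
                end_event.notify_waiters();
            }
        });
        Ok(())
    }

    async fn fetch() -> Result<(), String> {
        // the continuous consume loop lives here in the real handler
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let end_event = Arc::new(Notify::new());
    StreamFetchHandler::start(true, end_event).await.expect("start");
}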
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -330,7 +330,7 @@ jobs:
run: cargo test --manifest-path ./crates/${{ matrix.crate }}/Cargo.toml

local_cluster_test:
name: Local cluster test run (${{ matrix.run }})
name: Local cluster test run (${{ matrix.run }})-${{ matrix.test }}
runs-on: ${{ matrix.os }}
needs:
- build_primary_binaries
2 changes: 1 addition & 1 deletion crates/fluvio-dataplane-protocol/src/fixture.rs
@@ -37,7 +37,7 @@ impl BatchProducer {
BatchProducerBuilder::default()
}

fn generate_batch(&self) -> Batch {
pub fn generate_batch(&self) -> Batch {
let mut batches = Batch::default();
let header = batches.get_mut_header();
header.magic = 2;
2 changes: 1 addition & 1 deletion crates/fluvio-spu/src/replication/leader/replica_state.rs
@@ -701,7 +701,7 @@ mod test_leader {

#[async_trait]
impl ReplicaStorage for MockStorage {
async fn create(
async fn create_or_load(
_replica: &dataplane::ReplicaKey,
_config: Self::Config,
) -> Result<Self, fluvio_storage::StorageError> {
2 changes: 1 addition & 1 deletion crates/fluvio-spu/src/services/public/mod.rs
@@ -102,7 +102,7 @@ impl FluvioService for PublicService {
"FetchOffsetsRequest"
),
SpuServerRequest::FileStreamFetchRequest(request) => {
StreamFetchHandler::spawn(
StreamFetchHandler::start(
request,
context.clone(),
shared_sink.clone(),
258 changes: 127 additions & 131 deletions crates/fluvio-spu/src/services/public/stream_fetch.rs
@@ -6,6 +6,7 @@ use std::io::Error as IoError;
use tracing::{error, debug, trace, instrument};
use tokio::select;

use dataplane::record::FileRecordSet;
use fluvio_types::event::{StickyEvent, offsets::OffsetPublisher};
use fluvio_future::task::spawn;
use fluvio_socket::{ExclusiveFlvSink, SocketError};
@@ -19,7 +20,8 @@ use dataplane::{
use dataplane::{Offset, Isolation, ReplicaKey};
use dataplane::fetch::FilePartitionResponse;
use fluvio_spu_schema::server::stream_fetch::{
FileStreamFetchRequest, DefaultStreamFetchRequest, StreamFetchResponse, SmartStreamKind,
DefaultStreamFetchRequest, FileStreamFetchRequest, SmartStreamKind, StreamFetchRequest,
StreamFetchResponse,
};
use fluvio_types::event::offsets::OffsetChangeListener;

@@ -38,7 +40,6 @@ pub struct StreamFetchHandler {
isolation: Isolation,
max_bytes: u32,
max_fetch_bytes: u32,

header: RequestHeader,
sink: ExclusiveFlvSink,
end_event: Arc<StickyEvent>,
@@ -49,132 +50,38 @@

impl StreamFetchHandler {
/// handle fluvio continuous fetch request
pub async fn spawn(
request: RequestMessage<FileStreamFetchRequest>,
ctx: DefaultSharedGlobalContext,
sink: ExclusiveFlvSink,
end_event: Arc<StickyEvent>,
) -> Result<(), SocketError> {
spawn(async move {
if let Err(err) = StreamFetchHandler::start(request, ctx, sink, end_event.clone()).await
{
error!("error starting stream fetch handler: {:#?}", err);
end_event.notify();
}
});
debug!("spawned stream fetch controller");
Ok(())
}

#[instrument(skip(request, ctx, sink, end_event))]
pub async fn start(
request: RequestMessage<FileStreamFetchRequest>,
ctx: DefaultSharedGlobalContext,
sink: ExclusiveFlvSink,
end_event: Arc<StickyEvent>,
) -> Result<(), SocketError> {
// first get receiver to offset update channel so we don't miss events
let (header, msg) = request.get_header_request();

let current_offset = msg.fetch_offset;
let isolation = msg.isolation;
let replica = ReplicaKey::new(msg.topic, msg.partition);
let max_bytes = msg.max_bytes as u32;
let replica = ReplicaKey::new(msg.topic.clone(), msg.partition);

if let Some(leader_state) = ctx.leaders_state().get(&replica) {
let (stream_id, offset_publisher) =
ctx.stream_publishers().create_new_publisher().await;
let offset_listener = offset_publisher.change_listner();

debug!(
sink = sink.id(),
%replica,
current_offset,
max_bytes,
"start stream fetch"
);

let sm_engine = SmartStreamEngine::default();

let smartstream = if let Some(payload) = msg.wasm_payload {
let wasm = &payload.wasm.get_raw()?;
debug!(len = wasm.len(), "creating WASM module with bytes");
let module = sm_engine.create_module_from_binary(wasm).map_err(|err| {
SocketError::Io(IoError::new(
ErrorKind::Other,
format!("module loading error {}", err),
))
})?;

let smartstream: Box<dyn SmartStream> = match payload.kind {
SmartStreamKind::Filter => {
debug!("Instantiating SmartStreamFilter");
let filter = module.create_filter(&sm_engine).map_err(|err| {
SocketError::Io(IoError::new(
ErrorKind::Other,
format!("Failed to instantiate SmartStreamFilter {}", err),
))
})?;
Box::new(filter)
}
SmartStreamKind::Map => {
debug!("Instantiating SmartStreamMap");
let map = module.create_map(&sm_engine).map_err(|err| {
SocketError::Io(IoError::new(
ErrorKind::Other,
format!("Failed to instantiate SmartStreamMap {}", err),
))
})?;
Box::new(map)
}
SmartStreamKind::Aggregate { accumulator } => {
debug!(
accumulator_len = accumulator.len(),
"Instantiating SmartStreamAggregate"
);
let aggregator =
module
.create_aggregate(&sm_engine, accumulator)
.map_err(|err| {
SocketError::Io(IoError::new(
ErrorKind::Other,
format!(
"Failed to instantiate SmartStreamAggregate {}",
err
),
))
})?;
Box::new(aggregator)
}
};

Some(smartstream)
} else {
None
};

// if we are filtering, we should scan all batches instead of just limiting to max bytes
let max_fetch_bytes = if smartstream.is_none() {
max_bytes
} else {
u32::MAX
};

let handler = Self {
ctx: ctx.clone(),
isolation,
replica,
header,
max_bytes,
sink,
end_event,
consumer_offset_listener: offset_listener,
stream_id,
leader_state: leader_state.clone(),
max_fetch_bytes,
};

handler.process(current_offset, smartstream).await;
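// create the offset listener before spawning the fetch task so no offset update events are missed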
let consumer_offset_listener = offset_publisher.change_listner();

spawn(async move {
if let Err(err) = StreamFetchHandler::fetch(
ctx,
sink,
end_event.clone(),
leader_state,
stream_id,
header,
replica,
consumer_offset_listener,
msg,
)
.await
{
error!("error starting stream fetch handler: {:#?}", err);
end_event.notify();
}
});
} else {
debug!(topic = %replica.topic, "no leader found, returning");
let response = StreamFetchResponse {
@@ -201,24 +108,113 @@
Ok(())
}

#[allow(clippy::too_many_arguments)]
#[instrument(
skip(self, smartstream),
name = "stream fetch",
skip(ctx,replica,end_event,leader_state,header,msg,consumer_offset_listener),
fields(
replica = %self.replica,
sink = self.sink.id()
)
)]
async fn process(mut self, starting_offset: Offset, smartstream: Option<Box<dyn SmartStream>>) {
if let Err(err) = self.inner_process(starting_offset, smartstream).await {
error!("error: {:#?}", err);
self.end_event.notify();
}
replica = %replica,
sink = sink.id()
))
]
pub async fn fetch(
ctx: DefaultSharedGlobalContext,
sink: ExclusiveFlvSink,
end_event: Arc<StickyEvent>,
leader_state: SharedFileLeaderState,
stream_id: u32,
header: RequestHeader,
replica: ReplicaKey,
consumer_offset_listener: OffsetChangeListener,
msg: StreamFetchRequest<FileRecordSet>,
) -> Result<(), SocketError> {
let max_bytes = msg.max_bytes as u32;
let sm_engine = SmartStreamEngine::default();

let (smartstream, max_fetch_bytes) = if let Some(payload) = msg.wasm_payload {
let wasm = &payload.wasm.get_raw()?;
debug!(len = wasm.len(), "creating WASM module with bytes");
let module = sm_engine.create_module_from_binary(wasm).map_err(|err| {
SocketError::Io(IoError::new(
ErrorKind::Other,
format!("module loading error {}", err),
))
})?;

let smartstream: Box<dyn SmartStream> = match payload.kind {
SmartStreamKind::Filter => {
debug!("Instantiating SmartStreamFilter");
let filter = module.create_filter(&sm_engine).map_err(|err| {
SocketError::Io(IoError::new(
ErrorKind::Other,
format!("Failed to instantiate SmartStreamFilter {}", err),
))
})?;
Box::new(filter)
}
SmartStreamKind::Map => {
debug!("Instantiating SmartStreamMap");
let map = module.create_map(&sm_engine).map_err(|err| {
SocketError::Io(IoError::new(
ErrorKind::Other,
format!("Failed to instantiate SmartStreamMap {}", err),
))
})?;
Box::new(map)
}
SmartStreamKind::Aggregate { accumulator } => {
debug!(
accumulator_len = accumulator.len(),
"Instantiating SmartStreamAggregate"
);
let aggregator =
module
.create_aggregate(&sm_engine, accumulator)
.map_err(|err| {
SocketError::Io(IoError::new(
ErrorKind::Other,
format!("Failed to instantiate SmartStreamAggregate {}", err),
))
})?;
Box::new(aggregator)
}
};

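// a SmartStream may drop records, so scan all batches rather than capping reads at max_bytes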
(Some(smartstream), u32::MAX)
} else {
(None, max_bytes)
};

let starting_offset = msg.fetch_offset;
let isolation = msg.isolation;

debug!(
max_bytes,
max_fetch_bytes,
isolation = ?isolation,
stream_id,
sink = %sink.id(),
starting_offset,
"stream fetch");

let handler = Self {
ctx: ctx.clone(),
isolation,
replica,
max_bytes,
sink,
end_event,
header,
consumer_offset_listener,
stream_id,
leader_state,
max_fetch_bytes,
};

handler.process(starting_offset, smartstream).await
}

#[instrument(skip(self, smartstream))]
async fn inner_process(
&mut self,
async fn process(
mut self,
starting_offset: Offset,
mut smartstream: Option<Box<dyn SmartStream>>,
) -> Result<(), SocketError> {
2 changes: 1 addition & 1 deletion crates/fluvio-spu/src/storage/mod.rs
@@ -42,7 +42,7 @@
{
/// create new storage replica or restore from durable storage based on configuration
pub async fn create(id: ReplicaKey, config: S::Config) -> Result<Self, StorageError> {
let storage = S::create(&id, config).await?;
let storage = S::create_or_load(&id, config).await?;

let leo = Arc::new(OffsetPublisher::new(storage.get_leo()));
let hw = Arc::new(OffsetPublisher::new(storage.get_hw()));
9 changes: 5 additions & 4 deletions crates/fluvio-storage/src/index.rs
@@ -93,14 +93,15 @@ impl LogIndex {
) -> Result<Self, IoError> {
let index_file_path = generate_file_name(&option.base_dir, base_offset, EXTENSION);

debug!("opening index mm at: {:#?}", index_file_path);
debug!(?index_file_path, "opening index");

// make sure it is a log file
let (m_file, file) =
MemoryMappedFile::open(index_file_path, INDEX_ENTRY_SIZE as u64).await?;

let len = (file.metadata().await?).len();

trace!("opening memory mapped file with len : {}", len);
debug!(len, "memory mapped len");

if len > std::u32::MAX as u64 {
return Err(IoError::new(
@@ -243,8 +244,8 @@ mod tests {

let mut mut_index = MutLogIndex::create(921, &option).await.expect("create");

mut_index.send((5, 16, 70)).await.expect("send");
mut_index.send((10, 100, 70)).await.expect("send");
mut_index.write_index((5, 16, 70)).await.expect("send");
mut_index.write_index((10, 100, 70)).await.expect("send");

mut_index.shrink().await.expect("shrink");

5 changes: 4 additions & 1 deletion crates/fluvio-storage/src/lib.rs
@@ -141,7 +141,10 @@ mod inner {

/// create a new storage area;
/// if replica state already exists, this should restore that state
async fn create(replica: &ReplicaKey, config: Self::Config) -> Result<Self, StorageError>;
async fn create_or_load(
replica: &ReplicaKey,
config: Self::Config,
) -> Result<Self, StorageError>;

/// high water mark offset (records that has been replicated)
fn get_hw(&self) -> Offset;