Skip to content

Commit

Permalink
Feature/2481 make produce request cloneable (infinyon#2491)
Browse files Browse the repository at this point in the history
This is needed for being able to retry produce requests if an error occurs.

Related infinyon#2481
  • Loading branch information
Alexander Galibey committed Jul 19, 2022
1 parent ebe193f commit 8f6d489
Show file tree
Hide file tree
Showing 16 changed files with 159 additions and 47 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* Keep serving incoming requests even if socket closed to write. ([#2484](https://github.com/infinyon/fluvio/pull/2484))
* Support async response in multiplexed socket. ([#2488](https://github.com/infinyon/fluvio/pull/2488))
* Drop write lock before async IO operations. ([#2490](https://github.com/infinyon/fluvio/pull/2490))
* Add `Clone` trait to `DefaultProduceRequest`. ([#2491](https://github.com/infinyon/fluvio/pull/2491))

## Platform Version 0.9.31 - 2022-07-13
* Move stream publishers to connection-level context ([#2452](https://github.com/infinyon/fluvio/pull/2452))
Expand Down
33 changes: 17 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/fluvio-compression/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fluvio-compression"
version = "0.1.1"
version = "0.2.0"
edition = "2021"
license = "Apache-2.0"
authors = ["Fluvio Contributors <team@fluvio.io>"]
Expand All @@ -18,3 +18,4 @@ flate2 = { version = "1.0.20"}
snap = { version = "1" }
lz4_flex = { version = "0.9", default-features = false, features = ["safe-decode", "safe-encode", "frame"] }
thiserror = "1.0.30"
bytes = { version = "1.1.0"}
4 changes: 3 additions & 1 deletion crates/fluvio-compression/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use bytes::buf::Writer;
use bytes::BytesMut;
use snap::write::{IntoInnerError, FrameEncoder};

#[derive(thiserror::Error, Debug)]
Expand All @@ -7,7 +9,7 @@ pub enum CompressionError {
#[error("unknown compression format: {0}")]
UnknownCompressionFormat(String),
#[error("error flushing Snap encoder: {0}")]
SnapError(#[from] Box<IntoInnerError<FrameEncoder<Vec<u8>>>>),
SnapError(#[from] Box<IntoInnerError<FrameEncoder<Writer<BytesMut>>>>),
#[error("error flushing Snap encoder: {0}")]
Lz4Error(#[from] lz4_flex::frame::Error),
#[error("Unreachable error")]
Expand Down
10 changes: 6 additions & 4 deletions crates/fluvio-compression/src/gzip.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use std::io::{Read, Write};
use bytes::{BufMut, Bytes, BytesMut};

use flate2::Compression;
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;

use crate::error::CompressionError;

pub fn compress(src: &[u8]) -> Result<Vec<u8>, CompressionError> {
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
pub fn compress(src: &[u8]) -> Result<Bytes, CompressionError> {
let mut encoder = GzEncoder::new(BytesMut::new().writer(), Compression::default());
encoder.write_all(src)?;
Ok(encoder.finish()?)
Ok(encoder.finish()?.into_inner().freeze())
}

pub fn uncompress<T: Read>(src: T) -> Result<Vec<u8>, CompressionError> {
Expand All @@ -21,6 +22,7 @@ pub fn uncompress<T: Read>(src: T) -> Result<Vec<u8>, CompressionError> {

#[cfg(test)]
mod tests {
use bytes::Buf;
use super::*;

#[test]
Expand All @@ -30,7 +32,7 @@ mod tests {

assert!(compressed.len() < text.as_bytes().len());

let uncompressed = String::from_utf8(uncompress(compressed.as_slice()).unwrap()).unwrap();
let uncompressed = String::from_utf8(uncompress(compressed.reader()).unwrap()).unwrap();

assert_eq!(uncompressed, text);
}
Expand Down
5 changes: 3 additions & 2 deletions crates/fluvio-compression/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::str::FromStr;
use bytes::Bytes;

mod error;

Expand Down Expand Up @@ -58,9 +59,9 @@ impl FromStr for Compression {

impl Compression {
/// Compress the given data, returning the compressed data
pub fn compress(&self, src: &[u8]) -> Result<Vec<u8>, CompressionError> {
pub fn compress(&self, src: &[u8]) -> Result<Bytes, CompressionError> {
match *self {
Compression::None => Ok(src.to_vec()),
Compression::None => Ok(Bytes::copy_from_slice(src)),
Compression::Gzip => gzip::compress(src),
Compression::Snappy => snappy::compress(src),
Compression::Lz4 => lz4::compress(src),
Expand Down
12 changes: 7 additions & 5 deletions crates/fluvio-compression/src/lz4.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::io::{Read, Write};
use bytes::{BufMut, Bytes, BytesMut};

use crate::error::CompressionError;
use lz4_flex::frame::{FrameDecoder, FrameEncoder};

pub fn compress(src: &[u8]) -> Result<Vec<u8>, CompressionError> {
let buf = Vec::with_capacity(src.len());
let mut encoder = FrameEncoder::new(buf);
pub fn compress(src: &[u8]) -> Result<Bytes, CompressionError> {
let buf = BytesMut::with_capacity(src.len());
let mut encoder = FrameEncoder::new(buf.writer());
encoder.write_all(src)?;
Ok(encoder.finish()?)
Ok(encoder.finish()?.into_inner().freeze())
}

pub fn uncompress<T: Read>(src: T) -> Result<Vec<u8>, CompressionError> {
Expand All @@ -19,6 +20,7 @@ pub fn uncompress<T: Read>(src: T) -> Result<Vec<u8>, CompressionError> {

#[cfg(test)]
mod tests {
use bytes::Buf;
use super::*;

#[test]
Expand All @@ -28,7 +30,7 @@ mod tests {

assert!(compressed.len() < text.as_bytes().len());

let uncompressed = String::from_utf8(uncompress(compressed.as_slice()).unwrap()).unwrap();
let uncompressed = String::from_utf8(uncompress(compressed.reader()).unwrap()).unwrap();

assert_eq!(uncompressed, text);
}
Expand Down
15 changes: 10 additions & 5 deletions crates/fluvio-compression/src/snappy.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use std::io::{Read, Write};
use bytes::{BufMut, Bytes, BytesMut};

use crate::error::CompressionError;
use snap::{read::FrameDecoder, write::FrameEncoder};

pub fn compress(src: &[u8]) -> Result<Vec<u8>, CompressionError> {
let buf = Vec::with_capacity(src.len());
let mut encoder = FrameEncoder::new(buf);
pub fn compress(src: &[u8]) -> Result<Bytes, CompressionError> {
let buf = BytesMut::with_capacity(src.len());
let mut encoder = FrameEncoder::new(buf.writer());
encoder.write_all(src)?;
Ok(encoder.into_inner().map_err(Box::new)?)
Ok(encoder
.into_inner()
.map(|w| w.into_inner().freeze())
.map_err(Box::new)?)
}

pub fn uncompress<T: Read>(src: T) -> Result<Vec<u8>, CompressionError> {
Expand All @@ -19,6 +23,7 @@ pub fn uncompress<T: Read>(src: T) -> Result<Vec<u8>, CompressionError> {

#[cfg(test)]
mod tests {
use bytes::Buf;
use super::*;

#[test]
Expand All @@ -28,7 +33,7 @@ mod tests {

assert!(compressed.len() < text.as_bytes().len());

let uncompressed = String::from_utf8(uncompress(compressed.as_slice()).unwrap()).unwrap();
let uncompressed = String::from_utf8(uncompress(compressed.reader()).unwrap()).unwrap();

assert_eq!(uncompressed, text);
}
Expand Down
4 changes: 2 additions & 2 deletions crates/fluvio-dataplane-protocol/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fluvio-dataplane-protocol"
version = "0.11.4"
version = "0.11.5"
edition = "2021"
authors = ["Fluvio Contributors <team@fluvio.io>"]
description = "data plane protocol"
Expand Down Expand Up @@ -28,7 +28,7 @@ eyre = { version = "0.6", default-features = false }
thiserror = "1"

# Fluvio dependencies
fluvio-compression = { version = "0.1.0", path = "../fluvio-compression" }
fluvio-compression = { version = "0.2", path = "../fluvio-compression" }
fluvio-future = { version = "0.4.0" }
fluvio-protocol = { path = "../fluvio-protocol", version = "0.7", features = [
"derive",
Expand Down
22 changes: 16 additions & 6 deletions crates/fluvio-dataplane-protocol/src/batch.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::io::Error;
use std::mem::size_of;
use std::fmt::Debug;
use bytes::Bytes;
use fluvio_compression::CompressionError;
use fluvio_types::PartitionId;
use tracing::trace;
Expand Down Expand Up @@ -37,8 +38,8 @@ pub type MemoryRecords = Vec<Record>;
/// A type describing Raw records
/// This structs decodes and encode its bytes as it is. Just the raw bytes of its internal vector.
/// When decoding, please be sure that your src buffer have the exact number of bytes.
#[derive(Debug, Default)]
pub struct RawRecords(pub Vec<u8>);
#[derive(Debug, Default, Clone)]
pub struct RawRecords(pub Bytes);

impl Encoder for RawRecords {
fn write_size(&self, _version: Version) -> usize {
Expand All @@ -54,9 +55,7 @@ impl Encoder for RawRecords {
impl Decoder for RawRecords {
fn decode<T: Buf>(&mut self, buf: &mut T, _version: Version) -> Result<(), Error> {
let len = buf.remaining();

self.0.resize(len, 0);
buf.copy_to_slice(&mut self.0);
self.0 = buf.copy_to_bytes(len);
Ok(())
}
}
Expand Down Expand Up @@ -364,7 +363,18 @@ where
}
}

#[derive(Debug, Decoder, Encoder)]
impl<R: Clone> Clone for Batch<R> {
fn clone(&self) -> Self {
Self {
base_offset: self.base_offset,
batch_len: self.batch_len,
header: self.header.clone(),
records: self.records.clone(),
}
}
}

#[derive(Debug, Decoder, Encoder, Clone)]
pub struct BatchHeader {
pub partition_leader_epoch: i32,
pub magic: i8,
Expand Down
Loading

0 comments on commit 8f6d489

Please sign in to comment.