Skip to content

Commit

Permalink
feat(producer): added blocking if the batch queue is full (infinyon#2562
Browse files Browse the repository at this point in the history
)

Added the batch queue size control using `Condvar`.
If the configured queue size is reached, we block pushing new records until there is enough space for a new batch. During the flush, we notify all waiting threads that they can progress. 

Fixed infinyon#2512 

Co-authored-by: Luis Moreno <morenol@users.noreply.github.com>
  • Loading branch information
Alexander Galibey and morenol committed Aug 19, 2022
1 parent 2c634af commit a3cf20e
Show file tree
Hide file tree
Showing 10 changed files with 105 additions and 57 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Release Notes

## Platform Version 0.9.34 - UNRELEASED
* Added througput control to fluvio producer ([#2512](https://github.com/infinyon/fluvio/issues/2512))

## Platform Version 0.9.33 - 2022-08-10
* Added `DeliverySemantic` to `fluvio-cli`. ([#2508](https://github.com/infinyon/fluvio/pull/2508))
Expand Down
52 changes: 26 additions & 26 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/fluvio-test/src/tests/longevity/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub async fn consumer_stream(test_driver: TestDriver, option: MyTestCase, consum
// Vec of consumer streams
let mut streams = Vec::new();
// Create channel here
let (s, r) = async_channel::unbounded();
let (s, r) = async_channel::bounded(1000);

// loop over number of topics
for t in 0..option.environment.topic {
Expand Down
13 changes: 2 additions & 11 deletions crates/fluvio-test/src/tests/longevity/producer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use fluvio::{RecordKey, TopicProducerConfig, TopicProducerConfigBuilder};
use fluvio_test_util::test_runner::test_driver::TestDriver;
use fluvio_test_util::test_meta::environment::EnvDetail;
use std::time::{Duration, Instant, SystemTime};
use std::time::{Duration, SystemTime};
use tracing::debug;

use super::MyTestCase;
Expand All @@ -24,6 +24,7 @@ pub async fn producer(test_driver: TestDriver, option: MyTestCase, producer_id:
let config: TopicProducerConfig = TopicProducerConfigBuilder::default()
.linger(linger)
.batch_size(option.environment.producer_batch_size.unwrap_or(16_384))
.batch_queue_size(25)
.build()
.unwrap();

Expand All @@ -40,9 +41,6 @@ pub async fn producer(test_driver: TestDriver, option: MyTestCase, producer_id:
let mut records_sent = 0;
let test_start = SystemTime::now();

let flush_period = linger * 10; // sync with producer flush pace periodically to do back pressure
let mut last_flush_time = Instant::now();

debug!("About to start producer loop");
while test_start.elapsed().unwrap() <= option.environment.timeout {
let record = TestRecordBuilder::new()
Expand Down Expand Up @@ -75,13 +73,6 @@ pub async fn producer(test_driver: TestDriver, option: MyTestCase, producer_id:
.expect("Producer Send failed");
}

if last_flush_time.elapsed() > flush_period {
for p in &producers {
let _ = p.flush().await;
}
last_flush_time = Instant::now();
}

records_sent += 1;
}

Expand Down
5 changes: 3 additions & 2 deletions crates/fluvio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,12 @@ strum = { version = "0.24", features = ["derive"], optional = true }
num-traits = { version = "0.2", optional = true }

# Fluvio dependencies
fluvio-future = { version = "0.4.1", features = [
fluvio-future = { version = "0.4.2", features = [
"task",
"openssl_tls",
"task_unstable",
"retry"
"retry",
"sync"
] }
fluvio-types = { version = "0.3.7", features = [
"events",
Expand Down
Loading

0 comments on commit a3cf20e

Please sign in to comment.