Skip to content

Commit

Permalink
feat: lazy evaluated record batch stream (GreptimeTeam#573)
Browse files Browse the repository at this point in the history
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
waynexia authored Nov 18, 2022
1 parent 22ae983 commit f465040
Showing 10 changed files with 135 additions and 65 deletions.
1 change: 0 additions & 1 deletion src/catalog/src/system.rs
Original file line number Diff line number Diff line change
@@ -134,7 +134,6 @@ impl SystemCatalogTable {
.context(error::SystemCatalogTableScanSnafu)?;
let stream = scan
.execute(0, Arc::new(RuntimeEnv::default()))
.await
.context(error::SystemCatalogTableScanExecSnafu)?;
Ok(stream)
}
1 change: 0 additions & 1 deletion src/catalog/src/tables.rs
Original file line number Diff line number Diff line change
@@ -368,7 +368,6 @@ mod tests {
let tables_stream = tables.scan(&None, &[], None).await.unwrap();
let mut tables_stream = tables_stream
.execute(0, Arc::new(RuntimeEnv::default()))
.await
.unwrap();

if let Some(t) = tables_stream.next().await {
24 changes: 9 additions & 15 deletions src/common/query/src/physical_plan.rs
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@ use std::fmt::Debug;
use std::sync::Arc;

use async_trait::async_trait;
use common_recordbatch::adapter::{DfRecordBatchStreamAdapter, RecordBatchStreamAdapter};
use common_recordbatch::adapter::{AsyncRecordBatchStreamAdapter, DfRecordBatchStreamAdapter};
use common_recordbatch::{DfSendableRecordBatchStream, SendableRecordBatchStream};
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
use datafusion::error::Result as DfResult;
@@ -39,7 +39,6 @@ pub type PhysicalPlanRef = Arc<dyn PhysicalPlan>;
/// creating the actual `async` [`SendableRecordBatchStream`]s
/// of [`RecordBatch`] that incrementally compute the operator's
/// output from its input partition.
#[async_trait]
pub trait PhysicalPlan: Debug + Send + Sync {
/// Returns the physical plan as [`Any`](std::any::Any) so that it can be
/// downcast to a specific implementation.
@@ -61,7 +60,7 @@ pub trait PhysicalPlan: Debug + Send + Sync {
fn with_new_children(&self, children: Vec<PhysicalPlanRef>) -> Result<PhysicalPlanRef>;

/// Creates an RecordBatch stream.
async fn execute(
fn execute(
&self,
partition: usize,
runtime: Arc<RuntimeEnv>,
@@ -84,7 +83,6 @@ impl PhysicalPlanAdapter {
}
}

#[async_trait]
impl PhysicalPlan for PhysicalPlanAdapter {
fn as_any(&self) -> &dyn Any {
self
@@ -118,18 +116,15 @@ impl PhysicalPlan for PhysicalPlanAdapter {
Ok(Arc::new(PhysicalPlanAdapter::new(self.schema(), plan)))
}

async fn execute(
fn execute(
&self,
partition: usize,
runtime: Arc<RuntimeEnv>,
) -> Result<SendableRecordBatchStream> {
let stream = self
.df_plan
.execute(partition, runtime)
.await
.context(error::DataFusionExecutionPlanSnafu)?;
let stream = RecordBatchStreamAdapter::try_new(stream)
.context(error::ConvertDfRecordBatchStreamSnafu)?;
let df_plan = self.df_plan.clone();
let stream = Box::pin(async move { df_plan.execute(partition, runtime).await });
let stream = AsyncRecordBatchStreamAdapter::new(self.schema(), stream);

Ok(Box::pin(stream))
}
}
@@ -187,7 +182,7 @@ impl DfPhysicalPlan for DfPhysicalPlanAdapter {
partition: usize,
runtime: Arc<RuntimeEnv>,
) -> DfResult<DfSendableRecordBatchStream> {
let stream = self.0.execute(partition, runtime).await?;
let stream = self.0.execute(partition, runtime)?;
Ok(Box::pin(DfRecordBatchStreamAdapter::new(stream)))
}

@@ -250,7 +245,6 @@ mod test {
schema: SchemaRef,
}

#[async_trait]
impl PhysicalPlan for MyExecutionPlan {
fn as_any(&self) -> &dyn Any {
self
@@ -272,7 +266,7 @@ mod test {
unimplemented!()
}

async fn execute(
fn execute(
&self,
_partition: usize,
_runtime: Arc<RuntimeEnv>,
79 changes: 79 additions & 0 deletions src/common/recordbatch/src/adapter.rs
Original file line number Diff line number Diff line change
@@ -12,22 +12,33 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
use datafusion::physical_plan::RecordBatchStream as DfRecordBatchStream;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datafusion_common::DataFusionError;
use datatypes::arrow::error::{ArrowError, Result as ArrowResult};
use datatypes::schema::{Schema, SchemaRef};
use futures::ready;
use snafu::ResultExt;

use crate::error::{self, Result};
use crate::{
DfSendableRecordBatchStream, RecordBatch, RecordBatchStream, SendableRecordBatchStream, Stream,
};

type FutureStream = Pin<
Box<
dyn std::future::Future<
Output = std::result::Result<DfSendableRecordBatchStream, DataFusionError>,
> + Send,
>,
>;

/// Greptime SendableRecordBatchStream -> DataFusion RecordBatchStream
pub struct DfRecordBatchStreamAdapter {
stream: SendableRecordBatchStream,
@@ -104,3 +115,71 @@ impl Stream for RecordBatchStreamAdapter {
self.stream.size_hint()
}
}

enum AsyncRecordBatchStreamAdapterState {
Uninit(FutureStream),
Inited(std::result::Result<DfSendableRecordBatchStream, DataFusionError>),
}

pub struct AsyncRecordBatchStreamAdapter {
schema: SchemaRef,
state: AsyncRecordBatchStreamAdapterState,
}

impl AsyncRecordBatchStreamAdapter {
pub fn new(schema: SchemaRef, stream: FutureStream) -> Self {
Self {
schema,
state: AsyncRecordBatchStreamAdapterState::Uninit(stream),
}
}
}

impl RecordBatchStream for AsyncRecordBatchStreamAdapter {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}

impl Stream for AsyncRecordBatchStreamAdapter {
type Item = Result<RecordBatch>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
match &mut self.state {
AsyncRecordBatchStreamAdapterState::Uninit(stream_future) => {
self.state = AsyncRecordBatchStreamAdapterState::Inited(ready!(Pin::new(
stream_future
)
.poll(cx)));
continue;
}
AsyncRecordBatchStreamAdapterState::Inited(stream) => match stream {
Ok(stream) => {
return Poll::Ready(ready!(Pin::new(stream).poll_next(cx)).map(|df| {
Ok(RecordBatch {
schema: self.schema(),
df_recordbatch: df.context(error::PollStreamSnafu)?,
})
}));
}
Err(e) => {
return Poll::Ready(Some(
error::CreateRecordBatchesSnafu {
reason: format!("Read error {:?} from stream", e),
}
.fail()
.map_err(|e| e.into()),
))
}
},
}
}
}

// This is not supported for lazy stream.
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
}
14 changes: 14 additions & 0 deletions src/common/recordbatch/src/lib.rs
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ use std::pin::Pin;
use std::sync::Arc;

use datafusion::arrow_print;
use datafusion::physical_plan::memory::MemoryStream;
pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::VectorRef;
use datatypes::schema::{Schema, SchemaRef};
@@ -132,6 +133,19 @@ impl RecordBatches {
index: 0,
})
}

pub fn into_df_stream(self) -> DfSendableRecordBatchStream {
let df_record_batches = self
.batches
.into_iter()
.map(|batch| batch.df_recordbatch)
.collect();
// unwrap safety: `MemoryStream::try_new` won't fail
Box::pin(
MemoryStream::try_new(df_record_batches, self.schema.arrow_schema().clone(), None)
.unwrap(),
)
}
}

pub struct SimpleRecordBatchStream {
40 changes: 24 additions & 16 deletions src/frontend/src/table.rs
Original file line number Diff line number Diff line change
@@ -20,14 +20,17 @@ use std::sync::Arc;

use async_trait::async_trait;
use client::Database;
use common_error::prelude::BoxedError;
use common_query::error::Result as QueryResult;
use common_query::logical_plan::Expr;
use common_query::physical_plan::{PhysicalPlan, PhysicalPlanRef};
use common_recordbatch::adapter::AsyncRecordBatchStreamAdapter;
use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::logical_plan::Expr as DfExpr;
use datafusion::physical_plan::Partitioning;
use datafusion::physical_plan::{
Partitioning, SendableRecordBatchStream as DfSendableRecordBatchStream,
};
use datafusion_common::DataFusionError;
use datatypes::prelude::Value;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use meta_client::rpc::{Peer, TableName};
@@ -110,14 +113,14 @@ impl Table for DistTable {
let datanode_instance = DatanodeInstance::new(Arc::new(self.clone()) as _, db);

// TODO(LFC): Pass in "regions" when Datanode supports multi regions for a table.
partition_execs.push(PartitionExec {
partition_execs.push(Arc::new(PartitionExec {
table_name: self.table_name.clone(),
datanode_instance,
projection: projection.clone(),
filters: filters.to_vec(),
limit,
batches: Arc::new(RwLock::new(None)),
})
}));
}

let dist_scan = DistTableScan {
@@ -385,10 +388,9 @@ fn reverse_operator(op: &Operator) -> Operator {
#[derive(Debug)]
struct DistTableScan {
schema: SchemaRef,
partition_execs: Vec<PartitionExec>,
partition_execs: Vec<Arc<PartitionExec>>,
}

#[async_trait]
impl PhysicalPlan for DistTableScan {
fn as_any(&self) -> &dyn Any {
self
@@ -410,14 +412,20 @@ impl PhysicalPlan for DistTableScan {
unimplemented!()
}

async fn execute(
fn execute(
&self,
partition: usize,
_runtime: Arc<RuntimeEnv>,
) -> QueryResult<SendableRecordBatchStream> {
let exec = &self.partition_execs[partition];
exec.maybe_init().await.map_err(BoxedError::new)?;
Ok(exec.as_stream().await)
let exec = self.partition_execs[partition].clone();
let stream = Box::pin(async move {
exec.maybe_init()
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
exec.as_stream().await
});
let stream = AsyncRecordBatchStreamAdapter::new(self.schema(), stream);
Ok(Box::pin(stream))
}
}

@@ -453,12 +461,13 @@ impl PartitionExec {
Ok(())
}

async fn as_stream(&self) -> SendableRecordBatchStream {
let batches = self.batches.read().await;
batches
.as_ref()
/// Notice: the record batch will be consumed.
async fn as_stream(&self) -> std::result::Result<DfSendableRecordBatchStream, DataFusionError> {
let mut batches = self.batches.write().await;
Ok(batches
.take()
.expect("should have been initialized in \"maybe_init\"")
.as_stream()
.into_df_stream())
}
}

@@ -759,7 +768,6 @@ mod test {
for partition in 0..table_scan.output_partitioning().partition_count() {
let result = table_scan
.execute(partition, Arc::new(RuntimeEnv::default()))
.await
.unwrap();
let recordbatches = util::collect(result).await.unwrap();

Loading
Oops, something went wrong.

0 comments on commit f465040

Please sign in to comment.