From d9cce170da2750fed1a4e71805d99e094eda831e Mon Sep 17 00:00:00 2001
From: Raminder Singh
Date: Fri, 29 Nov 2024 22:05:36 +0530
Subject: [PATCH] always use text format for CdcStream

---
 pg_replicate/src/clients/postgres.rs          |  39 +--
 pg_replicate/src/conversions/bytes.rs         | 278 ++++--------
 pg_replicate/src/conversions/cdc_event.rs     |  47 +--
 pg_replicate/src/conversions/table_row.rs     |   7 +-
 pg_replicate/src/pipeline/sources/postgres.rs |  19 +-
 5 files changed, 95 insertions(+), 295 deletions(-)

diff --git a/pg_replicate/src/clients/postgres.rs b/pg_replicate/src/clients/postgres.rs
index 54a8a31..ccf706b 100644
--- a/pg_replicate/src/clients/postgres.rs
+++ b/pg_replicate/src/clients/postgres.rs
@@ -442,47 +442,10 @@ impl ReplicationClient {
         publication: &str,
         slot_name: &str,
         start_lsn: PgLsn,
-    ) -> Result<(LogicalReplicationStream, bool), ReplicationClientError> {
-        match self
-            .get_stream(publication, slot_name, start_lsn, true)
-            .await
-        {
-            Ok(stream) => {
-                info!("binary format supported by logical replication");
-                Ok((stream, true))
-            }
-            Err(rce) => {
-                if let ReplicationClientError::TokioPostgresError(e) = &rce {
-                    if let Some(dbe) = e.as_db_error() {
-                        //TODO: use a more robust method of recognizing whether the server supports binary option or not
-                        if dbe.message() == "unrecognized pgoutput option: binary" {
-                            info!(
-                                "binary format not supported by logical replication, trying text"
-                            );
-                            return self
-                                .get_stream(publication, slot_name, start_lsn, false)
-                                .await
-                                .map(|s| (s, false));
-                        }
-                    }
-                }
-                Err(rce)
-            }
-        }
-    }
-
-    pub async fn get_stream(
-        &self,
-        publication: &str,
-        slot_name: &str,
-        start_lsn: PgLsn,
-        binary_format: bool,
     ) -> Result<LogicalReplicationStream, ReplicationClientError> {
-        let binary_option = if binary_format { r#", "binary""# } else { "" };
         let options = format!(
-            r#"("proto_version" '1', "publication_names" {}{})"#,
+            r#"("proto_version" '1', "publication_names" {})"#,
             quote_literal(publication),
-            binary_option
         );
 
         let query = format!(
diff --git a/pg_replicate/src/conversions/bytes.rs b/pg_replicate/src/conversions/bytes.rs
index 654abcc..28ee9dd 100644
--- a/pg_replicate/src/conversions/bytes.rs
+++ b/pg_replicate/src/conversions/bytes.rs
@@ -6,14 +6,14 @@ use std::{
 };
 
 use bigdecimal::ParseBigDecimalError;
-use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime, Utc};
+use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime};
 use thiserror::Error;
-use tokio_postgres::types::{FromSql, Type};
+use tokio_postgres::types::Type;
 use uuid::Uuid;
 
 use crate::conversions::{bool::parse_bool, hex};
 
-use super::{bool::ParseBoolError, hex::ByteaHexParseError, numeric::PgNumeric, ArrayCell, Cell};
+use super::{bool::ParseBoolError, hex::ByteaHexParseError, ArrayCell, Cell};
 
 #[derive(Debug, Error)]
 pub enum FromBytesError {
@@ -54,162 +54,6 @@ pub enum FromBytesError {
     RowGetError(#[from] Box<dyn std::error::Error + Sync + Send>),
 }
 
-pub trait FromBytes {
-    fn try_from_tuple_data(&self, typ: &Type, bytes: &[u8]) -> Result<Cell, FromBytesError>;
-}
-
-pub struct BinaryFormatConverter;
-
-impl FromBytes for BinaryFormatConverter {
-    fn try_from_tuple_data(&self, typ: &Type, bytes: &[u8]) -> Result<Cell, FromBytesError> {
-        match *typ {
-            Type::BOOL => {
-                let val = bool::from_sql(typ, bytes)?;
-                Ok(Cell::Bool(val))
-            }
-            Type::BOOL_ARRAY => {
-                let val = Vec::<Option<bool>>::from_sql(typ, bytes)?;
-                Ok(Cell::Array(ArrayCell::Bool(val)))
-            }
-            Type::CHAR | Type::BPCHAR | Type::VARCHAR | Type::NAME | Type::TEXT => {
-                let val = String::from_sql(typ, bytes)?;
-                Ok(Cell::String(val.to_string()))
-            }
-            Type::CHAR_ARRAY
-            | Type::BPCHAR_ARRAY
-            | Type::VARCHAR_ARRAY
-            | Type::NAME_ARRAY
-            | Type::TEXT_ARRAY => {
-                let val = Vec::<Option<String>>::from_sql(typ, bytes)?;
-                Ok(Cell::Array(ArrayCell::String(val)))
-            }
-            Type::INT2 => {
-                let val = i16::from_sql(typ, bytes)?;
-                Ok(Cell::I16(val))
-            }
-            Type::INT2_ARRAY => {
-                let val = Vec::<Option<i16>>::from_sql(typ, bytes)?;
-                Ok(Cell::Array(ArrayCell::I16(val)))
-            }
-            Type::INT4 => {
-                let val = i32::from_sql(typ, bytes)?;
-                Ok(Cell::I32(val))
-            }
-            Type::INT4_ARRAY => {
-                let val = Vec::<Option<i32>>::from_sql(typ, bytes)?;
-                Ok(Cell::Array(ArrayCell::I32(val)))
-            }
-            Type::INT8 => {
-                let val = i64::from_sql(typ, bytes)?;
-                Ok(Cell::I64(val))
-            }
-            Type::INT8_ARRAY => {
-                let val = Vec::<Option<i64>>::from_sql(typ, bytes)?;
-                Ok(Cell::Array(ArrayCell::I64(val)))
-            }
-            Type::FLOAT4 => {
-                let val = f32::from_sql(typ, bytes)?;
-                Ok(Cell::F32(val))
-            }
-            Type::FLOAT4_ARRAY => {
-                let val = Vec::<Option<f32>>::from_sql(typ, bytes)?;
-                Ok(Cell::Array(ArrayCell::F32(val)))
-            }
-            Type::FLOAT8 => {
-                let val = f64::from_sql(typ, bytes)?;
-                Ok(Cell::F64(val))
-            }
-            Type::FLOAT8_ARRAY => {
-                let val = Vec::<Option<f64>>::from_sql(typ, bytes)?;
-                Ok(Cell::Array(ArrayCell::F64(val)))
-            }
-            Type::NUMERIC => {
-                let val = PgNumeric::from_sql(typ, bytes)?;
-                Ok(Cell::Numeric(val))
-            }
-            Type::NUMERIC_ARRAY => {
-                let val = Vec::<Option<PgNumeric>>::from_sql(typ, bytes)?;
-                Ok(Cell::Array(ArrayCell::Numeric(val)))
-            }
-            Type::BYTEA => {
-                let val = Vec::<u8>::from_sql(typ, bytes)?;
-                Ok(Cell::Bytes(val))
-            }
-            Type::BYTEA_ARRAY => {
-                let val = Vec::<Option<Vec<u8>>>::from_sql(typ, bytes)?;
-                Ok(Cell::Array(ArrayCell::Bytes(val)))
-            }
-            Type::DATE => {
-                let val = NaiveDate::from_sql(typ, bytes)?;
-                Ok(Cell::Date(val))
-            }
-            Type::DATE_ARRAY => {
-                let val = Vec::<Option<NaiveDate>>::from_sql(typ, bytes)?;
-                Ok(Cell::Array(ArrayCell::Date(val)))
-            }
-            Type::TIME => {
-                let val = NaiveTime::from_sql(typ, bytes)?;
-                Ok(Cell::Time(val))
-            }
-            Type::TIME_ARRAY => {
-                let val = Vec::<Option<NaiveTime>>::from_sql(typ, bytes)?;
-                Ok(Cell::Array(ArrayCell::Time(val)))
-            }
-            Type::TIMESTAMP => {
-                let val = NaiveDateTime::from_sql(typ, bytes)?;
-                Ok(Cell::TimeStamp(val))
-            }
-            Type::TIMESTAMP_ARRAY => {
-                let val = Vec::<Option<NaiveDateTime>>::from_sql(typ, bytes)?;
-                Ok(Cell::Array(ArrayCell::TimeStamp(val)))
-            }
-            Type::TIMESTAMPTZ => {
-                let val = DateTime::<Utc>::from_sql(typ, bytes)?;
-                Ok(Cell::TimeStampTz(val.into()))
-            }
-            Type::TIMESTAMPTZ_ARRAY => {
-                let mut val = Vec::<Option<DateTime<Utc>>>::from_sql(typ, bytes)?;
-                let val: Vec<Option<DateTime<FixedOffset>>> =
-                    val.drain(..).map(|v| v.map(|v| v.into())).collect();
-                Ok(Cell::Array(ArrayCell::TimeStampTz(val)))
-            }
-            Type::UUID => {
-                let val = Uuid::from_sql(typ, bytes)?;
-                Ok(Cell::Uuid(val))
-            }
-            Type::UUID_ARRAY => {
-                let val = Vec::<Option<Uuid>>::from_sql(typ, bytes)?;
-                Ok(Cell::Array(ArrayCell::Uuid(val)))
-            }
-            Type::JSON | Type::JSONB => {
-                let val = serde_json::Value::from_sql(typ, bytes)?;
-                Ok(Cell::Json(val))
-            }
-            Type::JSON_ARRAY | Type::JSONB_ARRAY => {
-                let val = Vec::<Option<serde_json::Value>>::from_sql(typ, bytes)?;
-                Ok(Cell::Array(ArrayCell::Json(val)))
-            }
-            Type::OID => {
-                let val = u32::from_sql(typ, bytes)?;
-                Ok(Cell::U32(val))
-            }
-            Type::OID_ARRAY => {
-                let val = Vec::<Option<u32>>::from_sql(typ, bytes)?;
-                Ok(Cell::Array(ArrayCell::U32(val)))
-            }
-            #[cfg(feature = "unknown_types_to_bytes")]
-            _ => {
-                let val = String::from_sql(typ, bytes)?;
-                Ok(Cell::String(val.to_string()))
-            }
-            #[cfg(not(feature = "unknown_types_to_bytes"))]
-            _ => Err(CdcEventConversionError::UnsupportedType(
-                typ.name().to_string(),
-            )),
-        }
-    }
-}
-
 pub struct TextFormatConverter;
 
 #[derive(Debug, Error)]
 pub enum ArrayParseError {
@@ -222,65 +66,7 @@ pub enum ArrayParseError {
 }
 
 impl TextFormatConverter {
-    fn parse_array<P, M, T>(str: &str, mut parse: P, m: M) -> Result<Cell, FromBytesError>
-    where
-        P: FnMut(&str) -> Result<Option<T>, FromBytesError>,
-        M: FnOnce(Vec<Option<T>>) -> ArrayCell,
-    {
-        if str.len() < 2 {
-            return Err(ArrayParseError::InputTooShort.into());
-        }
-
-        if !str.starts_with('{') || !str.ends_with('}') {
-            return Err(ArrayParseError::MissingBraces.into());
-        }
-
-        let mut res = vec![];
-        let str = &str[1..(str.len() - 1)];
-        let mut val_str = String::with_capacity(10);
-        let mut in_quotes = false;
-        let mut in_escape = false;
-        let mut chars = str.chars();
-        let mut done = str.is_empty();
-
-        while !done {
-            loop {
-                match chars.next() {
-                    Some(c) => match c {
-                        c if in_escape => {
-                            val_str.push(c);
-                            in_escape = false;
-                        }
-                        '"' => in_quotes = !in_quotes,
-                        '\\' => in_escape = true,
-                        ',' if !in_quotes => {
-                            break;
-                        }
-                        c => {
-                            val_str.push(c);
-                        }
-                    },
-                    None => {
-                        done = true;
-                        break;
-                    }
-                }
-            }
-            let val = if val_str.to_lowercase() == "null" {
-                None
-            } else {
-                parse(&val_str)?
-            };
-            res.push(val);
-            val_str.clear();
-        }
-
-        Ok(Cell::Array(m(res)))
-    }
-}
-
-impl FromBytes for TextFormatConverter {
-    fn try_from_tuple_data(&self, typ: &Type, bytes: &[u8]) -> Result<Cell, FromBytesError> {
+    pub fn try_from_bytes(typ: &Type, bytes: &[u8]) -> Result<Cell, FromBytesError> {
         let str = str::from_utf8(bytes)?;
         match *typ {
             Type::BOOL => Ok(Cell::Bool(parse_bool(str)?)),
@@ -411,4 +197,60 @@ impl FromBytes for TextFormatConverter {
             )),
         }
     }
+
+    fn parse_array<P, M, T>(str: &str, mut parse: P, m: M) -> Result<Cell, FromBytesError>
+    where
+        P: FnMut(&str) -> Result<Option<T>, FromBytesError>,
+        M: FnOnce(Vec<Option<T>>) -> ArrayCell,
+    {
+        if str.len() < 2 {
+            return Err(ArrayParseError::InputTooShort.into());
+        }
+
+        if !str.starts_with('{') || !str.ends_with('}') {
+            return Err(ArrayParseError::MissingBraces.into());
+        }
+
+        let mut res = vec![];
+        let str = &str[1..(str.len() - 1)];
+        let mut val_str = String::with_capacity(10);
+        let mut in_quotes = false;
+        let mut in_escape = false;
+        let mut chars = str.chars();
+        let mut done = str.is_empty();
+
+        while !done {
+            loop {
+                match chars.next() {
+                    Some(c) => match c {
+                        c if in_escape => {
+                            val_str.push(c);
+                            in_escape = false;
+                        }
+                        '"' => in_quotes = !in_quotes,
+                        '\\' => in_escape = true,
+                        ',' if !in_quotes => {
+                            break;
+                        }
+                        c => {
+                            val_str.push(c);
+                        }
+                    },
+                    None => {
+                        done = true;
+                        break;
+                    }
+                }
+            }
+            let val = if val_str.to_lowercase() == "null" {
+                None
+            } else {
+                parse(&val_str)?
+            };
+            res.push(val);
+            val_str.clear();
+        }
+
+        Ok(Cell::Array(m(res)))
+    }
 }
diff --git a/pg_replicate/src/conversions/cdc_event.rs b/pg_replicate/src/conversions/cdc_event.rs
index f4010d4..aae31ec 100644
--- a/pg_replicate/src/conversions/cdc_event.rs
+++ b/pg_replicate/src/conversions/cdc_event.rs
@@ -12,7 +12,7 @@ use crate::{
 };
 
 use super::{
-    bytes::{FromBytes, FromBytesError},
+    bytes::{FromBytesError, TextFormatConverter},
     table_row::TableRow,
     Cell,
 };
@@ -28,6 +28,9 @@ pub enum CdcEventConversionError {
     #[error("unchanged toast not yet supported")]
     UnchangedToastNotSupported,
 
+    #[error("binary format not yet supported")]
+    BinaryFormatNotSupported,
+
     #[error("unsupported type: {0}")]
     UnsupportedType(String),
 
@@ -41,13 +44,10 @@ pub enum CdcEventConversionError {
     FromBytes(#[from] FromBytesError),
 }
 
-pub struct CdcEventConverter {
-    pub tuple_converter: Box<dyn FromBytes>,
-}
+pub struct CdcEventConverter;
 
 impl CdcEventConverter {
     fn try_from_tuple_data_slice(
-        &self,
         column_schemas: &[ColumnSchema],
         tuple_data: &[TupleData],
     ) -> Result<TableRow, CdcEventConversionError> {
@@ -59,9 +59,12 @@ impl CdcEventConverter {
                 TupleData::UnchangedToast => {
                     return Err(CdcEventConversionError::UnchangedToastNotSupported)
                 }
-                TupleData::Text(bytes) | TupleData::Binary(bytes) => self
-                    .tuple_converter
-                    .try_from_tuple_data(&column_schema.typ, &bytes[..])?,
+                TupleData::Binary(_) => {
+                    return Err(CdcEventConversionError::BinaryFormatNotSupported)
+                }
+                TupleData::Text(bytes) => {
+                    TextFormatConverter::try_from_bytes(&column_schema.typ, &bytes[..])?
+                }
             };
             values.push(cell);
         }
@@ -70,32 +73,29 @@ impl CdcEventConverter {
     }
 
     fn try_from_insert_body(
-        &self,
         table_id: TableId,
         column_schemas: &[ColumnSchema],
         insert_body: InsertBody,
     ) -> Result<CdcEvent, CdcEventConversionError> {
         let row =
-            self.try_from_tuple_data_slice(column_schemas, insert_body.tuple().tuple_data())?;
+            Self::try_from_tuple_data_slice(column_schemas, insert_body.tuple().tuple_data())?;
 
         Ok(CdcEvent::Insert((table_id, row)))
     }
 
     //TODO: handle when identity columns are changed
     fn try_from_update_body(
-        &self,
         table_id: TableId,
         column_schemas: &[ColumnSchema],
         update_body: UpdateBody,
     ) -> Result<CdcEvent, CdcEventConversionError> {
         let row =
-            self.try_from_tuple_data_slice(column_schemas, update_body.new_tuple().tuple_data())?;
+            Self::try_from_tuple_data_slice(column_schemas, update_body.new_tuple().tuple_data())?;
 
         Ok(CdcEvent::Update((table_id, row)))
     }
 
     fn try_from_delete_body(
-        &self,
         table_id: TableId,
         column_schemas: &[ColumnSchema],
         delete_body: DeleteBody,
@@ -105,13 +105,12 @@ impl CdcEventConverter {
             .or(delete_body.old_tuple())
             .ok_or(CdcEventConversionError::MissingTupleInDeleteBody)?;
 
-        let row = self.try_from_tuple_data_slice(column_schemas, tuple.tuple_data())?;
+        let row = Self::try_from_tuple_data_slice(column_schemas, tuple.tuple_data())?;
 
         Ok(CdcEvent::Delete((table_id, row)))
     }
 
     pub fn try_from(
-        &self,
         value: ReplicationMessage<LogicalReplicationMessage>,
         table_schemas: &HashMap<TableId, TableSchema>,
     ) -> Result<CdcEvent, CdcEventConversionError> {
@@ -132,7 +131,11 @@ impl CdcEventConverter {
                     .get(&table_id)
                     .ok_or(CdcEventConversionError::MissingSchema(table_id))?
                     .column_schemas;
-                Ok(self.try_from_insert_body(table_id, column_schemas, insert_body)?)
+                Ok(Self::try_from_insert_body(
+                    table_id,
+                    column_schemas,
+                    insert_body,
+                )?)
             }
             LogicalReplicationMessage::Update(update_body) => {
                 let table_id = update_body.rel_id();
@@ -140,7 +143,11 @@ impl CdcEventConverter {
                     .get(&table_id)
                     .ok_or(CdcEventConversionError::MissingSchema(table_id))?
                     .column_schemas;
-                Ok(self.try_from_update_body(table_id, column_schemas, update_body)?)
+                Ok(Self::try_from_update_body(
+                    table_id,
+                    column_schemas,
+                    update_body,
+                )?)
             }
             LogicalReplicationMessage::Delete(delete_body) => {
                 let table_id = delete_body.rel_id();
@@ -148,7 +155,11 @@ impl CdcEventConverter {
                     .get(&table_id)
                     .ok_or(CdcEventConversionError::MissingSchema(table_id))?
                     .column_schemas;
-                Ok(self.try_from_delete_body(table_id, column_schemas, delete_body)?)
+                Ok(Self::try_from_delete_body(
+                    table_id,
+                    column_schemas,
+                    delete_body,
+                )?)
             }
             LogicalReplicationMessage::Truncate(_) => {
                 Err(CdcEventConversionError::MessageNotSupported)
diff --git a/pg_replicate/src/conversions/table_row.rs b/pg_replicate/src/conversions/table_row.rs
index 45d4372..bbe0661 100644
--- a/pg_replicate/src/conversions/table_row.rs
+++ b/pg_replicate/src/conversions/table_row.rs
@@ -16,7 +16,7 @@ use uuid::Uuid;
 
 use crate::{pipeline::batching::BatchBoundary, table::ColumnSchema};
 
-use super::{bytes::FromBytes, numeric::PgNumeric, ArrayCell, Cell};
+use super::{numeric::PgNumeric, ArrayCell, Cell};
 
 #[derive(Debug)]
 pub struct TableRow {
@@ -41,9 +41,7 @@ pub enum TableRowConversionError {
     RowGetError(Option<Box<dyn std::error::Error + Sync + Send>>),
 }
 
-pub struct TableRowConverter {
-    pub tuple_converter: Box<dyn FromBytes>,
-}
+pub struct TableRowConverter;
 
 /// A wrapper type over Vec<u8> to help implement the FromSql trait.
 /// The wrapper is needed to avoid Rust's trait coherence rules. i.e.
@@ -286,7 +284,6 @@ impl TableRowConverter {
     }
 
     pub fn try_from(
-        &self,
         row: &tokio_postgres::binary_copy::BinaryCopyOutRow,
         column_schemas: &[crate::table::ColumnSchema],
     ) -> Result<TableRow, TableRowConversionError> {
diff --git a/pg_replicate/src/pipeline/sources/postgres.rs b/pg_replicate/src/pipeline/sources/postgres.rs
index 6a5fe6d..0e6807d 100644
--- a/pg_replicate/src/pipeline/sources/postgres.rs
+++ b/pg_replicate/src/pipeline/sources/postgres.rs
@@ -19,7 +19,6 @@ use tracing::info;
 use crate::{
     clients::postgres::{ReplicationClient, ReplicationClientError},
     conversions::{
-        bytes::{BinaryFormatConverter, FromBytes, TextFormatConverter},
         cdc_event::{CdcEvent, CdcEventConversionError, CdcEventConverter},
         table_row::{TableRow, TableRowConversionError, TableRowConverter},
     },
@@ -130,12 +129,9 @@ impl Source for PostgresSource {
             .await
             .map_err(PostgresSourceError::ReplicationClient)?;
 
-        let tuple_converter: Box<dyn FromBytes> = Box::new(TextFormatConverter);
-
         Ok(TableCopyStream {
             stream,
             column_schemas: column_schemas.to_vec(),
-            table_row_converter: TableRowConverter { tuple_converter },
         })
     }
 
@@ -155,7 +151,7 @@ impl Source for PostgresSource {
         let slot_name = self
             .slot_name()
             .ok_or(PostgresSourceError::MissingSlotName)?;
-        let (stream, binary_format) = self
+        let stream = self
             .replication_client
             .get_logical_replication_stream(publication, slot_name, start_lsn)
             .await
@@ -164,17 +160,10 @@ impl Source for PostgresSource {
         const TIME_SEC_CONVERSION: u64 = 946_684_800;
         let postgres_epoch = UNIX_EPOCH + Duration::from_secs(TIME_SEC_CONVERSION);
 
-        let tuple_converter: Box<dyn FromBytes> = if binary_format {
-            Box::new(BinaryFormatConverter)
-        } else {
-            Box::new(TextFormatConverter)
-        };
-
         Ok(CdcStream {
             stream,
             table_schemas: self.table_schemas.clone(),
             postgres_epoch,
-            converter: CdcEventConverter { tuple_converter },
         })
     }
 }
@@ -194,7 +183,6 @@ pin_project! {
         #[pin]
         stream: BinaryCopyOutStream,
         column_schemas: Vec<ColumnSchema>,
-        table_row_converter: TableRowConverter,
     }
 }
@@ -204,7 +192,7 @@ impl Stream for TableCopyStream {
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let this = self.project();
         match ready!(this.stream.poll_next(cx)) {
-            Some(Ok(row)) => match this.table_row_converter.try_from(&row, this.column_schemas) {
+            Some(Ok(row)) => match TableRowConverter::try_from(&row, this.column_schemas) {
                 Ok(row) => Poll::Ready(Some(Ok(row))),
                 Err(e) => {
                     let e = TableCopyStreamError::ConversionError(e);
@@ -233,7 +221,6 @@ pin_project! {
         stream: LogicalReplicationStream,
         table_schemas: HashMap<TableId, TableSchema>,
         postgres_epoch: SystemTime,
-        converter: CdcEventConverter
     }
 }
@@ -267,7 +254,7 @@ impl Stream for CdcStream {
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let this = self.project();
        match ready!(this.stream.poll_next(cx)) {
-            Some(Ok(msg)) => match this.converter.try_from(msg, this.table_schemas) {
+            Some(Ok(msg)) => match CdcEventConverter::try_from(msg, this.table_schemas) {
                 Ok(row) => Poll::Ready(Some(Ok(row))),
                 Err(e) => Poll::Ready(Some(Err(e.into()))),
             },
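
Reviewer note (not part of the patch): with the binary path removed, every value on the CdcStream path now goes through TextFormatConverter::try_from_bytes. The sketch below is illustrative test code only; it assumes it lives inside the pg_replicate crate, the function name text_format_example is hypothetical, and the Cell::I32 expectation relies on the unchanged INT4 match arm that this diff does not display.

    use tokio_postgres::types::Type;

    use crate::conversions::{
        bytes::{FromBytesError, TextFormatConverter},
        Cell,
    };

    // Illustrative sketch: exercises the text-format conversion entry point kept by this patch.
    fn text_format_example() -> Result<(), FromBytesError> {
        // pgoutput's text format sends each column as a UTF-8 string,
        // so an int4 value of 42 arrives as the bytes "42".
        let cell = TextFormatConverter::try_from_bytes(&Type::INT4, b"42")?;
        assert!(matches!(cell, Cell::I32(42)));

        // Array columns arrive as Postgres array literals and are split by
        // the parse_array helper that now lives at the end of bytes.rs.
        let cell = TextFormatConverter::try_from_bytes(&Type::INT4_ARRAY, b"{1,2,NULL}")?;
        assert!(matches!(cell, Cell::Array(_)));

        Ok(())
    }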