Skip to content

Commit

Permalink
always use text format for CdcStream
Browse files Browse the repository at this point in the history
  • Loading branch information
imor committed Nov 29, 2024
1 parent d5819a6 commit d9cce17
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 295 deletions.
39 changes: 1 addition & 38 deletions pg_replicate/src/clients/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,47 +442,10 @@ impl ReplicationClient {
publication: &str,
slot_name: &str,
start_lsn: PgLsn,
) -> Result<(LogicalReplicationStream, bool), ReplicationClientError> {
match self
.get_stream(publication, slot_name, start_lsn, true)
.await
{
Ok(stream) => {
info!("binary format supported by logical replication");
Ok((stream, true))
}
Err(rce) => {
if let ReplicationClientError::TokioPostgresError(e) = &rce {
if let Some(dbe) = e.as_db_error() {
//TODO: use a more robust method of recognizing whether the server supports binary option or not
if dbe.message() == "unrecognized pgoutput option: binary" {
info!(
"binary format not supported by logical replication, trying text"
);
return self
.get_stream(publication, slot_name, start_lsn, false)
.await
.map(|s| (s, false));
}
}
}
Err(rce)
}
}
}

pub async fn get_stream(
&self,
publication: &str,
slot_name: &str,
start_lsn: PgLsn,
binary_format: bool,
) -> Result<LogicalReplicationStream, ReplicationClientError> {
let binary_option = if binary_format { r#", "binary""# } else { "" };
let options = format!(
r#"("proto_version" '1', "publication_names" {}{})"#,
r#"("proto_version" '1', "publication_names" {})"#,
quote_literal(publication),
binary_option
);

let query = format!(
Expand Down
278 changes: 60 additions & 218 deletions pg_replicate/src/conversions/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ use std::{
};

use bigdecimal::ParseBigDecimalError;
use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime, Utc};
use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime};
use thiserror::Error;
use tokio_postgres::types::{FromSql, Type};
use tokio_postgres::types::Type;
use uuid::Uuid;

use crate::conversions::{bool::parse_bool, hex};

use super::{bool::ParseBoolError, hex::ByteaHexParseError, numeric::PgNumeric, ArrayCell, Cell};
use super::{bool::ParseBoolError, hex::ByteaHexParseError, ArrayCell, Cell};

#[derive(Debug, Error)]
pub enum FromBytesError {
Expand Down Expand Up @@ -54,162 +54,6 @@ pub enum FromBytesError {
RowGetError(#[from] Box<dyn std::error::Error + Sync + Send>),
}

/// Conversion of a raw column value into a [`Cell`].
///
/// Implementors decide how the bytes are interpreted; the PostgreSQL type of
/// the column is supplied so the right decoding can be selected.
pub trait FromBytes {
/// Decodes `bytes` as a value of PostgreSQL type `typ`.
///
/// Returns the decoded [`Cell`], or a [`FromBytesError`] when the bytes
/// cannot be converted.
fn try_from_tuple_data(&self, typ: &Type, bytes: &[u8]) -> Result<Cell, FromBytesError>;
}

/// [`FromBytes`] implementation that decodes every value through the
/// `FromSql` implementation of the Rust type matching the PostgreSQL type.
pub struct BinaryFormatConverter;

impl FromBytes for BinaryFormatConverter {
/// Decodes `bytes` as a value of PostgreSQL type `typ` and wraps it in the
/// matching `Cell` variant; array types are wrapped in `Cell::Array` with
/// `Option` elements.
///
/// Any `from_sql` failure is propagated to the caller through `?`.
fn try_from_tuple_data(&self, typ: &Type, bytes: &[u8]) -> Result<Cell, FromBytesError> {
match *typ {
Type::BOOL => {
let val = bool::from_sql(typ, bytes)?;
Ok(Cell::Bool(val))
}
Type::BOOL_ARRAY => {
let val = Vec::<Option<bool>>::from_sql(typ, bytes)?;
Ok(Cell::Array(ArrayCell::Bool(val)))
}
// All character-like types share the `String` representation.
Type::CHAR | Type::BPCHAR | Type::VARCHAR | Type::NAME | Type::TEXT => {
let val = String::from_sql(typ, bytes)?;
Ok(Cell::String(val.to_string()))
}
Type::CHAR_ARRAY
| Type::BPCHAR_ARRAY
| Type::VARCHAR_ARRAY
| Type::NAME_ARRAY
| Type::TEXT_ARRAY => {
let val = Vec::<Option<String>>::from_sql(typ, bytes)?;
Ok(Cell::Array(ArrayCell::String(val)))
}
Type::INT2 => {
let val = i16::from_sql(typ, bytes)?;
Ok(Cell::I16(val))
}
Type::INT2_ARRAY => {
let val = Vec::<Option<i16>>::from_sql(typ, bytes)?;
Ok(Cell::Array(ArrayCell::I16(val)))
}
Type::INT4 => {
let val = i32::from_sql(typ, bytes)?;
Ok(Cell::I32(val))
}
Type::INT4_ARRAY => {
let val = Vec::<Option<i32>>::from_sql(typ, bytes)?;
Ok(Cell::Array(ArrayCell::I32(val)))
}
Type::INT8 => {
let val = i64::from_sql(typ, bytes)?;
Ok(Cell::I64(val))
}
Type::INT8_ARRAY => {
let val = Vec::<Option<i64>>::from_sql(typ, bytes)?;
Ok(Cell::Array(ArrayCell::I64(val)))
}
Type::FLOAT4 => {
let val = f32::from_sql(typ, bytes)?;
Ok(Cell::F32(val))
}
Type::FLOAT4_ARRAY => {
let val = Vec::<Option<f32>>::from_sql(typ, bytes)?;
Ok(Cell::Array(ArrayCell::F32(val)))
}
Type::FLOAT8 => {
let val = f64::from_sql(typ, bytes)?;
Ok(Cell::F64(val))
}
Type::FLOAT8_ARRAY => {
let val = Vec::<Option<f64>>::from_sql(typ, bytes)?;
Ok(Cell::Array(ArrayCell::F64(val)))
}
Type::NUMERIC => {
let val = PgNumeric::from_sql(typ, bytes)?;
Ok(Cell::Numeric(val))
}
Type::NUMERIC_ARRAY => {
let val = Vec::<Option<PgNumeric>>::from_sql(typ, bytes)?;
Ok(Cell::Array(ArrayCell::Numeric(val)))
}
Type::BYTEA => {
let val = Vec::<u8>::from_sql(typ, bytes)?;
Ok(Cell::Bytes(val))
}
Type::BYTEA_ARRAY => {
let val = Vec::<Option<Vec<u8>>>::from_sql(typ, bytes)?;
Ok(Cell::Array(ArrayCell::Bytes(val)))
}
Type::DATE => {
let val = NaiveDate::from_sql(typ, bytes)?;
Ok(Cell::Date(val))
}
Type::DATE_ARRAY => {
let val = Vec::<Option<NaiveDate>>::from_sql(typ, bytes)?;
Ok(Cell::Array(ArrayCell::Date(val)))
}
Type::TIME => {
let val = NaiveTime::from_sql(typ, bytes)?;
Ok(Cell::Time(val))
}
Type::TIME_ARRAY => {
let val = Vec::<Option<NaiveTime>>::from_sql(typ, bytes)?;
Ok(Cell::Array(ArrayCell::Time(val)))
}
Type::TIMESTAMP => {
let val = NaiveDateTime::from_sql(typ, bytes)?;
Ok(Cell::TimeStamp(val))
}
Type::TIMESTAMP_ARRAY => {
let val = Vec::<Option<NaiveDateTime>>::from_sql(typ, bytes)?;
Ok(Cell::Array(ArrayCell::TimeStamp(val)))
}
Type::TIMESTAMPTZ => {
// Decoded with a fixed offset, then converted via `into()`
// (presumably to `DateTime<Utc>`, as in the array arm below).
let val = DateTime::<FixedOffset>::from_sql(typ, bytes)?;
Ok(Cell::TimeStampTz(val.into()))
}
Type::TIMESTAMPTZ_ARRAY => {
let mut val = Vec::<Option<DateTime<FixedOffset>>>::from_sql(typ, bytes)?;
// Convert each present element from a fixed offset to UTC.
let val: Vec<Option<DateTime<Utc>>> =
val.drain(..).map(|v| v.map(|v| v.into())).collect();
Ok(Cell::Array(ArrayCell::TimeStampTz(val)))
}
Type::UUID => {
let val = Uuid::from_sql(typ, bytes)?;
Ok(Cell::Uuid(val))
}
Type::UUID_ARRAY => {
let val = Vec::<Option<Uuid>>::from_sql(typ, bytes)?;
Ok(Cell::Array(ArrayCell::Uuid(val)))
}
Type::JSON | Type::JSONB => {
let val = serde_json::Value::from_sql(typ, bytes)?;
Ok(Cell::Json(val))
}
Type::JSON_ARRAY | Type::JSONB_ARRAY => {
let val = Vec::<Option<serde_json::Value>>::from_sql(typ, bytes)?;
Ok(Cell::Array(ArrayCell::Json(val)))
}
Type::OID => {
let val = u32::from_sql(typ, bytes)?;
Ok(Cell::U32(val))
}
Type::OID_ARRAY => {
let val = Vec::<Option<u32>>::from_sql(typ, bytes)?;
Ok(Cell::Array(ArrayCell::U32(val)))
}
// With the `unknown_types_to_bytes` feature enabled, any other type is
// decoded as a `String` (despite the feature's name).
#[cfg(feature = "unknown_types_to_bytes")]
_ => {
let val = String::from_sql(typ, bytes)?;
Ok(Cell::String(val.to_string()))
}
// NOTE(review): `CdcEventConversionError` is not imported in the visible
// `use` lines and differs from the declared `FromBytesError` return type;
// this arm looks like it would not compile with the feature disabled —
// confirm and switch to the appropriate `FromBytesError` variant.
#[cfg(not(feature = "unknown_types_to_bytes"))]
_ => Err(CdcEventConversionError::UnsupportedType(
typ.name().to_string(),
)),
}
}
}

/// Namespace type for converting column values that arrive as UTF-8 text
/// into [`Cell`] values.
pub struct TextFormatConverter;

#[derive(Debug, Error)]
Expand All @@ -222,65 +66,7 @@ pub enum ArrayParseError {
}

impl TextFormatConverter {
/// Parses a one-dimensional array in PostgreSQL's text representation
/// (e.g. `{1,2,NULL}` or `{"a,b","c"}`) into a `Cell::Array`.
///
/// * `str` - the complete array literal, including the surrounding braces.
/// * `parse` - converts the unquoted/unescaped text of one element into a value.
/// * `m` - wraps the collected elements in the appropriate `ArrayCell` variant.
///
/// An element is treated as SQL NULL only when it is the bare word `NULL`
/// (ASCII case-insensitive). An element that was double-quoted or contained a
/// backslash escape is always handed to `parse`, so the string `"NULL"` stays
/// a string instead of silently becoming `None`.
///
/// # Errors
///
/// Returns `ArrayParseError::InputTooShort` when `str` is shorter than two
/// bytes, `ArrayParseError::MissingBraces` when it is not wrapped in `{`/`}`,
/// and forwards any error produced by `parse`.
fn parse_array<P, M, T>(str: &str, mut parse: P, m: M) -> Result<Cell, FromBytesError>
where
    P: FnMut(&str) -> Result<Option<T>, FromBytesError>,
    M: FnOnce(Vec<Option<T>>) -> ArrayCell,
{
    if str.len() < 2 {
        return Err(ArrayParseError::InputTooShort.into());
    }

    if !str.starts_with('{') || !str.ends_with('}') {
        return Err(ArrayParseError::MissingBraces.into());
    }

    let mut res = vec![];
    // Both delimiters are single-byte ASCII, so this slice is on char boundaries.
    let str = &str[1..(str.len() - 1)];
    let mut val_str = String::with_capacity(10);
    let mut in_quotes = false;
    let mut in_escape = false;
    // Set when the current element used quotes or a backslash escape; such an
    // element can never be the NULL marker.
    let mut has_quoting = false;
    let mut chars = str.chars();
    // `{}` is the empty array: produce no elements at all.
    let mut done = str.is_empty();

    while !done {
        // Accumulate one element, stopping at an unquoted comma or end of input.
        loop {
            match chars.next() {
                Some(c) if in_escape => {
                    val_str.push(c);
                    in_escape = false;
                }
                Some('"') => {
                    in_quotes = !in_quotes;
                    has_quoting = true;
                }
                Some('\\') => {
                    in_escape = true;
                    has_quoting = true;
                }
                Some(',') if !in_quotes => break,
                Some(c) => val_str.push(c),
                None => {
                    done = true;
                    break;
                }
            }
        }

        let val = if !has_quoting && val_str.eq_ignore_ascii_case("null") {
            None
        } else {
            parse(&val_str)?
        };
        res.push(val);
        val_str.clear();
        has_quoting = false;
    }

    Ok(Cell::Array(m(res)))
}
}

impl FromBytes for TextFormatConverter {
fn try_from_tuple_data(&self, typ: &Type, bytes: &[u8]) -> Result<Cell, FromBytesError> {
pub fn try_from_bytes(typ: &Type, bytes: &[u8]) -> Result<Cell, FromBytesError> {
let str = str::from_utf8(bytes)?;
match *typ {
Type::BOOL => Ok(Cell::Bool(parse_bool(str)?)),
Expand Down Expand Up @@ -411,4 +197,60 @@ impl FromBytes for TextFormatConverter {
)),
}
}

/// Parses a one-dimensional array in PostgreSQL's text representation
/// (e.g. `{1,2,NULL}` or `{"a,b","c"}`) into a `Cell::Array`.
///
/// * `str` - the complete array literal, including the surrounding braces.
/// * `parse` - converts the unquoted/unescaped text of one element into a value.
/// * `m` - wraps the collected elements in the appropriate `ArrayCell` variant.
///
/// An element is treated as SQL NULL only when it is the bare word `NULL`
/// (ASCII case-insensitive). An element that was double-quoted or contained a
/// backslash escape is always handed to `parse`, so the string `"NULL"` stays
/// a string instead of silently becoming `None`.
///
/// # Errors
///
/// Returns `ArrayParseError::InputTooShort` when `str` is shorter than two
/// bytes, `ArrayParseError::MissingBraces` when it is not wrapped in `{`/`}`,
/// and forwards any error produced by `parse`.
fn parse_array<P, M, T>(str: &str, mut parse: P, m: M) -> Result<Cell, FromBytesError>
where
    P: FnMut(&str) -> Result<Option<T>, FromBytesError>,
    M: FnOnce(Vec<Option<T>>) -> ArrayCell,
{
    if str.len() < 2 {
        return Err(ArrayParseError::InputTooShort.into());
    }

    if !str.starts_with('{') || !str.ends_with('}') {
        return Err(ArrayParseError::MissingBraces.into());
    }

    let mut res = vec![];
    // Both delimiters are single-byte ASCII, so this slice is on char boundaries.
    let str = &str[1..(str.len() - 1)];
    let mut val_str = String::with_capacity(10);
    let mut in_quotes = false;
    let mut in_escape = false;
    // Set when the current element used quotes or a backslash escape; such an
    // element can never be the NULL marker.
    let mut has_quoting = false;
    let mut chars = str.chars();
    // `{}` is the empty array: produce no elements at all.
    let mut done = str.is_empty();

    while !done {
        // Accumulate one element, stopping at an unquoted comma or end of input.
        loop {
            match chars.next() {
                Some(c) if in_escape => {
                    val_str.push(c);
                    in_escape = false;
                }
                Some('"') => {
                    in_quotes = !in_quotes;
                    has_quoting = true;
                }
                Some('\\') => {
                    in_escape = true;
                    has_quoting = true;
                }
                Some(',') if !in_quotes => break,
                Some(c) => val_str.push(c),
                None => {
                    done = true;
                    break;
                }
            }
        }

        let val = if !has_quoting && val_str.eq_ignore_ascii_case("null") {
            None
        } else {
            parse(&val_str)?
        };
        res.push(val);
        val_str.clear();
        has_quoting = false;
    }

    Ok(Cell::Array(m(res)))
}
}
Loading

0 comments on commit d9cce17

Please sign in to comment.