fix: dangling open transaction for cdc-only, feat: add nats connector #81

Open
wants to merge 1 commit into
base: main


@cawfeecoder cawfeecoder commented Jan 1, 2025

What kind of change does this PR introduce?

Bug fix and feature (happy to break these into two separate PRs).

What is the current behavior?

For the fix, see #80. Currently, PostgresSource::new opens a transaction and leaves it dangling,
expecting BatchDataPipeline::copy_tables to close it. In CDC-only mode that never happens.

What is the new behavior?

BatchDataPipeline::copy_tables explicitly requests its own transaction and closes it when the copy completes. This avoids the error from #80 in CDC-only mode, since CDC-only no longer uses a transaction.
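
To make the transaction lifecycle concrete, here is a minimal synchronous sketch of the idea. It is not the code in this PR: `MockReplicationClient`, `Source`, `start_cdc`, `run_cdc_only`, and `run_copy_then_cdc` are hypothetical stand-ins, and the mock only tracks whether a transaction is open. The real types are async and talk to Postgres.

```rust
// Hypothetical stand-in for the replication client. It only tracks
// whether a transaction is currently open on the connection.
#[derive(Debug, Default)]
struct MockReplicationClient {
    in_txn: bool,
}

impl MockReplicationClient {
    fn begin_txn(&mut self) -> Result<(), String> {
        if self.in_txn {
            return Err("transaction already open".to_string());
        }
        self.in_txn = true;
        Ok(())
    }

    fn commit_txn(&mut self) -> Result<(), String> {
        if !self.in_txn {
            return Err("no transaction to commit".to_string());
        }
        self.in_txn = false;
        Ok(())
    }

    // Models the rule behind #80: replication cannot start while a
    // transaction block is open.
    fn start_replication(&self) -> Result<(), String> {
        if self.in_txn {
            return Err("START_REPLICATION cannot run inside a transaction block".to_string());
        }
        Ok(())
    }
}

// Hypothetical, simplified source that owns the client.
struct Source {
    client: MockReplicationClient,
}

impl Source {
    // The constructor reads metadata inside a transaction and commits
    // it before returning, so nothing dangles.
    fn new() -> Result<Self, String> {
        let mut client = MockReplicationClient::default();
        client.begin_txn()?;
        // ... read table names and schemas here ...
        client.commit_txn()?;
        Ok(Source { client })
    }

    // The table copy opens and closes its own transaction.
    fn copy_tables(&mut self) -> Result<(), String> {
        self.client.begin_txn()?;
        // ... copy table rows here ...
        self.client.commit_txn()
    }

    fn start_cdc(&self) -> Result<(), String> {
        self.client.start_replication()
    }
}

// CDC-only: no table copy, so no transaction is open at replication start.
fn run_cdc_only() -> Result<(), String> {
    let source = Source::new()?;
    source.start_cdc()
}

// Copy then CDC: the copy transaction is committed before replication starts.
fn run_copy_then_cdc() -> Result<(), String> {
    let mut source = Source::new()?;
    source.copy_tables()?;
    source.start_cdc()
}
```

In this model both `run_cdc_only()` and `run_copy_then_cdc()` succeed, while calling `start_replication` on a client with an open transaction returns the error from #80.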

I've also added a WIP NATS connector that publishes changes to a NATS message broker.
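
As a rough illustration of what such a connector does, the sketch below maps CDC events to dot-separated NATS subjects and hands them to a publisher. Everything here is an assumption made for illustration: the `CdcEvent` shape, the `Publisher` trait, the `InMemoryPublisher`, and the `<prefix>.<table>.<op>` subject scheme are hypothetical and not taken from this PR. A real connector would publish through a NATS client instead of the in-memory stand-in.

```rust
// Hypothetical CDC event shape, for illustration only.
#[derive(Debug, Clone, PartialEq)]
enum CdcEvent {
    Insert { table: String, row: String },
    Update { table: String, row: String },
    Delete { table: String, key: String },
}

// Hypothetical abstraction over the broker; a real connector would
// implement this on top of a NATS client.
trait Publisher {
    fn publish(&mut self, subject: &str, payload: &[u8]) -> Result<(), String>;
}

// In-memory stand-in that records what would have been published.
#[derive(Default)]
struct InMemoryPublisher {
    sent: Vec<(String, Vec<u8>)>,
}

impl Publisher for InMemoryPublisher {
    fn publish(&mut self, subject: &str, payload: &[u8]) -> Result<(), String> {
        self.sent.push((subject.to_string(), payload.to_vec()));
        Ok(())
    }
}

// Builds a subject of the assumed form `<prefix>.<table>.<operation>`.
fn subject_for(prefix: &str, event: &CdcEvent) -> String {
    match event {
        CdcEvent::Insert { table, .. } => format!("{}.{}.insert", prefix, table),
        CdcEvent::Update { table, .. } => format!("{}.{}.update", prefix, table),
        CdcEvent::Delete { table, .. } => format!("{}.{}.delete", prefix, table),
    }
}

// Uses the row (or key, for deletes) as the raw message payload.
fn payload_for(event: &CdcEvent) -> &[u8] {
    match event {
        CdcEvent::Insert { row, .. } | CdcEvent::Update { row, .. } => row.as_bytes(),
        CdcEvent::Delete { key, .. } => key.as_bytes(),
    }
}

// Publishes every event in order and returns how many were sent.
// Stops at the first publish error.
fn publish_events<P: Publisher>(
    publisher: &mut P,
    prefix: &str,
    events: &[CdcEvent],
) -> Result<usize, String> {
    for event in events {
        publisher.publish(&subject_for(prefix, event), payload_for(event))?;
    }
    Ok(events.len())
}
```

Keeping the broker behind a small trait makes the event-to-subject mapping testable without a running NATS server.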

Additional context

feat: add nats connector (for cdc events)

For the first part, fix START_REPLICATION cannot run inside a
transaction block for CDC only, which is caused by
PostgresSource::new opening a transaction, but never closing it
for CDC only. This can be remedied by committing that transaction
and offering an explicit start_transaction function to be called
in cases of not CDC only.

For the 2nd part, introduce a NATS connector (WIP,
this should probably create or get a stream in Jetstream
for persistence and needs some configuration options)
so we can propagate out CDC events onto a message broker

Refs: supabase#80
@@ -54,6 +55,10 @@ impl<Src: Source, Snk: BatchSink> BatchDataPipeline<Src, Snk> {
        copied_tables: &HashSet<TableId>,
    ) -> Result<(), PipelineError<Src::Error, Snk::Error>> {
        let start = Instant::now();
        self.source
Author

This is the transaction fix: copy_tables starts an explicit transaction here instead of relying on the PostgresSource constructor to open one.

@@ -69,6 +69,7 @@ impl PostgresSource {
        let (table_names, publication) =
            Self::get_table_names_and_publication(&replication_client, table_names_from).await?;
        let table_schemas = replication_client.get_table_schemas(&table_names).await?;
        replication_client.commit_txn().await?;
Author

@cawfeecoder cawfeecoder Jan 1, 2025

This is the fix for #80: it makes sure no dangling transaction leaves the constructor. I'm not sure whether we need the transaction here at all, but I erred on the side of caution.

Contributor

imor commented Jan 2, 2025

@cawfeecoder thanks for the PR. Could you please break the fix for #80 into a separate PR? It makes reviewing easier.
