Skip to content

Commit

Permalink
DBZ-4370 Only setting "messages" option on PG 14+;
Browse files Browse the repository at this point in the history
Also avoiding setting stream options repeatedly.
  • Loading branch information
gunnarmorling authored and jpechane committed Nov 29, 2021
1 parent 707dac8 commit a8cda23
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.util.function.Function;

import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;

Expand Down Expand Up @@ -40,7 +41,7 @@ public interface MessageDecoder {
* @param builder
* @return the builder instance
*/
ChainedLogicalStreamBuilder optionsWithMetadata(ChainedLogicalStreamBuilder builder);
ChainedLogicalStreamBuilder optionsWithMetadata(ChainedLogicalStreamBuilder builder, Function<Integer, Boolean> hasMinimumServerVersion);

/**
* Allows MessageDecoder to configure options with which the replication stream is started.
Expand All @@ -50,7 +51,7 @@ public interface MessageDecoder {
* @param builder
* @return the builder instance
*/
ChainedLogicalStreamBuilder optionsWithoutMetadata(ChainedLogicalStreamBuilder builder);
ChainedLogicalStreamBuilder optionsWithoutMetadata(ChainedLogicalStreamBuilder builder, Function<Integer, Boolean> hasMinimumServerVersion);

/**
* Signals to this decoder whether messages contain type metadata or not.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -183,11 +184,6 @@ protected void initPublication() {
catch (SQLException e) {
throw new JdbcConnectionException(e);
}

// This is what ties the publication definition to the replication stream
streamParams.put("proto_version", 1);
streamParams.put("publication_names", publicationName);
streamParams.put("messages", true);
}
}

Expand Down Expand Up @@ -569,7 +565,8 @@ public Lsn startLsn() {
};
}

private PGReplicationStream startPgReplicationStream(final Lsn lsn, Function<ChainedLogicalStreamBuilder, ChainedLogicalStreamBuilder> configurator)
private PGReplicationStream startPgReplicationStream(final Lsn lsn,
BiFunction<ChainedLogicalStreamBuilder, Function<Integer, Boolean>, ChainedLogicalStreamBuilder> configurator)
throws SQLException {
assert lsn != null;
ChainedLogicalStreamBuilder streamBuilder = pgConnection()
Expand All @@ -579,7 +576,7 @@ private PGReplicationStream startPgReplicationStream(final Lsn lsn, Function<Cha
.withSlotName("\"" + slotName + "\"")
.withStartPosition(lsn.asLogSequenceNumber())
.withSlotOptions(streamParams);
streamBuilder = configurator.apply(streamBuilder);
streamBuilder = configurator.apply(streamBuilder, this::hasMinimumVersion);

if (statusUpdateInterval != null && statusUpdateInterval.toMillis() > 0) {
streamBuilder.withStatusInterval(toIntExact(statusUpdateInterval.toMillis()), TimeUnit.MILLISECONDS);
Expand All @@ -598,6 +595,15 @@ private PGReplicationStream startPgReplicationStream(final Lsn lsn, Function<Cha
return stream;
}

private Boolean hasMinimumVersion(int version) {
try {
return pgConnection().haveMinimumServerVersion(version);
}
catch (SQLException e) {
throw new DebeziumException(e);
}
}

@Override
public synchronized void close() {
close(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;

import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
import org.slf4j.Logger;
Expand Down Expand Up @@ -210,14 +211,20 @@ public void processNotEmptyMessage(ByteBuffer buffer, ReplicationMessageProcesso
}

@Override
public ChainedLogicalStreamBuilder optionsWithMetadata(ChainedLogicalStreamBuilder builder) {
return builder.withSlotOption("proto_version", 1)
.withSlotOption("publication_names", decoderContext.getConfig().publicationName())
.withSlotOption("messages", true);
public ChainedLogicalStreamBuilder optionsWithMetadata(ChainedLogicalStreamBuilder builder, Function<Integer, Boolean> hasMinimumServerVersion) {
builder = builder.withSlotOption("proto_version", 1)
.withSlotOption("publication_names", decoderContext.getConfig().publicationName());

// DBZ-4374 Use enum once the driver got updated
if (hasMinimumServerVersion.apply(140000)) {
builder = builder.withSlotOption("messages", true);
}

return builder;
}

@Override
public ChainedLogicalStreamBuilder optionsWithoutMetadata(ChainedLogicalStreamBuilder builder) {
public ChainedLogicalStreamBuilder optionsWithoutMetadata(ChainedLogicalStreamBuilder builder, Function<Integer, Boolean> hasMinimumServerVersion) {
return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Set;
import java.util.function.Function;

import org.apache.kafka.connect.errors.ConnectException;
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
Expand Down Expand Up @@ -72,12 +73,12 @@ public void processNotEmptyMessage(final ByteBuffer buffer, ReplicationMessagePr
}

@Override
public ChainedLogicalStreamBuilder optionsWithMetadata(ChainedLogicalStreamBuilder builder) {
public ChainedLogicalStreamBuilder optionsWithMetadata(ChainedLogicalStreamBuilder builder, Function<Integer, Boolean> hasMinimumServerVersion) {
return builder;
}

@Override
public ChainedLogicalStreamBuilder optionsWithoutMetadata(ChainedLogicalStreamBuilder builder) {
public ChainedLogicalStreamBuilder optionsWithoutMetadata(ChainedLogicalStreamBuilder builder, Function<Integer, Boolean> hasMinimumServerVersion) {
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.time.Instant;
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Function;

import org.apache.kafka.connect.errors.ConnectException;
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
Expand Down Expand Up @@ -81,13 +82,13 @@ public void processNotEmptyMessage(ByteBuffer buffer, ReplicationMessageProcesso
}

@Override
public ChainedLogicalStreamBuilder optionsWithMetadata(ChainedLogicalStreamBuilder builder) {
return optionsWithoutMetadata(builder)
public ChainedLogicalStreamBuilder optionsWithMetadata(ChainedLogicalStreamBuilder builder, Function<Integer, Boolean> hasMinimumServerVersion) {
return optionsWithoutMetadata(builder, hasMinimumServerVersion)
.withSlotOption("include-not-null", "true");
}

@Override
public ChainedLogicalStreamBuilder optionsWithoutMetadata(ChainedLogicalStreamBuilder builder) {
public ChainedLogicalStreamBuilder optionsWithoutMetadata(ChainedLogicalStreamBuilder builder, Function<Integer, Boolean> hasMinimumServerVersion) {
return builder
.withSlotOption("pretty-print", 1)
.withSlotOption("write-in-chunks", 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.sql.SQLException;
import java.time.Instant;
import java.util.Arrays;
import java.util.function.Function;

import org.apache.kafka.connect.errors.ConnectException;
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
Expand Down Expand Up @@ -263,13 +264,13 @@ private void doProcessMessage(ReplicationMessageProcessor processor, TypeRegistr
}

@Override
public ChainedLogicalStreamBuilder optionsWithMetadata(ChainedLogicalStreamBuilder builder) {
return optionsWithoutMetadata(builder)
public ChainedLogicalStreamBuilder optionsWithMetadata(ChainedLogicalStreamBuilder builder, Function<Integer, Boolean> hasMinimumServerVersion) {
return optionsWithoutMetadata(builder, hasMinimumServerVersion)
.withSlotOption("include-not-null", "true");
}

@Override
public ChainedLogicalStreamBuilder optionsWithoutMetadata(ChainedLogicalStreamBuilder builder) {
public ChainedLogicalStreamBuilder optionsWithoutMetadata(ChainedLogicalStreamBuilder builder, Function<Integer, Boolean> hasMinimumServerVersion) {
return builder
.withSlotOption("pretty-print", 1)
.withSlotOption("write-in-chunks", 1)
Expand Down

0 comments on commit a8cda23

Please sign in to comment.