Skip to content

Commit

Permalink
DBZ-4272 Trigger schema refresh before snapshot; only for PostgreSQL
Browse files Browse the repository at this point in the history
  • Loading branch information
Naros authored and gunnarmorling committed Nov 30, 2021
1 parent 652f41f commit 0a10d5b
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSourceFactory;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
Expand Down Expand Up @@ -94,7 +93,7 @@ public Optional<IncrementalSnapshotChangeEventSource<? extends DataCollectionId>
if (Strings.isNullOrEmpty(configuration.getSignalingDataCollectionId())) {
return Optional.empty();
}
final SignalBasedIncrementalSnapshotChangeEventSource<TableId> incrementalSnapshotChangeEventSource = new SignalBasedIncrementalSnapshotChangeEventSource<>(
final PostgresSignalBasedIncrementalSnapshotChangeEventSource incrementalSnapshotChangeEventSource = new PostgresSignalBasedIncrementalSnapshotChangeEventSource(
configuration,
jdbcConnection,
dispatcher,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql;

import java.sql.SQLException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.DatabaseSchema;
import io.debezium.util.Clock;

/**
* @author Chris Cranford
*/
public class PostgresSignalBasedIncrementalSnapshotChangeEventSource extends SignalBasedIncrementalSnapshotChangeEventSource<TableId> {

private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSignalBasedIncrementalSnapshotChangeEventSource.class);

private final PostgresConnection jdbcConnection;
private final PostgresSchema schema;

public PostgresSignalBasedIncrementalSnapshotChangeEventSource(RelationalDatabaseConnectorConfig config,
JdbcConnection jdbcConnection,
EventDispatcher<TableId> dispatcher,
DatabaseSchema<?> databaseSchema,
Clock clock,
SnapshotProgressListener progressListener,
DataChangeEventListener dataChangeEventListener) {
super(config, jdbcConnection, dispatcher, databaseSchema, clock, progressListener, dataChangeEventListener);
this.jdbcConnection = (PostgresConnection) jdbcConnection;
this.schema = (PostgresSchema) databaseSchema;
}

@Override
protected Table refreshTableSchema(Table table) throws SQLException {
LOGGER.debug("Refreshing table '{}' schema for incremental snapshot.", table.id());
schema.refresh(jdbcConnection, table.id(), true);
return schema.tableFor(table.id());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ protected Configuration.Builder config() {
.with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1")
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, false)
.with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, "s1.a42:pk1,pk2,pk3,pk4")
// DBZ-4272 required to allow dropping columns during incremental snapshots
// DBZ-4272 required to allow dropping columns just before an incremental snapshot
.with("database.autosave", "conservative");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ protected void readChunk() throws InterruptedException {
}
final TableId currentTableId = (TableId) context.currentDataCollectionId();
if (!context.maximumKey().isPresent()) {
currentTable = refreshTableSchema(currentTable);
context.maximumKey(jdbcConnection.queryAndMap(buildMaxPrimaryKeyQuery(currentTable), rs -> {
if (!rs.next()) {
return null;
Expand Down Expand Up @@ -580,6 +581,14 @@ protected void postIncrementalSnapshotCompleted() {
// no-op
}

protected Table refreshTableSchema(Table table) throws SQLException {
// default behavior is to simply return the existing table with no refresh
// this allows connectors that may require a schema refresh to trigger it, such as PostgreSQL
// since schema changes are not emitted as change events in the same way that they are for
// connectors like MySQL or Oracle
return table;
}

private KeyMapper getKeyMapper() {
return connectorConfig.getKeyMapper() == null ? table -> table.primaryKeyColumns() : connectorConfig.getKeyMapper();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public static MappedColumns toMap(Table table) {
public static ColumnArray toArray(ResultSet resultSet, Table table) throws SQLException {
ResultSetMetaData metaData = resultSet.getMetaData();

Column[] columns = new Column[Math.min(table.columns().size(), metaData.getColumnCount())];
Column[] columns = new Column[metaData.getColumnCount()];
int greatestColumnPosition = 0;
for (int i = 0; i < columns.length; i++) {
columns[i] = table.columnWithName(metaData.getColumnName(i + 1));
Expand Down

0 comments on commit 0a10d5b

Please sign in to comment.