Skip to content

Commit

Permalink
DBZ-4272 Incrementally snapshot after schema change without NPE
Browse files Browse the repository at this point in the history
  • Loading branch information
Naros authored and gunnarmorling committed Nov 30, 2021
1 parent 687b797 commit 3a8d1ff
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,9 @@ protected String valueFieldName() {
protected String pkFieldName() {
return "PK";
}

@Override
protected String alterTableStatement(String tableName) {
return "ALTER TABLE " + tableName + " ADD col3 INTEGER DEFAULT 0";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public static MappedColumns toMap(Table table) {
public static ColumnArray toArray(ResultSet resultSet, Table table) throws SQLException {
ResultSetMetaData metaData = resultSet.getMetaData();

Column[] columns = new Column[metaData.getColumnCount()];
Column[] columns = new Column[Math.min(table.columns().size(), metaData.getColumnCount())];
int greatestColumnPosition = 0;
for (int i = 0; i < columns.length; i++) {
columns[i] = table.columnWithName(metaData.getColumnName(i + 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.engine.DebeziumEngine;
import io.debezium.jdbc.JdbcConnection;
Expand All @@ -53,6 +54,10 @@ public abstract class AbstractIncrementalSnapshotTest<T extends SourceConnector>

protected abstract Configuration.Builder config();

protected String alterTableStatement(String tableName) {
return "ALTER TABLE " + tableName + " add col3 int default 0";
}

protected String tableDataCollectionId() {
return tableName();
}
Expand Down Expand Up @@ -404,6 +409,31 @@ public void snapshotOnlyWithRestart() throws Exception {
}
}

@Test
@FixFor("DBZ-4272")
public void snapshotProceededBySchemaChange() throws Exception {
Testing.Print.enable();

populateTable();
startConnector();
waitForConnectorToStart();

// Initiate a schema change to the table immediately before the adhoc-snapshot
try (JdbcConnection connection = databaseConnection()) {
connection.execute(alterTableStatement(tableName()));
}

// Some connectors, such as PostgreSQL won't be notified of the previous schema change
// until a DML event occurs, but regardless the incremental snapshot should succeed.
sendAdHocSnapshotSignal();

final int expectedRecordCount = ROW_COUNT;
final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
for (int i = 0; i < expectedRecordCount; i++) {
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
}
}

@Override
protected int getMaximumEnqueuedRecordCount() {
return ROW_COUNT * 3;
Expand Down

0 comments on commit 3a8d1ff

Please sign in to comment.