Skip to content

Commit

Permalink
DBZ-4364 Awaiting table metadata to become queryable
Browse files Browse the repository at this point in the history
  • Loading branch information
gunnarmorling authored and jpechane committed Nov 29, 2021
1 parent a041a6b commit 2157256
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.postgresql.util.PSQLException;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode;
Expand Down Expand Up @@ -90,6 +91,8 @@
import io.debezium.relational.RelationalDatabaseConnectorConfig.DecimalHandlingMode;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.time.MicroTime;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.ZonedTime;
Expand Down Expand Up @@ -2566,9 +2569,12 @@ private void testReceiveChangesForReplicaIdentityFullTableWithToastedValue(Postg
"DROP TABLE IF EXISTS test_table;",
"CREATE TABLE test_table (id SERIAL, not_toast int, text TEXT);",
"ALTER TABLE test_table REPLICA IDENTITY FULL");

awaitTableMetaDataIsQueryable(new TableId(null, "public", "test_table"));
}

startConnector(config -> config.with(PostgresConnectorConfig.SCHEMA_REFRESH_MODE, mode), false);
assertConnectorIsRunning();
consumer = testConsumer(1);

final String toastedValue = RandomStringUtils.randomAlphanumeric(10000);
Expand All @@ -2578,6 +2584,9 @@ private void testReceiveChangesForReplicaIdentityFullTableWithToastedValue(Postg
"DROP TABLE IF EXISTS test_table;",
"CREATE TABLE test_table (id SERIAL, not_toast int, text TEXT);",
"ALTER TABLE test_table REPLICA IDENTITY FULL");

awaitTableMetaDataIsQueryable(new TableId(null, "public", "test_table"));

}

// INSERT
Expand Down Expand Up @@ -2685,6 +2694,26 @@ private void testReceiveChangesForReplicaIdentityFullTableWithToastedValue(Postg
}
}

/**
* It appears in some cases retrieving column metadata "too quickly" raises
* a PSQLException: ERROR: could not open relation with OID xyz.
* This causes intermittent failures during schema refresh.
* This is an attempt to avoid that situation by making sure the metadata can be retrieved
* before proceeding.
*/
private void awaitTableMetaDataIsQueryable(TableId tableId) {
Awaitility.await()
.atMost(TestHelper.waitTimeForRecords() * 10, TimeUnit.SECONDS)
.ignoreException(PSQLException.class)
.until(() -> {
try (PostgresConnection connection = TestHelper.createWithTypeRegistry()) {
Tables tables = new Tables();
connection.readSchema(tables, null, "public", TableFilter.fromPredicate(t -> t.equals(tableId)), null, false);
return tables.forTable(tableId) != null;
}
});
}

@Test()
@FixFor("DBZ-1815")
public void testHeartbeatActionQueryExecuted() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void signalLog() throws InterruptedException {

final SourceRecords records = consumeRecordsByTopic(2);
Assertions.assertThat(records.allRecordsInOrder()).hasSize(2);
Assertions.assertThat(logInterceptor.containsMessage("Signal message")).isTrue();
Assertions.assertThat(logInterceptor.containsMessage("Received signal")).isTrue();
}

@Test
Expand Down Expand Up @@ -102,7 +102,7 @@ public void signalingDisabled() throws InterruptedException {

final SourceRecords records = consumeRecordsByTopic(2);
Assertions.assertThat(records.allRecordsInOrder()).hasSize(2);
Assertions.assertThat(logInterceptor.containsMessage("Signal message")).isFalse();
Assertions.assertThat(logInterceptor.containsMessage("Received signal")).isFalse();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ log4j.rootLogger=INFO, stdout
log4j.logger.io.confluent.connect.avro=WARN
log4j.logger.io.confluent.kafka.serializers=WARN
log4j.logger.io.debezium=INFO
log4j.logger.io.debezium.jdbc.JdbcConnection=DEBUG
log4j.logger.io.debezium.pipeline=DEBUG
log4j.logger.io.debezium.connector.postgresql=DEBUG
log4j.logger.io.debezium.connector.postgresql.connection.PostgresReplicationConnection=DEBUG # Needed for PostgresConnectorIT.shouldClearDatabaseWarnings()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1198,6 +1198,8 @@ public void readSchema(Tables tables, String databaseCatalog, String schemaNameP
}
else {
for (TableId includeTable : tableIds) {
LOGGER.debug("Retrieving columns of table {}", includeTable);

Map<TableId, List<Column>> cols = getColumnsDetails(databaseCatalog, schemaNamePattern, includeTable.table(), tableFilter,
columnFilter, metadata, viewIds);
columnsByTable.putAll(cols);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,10 @@ public void registerSignalAction(String id, Action signal) {
}

public boolean process(Partition partition, String id, String type, String data, OffsetContext offset, Struct source) throws InterruptedException {
LOGGER.debug("Arrived signal id = '{}', type = '{}', data = '{}'", id, type, data);
LOGGER.debug("Received signal id = '{}', type = '{}', data = '{}'", id, type, data);
final Action action = signalActions.get(type);
if (action == null) {
LOGGER.warn("Signal '{}' has arrived but the type '{}' is not recognized", id, type);
LOGGER.warn("Signal '{}' has been received but the type '{}' is not recognized", id, type);
return false;
}
try {
Expand All @@ -140,7 +140,7 @@ public boolean process(Partition partition, String id, String type, String data,
return action.arrived(new Payload(partition, id, type, jsonData, offset, source));
}
catch (IOException e) {
LOGGER.warn("Signal '{}' has arrived but the data '{}' cannot be parsed", id, data, e);
LOGGER.warn("Signal '{}' has been received but the data '{}' cannot be parsed", id, data, e);
return false;
}
}
Expand All @@ -150,7 +150,7 @@ public boolean process(Partition partition, String id, String type, String data)
}

/**
*
*
* @param value Envelope with change from signaling table
* @param offset offset of the incoming signal
* @return true if the signal was processed
Expand Down

0 comments on commit 2157256

Please sign in to comment.