Skip to content

Commit

Permalink
[Improve] Improve read table schema in cdc connector (apache#6702)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X authored Apr 15, 2024
1 parent 6f74663 commit a8c6cc6
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,12 @@ public TableChange getTableSchema(JdbcConnection jdbc, TableId tableId) {
TableChange schema = schemasByTableId.get(tableId);
if (schema == null) {
schema = readTableSchema(jdbc, tableId);
schemasByTableId.put(tableId, schema);
}
return schema;
}

private TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) {
OracleConnection oracleConnection = (OracleConnection) jdbc;
final Map<TableId, TableChange> tableChangeMap = new HashMap<>();
Tables tables = new Tables();

try {
Expand All @@ -75,22 +73,27 @@ private TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) {
connectorConfig.getTableFilters().dataCollectionFilter(),
null,
false);

Table table =
CatalogTableUtils.mergeCatalogTableConfig(
tables.forTable(tableId), tableMap.get(tableId));
TableChange tableChange = new TableChange(TableChanges.TableChangeType.CREATE, table);
tableChangeMap.put(tableId, tableChange);
for (TableId id : tables.tableIds()) {
if (tableMap.containsKey(id)) {
Table table =
CatalogTableUtils.mergeCatalogTableConfig(
tables.forTable(id), tableMap.get(id));
TableChanges.TableChange tableChange =
new TableChanges.TableChange(
TableChanges.TableChangeType.CREATE, table);
schemasByTableId.put(id, tableChange);
}
}
} catch (SQLException e) {
throw new SeaTunnelException(
String.format("Failed to read schema for table %s ", tableId), e);
}

if (!tableChangeMap.containsKey(tableId)) {
if (!schemasByTableId.containsKey(tableId)) {
throw new SeaTunnelException(
String.format("Can't obtain schema for table %s ", tableId));
}

return tableChangeMap.get(tableId);
return schemasByTableId.get(tableId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import io.debezium.relational.history.TableChanges;

import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

Expand All @@ -52,44 +51,46 @@ public TableChanges.TableChange getTableSchema(JdbcConnection jdbc, TableId tabl
TableChanges.TableChange schema = schemasByTableId.get(tableId);
if (schema == null) {
schema = readTableSchema(jdbc, tableId);
schemasByTableId.put(tableId, schema);
}
return schema;
}

private TableChanges.TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) {

CatalogTable catalogTable = tableMap.get(tableId);
// Because the catalog is null in the postgresConnection.readSchema method
tableId = new TableId(null, tableId.schema(), tableId.table());
TableId tableIdWithoutCatalog = new TableId(null, tableId.schema(), tableId.table());

PostgresConnection postgresConnection = (PostgresConnection) jdbc;
final Map<TableId, TableChanges.TableChange> tableChangeMap = new HashMap<>();
Tables tables = new Tables();
try {
postgresConnection.readSchema(
tables,
tableId.catalog(),
tableId.schema(),
tableIdWithoutCatalog.catalog(),
tableIdWithoutCatalog.schema(),
connectorConfig.getTableFilters().dataCollectionFilter(),
null,
false);
Table table =
CatalogTableUtils.mergeCatalogTableConfig(
tables.forTable(tableId), catalogTable);
TableChanges.TableChange tableChange =
new TableChanges.TableChange(TableChanges.TableChangeType.CREATE, table);
tableChangeMap.put(tableId, tableChange);
for (TableId id : tables.tableIds()) {
TableId idWithCatalog = new TableId(tableId.catalog(), id.schema(), id.table());
if (tableMap.containsKey(idWithCatalog)) {
Table table =
CatalogTableUtils.mergeCatalogTableConfig(
tables.forTable(id), tableMap.get(idWithCatalog));
TableChanges.TableChange tableChange =
new TableChanges.TableChange(
TableChanges.TableChangeType.CREATE, table);
schemasByTableId.put(idWithCatalog, tableChange);
}
}
} catch (SQLException e) {
throw new SeaTunnelException(
String.format("Failed to read schema for table %s ", tableId), e);
}

if (!tableChangeMap.containsKey(tableId)) {
if (!schemasByTableId.containsKey(tableId)) {
throw new SeaTunnelException(
String.format("Can't obtain schema for table %s ", tableId));
}

return tableChangeMap.get(tableId);
return schemasByTableId.get(tableId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.debezium.relational.history.TableChanges.TableChange;

import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

Expand All @@ -54,15 +53,12 @@ public TableChange getTableSchema(JdbcConnection jdbc, TableId tableId) {
TableChange schema = schemasByTableId.get(tableId);
if (schema == null) {
schema = readTableSchema(jdbc, tableId);
schemasByTableId.put(tableId, schema);
}
return schema;
}

private TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) {
SqlServerConnection sqlServerConnection = (SqlServerConnection) jdbc;

final Map<TableId, TableChange> tableChangeMap = new HashMap<>();
Tables tables = new Tables();
try {
sqlServerConnection.readSchema(
Expand All @@ -72,21 +68,27 @@ private TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) {
connectorConfig.getTableFilters().dataCollectionFilter(),
null,
false);
Table table =
CatalogTableUtils.mergeCatalogTableConfig(
tables.forTable(tableId), tableMap.get(tableId));
TableChange tableChange = new TableChange(TableChanges.TableChangeType.CREATE, table);
tableChangeMap.put(tableId, tableChange);
for (TableId id : tables.tableIds()) {
if (tableMap.containsKey(id)) {
Table table =
CatalogTableUtils.mergeCatalogTableConfig(
tables.forTable(id), tableMap.get(id));
TableChanges.TableChange tableChange =
new TableChanges.TableChange(
TableChanges.TableChangeType.CREATE, table);
schemasByTableId.put(id, tableChange);
}
}
} catch (SQLException e) {
throw new SeaTunnelException(
String.format("Failed to read schema for table %s ", tableId), e);
}

if (!tableChangeMap.containsKey(tableId)) {
if (!schemasByTableId.containsKey(tableId)) {
throw new SeaTunnelException(
String.format("Can't obtain schema for table %s ", tableId));
}

return tableChangeMap.get(tableId);
return schemasByTableId.get(tableId);
}
}

0 comments on commit a8c6cc6

Please sign in to comment.