Skip to content

Commit

Permalink
DBZ-1539 Add interval.handling.mode to oracle connector
Browse files Browse the repository at this point in the history
  • Loading branch information
TomBillietKlarrio authored and gunnarmorling committed Nov 29, 2021
1 parent 2157256 commit 46b47a4
Show file tree
Hide file tree
Showing 7 changed files with 407 additions and 18 deletions.
1 change: 1 addition & 0 deletions COPYRIGHT.txt
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ Thomas Prelle
Thomas Thornton
Tin Nguyen
Tom Bentley
Tom Billiet
Tomaz Lemos Fernandes
Tommy Karlsson
Tony Rizko
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,16 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
.withValidation(OracleConnectorConfig::validateOutServerName)
.withDescription("Name of the XStream Out server to connect to.");

public static final Field INTERVAL_HANDLING_MODE = Field.create("interval.handling.mode")
.withDisplayName("Interval Handling")
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 21))
.withEnum(IntervalHandlingMode.class, IntervalHandlingMode.NUMERIC)
.withWidth(Width.MEDIUM)
.withImportance(Importance.LOW)
.withDescription("Specify how INTERVAL columns should be represented in change events, including:"
+ "'string' represents values as an exact ISO formatted string"
+ "'numeric' (default) represents values using the inexact conversion into microseconds");

public static final Field SNAPSHOT_MODE = Field.create("snapshot.mode")
.withDisplayName("Snapshot mode")
.withEnum(SnapshotMode.class, SnapshotMode.INITIAL)
Expand Down Expand Up @@ -413,6 +423,7 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
SNAPSHOT_ENHANCEMENT_TOKEN,
SNAPSHOT_LOCKING_MODE,
RAC_NODES,
INTERVAL_HANDLING_MODE,
LOG_MINING_ARCHIVE_LOG_HOURS,
LOG_MINING_BATCH_SIZE_DEFAULT,
LOG_MINING_BATCH_SIZE_MIN,
Expand Down Expand Up @@ -457,6 +468,7 @@ public static ConfigDef configDef() {
private final String databaseName;
private final String pdbName;
private final String xoutServerName;
private final IntervalHandlingMode intervalHandlingMode;
private final SnapshotMode snapshotMode;

private final String oracleVersion;
Expand Down Expand Up @@ -496,6 +508,7 @@ public OracleConnectorConfig(Configuration config) {
this.databaseName = toUpperCase(config.getString(DATABASE_NAME));
this.pdbName = toUpperCase(config.getString(PDB_NAME));
this.xoutServerName = config.getString(XSTREAM_SERVER_NAME);
this.intervalHandlingMode = IntervalHandlingMode.parse(config.getString(INTERVAL_HANDLING_MODE));
this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE));
this.oracleVersion = config.getString(ORACLE_VERSION);
this.snapshotEnhancementToken = config.getString(SNAPSHOT_ENHANCEMENT_TOKEN);
Expand Down Expand Up @@ -552,6 +565,10 @@ public String getXoutServerName() {
return xoutServerName;
}

public IntervalHandlingMode getIntervalHandlingMode() {
return intervalHandlingMode;
}

public SnapshotMode getSnapshotMode() {
return snapshotMode;
}
Expand All @@ -569,6 +586,67 @@ protected HistoryRecordComparator getHistoryRecordComparator() {
return getAdapter().getHistoryRecordComparator();
}

/**
* Defines modes of representation of {@code interval} datatype
*/
public enum IntervalHandlingMode implements EnumeratedValue {

/**
* Represents interval as inexact microseconds count
*/
NUMERIC("numeric"),

/**
* Represents interval as ISO 8601 time interval
*/
STRING("string");

private final String value;

IntervalHandlingMode(String value) {
this.value = value;
}

@Override
public String getValue() {
return value;
}

/**
* Convert mode name into the logical value
*
* @param value the configuration property value ; may not be null
* @return the matching option, or null if the match is not found
*/
public static IntervalHandlingMode parse(String value) {
if (value == null) {
return null;
}
value = value.trim();
for (IntervalHandlingMode option : IntervalHandlingMode.values()) {
if (option.getValue().equalsIgnoreCase(value)) {
return option;
}
}
return null;
}

/**
* Convert mode name into the logical value
*
* @param value the configuration property value ; may not be null
* @param defaultValue the default value ; may be null
* @return the matching option or null if the match is not found and non-null default is invalid
*/
public static IntervalHandlingMode parse(String value, String defaultValue) {
IntervalHandlingMode mode = parse(value);
if (mode == null && defaultValue != null) {
mode = parse(defaultValue);
}
return mode;
}
}

/**
* The set of predefined SnapshotMode options or aliases.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.debezium.relational.Column;
import io.debezium.relational.ValueConverter;
import io.debezium.time.Date;
import io.debezium.time.Interval;
import io.debezium.time.MicroDuration;
import io.debezium.time.ZonedTimestamp;
import io.debezium.util.NumberConversions;
Expand Down Expand Up @@ -106,16 +107,19 @@ public class OracleValueConverters extends JdbcValueConverters {
private static final Pattern TO_TIMESTAMP = Pattern.compile("TO_TIMESTAMP\\('(.*)'\\)", Pattern.CASE_INSENSITIVE);
private static final Pattern TO_TIMESTAMP_TZ = Pattern.compile("TO_TIMESTAMP_TZ\\('(.*)'\\)", Pattern.CASE_INSENSITIVE);
private static final Pattern TO_DATE = Pattern.compile("TO_DATE\\('(.*)',[ ]*'(.*)'\\)", Pattern.CASE_INSENSITIVE);
private static final BigDecimal MICROSECONDS_PER_SECOND = new BigDecimal(1_000_000);

private final OracleConnection connection;
private final boolean lobEnabled;
private final OracleConnectorConfig.IntervalHandlingMode intervalHandlingMode;
private final byte[] unavailableValuePlaceholderBinary;
private final String unavailableValuePlaceholderString;

public OracleValueConverters(OracleConnectorConfig config, OracleConnection connection) {
super(config.getDecimalMode(), config.getTemporalPrecisionMode(), ZoneOffset.UTC, null, null, null);
this.connection = connection;
this.lobEnabled = config.isLobEnabled();
this.intervalHandlingMode = config.getIntervalHandlingMode();
this.unavailableValuePlaceholderBinary = config.getUnavailableValuePlaceholder();
this.unavailableValuePlaceholderString = new String(config.getUnavailableValuePlaceholder());
}
Expand Down Expand Up @@ -144,7 +148,7 @@ public SchemaBuilder schemaBuilder(Column column) {
return ZonedTimestamp.builder();
case OracleTypes.INTERVALYM:
case OracleTypes.INTERVALDS:
return MicroDuration.builder();
return intervalHandlingMode == OracleConnectorConfig.IntervalHandlingMode.STRING ? Interval.builder() : MicroDuration.builder();
case Types.STRUCT:
return SchemaBuilder.string();
default: {
Expand Down Expand Up @@ -651,7 +655,13 @@ protected Object convertIntervalYearMonth(Column column, Field fieldDefn, Object
return convertValue(column, fieldDefn, data, NumberConversions.LONG_FALSE, (r) -> {
if (data instanceof Number) {
// we expect to get back from the plugin a double value
r.deliver(((Number) data).longValue());
final long micros = ((Number) data).longValue();
if (intervalHandlingMode == OracleConnectorConfig.IntervalHandlingMode.STRING) {
r.deliver(Interval.toIsoString(0, 0, 0, 0, 0, new BigDecimal(micros).divide(MICROSECONDS_PER_SECOND)));
}
else {
r.deliver(micros);
}
}
else if (data instanceof INTERVALYM) {
convertOracleIntervalYearMonth(data, r);
Expand All @@ -677,8 +687,13 @@ private void convertOracleIntervalYearMonth(Object data, ResultReceiver r) {
if (interval.charAt(i) == '-') {
final int year = sign * Integer.parseInt(interval.substring(start, i));
final int month = sign * Integer.parseInt(interval.substring(i + 1, interval.length()));
r.deliver(MicroDuration.durationMicros(year, month, 0, 0,
0, 0, MicroDuration.DAYS_PER_MONTH_AVG));
if (intervalHandlingMode == OracleConnectorConfig.IntervalHandlingMode.STRING) {
r.deliver(Interval.toIsoString(year, month, 0, 0, 0, BigDecimal.ZERO));
}
else {
r.deliver(MicroDuration.durationMicros(year, month, 0, 0,
0, 0, MicroDuration.DAYS_PER_MONTH_AVG));
}
}
}
}
Expand All @@ -687,7 +702,13 @@ protected Object convertIntervalDaySecond(Column column, Field fieldDefn, Object
return convertValue(column, fieldDefn, data, NumberConversions.LONG_FALSE, (r) -> {
if (data instanceof Number) {
// we expect to get back from the plugin a double value
r.deliver(((Number) data).longValue());
final long micros = ((Number) data).longValue();
if (intervalHandlingMode == OracleConnectorConfig.IntervalHandlingMode.STRING) {
r.deliver(Interval.toIsoString(0, 0, 0, 0, 0, new BigDecimal(micros).divide(MICROSECONDS_PER_SECOND)));
}
else {
r.deliver(micros);
}
}
else if (data instanceof INTERVALDS) {
convertOracleIntervalDaySecond(data, r);
Expand All @@ -706,15 +727,28 @@ private void convertOracleIntervalDaySecond(Object data, ResultReceiver r) {
final Matcher m = INTERVAL_DAY_SECOND_PATTERN.matcher(interval);
if (m.matches()) {
final int sign = "-".equals(m.group(1)) ? -1 : 1;
r.deliver(MicroDuration.durationMicros(
0,
0,
sign * Integer.valueOf(m.group(2)),
sign * Integer.valueOf(m.group(3)),
sign * Integer.valueOf(m.group(4)),
sign * Integer.valueOf(m.group(5)),
sign * Integer.valueOf(Strings.pad(m.group(6), 6, '0')),
MicroDuration.DAYS_PER_MONTH_AVG));
if (intervalHandlingMode == OracleConnectorConfig.IntervalHandlingMode.STRING) {
double seconds = (double) (sign * Integer.parseInt(m.group(5)))
+ (double) Integer.parseInt(Strings.pad(m.group(6), 6, '0')) / 1_000_000D;
r.deliver(Interval.toIsoString(
0,
0,
sign * Integer.valueOf(m.group(2)),
sign * Integer.valueOf(m.group(3)),
sign * Integer.valueOf(m.group(4)),
BigDecimal.valueOf(seconds)));
}
else {
r.deliver(MicroDuration.durationMicros(
0,
0,
sign * Integer.valueOf(m.group(2)),
sign * Integer.valueOf(m.group(3)),
sign * Integer.valueOf(m.group(4)),
sign * Integer.valueOf(m.group(5)),
sign * Integer.valueOf(Strings.pad(m.group(6), 6, '0')),
MicroDuration.DAYS_PER_MONTH_AVG));
}
}
}

Expand Down
Loading

0 comments on commit 46b47a4

Please sign in to comment.