Skip to content

Commit

Permalink
SDC-12909. Oracle CDC: Fix the logic for start and restart in redo lo…
Browse files Browse the repository at this point in the history
…g directory mode

This change always start logminor with currentime - duration.
So if pipeline needs to start with old start time, duration
needs to set to -1.

Change-Id: Ic0e83234077ef4e071df0f59e667469e05d091a9
Reviewed-on: https://review.streamsets.net/c/datacollector/+/27959
Tested-by: StreamSets CI <streamsets-ci-spam@streamsets.com>
Reviewed-by: Hari Shreedharan <hshreedharan@streamsets.com>
  • Loading branch information
ujunko committed Nov 26, 2019
1 parent f847390 commit 1f0b701
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,20 @@ public class OracleCDCConfigBean {
@ValueChooserModel(DictionaryChooserValues.class)
public DictionaryValues dictionary;

@ConfigDef(
required = true,
type = ConfigDef.Type.NUMBER,
label = "Duration of Directory Extraction",
description = "Interval of extracting Directory information to Redo Log Files",
dependsOn = "dictionary",
triggeredByValue = "DICT_FROM_REDO_LOGS",
defaultValue = "${24 * HOURS}",
displayPosition = 75,
elDefs = TimeEL.class,
group = "CDC"
)
public long durationDictExtract = -1;

@ConfigDef(
required = true,
type = ConfigDef.Type.BOOLEAN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import com.streamsets.pipeline.lib.jdbc.HikariPoolConfigBean;

@StageDef(
version = 10,
version = 11,
label = "Oracle CDC Client",
description = "Origin that an read change events from an Oracle Database",
icon = "rdbms.png",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ private enum DDL_EVENT {
UNKNOWN
}

private static final String GET_TIMESTAMPS_FROM_LOGMNR_CONTENTS = "SELECT TIMESTAMP FROM V$LOGMNR_CONTENTS ORDER BY TIMESTAMP";
private static final String GET_TIMESTAMPS_FROM_LOGMNR_CONTENTS = "SELECT TIMESTAMP FROM V$LOGMNR_CONTENTS WHERE ROWNUM = 1 ORDER BY TIMESTAMP";
private static final String OFFSET_DELIM = "::";
private static final int RESULTSET_CLOSED_AS_LOGMINER_SESSION_CLOSED = 1306;
private static final String NLS_DATE_FORMAT = "ALTER SESSION SET NLS_DATE_FORMAT = " + DateTimeColumnHandler.DT_SESSION_FORMAT;
Expand Down Expand Up @@ -1371,9 +1371,9 @@ public List<ConfigIssue> init() {
String commitScnField;
BigDecimal scn = null;
try {
scn = getEndingSCN();
switch (configBean.startValue) {
case SCN:
scn = getEndingSCN();
if (new BigDecimal(configBean.startSCN).compareTo(scn) > 0) {
issues.add(
getContext().createConfigIssue(CDC.name(), "oracleCDCConfigBean.startSCN", JDBC_47, scn.toPlainString()));
Expand Down Expand Up @@ -1654,19 +1654,37 @@ private void startLogMnrForRedoDict() throws SQLException, StageException {
LOG.debug("Cached SCN {} is no longer valid, retrieving new SCN", cachedSCNForRedoLogs);
}
}

// Cached SCN is no longer valid, try to get the next oldest ones and start.
getOldestSCN.setBigDecimal(1, cachedSCNForRedoLogs.get());
try (ResultSet rs = getOldestSCN.executeQuery()) {
while (rs.next()) {
BigDecimal oldestSCN = rs.getBigDecimal(1);
try {
startLogMinerUsingGivenSCNs(oldestSCN, endSCN);
cachedSCNForRedoLogs.set(oldestSCN);
startedLogMiner = true;
break;
} catch (SQLException ex) {
lastException = ex;
if (configBean.durationDictExtract > 0 ) {
// Introduced from version 11.
LocalDateTime currentTime = nowAtDBTz();
String end = currentTime.format(dateTimeColumnHandler.dateFormatter);
LocalDateTime startTime = currentTime.minusSeconds(configBean.durationDictExtract);
String start = startTime.format(dateTimeColumnHandler.dateFormatter);
try {
// Start LogMinor with current time - duration
LOG.info(TRYING_TO_START_LOG_MINER_WITH_START_DATE_AND_END_DATE, start, end);
startLogMnrForData.setString(1, start);
startLogMnrForData.setString(2, end);
startLogMnrForData.execute();
return;
} catch (SQLException e) {
LOG.debug("Unable to use start time {} and end time {} to start a LogMiner: {}", start, end, e);
}
} else {
// Default behavior is always start with oldest SCN.
// Cached SCN is no longer valid, try to get the next oldest ones and start.
getOldestSCN.setBigDecimal(1, cachedSCNForRedoLogs.get());
try (ResultSet rs = getOldestSCN.executeQuery()) {
while (rs.next()) {
BigDecimal oldestSCN = rs.getBigDecimal(1);
try {
startLogMinerUsingGivenSCNs(oldestSCN, endSCN);
cachedSCNForRedoLogs.set(oldestSCN);
startedLogMiner = true;
break;
} catch (SQLException ex) {
lastException = ex;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,14 @@ public List<Config> upgrade(
}
// fall through
case 9:
return upgradeV9ToV10(configs);
configs = upgradeV9ToV10(configs);
if (toVersion == 10){
return configs;
}
case 10:
return upgradeV10ToV11(configs);



default:
throw new IllegalStateException(Utils.format("Unexpected fromVersion {}", fromVersion));
Expand Down Expand Up @@ -187,4 +194,10 @@ private static List<Config> upgradeV9ToV10(List<Config> configs) {
configs.add(new Config("oracleCDCConfigBean.fetchSizeLatest", fetchSize.getValue()));
return configs;
}

private static List<Config> upgradeV10ToV11(List<Config> configs) {
// Applying existing fetch size to fetchSizeLatest in order to persist the behavior
configs.add(new Config("oracleCDCConfigBean.durationDictExtract", -1));
return configs;
}
}

0 comments on commit 1f0b701

Please sign in to comment.