Skip to content

Commit

Permalink
SDC-8738. Kafka destination pipeline does not retry upon Kafka stage …
Browse files Browse the repository at this point in the history
…exception when 'On Record Error: Stop Pipeline' is set at Kafka destination

StageException should not cause the pipeline to stop retrying just because the "On Record Error" action is set to "Stop Pipeline".
Remove the logic for checking calculateShouldStopOnStageError() altogether from the pipeline retry logic check

For e.g the Kakfa Producer destination could produce a StageException because of network Connectivity. This does not translate to an record error.
In this case, it is desired/expected that the pipeline should try to retry on failure (retry needs to be set in the configuration), as opposed to stopping pipeline

In addition, make sure to handle the following scenario:
When the "On Error Record : Stop Pipeline' is set, and if there is a valid error record situation, then stop the pipeline and do not retry EVEN IF retry pipeline is set to -1.

In Summary, expected behavior is as follows:
	- 	If pipeline is stopped by a StageException, and if retry is set, then retry starting the pipleine.
	- 	If the pipeline is stopped by an onErrorRecordException, and if error record action is to stop pipeline, then ignore whether or not retry is set.
		Simply stop pipeline and do not restart it

Change-Id: Iab8fea82805d96a37e3428b8580334c220241c76
Reviewed-on: https://review.streamsets.net/c/datacollector/+/27480
Reviewed-by: Jarcec Cecho <jarcec@streamsets.com>
  • Loading branch information
Karthik Iyer committed Nov 15, 2019
1 parent 1dbdf13 commit 1e4b9e1
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.streamsets.pipeline.api.ExecutionMode;
import com.streamsets.pipeline.api.Record;
import com.streamsets.pipeline.api.StageException;
import com.streamsets.pipeline.api.base.OnRecordErrorException;
import com.streamsets.pipeline.api.impl.ClusterSource;
import com.streamsets.pipeline.api.impl.ErrorMessage;
import org.slf4j.Logger;
Expand Down Expand Up @@ -84,6 +85,7 @@ public void run() throws StageException, PipelineRuntimeException {
boolean finishing = false;
boolean errorWhileInitializing = false;
boolean errorWhileRunning = false;
Throwable runningException = null;
boolean errorWhileDestroying = false;
boolean isRecoverable = true;
executionFailed = false;
Expand Down Expand Up @@ -129,6 +131,7 @@ public void run() throws StageException, PipelineRuntimeException {

stateChanged(PipelineStatus.RUNNING_ERROR, runningErrorMsg, extraAttributes);
errorWhileRunning = true;
runningException = e;
isRecoverable = isRecoverableThrowable(e);
}
throw e;
Expand All @@ -150,7 +153,6 @@ public void run() throws StageException, PipelineRuntimeException {
}
} finally {
LOG.debug("Destroying");

try {
// Determine the reason why we got all the way here
PipelineStopReason stopReason;
Expand All @@ -175,11 +177,19 @@ public void run() throws StageException, PipelineRuntimeException {
throw e;
} finally {
if(errorWhileInitializing || errorWhileRunning || errorWhileDestroying) {
boolean retry = true;
// Check if the exception is an onRecordErrorException
// The DefaultErrorRecordHandler throws this exception back when the value is set to STOP PIPELINE.
// In such case, do not retry the pipeline. Let it transition to a error state and stop.
if (errorWhileRunning && runningException instanceof OnRecordErrorException) {
retry = false;
}

// In case of any error, persist that information
executionFailed = true;

// If there was any problem, we will consider retry
if (shouldRetry && !pipeline.shouldStopOnStageError() && !isExecutingInSlave && isRecoverable && !wasStopped()) {
if (shouldRetry && retry && !isExecutingInSlave && isRecoverable && !wasStopped()) {
stateChanged(PipelineStatus.RETRY, runningErrorMsg, null);
} else if(errorWhileInitializing) {
stateChanged(PipelineStatus.START_ERROR, runningErrorMsg, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,7 @@ public Iterator<Record> getRecords() {
passed = predicate.evaluate(record);
if (!passed) {
rejectedMessage = predicate.getRejectedMessage();
try {
errorHandler.onError(new OnRecordErrorException(record, Errors.COMMON_0001, rejectedMessage.toString()));
} catch (StageException e) {
throw new RuntimeException(e.getMessage(), e);
}
errorHandler.onError(new OnRecordErrorException(record, Errors.COMMON_0001, rejectedMessage.toString()));
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ public class Pipeline {
private final StatsAggregationHandler statsAggregationHandler;
private final ResourceControlledScheduledExecutor scheduledExecutorService;
private volatile boolean running;
private boolean shouldStopOnStageError = false;
private final ResourceControlledScheduledExecutor scheduledExecutor;
private final List<Stage.Info> stageInfos;
private final UserContext userContext;
Expand Down Expand Up @@ -151,7 +150,6 @@ private Pipeline(
this.scheduledExecutorService = scheduledExecutorService;
this.running = false;
this.statsAggregationHandler = statsAggregationHandler;
this.shouldStopOnStageError = calculateShouldStopOnStageError();
this.scheduledExecutor = scheduledExecutor;
this.stageInfos = stageInfos;
this.runnerSharedMaps = runnerSharedMaps;
Expand Down Expand Up @@ -193,21 +191,6 @@ public int getNumOfRunners() {
return pipes.size();
}

private boolean calculateShouldStopOnStageError() {
// Check origin
StageContext stageContext = originPipe.getStage().getContext();
if(stageContext.getOnErrorRecord() == OnRecordError.STOP_PIPELINE) {
return true;
}

// Working with only first runner is sufficient here as all runners share the same configuration
return pipes.get(0).onRecordErrorStopPipeline();
}

public boolean shouldStopOnStageError() {
return shouldStopOnStageError;
}

public ProtoSource getSource() {
return (ProtoSource) originPipe.getStage().getStage();
}
Expand Down

0 comments on commit 1e4b9e1

Please sign in to comment.