Skip to content

Commit

Permalink
SDC-12957. jython processor does not Stop Pipeline on record error wh…
Browse files Browse the repository at this point in the history
…en record error occurs

- needed errorRecordHander.onError() called so STOP_PIPELINE param honoured.
- note this impacted other abstract pipelines.

Tested by: SDC-12958

Signed-off-by: Keith Burns <keith@streamsets.com>
Change-Id: Ie20b9d265c713abd9194d7c54445341d19178182
Signed-off-by: Keith Burns <keith@streamsets.com>
Reviewed-on: https://review.streamsets.net/c/datacollector/+/27889
Reviewed-by: Matt Bayley <bayley@streamsets.com>
Tested-by: StreamSets CI <streamsets-ci-spam@streamsets.com>
  • Loading branch information
Keith Burns committed Nov 26, 2019
1 parent e456a64 commit f847390
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import com.streamsets.pipeline.api.EventRecord;
import com.streamsets.pipeline.api.PushSource;
import com.streamsets.pipeline.api.Record;
import com.streamsets.pipeline.api.base.OnRecordErrorException;
import com.streamsets.pipeline.stage.common.DefaultErrorRecordHandler;
import com.streamsets.pipeline.stage.common.ErrorRecordHandler;
import com.streamsets.pipeline.stage.util.scripting.Errors;
import com.streamsets.pipeline.stage.util.scripting.ScriptObjectFactory;
import com.streamsets.pipeline.stage.util.scripting.ScriptRecord;
import com.streamsets.pipeline.stage.util.scripting.ScriptingStageBindings;
Expand Down Expand Up @@ -96,6 +98,7 @@ public void add(ScriptRecord[] scriptRecords) {
public void addError(ScriptRecord scriptRecord, String errorMsg) {
errorCount++;
Record record = scriptObjectFactory.getRecord(scriptRecord);
errorRecordHandler.onError(Errors.SCRIPTING_04, errorMsg);
batchContext.toError(record, errorMsg);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ public static <C extends AbstractScriptingDSource> void testAllBindings(
PushSourceRunner runner = new PushSourceRunner.Builder(clazz, scriptingDSource)
.addConstants(pipelineConstants)
.addConfiguration("script", getScript(scriptName, clazz))
.setOnRecordError(OnRecordError.TO_ERROR)
.addOutputLane("lane")
.build();
runner.runInit();
Expand Down

0 comments on commit f847390

Please sign in to comment.