Skip to content

Commit

Permalink
[Improve][Zeta]Optimize the logic of RestHttpGetCommandProcessor#getS…
Browse files Browse the repository at this point in the history
…eaTunnelServer() (apache#6666)

* remove plugin lifecycle

* remove transform fallback

* code style

* &&

* optimize code

* remove UnsupportedOperationException

---------

Co-authored-by: ClownXC <chen3494269@163.com>
  • Loading branch information
xiaochen-zhou and xiaochen-zhou authored Apr 9, 2024
1 parent 65aedf6 commit 66d8502
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public CoordinatorService getCoordinatorService() {
.getProperty(INVOCATION_MAX_RETRY_COUNT.getName());
int maxRetry =
hazelcastInvocationMaxRetry == null
? 250 * 2
? Integer.parseInt(INVOCATION_MAX_RETRY_COUNT.getDefaultValue()) * 2
: Integer.parseInt(hazelcastInvocationMaxRetry) * 2;

String hazelcastRetryPause =
Expand All @@ -199,12 +199,14 @@ public CoordinatorService getCoordinatorService() {
.getProperty(INVOCATION_RETRY_PAUSE.getName());

int retryPause =
hazelcastRetryPause == null ? 500 : Integer.parseInt(hazelcastRetryPause);
hazelcastRetryPause == null
? Integer.parseInt(INVOCATION_RETRY_PAUSE.getDefaultValue())
: Integer.parseInt(hazelcastRetryPause);

while (isMasterNode()
&& !coordinatorService.isCoordinatorActive()
while (isRunning
&& retryCount < maxRetry
&& isRunning) {
&& !coordinatorService.isCoordinatorActive()
&& isMasterNode()) {
try {
LOGGER.warning(
"This is master node, waiting the coordinator service init finished");
Expand Down Expand Up @@ -254,13 +256,15 @@ public boolean taskIsEnded(@NonNull TaskGroupLocation taskGroupLocation) {
public boolean isMasterNode() {
// must retry until the cluster have master node
try {
return RetryUtils.retryWithException(
() -> nodeEngine.getThisAddress().equals(nodeEngine.getMasterAddress()),
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
true,
exception -> exception instanceof NullPointerException && isRunning,
Constant.OPERATION_RETRY_SLEEP));
return Boolean.TRUE.equals(
RetryUtils.retryWithException(
() -> nodeEngine.getThisAddress().equals(nodeEngine.getMasterAddress()),
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
true,
exception ->
isRunning && exception instanceof NullPointerException,
Constant.OPERATION_RETRY_SLEEP)));
} catch (InterruptedException e) {
LOGGER.info("master node check interrupted");
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ public static final class JobState implements Serializable {
private Long jobId;
private String jobName;
private JobStatus jobStatus;
private long submitTime;
private Long submitTime;
private Long finishTime;
private Map<PipelineLocation, PipelineStateData> pipelineStateMapperMap;
private String errorMessage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.seatunnel.api.common.metrics.JobMetrics;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
Expand All @@ -44,7 +45,6 @@
import com.hazelcast.internal.ascii.TextCommandService;
import com.hazelcast.internal.ascii.rest.HttpCommandProcessor;
import com.hazelcast.internal.ascii.rest.HttpGetCommand;
import com.hazelcast.internal.json.Json;
import com.hazelcast.internal.json.JsonArray;
import com.hazelcast.internal.json.JsonObject;
import com.hazelcast.internal.json.JsonValue;
Expand All @@ -54,10 +54,8 @@
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -210,7 +208,7 @@ private void handleFinishedJobsInfo(HttpGetCommand command, String uri) {
.getNodeEngine()
.getHazelcastInstance()
.getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO);

SeaTunnelServer seaTunnelServer = getSeaTunnelServer(true);
JsonArray jobs =
finishedJob.values().stream()
.filter(
Expand All @@ -226,7 +224,6 @@ private void handleFinishedJobsInfo(HttpGetCommand command, String uri) {
.map(
jobState -> {
Long jobId = jobState.getJobId();
SeaTunnelServer seaTunnelServer = getSeaTunnelServer(true);
String jobMetrics;
if (seaTunnelServer == null) {
jobMetrics =
Expand All @@ -243,15 +240,8 @@ private void handleFinishedJobsInfo(HttpGetCommand command, String uri) {
.getJobMetrics(jobId)
.toJsonString();
}

JobDAGInfo jobDAGInfo = finishedJobDAGInfo.get(jobId);

return convertToJson(
jobState,
jobMetrics,
Json.parse(JsonUtils.toJsonString(jobDAGInfo))
.asObject(),
jobId);
return getJobInfoJson(
jobState, jobMetrics, finishedJobDAGInfo.get(jobId));
})
.collect(JsonArray::new, JsonArray::add, JsonArray::add);

Expand Down Expand Up @@ -301,7 +291,10 @@ private void handleJobInfoById(HttpGetCommand command, String uri) {
.get(Long.valueOf(jobId));
this.prepareResponse(
command,
convertToJson(finishedJobState, finishedJobMetrics, finishedJobDAGInfo));
getJobInfoJson(
finishedJobState,
finishedJobMetrics.toJsonString(),
finishedJobDAGInfo));
} else {
this.prepareResponse(command, new JsonObject().add(RestConstant.JOB_ID, jobId));
}
Expand Down Expand Up @@ -353,7 +346,7 @@ private SeaTunnelServer getSeaTunnelServer(boolean shouldBeMaster) {
this.textCommandService.getNode().getNodeExtension().createExtensionServices();
SeaTunnelServer seaTunnelServer =
(SeaTunnelServer) extensionServices.get(Constant.SEATUNNEL_SERVICE_NAME);
if (!seaTunnelServer.isMasterNode() && shouldBeMaster) {
if (shouldBeMaster && !seaTunnelServer.isMasterNode()) {
return null;
}
return seaTunnelServer;
Expand All @@ -374,7 +367,11 @@ private JsonObject convertToJson(JobInfo jobInfo, long jobId) {
.getSerializationService()
.toObject(jobInfo.getJobImmutableInformation()));

ClassLoaderService classLoaderService = getSeaTunnelServer(false).getClassLoaderService();
SeaTunnelServer seaTunnelServer = getSeaTunnelServer(true);
ClassLoaderService classLoaderService =
seaTunnelServer == null
? getSeaTunnelServer(false).getClassLoaderService()
: seaTunnelServer.getClassLoaderService();
ClassLoader classLoader =
classLoaderService.getClassLoader(
jobId, jobImmutableInformation.getPluginJarsUrls());
Expand All @@ -385,7 +382,6 @@ private JsonObject convertToJson(JobInfo jobInfo, long jobId) {
jobImmutableInformation.getLogicalDag());
classLoaderService.releaseClassLoader(jobId, jobImmutableInformation.getPluginJarsUrls());

SeaTunnelServer seaTunnelServer = getSeaTunnelServer(true);
String jobMetrics;
JobStatus jobStatus;
if (seaTunnelServer == null) {
Expand Down Expand Up @@ -416,8 +412,9 @@ private JsonObject convertToJson(JobInfo jobInfo, long jobId) {
JsonUtil.toJsonObject(logicalDag.getJobConfig().getEnvOptions()))
.add(
RestConstant.CREATE_TIME,
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
.format(new Date(jobImmutableInformation.getCreateTime())))
DateTimeUtils.toString(
jobImmutableInformation.getCreateTime(),
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS))
.add(RestConstant.JOB_DAG, logicalDag.getLogicalDagAsJson())
.add(
RestConstant.PLUGIN_JARS_URLS,
Expand All @@ -439,53 +436,24 @@ private JsonObject convertToJson(JobInfo jobInfo, long jobId) {
return jobInfoJson;
}

private JsonObject convertToJson(
JobState finishedJobState,
JobMetrics finishedJobMetrics,
JobDAGInfo finishedJobDAGInfo) {
JsonObject jobInfoJson = new JsonObject();
jobInfoJson
.add(RestConstant.JOB_ID, String.valueOf(finishedJobState.getJobId()))
.add(RestConstant.JOB_NAME, finishedJobState.getJobName())
.add(RestConstant.JOB_STATUS, finishedJobState.getJobStatus().toString())
.add(RestConstant.ERROR_MSG, finishedJobState.getErrorMessage())
.add(
RestConstant.CREATE_TIME,
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
.format(new Date(finishedJobState.getSubmitTime())))
.add(
RestConstant.FINISH_TIME,
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
.format(new Date(finishedJobState.getFinishTime())))
.add(
RestConstant.JOB_DAG,
Json.parse(JsonUtils.toJsonString(finishedJobDAGInfo)).asObject())
.add(RestConstant.PLUGIN_JARS_URLS, new JsonArray())
.add(
RestConstant.METRICS,
JsonUtil.toJsonObject(getJobMetrics(finishedJobMetrics.toJsonString())));
return jobInfoJson;
}

private JsonObject convertToJson(
JobState jobState, String jobMetrics, JsonObject jobDAGInfo, long jobId) {
JsonObject jobInfoJson = new JsonObject();
jobInfoJson
.add(RestConstant.JOB_ID, String.valueOf(jobId))
private JsonObject getJobInfoJson(JobState jobState, String jobMetrics, JobDAGInfo jobDAGInfo) {
return new JsonObject()
.add(RestConstant.JOB_ID, String.valueOf(jobState.getJobId()))
.add(RestConstant.JOB_NAME, jobState.getJobName())
.add(RestConstant.JOB_STATUS, jobState.getJobStatus().toString())
.add(RestConstant.ERROR_MSG, jobState.getErrorMessage())
.add(
RestConstant.CREATE_TIME,
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
.format(new Date(jobState.getSubmitTime())))
DateTimeUtils.toString(
jobState.getSubmitTime(),
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS))
.add(
RestConstant.FINISH_TIME,
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
.format(new Date(jobState.getFinishTime())))
.add(RestConstant.JOB_DAG, jobDAGInfo)
DateTimeUtils.toString(
jobState.getFinishTime(),
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS))
.add(RestConstant.JOB_DAG, JsonUtils.toJsonString(jobDAGInfo))
.add(RestConstant.PLUGIN_JARS_URLS, new JsonArray())
.add(RestConstant.METRICS, JsonUtil.toJsonObject(getJobMetrics(jobMetrics)));

return jobInfoJson;
}
}

0 comments on commit 66d8502

Please sign in to comment.