Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Jun 17, 2016
1 parent 2e23c5f commit 0c3ac6c
Show file tree
Hide file tree
Showing 17 changed files with 232 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@
*/
public interface JobOperateAPI {

/**
* 作业立刻执行.
*
* <p>作业在不与上次运行中作业冲突的情况下才会启动, 并在启动后自动清理此标记.</p>
*
* @param jobName 作业名称
* @param serverIp 作业服务器IP地址
*/
void trigger(Optional<String> jobName, Optional<String> serverIp);

/**
* 作业暂停.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,18 @@ public JobOperateAPIImpl(final CoordinatorRegistryCenter registryCenter) {
jobOperatorTemplate = new JobOperateTemplate(registryCenter);
}

@Override
public void trigger(final Optional<String> jobName, final Optional<String> serverIp) {
jobOperatorTemplate.operate(jobName, serverIp, new JobOperateCallback() {

@Override
public boolean doOperate(final String jobName, final String serverIp) {
registryCenter.persist(new JobNodePath(jobName).getServerNodePath(serverIp, JobNodePath.TRIGGER_NODE), "");
return true;
}
});
}

@Override
public void pause(final Optional<String> jobName, final Optional<String> serverIp) {
jobOperatorTemplate.operate(jobName, serverIp, new JobOperateCallback() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,30 @@ public void setUp() {
jobOperateAPI = new JobOperateAPIImpl(registryCenter);
}

@Test
public void assertTriggerWithJobNameAndServerIp() {
jobOperateAPI.trigger(Optional.of("testJob"), Optional.of("localhost"));
verify(registryCenter).persist("/testJob/servers/localhost/trigger", "");
}

@Test
public void assertTriggerWithJobName() {
when(registryCenter.getChildrenKeys("/testJob/servers")).thenReturn(Arrays.asList("ip1", "ip2"));
jobOperateAPI.trigger(Optional.of("testJob"), Optional.<String>absent());
verify(registryCenter).getChildrenKeys("/testJob/servers");
verify(registryCenter).persist("/testJob/servers/ip1/trigger", "");
verify(registryCenter).persist("/testJob/servers/ip2/trigger", "");
}

@Test
public void assertTriggerWithServerIp() {
when(registryCenter.getChildrenKeys("/")).thenReturn(Arrays.asList("testJob1", "testJob2"));
jobOperateAPI.trigger(Optional.<String>absent(), Optional.of("localhost"));
verify(registryCenter).getChildrenKeys("/");
verify(registryCenter).persist("/testJob1/servers/localhost/trigger", "");
verify(registryCenter).persist("/testJob2/servers/localhost/trigger", "");
}

@Test
public void assertPauseWithJobNameAndServerIp() {
jobOperateAPI.pause(Optional.of("testJob"), Optional.of("localhost"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ public class JobOperationController {
@Resource
private JobAPIService jobAPIService;

@RequestMapping(value = "trigger", method = RequestMethod.POST)
public void triggerJob(final ServerInfo jobServer) {
jobAPIService.getJobOperatorAPI().trigger(Optional.of(jobServer.getJobName()), Optional.of(jobServer.getIp()));
}

@RequestMapping(value = "pause", method = RequestMethod.POST)
public void pauseJob(final ServerInfo jobServer) {
jobAPIService.getJobOperatorAPI().pause(Optional.of(jobServer.getJobName()), Optional.of(jobServer.getIp()));
Expand All @@ -43,6 +48,11 @@ public void resumeJob(final ServerInfo jobServer) {
jobAPIService.getJobOperatorAPI().resume(Optional.of(jobServer.getJobName()), Optional.of(jobServer.getIp()));
}

@RequestMapping(value = "triggerAll/name", method = RequestMethod.POST)
public void triggerAllJobsByJobName(final ServerInfo jobServer) {
jobAPIService.getJobOperatorAPI().trigger(Optional.of(jobServer.getJobName()), Optional.<String>absent());
}

@RequestMapping(value = "pauseAll/name", method = RequestMethod.POST)
public void pauseAllJobsByJobName(final ServerInfo jobServer) {
jobAPIService.getJobOperatorAPI().pause(Optional.of(jobServer.getJobName()), Optional.<String>absent());
Expand All @@ -53,6 +63,11 @@ public void resumeAllJobsByJobName(final ServerInfo jobServer) {
jobAPIService.getJobOperatorAPI().resume(Optional.of(jobServer.getJobName()), Optional.<String>absent());
}

@RequestMapping(value = "triggerAll/ip", method = RequestMethod.POST)
public void triggerAllJobs(final ServerInfo jobServer) {
jobAPIService.getJobOperatorAPI().trigger(Optional.<String>absent(), Optional.of(jobServer.getIp()));
}

@RequestMapping(value = "pauseAll/ip", method = RequestMethod.POST)
public void pauseAllJobs(final ServerInfo jobServer) {
jobAPIService.getJobOperatorAPI().pause(Optional.<String>absent(), Optional.of(jobServer.getIp()));
Expand Down
28 changes: 26 additions & 2 deletions elastic-job-console/src/main/webapp/js/job_detail.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ $(function() {
$('[href="#execution_info"]').click(function(event) {
renderExecution();
});
bindTriggerButtons();
bindTriggerAllButtons();
bindPauseButtons();
bindPauseAllButtons();
bindResumeButtons();
Expand Down Expand Up @@ -86,16 +88,18 @@ function renderServers() {
var status = data[i].status;
var baseTd = "<td>" + data[i].ip + "</td><td>" + data[i].hostName + "</td><td>" + status + "</td><td>" + data[i].processSuccessCount + "</td><td>" + data[i].processFailureCount + "</td><td>" + data[i].sharding + "</td>";
var operationTd = "";
var triggerButton = "<button operation='trigger' class='btn btn-success' ip='" + data[i].ip + "'>触发</button>";
var resumeButton = "<button operation='resume' class='btn btn-success' ip='" + data[i].ip + "'>恢复</button>";
var pauseButton = "<button operation='pause' class='btn btn-warning' ip='" + data[i].ip + "'" + ">暂停</button>";
var shutdownButton = "<button operation='shutdown' class='btn btn-danger' ip='" + data[i].ip + "'>关闭</button>";
var removeButton = "<button operation='remove' class='btn btn-danger' ip='" + data[i].ip + "'>删除</button>";
var disableButton = "<button operation='disable' class='btn btn-danger' ip='" + data[i].ip + "'>失效</button>";
var enableButton = "<button operation='enable' class='btn btn-success' ip='" + data[i].ip + "'>生效</button>";
operationTd = triggerButton + "&nbsp;";
if ("PAUSED" === status) {
operationTd = resumeButton + "&nbsp;";
operationTd = operationTd + resumeButton + "&nbsp;";
} else if ("DISABLED" !== status && "CRASHED" !== status && "SHUTDOWN" !== status) {
operationTd = pauseButton + "&nbsp;";
operationTd = operationTd + pauseButton + "&nbsp;";
}
if ("SHUTDOWN" !== status) {
operationTd = operationTd + shutdownButton + "&nbsp;";
Expand Down Expand Up @@ -124,6 +128,26 @@ function renderServers() {
});
}

function bindTriggerButtons() {
$(document).on("click", "button[operation='trigger'][data-toggle!='modal']", function(event) {
var jobName = $("#job-name").text();
$.post("job/trigger", {jobName : jobName, ip : $(event.currentTarget).attr("ip")}, function (data) {
renderServers();
showSuccessDialog();
});
});
}

function bindTriggerAllButtons() {
$(document).on("click", "#trigger-all-jobs-btn", function(event) {
var jobName = $("#job-name").text();
$.post("job/triggerAll/name", {jobName : jobName}, function (data) {
renderServers();
showSuccessDialog();
});
});
}

function bindPauseButtons() {
$(document).on("click", "button[operation='pause'][data-toggle!='modal']", function(event) {
var jobName = $("#job-name").text();
Expand Down
29 changes: 27 additions & 2 deletions elastic-job-console/src/main/webapp/js/server_detail.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
$(function() {
renderJobs();
bindTriggerButtons();
bindPauseButtons();
bindResumeButtons();
bindTriggerAllButton();
bindPauseAllButton();
bindResumeAllButton();
bindShutdownButtons();
Expand All @@ -16,14 +18,16 @@ function renderJobs() {
var status = data[i].status;
var baseTd = "<td>" + data[i].jobName + "</td><td>" + status + "</td><td>" + data[i].processSuccessCount + "</td><td>" + data[i].processFailureCount + "</td><td>" + data[i].sharding + "</td>";
var operationTd = "";
var triggerButton = "<button operation='trigger' class='btn btn-success' job-name='" + data[i].jobName + "'>触发</button>";
var resumeButton = "<button operation='resume' class='btn btn-success' job-name='" + data[i].jobName + "'>恢复</button>";
var pauseButton = "<button operation='pause' class='btn btn-warning' job-name='" + data[i].jobName + "'" + ">暂停</button>";
var shutdownButton = "<button operation='shutdown' class='btn btn-danger' job-name='" + data[i].jobName + "'>关闭</button>";
var removeButton = "<button operation='remove' class='btn btn-danger' job-name='" + data[i].jobName + "'>删除</button>";
operationTd = triggerButton + "&nbsp;";
if ("PAUSED" === status) {
operationTd = resumeButton + "&nbsp;";
operationTd = triggerButton + resumeButton + "&nbsp;";
} else if ("DISABLED" !== status && "CRASHED" !== status && "SHUTDOWN" !== status) {
operationTd = pauseButton + "&nbsp;";
operationTd = triggerButton + pauseButton + "&nbsp;";
}
if ("SHUTDOWN" !== status) {
operationTd = operationTd + shutdownButton + "&nbsp;";
Expand All @@ -47,6 +51,18 @@ function renderJobs() {
});
}

function bindTriggerButtons() {
$(document).on("click", "button[operation='trigger'][data-toggle!='modal']", function(event) {
$.post("job/trigger", {jobName : $(event.currentTarget).attr("job-name"), ip : $("#server-ip").text()}, function (data) {
renderJobs();
showSuccessDialog();
});
});
$(document).on("click", "button[operation='trigger'][data-toggle='modal']", function(event) {
$("#chosen-job-name").text($(event.currentTarget).attr("job-name"));
});
}

function bindPauseButtons() {
$(document).on("click", "button[operation='pause'][data-toggle!='modal']", function(event) {
$.post("job/pause", {jobName : $(event.currentTarget).attr("job-name"), ip : $("#server-ip").text()}, function (data) {
Expand All @@ -68,6 +84,15 @@ function bindResumeButtons() {
});
}

function bindTriggerAllButton() {
$(document).on("click", "#trigger-all-jobs-btn", function(event) {
$.post("job/triggerAll/ip", {ip : $("#server-ip").text()}, function (data) {
renderJobs();
showSuccessDialog();
});
});
}

function bindPauseAllButton() {
$(document).on("click", "#pause-all-jobs-btn", function(event) {
$.post("job/pauseAll/ip", {ip : $("#server-ip").text()}, function (data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@
<tbody>
</tbody>
</table>
<button id="trigger-all-jobs-btn" class="btn btn-success">全部触发</button>
<button id="pause-all-jobs-btn" class="btn btn-warning">全部暂停</button>
<button id="resume-all-jobs-btn" class="btn btn-success">全部恢复</button>
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
<tbody>
</tbody>
</table>
<button id="trigger-all-jobs-btn" class="btn btn-success">全部触发</button>
<button id="pause-all-jobs-btn" class="btn btn-warning">全部暂停</button>
<button id="resume-all-jobs-btn" class="btn btn-success">全部恢复</button>
<span id="chosen-job-name" class="hide"></span>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public JobOperationListenerManager(final CoordinatorRegistryCenter coordinatorRe
@Override
public void start() {
addConnectionStateListener(new ConnectionLostListener());
addDataListener(new JobTriggerStatusJobListener());
addDataListener(new JobPausedStatusJobListener());
addDataListener(new JobShutdownStatusJobListener());
}
Expand All @@ -89,6 +90,24 @@ public void stateChanged(final CuratorFramework client, final ConnectionState ne
}
}

class JobTriggerStatusJobListener extends AbstractJobListener {

@Override
protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) {
if (Type.NODE_ADDED != event.getType() || !serverNode.isLocalJobTriggerPath(path)) {
return;
}
serverService.clearJobTriggerStatus();
JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
if (null == jobScheduleController) {
return;
}
if (serverService.isLocalhostServerReady()) {
jobScheduleController.triggerJob();
}
}
}

class JobPausedStatusJobListener extends AbstractJobListener {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ public class ServerNode {

static final String STATUS = ROOT + "/%s/" + STATUS_APPENDIX;

static final String TRIGGER_APPENDIX = "trigger";

static final String TRIGGER = ROOT + "/%s/" + TRIGGER_APPENDIX;

static final String DISABLED_APPENDIX = "disabled";

static final String DISABLED = ROOT + "/%s/" + DISABLED_APPENDIX;
Expand Down Expand Up @@ -68,6 +72,10 @@ static String getStatusNode(final String ip) {
return String.format(STATUS, ip);
}

static String getTriggerNode(final String ip) {
return String.format(TRIGGER, ip);
}

static String getDisabledNode(final String ip) {
return String.format(DISABLED, ip);
}
Expand All @@ -88,6 +96,16 @@ static String getShutdownNode(final String ip) {
return String.format(SHUTDOWN, ip);
}

/**
* 判断给定路径是否为作业服务器立刻触发路径.
*
* @param path 待判断的路径
* @return 是否为作业服务器立刻触发路径
*/
public boolean isLocalJobTriggerPath(final String path) {
return path.startsWith(jobNodePath.getFullPath(String.format(ServerNode.TRIGGER, localHostService.getIp())));
}

/**
* 判断给定路径是否为作业服务器暂停路径.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ private void persistDisabled() {
}
}

/**
* 清除立刻执行作业的标记.
*/
public void clearJobTriggerStatus() {
jobNodeStorage.removeJobNodeIfExisted(ServerNode.getTriggerNode(localHostService.getIp()));
}

/**
* 清除暂停作业的标记.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
@RequiredArgsConstructor
public final class JobNodePath {

/**
* 作业立刻触发节点名称.
*/
public static final String TRIGGER_NODE = "trigger";

/**
* 作业暂停节点名称.
*/
Expand Down Expand Up @@ -81,16 +86,6 @@ public String getConfigNodePath(final String nodeName) {
return String.format("/%s/%s/%s", jobName, CONFIG_NODE, nodeName);
}

/**
* 获取选举节点路径.
*
* @param nodeName 子节点名称
* @return 选举节点路径
*/
public String getLeaderNodePath(final String nodeName) {
return String.format("/%s/%s/%s", jobName, LEADER_NODE, nodeName);
}

/**
* 获取作业节点IP地址根路径.
*
Expand Down
Loading

0 comments on commit 0c3ac6c

Please sign in to comment.