Skip to content

Commit

Permalink
fix apache#87
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed May 11, 2016
1 parent 4a7046e commit 6287b15
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.dangdang.ddframe.job.internal.listener.AbstractJobListener;
import com.dangdang.ddframe.job.internal.listener.AbstractListenerManager;
import com.dangdang.ddframe.job.internal.server.ServerNode;
import com.dangdang.ddframe.job.internal.sharding.ShardingService;
import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -39,16 +38,13 @@ public class ElectionListenerManager extends AbstractListenerManager {

private final LeaderElectionService leaderElectionService;

private final ShardingService shardingService;

private final ElectionNode electionNode;

private final ServerNode serverNode;

public ElectionListenerManager(final CoordinatorRegistryCenter coordinatorRegistryCenter, final JobConfiguration jobConfiguration) {
super(coordinatorRegistryCenter, jobConfiguration);
leaderElectionService = new LeaderElectionService(coordinatorRegistryCenter, jobConfiguration);
shardingService = new ShardingService(coordinatorRegistryCenter, jobConfiguration);
electionNode = new ElectionNode(jobConfiguration.getJobName());
serverNode = new ServerNode(jobConfiguration.getJobName());
}
Expand All @@ -66,7 +62,6 @@ protected void dataChanged(final CuratorFramework client, final TreeCacheEvent e
if ((eventHelper.isLeaderCrashed() || eventHelper.isServerEnabled() || eventHelper.isServerResumed()) && !leaderElectionService.hasLeader()) {
log.debug("Elastic job: leader crashed, elect a new leader now.");
leaderElectionService.leaderElection();
shardingService.setReshardingFlag();
log.debug("Elastic job: leader election completed.");
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class LeaderElectionExecutionCallback implements LeaderExecutionCallback {

@Override
public void execute() {
if (!jobNodeStorage.isJobNodeExisted(ElectionNode.LEADER_HOST) && (isForceElect || serverService.isServerReady())) {
if (!jobNodeStorage.isJobNodeExisted(ElectionNode.LEADER_HOST) && (isForceElect || serverService.isAvailableServer(localHostService.getIp()))) {
jobNodeStorage.fillEphemeralJobNode(ElectionNode.LEADER_HOST, localHostService.getIp());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void failoverIfNecessary() {
}

private boolean needFailover() {
return jobNodeStorage.isJobNodeExisted(FailoverNode.ITEMS_ROOT) && !jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).isEmpty() && serverService.isServerReady();
return jobNodeStorage.isJobNodeExisted(FailoverNode.ITEMS_ROOT) && !jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).isEmpty() && serverService.isLocalhostServerReady();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ public class ServerNode {

static final String STOPPED = ROOT + "/%s/stoped";

static final String SHUTDOWN = ROOT + "/%s/shutdown";
static final String SHUTDOWN_APPENDIX = "shutdown";

static final String SHUTDOWN = ROOT + "/%s/" + SHUTDOWN_APPENDIX;

private final LocalHostService localHostService = new LocalHostService();

Expand Down Expand Up @@ -135,4 +137,14 @@ public boolean isServerStatusPath(final String path) {
public boolean isServerDisabledPath(final String path) {
return path.startsWith(jobNodePath.getFullPath(ServerNode.ROOT)) && path.endsWith(ServerNode.DISABLED_APPENDIX);
}

/**
* 判断给定路径是否为作业服务器关闭路径.
*
* @param path 待判断的路径
* @return 是否为作业服务器关闭路径
*/
public boolean isServerShutdownPath(final String path) {
return path.startsWith(jobNodePath.getFullPath(ServerNode.ROOT)) && path.endsWith(ServerNode.SHUTDOWN_APPENDIX);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,14 @@ public List<String> getAvailableServers() {
return result;
}

private Boolean isAvailableServer(final String ip) {
return jobNodeStorage.isJobNodeExisted(ServerNode.getStatusNode(ip))
/**
* 判断作业服务器是否可用.
*
* @param ip 作业服务器IP地址.
* @return 作业服务器是否可用
*/
public boolean isAvailableServer(final String ip) {
return jobNodeStorage.isJobNodeExisted(ServerNode.getStatusNode(ip)) && !jobNodeStorage.isJobNodeExisted(ServerNode.getStoppedNode(ip))
&& !jobNodeStorage.isJobNodeExisted(ServerNode.getDisabledNode(ip)) && !jobNodeStorage.isJobNodeExisted(ServerNode.getShutdownNode(ip));
}

Expand All @@ -140,18 +146,9 @@ private Boolean isAvailableServer(final String ip) {
*
* @return 当前服务器是否是等待执行的状态
*/
public boolean isServerReady() {
if (jobNodeStorage.isJobNodeExisted(ServerNode.getDisabledNode(localHostService.getIp()))) {
return false;
}
if (jobNodeStorage.isJobNodeExisted(ServerNode.getStoppedNode(localHostService.getIp()))) {
return false;
}
if (jobNodeStorage.isJobNodeExisted(ServerNode.getShutdownNode(localHostService.getIp()))) {
return false;
}
String statusNode = ServerNode.getStatusNode(localHostService.getIp());
return jobNodeStorage.isJobNodeExisted(statusNode) && ServerStatus.READY.name().equals(jobNodeStorage.getJobNodeData(statusNode));
public boolean isLocalhostServerReady() {
String ip = localHostService.getIp();
return isAvailableServer(ip) && ServerStatus.READY.name().equals(jobNodeStorage.getJobNodeData(ServerNode.getStatusNode(ip)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class ListenServersChangedJobListener extends AbstractJobListener {

@Override
protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) {
if (isServersCrashed(event, path) || serverNode.isServerDisabledPath(path)) {
if (isServersCrashed(event, path) || serverNode.isServerDisabledPath(path) || serverNode.isServerShutdownPath(path)) {
shardingService.setReshardingFlag();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

package com.dangdang.ddframe.job.internal.election;

import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.dangdang.ddframe.job.api.JobConfiguration;
import com.dangdang.ddframe.job.fixture.TestJob;
import com.dangdang.ddframe.job.internal.election.ElectionListenerManager.LeaderElectionJobListener;
import com.dangdang.ddframe.job.internal.server.ServerNode;
import com.dangdang.ddframe.job.internal.storage.JobNodeStorage;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.junit.Before;
Expand All @@ -31,11 +31,9 @@
import org.mockito.MockitoAnnotations;
import org.unitils.util.ReflectionUtils;

import com.dangdang.ddframe.job.api.JobConfiguration;
import com.dangdang.ddframe.job.fixture.TestJob;
import com.dangdang.ddframe.job.internal.election.ElectionListenerManager.LeaderElectionJobListener;
import com.dangdang.ddframe.job.internal.sharding.ShardingService;
import com.dangdang.ddframe.job.internal.storage.JobNodeStorage;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public final class ElectionListenerManagerTest {

Expand All @@ -48,9 +46,6 @@ public final class ElectionListenerManagerTest {
@Mock
private LeaderElectionService leaderElectionService;

@Mock
private ShardingService shardingService;

private final ElectionListenerManager electionListenerManager = new ElectionListenerManager(null, new JobConfiguration("testJob", TestJob.class, 3, "0/1 * * * * ?"));

@Before
Expand All @@ -59,7 +54,6 @@ public void setUp() throws NoSuchFieldException {
ReflectionUtils.setFieldValue(electionListenerManager, electionListenerManager.getClass().getSuperclass().getDeclaredField("jobNodeStorage"), jobNodeStorage);
ReflectionUtils.setFieldValue(electionListenerManager, "serverNode", serverNode);
ReflectionUtils.setFieldValue(electionListenerManager, "leaderElectionService", leaderElectionService);
ReflectionUtils.setFieldValue(electionListenerManager, "shardingService", shardingService);
}

@Test
Expand Down Expand Up @@ -98,7 +92,6 @@ electionListenerManager.new LeaderElectionJobListener().dataChanged(null, new Tr
TreeCacheEvent.Type.NODE_REMOVED, new ChildData("/testJob/leader/election/host", null, "localhost".getBytes())), "/testJob/leader/election/host");
verify(leaderElectionService).hasLeader();
verify(leaderElectionService).leaderElection();
verify(shardingService).setReshardingFlag();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.mockito.MockitoAnnotations;
import org.unitils.util.ReflectionUtils;

import java.util.Arrays;
import java.util.Collections;

import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -88,27 +87,27 @@ public void assertLeaderElectionExecutionCallbackWithLeader() {
}

@Test
public void assertLeaderElectionExecutionCallbackWithoutLeaderAndServerIsReady() {
public void assertLeaderElectionExecutionCallbackWithoutLeaderAndIsAvailableServer() {
when(jobNodeStorage.isJobNodeExisted("leader/election/host")).thenReturn(false);
when(serverService.isServerReady()).thenReturn(true);
when(serverService.isAvailableServer("mockedIP")).thenReturn(true);
leaderElectionService.new LeaderElectionExecutionCallback(false).execute();
verify(jobNodeStorage).isJobNodeExisted("leader/election/host");
verify(jobNodeStorage).fillEphemeralJobNode("leader/election/host", "mockedIP");
}

@Test
public void assertLeaderElectionExecutionCallbackWithoutLeaderAndServerIsNotReady() {
public void assertLeaderElectionExecutionCallbackWithoutLeaderAndIsNotAvailableServer() {
when(jobNodeStorage.isJobNodeExisted("leader/election/host")).thenReturn(false);
when(serverService.isServerReady()).thenReturn(false);
when(serverService.isAvailableServer("mockedIP")).thenReturn(false);
leaderElectionService.new LeaderElectionExecutionCallback(false).execute();
verify(jobNodeStorage).isJobNodeExisted("leader/election/host");
verify(jobNodeStorage, times(0)).fillEphemeralJobNode("leader/election/host", "mockedIP");
}

@Test
public void assertLeaderForceElectionExecutionCallbackWithoutLeaderAndServerIsNotReady() {
public void assertLeaderForceElectionExecutionCallbackWithoutLeaderAndIsNotAvailableServer() {
when(jobNodeStorage.isJobNodeExisted("leader/election/host")).thenReturn(false);
when(serverService.isServerReady()).thenReturn(false);
when(serverService.isAvailableServer("mockedIP")).thenReturn(false);
leaderElectionService.new LeaderElectionExecutionCallback(true).execute();
verify(jobNodeStorage).isJobNodeExisted("leader/election/host");
verify(jobNodeStorage).fillEphemeralJobNode("leader/election/host", "mockedIP");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,23 +116,23 @@ public void assertFailoverIfUnnecessaryWhenItemsRootNodeIsEmpty() {
public void assertFailoverIfUnnecessaryWhenServerIsNotReady() {
when(jobNodeStorage.isJobNodeExisted("leader/failover/items")).thenReturn(true);
when(jobNodeStorage.getJobNodeChildrenKeys("leader/failover/items")).thenReturn(Arrays.asList("0", "1", "2"));
when(serverService.isServerReady()).thenReturn(false);
when(serverService.isLocalhostServerReady()).thenReturn(false);
failoverService.failoverIfNecessary();
verify(jobNodeStorage).isJobNodeExisted("leader/failover/items");
verify(jobNodeStorage).getJobNodeChildrenKeys("leader/failover/items");
verify(serverService).isServerReady();
verify(serverService).isLocalhostServerReady();
verify(jobNodeStorage, times(0)).executeInLeader(eq("leader/failover/latch"), Matchers.<FailoverLeaderExecutionCallback>any());
}

@Test
public void assertFailoverIfNecessary() {
when(jobNodeStorage.isJobNodeExisted("leader/failover/items")).thenReturn(true);
when(jobNodeStorage.getJobNodeChildrenKeys("leader/failover/items")).thenReturn(Arrays.asList("0", "1", "2"));
when(serverService.isServerReady()).thenReturn(true);
when(serverService.isLocalhostServerReady()).thenReturn(true);
failoverService.failoverIfNecessary();
verify(jobNodeStorage).isJobNodeExisted("leader/failover/items");
verify(jobNodeStorage).getJobNodeChildrenKeys("leader/failover/items");
verify(serverService).isServerReady();
verify(serverService).isLocalhostServerReady();
verify(jobNodeStorage).executeInLeader(eq("leader/failover/latch"), Matchers.<FailoverLeaderExecutionCallback>any());
}

Expand All @@ -148,12 +148,12 @@ public void assertFailoverLeaderExecutionCallbackIfNotNecessary() {
public void assertFailoverLeaderExecutionCallbackIfNecessary() {
when(jobNodeStorage.isJobNodeExisted("leader/failover/items")).thenReturn(true);
when(jobNodeStorage.getJobNodeChildrenKeys("leader/failover/items")).thenReturn(Arrays.asList("0", "1", "2"));
when(serverService.isServerReady()).thenReturn(true);
when(serverService.isLocalhostServerReady()).thenReturn(true);
JobRegistry.getInstance().addJobScheduler("testJob", jobScheduler);
failoverService.new FailoverLeaderExecutionCallback().execute();
verify(jobNodeStorage).isJobNodeExisted("leader/failover/items");
verify(jobNodeStorage, times(2)).getJobNodeChildrenKeys("leader/failover/items");
verify(serverService).isServerReady();
verify(serverService).isLocalhostServerReady();
verify(jobNodeStorage).fillEphemeralJobNode("execution/0/failover", "mockedIP");
verify(jobNodeStorage).removeJobNodeIfExisted("leader/failover/items/0");
verify(jobScheduler).triggerJob();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,11 @@ public void assertIsServerDisabledPath() {
assertFalse(serverNode.isServerDisabledPath("/otherJob/servers/host0/status"));
assertFalse(serverNode.isServerDisabledPath("/testJob/servers/host0/status"));
}

@Test
public void assertIsServerShutdownPath() {
assertTrue(serverNode.isServerShutdownPath("/testJob/servers/host0/shutdown"));
assertFalse(serverNode.isServerShutdownPath("/otherJob/servers/host0/status"));
assertFalse(serverNode.isServerShutdownPath("/testJob/servers/host0/status"));
}
}
Loading

0 comments on commit 6287b15

Please sign in to comment.