Skip to content

Commit

Permalink
split job scheduler to scheduler and executor
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Jun 24, 2016
1 parent b3774f2 commit 47d4df9
Show file tree
Hide file tree
Showing 7 changed files with 195 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,20 @@
package com.dangdang.ddframe.job.api;

import com.dangdang.ddframe.job.api.config.JobConfiguration;
import com.dangdang.ddframe.job.api.listener.AbstractDistributeOnceElasticJobListener;
import com.dangdang.ddframe.job.api.listener.ElasticJobListener;
import com.dangdang.ddframe.job.exception.JobException;
import com.dangdang.ddframe.job.internal.guarantee.GuaranteeService;
import com.dangdang.ddframe.job.internal.executor.JobExecutor;
import com.dangdang.ddframe.job.internal.job.LiteJob;
import com.dangdang.ddframe.job.internal.schedule.JobFacade;
import com.dangdang.ddframe.job.internal.schedule.JobRegistry;
import com.dangdang.ddframe.job.internal.schedule.JobScheduleController;
import com.dangdang.ddframe.job.internal.schedule.SchedulerFacade;
import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter;
import com.google.common.base.Joiner;
import lombok.extern.slf4j.Slf4j;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.impl.StdSchedulerFactory;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/**
Expand All @@ -46,76 +40,41 @@
* @author zhangliang
* @author caohao
*/
@Slf4j
public class JobScheduler {

private static final String SCHEDULER_INSTANCE_NAME_SUFFIX = "Scheduler";

private static final String CRON_TRIGGER_IDENTITY_SUFFIX = "Trigger";

private final String jobName;

private final CoordinatorRegistryCenter regCenter;

private final ElasticJob elasticJob;

private final SchedulerFacade schedulerFacade;
private final JobExecutor jobExecutor;

public JobScheduler(final CoordinatorRegistryCenter regCenter, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
jobName = jobConfig.getJobName();
this.regCenter = regCenter;
List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
setGuaranteeServiceForElasticJobListeners(regCenter, jobConfig, elasticJobListenerList);
elasticJob = createElasticJob(jobConfig, elasticJobListenerList);
schedulerFacade = new SchedulerFacade(regCenter, jobConfig, elasticJobListenerList);
}

private void setGuaranteeServiceForElasticJobListeners(final CoordinatorRegistryCenter regCenter, final JobConfiguration jobConfig, final List<ElasticJobListener> elasticJobListeners) {
GuaranteeService guaranteeService = new GuaranteeService(regCenter, jobConfig);
for (ElasticJobListener each : elasticJobListeners) {
if (each instanceof AbstractDistributeOnceElasticJobListener) {
((AbstractDistributeOnceElasticJobListener) each).setGuaranteeService(guaranteeService);
}
}
}

private ElasticJob createElasticJob(final JobConfiguration jobConfig, final List<ElasticJobListener> elasticJobListenerList) {
ElasticJob result;
try {
result = (ElasticJob) jobConfig.getJobClass().newInstance();
} catch (final InstantiationException | IllegalAccessException ex) {
throw new JobException(ex);
}
result.setJobFacade(new JobFacade(regCenter, jobConfig, elasticJobListenerList));
return result;
jobExecutor = new JobExecutor(regCenter, jobConfig, elasticJobListeners);
}

/**
* 初始化作业.
*/
public void init() {
log.debug("Elastic job: job controller init, job name is: {}.", jobName);
schedulerFacade.clearPreviousServerStatus();
regCenter.addCacheData("/" + jobName);
schedulerFacade.registerStartUpInfo();
JobDetail jobDetail = JobBuilder.newJob(LiteJob.class).withIdentity(jobName).build();
jobDetail.getJobDataMap().put("elasticJob", elasticJob);
jobExecutor.init();
JobDetail jobDetail = JobBuilder.newJob(LiteJob.class).withIdentity(jobExecutor.getJobName()).build();
jobDetail.getJobDataMap().put("elasticJob", jobExecutor.getElasticJob());
JobScheduleController jobScheduleController;
try {
jobScheduleController = new JobScheduleController(
initializeScheduler(jobDetail.getKey().toString()), jobDetail, schedulerFacade, Joiner.on("_").join(jobName, CRON_TRIGGER_IDENTITY_SUFFIX));
jobScheduleController.scheduleJob(schedulerFacade.getCron());
initializeScheduler(jobDetail.getKey().toString()), jobDetail, jobExecutor.getSchedulerFacade(), Joiner.on("_").join(jobExecutor.getJobName(), CRON_TRIGGER_IDENTITY_SUFFIX));
jobScheduleController.scheduleJob(jobExecutor.getSchedulerFacade().getCron());
} catch (final SchedulerException ex) {
throw new JobException(ex);
}
JobRegistry.getInstance().addJobScheduleController(jobName, jobScheduleController);
JobRegistry.getInstance().addJobScheduleController(jobExecutor.getJobName(), jobScheduleController);
}

private Scheduler initializeScheduler(final String jobName) throws SchedulerException {
StdSchedulerFactory factory = new StdSchedulerFactory();
factory.initialize(getBaseQuartzProperties(jobName));
Scheduler result = factory.getScheduler();
result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
result.getListenerManager().addTriggerListener(jobExecutor.getSchedulerFacade().newJobTriggerListener());
return result;
}

Expand All @@ -124,7 +83,7 @@ private Properties getBaseQuartzProperties(final String jobName) {
result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
result.put("org.quartz.threadPool.threadCount", "1");
result.put("org.quartz.scheduler.instanceName", Joiner.on("_").join(jobName, SCHEDULER_INSTANCE_NAME_SUFFIX));
if (!schedulerFacade.isMisfire()) {
if (!jobExecutor.getSchedulerFacade().isMisfire()) {
result.put("org.quartz.jobStore.misfireThreshold", "1");
}
prepareEnvironments(result);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package com.dangdang.ddframe.job.internal.executor;

import com.dangdang.ddframe.job.api.ElasticJob;
import com.dangdang.ddframe.job.api.config.JobConfiguration;
import com.dangdang.ddframe.job.api.listener.AbstractDistributeOnceElasticJobListener;
import com.dangdang.ddframe.job.api.listener.ElasticJobListener;
import com.dangdang.ddframe.job.exception.JobException;
import com.dangdang.ddframe.job.internal.guarantee.GuaranteeService;
import com.dangdang.ddframe.job.internal.schedule.JobFacade;
import com.dangdang.ddframe.job.internal.schedule.SchedulerFacade;
import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.List;

/**
* 作业启动器.
*
* @author zhangliang
*/
@Slf4j
@Getter
public class JobExecutor {

private final String jobName;

private final CoordinatorRegistryCenter regCenter;

private final ElasticJob elasticJob;

private final SchedulerFacade schedulerFacade;

public JobExecutor(final CoordinatorRegistryCenter regCenter, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
jobName = jobConfig.getJobName();
this.regCenter = regCenter;
List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
setGuaranteeServiceForElasticJobListeners(regCenter, jobConfig, elasticJobListenerList);
elasticJob = createElasticJob(jobConfig, elasticJobListenerList);
schedulerFacade = new SchedulerFacade(regCenter, jobConfig, elasticJobListenerList);
}

private void setGuaranteeServiceForElasticJobListeners(final CoordinatorRegistryCenter regCenter, final JobConfiguration jobConfig, final List<ElasticJobListener> elasticJobListeners) {
GuaranteeService guaranteeService = new GuaranteeService(regCenter, jobConfig);
for (ElasticJobListener each : elasticJobListeners) {
if (each instanceof AbstractDistributeOnceElasticJobListener) {
((AbstractDistributeOnceElasticJobListener) each).setGuaranteeService(guaranteeService);
}
}
}

private ElasticJob createElasticJob(final JobConfiguration jobConfig, final List<ElasticJobListener> elasticJobListenerList) {
ElasticJob result;
try {
result = (ElasticJob) jobConfig.getJobClass().newInstance();
} catch (final InstantiationException | IllegalAccessException ex) {
throw new JobException(ex);
}
result.setJobFacade(new JobFacade(regCenter, jobConfig, elasticJobListenerList));
return result;
}

/**
* 初始化作业.
*/
public void init() {
log.debug("Elastic job: job controller init, job name is: {}.", jobName);
schedulerFacade.clearPreviousServerStatus();
regCenter.addCacheData("/" + jobName);
schedulerFacade.registerStartUpInfo();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ public void registerStartUpInfo() {
configService.persistJobConfiguration();
serverService.persistServerOnline();
serverService.clearJobPausedStatus();
if (JobType.DATA_FLOW.equals(configService.getJobType())) {
statisticsService.startProcessCountJob();
if (JobType.DATA_FLOW == configService.getJobType()) {
statisticsService.startProcessCountJob();
}
shardingService.setReshardingFlag();
monitorService.listen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

@RunWith(Suite.class)
@SuiteClasses({
JobExecutionMultipleShardingContextTest.class,
JobExecutionMultipleShardingContextTest.class,
JobSchedulerTest.class,
DistributeOnceElasticJobListenerTest.class,
JobConfigurationFactoryTest.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@

import com.dangdang.ddframe.job.api.config.JobConfiguration;
import com.dangdang.ddframe.job.api.config.JobConfigurationFactory;
import com.dangdang.ddframe.job.api.listener.AbstractDistributeOnceElasticJobListener;
import com.dangdang.ddframe.job.api.listener.fixture.ElasticJobListenerCaller;
import com.dangdang.ddframe.job.api.listener.fixture.TestDistributeOnceElasticJobListener;
import com.dangdang.ddframe.job.api.listener.fixture.TestElasticJobListener;
import com.dangdang.ddframe.job.fixture.TestJob;
import com.dangdang.ddframe.job.internal.executor.JobExecutor;
import com.dangdang.ddframe.job.internal.schedule.JobRegistry;
import com.dangdang.ddframe.job.internal.schedule.JobScheduleController;
import com.dangdang.ddframe.job.internal.schedule.JobTriggerListener;
Expand All @@ -39,8 +37,6 @@

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.verify;
Expand All @@ -51,6 +47,9 @@ public final class JobSchedulerTest {
@Mock
private CoordinatorRegistryCenter regCenter;

@Mock
private JobExecutor jobExecutor;

@Mock
private SchedulerFacade schedulerFacade;

Expand All @@ -64,17 +63,8 @@ public final class JobSchedulerTest {
@Before
public void initMocks() throws NoSuchFieldException {
MockitoAnnotations.initMocks(this);
ReflectionUtils.setFieldValue(jobScheduler, "regCenter", regCenter);
ReflectionUtils.setFieldValue(jobScheduler, "schedulerFacade", schedulerFacade);
}

@Test
public void testNew() throws NoSuchFieldException {
TestDistributeOnceElasticJobListener testDistributeOnceElasticJobListener = new TestDistributeOnceElasticJobListener(caller);
assertNull(ReflectionUtils.getFieldValue(testDistributeOnceElasticJobListener, ReflectionUtils.getFieldWithName(AbstractDistributeOnceElasticJobListener.class, "guaranteeService", false)));
JobScheduler actualJobScheduler = new JobScheduler(null, jobConfig, new TestElasticJobListener(caller), testDistributeOnceElasticJobListener);
assertNotNull(ReflectionUtils.getFieldValue(testDistributeOnceElasticJobListener, ReflectionUtils.getFieldWithName(AbstractDistributeOnceElasticJobListener.class, "guaranteeService", false)));
assertThat(ReflectionUtils.getFieldValue(actualJobScheduler, ReflectionUtils.getFieldWithName(JobScheduler.class, "elasticJob", false)), instanceOf(jobConfig.getJobClass()));
ReflectionUtils.setFieldValue(jobScheduler, "jobExecutor", jobExecutor);
when(jobExecutor.getSchedulerFacade()).thenReturn(schedulerFacade);
}

@Test
Expand All @@ -92,19 +82,18 @@ public void assertInitIfIsNotMisfire() throws NoSuchFieldException, SchedulerExc
}

private void mockInit(final boolean isMisfire) {
when(jobExecutor.getJobName()).thenReturn("testJob");
when(schedulerFacade.newJobTriggerListener()).thenReturn(new JobTriggerListener(null, null));
when(schedulerFacade.getCron()).thenReturn("* * 0/10 * * ? 2050");
when(schedulerFacade.isMisfire()).thenReturn(isMisfire);
}

private void assertInit() throws NoSuchFieldException, SchedulerException {
verify(schedulerFacade).clearPreviousServerStatus();
verify(jobExecutor).init();
Scheduler scheduler = ReflectionUtils.getFieldValue(JobRegistry.getInstance().getJobScheduleController("testJob"), JobScheduleController.class.getDeclaredField("scheduler"));
assertThat(scheduler.getListenerManager().getTriggerListeners().size(), is(1));
assertThat(scheduler.getListenerManager().getTriggerListeners().get(0), instanceOf(JobTriggerListener.class));
assertTrue(scheduler.isStarted());
verify(regCenter).addCacheData("/testJob");
verify(schedulerFacade).registerStartUpInfo();
verify(schedulerFacade).newJobTriggerListener();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.dangdang.ddframe.job.internal.execution.ExecutionListenerManagerTest;
import com.dangdang.ddframe.job.internal.execution.ExecutionNodeTest;
import com.dangdang.ddframe.job.internal.execution.ExecutionServiceTest;
import com.dangdang.ddframe.job.internal.executor.JobExecutorTest;
import com.dangdang.ddframe.job.internal.failover.FailoverListenerManagerTest;
import com.dangdang.ddframe.job.internal.failover.FailoverNodeTest;
import com.dangdang.ddframe.job.internal.failover.FailoverServiceTest;
Expand Down Expand Up @@ -106,7 +107,8 @@
GuaranteeNodeTest.class,
GuaranteeServiceTest.class,
SchedulerFacadeTest.class,
JobFacadeTest.class
JobFacadeTest.class,
JobExecutorTest.class
})
public final class AllInternalTests {
}
Loading

0 comments on commit 47d4df9

Please sign in to comment.