Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Jun 23, 2016
1 parent 58bdd1d commit b3774f2
Show file tree
Hide file tree
Showing 28 changed files with 225 additions and 198 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,37 @@

package com.dangdang.ddframe.job.api;

import org.quartz.Job;
import org.quartz.JobExecutionException;
import com.dangdang.ddframe.job.exception.JobException;
import com.dangdang.ddframe.job.internal.schedule.JobFacade;

/**
* 弹性化分布式作业接口.
*
* @author zhangliang
*/
public interface ElasticJob extends Job {
public interface ElasticJob {

/**
* 执行作业.
*/
void execute();

/**
* 处理作业执行时异常.
*
* @param jobExecutionException 作业执行时异常
* @throws JobExecutionException 作业执行时异常
* @param jobException 作业异常
*/
void handleJobExecutionException(JobException jobException);

/**
* 获取提供作业服务的门面类.
*/
JobFacade getJobFacade();

/**
* 设置提供作业服务的门面类.
*
* @param jobFacade 提供作业服务的门面类
*/
void handleJobExecutionException(JobExecutionException jobExecutionException) throws JobExecutionException;
void setJobFacade(JobFacade jobFacade);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.dangdang.ddframe.job.api.listener.ElasticJobListener;
import com.dangdang.ddframe.job.exception.JobException;
import com.dangdang.ddframe.job.internal.guarantee.GuaranteeService;
import com.dangdang.ddframe.job.internal.job.LiteJob;
import com.dangdang.ddframe.job.internal.schedule.JobFacade;
import com.dangdang.ddframe.job.internal.schedule.JobRegistry;
import com.dangdang.ddframe.job.internal.schedule.JobScheduleController;
Expand Down Expand Up @@ -56,20 +57,17 @@ public class JobScheduler {

private final CoordinatorRegistryCenter regCenter;

private final SchedulerFacade schedulerFacade;

private final JobFacade jobFacade;
private final ElasticJob elasticJob;

private final JobDetail jobDetail;
private final SchedulerFacade schedulerFacade;

public JobScheduler(final CoordinatorRegistryCenter regCenter, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
jobName = jobConfig.getJobName();
this.regCenter = regCenter;
List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
setGuaranteeServiceForElasticJobListeners(regCenter, jobConfig, elasticJobListenerList);
elasticJob = createElasticJob(jobConfig, elasticJobListenerList);
schedulerFacade = new SchedulerFacade(regCenter, jobConfig, elasticJobListenerList);
jobFacade = new JobFacade(regCenter, jobConfig, elasticJobListenerList);
jobDetail = JobBuilder.newJob(jobConfig.getJobClass()).withIdentity(jobName).build();
}

private void setGuaranteeServiceForElasticJobListeners(final CoordinatorRegistryCenter regCenter, final JobConfiguration jobConfig, final List<ElasticJobListener> elasticJobListeners) {
Expand All @@ -81,6 +79,17 @@ private void setGuaranteeServiceForElasticJobListeners(final CoordinatorRegistry
}
}

private ElasticJob createElasticJob(final JobConfiguration jobConfig, final List<ElasticJobListener> elasticJobListenerList) {
ElasticJob result;
try {
result = (ElasticJob) jobConfig.getJobClass().newInstance();
} catch (final InstantiationException | IllegalAccessException ex) {
throw new JobException(ex);
}
result.setJobFacade(new JobFacade(regCenter, jobConfig, elasticJobListenerList));
return result;
}

/**
* 初始化作业.
*/
Expand All @@ -89,7 +98,8 @@ public void init() {
schedulerFacade.clearPreviousServerStatus();
regCenter.addCacheData("/" + jobName);
schedulerFacade.registerStartUpInfo();
jobDetail.getJobDataMap().put("jobFacade", jobFacade);
JobDetail jobDetail = JobBuilder.newJob(LiteJob.class).withIdentity(jobName).build();
jobDetail.getJobDataMap().put("elasticJob", elasticJob);
JobScheduleController jobScheduleController;
try {
jobScheduleController = new JobScheduleController(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,9 @@

import com.dangdang.ddframe.job.api.ElasticJob;
import com.dangdang.ddframe.job.api.JobExecutionMultipleShardingContext;
import com.dangdang.ddframe.job.exception.JobException;
import com.dangdang.ddframe.job.internal.schedule.JobFacade;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

/**
* 弹性化分布式作业的基类.
Expand All @@ -35,12 +32,11 @@
@Slf4j
public abstract class AbstractElasticJob implements ElasticJob {

@Getter(AccessLevel.PROTECTED)
private JobFacade jobFacade;

@Override
public final void execute(final JobExecutionContext context) throws JobExecutionException {
log.trace("Elastic job: job execute begin, job execution context:{}.", context);
public final void execute() {
log.trace("Elastic job: job execute begin.");
jobFacade.checkMaxTimeDiffSecondsTolerable();
JobExecutionMultipleShardingContext shardingContext = jobFacade.getShardingContext();
if (jobFacade.misfireIfNecessary(shardingContext.getShardingItems())) {
Expand All @@ -53,7 +49,7 @@ public final void execute(final JobExecutionContext context) throws JobExecution
//CHECKSTYLE:OFF
} catch (final Throwable cause) {
//CHECKSTYLE:ON
handleJobExecutionException(new JobExecutionException(cause));
handleJobExecutionException(new JobException(cause));
}
executeJobInternal(shardingContext);
log.trace("Elastic job: execute normal completed, sharding context:{}.", shardingContext);
Expand All @@ -69,12 +65,12 @@ public final void execute(final JobExecutionContext context) throws JobExecution
//CHECKSTYLE:OFF
} catch (final Throwable cause) {
//CHECKSTYLE:ON
handleJobExecutionException(new JobExecutionException(cause));
handleJobExecutionException(new JobException(cause));
}
log.trace("Elastic job: execute all completed, job execution context:{}.", context);
log.trace("Elastic job: execute all completed.");
}

private void executeJobInternal(final JobExecutionMultipleShardingContext shardingContext) throws JobExecutionException {
private void executeJobInternal(final JobExecutionMultipleShardingContext shardingContext) {
if (shardingContext.getShardingItems().isEmpty()) {
log.trace("Elastic job: sharding item is empty, job execution context:{}.", shardingContext);
return;
Expand All @@ -85,7 +81,7 @@ private void executeJobInternal(final JobExecutionMultipleShardingContext shardi
//CHECKSTYLE:OFF
} catch (final Throwable cause) {
//CHECKSTYLE:ON
handleJobExecutionException(new JobExecutionException(cause));
handleJobExecutionException(new JobException(cause));
} finally {
// TODO 考虑增加作业失败的状态,并且考虑如何处理作业失败的整体回路
jobFacade.registerJobCompleted(shardingContext);
Expand All @@ -95,10 +91,16 @@ private void executeJobInternal(final JobExecutionMultipleShardingContext shardi
protected abstract void executeJob(final JobExecutionMultipleShardingContext shardingContext);

@Override
public void handleJobExecutionException(final JobExecutionException jobExecutionException) throws JobExecutionException {
throw jobExecutionException;
public void handleJobExecutionException(final JobException jobException) {
log.error("Elastic job: exception occur in job processing...", jobException.getCause());
}

@Override
public final JobFacade getJobFacade() {
return jobFacade;
}

@Override
public final void setJobFacade(final JobFacade jobFacade) {
this.jobFacade = jobFacade;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package com.dangdang.ddframe.job.internal.job;

import com.dangdang.ddframe.job.api.ElasticJob;
import lombok.Setter;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

/**
* Elastic Job Lite提供的Quartz封装作业.
*
* @author zhangliang
*/
public class LiteJob implements Job {

@Setter
private ElasticJob elasticJob;

@Override
public void execute(final JobExecutionContext context) throws JobExecutionException {
elasticJob.execute();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
import com.dangdang.ddframe.job.api.DataFlowElasticJob;
import com.dangdang.ddframe.job.api.JobExecutionMultipleShardingContext;
import com.dangdang.ddframe.job.api.JobExecutionSingleShardingContext;
import com.dangdang.ddframe.job.exception.JobException;
import com.dangdang.ddframe.job.internal.job.AbstractElasticJob;
import com.dangdang.ddframe.job.internal.job.AbstractJobExecutionShardingContext;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.quartz.JobExecutionException;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
Expand Down Expand Up @@ -224,8 +224,8 @@ public ExecutorService getExecutorService() {
}

@Override
public void handleJobExecutionException(final JobExecutionException jobExecutionException) throws JobExecutionException {
log.error("Elastic job: exception occur in job processing...", jobExecutionException.getCause());
public void handleJobExecutionException(final JobException jobException) {
log.error("Elastic job: exception occur in job processing...", jobException.getCause());
}

private void latchAwait(final CountDownLatch latch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* @author caohao
*/
public final class ScriptElasticJob extends AbstractElasticJob {

@Override
protected void executeJob(final JobExecutionMultipleShardingContext shardingContext) {
String scriptCommandLine = getJobFacade().getScriptCommandLine();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@

package com.dangdang.ddframe.job.plugin.job.type.simple;

import lombok.extern.slf4j.Slf4j;

import org.quartz.JobExecutionException;

import com.dangdang.ddframe.job.api.JobExecutionMultipleShardingContext;
import com.dangdang.ddframe.job.internal.job.AbstractElasticJob;
import lombok.extern.slf4j.Slf4j;

/**
* 简单的分布式作业.
Expand All @@ -42,11 +39,6 @@ protected final void executeJob(final JobExecutionMultipleShardingContext shardi
process(shardingContext);
}

@Override
public void handleJobExecutionException(final JobExecutionException jobExecutionException) throws JobExecutionException {
log.error("Elastic job: exception occur in job processing...", jobExecutionException.getCause());
}

/**
* 执行作业.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,27 @@
import com.dangdang.ddframe.job.api.config.JobConfiguration;
import com.dangdang.ddframe.job.api.config.JobConfigurationFactory;
import com.dangdang.ddframe.job.api.listener.AbstractDistributeOnceElasticJobListener;
import com.dangdang.ddframe.job.api.listener.ElasticJobListener;
import com.dangdang.ddframe.job.api.listener.fixture.ElasticJobListenerCaller;
import com.dangdang.ddframe.job.api.listener.fixture.TestDistributeOnceElasticJobListener;
import com.dangdang.ddframe.job.api.listener.fixture.TestElasticJobListener;
import com.dangdang.ddframe.job.fixture.TestJob;
import com.dangdang.ddframe.job.internal.schedule.JobFacade;
import com.dangdang.ddframe.job.internal.schedule.JobRegistry;
import com.dangdang.ddframe.job.internal.schedule.JobScheduleController;
import com.dangdang.ddframe.job.internal.schedule.JobTriggerListener;
import com.dangdang.ddframe.job.internal.schedule.SchedulerFacade;
import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter;
import org.hamcrest.core.Is;
import org.hamcrest.core.IsInstanceOf;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.unitils.util.ReflectionUtils;

import java.util.List;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.verify;
Expand All @@ -60,12 +54,6 @@ public final class JobSchedulerTest {
@Mock
private SchedulerFacade schedulerFacade;

@Mock
private JobFacade jobFacade;

@Mock
private JobDetail jobDetail;

@Mock
private ElasticJobListenerCaller caller;

Expand All @@ -78,18 +66,15 @@ public void initMocks() throws NoSuchFieldException {
MockitoAnnotations.initMocks(this);
ReflectionUtils.setFieldValue(jobScheduler, "regCenter", regCenter);
ReflectionUtils.setFieldValue(jobScheduler, "schedulerFacade", schedulerFacade);
ReflectionUtils.setFieldValue(jobScheduler, "jobFacade", jobFacade);
}

@Test
public void testNew() throws NoSuchFieldException {
JobScheduler actualJobScheduler = new JobScheduler(null, jobConfig, new TestElasticJobListener(caller), new TestDistributeOnceElasticJobListener(caller));
JobFacade actualJobFacade = ReflectionUtils.getFieldValue(actualJobScheduler, ReflectionUtils.getFieldWithName(JobScheduler.class, "jobFacade", false));
List<ElasticJobListener> actualElasticJobListeners = ReflectionUtils.getFieldValue(actualJobFacade, ReflectionUtils.getFieldWithName(JobFacade.class, "elasticJobListeners", false));
assertThat(actualElasticJobListeners.size(), Is.is(2));
assertThat(actualElasticJobListeners.get(0), IsInstanceOf.instanceOf(TestElasticJobListener.class));
assertThat(actualElasticJobListeners.get(1), IsInstanceOf.instanceOf(TestDistributeOnceElasticJobListener.class));
assertNotNull(ReflectionUtils.getFieldValue(actualElasticJobListeners.get(1), AbstractDistributeOnceElasticJobListener.class.getDeclaredField("guaranteeService")));
TestDistributeOnceElasticJobListener testDistributeOnceElasticJobListener = new TestDistributeOnceElasticJobListener(caller);
assertNull(ReflectionUtils.getFieldValue(testDistributeOnceElasticJobListener, ReflectionUtils.getFieldWithName(AbstractDistributeOnceElasticJobListener.class, "guaranteeService", false)));
JobScheduler actualJobScheduler = new JobScheduler(null, jobConfig, new TestElasticJobListener(caller), testDistributeOnceElasticJobListener);
assertNotNull(ReflectionUtils.getFieldValue(testDistributeOnceElasticJobListener, ReflectionUtils.getFieldWithName(AbstractDistributeOnceElasticJobListener.class, "guaranteeService", false)));
assertThat(ReflectionUtils.getFieldValue(actualJobScheduler, ReflectionUtils.getFieldWithName(JobScheduler.class, "elasticJob", false)), instanceOf(jobConfig.getJobClass()));
}

@Test
Expand All @@ -114,13 +99,10 @@ private void mockInit(final boolean isMisfire) {

private void assertInit() throws NoSuchFieldException, SchedulerException {
verify(schedulerFacade).clearPreviousServerStatus();
JobDetail jobDetail = ReflectionUtils.getFieldValue(jobScheduler, jobScheduler.getClass().getDeclaredField("jobDetail"));
assertThat(jobDetail.getKey().getName(), is("testJob"));
Scheduler scheduler = ReflectionUtils.getFieldValue(JobRegistry.getInstance().getJobScheduleController("testJob"), JobScheduleController.class.getDeclaredField("scheduler"));
assertThat(scheduler.getListenerManager().getTriggerListeners().size(), is(1));
assertThat(scheduler.getListenerManager().getTriggerListeners().get(0), instanceOf(JobTriggerListener.class));
assertTrue(scheduler.isStarted());
assertThat((JobFacade) jobDetail.getJobDataMap().get("jobFacade"), is(jobFacade));
verify(regCenter).addCacheData("/testJob");
verify(schedulerFacade).registerStartUpInfo();
verify(schedulerFacade).newJobTriggerListener();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.dangdang.ddframe.job.exception.JobException;
import com.dangdang.ddframe.job.plugin.job.type.dataflow.AbstractIndividualThroughputDataFlowElasticJob;
import lombok.Getter;
import org.quartz.JobExecutionException;

import java.util.Collections;
import java.util.List;
Expand All @@ -46,7 +45,7 @@ public boolean processData(final JobExecutionMultipleShardingContext context, fi
}

@Override
public void handleJobExecutionException(final JobExecutionException jobExecutionException) throws JobExecutionException {
public void handleJobExecutionException(final JobException jobException) {
}

public static void reset() {
Expand Down
Loading

0 comments on commit b3774f2

Please sign in to comment.