Skip to content

Commit

Permalink
fix streaming process not immediately effect issue.
Browse files Browse the repository at this point in the history
  • Loading branch information
haocao committed Jun 7, 2016
1 parent 7608a7d commit 9b83087
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 48 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public class MyElasticJob extends AbstractSimpleElasticJob {
http://www.dangdang.com/schema/ddframe/job/job.xsd
">
<!--配置作业注册中心 -->
<reg:zookeeper id="regCenter" server-lists=" yourhost:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />
<reg:zookeeper id="regCenter" server-lists="yourhost:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />

<!-- 配置作业-->
<job:simple id="myElasticJob" class="xxx.MyElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />
Expand Down
41 changes: 21 additions & 20 deletions elastic-job-console/src/main/webapp/templates/job_detail.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -38,27 +38,28 @@
</div>
</div>
<#if jobType == "DATA_FLOW">
<div class="form-group">
<label for="concurrentDataProcessThreadCount" class="col-sm-2 control-label">处理数据的并发线程数</label>
<div class="col-sm-1">
<input type="number" id="concurrentDataProcessThreadCount" name="concurrentDataProcessThreadCount" class="form-control" data-toggle="tooltip" data-placement="bottom" title="只对高吞吐量处理数据流类型作业起作用" />
</div>

<label for="processCountIntervalSeconds" class="col-sm-2 control-label">统计处理数据量的间隔秒数</label>
<div class="col-sm-2">
<input type="number" id="processCountIntervalSeconds" name="processCountIntervalSeconds" class="form-control" data-toggle="tooltip" data-placement="bottom" title="只对处理数据流类型作业起作用" />
</div>

<label for="fetchDataCount" class="col-sm-2 control-label">每次抓取的数据量</label>
<div class="col-sm-2">
<input type="number" id="fetchDataCount" name="fetchDataCount" class="form-control" data-toggle="tooltip" data-placement="bottom" title="可在不重启作业的情况下灵活配置抓取数据量" />
</div>

<label for="fetchDataCount" class="col-sm-2 control-label">是否流式处理数据</label>
<div class="col-sm-2">
<input type="checkbox" id="streamingProcess" name="streamingProcess" data-toggle="tooltip" data-placement="bottom" title="如果流式处理数据, 则fetchData不返回空结果将持续执行作业; 如果非流式处理数据, 则处理数据完成后作业结束" />
<div class="form-group">
<label for="concurrentDataProcessThreadCount" class="col-sm-2 control-label">处理数据的并发线程数</label>
<div class="col-sm-1">
<input type="number" id="concurrentDataProcessThreadCount" name="concurrentDataProcessThreadCount" class="form-control" data-toggle="tooltip" data-placement="bottom" title="只对高吞吐量处理数据流类型作业起作用" />
</div>

<label for="processCountIntervalSeconds" class="col-sm-2 control-label">统计处理数据量的间隔秒数</label>
<div class="col-sm-2">
<input type="number" id="processCountIntervalSeconds" name="processCountIntervalSeconds" class="form-control" data-toggle="tooltip" data-placement="bottom" title="只对处理数据流类型作业起作用" />
</div>

<label for="fetchDataCount" class="col-sm-2 control-label">每次抓取的数据量</label>
<div class="col-sm-2">
<input type="number" id="fetchDataCount" name="fetchDataCount" class="form-control" data-toggle="tooltip" data-placement="bottom" title="可在不重启作业的情况下灵活配置抓取数据量" />
</div>
</div>
<div class="form-group">
<label for="fetchDataCount" class="col-sm-2 control-label">是否流式处理数据</label>
<div class="col-sm-2">
<input type="checkbox" id="streamingProcess" name="streamingProcess" data-toggle="tooltip" data-placement="bottom" title="如果流式处理数据, 则fetchData不返回空结果将持续执行作业; 如果非流式处理数据, 则处理数据完成后作业结束" />
</div>
</div>
</div>
</#if>
<div class="form-group">
<label for="maxTimeDiffSeconds" class="col-sm-2 control-label">最大容忍的本机与注册中心的时间误差秒数</label>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,12 @@ public boolean isExecuteMisfired(final List<Integer> shardingItems) {
/**
* 判断作业是否符合继续运行的条件.
*
* <p>如果作业停止或需要重分片则作业将不会继续运行.</p>
* <p>如果作业停止或需要重分片或非流式处理则作业将不会继续运行.</p>
*
* @return 作业是否符合继续运行的条件
*/
public boolean isEligibleForJobRunning() {
return !serverService.isJobPausedManually() && !shardingService.isNeedSharding();
return !serverService.isJobPausedManually() && !shardingService.isNeedSharding() && configService.isStreamingProcess();
}

/**判断是否需要重分片.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,55 +91,55 @@ public void setUp() throws NoSuchFieldException {
}

@Test
public void testGetJobName() {
public void assertGetJobName() {
when(configService.getJobName()).thenReturn("testJob");
assertThat(jobFacade.getJobName(), is("testJob"));
}

@Test
public void testCheckMaxTimeDiffSecondsTolerable() {
public void assertCheckMaxTimeDiffSecondsTolerable() {
jobFacade.checkMaxTimeDiffSecondsTolerable();
verify(configService).checkMaxTimeDiffSecondsTolerable();
}

@Test
public void testIsStreamingProcess() {
public void assertIsStreamingProcess() {
when(configService.isStreamingProcess()).thenReturn(false);
assertThat(jobFacade.isStreamingProcess(), is(false));
}

@Test
public void testFailoverIfUnnecessary() {
public void assertFailoverIfUnnecessary() {
when(configService.isFailover()).thenReturn(false);
jobFacade.failoverIfNecessary();
verify(failoverService, times(0)).failoverIfNecessary();
}

@Test
public void testFailoverIfNecessaryButIsPaused() {
public void assertFailoverIfNecessaryButIsPaused() {
when(configService.isFailover()).thenReturn(true);
when(serverService.isJobPausedManually()).thenReturn(true);
jobFacade.failoverIfNecessary();
verify(failoverService, times(0)).failoverIfNecessary();
}

@Test
public void testFailoverIfNecessary() {
public void assertFailoverIfNecessary() {
when(configService.isFailover()).thenReturn(true);
when(serverService.isJobPausedManually()).thenReturn(false);
jobFacade.failoverIfNecessary();
verify(failoverService).failoverIfNecessary();
}

@Test
public void testRegisterJobBegin() {
public void assertRegisterJobBegin() {
JobExecutionMultipleShardingContext shardingContext = new JobExecutionMultipleShardingContext();
jobFacade.registerJobBegin(shardingContext);
verify(executionService).registerJobBegin(shardingContext);
}

@Test
public void testRegisterJobCompletedWhenFailoverDisabled() {
public void assertRegisterJobCompletedWhenFailoverDisabled() {
JobExecutionMultipleShardingContext shardingContext = new JobExecutionMultipleShardingContext();
when(configService.isFailover()).thenReturn(false);
jobFacade.registerJobCompleted(shardingContext);
Expand All @@ -148,7 +148,7 @@ public void testRegisterJobCompletedWhenFailoverDisabled() {
}

@Test
public void testRegisterJobCompletedWhenFailoverEnabled() {
public void assertRegisterJobCompletedWhenFailoverEnabled() {
JobExecutionMultipleShardingContext shardingContext = new JobExecutionMultipleShardingContext();
when(configService.isFailover()).thenReturn(true);
jobFacade.registerJobCompleted(shardingContext);
Expand All @@ -157,7 +157,7 @@ public void testRegisterJobCompletedWhenFailoverEnabled() {
}

@Test
public void testGetShardingContextWhenIsFailoverEnableAndFailover() {
public void assertGetShardingContextWhenIsFailoverEnableAndFailover() {
JobExecutionMultipleShardingContext shardingContext = new JobExecutionMultipleShardingContext();
when(configService.isFailover()).thenReturn(true);
when(failoverService.getLocalHostFailoverItems()).thenReturn(Collections.singletonList(1));
Expand All @@ -167,7 +167,7 @@ public void testGetShardingContextWhenIsFailoverEnableAndFailover() {
}

@Test
public void testGetShardingContextWhenIsFailoverEnableAndNotFailover() {
public void assertGetShardingContextWhenIsFailoverEnableAndNotFailover() {
JobExecutionMultipleShardingContext shardingContext = new JobExecutionMultipleShardingContext();
when(configService.isFailover()).thenReturn(true);
when(failoverService.getLocalHostFailoverItems()).thenReturn(Collections.<Integer>emptyList());
Expand All @@ -179,7 +179,7 @@ public void testGetShardingContextWhenIsFailoverEnableAndNotFailover() {
}

@Test
public void testGetShardingContextWhenIsFailoverDisable() {
public void assertGetShardingContextWhenIsFailoverDisable() {
JobExecutionMultipleShardingContext shardingContext = new JobExecutionMultipleShardingContext();
when(configService.isFailover()).thenReturn(false);
when(shardingService.getLocalHostShardingItems()).thenReturn(Lists.newArrayList(0, 1));
Expand All @@ -189,44 +189,76 @@ public void testGetShardingContextWhenIsFailoverDisable() {
}

@Test
public void testMisfireIfNecessary() {
public void assertMisfireIfNecessary() {
when(executionService.misfireIfNecessary(Arrays.asList(0, 1))).thenReturn(true);
assertThat(jobFacade.misfireIfNecessary(Arrays.asList(0, 1)), is(true));
}

@Test
public void testClearMisfire() {
public void assertClearMisfire() {
jobFacade.clearMisfire(Arrays.asList(0, 1));
verify(executionService).clearMisfire(Arrays.asList(0, 1));
}

@Test
public void testIsNeedSharding() {
public void assertIsNeedSharding() {
when(shardingService.isNeedSharding()).thenReturn(true);
assertThat(jobFacade.isNeedSharding(), is(true));
}

@Test
public void testUpdateOffset() {
public void assertUpdateOffset() {
jobFacade.updateOffset(0, "offset0");
verify(offsetService).updateOffset(0, "offset0");
}

@Test
public void testCleanPreviousExecutionInfo() {
public void assertCleanPreviousExecutionInfo() {
jobFacade.cleanPreviousExecutionInfo();
verify(executionService).cleanPreviousExecutionInfo();
}

@Test
public void testBeforeJobExecuted() {
public void assertBeforeJobExecuted() {
jobFacade.beforeJobExecuted(new JobExecutionMultipleShardingContext());
verify(caller).before();
}

@Test
public void testAfterJobExecuted() {
public void assertAfterJobExecuted() {
jobFacade.afterJobExecuted(new JobExecutionMultipleShardingContext());
verify(caller).after();
}

@Test
public void assertNotEligibleForJobRunningWhenJobPausedManually() {
when(serverService.isJobPausedManually()).thenReturn(true);
assertThat(jobFacade.isEligibleForJobRunning(), is(false));
verify(serverService).isJobPausedManually();
}

@Test
public void assertNotEligibleForJobRunningWhenNeedSharding() {
when(shardingService.isNeedSharding()).thenReturn(true);
assertThat(jobFacade.isEligibleForJobRunning(), is(false));
verify(shardingService).isNeedSharding();
}

@Test
public void assertNotEligibleForJobRunningWhenUnStreamingProcess() {
when(configService.isStreamingProcess()).thenReturn(false);
assertThat(jobFacade.isEligibleForJobRunning(), is(false));
verify(configService).isStreamingProcess();
}

@Test
public void assertEligibleForJobRunningWhenNotJobPausedManuallyAndNotNeedShardingAndStreamingProcess() {
when(serverService.isJobPausedManually()).thenReturn(false);
when(shardingService.isNeedSharding()).thenReturn(false);
when(configService.isStreamingProcess()).thenReturn(true);
assertThat(jobFacade.isEligibleForJobRunning(), is(true));
verify(serverService).isJobPausedManually();
verify(shardingService).isNeedSharding();
verify(configService).isStreamingProcess();
}
}
4 changes: 2 additions & 2 deletions elastic-job-doc/content/index/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ public class MyElasticJob extends AbstractThroughputDataFlowElasticJob<Foo> {
http://www.dangdang.com/schema/ddframe/job/job.xsd
">
<!--配置作业注册中心 -->
<reg:zookeeper id="regCenter" server-lists=" yourhost:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" maxRetries="3" />
<reg:zookeeper id="regCenter" server-lists=" yourhost:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />

<!-- 配置作业-->
<job:simple id="oneOffElasticJob" class="xxx.MyElasticJob" reg-center="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />
<job:simple id="oneOffElasticJob" class="xxx.MyElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />
</beans>
```
2 changes: 1 addition & 1 deletion elastic-job-doc/content/post/directory_structure.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ weight=4

## elastic-job-api

`elastic-job`生命周期操作的API,可独立使用。
`elastic-job`生命周期操作的`API`,可独立使用。

## elastic-job-spring

Expand Down
2 changes: 1 addition & 1 deletion elastic-job-doc/content/post/release_notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ weight=1

# Release Notes

## 1.1.0-SNAPSHOT
## 1.1.0

### 结构调整

Expand Down
1 change: 1 addition & 0 deletions elastic-job-doc/content/post/theory.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ weight=10
| processCountIntervalSeconds || 统计作业处理数据数量的间隔时间 |
| concurrentDataProcessThreadCount || 同时处理数据的并发线程数 |
| fetchDataCount || 每次抓取的数据量 |
| streaming-process || 是否流式处理数据<br />如果流式处理数据, 则`fetchData`不返回空结果将持续执行作业<br />如果非流式处理数据, 则处理数据完成后作业结束<br />|
| maxTimeDiffSeconds || 允许的本机与注册中心的时间误差秒数 |
| failover || 是否开启失效转移 |
| misfire || 是否开启错过任务重新执行 |
Expand Down
8 changes: 7 additions & 1 deletion elastic-job-doc/content/post/update_notes_1.1.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ weight=2

# elastic-job 1.1.0升级说明

基于扩展性提升,概念明晰和命名规范化的考虑,`elastic-job 1.1.0`版本决定抛弃原有包袱的束缚,重新定义了`JAVA API``Spring`命名空间并且删除了已废弃的API。`elastic-job 1.1.0`作为里程碑版本发布,除了API改动并未做功能上的修改,希望通过标准化配置的方式为未来的新`elastic-job`功能的开发打下良好的基础。
* 基于扩展性提升,概念明晰和命名规范化的考虑,`elastic-job 1.1.0`版本决定抛弃原有包袱的束缚,重新定义了`JAVA API``Spring`命名空间并且删除了已废弃的`API`

* `elastic-job 1.1.0`作为里程碑版本发布,除了`API`改动并未做功能上的修改,希望通过标准化配置的方式为未来的新`elastic-job`功能的开发打下良好的基础。

## 重新定义JAVA API

Expand All @@ -20,8 +22,12 @@ weight=2

* `Spring`命名空间属性由驼峰式修正为`Spring`命名空间标准命名规范(多单词以`-`分隔)。

* 作业的`Spring`命名空间属性`regCenter`变更为`registry-center-ref`

## 废弃过时API

* 删除废弃作业类,包括`AbstractOneOffElasticJob``AbstractPerpetualElasticJob``AbstractSequencePerpetualElasticJob`

* 删除废弃作业调度器类,包括`com.dangdang.ddframe.job.schedule.JobController``com.dangdang.ddframe.job.spring.schedule.SpringJobController`

* 不再支持非`Spring`命名空间通过`xml`方式配置`bean`,如有需要请使用`Spring Java Config`
2 changes: 1 addition & 1 deletion elastic-job-doc/content/post/user_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ job:dataflow命名空间拥有job:simple命名空间的全部属性,以下仅
|process-count-interval-seconds |int ||300 | 统计作业处理数据数量的间隔时间<br />单位:秒<br /> |
|concurrent-data-process-thread-count|int ||1 | 同时处理数据的并发线程数<br />不能小于1<br />仅`ThroughputDataFlow`作业有效 |
|fetch-data-count |int ||1 | 每次抓取的数据量 |
|streaming-process |boolean||false| 是否流式处理数据<br />如果流式处理数据, 则fetchData不返回空结果将持续执行作业<br />如果非流式处理数据, 则处理数据完成后作业结束<br />|
|streaming-process |boolean||false| 是否流式处理数据<br />如果流式处理数据, `fetchData`不返回空结果将持续执行作业<br />如果非流式处理数据, 则处理数据完成后作业结束<br />|

#### job:script命名空间属性详细说明,基本属性参照job:simple命名空间属性详细说明

Expand Down

0 comments on commit 9b83087

Please sign in to comment.