Skip to content

Commit

Permalink
springside#221 加入对Jedis的封装,代码改进与注释
Browse files Browse the repository at this point in the history
  • Loading branch information
calvin1978 committed Jul 11, 2013
1 parent d842e55 commit 7cc72a0
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public static void main(String[] args) throws Exception {
if (c == '\n') {
System.out.println("Shutting down");
threadPool.shutdownNow();
boolean shutdownSucess = threadPool.awaitTermination(JobListener.POPUP_TIMEOUT + 1,
boolean shutdownSucess = threadPool.awaitTermination(JobListener.DEFAULT_POPUP_TIMEOUT + 1,
TimeUnit.SECONDS);

if (!shutdownSucess) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ public Object execute(final String hash, final String[] keys, final String[] arg
*/
public Object execute(final String hash, final List<String> keys, final List<String> args)
throws IllegalArgumentException {

final long start = System.currentTimeMillis();

if (!hashScriptMap.containsKey(hash)) {
throw new IllegalArgumentException("Script hash " + hash + " is not loaded in executor。");
}
Expand All @@ -66,12 +69,16 @@ public Object execute(final String hash, final List<String> keys, final List<Str
return jedisTemplate.execute(new JedisAction<Object>() {
@Override
public Object action(Jedis jedis) {
return jedis.evalsha(hash, keys, args);
Object result = jedis.evalsha(hash, keys, args);
logger.debug("Script hash {} execution time is {}ms", hash, System.currentTimeMillis() - start);
return result;
}
});
} catch (JedisDataException e) {
logger.warn("Lua execution error, try to reload the script.", e);
return reloadAndExecute(hash, keys, args);
logger.warn("Script hash {} is not loaded yet, try to reload and run it again", hash, e);
Object result = reloadAndExecute(hash, keys, args);
logger.debug("Script hash {} reload and execution time is {}ms", hash, System.currentTimeMillis() - start);
return result;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,24 @@
import org.slf4j.LoggerFactory;
import org.springframework.core.io.DefaultResourceLoader;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springside.modules.nosql.redis.JedisScriptExecutor;
import org.springside.modules.utils.Threads.WrapExceptionRunnable;

import redis.clients.jedis.JedisPool;

import com.google.common.collect.Lists;

/**
* 定时分发任务。 启动线程定时从sleeping job sorted set 中取出到期的任务放入ready job list.
*
* @author calvin
*/
public class JobDispatcher implements Runnable {
public static final String DEFAULT_DISPATCH_LUA_FILE = "classpath:/redis/dispatch.lua";

private static Logger logger = LoggerFactory.getLogger(JobDispatcher.class);

private ScheduledExecutorService threadPool;
private ScheduledExecutorService scheduledThreadPool;

private JedisScriptExecutor scriptExecutor;

Expand All @@ -37,44 +41,53 @@ public JobDispatcher(String jobName, JedisPool jedisPool) {
}

public JobDispatcher(String jobName, JedisPool jedisPool, String scriptPath) {
keys = Lists.newArrayList(Keys.getSleepingJobKey(jobName), Keys.getReadyJobKey(jobName));
this.scriptExecutor = new JedisScriptExecutor(jedisPool);
keys = Lists.newArrayList(jobName + ".job:sleeping", jobName + ".job:ready");
loadLuaScript(scriptPath);
}

private void loadLuaScript(String scriptPath) {
ResourceLoader resourceLoader = new DefaultResourceLoader();
Resource resource = resourceLoader.getResource(scriptPath);
String script;
try {
String script = FileUtils.readFileToString(resource.getFile());
scriptHash = scriptExecutor.load(script);
Resource resource = new DefaultResourceLoader().getResource(scriptPath);
script = FileUtils.readFileToString(resource.getFile());
} catch (IOException e) {
throw new IllegalStateException(DEFAULT_DISPATCH_LUA_FILE + "not exist", e);
throw new IllegalArgumentException(scriptPath + " is not exist.", e);
}

scriptHash = scriptExecutor.load(script);
}

/**
* 启动分发线程, 自行创建scheduler线程池.
*/
public void start(long periodMilliseconds) {
threadPool = Executors.newScheduledThreadPool(1);
threadPool.scheduleAtFixedRate(new WrapExceptionRunnable(this), 0, periodMilliseconds, TimeUnit.MILLISECONDS);
this.scheduledThreadPool = Executors.newScheduledThreadPool(1);
scheduledThreadPool.scheduleAtFixedRate(new WrapExceptionRunnable(this), 0, periodMilliseconds,
TimeUnit.MILLISECONDS);
}

/**
* 停止分发任务, 默认最多延时10秒等候线程关闭.
*/
public void stop() {
threadPool.shutdownNow();
scheduledThreadPool.shutdownNow();
try {
if (!threadPool.awaitTermination(5, TimeUnit.SECONDS)) {
if (!scheduledThreadPool.awaitTermination(10, TimeUnit.SECONDS)) {
logger.error("Job dispatcher terminate failed!");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

/**
* 以当前时间为参数执行Lua Script分发任务。
*/
@Override
public void run() {
long currTime = System.currentTimeMillis();
List<String> args = Lists.newArrayList(String.valueOf(currTime));
scriptExecutor.execute(scriptHash, keys, args);
long luaExecTime = System.currentTimeMillis() - currTime;
logger.debug("Execution Time={}ms.", luaExecTime);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,21 @@
import redis.clients.jedis.JedisPool;

/**
* This is the Redis implementation of SchedulerManager.
* 阻塞接收任务的Runnable.
*/
public class JobListener implements Runnable {

public static final int POPUP_TIMEOUT = 5;
public static final int DEFAULT_POPUP_TIMEOUT = 5;

private String readyJobName;
private String readyJobKey;

private JedisTemplate jedisTemplate = null;
private JedisTemplate jedisTemplate;

private final JobHandler jobHandler;

public JobListener(String jobName, JedisPool jedisPool, JobHandler jobHandler) {
jedisTemplate = new JedisTemplate(jedisPool);
readyJobName = jobName + ".job:ready";
readyJobKey = Keys.getReadyJobKey(jobName);
this.jobHandler = jobHandler;
}

Expand All @@ -42,7 +42,7 @@ public void run() {
@Override
public void action(Jedis jedis) {
while (!Thread.currentThread().isInterrupted()) {
List<String> nameValuePair = jedis.brpop(POPUP_TIMEOUT, readyJobName);
List<String> nameValuePair = jedis.brpop(DEFAULT_POPUP_TIMEOUT, readyJobKey);
if ((nameValuePair != null) && !nameValuePair.isEmpty()) {
String job = nameValuePair.get(1);
jobHandler.handleJob(job);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,28 @@ public class JobManager {

private static Logger logger = LoggerFactory.getLogger(JobManager.class);

private String sleepingJobName;
private String sleepingJobKey;

private JedisTemplate jedisTemplate;

public JobManager(String jobName, JedisPool jedisPool) {
jedisTemplate = new JedisTemplate(jedisPool);
sleepingJobName = jobName + ".job:sleeping";
sleepingJobKey = Keys.getSleepingJobKey(jobName);
}

/**
* 安排任务.
*/
public void scheduleJob(final String job, final long delay, final TimeUnit timeUnit) {
final long delayTimeInMillisecond = System.currentTimeMillis() + timeUnit.toMillis(delay);
jedisTemplate.zadd(sleepingJobName, delayTimeInMillisecond, job);
jedisTemplate.zadd(sleepingJobKey, delayTimeInMillisecond, job);
}

/**
* 取消任务,如果任务不存在或已触发返回false, 否则返回true.
* 取消任务, 如果任务不存在或已被触发返回false, 否则返回true.
*/
public boolean cancelJob(final String job) {
boolean removed = jedisTemplate.zrem(sleepingJobName, job);
boolean removed = jedisTemplate.zrem(sleepingJobKey, job);

if (!removed) {
logger.warn("Can't cancel job by value {}", job);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.springside.modules.nosql.redis.scheduler;

public class Keys {

public static String getSleepingJobKey(String jobName) {
return new StringBuilder().append("job:").append(jobName).append(":sleepingjob").toString();
}

public static String getReadyJobKey(String jobName) {
return new StringBuilder().append("job:").append(jobName).append(":sleepingjob").toString();
}
}

0 comments on commit 7cc72a0

Please sign in to comment.