Skip to content

Commit

Permalink
bugfix: 集群环境下有重复seq时,消息重发任务无法取消
Browse files Browse the repository at this point in the history
bugfix: 生成系统毫秒的缓存代码有问题
  • Loading branch information
Lihuanghe committed Sep 10, 2015
1 parent ff36271 commit 0ad31b8
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 20 deletions.
23 changes: 13 additions & 10 deletions src/main/java/com/zx/sms/common/util/CachedMillisecondClock.java
Original file line number Diff line number Diff line change
@@ -1,30 +1,33 @@
package com.zx.sms.common.util;

/**
*由于System.currentTimeMillis()性能问题,缓存当前时间,每1s更新一次
* 由于System.currentTimeMillis()性能问题,缓存当前时间,每1s更新一次
*/
public enum CachedMillisecondClock {
INS;
private volatile long now = 0;// 当前时间

private CachedMillisecondClock() {
this.now = System.currentTimeMillis();
start();
}

private void start() {
new Thread(new Runnable() {
new Thread(null,new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {

while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
now = System.currentTimeMillis();
}
now = System.currentTimeMillis();
}
}).start();
},"CachedMillisecondClockUpdater").start();
}

public long now() {
return now;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.lang.math.RandomUtils;

/**
* @author huzorro(huzorro@gmail.com)
*
Expand Down Expand Up @@ -65,5 +67,5 @@ public static long getNextAtomicValue(AtomicLong atomicObj,long limited){
}

private final static long Limited = 0x7fffffffffff0000L;
private final static AtomicLong sequenceId = new AtomicLong();
private final static AtomicLong sequenceId = new AtomicLong(Math.abs(RandomUtils.nextInt()));
}
41 changes: 32 additions & 9 deletions src/main/java/com/zx/sms/session/cmpp/SessionStateManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -355,23 +355,46 @@ private void preSendMsg(ChannelHandlerContext ctx) {
/**
* 发送msg,首先做消息持久化
*/
private void safewrite(ChannelHandlerContext ctx, final Message message, ChannelPromise promise) {
private void safewrite(final ChannelHandlerContext ctx, final Message message, final ChannelPromise promise) {
if (ctx.channel().isActive()) {
final Long seq = message.getHeader().getSequenceId();

message.incrementAndGetRequests();
msgWriteCount++;

// 记录已发送的请求,在发送msg前生记录到map里。防止生成retryTask前就收到resp的情况发生
Entry tmpentry = new Entry(message);
msgRetryMap.put(seq, tmpentry);
// 持久化到队列
storeMap.put(seq, message);

Entry old = msgRetryMap.putIfAbsent(seq, tmpentry);

if(old !=null) {
// bugfix: 集群环境下可能产生相同的seq. 如果已经存在一个相同的seq.
//此消息延迟250ms再发
logger.error("has repeat Sequense {}",seq);
EventLoopGroupFactory.INS.getMsgResend().schedule(new Runnable(){
@Override
public void run() {
try {
write(ctx,message,promise);
} catch (Exception e) {
logger.error("has repeat Sequense ,and write Msg err {}",message);
}
}

}, 250, TimeUnit.MILLISECONDS);

}else{
msgWriteCount++;
// 持久化到队列
storeMap.put(seq, message);

ctx.write(message, promise);
ctx.write(message, promise);

// 注册重试任务
scheduleRetryMsg(ctx, message, promise);
ctx.flush();
}


// 注册重试任务
scheduleRetryMsg(ctx, message, promise);
ctx.flush();
} else {
// 如果连接已关闭,通知上层应用
if (promise != null && (!promise.isDone()))
Expand Down

0 comments on commit 0ad31b8

Please sign in to comment.