Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delay send2 weight server #27

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
considering inSendingMsgNum when sending delay msgs grouped by real-t…
…ime broker to real-time broker
  • Loading branch information
xufeng.deng committed Jan 8, 2019
commit 9d0e531d18ebbe642a9dc4c5878da4253f48ba96
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
import com.google.common.collect.Maps;
import qunar.tc.qmq.broker.BrokerClusterInfo;
import qunar.tc.qmq.broker.BrokerGroupInfo;
import qunar.tc.qmq.broker.BrokerLoadBalance;
import qunar.tc.qmq.broker.BrokerService;
import qunar.tc.qmq.broker.impl.PollBrokerLoadBalance;
import qunar.tc.qmq.common.ClientType;
import qunar.tc.qmq.common.Disposable;
import qunar.tc.qmq.configuration.DynamicConfig;
import qunar.tc.qmq.delay.DelayLogFacade;
import qunar.tc.qmq.delay.ScheduleIndex;
import qunar.tc.qmq.delay.sender.loadbalance.InSendingNumWeightLoadBalancer;
import qunar.tc.qmq.delay.sender.loadbalance.LoadBalancer;

import java.util.List;
import java.util.Map;
Expand All @@ -42,15 +42,15 @@ class SenderExecutor implements Disposable {
private static final int DEFAULT_SEND_THREAD = 1;

private final ConcurrentMap<String, SenderGroup> groupSenders = new ConcurrentHashMap<>();
private final BrokerLoadBalance brokerLoadBalance;
private final Sender sender;
private final DelayLogFacade store;
private final int sendThreads;
private final LoadBalancer balancer;

SenderExecutor(final Sender sender, DelayLogFacade store, DynamicConfig sendConfig) {
this.sender = sender;
this.store = store;
this.brokerLoadBalance = PollBrokerLoadBalance.getInstance();
this.balancer = new InSendingNumWeightLoadBalancer();
this.sendThreads = sendConfig.getInt("delay.send.threads", DEFAULT_SEND_THREAD);
}

Expand All @@ -62,7 +62,7 @@ void execute(final List<ScheduleIndex> indexList, final SenderGroup.ResultHandle
}

private void doExecute(final SenderGroup group, final List<ScheduleIndex> list, final SenderGroup.ResultHandler handler) {
group.send(list, sender, handler);
group.send(list, sender, handler,balancer);
}

private Map<SenderGroup, List<ScheduleIndex>> groupByBroker(final List<ScheduleIndex> indexList, final BrokerService brokerService) {
Expand Down Expand Up @@ -100,7 +100,7 @@ private SenderGroup getGroup(BrokerGroupInfo groupInfo, int sendThreads) {

private BrokerGroupInfo loadGroup(String subject, BrokerService brokerService) {
BrokerClusterInfo cluster = brokerService.getClusterBySubject(ClientType.PRODUCER, subject);
return brokerLoadBalance.loadBalance(cluster, null);
return balancer.select(cluster);
}

private Map<String, List<ScheduleIndex>> groupBySubject(List<ScheduleIndex> list) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import qunar.tc.qmq.delay.DelayLogFacade;
import qunar.tc.qmq.delay.ScheduleIndex;
import qunar.tc.qmq.delay.monitor.QMon;
import qunar.tc.qmq.delay.sender.loadbalance.BrokerGroupStats;
import qunar.tc.qmq.delay.sender.loadbalance.LoadBalancer;
import qunar.tc.qmq.delay.store.model.ScheduleSetRecord;
import qunar.tc.qmq.metrics.Metrics;
import qunar.tc.qmq.netty.exception.ClientSendException;
Expand Down Expand Up @@ -62,33 +64,37 @@ public class SenderGroup implements Disposable {
this.executorService = new ThreadPoolExecutor(1, sendThreads, 1L, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(), new ThreadFactoryBuilder()
.setNameFormat("delay-sender-" + groupInfo.getGroupName() + "-%d").build());

Metrics.gauge("sendGroupQueueSize", new String[]{"brokerGroup"}, new String[]{groupInfo.getGroupName()}, () -> (double) executorService.getQueue().size());
}

public void send(final List<ScheduleIndex> records, final Sender sender, final ResultHandler handler) {
executorService.execute(() -> doSend(records, sender, handler));
public void send(final List<ScheduleIndex> records, final Sender sender, final ResultHandler handler, final LoadBalancer balancer) {
final BrokerGroupStats stats = balancer.getBrokerGroupStats(groupInfo.get());
stats.incrementToSendCount(records.size());
executorService.execute(() -> doSend(records, sender, handler, balancer));
}

private void doSend(final List<ScheduleIndex> batch, final Sender sender, final ResultHandler handler) {
private void doSend(final List<ScheduleIndex> batch, final Sender sender, final ResultHandler handler, final LoadBalancer balancer) {
BrokerGroupInfo groupInfo = this.groupInfo.get();
String groupName = groupInfo.getGroupName();
List<List<ScheduleIndex>> partitions = Lists.partition(batch, MAX_SEND_BATCH_SIZE);

for (List<ScheduleIndex> partition : partitions) {
send(sender, handler, groupInfo, groupName, partition);
send(sender, handler, groupInfo, partition, balancer);
}
}

private void send(Sender sender, ResultHandler handler, BrokerGroupInfo groupInfo, String groupName, List<ScheduleIndex> list) {
private void send(final Sender sender, final ResultHandler handler, final BrokerGroupInfo groupInfo, final List<ScheduleIndex> list, final LoadBalancer balancer) {
try {
long start = System.currentTimeMillis();
List<ScheduleSetRecord> records = store.recoverLogRecord(list);
QMon.loadMsgTime(System.currentTimeMillis() - start);

Datagram response = sendMessages(records, sender);
release(records);
monitor(list, groupName);
monitor(list, groupInfo.getGroupName());
if (response == null) {
handler.fail(list);
groupInfo.markFailed();
fail(list, groupInfo.getGroupName(), handler);
} else {
final int responseCode = response.getHeader().getCode();
final Map<String, SendResult> resultMap = getSendResult(response);
Expand All @@ -98,9 +104,7 @@ private void send(Sender sender, ResultHandler handler, BrokerGroupInfo groupInf
groupInfo.markFailed();
}

monitorSendFail(list, groupInfo.getGroupName());

handler.fail(list);
fail(list, groupInfo.getGroupName(), handler);
return;
}

Expand All @@ -121,11 +125,19 @@ private void send(Sender sender, ResultHandler handler, BrokerGroupInfo groupInf
handler.success(records, failedMessageIds);
}
} catch (Throwable e) {
LOGGER.error("sender group send batch failed,broker:{},batch size:{}", groupName, list.size(), e);
handler.fail(list);
LOGGER.error("sender group send batch failed,broker:{},batch size:{}", groupInfo.getGroupName(), list.size(), e);
fail(list, groupInfo.getGroupName(), handler);
} finally {
BrokerGroupStats stats = balancer.getBrokerGroupStats(groupInfo);
stats.decrementToSendCount(list.size());
}
}

private void fail(final List<ScheduleIndex> list, final String groupName, final ResultHandler handler) {
monitorSendFail(list, groupName);
handler.fail(list);
}

private void release(List<ScheduleSetRecord> records) {
for (ScheduleSetRecord record : records) {
record.release();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package qunar.tc.qmq.delay.sender.loadbalance;

import qunar.tc.qmq.broker.BrokerGroupInfo;

import java.util.concurrent.atomic.AtomicLong;

/**
* @author xufeng.deng dennisdxf@gmail.com
* @since 2019-01-08 15:08
*/
public class BrokerGroupStats {
private final BrokerGroupInfo brokerGroupInfo;

// send time

// send failed

// send success

// to send count
private final AtomicLong toSend;

public BrokerGroupStats(final BrokerGroupInfo brokerGroupInfo) {
this.brokerGroupInfo = brokerGroupInfo;
this.toSend = new AtomicLong(0);
}

public void incrementToSendCount(long toSendNum) {
toSend.addAndGet(toSendNum);
}

public void decrementToSendCount(long sendNum) {
long count = toSend.get();
toSend.set(count - sendNum);
}

long getToSendCount() {
return toSend.get();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package qunar.tc.qmq.delay.sender.loadbalance;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.broker.BrokerClusterInfo;
import qunar.tc.qmq.broker.BrokerGroupInfo;

import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* @author xufeng.deng dennisdxf@gmail.com
* @since 2019-01-08 16:03
*/
public class InSendingNumWeightLoadBalancer extends RandomLoadBalancer {
private static final Logger LOG = LoggerFactory.getLogger(InSendingNumWeightLoadBalancer.class);

private final LoadBalanceStats stats;

private volatile List<Long> accumulatedWeights = Collections.synchronizedList(new ArrayList<>());

private volatile List<BrokerGroupInfo> brokerGroups = Collections.synchronizedList(new ArrayList<>());

private final AtomicBoolean brokerWeightAssignmentInProgress = new AtomicBoolean(false);

private final Timer brokerWeightTimer;

private final Random random = new Random();

public InSendingNumWeightLoadBalancer() {
stats = new LoadBalanceStats();
brokerWeightTimer = new Timer("brokerWeightTimer", true);
scheduleBrokerWeight();
}

private void scheduleBrokerWeight() {
brokerWeightTimer.schedule(new DynamicBrokerGroupWeightTask(), 0, 1000);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
LOG.info("Stopping brokerWeightTimer.");
brokerWeightTimer.cancel();
}));
}

@Override
public BrokerGroupInfo select(BrokerClusterInfo clusterInfo) {
List<BrokerGroupInfo> arrivalGroups = clusterInfo.getGroups();
if (arrivalGroups == null || arrivalGroups.isEmpty()) return null;

List<BrokerGroupInfo> stayGroupInfos = getBrokerGroups();
int groupsSize = arrivalGroups.size();
refreshBrokerGroups(arrivalGroups, stayGroupInfos);

BrokerGroupInfo brokerGroupInfo = null;
List<Long> currentWeights = getAccumulatedWeights();
int cyclicCount = 0;
while (brokerGroupInfo == null && cyclicCount++ < groupsSize * 3) {
int brokerIndex = 0;
long maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);
if (maxTotalWeight < 1000) {
brokerGroupInfo = super.select(clusterInfo);
} else {
long randomWeight = random.nextLong() * maxTotalWeight;
int n = 0;
for (Long l : currentWeights) {
if (l >= randomWeight) {
brokerIndex = n;
break;
} else {
++n;
}
}

brokerGroupInfo = stayGroupInfos.get(brokerIndex);

if (brokerGroupInfo == null) {
Thread.yield();
continue;
}

if (brokerGroupInfo.isAvailable()) {
return brokerGroupInfo;
}

brokerGroupInfo = null;
}
}

return brokerGroupInfo;
}

private void refreshBrokerGroups(List<BrokerGroupInfo> arrivalGroups, List<BrokerGroupInfo> stayBrokerGroups) {
Set<BrokerGroupInfo> oldSet = Sets.newHashSet(stayBrokerGroups);
Set<BrokerGroupInfo> newSet = Sets.newHashSet(arrivalGroups);
Set<BrokerGroupInfo> removals = Sets.difference(oldSet, newSet);
Set<BrokerGroupInfo> adds = Sets.difference(newSet, oldSet);
if (!removals.isEmpty() || !adds.isEmpty()) {
List<BrokerGroupInfo> attached = ImmutableList.copyOf(stayBrokerGroups);
attached.removeAll(removals);
attached.addAll(adds);
setBrokerGroups(attached);
}
}

class DynamicBrokerGroupWeightTask extends TimerTask {

@Override
public void run() {
BrokerWeight brokerWeight = new BrokerWeight();
try {
brokerWeight.maintainWeights();
} catch (Exception e) {
LOG.error("Error running DynamicBrokerGroupWeightTask.", e);
}
}
}

class BrokerWeight {
void maintainWeights() {
if (!brokerWeightAssignmentInProgress.compareAndSet(false, true)) {
return;
}

try {
LOG.info("weight adjusting job started.");
doMaintain();
} catch (Exception e) {
LOG.error("Error calculating broker weights.");
} finally {
brokerWeightAssignmentInProgress.set(false);
}
}

private void doMaintain() {
long total = 0;
List<BrokerGroupInfo> groups = getBrokerGroups();
for (BrokerGroupInfo brokerGroup : groups) {
final BrokerGroupStats brokerGroupStats = stats.getBrokerGroupStats(brokerGroup);
total += brokerGroupStats.getToSendCount();
}

long weightSoFar = 0;
List<Long> finalWeights = Lists.newArrayListWithCapacity(groups.size());
for (BrokerGroupInfo brokerGroup : groups) {
final BrokerGroupStats brokerGroupStats = stats.getBrokerGroupStats(brokerGroup);
long weight = total - brokerGroupStats.getToSendCount();
weightSoFar += weight;
finalWeights.add(weightSoFar);
}
setAccumulatedWeights(finalWeights);
}
}

@Override
public BrokerGroupStats getBrokerGroupStats(BrokerGroupInfo brokerGroupInfo) {
return stats.getBrokerGroupStats(brokerGroupInfo);
}

private void setAccumulatedWeights(final List<Long> weights) {
this.accumulatedWeights = weights;
}

private void setBrokerGroups(final List<BrokerGroupInfo> brokerGroups) {
this.brokerGroups = brokerGroups;
}

private List<BrokerGroupInfo> getBrokerGroups() {
return Collections.unmodifiableList(brokerGroups);
}

private List<Long> getAccumulatedWeights() {
return Collections.unmodifiableList(accumulatedWeights);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package qunar.tc.qmq.delay.sender.loadbalance;

import com.google.common.cache.*;
import qunar.tc.qmq.broker.BrokerGroupInfo;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
* @author xufeng.deng dennisdxf@gmail.com
* @since 2019-01-08 15:12
*/
class LoadBalanceStats {
private final LoadingCache<BrokerGroupInfo, BrokerGroupStats> brokerGroupStatsCache = CacheBuilder.newBuilder()
.expireAfterAccess(10, TimeUnit.MINUTES).build(new CacheLoader<BrokerGroupInfo, BrokerGroupStats>() {
@Override
public BrokerGroupStats load(BrokerGroupInfo key) throws Exception {
return createBrokerGroupStats(key);
}
});

private BrokerGroupStats createBrokerGroupStats(BrokerGroupInfo brokerGroupInfo) {
return new BrokerGroupStats(brokerGroupInfo);
}

BrokerGroupStats getBrokerGroupStats(final BrokerGroupInfo brokerGroup) {
try {
return brokerGroupStatsCache.get(brokerGroup);
} catch (ExecutionException e) {
BrokerGroupStats stats = createBrokerGroupStats(brokerGroup);
brokerGroupStatsCache.asMap().putIfAbsent(brokerGroup, stats);
return brokerGroupStatsCache.asMap().get(brokerGroup);
}
}

}
Loading