Skip to content

Commit

Permalink
fix: follower should try clean pending closure to avoid memory leak (o…
Browse files Browse the repository at this point in the history
…penmessaging#309)

* fix: follower should try clean pending closure to avoid memory leak

Change-Id: I24b4cc67d66d7fba3ae4c46480da1af1c769479d

* fix: check timeout pending closureMap in follower role

Change-Id: I7e392579087171574e2a88d6d1cae21f0773510a

* fix: check timeout should check all since it may removed old index

Change-Id: Ia5bd6255188294bcf73130bee1669b7a646ab0ad

* optimize: check timeout more efficient

Change-Id: I103c0aad5bcc313cce02e091f7cd11bf4dd0eb32

* optimize the timeout future check

Change-Id: Id339e9e0d3e697c5bfcd3330c43ea751ed024911

---------

Co-authored-by: liwen.2022 <liwen.2022@bytedance.com>
  • Loading branch information
absolute8511 and liwen.2022 authored Aug 11, 2023
1 parent cfd4191 commit 0156c72
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -218,14 +218,19 @@ public boolean completeResponseFuture(final ApplyEntry task) {
*/
public void checkResponseFuturesTimeout(final long beginIndex) {
final long term = this.memberState.currTerm();
long maxIndex = this.memberState.getCommittedIndex() + dLedgerConfig.getMaxPendingRequestsNum() + 1;
if (maxIndex > this.memberState.getLedgerEndIndex()) {
maxIndex = this.memberState.getLedgerEndIndex() + 1;
}
ConcurrentMap<Long, Closure> closureMap = this.pendingClosure.get(term);
if (closureMap != null) {
for (long i = beginIndex; i < Integer.MAX_VALUE; i++) {
if (closureMap != null && closureMap.size() > 0) {
for (long i = beginIndex; i < maxIndex; i++) {
Closure closure = closureMap.get(i);
if (closure == null) {
break;
// index may be removed for complete, we should continue scan
} else if (closure.isTimeOut()) {
closure.done(Status.error(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT));
closureMap.remove(i);
} else {
break;
}
Expand Down Expand Up @@ -254,6 +259,7 @@ private class QuorumAckChecker extends ShutdownAbleThread {

private long lastPrintWatermarkTimeMs = System.currentTimeMillis();
private long lastCheckLeakTimeMs = System.currentTimeMillis();
private long lastCheckTimeoutTimeMs = System.currentTimeMillis();

public QuorumAckChecker(Logger logger) {
super("QuorumAckChecker-" + memberState.getSelfId(), logger);
Expand All @@ -267,10 +273,7 @@ public void doWork() {
memberState.getSelfId(), memberState.getRole(), memberState.currTerm(), dLedgerStore.getLedgerBeforeBeginIndex(), dLedgerStore.getLedgerEndIndex(), memberState.getCommittedIndex(), JSON.toJSONString(peerWaterMarksByTerm), memberState.getAppliedIndex());
lastPrintWatermarkTimeMs = System.currentTimeMillis();
}
if (!memberState.isLeader()) {
waitForRunning(1);
return;
}

long currTerm = memberState.currTerm();
checkTermForPendingMap(currTerm, "QuorumAckChecker");
checkTermForWaterMark(currTerm, "QuorumAckChecker");
Expand Down Expand Up @@ -303,9 +306,15 @@ public void doWork() {
checkResponseFuturesElapsed(DLedgerEntryPusher.this.memberState.getAppliedIndex());
lastCheckLeakTimeMs = System.currentTimeMillis();
}

// clear the timeout pending closure which index > appliedIndex
checkResponseFuturesTimeout(DLedgerEntryPusher.this.memberState.getAppliedIndex() + 1);
if (DLedgerUtils.elapsed(lastCheckTimeoutTimeMs) > 1000) {
// clear the timeout pending closure should check all since it can timeout for different index
checkResponseFuturesTimeout(DLedgerEntryPusher.this.memberState.getAppliedIndex() + 1);
lastCheckTimeoutTimeMs = System.currentTimeMillis();
}
if (!memberState.isLeader()) {
waitForRunning(1);
return;
}

// update peer watermarks of self
updatePeerWaterMark(currTerm, memberState.getSelfId(), dLedgerStore.getLedgerEndIndex());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.awaitility.core.AssertionCondition;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
Expand Down Expand Up @@ -88,15 +90,15 @@ public void testPushNetworkOffline() throws Exception {
futures.add(future);
}
Assertions.assertEquals(9, dLedgerServer0.getDLedgerStore().getLedgerEndIndex());
Thread.sleep(dLedgerServer0.getDLedgerConfig().getMaxWaitAckTimeMs() + 100);
Thread.sleep(dLedgerServer0.getDLedgerConfig().getMaxWaitAckTimeMs() + 1000);
for (int i = 0; i < futures.size(); i++) {
CompletableFuture<AppendEntryResponse> future = futures.get(i);
Assertions.assertTrue(future.isDone());
Assertions.assertEquals(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT.getCode(), future.get().getCode());
}

boolean hasWait = false;
for (int i = 0; i < dLedgerServer0.getDLedgerConfig().getMaxPendingRequestsNum(); i++) {
for (int i = 0; i < dLedgerServer0.getDLedgerConfig().getMaxPendingRequestsNum()+2; i++) {
AppendEntryRequest appendEntryRequest = new AppendEntryRequest();
appendEntryRequest.setGroup(group);
appendEntryRequest.setRemoteId(dLedgerServer0.getMemberState().getSelfId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,15 @@ public void testBatchPushNetworkOffline() throws Exception {
futures.add(future);
}
Assertions.assertEquals(9, dLedgerServer0.getDLedgerStore().getLedgerEndIndex());
Thread.sleep(dLedgerServer0.getDLedgerConfig().getMaxWaitAckTimeMs() + 100);
Thread.sleep(dLedgerServer0.getDLedgerConfig().getMaxWaitAckTimeMs() + 1000);
for (int i = 0; i < futures.size(); i++) {
CompletableFuture<AppendEntryResponse> future = futures.get(i);
Assertions.assertTrue(future.isDone());
Assertions.assertEquals(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT.getCode(), future.get().getCode());
}

boolean hasWait = false;
for (int i = 0; i < dLedgerServer0.getDLedgerConfig().getMaxPendingRequestsNum(); i++) {
for (int i = 0; i < dLedgerServer0.getDLedgerConfig().getMaxPendingRequestsNum() + 2; i++) {
AppendEntryRequest appendEntryRequest = new AppendEntryRequest();
appendEntryRequest.setGroup(group);
appendEntryRequest.setRemoteId(dLedgerServer0.getMemberState().getSelfId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ protected synchronized DLedgerServer launchServer(String group, String peers, St
config.setEnableLeaderElector(false);
config.setEnableDiskForceClean(false);
config.setDiskSpaceRatioToForceClean(0.90f);
config.setMaxPendingRequestsNum(1000);
DLedgerServer dLedgerServer = new DLedgerServer(config);
MemberState memberState = dLedgerServer.getMemberState();
memberState.setCurrTermForTest(0);
Expand Down

0 comments on commit 0156c72

Please sign in to comment.