Skip to content

Commit

Permalink
fix: fix some problems in snapshot mode
Browse files Browse the repository at this point in the history
1. fix some problems in snapshot mode
  • Loading branch information
TheR1sing3un committed Jan 3, 2023
1 parent c536df0 commit 8c2b026
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ public void doWork() {
.collect(Collectors.toList());
long quorumIndex = sortedWaterMarks.get(sortedWaterMarks.size() / 2);
final Optional<StateMachineCaller> fsmCaller = DLedgerEntryPusher.this.fsmCaller;
if (quorumIndex == this.lastQuorumIndex) return;
if (fsmCaller.isPresent()) {
// If there exist statemachine
DLedgerEntryPusher.this.dLedgerStore.updateCommittedIndex(currTerm, quorumIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public class SnapshotManager {
public static final String SNAPSHOT_TEMP_DIR = "tmp";

private DLedgerServer dLedgerServer;
private long lastSnapshotIndex;
private long lastSnapshotTerm;
private long lastSnapshotIndex = -1;
private long lastSnapshotTerm = -1;
private final SnapshotStore snapshotStore;
private volatile boolean savingSnapshot;
private volatile boolean loadingSnapshot;
Expand Down Expand Up @@ -126,7 +126,7 @@ public void saveSnapshot(DLedgerEntry dLedgerEntry) {
return;
}
// Check if applied index reaching the snapshot threshold
if (dLedgerEntry.getIndex() - this.lastSnapshotIndex <= this.dLedgerServer.getDLedgerConfig().getSnapshotThreshold()) {
if (dLedgerEntry.getIndex() - this.lastSnapshotIndex < this.dLedgerServer.getDLedgerConfig().getSnapshotThreshold()) {
return;
}
// Create snapshot writer
Expand Down Expand Up @@ -164,7 +164,6 @@ private void saveSnapshotAfter(SnapshotWriter writer, SnapshotMeta snapshotMeta,
CompletableFuture.runAsync(() -> {
truncatePrefix(dLedgerEntry);
});
//truncatePrefix(dLedgerEntry);
} else {
logger.error("Unable to save snapshot");
}
Expand Down Expand Up @@ -222,6 +221,7 @@ private void loadSnapshotAfter(SnapshotReader reader, SnapshotMeta snapshotMeta,
this.lastSnapshotIndex = snapshotMeta.getLastIncludedIndex();
this.lastSnapshotTerm = snapshotMeta.getLastIncludedTerm();
this.loadingSnapshot = false;
this.dLedgerServer.getDLedgerStore().updateIndexAfterLoadingSnapshot(this.lastSnapshotIndex, this.lastSnapshotTerm);
logger.info("Snapshot {} loaded successfully", snapshotMeta);
} else {
// Stop the loading process if the snapshot is expired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,6 @@ public void setSnapshotMeta(SnapshotMeta snapshotMeta) {
}

public long getSnapshotIndex() {
return this.snapshotMeta != null ? this.snapshotMeta.getLastIncludedIndex() : 0;
return this.snapshotMeta != null ? this.snapshotMeta.getLastIncludedIndex() : -1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,11 @@ private void doCommitted(final long committedIndex) {
if (this.error != null) {
return;
}
if (this.snapshotManager.isLoadingSnapshot()) {
if (this.snapshotManager.isLoadingSnapshot() || this.snapshotManager.isSavingSnapshot()) {
this.scheduledExecutorService.schedule(() -> {
try {
onCommitted(committedIndex);
logger.info("Still loading snapshot, retry the commit task later");
logger.info("Still loading or saving snapshot, retry the commit task later");
} catch (Throwable e) {
e.printStackTrace();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,28 @@ public long truncate(DLedgerEntry entry, long leaderTerm, String leaderId) {
return appendAsFollower(entry, leaderTerm, leaderId).getIndex();
}

@Override
public void resetOffsetAfterSnapshot(DLedgerEntry entry) {

}

@Override
public void updateIndexAfterLoadingSnapshot(long lastIncludedIndex, long lastIncludedTerm) {
this.ledgerBeforeBeginIndex = lastIncludedIndex;
this.ledgerEndIndex = lastIncludedIndex;
this.ledgerEndTerm = lastIncludedTerm;
}

@Override
public void startup() {

}

@Override
public void shutdown() {

}

@Override
public DLedgerEntry appendAsFollower(DLedgerEntry entry, long leaderTerm, String leaderId) {
PreConditions.check(memberState.isFollower(), DLedgerResponseCode.NOT_FOLLOWER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,12 @@ public long truncate(DLedgerEntry entry, long leaderTerm, String leaderId) {
return -1;
}

public void resetOffsetAfterSnapshot(DLedgerEntry entry) {
public abstract void resetOffsetAfterSnapshot(DLedgerEntry entry);

}

public void startup() {
public abstract void updateIndexAfterLoadingSnapshot(long lastIncludedIndex, long lastIncludedTerm);

}
public abstract void startup();

public void shutdown() {
public abstract void shutdown();

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class DLedgerMmapFileStore extends DLedgerStore {
private static final Logger LOGGER = LoggerFactory.getLogger(DLedgerMmapFileStore.class);
public List<AppendHook> appendHooks = new ArrayList<>();

private long ledgerBeforeBeginIndex = -1;
private volatile long ledgerBeforeBeginIndex = -1;
private long ledgerBeginIndex = -1;
private long ledgerEndIndex = -1;
private long committedIndex = -1;
Expand Down Expand Up @@ -303,7 +303,6 @@ public void recover() {
DLedgerEntry entry = get(lastEntryIndex);
PreConditions.check(entry != null, DLedgerResponseCode.DISK_ERROR, "recheck get null entry");
PreConditions.check(entry.getIndex() == lastEntryIndex, DLedgerResponseCode.DISK_ERROR, "recheck index %d != %d", entry.getIndex(), lastEntryIndex);
reviseLedgerBeforeBeginIndex();
}
this.dataFileList.updateWherePosition(processOffset);
this.dataFileList.truncateOffset(processOffset);
Expand Down Expand Up @@ -346,7 +345,6 @@ private void reviseLedgerBeginIndex() {

private void reviseLedgerBeforeBeginIndex() {
// get ledger begin index
System.out.println(this.memberState.getSelfId() + " start to revise before index, now before index = " + this.ledgerBeforeBeginIndex);
MmapFile firstFile = dataFileList.getFirstMappedFile();
SelectMmapBufferResult sbr = firstFile.selectMappedBuffer(0);
try {
Expand All @@ -360,7 +358,6 @@ private void reviseLedgerBeforeBeginIndex() {
}
// begin index
long beginIndex = tmpBuffer.getLong();
System.out.println(this.memberState.getSelfId() + " update before index from " + this.ledgerBeginIndex + " to " + (beginIndex - 1));
this.ledgerBeforeBeginIndex = beginIndex - 1;
indexFileList.resetOffset(beginIndex * INDEX_UNIT_SIZE);
} finally {
Expand Down Expand Up @@ -501,14 +498,21 @@ public void resetOffsetAfterSnapshot(DLedgerEntry entry) {
if (entry.getIndex() <= this.ledgerBeforeBeginIndex) {
return;
}
System.out.println(this.memberState.getSelfId() + " reset offset after snapshot, now before index = " + this.ledgerBeforeBeginIndex + ", snapshot last included index = " + entry.getIndex());
long resetPos = entry.getPos() + entry.getSize();
dataFileList.resetOffset(resetPos);
long resetIndexOffset = entry.getIndex() * INDEX_UNIT_SIZE;
indexFileList.resetOffset(resetIndexOffset);
// reset ledgerBeforeBeginIndex
System.out.println(this.memberState.getSelfId() + " update before index from " + this.ledgerBeginIndex + " to " + entry.getIndex());
this.ledgerBeforeBeginIndex = entry.getIndex();
synchronized (this.memberState) {
long resetPos = entry.getPos() + entry.getSize();
dataFileList.resetOffset(resetPos);
long resetIndexOffset = entry.getIndex() * INDEX_UNIT_SIZE;
indexFileList.resetOffset(resetIndexOffset);
// reset ledgerBeforeBeginIndex
this.ledgerBeforeBeginIndex = entry.getIndex();
}
}

@Override
public void updateIndexAfterLoadingSnapshot(long lastIncludedIndex, long lastIncludedTerm) {
this.ledgerBeforeBeginIndex = lastIncludedIndex;
this.ledgerEndIndex = lastIncludedIndex;
this.ledgerEndTerm = lastIncludedTerm;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,33 +41,33 @@ public void testSaveAndLoadSnapshot() throws InterruptedException {
};
// Launch client
DLedgerClient dLedgerClient = launchClient(group, peers.split(";")[0]);
for (int i = 0; i < 100; i++) {
// append 99 entries, each 10 entries will trigger one snapshotting
for (int i = 0; i < 99; i++) {
AppendEntryResponse appendEntryResponse = dLedgerClient.append(new byte[512]);
assertEquals(DLedgerResponseCode.SUCCESS.getCode(), appendEntryResponse.getCode());
assertEquals(i, appendEntryResponse.getIndex());
}
Thread.sleep(5000);
Thread.sleep(2000);
for (DLedgerServer server : serverList) {
assertEquals(99, server.getDLedgerStore().getLedgerEndIndex());
assertEquals(99, server.getDLedgerStore().getLedgerBeforeBeginIndex());
assertEquals(98, server.getDLedgerStore().getLedgerEndIndex());
assertEquals(89, server.getDLedgerStore().getLedgerBeforeBeginIndex());
// check statemachine
final MockStateMachine fsm = (MockStateMachine) server.getStateMachine();
assertEquals(99, fsm.getAppliedIndex());
assertEquals(100, fsm.getTotalEntries());
assertEquals(99, fsm.getTotalEntries());
}

// now we append an entry will trigger the snapshotting
// this time will delete entries on a scale of 90 to 99
AppendEntryResponse appendEntryResponse = dLedgerClient.append(new byte[512]);
assertEquals(DLedgerResponseCode.SUCCESS.getCode(), appendEntryResponse.getCode());
assertEquals(100, appendEntryResponse.getIndex());

Thread.sleep(5000);
assertEquals(99, appendEntryResponse.getIndex());
Thread.sleep(2000);
for (DLedgerServer server : serverList) {
assertEquals(100, server.getDLedgerStore().getLedgerEndIndex());
assertEquals(99, server.getDLedgerStore().getLedgerEndIndex());
assertEquals(99, server.getDLedgerStore().getLedgerBeforeBeginIndex());
// check statemachine
final MockStateMachine fsm = (MockStateMachine) server.getStateMachine();
assertEquals(100, fsm.getAppliedIndex());
assertEquals(101, fsm.getTotalEntries());
assertEquals(100, fsm.getTotalEntries());
}

Thread.sleep(100);
Expand All @@ -83,15 +83,14 @@ public void testSaveAndLoadSnapshot() throws InterruptedException {
serverList.add(newDLedgerServer0);
serverList.add(newDLedgerServer1);
serverList.add(newDLedgerServer2);
Thread.sleep(5000);
Thread.sleep(2000);
// State machine could only be recovered from snapshot due to the entry has been removed after saving snapshot
for (DLedgerServer server : serverList) {
assertEquals(100, server.getDLedgerStore().getLedgerEndIndex());
assertEquals(99, server.getDLedgerStore().getLedgerEndIndex());
assertEquals(99, server.getDLedgerStore().getLedgerBeforeBeginIndex());
// check statemachine
final MockStateMachine fsm = (MockStateMachine) server.getStateMachine();
assertEquals(100, fsm.getAppliedIndex());
assertEquals(101, fsm.getTotalEntries());
assertEquals(100, fsm.getTotalEntries());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public class MockStateMachine implements StateMachine {
private static Logger logger = LoggerFactory.getLogger(MockStateMachine.class);
private volatile long appliedIndex = -1L;
private final AtomicLong totalEntries = new AtomicLong(0);
private final AtomicLong lastAppliedIndex = new AtomicLong(-1);

@Override
public void onApply(final CommittedEntryIterator iter) {
Expand All @@ -43,6 +42,8 @@ public void onApply(final CommittedEntryIterator iter) {
}
this.totalEntries.addAndGet(1);
this.appliedIndex = next.getIndex();
System.out.println("apply index: " + next.getIndex());
System.out.println("total entries: " + this.totalEntries.get());
}
}
}
Expand All @@ -51,6 +52,7 @@ public void onApply(final CommittedEntryIterator iter) {
public boolean onSnapshotSave(final SnapshotWriter writer) {
long curEntryCnt = this.totalEntries.get();
MockSnapshotFile snapshotFile = new MockSnapshotFile(writer.getSnapshotStorePath() + File.separator + SnapshotManager.SNAPSHOT_DATA_FILE);
System.out.println("save snapshot, total entries: " + curEntryCnt);
return snapshotFile.save(curEntryCnt);
}

Expand Down

0 comments on commit 8c2b026

Please sign in to comment.