Skip to content

Commit

Permalink
[ISSUE #289] Support Register-Model server based on DLedger (#288)
Browse files Browse the repository at this point in the history
* feat(example): add Register Model DLedger

1. add Register Model DLedger

Closes #286

* feat(jepsen): add jepsen test framework

1. add jepsen test framework

Closes #286

* feat(jepsen): support linear verify and make test more automated

1. support linear verify
2. make test more automated

Closes #286

* feat(jepsen): pass network partition test

1. pass network partition test

Closes #286

* feat(jepsen): add apache license header

1. add apache license header

Closes #286

* fix(jepsen): fix apply empty log

1. fix apply empty log

Closes #286

* feat(example): support benchmark for register model

1. support benchmark for register model

Closes #289

* refactor(example): combine modules: command and example

1. combine modules: command and example

Closes #289

* style(global): rollback changes `.gitignore`

1. rollback changes `.gitignore`

Closes #289

* rerun
  • Loading branch information
TheR1sing3un authored May 29, 2023
1 parent 234f707 commit 91acd65
Show file tree
Hide file tree
Showing 39 changed files with 1,153 additions and 213 deletions.
6 changes: 3 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
.project
.settings/
target/
localbin/
localbin/
devenv
*.log*
*.iml
.idea/
*.versionsBackup
!NOTICE-BIN
!LICENSE-BIN
.DS_Store
nohup.out
.DS_Store
nohup.out

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ public BatchAppendFuture() {

}

public BatchAppendFuture(long[] positions) {
this.positions = positions;
}

public BatchAppendFuture(long timeOutMs) {
super(timeOutMs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ public DLedgerServer(DLedgerConfig dLedgerConfig, NettyServerConfig nettyServerC
}

public DLedgerServer(DLedgerConfig dLedgerConfig, NettyServerConfig nettyServerConfig,
NettyClientConfig nettyClientConfig) {
NettyClientConfig nettyClientConfig) {
this(dLedgerConfig, nettyServerConfig, nettyClientConfig, null);
}

public DLedgerServer(DLedgerConfig dLedgerConfig, NettyServerConfig nettyServerConfig,
NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) {
NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) {
dLedgerConfig.init();
this.dLedgerConfig = dLedgerConfig;
this.memberState = new MemberState(dLedgerConfig);
Expand Down Expand Up @@ -266,52 +266,19 @@ public CompletableFuture<AppendEntryResponse> handleAppend(AppendEntryRequest re
appendEntryResponse.setTerm(currTerm);
appendEntryResponse.setLeaderId(memberState.getSelfId());
return AppendFuture.newCompletedFuture(-1, appendEntryResponse);
} else {
AppendFuture<AppendEntryResponse> future = new AppendFuture<>();
DLedgerEntry resEntry = null;
if (request instanceof BatchAppendEntryRequest) {
BatchAppendEntryRequest batchRequest = (BatchAppendEntryRequest) request;
if (batchRequest.getBatchMsgs() != null && batchRequest.getBatchMsgs().size() != 0) {
// record positions to return;
long[] positions = new long[batchRequest.getBatchMsgs().size()];
// split bodys to append
int index = 0;
for (byte[] bytes : batchRequest.getBatchMsgs()) {
DLedgerEntry dLedgerEntry = new DLedgerEntry();
dLedgerEntry.setBody(bytes);
resEntry = dLedgerStore.appendAsLeader(dLedgerEntry);
positions[index++] = resEntry.getPos();
}
// only wait last entry ack is ok
future = new BatchAppendFuture<>();
((BatchAppendFuture<?>) future).setPositions(positions);
} else {
throw new DLedgerException(DLedgerResponseCode.REQUEST_WITH_EMPTY_BODYS, "BatchAppendEntryRequest" +
" with empty bodys");
}
} else {
DLedgerEntry dLedgerEntry = new DLedgerEntry();
dLedgerEntry.setBody(request.getBody());
resEntry = dLedgerStore.appendAsLeader(dLedgerEntry);
}
AppendFuture<AppendEntryResponse> future;
if (request instanceof BatchAppendEntryRequest) {
BatchAppendEntryRequest batchRequest = (BatchAppendEntryRequest) request;
if (batchRequest.getBatchMsgs() == null || batchRequest.getBatchMsgs().isEmpty()) {
throw new DLedgerException(DLedgerResponseCode.REQUEST_WITH_EMPTY_BODYS, "BatchAppendEntryRequest" +
" with empty bodys");
}
DLedgerEntry finalResEntry = resEntry;
AppendFuture<AppendEntryResponse> finalFuture = future;
Closure closure = new Closure() {
@Override
void done(Status status) {
AppendEntryResponse response = new AppendEntryResponse();
response.setGroup(DLedgerServer.this.memberState.getGroup());
response.setTerm(DLedgerServer.this.memberState.currTerm());
response.setIndex(finalResEntry.getIndex());
response.setLeaderId(DLedgerServer.this.memberState.getLeaderId());
response.setPos(finalResEntry.getPos());
response.setCode(status.code.getCode());
finalFuture.complete(response);
}
};
dLedgerEntryPusher.appendClosure(closure, resEntry.getTerm(), resEntry.getIndex());
return finalFuture;
future = appendAsLeader(batchRequest.getBatchMsgs());
} else {
future = appendAsLeader(request.getBody());
}
return future;
} catch (DLedgerException e) {
LOGGER.error("[{}][HandleAppend] failed", memberState.getSelfId(), e);
AppendEntryResponse response = new AppendEntryResponse();
Expand All @@ -322,6 +289,52 @@ void done(Status status) {
}
}

public AppendFuture<AppendEntryResponse> appendAsLeader(byte[] body) throws DLedgerException {
return this.appendAsLeader(Arrays.asList(body));
}

public AppendFuture<AppendEntryResponse> appendAsLeader(List<byte[]> bodies) throws DLedgerException {
PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
if (bodies.size() == 0) {
return AppendFuture.newCompletedFuture(-1, null);
}
AppendFuture<AppendEntryResponse> future;
DLedgerEntry entry = new DLedgerEntry();
if (bodies.size() > 1) {
long[] positions = new long[bodies.size()];
for (int i = 0; i < bodies.size(); i++) {
DLedgerEntry dLedgerEntry = new DLedgerEntry();
dLedgerEntry.setBody(bodies.get(i));
entry = dLedgerStore.appendAsLeader(dLedgerEntry);
positions[i] = entry.getPos();
}
// only wait last entry ack is ok
future = new BatchAppendFuture<>(positions);
} else {
DLedgerEntry dLedgerEntry = new DLedgerEntry();
dLedgerEntry.setBody(bodies.get(0));
entry = dLedgerStore.appendAsLeader(dLedgerEntry);
future = new AppendFuture<>();
}
final DLedgerEntry finalResEntry = entry;
final AppendFuture<AppendEntryResponse> finalFuture = future;
Closure closure = new Closure() {
@Override
void done(Status status) {
AppendEntryResponse response = new AppendEntryResponse();
response.setGroup(DLedgerServer.this.memberState.getGroup());
response.setTerm(DLedgerServer.this.memberState.currTerm());
response.setIndex(finalResEntry.getIndex());
response.setLeaderId(DLedgerServer.this.memberState.getLeaderId());
response.setPos(finalResEntry.getPos());
response.setCode(status.code.getCode());
finalFuture.complete(response);
}
};
dLedgerEntryPusher.appendClosure(closure, finalResEntry.getTerm(), finalResEntry.getIndex());
return finalFuture;
}

@Override
public void handleRead(ReadMode mode, ReadClosure closure) {
try {
Expand Down Expand Up @@ -354,7 +367,9 @@ private void dealUnsafeRead(ReadClosure closure) throws DLedgerException {
private void dealRaftLogRead(ReadClosure closure) throws DLedgerException {
PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
// append an empty raft log, call closure when this raft log is applied
DLedgerEntry dLedgerEntry = dLedgerStore.appendAsLeader(new DLedgerEntry());
DLedgerEntry emptyEntry = new DLedgerEntry();
emptyEntry.setBody(new byte[0]);
DLedgerEntry dLedgerEntry = dLedgerStore.appendAsLeader(emptyEntry);
dLedgerEntryPusher.appendClosure(closure, dLedgerEntry.getTerm(), dLedgerEntry.getIndex());
}

Expand Down Expand Up @@ -434,7 +449,7 @@ public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request)

@Override
public CompletableFuture<LeadershipTransferResponse> handleLeadershipTransfer(
LeadershipTransferRequest request) throws Exception {
LeadershipTransferRequest request) throws Exception {
LOGGER.info("handleLeadershipTransfer: {}", request);
try {
PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());
Expand All @@ -448,7 +463,7 @@ public CompletableFuture<LeadershipTransferResponse> handleLeadershipTransfer(
// check fall transferee not fall behind much.
long transfereeFallBehind = dLedgerStore.getLedgerEndIndex() - dLedgerEntryPusher.getPeerWaterMark(request.getTerm(), request.getTransfereeId());
PreConditions.check(transfereeFallBehind < dLedgerConfig.getMaxLeadershipTransferWaitIndex(),
DLedgerResponseCode.FALL_BEHIND_TOO_MUCH, "transferee fall behind too much, diff=%s", transfereeFallBehind);
DLedgerResponseCode.FALL_BEHIND_TOO_MUCH, "transferee fall behind too much, diff=%s", transfereeFallBehind);
return dLedgerLeaderElector.handleLeadershipTransfer(request);
} else if (memberState.getSelfId().equals(request.getTransfereeId())) {
// It's the transferee received the take leadership command.
Expand All @@ -462,8 +477,8 @@ public CompletableFuture<LeadershipTransferResponse> handleLeadershipTransfer(

if (costTime > dLedgerConfig.getLeadershipTransferWaitTimeout()) {
throw new DLedgerException(DLedgerResponseCode.TAKE_LEADERSHIP_FAILED,
"transferee fall behind, wait timeout. timeout = {}, diff = {}",
dLedgerConfig.getLeadershipTransferWaitTimeout(), fallBehind);
"transferee fall behind, wait timeout. timeout = {}, diff = {}",
dLedgerConfig.getLeadershipTransferWaitTimeout(), fallBehind);
}

LOGGER.warn("transferee fall behind, diff = {}", fallBehind);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,19 @@

package io.openmessaging.storage.dledger.protocol.userdefine;

import io.openmessaging.storage.dledger.DLedgerServer;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.concurrent.CompletableFuture;

public abstract class UserDefineProcessor<T extends UserDefineRequest, V extends UserDefineResponse> {

protected final DLedgerServer dLedgerServer;

public UserDefineProcessor(DLedgerServer dLedgerServer) {
this.dLedgerServer = dLedgerServer;
}

public abstract CompletableFuture<V> handleRequest(T t);

public abstract Integer getRequestTypeCode();
Expand All @@ -37,5 +44,4 @@ public Type getResponseType() {
return parameterizedType.getActualTypeArguments()[1];
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
* limitations under the License.
*/

package io.openmessaging.storage.dledger.util;
package io.openmessaging.storage.dledger.utils;

public class BytesUtil {

public static byte[] intToBytes( int value ) {
public static byte[] intToBytes(int value) {
byte[] src = new byte[4];
src[3] = (byte) ((value >> 24) & 0xFF);
src[2] = (byte) ((value >> 16) & 0xFF);
Expand All @@ -30,9 +30,9 @@ public static byte[] intToBytes( int value ) {
public static int bytesToInt(byte[] src, int offset) {
int value;
value = (int) ((src[offset] & 0xFF)
| ((src[offset+1] & 0xFF)<<8)
| ((src[offset+2] & 0xFF)<<16)
| ((src[offset+3] & 0xFF)<<24));
| ((src[offset + 1] & 0xFF) << 8)
| ((src[offset + 2] & 0xFF) << 16)
| ((src[offset + 3] & 0xFF) << 24));
return value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@
import io.openmessaging.storage.dledger.client.DLedgerClient;
import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
import io.openmessaging.storage.dledger.protocol.RequestOrResponse;
import io.openmessaging.storage.dledger.statemachine.register.RegisterReadProcessor;
import io.openmessaging.storage.dledger.statemachine.register.RegisterReadRequest;
import io.openmessaging.storage.dledger.statemachine.register.RegisterReadResponse;
import io.openmessaging.storage.dledger.statemachine.register.RegisterStateMachine;
import io.openmessaging.storage.dledger.util.BytesUtil;
import io.openmessaging.storage.dledger.utils.BytesUtil;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ public class RegisterReadProcessor extends UserDefineProcessor<RegisterReadReque

private Integer requestTypeCode;

private final DLedgerServer server;

public RegisterReadProcessor(DLedgerServer server) {
this.server = server;
super(server);
RegisterReadRequest registerReadRequest = new RegisterReadRequest(0);
this.requestTypeCode = registerReadRequest.getRequestTypeCode();
}
Expand All @@ -46,7 +44,7 @@ public CompletableFuture<RegisterReadResponse> handleRequest(RegisterReadRequest
@Override
public void done(Status status) {
if (status.isOk()) {
RegisterStateMachine registerStateMachine = (RegisterStateMachine) RegisterReadProcessor.this.server.getStateMachine();
RegisterStateMachine registerStateMachine = (RegisterStateMachine) dLedgerServer.getStateMachine();
Integer value = registerStateMachine.getValue(key);
response.setValue(value);
future.complete(response);
Expand All @@ -56,7 +54,7 @@ public void done(Status status) {
}
}
};
RegisterReadProcessor.this.server.handleRead(ReadMode.RAFT_LOG_READ, closure);
dLedgerServer.handleRead(ReadMode.RAFT_LOG_READ, closure);
return future;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import io.openmessaging.storage.dledger.statemachine.CommittedEntryIterator;
import io.openmessaging.storage.dledger.statemachine.RegisterSnapshotFile;
import io.openmessaging.storage.dledger.statemachine.StateMachine;
import io.openmessaging.storage.dledger.util.BytesUtil;
import io.openmessaging.storage.dledger.utils.BytesUtil;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -37,7 +37,7 @@ public class RegisterStateMachine implements StateMachine {
public void onApply(CommittedEntryIterator iter) {
while (iter.hasNext()) {
final DLedgerEntry entry = iter.next();
if (entry != null && entry.getBody() != null) {
if (entry != null && entry.getBody() != null && entry.getBody().length == 8) {
byte[] bytes = entry.getBody();
int key = BytesUtil.bytesToInt(bytes, 0);
int value = BytesUtil.bytesToInt(bytes, 4);
Expand Down
Loading

0 comments on commit 91acd65

Please sign in to comment.