Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #289] Support Register-Model server based on DLedger #288

Merged
merged 10 commits into from
May 29, 2023
6 changes: 3 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
.project
.settings/
target/
localbin/
localbin/
devenv
*.log*
*.iml
.idea/
*.versionsBackup
!NOTICE-BIN
!LICENSE-BIN
.DS_Store
nohup.out
.DS_Store
nohup.out

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ public BatchAppendFuture() {

}

public BatchAppendFuture(long[] positions) {
this.positions = positions;
}

public BatchAppendFuture(long timeOutMs) {
super(timeOutMs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ public DLedgerServer(DLedgerConfig dLedgerConfig, NettyServerConfig nettyServerC
}

public DLedgerServer(DLedgerConfig dLedgerConfig, NettyServerConfig nettyServerConfig,
NettyClientConfig nettyClientConfig) {
NettyClientConfig nettyClientConfig) {
this(dLedgerConfig, nettyServerConfig, nettyClientConfig, null);
}

public DLedgerServer(DLedgerConfig dLedgerConfig, NettyServerConfig nettyServerConfig,
NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) {
NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) {
dLedgerConfig.init();
this.dLedgerConfig = dLedgerConfig;
this.memberState = new MemberState(dLedgerConfig);
Expand Down Expand Up @@ -266,52 +266,19 @@ public CompletableFuture<AppendEntryResponse> handleAppend(AppendEntryRequest re
appendEntryResponse.setTerm(currTerm);
appendEntryResponse.setLeaderId(memberState.getSelfId());
return AppendFuture.newCompletedFuture(-1, appendEntryResponse);
} else {
AppendFuture<AppendEntryResponse> future = new AppendFuture<>();
DLedgerEntry resEntry = null;
if (request instanceof BatchAppendEntryRequest) {
BatchAppendEntryRequest batchRequest = (BatchAppendEntryRequest) request;
if (batchRequest.getBatchMsgs() != null && batchRequest.getBatchMsgs().size() != 0) {
// record positions to return;
long[] positions = new long[batchRequest.getBatchMsgs().size()];
// split bodys to append
int index = 0;
for (byte[] bytes : batchRequest.getBatchMsgs()) {
DLedgerEntry dLedgerEntry = new DLedgerEntry();
dLedgerEntry.setBody(bytes);
resEntry = dLedgerStore.appendAsLeader(dLedgerEntry);
positions[index++] = resEntry.getPos();
}
// only wait last entry ack is ok
future = new BatchAppendFuture<>();
((BatchAppendFuture<?>) future).setPositions(positions);
} else {
throw new DLedgerException(DLedgerResponseCode.REQUEST_WITH_EMPTY_BODYS, "BatchAppendEntryRequest" +
" with empty bodys");
}
} else {
DLedgerEntry dLedgerEntry = new DLedgerEntry();
dLedgerEntry.setBody(request.getBody());
resEntry = dLedgerStore.appendAsLeader(dLedgerEntry);
}
AppendFuture<AppendEntryResponse> future;
if (request instanceof BatchAppendEntryRequest) {
BatchAppendEntryRequest batchRequest = (BatchAppendEntryRequest) request;
if (batchRequest.getBatchMsgs() == null || batchRequest.getBatchMsgs().isEmpty()) {
throw new DLedgerException(DLedgerResponseCode.REQUEST_WITH_EMPTY_BODYS, "BatchAppendEntryRequest" +
" with empty bodys");
}
DLedgerEntry finalResEntry = resEntry;
AppendFuture<AppendEntryResponse> finalFuture = future;
Closure closure = new Closure() {
@Override
void done(Status status) {
AppendEntryResponse response = new AppendEntryResponse();
response.setGroup(DLedgerServer.this.memberState.getGroup());
response.setTerm(DLedgerServer.this.memberState.currTerm());
response.setIndex(finalResEntry.getIndex());
response.setLeaderId(DLedgerServer.this.memberState.getLeaderId());
response.setPos(finalResEntry.getPos());
response.setCode(status.code.getCode());
finalFuture.complete(response);
}
};
dLedgerEntryPusher.appendClosure(closure, resEntry.getTerm(), resEntry.getIndex());
return finalFuture;
future = appendAsLeader(batchRequest.getBatchMsgs());
} else {
future = appendAsLeader(request.getBody());
}
return future;
} catch (DLedgerException e) {
LOGGER.error("[{}][HandleAppend] failed", memberState.getSelfId(), e);
AppendEntryResponse response = new AppendEntryResponse();
Expand All @@ -322,6 +289,52 @@ void done(Status status) {
}
}

public AppendFuture<AppendEntryResponse> appendAsLeader(byte[] body) throws DLedgerException {
return this.appendAsLeader(Arrays.asList(body));
}

public AppendFuture<AppendEntryResponse> appendAsLeader(List<byte[]> bodies) throws DLedgerException {
PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
if (bodies.size() == 0) {
return AppendFuture.newCompletedFuture(-1, null);
}
AppendFuture<AppendEntryResponse> future;
DLedgerEntry entry = new DLedgerEntry();
if (bodies.size() > 1) {
long[] positions = new long[bodies.size()];
for (int i = 0; i < bodies.size(); i++) {
DLedgerEntry dLedgerEntry = new DLedgerEntry();
dLedgerEntry.setBody(bodies.get(i));
entry = dLedgerStore.appendAsLeader(dLedgerEntry);
positions[i] = entry.getPos();
}
// only wait last entry ack is ok
future = new BatchAppendFuture<>(positions);
} else {
DLedgerEntry dLedgerEntry = new DLedgerEntry();
dLedgerEntry.setBody(bodies.get(0));
entry = dLedgerStore.appendAsLeader(dLedgerEntry);
future = new AppendFuture<>();
}
final DLedgerEntry finalResEntry = entry;
final AppendFuture<AppendEntryResponse> finalFuture = future;
Closure closure = new Closure() {
@Override
void done(Status status) {
AppendEntryResponse response = new AppendEntryResponse();
response.setGroup(DLedgerServer.this.memberState.getGroup());
response.setTerm(DLedgerServer.this.memberState.currTerm());
response.setIndex(finalResEntry.getIndex());
response.setLeaderId(DLedgerServer.this.memberState.getLeaderId());
response.setPos(finalResEntry.getPos());
response.setCode(status.code.getCode());
finalFuture.complete(response);
}
};
dLedgerEntryPusher.appendClosure(closure, finalResEntry.getTerm(), finalResEntry.getIndex());
return finalFuture;
}

@Override
public void handleRead(ReadMode mode, ReadClosure closure) {
try {
Expand Down Expand Up @@ -354,7 +367,9 @@ private void dealUnsafeRead(ReadClosure closure) throws DLedgerException {
private void dealRaftLogRead(ReadClosure closure) throws DLedgerException {
PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
// append an empty raft log, call closure when this raft log is applied
DLedgerEntry dLedgerEntry = dLedgerStore.appendAsLeader(new DLedgerEntry());
DLedgerEntry emptyEntry = new DLedgerEntry();
emptyEntry.setBody(new byte[0]);
DLedgerEntry dLedgerEntry = dLedgerStore.appendAsLeader(emptyEntry);
dLedgerEntryPusher.appendClosure(closure, dLedgerEntry.getTerm(), dLedgerEntry.getIndex());
}

Expand Down Expand Up @@ -434,7 +449,7 @@ public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request)

@Override
public CompletableFuture<LeadershipTransferResponse> handleLeadershipTransfer(
LeadershipTransferRequest request) throws Exception {
LeadershipTransferRequest request) throws Exception {
LOGGER.info("handleLeadershipTransfer: {}", request);
try {
PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());
Expand All @@ -448,7 +463,7 @@ public CompletableFuture<LeadershipTransferResponse> handleLeadershipTransfer(
// check fall transferee not fall behind much.
long transfereeFallBehind = dLedgerStore.getLedgerEndIndex() - dLedgerEntryPusher.getPeerWaterMark(request.getTerm(), request.getTransfereeId());
PreConditions.check(transfereeFallBehind < dLedgerConfig.getMaxLeadershipTransferWaitIndex(),
DLedgerResponseCode.FALL_BEHIND_TOO_MUCH, "transferee fall behind too much, diff=%s", transfereeFallBehind);
DLedgerResponseCode.FALL_BEHIND_TOO_MUCH, "transferee fall behind too much, diff=%s", transfereeFallBehind);
return dLedgerLeaderElector.handleLeadershipTransfer(request);
} else if (memberState.getSelfId().equals(request.getTransfereeId())) {
// It's the transferee received the take leadership command.
Expand All @@ -462,8 +477,8 @@ public CompletableFuture<LeadershipTransferResponse> handleLeadershipTransfer(

if (costTime > dLedgerConfig.getLeadershipTransferWaitTimeout()) {
throw new DLedgerException(DLedgerResponseCode.TAKE_LEADERSHIP_FAILED,
"transferee fall behind, wait timeout. timeout = {}, diff = {}",
dLedgerConfig.getLeadershipTransferWaitTimeout(), fallBehind);
"transferee fall behind, wait timeout. timeout = {}, diff = {}",
dLedgerConfig.getLeadershipTransferWaitTimeout(), fallBehind);
}

LOGGER.warn("transferee fall behind, diff = {}", fallBehind);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,19 @@

package io.openmessaging.storage.dledger.protocol.userdefine;

import io.openmessaging.storage.dledger.DLedgerServer;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.concurrent.CompletableFuture;

public abstract class UserDefineProcessor<T extends UserDefineRequest, V extends UserDefineResponse> {

protected final DLedgerServer dLedgerServer;

public UserDefineProcessor(DLedgerServer dLedgerServer) {
this.dLedgerServer = dLedgerServer;
}

public abstract CompletableFuture<V> handleRequest(T t);

public abstract Integer getRequestTypeCode();
Expand All @@ -37,5 +44,4 @@ public Type getResponseType() {
return parameterizedType.getActualTypeArguments()[1];
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
* limitations under the License.
*/

package io.openmessaging.storage.dledger.util;
package io.openmessaging.storage.dledger.utils;

public class BytesUtil {

public static byte[] intToBytes( int value ) {
public static byte[] intToBytes(int value) {
byte[] src = new byte[4];
src[3] = (byte) ((value >> 24) & 0xFF);
src[2] = (byte) ((value >> 16) & 0xFF);
Expand All @@ -30,9 +30,9 @@ public static byte[] intToBytes( int value ) {
public static int bytesToInt(byte[] src, int offset) {
int value;
value = (int) ((src[offset] & 0xFF)
| ((src[offset+1] & 0xFF)<<8)
| ((src[offset+2] & 0xFF)<<16)
| ((src[offset+3] & 0xFF)<<24));
| ((src[offset + 1] & 0xFF) << 8)
| ((src[offset + 2] & 0xFF) << 16)
| ((src[offset + 3] & 0xFF) << 24));
return value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@
import io.openmessaging.storage.dledger.client.DLedgerClient;
import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
import io.openmessaging.storage.dledger.protocol.RequestOrResponse;
import io.openmessaging.storage.dledger.statemachine.register.RegisterReadProcessor;
import io.openmessaging.storage.dledger.statemachine.register.RegisterReadRequest;
import io.openmessaging.storage.dledger.statemachine.register.RegisterReadResponse;
import io.openmessaging.storage.dledger.statemachine.register.RegisterStateMachine;
import io.openmessaging.storage.dledger.util.BytesUtil;
import io.openmessaging.storage.dledger.utils.BytesUtil;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ public class RegisterReadProcessor extends UserDefineProcessor<RegisterReadReque

private Integer requestTypeCode;

private final DLedgerServer server;

public RegisterReadProcessor(DLedgerServer server) {
this.server = server;
super(server);
RegisterReadRequest registerReadRequest = new RegisterReadRequest(0);
this.requestTypeCode = registerReadRequest.getRequestTypeCode();
}
Expand All @@ -46,7 +44,7 @@ public CompletableFuture<RegisterReadResponse> handleRequest(RegisterReadRequest
@Override
public void done(Status status) {
if (status.isOk()) {
RegisterStateMachine registerStateMachine = (RegisterStateMachine) RegisterReadProcessor.this.server.getStateMachine();
RegisterStateMachine registerStateMachine = (RegisterStateMachine) dLedgerServer.getStateMachine();
Integer value = registerStateMachine.getValue(key);
response.setValue(value);
future.complete(response);
Expand All @@ -56,7 +54,7 @@ public void done(Status status) {
}
}
};
RegisterReadProcessor.this.server.handleRead(ReadMode.RAFT_LOG_READ, closure);
dLedgerServer.handleRead(ReadMode.RAFT_LOG_READ, closure);
return future;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import io.openmessaging.storage.dledger.statemachine.CommittedEntryIterator;
import io.openmessaging.storage.dledger.statemachine.RegisterSnapshotFile;
import io.openmessaging.storage.dledger.statemachine.StateMachine;
import io.openmessaging.storage.dledger.util.BytesUtil;
import io.openmessaging.storage.dledger.utils.BytesUtil;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -37,7 +37,7 @@ public class RegisterStateMachine implements StateMachine {
public void onApply(CommittedEntryIterator iter) {
while (iter.hasNext()) {
final DLedgerEntry entry = iter.next();
if (entry != null && entry.getBody() != null) {
if (entry != null && entry.getBody() != null && entry.getBody().length == 8) {
byte[] bytes = entry.getBody();
int key = BytesUtil.bytesToInt(bytes, 0);
int value = BytesUtil.bytesToInt(bytes, 4);
Expand Down
Loading