Skip to content

Commit

Permalink
add ping executor (#1068)
Browse files Browse the repository at this point in the history
  • Loading branch information
qiujiayu authored Jan 25, 2024
1 parent 95a45cd commit 30f62d0
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,22 @@ public static RpcServer createRaftRpcServer(final Endpoint endpoint) {
*/
public static RpcServer createRaftRpcServer(final Endpoint endpoint, final Executor raftExecutor,
final Executor cliExecutor) {
return createRaftRpcServer(endpoint, raftExecutor, cliExecutor, null);
}

/**
* Creates a raft RPC server with executors to handle requests.
*
* @param endpoint server address to bind
* @param raftExecutor executor to handle RAFT requests.
* @param cliExecutor executor to handle CLI service requests.
* @param pingExecutor executor to handle ping requests.
* @return a rpc server instance
*/
public static RpcServer createRaftRpcServer(final Endpoint endpoint, final Executor raftExecutor,
final Executor cliExecutor, final Executor pingExecutor) {
final RpcServer rpcServer = RpcFactoryHelper.rpcFactory().createRpcServer(endpoint);
addRaftRequestProcessors(rpcServer, raftExecutor, cliExecutor);
addRaftRequestProcessors(rpcServer, raftExecutor, cliExecutor, pingExecutor);
return rpcServer;
}

Expand All @@ -94,6 +108,19 @@ public static void addRaftRequestProcessors(final RpcServer rpcServer) {
*/
public static void addRaftRequestProcessors(final RpcServer rpcServer, final Executor raftExecutor,
final Executor cliExecutor) {
addRaftRequestProcessors(rpcServer, raftExecutor, cliExecutor, null);
}

/**
* Adds RAFT and CLI service request processors.
*
* @param rpcServer rpc server instance
* @param raftExecutor executor to handle RAFT requests.
* @param cliExecutor executor to handle CLI service requests.
* @param pingExecutor executor to handle ping requests.
*/
public static void addRaftRequestProcessors(final RpcServer rpcServer, final Executor raftExecutor,
final Executor cliExecutor, final Executor pingExecutor) {
// raft core processors
final AppendEntriesRequestProcessor appendEntriesRequestProcessor = new AppendEntriesRequestProcessor(
raftExecutor);
Expand All @@ -102,7 +129,7 @@ public static void addRaftRequestProcessors(final RpcServer rpcServer, final Exe
rpcServer.registerProcessor(new GetFileRequestProcessor(raftExecutor));
rpcServer.registerProcessor(new InstallSnapshotRequestProcessor(raftExecutor));
rpcServer.registerProcessor(new RequestVoteRequestProcessor(raftExecutor));
rpcServer.registerProcessor(new PingRequestProcessor());
rpcServer.registerProcessor(new PingRequestProcessor(pingExecutor));
rpcServer.registerProcessor(new TimeoutNowRequestProcessor(raftExecutor));
rpcServer.registerProcessor(new ReadIndexRequestProcessor(raftExecutor));
// raft cli service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.alipay.sofa.jraft.rpc.RpcRequests.PingRequest;
import com.alipay.sofa.jraft.util.RpcFactoryHelper;

import java.util.concurrent.Executor;

/**
* Ping request processor.
*
Expand All @@ -30,6 +32,12 @@
*/
public class PingRequestProcessor implements RpcProcessor<PingRequest> {

private final Executor executor;

public PingRequestProcessor(Executor executor) {
this.executor = executor;
}

@Override
public void handleRequest(final RpcContext rpcCtx, final PingRequest request) {
rpcCtx.sendResponse( //
Expand All @@ -42,4 +50,9 @@ public void handleRequest(final RpcContext rpcCtx, final PingRequest request) {
public String interest() {
return PingRequest.class.getName();
}

@Override
public Executor executor() {
return this.executor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void simulation() throws InterruptedException {
ProtobufMsgFactory.load();

final RpcServer server = RpcFactoryHelper.rpcFactory().createRpcServer(new Endpoint("127.0.0.1", 19991));
server.registerProcessor(new PingRequestProcessor());
server.registerProcessor(new PingRequestProcessor(null));
server.init(null);

final Endpoint target = new Endpoint("my.test.host1.com", 19991);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class PingRequestProcessorTest {

@Test
public void testHandlePing() throws Exception {
PingRequestProcessor processor = new PingRequestProcessor();
PingRequestProcessor processor = new PingRequestProcessor(null);
MockAsyncContext ctx = new MockAsyncContext();
processor.handleRequest(ctx, TestUtils.createPingRequest());
ErrorResponse response = (ErrorResponse) ctx.getResponseObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void simulation() throws InterruptedException {
ProtobufMsgFactory.load();

final RpcServer server = RpcFactoryHelper.rpcFactory().createRpcServer(new Endpoint("127.0.0.1", 19991));
server.registerProcessor(new PingRequestProcessor());
server.registerProcessor(new PingRequestProcessor(null));
server.init(null);

final Endpoint target = new Endpoint("my.test.host1.com", 19991);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class PingRequestProcessorTest {

@Test
public void testHandlePing() throws Exception {
PingRequestProcessor processor = new PingRequestProcessor();
PingRequestProcessor processor = new PingRequestProcessor(null);
MockAsyncContext ctx = new MockAsyncContext();
processor.handleRequest(ctx, TestUtils.createPingRequest());
ErrorResponse response = (ErrorResponse) ctx.getResponseObject();
Expand Down

0 comments on commit 30f62d0

Please sign in to comment.