Skip to content

Commit

Permalink
inner 1563 - add header metadata for grpc
Browse files Browse the repository at this point in the history
  • Loading branch information
PanternBao authored and yanhuqing666 committed Jan 6, 2022
1 parent 4a0f9d5 commit 5631b2a
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 61 deletions.
1 change: 1 addition & 0 deletions dble_checkstyle_suppression.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<suppress checks=".*" files="UshardInterface.java"/>
<suppress checks=".*" files="DbleClusterGrpc.java"/>
<suppress checks=".*" files="EvictionTimer.java"/>
<suppress checks=".*" files="MetaDataClientInterceptor.java"/>
<suppress checks="Indentation" files="MyTime.java"/>
<suppress checks="CyclomaticComplexity" files="MyTime.java"/>
<suppress checks="CyclomaticComplexity" files="TimSort.java"/>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.actiontech.dble.cluster.general.impl;

import com.actiontech.dble.config.model.SystemConfig;
import io.grpc.*;

public class MetaDataClientInterceptor implements ClientInterceptor {

private static String UCORE_RPC_CLIENT_ID_KEY = "universe-client-id";
private static Metadata.Key<String> dbleIdHeadKey = Metadata.Key.of(UCORE_RPC_CLIENT_ID_KEY, Metadata.ASCII_STRING_MARSHALLER);

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {

@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
headers.put(dbleIdHeadKey, "dble-" + SystemConfig.getInstance().getInstanceName());
super.start(responseListener, headers);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@
import com.actiontech.dble.util.PropertiesUtil;
import com.actiontech.dble.util.exception.DetachedException;
import io.grpc.Channel;
import io.grpc.ClientInterceptors;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.apache.commons.lang.StringUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
Expand Down Expand Up @@ -59,9 +57,7 @@ public void initConInfo() {
} catch (Exception e) {
LOGGER.error("error:", e);
}
Channel channel = ManagedChannelBuilder.forAddress(getAvailableIpList().get(0),
ClusterConfig.getInstance().getClusterPort()).usePlaintext(true).build();
setStubIfPossible(UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS));
rebuildStub();
}

@Override
Expand All @@ -71,11 +67,9 @@ public void initCluster() {
try {
ipList.addAll(Arrays.asList(ClusterConfig.getInstance().getClusterIP().split(",")));
} catch (Exception e) {
LOGGER.error("error:", e);
LOGGER.error("initCluster error:", e);
}
ManagedChannel channel = ManagedChannelBuilder.forAddress(getAvailableIpList().get(0),
ClusterConfig.getInstance().getClusterPort()).usePlaintext(true).build();
setStubIfPossible(UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS));
rebuildStub();
if (!skipSyncUcores()) {
startUpdateNodes();
}
Expand Down Expand Up @@ -132,20 +126,9 @@ private void log(String message, Exception e) {
}
return output.getSessionId();
} catch (Exception e1) {
for (String ip : getAvailableIpList()) {
ManagedChannel channel = null;
try {
channel = ManagedChannelBuilder.forAddress(ip,
ClusterConfig.getInstance().getClusterPort()).usePlaintext(true).build();
setStubIfPossible(UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS));
output = stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).lockOnSession(input);
return output.getSessionId();
} catch (Exception e2) {
LOGGER.info("connect to ucore error ", e2);
if (channel != null) {
channel.shutdownNow();
}
}
if (rebuildStub()) {
output = stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).lockOnSession(input);
return output.getSessionId();
}
}
throw new IOException(ERROR_MSG + "ips:" + ipList.toString() + ",port:" + ClusterConfig.getInstance().getClusterPort());
Expand All @@ -171,20 +154,9 @@ public void setKV(String path, String value) throws Exception {
try {
stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).putKv(input);
} catch (Exception e1) {
for (String ip : getAvailableIpList()) {
ManagedChannel channel = null;
try {
channel = ManagedChannelBuilder.forAddress(ip,
ClusterConfig.getInstance().getClusterPort()).usePlaintext(true).build();
setStubIfPossible(UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS));
stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).putKv(input);
return;
} catch (Exception e2) {
LOGGER.info("connect to ucore error ", e2);
if (channel != null) {
channel.shutdownNow();
}
}
if (rebuildStub()) {
stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).putKv(input);
return;
}
throw new IOException(ERROR_MSG + "ips:" + ipList.toString() + ",port:" + ClusterConfig.getInstance().getClusterPort());
}
Expand Down Expand Up @@ -558,7 +530,7 @@ private void setIpList(List<String> ipList) {

private List<String> getAvailableIpList() {
if (connectionDetached) {
return new ArrayList<String>();
return Collections.emptyList();
}
return ipList;
}
Expand Down Expand Up @@ -596,26 +568,10 @@ public void markDetach(boolean isConnectionDetached) {
@Override
public void attachCluster() throws Exception {
LOGGER.info("cluster attach begin connect");
boolean connected = false;
for (String ip : ipList) {
ManagedChannel channel = null;
try {
channel = ManagedChannelBuilder.forAddress(ip,
ClusterConfig.getInstance().getClusterPort()).usePlaintext(true).build();
stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
//check connection is ready
ClusterHelper.isExist(ClusterPathUtil.getOnlinePath(SystemConfig.getInstance().getInstanceName()));
connected = true;
break;
} catch (Exception e) {
LOGGER.info("connect to ucore {} error ", ip, e);
if (channel != null) {
channel.shutdownNow();
}
continue;
}
}
if (!connected) {
if (rebuildStub()) {
//check connection is ready
ClusterHelper.isExist(ClusterPathUtil.getOnlinePath(SystemConfig.getInstance().getInstanceName()));
} else {
throw new IllegalStateException("all cluster connect error");
}

Expand All @@ -627,8 +583,30 @@ public void attachCluster() throws Exception {
}
}

public void setStubIfPossible(UcoreGrpc.UcoreBlockingStub stubTemp) {
private boolean rebuildStub() {
boolean isSuccess = false;
for (String ip : getAvailableIpList()) {
Channel channel = null;
try {
channel = ManagedChannelBuilder.forAddress(ip,
ClusterConfig.getInstance().getClusterPort()).usePlaintext(true).build();
channel = ClientInterceptors.intercept(channel, new MetaDataClientInterceptor());
setStubIfPossible(UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS));
isSuccess = true;
break;
} catch (Exception e2) {
LOGGER.info("connect to ucore[{}] error ", ip, e2);
if (channel != null) {
((ManagedChannel) channel).shutdownNow();
}
}
}
return isSuccess;
}

private void setStubIfPossible(UcoreGrpc.UcoreBlockingStub stubTemp) {
if (connectionDetached) {
LOGGER.info("the dble had detach cluster");
return;
}
this.stub = stubTemp;
Expand Down

0 comments on commit 5631b2a

Please sign in to comment.