该框架为rpc原理学习者提供一个思路,一个非常简单的轻量级rpc框架, 项目中没有使用非常复杂的设计,主打一个简单易用,快速上手,读懂源码实现,目前 注册中心默认实现了redis哨兵,zookeeper和本hangu系列的hangu-register注册中心 (测试代码默认使用的是hangu-register注册中心)。 如果你是一个对rpc原理好奇的人,你可以阅读本框架源码,快速了解rpc的核心实现。
hangu 是函谷的拼音。 为什么取这个名字呢?因为有一次和朋友聊天,聊到了《将夜》,说大师兄和余帘骑着一头牛进了一个叫函谷的地方, 于是道德经就出现了。所以我希望每个进入函谷的人,都能写出自己的道德经。
从架构图中可以看到,心跳由消费者主动发起,默认每隔2s向服务提供者发送心跳包,心跳的实现很简单,在消费者这边 使用 Netty 提供的 IdleStateHandler 事件处理器,在每隔2s发起读超时事件时向提供者发送心跳,超过3次未收到 提供者的响应即认为需要重连,消费者端的 IdleStateHandler 配置代码如下:
/**
* 代码位置{@link com.hangu.rpc.consumer.client.NettyClient#open}
*/
// 省略前部分代码
.addLast(new ByteFrameDecoder())
.addLast(new RequestMessageCodec()) // 请求与响应编解码器
.addLast(new HeartBeatEncoder()) // 心跳编码器
.addLast("logging", loggingHandler)
// 每隔 2s 发送一次心跳,超过三次没有收到响应,也就是三倍的心跳时间,重连
.addLast(new IdleStateHandler(2, 0, 0, TimeUnit.SECONDS))
.addLast(new HeartBeatPongHandler(NettyClient.this)) // 心跳编码器
.addLast(new ResponseMessageHandler(executor));
// 省略后部分代码
可以看到有三个与心跳相关的处理器,分别为 HeartBeatEncoder,IdleStateHandler,HeartBeatPongHandler, 其中 IdleStateHandler 配置了读取超时为2s,超过2s没有收到读事件,那么就会发出读超时事件, 发出读超时事件之后, 由 HeartBeatPongHandler 处理该事件,处理的逻辑如下:
@Slf4j
public class HeartBeatPongHandler extends ChannelInboundHandlerAdapter {
private NettyClient nettyClient;
private int retryBeat = 0;
public HeartBeatPongHandler(NettyClient nettyClient) {
this.nettyClient = nettyClient;
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
// 尝试重连
this.reconnect(ctx);
super.channelUnregistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 这里主要是为了解决网络抖动,误判机器下线,等网络正常时,注册中心再次通知
// 那么需要重新标记为true
this.nettyClient.getClientConnect().setRelease(false);
this.nettyClient.getClientConnect().resetConnCount();
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 收到消息(无论是心跳消息还是任何其他rpc消息),重置重试发送心跳次数
this.retryBeat = 0;
// 这里主要是为了解决网络抖动,误判机器下线,等网络正常时,注册中心再次通知
// 那么需要重新标记为true
this.nettyClient.getClientConnect().setRelease(false);
this.nettyClient.getClientConnect().resetConnCount();
super.channelRead(ctx, msg);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
IdleState idleState = idleStateEvent.state();
// 读超时,发送心跳
if (IdleState.READER_IDLE == idleState) {
if (retryBeat > 3) {
// 关闭重连,通过监听 channelUnregistered 发起重连
ctx.channel().close();
} else {
PingPong pingPong = new PingPong();
pingPong.setId(CommonUtils.snowFlakeNextId());
pingPong.setSerializationType(SerializationTypeEnum.HESSIAN.getType());
// 发送心跳(从当前 context 往前)
ctx.writeAndFlush(pingPong).addListener(future -> {
if (!future.isSuccess()) {
log.error("发送心跳失败!", future.cause());
}
});
++retryBeat;
}
} else {
super.userEventTriggered(ctx, evt);
}
} else {
super.userEventTriggered(ctx, evt);
}
}
private void reconnect(ChannelHandlerContext ctx) {
ClientConnect clientConnect = this.nettyClient.getClientConnect();
int retryConnectCount = clientConnect.incrConnCount();
// N次之后还是不能连接上,放弃连接
if (clientConnect.isRelease()) {
log.info("该链接{}已经被标记为释放,不再重连", clientConnect.getHostInfo());
return;
}
// 如果连接还活着,不需要重连
if (clientConnect.isActive()) {
return;
}
int delay = 2 * (retryConnectCount - 1);
// 最大延迟20秒再执行
if (delay > 20) {
delay = 20;
}
ctx.channel().eventLoop().schedule(() -> {
ConnectManager.doCacheReconnect(this.nettyClient);
}, delay, TimeUnit.SECONDS);
}
}
userEventTriggered 方法接收到读超时事件后,会判断当前连接是否已经有连续三次未接收到来自提供者的数据了,如果超过了,那么尝试重连, 如果没有超过三次,主动向提供者发出心跳,发出的心跳由 HeartBeatEncoder 编码器进行编码向提供者发送消息,在接收到提供者的响应之后,将重试次数归零。
这里我们使用 IdleStateHandler 的读超时实现了每隔2s向服务提供者发送心跳,超过三倍的心跳时间未接收到响应就认为该连接已断开,需要 重连。
服务提供者端不会主动向消费者发送心跳,它只会被动接收心跳,但超过8s未接收到任何来自消费者的读写数据时,主动关闭连接
/**
* 代码位置{@link com.hangu.rpc.provider.server.NettyServer.start}
*/
// 读写时间超过8s,表示该链接已失效
.addLast(new IdleStateHandler(0, 0, 8, TimeUnit.SECONDS))
hangu-demo里有两个子模块,分别是提供者和消费者,启动这两个模块,调用UserController的测试代码即可
- 配置
如果你使用的时springboot,那么很好,直接引入以下依赖:
<dependency>
<groupId>org.hangu.rpc</groupId>
<artifactId>hangu-rpc-spring-boot-starter</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
配置hangu-rpc(注册中心为redis哨兵模式的):
server:
port: 8080
hangu:
rpc:
provider:
port: 8089 # 提供者将要暴露的端口
registry:
protocol: redis # 选择zk作为注册中心
redis:
nodes: ip:port,ip2:port2 # redis哨兵模式哨兵集群地址
master: xxx # master的名字
password: xxx # redis的密码
配置hangu-rpc(注册中心为zk):
server:
port: 8080
hangu:
rpc:
provider:
port: 8089 # 提供者将要暴露的端口
registry:
protocol: zookeeper # 选择zk作为注册中心
zookeeper:
hosts: ip:port,ip2:port2 # zk集群地址
user-name: yyy
password: yyy
如果你有自己的注册中心,可以选择实现 com.hangu.rpc.common.registry.RegistryService 接口,然后将 hangu.rpc.registry.protocol=你自己的注册中心名字(其实这里本人使用的时springboot的@ConditionalOnProperty去动态加载, 也就是说你只要 保证spring容器中只存在一个实现了RegistryService接口的注册服务即可)
- 提供者
假设你有以下服务
@HanguService
public class UserServiceImpl implements UserService {
@Override
public UserInfo getUserInfo() {
UserInfo userInfo = new UserInfo();
userInfo.setName("小风");
userInfo.setAge(27);
Address address = new Address();
address.setProvince("江西省");
address.setCity("赣州市");
address.setArea("于都县");
userInfo.setAddress(address);
return userInfo;
}
@Override
public String getUserInfo(String name) {
return name;
}
}
接口为 UserService,实现类为 UserServiceImpl,如果你要暴露该接口,那么只需要在这个实现类上标注 @HanguService 注解即可, 这个注解目前有三个属性,分别为groupName,interfaceName,version,组通常是一系列相关业务的分组,通常会设置为服务名,因为我们的服务通常 会以业务进行垂直划分,interfaceName表示接口名,默认不填的情况下,会自动赋值为接口全路径类名,version表示版本,有时候我们给多个第三方提供服务的 时候可能业务上有差别,对于老的业务代码我们肯定不会去做改动,此时我们可以选择通过版本进行区分。
- 消费者
上面提供者提供了 UserService 服务,如果我们要引入这个服务怎么办?
@HanguReference
public interface UserService {
@HanguMethod(timeout = 20, callback = SimpleRpcResponseCallback.class)
UserInfo getUserInfo(RpcResponseCallback callback);
String getUserInfo(String name);
}
可以看到这个接口上标注了 @HanguReference 注解,这个注解与服务提供者的 @HanguService 注解一样拥有 groupName,interfaceName,version 三个属性,只要这三个属性与提供者的一一对应就能正常提供服务,这意味只要指定了 interfaceName这个名字与服务提供者一致,即使 UserService 这个接口类型的名字你瞎写,放在任何包下都可以正常服务,但是不建议 这么做,最好对接口进行抽离成单独的模块,打成jar包去引入
在类上指定了 @HanguReference 之后,我可以在方法上标注 @HanguMethod 注解,这个注解目前有 timeout与callback两个属性, timeout用来指定该方法调用超时的时间,callback用来指定回调类实现,需要实现 RpcResponseCallback 接口,并且提供无参数构造器, 如果你不想在 @HanguMethod 注解上指定回调,那么可以在方法参数上指定实现了 RpcResponseCallback 接口的回调,这样方法由同步调用 变成了异步调用。
如果你是用的普通spring项目,想要引入hangu-rpc,那么你可以通过以下方式启动
@EnableHanguRpc
public class HanguRpcBootstrapConfig {
}
注意:要确保加入了以下依赖(该依赖用于处理ConfigurationProperties注解)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
其他配置与消费者提供者的操作与springboot的一样
- 配置 如果你没有使用spring框架,那么你只需要在pom.xml中添加以下依赖
<dependency>
<groupId>org.hangu.rpc</groupId>
<artifactId>hangu-starter</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
- 提供者
依然假设你有一个叫 UserServiceImpl的接口实现
public class Provider {
public static void main(String[] args) {
// 主要用于设置启动的端口
HanguProperties hanguProperties = new HanguProperties();
// 启动服务
HanguRpcManager.openServer(hanguProperties);
ServerInfo serverInfo = new ServerInfo();
serverInfo.setGroupName("用户服务系统");
serverInfo.setInterfaceName("userService");
serverInfo.setVersion("1.0");
Class<UserSerice> interfaceClass = UserService.class;
UserService service = new UserServiceImpl();
ZookeeperConfigProperties properties = new ZookeeperConfigProperties();
properties.setHosts("ip:port,ip:port");
RegistryService registryService = new ZookeeperRegistryService(properties);
ServiceBean<T> serviceBean =
new ServiceBean<>(serverInfo, interfaceClass, service, registryService);
ServiceExporter.export(serviceBean);
}
}
- 消费者
public class Consumer {
public static void main(String[] args) {
ServerInfo serverInfo = new ServerInfo();
serverInfo.setGroupName("用户服务系统");
serverInfo.setInterfaceName("userService");
serverInfo.setVersion("1.0");
Class<UserSerice> interfaceClass = UserService.class;
ZookeeperConfigProperties properties = new ZookeeperConfigProperties();
properties.setHosts("ip:port,ip:port");
RegistryService registryService = new ZookeeperRegistryService(properties);
// 主要用于设置消费者线程数量
HanguProperties hanguProperties = new HanguProperties();
ReferenceBean<T> referenceBean = new ReferenceBean<>(serverInfo, interfaceClass, registryService,
hanguProperties);
UserService service = ServiceReference.reference(referenceBean, CommonUtils.getClassLoader(this.getClass()));
// 调用服务
UserInfo userInfo = service.getUserInfo();
}
}