From f75043f300bbb75b3c77c6d7efb5aab4853882f9 Mon Sep 17 00:00:00 2001 From: "minghua.xie" Date: Mon, 18 Nov 2024 18:45:47 +0800 Subject: [PATCH] unit test --- .../rpc/netty/NettyClientChannelManager.java | 4 +- .../core/rpc/netty/RmNettyRemotingClient.java | 2 +- .../core/rpc/netty/TmNettyRemotingClient.java | 2 +- .../rpc/netty/ChannelManagerTestHelper.java | 2 + .../core/rpc/netty/MsgVersionHelperTest.java | 103 ++++++++++++++++++ 5 files changed, 110 insertions(+), 3 deletions(-) create mode 100644 test/src/test/java/org/apache/seata/core/rpc/netty/MsgVersionHelperTest.java diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientChannelManager.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientChannelManager.java index 7be0de2e729..ca0d67942b7 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientChannelManager.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientChannelManager.java @@ -36,6 +36,7 @@ import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.common.util.NetUtil; import org.apache.seata.common.util.StringUtils; +import org.apache.seata.core.protocol.Version; import org.apache.seata.discovery.registry.FileRegistryServiceImpl; import org.apache.seata.discovery.registry.RegistryFactory; import org.apache.seata.discovery.registry.RegistryService; @@ -268,12 +269,13 @@ void invalidateObject(final String serverAddress, final Channel channel) throws nettyClientKeyPool.invalidateObject(poolKeyMap.get(serverAddress), channel); } - void registerChannel(final String serverAddress, final Channel channel) { + void registerChannel(final String serverAddress, final Channel channel, String version) { Channel channelToServer = channels.get(serverAddress); if (channelToServer != null && channelToServer.isActive()) { return; } channels.put(serverAddress, channel); + Version.putChannelVersion(channel, version); } private Channel doConnect(String serverAddress) { diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java index 0b9fd0e12bc..872cfa2b2bb 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java @@ -178,7 +178,7 @@ public void onRegisterMsgSuccess(String serverAddress, Channel channel, Object r if (LOGGER.isInfoEnabled()) { LOGGER.info("register RM success. client version:{}, server version:{},channel:{}", registerRMRequest.getVersion(), registerRMResponse.getVersion(), channel); } - getClientChannelManager().registerChannel(serverAddress, channel); + getClientChannelManager().registerChannel(serverAddress, channel, registerRMRequest.getVersion()); String dbKey = getMergedResourceKeys(); if (registerRMRequest.getResourceIds() != null) { if (!registerRMRequest.getResourceIds().equals(dbKey)) { diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java index 28993b61f77..4873f8c3470 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java @@ -218,7 +218,7 @@ public void onRegisterMsgSuccess(String serverAddress, Channel channel, Object r if (LOGGER.isInfoEnabled()) { LOGGER.info("register TM success. client version:{}, server version:{},channel:{}", registerTMRequest.getVersion(), registerTMResponse.getVersion(), channel); } - getClientChannelManager().registerChannel(serverAddress, channel); + getClientChannelManager().registerChannel(serverAddress, channel, registerTMRequest.getVersion()); } @Override diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/ChannelManagerTestHelper.java b/test/src/test/java/org/apache/seata/core/rpc/netty/ChannelManagerTestHelper.java index 0ed3179eccc..2989f4e8337 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/ChannelManagerTestHelper.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/ChannelManagerTestHelper.java @@ -37,4 +37,6 @@ public static Channel getChannel(TmNettyRemotingClient client) { private static NettyClientChannelManager getChannelManager(AbstractNettyRemotingClient remotingClient) { return remotingClient.getClientChannelManager(); } + + } diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/MsgVersionHelperTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/MsgVersionHelperTest.java new file mode 100644 index 00000000000..cbfe9a2f262 --- /dev/null +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/MsgVersionHelperTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty; + +import io.netty.channel.Channel; +import org.apache.seata.common.ConfigurationKeys; +import org.apache.seata.common.ConfigurationTestHelper; +import org.apache.seata.common.XID; +import org.apache.seata.common.util.NetUtil; +import org.apache.seata.common.util.UUIDGenerator; +import org.apache.seata.core.protocol.ProtocolConstants; +import org.apache.seata.core.protocol.RpcMessage; +import org.apache.seata.core.protocol.Version; +import org.apache.seata.core.protocol.transaction.UndoLogDeleteRequest; +import org.apache.seata.core.rpc.MsgVersionHelper; +import org.apache.seata.server.coordinator.DefaultCoordinator; +import org.apache.seata.server.session.SessionHolder; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * MsgVersionHelper Test + **/ +public class MsgVersionHelperTest { + private static final Logger LOGGER = LoggerFactory.getLogger(MsgVersionHelperTest.class); + + @BeforeAll + public static void init(){ + ConfigurationTestHelper.putConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL, "8091"); + } + @AfterAll + public static void after() { + ConfigurationTestHelper.removeConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL); + } + + public static ThreadPoolExecutor initMessageExecutor() { + return new ThreadPoolExecutor(5, 5, 500, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(20000), new ThreadPoolExecutor.CallerRunsPolicy()); + } + @Test + public void testSendMsgWithResponse() throws Exception { + ThreadPoolExecutor workingThreads = initMessageExecutor(); + NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads); + new Thread(() -> { + SessionHolder.init(null); + nettyRemotingServer.setHandler(DefaultCoordinator.getInstance(nettyRemotingServer)); + // set registry + XID.setIpAddress(NetUtil.getLocalIp()); + XID.setPort(8091); + // init snowflake for transactionId, branchId + UUIDGenerator.init(1L); + nettyRemotingServer.init(); + }).start(); + Thread.sleep(3000); + + String applicationId = "app 1"; + String transactionServiceGroup = "default_tx_group"; + TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup); + tmNettyRemotingClient.init(); + + String serverAddress = "0.0.0.0:8091"; + Channel channel = TmNettyRemotingClient.getInstance().getClientChannelManager().acquireChannel(serverAddress); + + Assertions.assertFalse(MsgVersionHelper.versionNotSupport(channel, buildUndoLogDeleteMsg(ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY))); + Version.putChannelVersion(channel,"0.7.0"); + Assertions.assertTrue(MsgVersionHelper.versionNotSupport(channel, buildUndoLogDeleteMsg(ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY))); + + nettyRemotingServer.destroy(); + tmNettyRemotingClient.destroy(); + } + + private RpcMessage buildUndoLogDeleteMsg(byte messageType) { + RpcMessage rpcMessage = new RpcMessage(); + rpcMessage.setId(100); + rpcMessage.setMessageType(messageType); + rpcMessage.setCodec(ProtocolConstants.CONFIGURED_CODEC); + rpcMessage.setCompressor(ProtocolConstants.CONFIGURED_COMPRESSOR); + rpcMessage.setBody(new UndoLogDeleteRequest()); + return rpcMessage; + } +}