Skip to content

Commit

Permalink
sdk version to protocol version
Browse files Browse the repository at this point in the history
  • Loading branch information
Bughue committed Nov 30, 2024
1 parent d7838ff commit bbf5c1c
Show file tree
Hide file tree
Showing 46 changed files with 404 additions and 289 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,19 @@ public interface ProtocolConstants {
byte VERSION_0 = 0;

/**
* Protocol version
* Protocol version 1
*/
byte VERSION_1 = 1;

/**
* Protocol version 2
*/
byte VERSION_2 = 2;

/**
* Protocol version
*/
byte VERSION = VERSION_1;
byte VERSION = VERSION_2;

/**
* Max frame length
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ public interface ProtocolDecoder {

RpcMessage decodeFrame(ByteBuf in);

byte protocolVersion();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@
**/
public interface ProtocolEncoder {
void encode(RpcMessage rpcMessage, ByteBuf out);

byte protocolVersion();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

import org.apache.seata.core.protocol.ProtocolConstants;
import org.apache.seata.core.protocol.RpcMessage;
import org.apache.seata.core.protocol.Version;
import org.apache.seata.core.rpc.netty.ProtocolDecoder;
import org.apache.seata.core.serializer.Serializer;
import org.apache.seata.core.serializer.SerializerServiceLoader;
Expand Down Expand Up @@ -131,7 +130,7 @@ public RpcMessage decodeFrame(ByteBuf in) {
bs2[1] = (byte) (0x00FF & typeCode);
System.arraycopy(bs, 0, bs2, 2, length);
byte codecType = isSeataCodec ? SerializerType.SEATA.getCode() : SerializerType.HESSIAN.getCode();
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(codecType), Version.VERSION_0_7_0);
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(codecType), protocolVersion());
rpcMessage.setBody(serializer.deserialize(bs2));
} catch (Exception e) {
LOGGER.error("decode error", e);
Expand All @@ -152,4 +151,9 @@ protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception
throw new DecodeException(exx);
}
}

@Override
public byte protocolVersion(){
return ProtocolConstants.VERSION_0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import io.netty.handler.codec.MessageToByteEncoder;
import org.apache.seata.core.protocol.HeartbeatMessage;
import org.apache.seata.core.protocol.MessageTypeAware;
import org.apache.seata.core.protocol.ProtocolConstants;
import org.apache.seata.core.protocol.RpcMessage;
import org.apache.seata.core.protocol.Version;
import org.apache.seata.core.rpc.netty.ProtocolEncoder;
import org.apache.seata.core.serializer.Serializer;
import org.apache.seata.core.serializer.SerializerServiceLoader;
Expand Down Expand Up @@ -82,7 +82,7 @@ public void encode(RpcMessage message, ByteBuf out) {
}

byte[] bodyBytes = null;
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(codec), Version.VERSION_0_7_0);
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(codec), protocolVersion());
bodyBytes = serializer.serialize(msg.getBody());

if (msg.isSeataCodec()) {
Expand Down Expand Up @@ -118,4 +118,9 @@ protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws
LOGGER.error("Encode request error!", e);
}
}

@Override
public byte protocolVersion(){
return ProtocolConstants.VERSION_0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,12 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.core.compressor.Compressor;
import org.apache.seata.core.compressor.CompressorFactory;
import org.apache.seata.core.exception.DecodeException;
import org.apache.seata.core.protocol.HeartbeatMessage;
import org.apache.seata.core.protocol.ProtocolConstants;
import org.apache.seata.core.protocol.RpcMessage;
import org.apache.seata.core.protocol.Version;
import org.apache.seata.core.rpc.RpcContext;
import org.apache.seata.core.rpc.netty.ChannelManager;
import org.apache.seata.core.rpc.netty.ProtocolDecoder;
import org.apache.seata.core.serializer.Serializer;
import org.apache.seata.core.serializer.SerializerServiceLoader;
Expand Down Expand Up @@ -90,10 +86,6 @@ int lengthFieldLength, FullLength is int(4B). so values is 4

@Override
public RpcMessage decodeFrame(ByteBuf frame) {
return decodeFrame(null, frame);
}

public RpcMessage decodeFrame(ChannelHandlerContext ctx, ByteBuf frame) {
byte b0 = frame.readByte();
byte b1 = frame.readByte();
if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0
Expand Down Expand Up @@ -137,12 +129,7 @@ public RpcMessage decodeFrame(ChannelHandlerContext ctx, ByteBuf frame) {
bs = compressor.decompress(bs);
SerializerType protocolType = SerializerType.getByCode(rpcMessage.getCodec());
if (this.supportDeSerializerTypes.contains(protocolType)) {
String sdkVersion = "";
if(ctx != null && ctx.channel() != null){
sdkVersion = Version.getChannelVersion(ctx.channel());
sdkVersion = StringUtils.isBlank(sdkVersion) ? "" : sdkVersion;
}
Serializer serializer = SerializerServiceLoader.load(protocolType, sdkVersion);
Serializer serializer = SerializerServiceLoader.load(protocolType, ProtocolConstants.VERSION_1);
rpcMessage.setBody(serializer.deserialize(bs));
} else {
throw new IllegalArgumentException("SerializerType not match");
Expand All @@ -161,7 +148,7 @@ protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception
if (decoded instanceof ByteBuf) {
ByteBuf frame = (ByteBuf)decoded;
try {
return decodeFrame(ctx, frame);
return decodeFrame(frame);
} finally {
frame.release();
}
Expand All @@ -173,4 +160,8 @@ protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception
return decoded;
}

@Override
public byte protocolVersion(){
return ProtocolConstants.VERSION_1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.core.protocol.Version;
import org.apache.seata.core.rpc.RpcContext;
import org.apache.seata.core.rpc.netty.ChannelManager;
import org.apache.seata.core.rpc.netty.ProtocolEncoder;
import org.apache.seata.core.serializer.Serializer;
import org.apache.seata.core.compressor.Compressor;
Expand Down Expand Up @@ -68,12 +64,7 @@ public class ProtocolEncoderV1 extends MessageToByteEncoder implements ProtocolE
private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolEncoderV1.class);


@Override
public void encode(RpcMessage message, ByteBuf out) {
doEncode(null, message, out);
}

public void doEncode(ChannelHandlerContext ctx, RpcMessage message, ByteBuf out) {
try {
ProtocolRpcMessageV1 rpcMessage = new ProtocolRpcMessageV1();
rpcMessage.rpcMsg2ProtocolMsg(message);
Expand All @@ -83,7 +74,7 @@ public void doEncode(ChannelHandlerContext ctx, RpcMessage message, ByteBuf out)

byte messageType = rpcMessage.getMessageType();
out.writeBytes(ProtocolConstants.MAGIC_CODE_BYTES);
out.writeByte(ProtocolConstants.VERSION_1);
out.writeByte(protocolVersion());
// full Length(4B) and head length(2B) will fix in the end.
out.writerIndex(out.writerIndex() + 6);
out.writeByte(messageType);
Expand All @@ -103,12 +94,7 @@ public void doEncode(ChannelHandlerContext ctx, RpcMessage message, ByteBuf out)
if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
&& messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
// heartbeat has no body
String sdkVersion = "";
if(ctx != null && ctx.channel() != null){
sdkVersion = Version.getChannelVersion(ctx.channel());
sdkVersion = StringUtils.isBlank(sdkVersion) ? "" : sdkVersion;
}
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()), sdkVersion);
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()), protocolVersion());
bodyBytes = serializer.serialize(rpcMessage.getBody());
Compressor compressor = CompressorFactory.getCompressor(rpcMessage.getCompressor());
bodyBytes = compressor.compress(bodyBytes);
Expand Down Expand Up @@ -139,7 +125,7 @@ public void doEncode(ChannelHandlerContext ctx, RpcMessage message, ByteBuf out)
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
try {
if (msg instanceof RpcMessage) {
this.doEncode(ctx, (RpcMessage)msg, out);
this.encode((RpcMessage)msg, out);
} else {
throw new UnsupportedOperationException("Not support this class:" + msg.getClass());
}
Expand All @@ -148,4 +134,8 @@ protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws
}
}

@Override
public byte protocolVersion(){
return ProtocolConstants.VERSION_1;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.core.rpc.netty.v2;

import org.apache.seata.core.protocol.ProtocolConstants;
import org.apache.seata.core.rpc.netty.v1.ProtocolDecoderV1;

/**
* Decoder of protocol-v2
**/
public class ProtocolDecoderV2 extends ProtocolDecoderV1 {

@Override
public byte protocolVersion(){
return ProtocolConstants.VERSION_1;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.core.rpc.netty.v2;

import org.apache.seata.core.protocol.ProtocolConstants;
import org.apache.seata.core.rpc.netty.v1.ProtocolEncoderV1;

/**
* Encoder of protocol-v2
**/
public class ProtocolEncoderV2 extends ProtocolEncoderV1 {
@Override
public byte protocolVersion(){
return ProtocolConstants.VERSION_1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@
import org.apache.seata.common.loader.EnhancedServiceLoader;
import org.apache.seata.common.loader.EnhancedServiceNotFoundException;
import org.apache.seata.common.util.ReflectionUtil;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.config.Configuration;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.core.constants.ConfigurationKeys;
import org.apache.seata.core.protocol.Version;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -68,13 +66,13 @@ private SerializerServiceLoader() {
* @return the service of {@link Serializer}
* @throws EnhancedServiceNotFoundException the enhanced service not found exception
*/
public static Serializer load(SerializerType type, String version) throws EnhancedServiceNotFoundException {
public static Serializer load(SerializerType type, byte version) throws EnhancedServiceNotFoundException {
// The following code is only used to kindly prompt users to add missing dependencies.
if (type == SerializerType.PROTOBUF && !CONTAINS_PROTOBUF_DEPENDENCY) {
throw new EnhancedServiceNotFoundException("The class '" + PROTOBUF_SERIALIZER_CLASS_NAME + "' not found. " +
"Please manually reference 'org.apache.seata:seata-serializer-protobuf' dependency.");
}
version = StringUtils.isBlank(version) ? Version.getCurrent() : version;

String key = serializerKey(type, version);
Serializer serializer = SERIALIZER_MAP.get(key);
if (serializer == null) {
Expand Down Expand Up @@ -111,7 +109,7 @@ public static Serializer load(SerializerType type) throws EnhancedServiceNotFoun
return serializer;
}

private static String serializerKey(SerializerType type, String version) {
private static String serializerKey(SerializerType type, byte version) {
if (type == SerializerType.SEATA) {
return type.name() + version;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import org.apache.seata.core.protocol.ProtocolConstants;
import org.apache.seata.serializer.seata.protocol.BatchResultMessageCodec;
import org.apache.seata.serializer.seata.protocol.MergeResultMessageCodec;
import org.apache.seata.serializer.seata.protocol.MergedWarpMessageCodec;
Expand Down Expand Up @@ -77,6 +78,8 @@
import org.apache.seata.core.protocol.transaction.GlobalStatusRequest;
import org.apache.seata.core.protocol.transaction.GlobalStatusResponse;
import org.apache.seata.core.protocol.transaction.UndoLogDeleteRequest;
import org.apache.seata.serializer.seata.protocol.v2.RegisterRMResponseCodecV2;
import org.apache.seata.serializer.seata.protocol.v2.RegisterTMResponseCodecV2;

/**
* The type Message codec factory.
Expand Down Expand Up @@ -117,13 +120,21 @@ public static MessageSeataCodec getMessageCodec(short typeCode, byte version) {
msgCodec = new RegisterTMRequestCodec();
break;
case MessageType.TYPE_REG_CLT_RESULT:
msgCodec = new RegisterTMResponseCodec();
if (version == ProtocolConstants.VERSION_2) {
msgCodec = new RegisterTMResponseCodecV2();
} else {
msgCodec = new RegisterTMResponseCodec();
}
break;
case MessageType.TYPE_REG_RM:
msgCodec = new RegisterRMRequestCodec();
break;
case MessageType.TYPE_REG_RM_RESULT:
msgCodec = new RegisterRMResponseCodec();
if (version == ProtocolConstants.VERSION_2) {
msgCodec = new RegisterRMResponseCodecV2();
} else {
msgCodec = new RegisterRMResponseCodec();
}
break;
case MessageType.TYPE_BRANCH_COMMIT:
msgCodec = new BranchCommitRequestCodec();
Expand Down
Loading

0 comments on commit bbf5c1c

Please sign in to comment.