Skip to content

Commit

Permalink
bugfix: fix hsf ConsumerModel convert error (apache#6626)
Browse files Browse the repository at this point in the history
  • Loading branch information
slievrly authored Jun 25, 2024
1 parent eb19e98 commit ac2e7a2
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 27 deletions.
1 change: 1 addition & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Add changes here for all PR submitted to the 2.x branch.
### bugfix:
- [[#6592](https://github.com/apache/incubator-seata/pull/6592)] fix @Async annotation not working in ClusterWatcherManager
- [[#6624](https://github.com/apache/incubator-seata/pull/6624)] fix Alibaba Dubbo convert error
- [[#6626](https://github.com/apache/incubator-seata/pull/6626)] fix hsf ConsumerModel convert error


### optimize:
Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
### bugfix:
- [[#6592](https://github.com/apache/incubator-seata/pull/6592)] fix @Async注解ClusterWatcherManager中不生效的问题
- [[#6624](https://github.com/apache/incubator-seata/pull/6624)] 修复 Alibaba Dubbo 转换错误
- [[#6626](https://github.com/apache/incubator-seata/pull/6626)] 修复 hsf ConsumerModel 转换错误


### optimize:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.integration.hsf;

import com.taobao.hsf.context.RPCContext;
import com.taobao.hsf.invocation.Invocation;
import com.taobao.hsf.invocation.InvocationHandler;
import com.taobao.hsf.invocation.RPCResult;
import com.taobao.hsf.invocation.filter.ClientFilter;
import com.taobao.hsf.util.concurrent.ListenableFuture;
import org.apache.seata.core.context.RootContext;
import org.apache.seata.core.model.BranchType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The type Hsf transaction consumer filter.
*/
public class HsfTransactionConsumerFilter implements ClientFilter {

private static final Logger LOGGER = LoggerFactory.getLogger(HsfTransactionConsumerFilter.class);

@Override
public ListenableFuture<RPCResult> invoke(InvocationHandler nextHandler, Invocation invocation) throws Throwable {
String xid = RootContext.getXID();
BranchType branchType = RootContext.getBranchType();

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("xid in RootContext[{}], branchType in RootContext[{}]", xid, branchType);
}
if (xid != null) {
RPCContext.getClientContext().putAttachment(RootContext.KEY_XID, xid);
RPCContext.getClientContext().putAttachment(RootContext.KEY_BRANCH_TYPE, branchType.name());
}
try {
return nextHandler.invoke(invocation);
} finally {
RPCContext.getClientContext().removeAttachment(RootContext.KEY_XID);
RPCContext.getClientContext().removeAttachment(RootContext.KEY_BRANCH_TYPE);
}
}

@Override
public void onResponse(Invocation invocation, RPCResult rpcResult) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.taobao.hsf.invocation.Invocation;
import com.taobao.hsf.invocation.InvocationHandler;
import com.taobao.hsf.invocation.RPCResult;
import com.taobao.hsf.invocation.filter.ClientFilter;
import com.taobao.hsf.invocation.filter.ServerFilter;
import com.taobao.hsf.util.concurrent.ListenableFuture;
import org.apache.seata.common.util.StringUtils;
Expand All @@ -30,42 +29,33 @@
import org.slf4j.LoggerFactory;

/**
* The type Transaction propagation filter.
*
* The type Hsf transaction provider filter.
*/
public class HsfTransactionFilter implements ClientFilter, ServerFilter {
public class HsfTransactionProviderFilter implements ServerFilter {

private static final Logger LOGGER = LoggerFactory.getLogger(HsfTransactionFilter.class);
private static final Logger LOGGER = LoggerFactory.getLogger(HsfTransactionProviderFilter.class);

@Override
public ListenableFuture<RPCResult> invoke(InvocationHandler invocationHandler, Invocation invocation)
throws Throwable {
String xid = RootContext.getXID();
BranchType branchType = RootContext.getBranchType();
public ListenableFuture<RPCResult> invoke(InvocationHandler nextHandler, Invocation invocation) throws Throwable {

Object rpcXid = RPCContext.getServerContext().getAttachment(RootContext.KEY_XID);
Object rpcBranchType = RPCContext.getServerContext().getAttachment(RootContext.KEY_BRANCH_TYPE);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("xid in RootContext[{}] xid in RpcContext[{}]", xid, rpcXid);
LOGGER.debug("xid in RpcContext[{}], branchType in RpcContext[{}]", rpcXid, rpcBranchType);
}
boolean bind = false;
if (xid != null) {
RPCContext.getClientContext().putAttachment(RootContext.KEY_XID, xid);
RPCContext.getClientContext().putAttachment(RootContext.KEY_BRANCH_TYPE, branchType.name());
} else {
if (rpcXid != null) {
RootContext.bind(rpcXid.toString());
if (StringUtils.equals(BranchType.TCC.name(), rpcBranchType.toString())) {
RootContext.bindBranchType(BranchType.TCC);
}
bind = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind xid [{}] branchType [{}] to RootContext", rpcXid, rpcBranchType);
}
if (rpcXid != null) {
RootContext.bind(rpcXid.toString());
if (StringUtils.equals(BranchType.TCC.name(), rpcBranchType.toString())) {
RootContext.bindBranchType(BranchType.TCC);
}
bind = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind xid [{}] branchType [{}] to RootContext", rpcXid, rpcBranchType);
}
}
try {
return invocationHandler.invoke(invocation);
return nextHandler.invoke(invocation);
} finally {
if (bind) {
BranchType previousBranchType = RootContext.getBranchType();
Expand All @@ -89,8 +79,6 @@ public ListenableFuture<RPCResult> invoke(InvocationHandler invocationHandler, I
}
}
}
RPCContext.getClientContext().removeAttachment(RootContext.KEY_XID);
RPCContext.getClientContext().removeAttachment(RootContext.KEY_BRANCH_TYPE);
RPCContext.getServerContext().removeAttachment(RootContext.KEY_XID);
RPCContext.getServerContext().removeAttachment(RootContext.KEY_BRANCH_TYPE);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
org.apache.seata.integration.hsf.HsfTransactionFilter
org.apache.seata.integration.hsf.HsfTransactionConsumerFilter
org.apache.seata.integration.hsf.HsfTransactionProviderFilter

0 comments on commit ac2e7a2

Please sign in to comment.