Skip to content

Commit

Permalink
[ISSUE apache#3652] refactored the code, extracted a new method and u…
Browse files Browse the repository at this point in the history
…sed switch

* apache#3652 refactored the code, extracted a new method and used switch to make code more readable

* removed redundant declarations

* using constant expression for switch comparison

* fix had done a static import of SpecVersion, removed it but missed reverting related changes

* removed unused import
  • Loading branch information
dipankr authored Apr 19, 2023
1 parent 159bd6f commit 3ad1a0c
Showing 1 changed file with 40 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.eventmesh.common.protocol.http.body.message.SendMessageBatchV2RequestBody;
import org.apache.eventmesh.common.protocol.http.body.message.SendMessageRequestBody;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
import org.apache.eventmesh.common.protocol.http.common.ProtocolVersion;
import org.apache.eventmesh.common.protocol.http.header.Header;
import org.apache.eventmesh.common.protocol.http.header.message.SendMessageBatchV2RequestHeader;
import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;
Expand All @@ -42,84 +41,50 @@ public static CloudEvent buildEvent(Header header, Body body) throws ProtocolHan
SendMessageBatchV2RequestHeader sendMessageBatchV2RequestHeader = (SendMessageBatchV2RequestHeader) header;
SendMessageBatchV2RequestBody sendMessageBatchV2RequestBody = (SendMessageBatchV2RequestBody) body;

String protocolType = sendMessageBatchV2RequestHeader.getProtocolType();
String protocolDesc = sendMessageBatchV2RequestHeader.getProtocolDesc();
String protocolVersion = sendMessageBatchV2RequestHeader.getProtocolVersion();

String code = sendMessageBatchV2RequestHeader.getCode();
String env = sendMessageBatchV2RequestHeader.getEnv();
String idc = sendMessageBatchV2RequestHeader.getIdc();
String ip = sendMessageBatchV2RequestHeader.getIp();
String pid = sendMessageBatchV2RequestHeader.getPid();
String sys = sendMessageBatchV2RequestHeader.getSys();
String username = sendMessageBatchV2RequestHeader.getUsername();
String passwd = sendMessageBatchV2RequestHeader.getPasswd();
ProtocolVersion version = sendMessageBatchV2RequestHeader.getVersion();
String language = sendMessageBatchV2RequestHeader.getLanguage();

String content = sendMessageBatchV2RequestBody.getMsg();

CloudEvent event = null;
CloudEventBuilder cloudEventBuilder;
if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) {
cloudEventBuilder = CloudEventBuilder.v1();

cloudEventBuilder = cloudEventBuilder.withId(sendMessageBatchV2RequestBody.getBizSeqNo())
.withSubject(sendMessageBatchV2RequestBody.getTopic())
.withType("eventmeshmessage")
.withSource(URI.create("/"))
.withData(content.getBytes(StandardCharsets.UTF_8))
.withExtension(ProtocolKey.REQUEST_CODE, code)
.withExtension(ProtocolKey.ClientInstanceKey.ENV, env)
.withExtension(ProtocolKey.ClientInstanceKey.IDC, idc)
.withExtension(ProtocolKey.ClientInstanceKey.IP, ip)
.withExtension(ProtocolKey.ClientInstanceKey.PID, pid)
.withExtension(ProtocolKey.ClientInstanceKey.SYS, sys)
.withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username)
.withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd)
.withExtension(ProtocolKey.VERSION, version.getVersion())
.withExtension(ProtocolKey.LANGUAGE, language)
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.withExtension(SendMessageBatchV2RequestBody.BIZSEQNO, sendMessageBatchV2RequestBody.getBizSeqNo())
.withExtension(SendMessageBatchV2RequestBody.PRODUCERGROUP, sendMessageBatchV2RequestBody.getProducerGroup())
.withExtension(SendMessageBatchV2RequestBody.TTL, sendMessageBatchV2RequestBody.getTtl());
if (StringUtils.isNotEmpty(sendMessageBatchV2RequestBody.getTag())) {
cloudEventBuilder = cloudEventBuilder.withExtension(SendMessageRequestBody.TAG, sendMessageBatchV2RequestBody.getTag());
}
event = cloudEventBuilder.build();
} else if (StringUtils.equals(SpecVersion.V03.toString(), protocolVersion)) {
cloudEventBuilder = CloudEventBuilder.v03();
cloudEventBuilder = cloudEventBuilder.withId(sendMessageBatchV2RequestBody.getBizSeqNo())
.withSubject(sendMessageBatchV2RequestBody.getTopic())
.withType("eventmeshmessage")
.withSource(URI.create("/"))
.withData(content.getBytes(StandardCharsets.UTF_8))
.withExtension(ProtocolKey.REQUEST_CODE, code)
.withExtension(ProtocolKey.ClientInstanceKey.ENV, env)
.withExtension(ProtocolKey.ClientInstanceKey.IDC, idc)
.withExtension(ProtocolKey.ClientInstanceKey.IP, ip)
.withExtension(ProtocolKey.ClientInstanceKey.PID, pid)
.withExtension(ProtocolKey.ClientInstanceKey.SYS, sys)
.withExtension(ProtocolKey.ClientInstanceKey.USERNAME, username)
.withExtension(ProtocolKey.ClientInstanceKey.PASSWD, passwd)
.withExtension(ProtocolKey.VERSION, version.getVersion())
.withExtension(ProtocolKey.LANGUAGE, language)
.withExtension(ProtocolKey.PROTOCOL_TYPE, protocolType)
.withExtension(ProtocolKey.PROTOCOL_DESC, protocolDesc)
.withExtension(ProtocolKey.PROTOCOL_VERSION, protocolVersion)
.withExtension(SendMessageBatchV2RequestBody.BIZSEQNO, sendMessageBatchV2RequestBody.getBizSeqNo())
.withExtension(SendMessageBatchV2RequestBody.PRODUCERGROUP, sendMessageBatchV2RequestBody.getProducerGroup())
.withExtension(SendMessageBatchV2RequestBody.TTL, sendMessageBatchV2RequestBody.getTtl());
if (StringUtils.isNotEmpty(sendMessageBatchV2RequestBody.getTag())) {
cloudEventBuilder = cloudEventBuilder.withExtension(SendMessageRequestBody.TAG, sendMessageBatchV2RequestBody.getTag());
}
event = cloudEventBuilder.build();
switch (SpecVersion.parse(sendMessageBatchV2RequestHeader.getProtocolVersion())) {
case V1:
cloudEventBuilder = CloudEventBuilder.v1();
break;
case V03:
cloudEventBuilder = CloudEventBuilder.v03();
break;
default:
return null; // unsupported protocol version
}
return event;

return getBuildCloudEvent(sendMessageBatchV2RequestHeader, sendMessageBatchV2RequestBody, cloudEventBuilder);
} catch (Exception e) {
throw new ProtocolHandleException(e.getMessage(), e.getCause());
}
}

private static CloudEvent getBuildCloudEvent(SendMessageBatchV2RequestHeader sendMessageBatchV2RequestHeader,
SendMessageBatchV2RequestBody sendMessageBatchV2RequestBody, CloudEventBuilder cloudEventBuilder) {
cloudEventBuilder = cloudEventBuilder.withId(sendMessageBatchV2RequestBody.getBizSeqNo())
.withSubject(sendMessageBatchV2RequestBody.getTopic())
.withType("eventmeshmessage")
.withSource(URI.create("/"))
.withData(sendMessageBatchV2RequestBody.getMsg().getBytes(StandardCharsets.UTF_8))
.withExtension(ProtocolKey.REQUEST_CODE, sendMessageBatchV2RequestHeader.getCode())
.withExtension(ProtocolKey.ClientInstanceKey.ENV, sendMessageBatchV2RequestHeader.getEnv())
.withExtension(ProtocolKey.ClientInstanceKey.IDC, sendMessageBatchV2RequestHeader.getIdc())
.withExtension(ProtocolKey.ClientInstanceKey.IP, sendMessageBatchV2RequestHeader.getIp())
.withExtension(ProtocolKey.ClientInstanceKey.PID, sendMessageBatchV2RequestHeader.getPid())
.withExtension(ProtocolKey.ClientInstanceKey.SYS, sendMessageBatchV2RequestHeader.getSys())
.withExtension(ProtocolKey.ClientInstanceKey.USERNAME, sendMessageBatchV2RequestHeader.getUsername())
.withExtension(ProtocolKey.ClientInstanceKey.PASSWD, sendMessageBatchV2RequestHeader.getPasswd())
.withExtension(ProtocolKey.VERSION, sendMessageBatchV2RequestHeader.getVersion().getVersion())
.withExtension(ProtocolKey.LANGUAGE, sendMessageBatchV2RequestHeader.getLanguage())
.withExtension(ProtocolKey.PROTOCOL_TYPE, sendMessageBatchV2RequestHeader.getProtocolType())
.withExtension(ProtocolKey.PROTOCOL_DESC, sendMessageBatchV2RequestHeader.getProtocolDesc())
.withExtension(ProtocolKey.PROTOCOL_VERSION, sendMessageBatchV2RequestHeader.getProtocolVersion())
.withExtension(SendMessageBatchV2RequestBody.BIZSEQNO, sendMessageBatchV2RequestBody.getBizSeqNo())
.withExtension(SendMessageBatchV2RequestBody.PRODUCERGROUP, sendMessageBatchV2RequestBody.getProducerGroup())
.withExtension(SendMessageBatchV2RequestBody.TTL, sendMessageBatchV2RequestBody.getTtl());
if (StringUtils.isNotEmpty(sendMessageBatchV2RequestBody.getTag())) {
cloudEventBuilder = cloudEventBuilder.withExtension(SendMessageRequestBody.TAG, sendMessageBatchV2RequestBody.getTag());
}
return cloudEventBuilder.build();
}
}

0 comments on commit 3ad1a0c

Please sign in to comment.