Skip to content

Commit

Permalink
[ISSUE apache#3852]Replace some logic with existing APIs. Merge some …
Browse files Browse the repository at this point in the history
…logic. (apache#3854)

* Fix issue3852

* Fix checkstyle error.

* Revoke controversial import static.
  • Loading branch information
pandaapo authored May 9, 2023
1 parent ea4b652 commit 96ee1e9
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.commons.lang3.StringUtils;

import java.nio.charset.StandardCharsets;
import java.util.Objects;

import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
Expand All @@ -40,15 +39,13 @@ public class TcpMessageProtocolResolver {

public static CloudEvent buildEvent(Header header, String cloudEventJson)
throws ProtocolHandleException {
CloudEventBuilder cloudEventBuilder;
final CloudEventBuilder cloudEventBuilder;

String protocolType = header.getProperty(Constants.PROTOCOL_TYPE).toString();
String protocolVersion = header.getProperty(Constants.PROTOCOL_VERSION).toString();
String protocolDesc = header.getProperty(Constants.PROTOCOL_DESC).toString();

if (StringUtils.isBlank(protocolType)
|| StringUtils.isBlank(protocolVersion)
|| StringUtils.isBlank(protocolDesc)) {
if (StringUtils.isAnyBlank(protocolType, protocolVersion, protocolDesc)) {
throw new ProtocolHandleException(
String.format("invalid protocol params protocolType %s|protocolVersion %s|protocolDesc %s",
protocolType, protocolVersion, protocolDesc));
Expand All @@ -63,29 +60,16 @@ public static CloudEvent buildEvent(Header header, String cloudEventJson)
throw new ProtocolHandleException(String.format("Unsupported protocolType: %s", protocolType));
}

if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) {
if (StringUtils.equalsAny(protocolVersion, SpecVersion.V1.toString(), SpecVersion.V03.toString())) {
// todo:resolve different format
EventFormat eventFormat = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE);
Preconditions
.checkNotNull(eventFormat, String.format("EventFormat: %s is not supported", JsonFormat.CONTENT_TYPE));
.checkNotNull(eventFormat, "EventFormat: %s is not supported", JsonFormat.CONTENT_TYPE);
CloudEvent event = eventFormat.deserialize(cloudEventJson.getBytes(StandardCharsets.UTF_8));
cloudEventBuilder = CloudEventBuilder.v1(event);
for (String propKey : header.getProperties().keySet()) {
cloudEventBuilder.withExtension(propKey, header.getProperty(propKey).toString());
}

return cloudEventBuilder.build();

} else if (StringUtils.equals(SpecVersion.V03.toString(), protocolVersion)) {
// todo:resolve different format
CloudEvent event = Objects.requireNonNull(EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE))
.deserialize(cloudEventJson.getBytes(StandardCharsets.UTF_8));
cloudEventBuilder = CloudEventBuilder.v03(event);

for (String propKey : header.getProperties().keySet()) {
cloudEventBuilder.withExtension(propKey, header.getProperty(propKey).toString());
}

cloudEventBuilder = CloudEventBuilder.from(event);
header.getProperties().forEach((k, v) -> {
cloudEventBuilder.withExtension(k, v.toString());
});
return cloudEventBuilder.build();
} else {
throw new ProtocolHandleException(String.format("Unsupported protocolVersion: %s", protocolVersion));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.commons.lang3.StringUtils;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
Expand All @@ -45,9 +44,7 @@ public static CloudEvent buildEvent(Header header, EventMeshMessage message) thr
String protocolVersion = header.getProperty(Constants.PROTOCOL_VERSION).toString();
String protocolDesc = header.getProperty(Constants.PROTOCOL_DESC).toString();

if (StringUtils.isBlank(protocolType)
|| StringUtils.isBlank(protocolVersion)
|| StringUtils.isBlank(protocolDesc)) {
if (StringUtils.isAnyBlank(protocolType, protocolVersion, protocolDesc)) {
throw new ProtocolHandleException(String.format("invalid protocol params protocolType %s|protocolVersion %s|protocolDesc %s",
protocolType, protocolVersion, protocolDesc));
}
Expand All @@ -57,20 +54,15 @@ public static CloudEvent buildEvent(Header header, EventMeshMessage message) thr
}

String topic = message.getTopic();

String content = message.getBody();

if (StringUtils.equals(SpecVersion.V1.toString(), protocolVersion)) {
cloudEventBuilder = CloudEventBuilder.v1();

} else if (StringUtils.equals(SpecVersion.V03.toString(), protocolVersion)) {
cloudEventBuilder = CloudEventBuilder.v03();
if (StringUtils.equalsAny(protocolVersion, SpecVersion.V1.toString(), SpecVersion.V03.toString())) {
cloudEventBuilder = CloudEventBuilder.fromSpecVersion(SpecVersion.parse(protocolVersion));
} else {
throw new ProtocolHandleException(String.format("Unsupported protocolVersion: %s", protocolVersion));
}

cloudEventBuilder = cloudEventBuilder
.withId(header.getSeq())
cloudEventBuilder.withId(header.getSeq())
.withSource(URI.create("/"))
.withType("eventmeshmessage")
.withSubject(topic)
Expand All @@ -80,19 +72,19 @@ public static CloudEvent buildEvent(Header header, EventMeshMessage message) thr
cloudEventBuilder.withDataContentType(message.getHeaders().get(Constants.DATA_CONTENT_TYPE));
}

for (String propKey : header.getProperties().keySet()) {
for (Map.Entry<String, Object> prop : header.getProperties().entrySet()) {
try {
cloudEventBuilder.withExtension(propKey, header.getProperty(propKey).toString());
cloudEventBuilder.withExtension(prop.getKey(), prop.getValue().toString());
} catch (Exception e) {
throw new ProtocolHandleException(String.format("Abnormal propKey: %s", propKey), e);
throw new ProtocolHandleException(String.format("Abnormal propKey: %s", prop.getKey()), e);
}
}

for (String propKey : message.getProperties().keySet()) {
for (Map.Entry<String, String> prop : message.getProperties().entrySet()) {
try {
cloudEventBuilder.withExtension(propKey, message.getProperties().get(propKey));
cloudEventBuilder.withExtension(prop.getKey(), prop.getValue());
} catch (Exception e) {
throw new ProtocolHandleException(String.format("Abnormal propKey: %s", propKey), e);
throw new ProtocolHandleException(String.format("Abnormal propKey: %s", prop.getKey()), e);
}
}

Expand All @@ -103,12 +95,12 @@ public static CloudEvent buildEvent(Header header, EventMeshMessage message) thr
public static Package buildEventMeshMessage(CloudEvent cloudEvent) {
EventMeshMessage eventMeshMessage = new EventMeshMessage();
eventMeshMessage.setTopic(cloudEvent.getSubject());
eventMeshMessage.setBody(new String(Objects.requireNonNull(cloudEvent.getData()).toBytes(), StandardCharsets.UTF_8));
eventMeshMessage.setBody(new String(Objects.requireNonNull(cloudEvent.getData()).toBytes(), Constants.DEFAULT_CHARSET));

Map<String, String> prop = new HashMap<>();
for (String extKey : cloudEvent.getExtensionNames()) {
prop.put(extKey, Objects.requireNonNull(cloudEvent.getExtension(extKey)).toString());
}
cloudEvent.getExtensionNames().forEach(k -> {
prop.put(k, Objects.requireNonNull(cloudEvent.getExtension(k)).toString());
});
eventMeshMessage.setProperties(prop);

Package pkg = new Package();
Expand Down

0 comments on commit 96ee1e9

Please sign in to comment.