diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Header.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Header.java index 1df51427c8..390ebf350a 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Header.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/tcp/Header.java @@ -17,21 +17,32 @@ package org.apache.eventmesh.common.protocol.tcp; +import java.util.HashMap; +import java.util.Map; + public class Header { private Command cmd; private int code; - private String msg; + private String dsec; private String seq; + private Map properties; public Header() { } - public Header(Command cmd, int code, String msg, String seq) { + public Header(Command cmd, int code, String dsec, String seq) { this.cmd = cmd; this.code = code; - this.msg = msg; + this.dsec = dsec; + this.seq = seq; + } + + public Header(int code, String dsec, String seq, Map properties) { + this.code = code; + this.dsec = dsec; this.seq = seq; + this.properties = properties; } public Command getCommand() { @@ -50,12 +61,12 @@ public void setCode(int code) { this.code = code; } - public String getMsg() { - return msg; + public String getDsec() { + return dsec; } - public void setMsg(String msg) { - this.msg = msg; + public void setDsec(String dsec) { + this.dsec = dsec; } public String getSeq() { @@ -66,13 +77,38 @@ public void setSeq(String seq) { this.seq = seq; } + public Map getProperties() { + return properties; + } + + public void setProperties(Map properties) { + this.properties = properties; + } + + public void putProperty(final String name, final Object value) { + if (null == this.properties) { + this.properties = new HashMap<>(); + } + + this.properties.put(name, value); + } + + public Object getProperty(final String name) { + if (null == this.properties) { + this.properties = new HashMap<>(); + } + + return this.properties.get(name); + } + @Override public String toString() { return "Header{" + - "cmd=" + cmd + - ", code=" + code + - ", msg='" + msg + '\'' + - ", seq='" + seq + '\'' + - '}'; + "cmd=" + cmd + + ", code=" + code + + ", dsec='" + dsec + '\'' + + ", seq='" + seq + '\'' + + ", properties=" + properties + + '}'; } } diff --git a/eventmesh-examples/build.gradle b/eventmesh-examples/build.gradle index f62d019d36..771cbdd442 100644 --- a/eventmesh-examples/build.gradle +++ b/eventmesh-examples/build.gradle @@ -26,7 +26,7 @@ dependencies { implementation project(":eventmesh-connector-plugin:eventmesh-connector-api") implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'io.netty:netty-all' - + implementation "io.cloudevents:cloudevents-core" testImplementation project(":eventmesh-sdk-java") testImplementation project(":eventmesh-common") testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api") diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribe.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribe.java index 769fd22268..f9b910857b 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribe.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribe.java @@ -34,7 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class AsyncSubscribe implements ReceiveMsgHook { +public class AsyncSubscribe implements ReceiveMsgHook { public static Logger logger = LoggerFactory.getLogger(AsyncSubscribe.class); @@ -68,7 +68,12 @@ public static void main(String[] agrs) throws Exception { @Override public void handle(Package msg, ChannelHandlerContext ctx) { - EventMeshMessage eventMeshMessage = (EventMeshMessage) msg.getBody(); + EventMeshMessage eventMeshMessage = convert(msg); logger.info("receive async msg====================={}", eventMeshMessage); } + + @Override + public EventMeshMessage convert(Package pkg) { + return (EventMeshMessage) pkg.getBody(); + } } diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribeBroadcast.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribeBroadcast.java index 99735487bc..5381c8ff5f 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribeBroadcast.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/AsyncSubscribeBroadcast.java @@ -34,7 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class AsyncSubscribeBroadcast implements ReceiveMsgHook { +public class AsyncSubscribeBroadcast implements ReceiveMsgHook { public static Logger logger = LoggerFactory.getLogger(AsyncSubscribeBroadcast.class); @@ -68,7 +68,12 @@ public static void main(String[] agrs) throws Exception { @Override public void handle(Package msg, ChannelHandlerContext ctx) { - EventMeshMessage eventMeshMessage = (EventMeshMessage) msg.getBody(); + EventMeshMessage eventMeshMessage = convert(msg); logger.info("receive broadcast msg==============={}", eventMeshMessage); } + + @Override + public EventMeshMessage convert(Package pkg) { + return (EventMeshMessage) pkg.getBody(); + } } diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/SyncResponse.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/SyncResponse.java index 033dfb43ef..0eb4f77d30 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/SyncResponse.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/SyncResponse.java @@ -23,6 +23,7 @@ import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient; import org.apache.eventmesh.common.protocol.SubscriptionType; +import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; import org.apache.eventmesh.common.protocol.tcp.Package; import org.apache.eventmesh.common.protocol.SubscriptionMode; import org.apache.eventmesh.common.protocol.tcp.UserAgent; @@ -30,7 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SyncResponse implements ReceiveMsgHook { +public class SyncResponse implements ReceiveMsgHook { public static Logger logger = LoggerFactory.getLogger(SyncResponse.class); @@ -66,4 +67,9 @@ public void handle(Package msg, ChannelHandlerContext ctx) { Package pkg = EventMeshTestUtils.rrResponse(msg); ctx.writeAndFlush(pkg); } + + @Override + public EventMeshMessage convert(Package pkg) { + return null; + } } diff --git a/eventmesh-sdk-java/build.gradle b/eventmesh-sdk-java/build.gradle index e57a607ed2..0db0df4b2b 100644 --- a/eventmesh-sdk-java/build.gradle +++ b/eventmesh-sdk-java/build.gradle @@ -26,6 +26,8 @@ dependencies { implementation "io.netty:netty-all" implementation "org.apache.httpcomponents:httpclient" + implementation "io.cloudevents:cloudevents-core" + testImplementation project(":eventmesh-common") testImplementation project(":eventmesh-common") diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshClient.java index c98aa7a8b3..96c4aa5808 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/EventMeshClient.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.client.tcp; +import io.cloudevents.CloudEvent; import org.apache.eventmesh.client.tcp.common.AsyncRRCallback; import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; import org.apache.eventmesh.common.protocol.SubscriptionType; @@ -31,6 +32,10 @@ public interface EventMeshClient { Package publish(Package msg, long timeout) throws Exception; + Package publish(CloudEvent cloudEvent, long timeout) throws Exception; + + void broadcast(CloudEvent cloudEvent, long timeout) throws Exception; + void broadcast(Package msg, long timeout) throws Exception; void init() throws Exception; diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/SimplePubClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/SimplePubClient.java index 239cfa423d..95a4a38897 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/SimplePubClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/SimplePubClient.java @@ -18,6 +18,7 @@ package org.apache.eventmesh.client.tcp; +import io.cloudevents.CloudEvent; import org.apache.eventmesh.client.tcp.common.AsyncRRCallback; import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; import org.apache.eventmesh.common.protocol.tcp.Package; @@ -39,6 +40,10 @@ public interface SimplePubClient { Package publish(Package msg, long timeout) throws Exception; + Package publish(CloudEvent cloudEvent, long timeout) throws Exception; + + void broadcast(CloudEvent cloudEvent, long timeout) throws Exception; + void broadcast(Package msg, long timeout) throws Exception; void registerBusiHandler(ReceiveMsgHook handler) throws Exception; diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/EventMeshCommon.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/EventMeshCommon.java index a3f89e8a82..8238f52ceb 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/EventMeshCommon.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/EventMeshCommon.java @@ -120,4 +120,6 @@ public class EventMeshCommon { public static String PREFIX_SESSION_TPS_STAT_EVENTSEND = "event_send_tps_"; public static String PREFIX_SESSION_TPS_STAT_EVENTREV = "event_rev_tps_"; + + public static String CLOUD_EVENTS_PROTOCOL_NAME = "cloudevents"; } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java index 5b9d5143d5..3176588541 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/MessageUtils.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.concurrent.ThreadLocalRandom; +import io.cloudevents.CloudEvent; import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.tcp.Subscription; import org.apache.eventmesh.common.protocol.SubscriptionItem; @@ -76,6 +77,18 @@ public static Package asyncMessageAck(Package in) { return msg; } + public static Package asyncCloudEvent(CloudEvent cloudEvent) { + Package msg = new Package(); + msg.setHeader(new Header(Command.ASYNC_MESSAGE_TO_SERVER, 0, + null, generateRandomString(seqLength))); + msg.getHeader().putProperty(PropertyConst.PROPERTY_MESSAGE_PROTOCOL, + EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME); + msg.getHeader().putProperty(PropertyConst.PROPERTY_CLOUD_EVENT_VERSION, + cloudEvent.getSpecVersion().toString()); + msg.setBody(cloudEvent); + return msg; + } + public static Package broadcastMessageAck(Package in) { Package msg = new Package(); msg.setHeader(new Header(Command.BROADCAST_MESSAGE_TO_CLIENT_ACK, 0, null, in.getHeader().getSeq())); diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/PropertyConst.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/PropertyConst.java new file mode 100644 index 0000000000..f2e89f0698 --- /dev/null +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/PropertyConst.java @@ -0,0 +1,11 @@ +package org.apache.eventmesh.client.tcp.common; + +/** + * properties key name + */ +public class PropertyConst { + + public static String PROPERTY_MESSAGE_PROTOCOL = "message_protocol"; + + public static String PROPERTY_CLOUD_EVENT_VERSION = "cloud_event_version"; +} diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/ReceiveMsgHook.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/ReceiveMsgHook.java index bc60a6a8a2..858e80bdd0 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/ReceiveMsgHook.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/ReceiveMsgHook.java @@ -20,6 +20,13 @@ import io.netty.channel.ChannelHandlerContext; import org.apache.eventmesh.common.protocol.tcp.Package; -public interface ReceiveMsgHook { +/** + * ReceiveMsgHook. + * + * @param receive message type. + */ +public interface ReceiveMsgHook { void handle(Package msg, ChannelHandlerContext ctx); + + T convert(Package pkg); } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/DefaultEventMeshClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/DefaultEventMeshClient.java index c75922f688..ef2db7f167 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/DefaultEventMeshClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/DefaultEventMeshClient.java @@ -18,6 +18,7 @@ package org.apache.eventmesh.client.tcp.impl; +import io.cloudevents.CloudEvent; import org.apache.eventmesh.client.tcp.EventMeshClient; import org.apache.eventmesh.client.tcp.SimplePubClient; import org.apache.eventmesh.client.tcp.SimpleSubClient; @@ -73,10 +74,20 @@ public Package publish(Package msg, long timeout) throws Exception { return this.pubClient.publish(msg, timeout); } + @Override + public Package publish(CloudEvent cloudEvent, long timeout) throws Exception { + return this.pubClient.publish(cloudEvent, timeout); + } + public void broadcast(Package msg, long timeout) throws Exception { this.pubClient.broadcast(msg, timeout); } + @Override + public void broadcast(CloudEvent cloudEvent, long timeout) throws Exception { + this.pubClient.broadcast(cloudEvent, timeout); + } + public void init() throws Exception { this.subClient.init(); this.pubClient.init(); diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimplePubClientImpl.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimplePubClientImpl.java index 0e1f6b57d7..36db2c6a33 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimplePubClientImpl.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimplePubClientImpl.java @@ -21,6 +21,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import io.cloudevents.CloudEvent; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; @@ -29,6 +30,7 @@ import org.apache.eventmesh.client.tcp.common.AsyncRRCallback; import org.apache.eventmesh.client.tcp.common.EventMeshCommon; import org.apache.eventmesh.client.tcp.common.MessageUtils; +import org.apache.eventmesh.client.tcp.common.PropertyConst; import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook; import org.apache.eventmesh.client.tcp.common.RequestContext; import org.apache.eventmesh.client.tcp.common.TcpClient; @@ -145,6 +147,25 @@ public Package publish(Package msg, long timeout) throws Exception { return io(msg, timeout); } + + @Override + public Package publish(CloudEvent cloudEvent, long timeout) throws Exception { + Package msg = MessageUtils.asyncCloudEvent(cloudEvent); + logger.info("SimplePubClientImpl cloud event|{}|publish|send|type={}|protocol={}|msg={}", + clientNo, msg.getHeader().getCommand(), + msg.getHeader().getProperty(PropertyConst.PROPERTY_MESSAGE_PROTOCOL), msg); + return io(MessageUtils.asyncCloudEvent(cloudEvent), timeout); + } + + @Override + public void broadcast(CloudEvent cloudEvent, long timeout) throws Exception { + Package msg = MessageUtils.asyncCloudEvent(cloudEvent); + logger.info("SimplePubClientImpl cloud event|{}|publish|send|type={}|protocol={}|msg={}", + clientNo, msg.getHeader().getCommand(), + msg.getHeader().getProperty(PropertyConst.PROPERTY_MESSAGE_PROTOCOL), msg); + super.send(msg); + } + /** * Send broadcast message * diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribe.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribe.java index abd3ef8a87..9929d974a3 100644 --- a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribe.java +++ b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribe.java @@ -32,7 +32,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class AsyncSubscribe implements ReceiveMsgHook { +public class AsyncSubscribe implements ReceiveMsgHook { public static Logger logger = LoggerFactory.getLogger(AsyncSubscribe.class); @@ -63,7 +63,12 @@ public static void main(String[] agrs) throws Exception { @Override public void handle(Package msg, ChannelHandlerContext ctx) { - EventMeshMessage eventMeshMessage = (EventMeshMessage) msg.getBody(); + EventMeshMessage eventMeshMessage = convert(msg); logger.info("receive async msg====================={}", eventMeshMessage); } + + @Override + public EventMeshMessage convert(Package pkg) { + return (EventMeshMessage) pkg.getBody(); + } } diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribeBroadcast.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribeBroadcast.java index 52706c176a..e950122be2 100644 --- a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribeBroadcast.java +++ b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/AsyncSubscribeBroadcast.java @@ -32,7 +32,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class AsyncSubscribeBroadcast implements ReceiveMsgHook { +public class AsyncSubscribeBroadcast implements ReceiveMsgHook { public static Logger logger = LoggerFactory.getLogger(AsyncSubscribeBroadcast.class); @@ -63,7 +63,12 @@ public static void main(String[] agrs) throws Exception { @Override public void handle(Package msg, ChannelHandlerContext ctx) { - EventMeshMessage eventMeshMessage = (EventMeshMessage) msg.getBody(); + EventMeshMessage eventMeshMessage = convert(msg); logger.info("receive broadcast msg==============={}", eventMeshMessage); } + + @Override + public EventMeshMessage convert(Package pkg) { + return (EventMeshMessage) pkg.getBody(); + } } diff --git a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/SyncResponse.java b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/SyncResponse.java index 96b415ca59..68ea58b086 100644 --- a/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/SyncResponse.java +++ b/eventmesh-sdk-java/src/test/java/org/apache/eventmesh/client/tcp/demo/SyncResponse.java @@ -25,12 +25,13 @@ import org.apache.eventmesh.client.tcp.impl.DefaultEventMeshClient; import org.apache.eventmesh.common.protocol.SubscriptionType; import org.apache.eventmesh.common.protocol.SubscriptionMode; +import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage; import org.apache.eventmesh.common.protocol.tcp.UserAgent; import org.apache.eventmesh.common.protocol.tcp.Package; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SyncResponse implements ReceiveMsgHook { +public class SyncResponse implements ReceiveMsgHook { public static Logger logger = LoggerFactory.getLogger(SyncResponse.class); @@ -66,4 +67,9 @@ public void handle(Package msg, ChannelHandlerContext ctx) { Package pkg = EventMeshTestUtils.rrResponse(msg); ctx.writeAndFlush(pkg); } + + @Override + public EventMeshMessage convert(Package pkg) { + return null; + } }