From c1effaaa19ec8914e18ed3d41b573c21af1c344e Mon Sep 17 00:00:00 2001 From: xwm1992 Date: Mon, 13 Dec 2021 20:28:33 +0800 Subject: [PATCH 1/4] [Bug #646] Missing the rocketmq message properties during protocol conversion --- .../apache/eventmesh/common/Constants.java | 2 + .../impl/RocketMQBinaryMessageReader.java | 8 ++-- .../cloudevent/impl/RocketMQHeaders.java | 2 +- .../impl/RocketMQMessageWriter.java | 11 +++-- .../rocketmq/consumer/PushConsumerImpl.java | 46 +++++++------------ .../rocketmq/producer/ProducerImpl.java | 38 +++++++++++---- .../rocketmq/utils/CloudEventUtils.java | 20 ++++++-- .../runtime/constants/EventMeshConstants.java | 10 ++-- .../eventmesh/runtime/demo/CClientDemo.java | 2 +- 9 files changed, 82 insertions(+), 57 deletions(-) diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java index e5c3e5de0e..af222fd8c6 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java @@ -81,4 +81,6 @@ public class Constants { public static final String PROPERTY_MESSAGE_STORE_TIMESTAMP = "storetimestamp"; + public static final String MESSAGE_PROP_SEPARATOR = "99"; + } diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQBinaryMessageReader.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQBinaryMessageReader.java index a02d07428a..df42f6bd2e 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQBinaryMessageReader.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQBinaryMessageReader.java @@ -46,13 +46,15 @@ protected boolean isContentTypeHeader(String key) { @Override protected boolean isCloudEventsHeader(String key) { - return key.length() > 3 && key.substring(0, RocketMQHeaders.CE_PREFIX.length()) - .startsWith(RocketMQHeaders.CE_PREFIX); +// return key.length() > 3 && key.substring(0, RocketMQHeaders.CE_PREFIX.length()) +// .startsWith(RocketMQHeaders.CE_PREFIX); + return true; } @Override protected String toCloudEventsKey(String key) { - return key.substring(RocketMQHeaders.CE_PREFIX.length()).toLowerCase(); +// return key.substring(RocketMQHeaders.CE_PREFIX.length()).toLowerCase(); + return key.toLowerCase(); } @Override diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQHeaders.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQHeaders.java index 99f5edb832..64600ae0a7 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQHeaders.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQHeaders.java @@ -27,7 +27,7 @@ public class RocketMQHeaders { public static final String CE_PREFIX = "CE_"; protected static final Map ATTRIBUTES_TO_HEADERS = - MessageUtils.generateAttributesToHeadersMapping(v -> CE_PREFIX + v); + MessageUtils.generateAttributesToHeadersMapping(v -> v); public static final String CONTENT_TYPE = ATTRIBUTES_TO_HEADERS.get(CloudEventV1.DATACONTENTTYPE); diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQMessageWriter.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQMessageWriter.java index d253069d92..d7a472a806 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQMessageWriter.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQMessageWriter.java @@ -68,11 +68,12 @@ public RocketMQMessageWriter(String topic, String keys, String tags) { public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException { - String propName = RocketMQHeaders.ATTRIBUTES_TO_HEADERS.get(name); - if (propName == null) { - propName = RocketMQHeaders.CE_PREFIX + name; - } - message.putUserProperty(propName, value); +// String propName = RocketMQHeaders.ATTRIBUTES_TO_HEADERS.get(name); +// if (propName == null) { +// propName = RocketMQHeaders.CE_PREFIX + name; +// } +// message.putUserProperty(propName, value); + message.putUserProperty(name, value); return this; } diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java index 93c61a3df8..e7e5ae8e7e 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java @@ -183,24 +183,18 @@ public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg, String.valueOf(msg.getStoreTimestamp())); //for rr request/reply - String cluster = msg.getProperty(MessageConst.PROPERTY_CLUSTER); - String replyClient = msg.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT); - String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID); - CloudEvent cloudEvent = RocketMQMessageFactory.createReader(CloudEventUtils.msgConvert(msg)).toEvent(); - CloudEventBuilder cloudEventBuilder; - if (StringUtils.isNotEmpty(cluster)) { - cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension("cluster", cluster); - cloudEvent = cloudEventBuilder.build(); - } - if (StringUtils.isNotEmpty(replyClient)) { - cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension("replytoclient", replyClient); - cloudEvent = cloudEventBuilder.build(); + CloudEventBuilder cloudEventBuilder = null; + for (String sysPropKey : MessageConst.STRING_HASH_SET) { + if (StringUtils.isNotEmpty(msg.getProperty(sysPropKey))) { + String prop = msg.getProperty(sysPropKey); + sysPropKey = sysPropKey.toLowerCase().replaceAll("_", Constants.MESSAGE_PROP_SEPARATOR); + cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension(sysPropKey, prop); + } } - if (StringUtils.isNotEmpty(correlationId)) { - cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension("correlationid", correlationId); + if (cloudEventBuilder != null) { cloudEvent = cloudEventBuilder.build(); } @@ -259,25 +253,19 @@ public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg, msg.putUserProperty(EventMeshConstants.STORE_TIMESTAMP, String.valueOf(msg.getStoreTimestamp())); - //for rr request/reply - String cluster = msg.getProperty(MessageConst.PROPERTY_CLUSTER); - String replyClient = msg.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT); - String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID); - CloudEvent cloudEvent = RocketMQMessageFactory.createReader(CloudEventUtils.msgConvert(msg)).toEvent(); - CloudEventBuilder cloudEventBuilder; - if (StringUtils.isNotEmpty(cluster)) { - cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension("cluster", cluster); - cloudEvent = cloudEventBuilder.build(); - } - if (StringUtils.isNotEmpty(replyClient)) { - cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension("replytoclient", replyClient); - cloudEvent = cloudEventBuilder.build(); + CloudEventBuilder cloudEventBuilder = null; + + for (String sysPropKey : MessageConst.STRING_HASH_SET) { + if (StringUtils.isNotEmpty(msg.getProperty(sysPropKey))) { + String prop = msg.getProperty(sysPropKey); + sysPropKey = sysPropKey.toLowerCase().replaceAll("_", Constants.MESSAGE_PROP_SEPARATOR); + cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension(sysPropKey, prop); + } } - if (StringUtils.isNotEmpty(correlationId)) { - cloudEventBuilder = CloudEventBuilder.from(cloudEvent).withExtension("correlationid", correlationId); + if (cloudEventBuilder != null) { cloudEvent = cloudEventBuilder.build(); } diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java index 619fe63578..94e010bc02 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java @@ -22,6 +22,7 @@ import org.apache.eventmesh.api.SendResult; import org.apache.eventmesh.api.exception.ConnectorRuntimeException; import org.apache.eventmesh.api.exception.OnExceptionContext; +import org.apache.eventmesh.common.Constants; import org.apache.eventmesh.connector.rocketmq.cloudevent.RocketMQMessageFactory; import org.apache.eventmesh.connector.rocketmq.utils.OMSUtil; import org.apache.eventmesh.connector.rocketmq.utils.CloudEventUtils; @@ -75,6 +76,7 @@ public SendResult send(CloudEvent cloudEvent) { this.checkProducerServiceState(rocketmqProducer.getDefaultMQProducerImpl()); org.apache.rocketmq.common.message.Message msg = RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent); + msg = supplySysProp(msg, cloudEvent); String messageId = null; try { org.apache.rocketmq.client.producer.SendResult sendResultRmq = @@ -95,6 +97,7 @@ public void sendOneway(CloudEvent cloudEvent) { this.checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl()); org.apache.rocketmq.common.message.Message msg = RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent); + msg = supplySysProp(msg, cloudEvent); try { this.rocketmqProducer.sendOneway(msg); } catch (Exception e) { @@ -109,7 +112,7 @@ public void sendAsync(CloudEvent cloudEvent, SendCallback sendCallback) { this.checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl()); org.apache.rocketmq.common.message.Message msg = RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent); - + msg = supplySysProp(msg, cloudEvent); try { this.rocketmqProducer.send(msg, this.sendCallbackConvert(msg, sendCallback)); } catch (Exception e) { @@ -125,6 +128,9 @@ public void request(CloudEvent cloudEvent, RequestReplyCallback rrCallback, long this.checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl()); org.apache.rocketmq.common.message.Message msg = RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent); + + msg = supplySysProp(msg, cloudEvent); + rocketmqProducer.request(msg, rrCallbackConvert(msg, rrCallback), timeout); } @@ -133,15 +139,7 @@ public boolean reply(final CloudEvent cloudEvent, final SendCallback sendCallbac org.apache.rocketmq.common.message.Message msg = RocketMQMessageFactory.createWriter(cloudEvent.getSubject()).writeBinary(cloudEvent); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_TYPE, MixAll.REPLY_MESSAGE_FLAG); - if (StringUtils.isNotEmpty(cloudEvent.getExtension("cluster").toString())) { - MessageAccessor.putProperty(msg, MessageConst.PROPERTY_CLUSTER, cloudEvent.getExtension("cluster").toString()); - } - if (StringUtils.isNotEmpty(cloudEvent.getExtension("replytoclient").toString())) { - MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, cloudEvent.getExtension("replytoclient").toString()); - } - if (StringUtils.isNotEmpty(cloudEvent.getExtension("correlationid").toString())) { - MessageAccessor.putProperty(msg, MessageConst.PROPERTY_CORRELATION_ID, cloudEvent.getExtension("correlationid").toString()); - } + msg = supplySysProp(msg, cloudEvent); try { this.rocketmqProducer.send(msg, this.sendCallbackConvert(msg, sendCallback)); @@ -154,10 +152,30 @@ public boolean reply(final CloudEvent cloudEvent, final SendCallback sendCallbac } + private Message supplySysProp(Message msg, CloudEvent cloudEvent) { + for (String sysPropKey : MessageConst.STRING_HASH_SET) { + String ceKey = sysPropKey.toLowerCase().replaceAll("_", Constants.MESSAGE_PROP_SEPARATOR); + if (cloudEvent.getExtension(ceKey) != null && StringUtils.isNotEmpty(cloudEvent.getExtension(ceKey).toString())) { + MessageAccessor.putProperty(msg, sysPropKey, cloudEvent.getExtension(ceKey).toString()); + msg.getProperties().remove(ceKey); + } + } + return msg; + } + private RequestCallback rrCallbackConvert(final Message message, final RequestReplyCallback rrCallback) { return new RequestCallback() { @Override public void onSuccess(org.apache.rocketmq.common.message.Message message) { + // clean the message property to lowercase + for (String sysPropKey : MessageConst.STRING_HASH_SET) { + if (StringUtils.isNotEmpty(message.getProperty(sysPropKey))) { + String prop = message.getProperty(sysPropKey); + String tmpPropKey = sysPropKey.toLowerCase().replaceAll("_", Constants.MESSAGE_PROP_SEPARATOR); + MessageAccessor.putProperty(message, tmpPropKey, prop); + message.getProperties().remove(sysPropKey); + } + } CloudEvent event = RocketMQMessageFactory.createReader(message).toEvent(); rrCallback.onSuccess(event); } diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/CloudEventUtils.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/CloudEventUtils.java index a093265d97..80ec6f3f40 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/CloudEventUtils.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/CloudEventUtils.java @@ -17,13 +17,13 @@ package org.apache.eventmesh.connector.rocketmq.utils; - import org.apache.eventmesh.api.SendResult; import org.apache.eventmesh.common.Constants; -import org.apache.eventmesh.connector.rocketmq.cloudevent.impl.RocketMQHeaders; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; import java.util.Map; @@ -90,13 +90,23 @@ public static Message msgConvert(MessageExt rmqMsg) { MessageAccessor.putProperty(message, buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_QUEUE_OFFSET), String.valueOf(rmqMsg.getQueueOffset())); + for (String sysPropKey : MessageConst.STRING_HASH_SET) { + if (StringUtils.isNotEmpty(message.getProperty(sysPropKey))) { + String prop = message.getProperty(sysPropKey); + String tmpPropKey = sysPropKey.toLowerCase().replaceAll("_", Constants.MESSAGE_PROP_SEPARATOR); + MessageAccessor.putProperty(message, tmpPropKey, prop); + message.getProperties().remove(sysPropKey); + } + } + return message; } private static String buildCloudEventPropertyKey(String propName) { - return RocketMQHeaders.CE_PREFIX + propName; +// return RocketMQHeaders.CE_PREFIX + propName; + return propName; } public static org.apache.rocketmq.common.message.MessageExt msgConvertExt(Message message) { @@ -121,8 +131,8 @@ public static org.apache.rocketmq.common.message.MessageExt msgConvertExt(Messag rmqMessageExt.setTopic(message.getTopic()); int queueId = - (int) Integer.valueOf(message.getProperty(buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_QUEUE_ID))); - long queueOffset = (long) Long.valueOf( + Integer.parseInt(message.getProperty(buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_QUEUE_ID))); + long queueOffset = Long.parseLong( message.getProperty(buildCloudEventPropertyKey(Constants.PROPERTY_MESSAGE_QUEUE_OFFSET))); //use in manual ack rmqMessageExt.setQueueId(queueId); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/EventMeshConstants.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/EventMeshConstants.java index 2f8bd634a0..bd702989f3 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/EventMeshConstants.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/constants/EventMeshConstants.java @@ -17,6 +17,8 @@ package org.apache.eventmesh.runtime.constants; +import org.apache.eventmesh.common.Constants; + public class EventMeshConstants { public static final String EVENT_STORE_PROPERTIES = "eventstore"; @@ -111,8 +113,10 @@ public class EventMeshConstants { public static final String PROPERTY_RR_REQUEST_ID = "RR_REQUEST_UNIQ_ID"; - public static final String LEAVE_TIME = "leavetime"; //leaveBrokerTime - public static final String ARRIVE_TIME = "arrivetime"; - public static final String STORE_TIME = "storetime"; + public static final String LEAVE_TIME = "leave" + Constants.MESSAGE_PROP_SEPARATOR + "time"; //leaveBrokerTime + public static final String ARRIVE_TIME = "arrive" + Constants.MESSAGE_PROP_SEPARATOR + "time"; + public static final String STORE_TIME = "store" + Constants.MESSAGE_PROP_SEPARATOR + "time"; + + } diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java index 4f33595ee0..cad2c5018b 100644 --- a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/demo/CClientDemo.java @@ -63,7 +63,7 @@ public static void main(String[] args) throws Exception { client.registerSubBusiHandler(new ReceiveMsgHook() { @Override public void handle(Package msg, ChannelHandlerContext ctx) { - if (msg.getHeader().getCommand() == Command.ASYNC_MESSAGE_TO_CLIENT || msg.getHeader().getCommand() == Command.BROADCAST_MESSAGE_TO_CLIENT) { + if (msg.getHeader().getCmd() == Command.ASYNC_MESSAGE_TO_CLIENT || msg.getHeader().getCmd() == Command.BROADCAST_MESSAGE_TO_CLIENT) { logger.error("receive message-------------------------------------" + msg.toString()); } } From 010208e87d6181c9311e295d3ceb8ccb07b0d3c7 Mon Sep 17 00:00:00 2001 From: xwm1992 Date: Mon, 13 Dec 2021 20:46:53 +0800 Subject: [PATCH 2/4] fix checkstyle and gradle module dependency --- .../cloudevent/impl/RocketMQBinaryMessageReader.java | 6 +++--- .../cloudevent/impl/RocketMQMessageWriter.java | 10 +++++----- .../connector/rocketmq/utils/CloudEventUtils.java | 2 +- eventmesh-runtime/build.gradle | 2 -- 4 files changed, 9 insertions(+), 11 deletions(-) diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQBinaryMessageReader.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQBinaryMessageReader.java index df42f6bd2e..8b120380b5 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQBinaryMessageReader.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQBinaryMessageReader.java @@ -46,14 +46,14 @@ protected boolean isContentTypeHeader(String key) { @Override protected boolean isCloudEventsHeader(String key) { -// return key.length() > 3 && key.substring(0, RocketMQHeaders.CE_PREFIX.length()) -// .startsWith(RocketMQHeaders.CE_PREFIX); + // return key.length() > 3 && key.substring(0, RocketMQHeaders.CE_PREFIX.length()) + //.startsWith(RocketMQHeaders.CE_PREFIX); return true; } @Override protected String toCloudEventsKey(String key) { -// return key.substring(RocketMQHeaders.CE_PREFIX.length()).toLowerCase(); + //return key.substring(RocketMQHeaders.CE_PREFIX.length()).toLowerCase(); return key.toLowerCase(); } diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQMessageWriter.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQMessageWriter.java index d7a472a806..60f0609a36 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQMessageWriter.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/cloudevent/impl/RocketMQMessageWriter.java @@ -68,11 +68,11 @@ public RocketMQMessageWriter(String topic, String keys, String tags) { public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException { -// String propName = RocketMQHeaders.ATTRIBUTES_TO_HEADERS.get(name); -// if (propName == null) { -// propName = RocketMQHeaders.CE_PREFIX + name; -// } -// message.putUserProperty(propName, value); + //String propName = RocketMQHeaders.ATTRIBUTES_TO_HEADERS.get(name); + //if (propName == null) { + //propName = RocketMQHeaders.CE_PREFIX + name; + //} + //message.putUserProperty(propName, value); message.putUserProperty(name, value); return this; } diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/CloudEventUtils.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/CloudEventUtils.java index 80ec6f3f40..2073468895 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/CloudEventUtils.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/CloudEventUtils.java @@ -105,7 +105,7 @@ public static Message msgConvert(MessageExt rmqMsg) { private static String buildCloudEventPropertyKey(String propName) { -// return RocketMQHeaders.CE_PREFIX + propName; + //return RocketMQHeaders.CE_PREFIX + propName; return propName; } diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle index e4e19031a7..fdca886ccb 100644 --- a/eventmesh-runtime/build.gradle +++ b/eventmesh-runtime/build.gradle @@ -34,7 +34,6 @@ dependencies { implementation project(":eventmesh-security-plugin:eventmesh-security-api") implementation project(":eventmesh-security-plugin:eventmesh-security-acl") implementation project(":eventmesh-registry-plugin:eventmesh-registry-api") - implementation project(":eventmesh-registry-plugin:eventmesh-registry-namesrv") implementation project(":eventmesh-admin:eventmesh-admin-rocketmq") implementation project(":eventmesh-protocol-plugin:eventmesh-protocol-api") @@ -51,7 +50,6 @@ dependencies { testImplementation project(":eventmesh-security-plugin:eventmesh-security-api") testImplementation project(":eventmesh-security-plugin:eventmesh-security-acl") testImplementation project(":eventmesh-registry-plugin:eventmesh-registry-api") - testImplementation project(":eventmesh-registry-plugin:eventmesh-registry-namesrv") testImplementation project(":eventmesh-admin:eventmesh-admin-rocketmq") testImplementation project(":eventmesh-protocol-plugin:eventmesh-protocol-api") From 49be49d8f35c4dd600056f0e86eebe5e59930e81 Mon Sep 17 00:00:00 2001 From: xwm1992 Date: Mon, 13 Dec 2021 21:00:34 +0800 Subject: [PATCH 3/4] fix conflicts --- .../eventmesh/connector/rocketmq/producer/ProducerImpl.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java index 66848d547d..89c6b98b4f 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/producer/ProducerImpl.java @@ -39,9 +39,6 @@ import java.util.Properties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import io.cloudevents.CloudEvent; import lombok.extern.slf4j.Slf4j; From f540bc62b80b4264cc500d95529ef3f64765c335 Mon Sep 17 00:00:00 2001 From: xwm1992 Date: Tue, 14 Dec 2021 17:23:01 +0800 Subject: [PATCH 4/4] [ISSUE #405] remove some unused code --- .../org/apache/eventmesh/api/RRCallback.java | 28 ------------ .../api/consumer/MeshMQPushConsumer.java | 40 ----------------- .../api/producer/MeshMQProducer.java | 45 ------------------- .../eventmesh/api/producer/Producer.java | 2 - 4 files changed, 115 deletions(-) delete mode 100644 eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/RRCallback.java delete mode 100644 eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java delete mode 100644 eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/RRCallback.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/RRCallback.java deleted file mode 100644 index eba4acdbaa..0000000000 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/RRCallback.java +++ /dev/null @@ -1,28 +0,0 @@ -///* -// * Licensed to the Apache Software Foundation (ASF) under one or more -// * contributor license agreements. See the NOTICE file distributed with -// * this work for additional information regarding copyright ownership. -// * The ASF licenses this file to You under the Apache License, Version 2.0 -// * (the "License"); you may not use this file except in compliance with -// * the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -//package org.apache.eventmesh.api; -// -//import io.openmessaging.api.Message; -// -//public interface RRCallback { -// -// public void onSuccess(Message msg); -// -// public void onException(Throwable e); -// -//} diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java deleted file mode 100644 index 8747cd9945..0000000000 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java +++ /dev/null @@ -1,40 +0,0 @@ -///* -// * Licensed to the Apache Software Foundation (ASF) under one or more -// * contributor license agreements. See the NOTICE file distributed with -// * this work for additional information regarding copyright ownership. -// * The ASF licenses this file to You under the Apache License, Version 2.0 -// * (the "License"); you may not use this file except in compliance with -// * the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -//package org.apache.eventmesh.api.consumer; -// -//import java.util.List; -//import java.util.Properties; -// -//import org.apache.eventmesh.api.AbstractContext; -//import org.apache.eventmesh.spi.EventMeshExtensionType; -//import org.apache.eventmesh.spi.EventMeshSPI; -// -//@EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.CONNECTOR) -//public interface MeshMQPushConsumer extends Consumer { -// -// void init(Properties keyValue) throws Exception; -// -// void updateOffset(List msgs, AbstractContext context); -// -//// void registerMessageListener(MessageListenerConcurrently messageListenerConcurrently); -// -// void subscribe(String topic, final AsyncMessageListener listener) throws Exception; -// -// @Override -// void unsubscribe(String topic); -//} diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java deleted file mode 100644 index e44bd50a8e..0000000000 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java +++ /dev/null @@ -1,45 +0,0 @@ -///* -// * Licensed to the Apache Software Foundation (ASF) under one or more -// * contributor license agreements. See the NOTICE file distributed with -// * this work for additional information regarding copyright ownership. -// * The ASF licenses this file to You under the Apache License, Version 2.0 -// * (the "License"); you may not use this file except in compliance with -// * the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -//package org.apache.eventmesh.api.producer; -// -//import java.util.Properties; -// -//import io.openmessaging.api.Message; -//import io.openmessaging.api.Producer; -//import io.openmessaging.api.SendCallback; -// -//import org.apache.eventmesh.api.RRCallback; -//import org.apache.eventmesh.spi.EventMeshExtensionType; -//import org.apache.eventmesh.spi.EventMeshSPI; -// -//@EventMeshSPI(isSingleton = false, eventMeshExtensionType = EventMeshExtensionType.CONNECTOR) -//public interface MeshMQProducer extends Producer { -// -// void init(Properties properties) throws Exception; -// -// void send(Message message, SendCallback sendCallback) throws Exception; -// -// void request(Message message, RRCallback rrCallback, long timeout) throws Exception; -// -// boolean reply(final Message message, final SendCallback sendCallback) throws Exception; -// -// void checkTopicExist(String topic) throws Exception; -// -// void setExtFields(); -// -//} diff --git a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/Producer.java b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/Producer.java index 5f14582eac..c526215435 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/Producer.java +++ b/eventmesh-connector-plugin/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/Producer.java @@ -44,8 +44,6 @@ public interface Producer extends LifeCycle { void sendAsync(final CloudEvent cloudEvent, final SendCallback sendCallback); -// void request(CloudEvent cloudEvent, RRCallback rrCallback, long timeout) throws Exception; - void request(CloudEvent cloudEvent, RequestReplyCallback rrCallback, long timeout) throws Exception; boolean reply(final CloudEvent cloudEvent, final SendCallback sendCallback) throws Exception;