Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #391] Optimize interface design in eventmesh-connector-api #392

Merged
merged 13 commits into from
Jun 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.api;

import io.openmessaging.api.AsyncConsumeContext;

public abstract class MeshAsyncConsumeContext extends AsyncConsumeContext {
private AbstractContext context;
public enum EventMeshAction {
CommitMessage,

public AbstractContext getContext() {
return context;
}
ReconsumeLater,

public void setContext(AbstractContext context) {
this.context = context;
}
ManualAck
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eventmesh.api;

import io.openmessaging.api.Action;
import io.openmessaging.api.AsyncConsumeContext;

public abstract class EventMeshAsyncConsumeContext extends AsyncConsumeContext {

private AbstractContext abstractContext;

public AbstractContext getAbstractContext() {
return abstractContext;
}

public void setAbstractContext(AbstractContext abstractContext) {
this.abstractContext = abstractContext;
}

public abstract void commit(EventMeshAction action);

@Override
public void commit(Action action) {
throw new UnsupportedOperationException("not support yet");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ public interface MeshMQPushConsumer extends Consumer {

void init(Properties keyValue) throws Exception;

@Override
void start();

// void updateOffset(List<MessageExt> msgs, ConsumeConcurrentlyContext context);

void updateOffset(List<Message> msgs, AbstractContext context);

// void registerMessageListener(MessageListenerConcurrently messageListenerConcurrently);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ public interface MeshMQProducer extends Producer {

void init(Properties properties) throws Exception;

@Override
void start();

void send(Message message, SendCallback sendCallback) throws Exception;

void request(Message message, SendCallback sendCallback, RRCallback rrCallback, long timeout) throws Exception;
Expand All @@ -40,12 +37,8 @@ public interface MeshMQProducer extends Producer {

boolean reply(final Message message, final SendCallback sendCallback) throws Exception;

MeshMQProducer getMeshMQProducer();

String buildMQClientId();
void checkTopicExist(String topic) throws Exception;

void setExtFields();

void getDefaultTopicRouteInfoFromNameServer(String topic, long timeout) throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

import io.openmessaging.api.Action;
import io.openmessaging.api.AsyncConsumeContext;
import io.openmessaging.api.AsyncGenericMessageListener;
import io.openmessaging.api.AsyncMessageListener;
import io.openmessaging.api.Consumer;
Expand All @@ -32,9 +29,8 @@
import io.openmessaging.api.MessageListener;
import io.openmessaging.api.MessageSelector;
import io.openmessaging.api.exception.OMSRuntimeException;

import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.api.MeshAsyncConsumeContext;
import org.apache.eventmesh.api.EventMeshAction;
import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.connector.rocketmq.common.EventMeshConstants;
import org.apache.eventmesh.connector.rocketmq.config.ClientConfig;
Expand Down Expand Up @@ -116,13 +112,25 @@ public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg, EventMes

final Properties contextProperties = new Properties();
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
MeshAsyncConsumeContext omsContext = new MeshAsyncConsumeContext() {
EventMeshAsyncConsumeContext omsContext = new EventMeshAsyncConsumeContext() {
@Override
public void commit(Action action) {
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
public void commit(EventMeshAction action) {
switch (action){
case CommitMessage:
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
break;
case ReconsumeLater:
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
break;
case ManualAck:
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
break;
default:
break;
}
}
};
omsContext.setContext(context);
omsContext.setAbstractContext(context);
listener.consume(omsMsg, omsContext);

return EventMeshConsumeConcurrentlyStatus.valueOf(contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
Expand Down Expand Up @@ -156,14 +164,25 @@ public EventMeshConsumeConcurrentlyStatus handleMessage(MessageExt msg, EventMes

contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());

MeshAsyncConsumeContext omsContext = new MeshAsyncConsumeContext() {
EventMeshAsyncConsumeContext omsContext = new EventMeshAsyncConsumeContext() {
@Override
public void commit(Action action) {
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
public void commit(EventMeshAction action) {
switch (action) {
case CommitMessage:
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
break;
case ReconsumeLater:
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.RECONSUME_LATER.name());
break;
case ManualAck:
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
break;
default:
break;
}
}
};
omsContext.setContext(context);
omsContext.setAbstractContext(context);
listener.consume(omsMsg, omsContext);

return EventMeshConsumeConcurrentlyStatus.valueOf(contextProperties.getProperty(NonStandardKeys.MESSAGE_CONSUME_STATUS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ public void start() {
producer.start();
}


@Override
public synchronized void shutdown() {
producer.shutdown();
Expand Down Expand Up @@ -115,27 +114,15 @@ public boolean reply(final Message message, final SendCallback sendCallback) thr
}

@Override
public MeshMQProducer getMeshMQProducer() {
return this;
}

@Override
public String buildMQClientId() {
return producer.getRocketmqProducer().buildMQClientId();
public void checkTopicExist(String topic) throws Exception {
this.producer.getRocketmqProducer().getDefaultMQProducerImpl().getmQClientFactory().getMQClientAPIImpl().getDefaultTopicRouteInfoFromNameServer(topic, EventMeshConstants.DEFAULT_TIMEOUT_IN_MILLISECONDS);
}

@Override
public void setExtFields() {
producer.setExtFields();
}

@Override
public void getDefaultTopicRouteInfoFromNameServer(String topic, long timeout) throws Exception {
producer.getRocketmqProducer().getDefaultMQProducerImpl()
.getmQClientFactory().getMQClientAPIImpl().getDefaultTopicRouteInfoFromNameServer(topic,
timeout);
}

@Override
public SendResult send(Message message) {
return producer.send(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import io.openmessaging.api.OMS;
import io.openmessaging.api.OMSBuiltinKeys;

import org.apache.eventmesh.api.EventMeshAction;
import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
import org.apache.eventmesh.connector.rocketmq.consumer.PushConsumerImpl;
import org.apache.eventmesh.connector.rocketmq.domain.NonStandardKeys;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
Expand Down Expand Up @@ -95,7 +97,7 @@ public void testConsumeMessage() {
public void consume(Message message, AsyncConsumeContext context) {
assertThat(message.getSystemProperties("MESSAGE_ID")).isEqualTo("NewMsgId");
assertThat(message.getBody()).isEqualTo(testBody);
context.commit(Action.CommitMessage);
((EventMeshAsyncConsumeContext)context).commit(EventMeshAction.CommitMessage);
}
});
((MessageListenerConcurrently) rocketmqPushConsumer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,6 @@ public void unsubscribe(String topic) throws Exception {
meshMQPushConsumer.unsubscribe(topic);
}

// public boolean isPause() {
// return meshMQPushConsumer.isPause();
// }
//
// public void pause() {
// meshMQPushConsumer.pause();
// }

public synchronized void init(Properties keyValue) throws Exception {
meshMQPushConsumer = getMeshMQPushConsumer();
if (meshMQPushConsumer == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import io.openmessaging.api.Action;
import io.openmessaging.api.AsyncConsumeContext;
import io.openmessaging.api.AsyncMessageListener;
import io.openmessaging.api.Message;
Expand All @@ -31,7 +30,8 @@

import org.apache.commons.collections4.MapUtils;
import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.api.MeshAsyncConsumeContext;
import org.apache.eventmesh.api.EventMeshAction;
import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
Expand Down Expand Up @@ -125,25 +125,26 @@ public void consume(Message message, AsyncConsumeContext context) {
}

ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(consumerGroupConf.getConsumerGroupTopicConf(), topic, null);
EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext)context;

if (currentTopicConfig == null) {
logger.error("no topicConfig found, consumerGroup:{} topic:{}", consumerGroupConf.getConsumerGroup(), topic);
try {
sendMessageBack(message, uniqueId, bizSeqNo);
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
// context.ack();
context.commit(Action.CommitMessage);
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
return;
} catch (Exception ex) {
}
}
HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(), consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
topic, message, subscriptionItem, ((MeshAsyncConsumeContext)context).getContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
topic, message, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);

if (httpMessageHandler.handle(handleMsgContext)) {
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
// context.ack();
context.commit(Action.CommitMessage);
eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
} else {
try {
sendMessageBack(message, uniqueId, bizSeqNo);
Expand All @@ -152,7 +153,7 @@ public void consume(Message message, AsyncConsumeContext context) {
}
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
// context.ack();
context.commit(Action.CommitMessage);
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
}
}
};
Expand All @@ -174,25 +175,26 @@ public void consume(Message message, AsyncConsumeContext context) {
}

ConsumerGroupTopicConf currentTopicConfig = MapUtils.getObject(consumerGroupConf.getConsumerGroupTopicConf(), topic, null);
EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext)context;

if (currentTopicConfig == null) {
logger.error("no topicConfig found, consumerGroup:{} topic:{}", consumerGroupConf.getConsumerGroup(), topic);
try {
sendMessageBack(message, uniqueId, bizSeqNo);
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
// context.ack();
context.commit(Action.CommitMessage);
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
return;
} catch (Exception ex) {
}
}
HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(), consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
topic, message, subscriptionItem, ((MeshAsyncConsumeContext)context).getContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
topic, message, subscriptionItem, eventMeshAsyncConsumeContext.getAbstractContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);

if (httpMessageHandler.handle(handleMsgContext)) {
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
// context.ack();
context.commit(Action.CommitMessage);
eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
} else {
try {
sendMessageBack(message, uniqueId, bizSeqNo);
Expand All @@ -201,7 +203,7 @@ public void consume(Message message, AsyncConsumeContext context) {
}
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
// context.ack();
context.commit(Action.CommitMessage);
eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
omsMsg.putUserProperties("msgType", "persistent");
omsMsg.putUserProperties(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
omsMsg.putUserProperties(Constants.RMB_UNIQ_ID, sendMessageRequestBody.getUniqueId());
omsMsg.putUserProperties("REPLY_TO", eventMeshProducer.getMqProducerWrapper().getMeshMQProducer().buildMQClientId());
// omsMsg.putUserProperties("REPLY_TO", eventMeshProducer.getMqProducerWrapper().getMeshMQProducer().buildMQClientId());

if (messageLogger.isDebugEnabled()) {
messageLogger.debug("msg2MQMsg suc, bizSeqNo={}, topic={}", sendMessageRequestBody.getBizSeqNo(),
Expand Down
Loading