Skip to content

Commit

Permalink
Merge pull request #178 from xwm1992/1.2.0
Browse files Browse the repository at this point in the history
[ISSUE #176] Support Spi for extended implementation to accommodate a variety of MQ
  • Loading branch information
lrhkobe authored Dec 14, 2020
2 parents e559d80 + 00aa8e0 commit f4f3210
Show file tree
Hide file tree
Showing 40 changed files with 797 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package connector.rocketmq;
package com.webank.connector.rocketmq;

import connector.rocketmq.consumer.PullConsumerImpl;
import connector.rocketmq.consumer.PushConsumerImpl;
import connector.rocketmq.producer.ProducerImpl;
import connector.rocketmq.utils.OMSUtil;
import com.webank.connector.rocketmq.producer.ProducerImpl;
import com.webank.connector.rocketmq.consumer.PullConsumerImpl;
import com.webank.connector.rocketmq.consumer.PushConsumerImpl;
import com.webank.connector.rocketmq.utils.OMSUtil;
import io.openmessaging.KeyValue;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.ResourceManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package connector.rocketmq.config;
package com.webank.connector.rocketmq.config;

import connector.rocketmq.domain.NonStandardKeys;
import com.webank.connector.rocketmq.domain.NonStandardKeys;
import io.openmessaging.OMSBuiltinKeys;

public class ClientConfig implements OMSBuiltinKeys, NonStandardKeys {
Expand All @@ -39,6 +39,7 @@ public class ClientConfig implements OMSBuiltinKeys, NonStandardKeys {
private String rmqMessageDestination;
private int rmqPullMessageBatchNums = 32;
private int rmqPullMessageCacheCapacity = 1000;
private String messageModel;

public String getDriverImpl() {
return driverImpl;
Expand Down Expand Up @@ -191,4 +192,12 @@ public String getRoutingExpression() {
public void setRoutingExpression(String routingExpression) {
this.routingExpression = routingExpression;
}

public String getMessageModel() {
return messageModel;
}

public void setMessageModel(String messageModel) {
this.messageModel = messageModel;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package connector.rocketmq.consumer;
package com.webank.connector.rocketmq.consumer;

import connector.rocketmq.config.ClientConfig;
import connector.rocketmq.domain.ConsumeRequest;
import com.webank.connector.rocketmq.domain.ConsumeRequest;
import com.webank.connector.rocketmq.config.ClientConfig;
import io.openmessaging.KeyValue;
import io.openmessaging.Message;
import io.openmessaging.ServiceLifecycle;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package connector.rocketmq.consumer;
package com.webank.connector.rocketmq.consumer;

import connector.rocketmq.config.ClientConfig;
import connector.rocketmq.domain.ConsumeRequest;
import connector.rocketmq.utils.BeanUtils;
import connector.rocketmq.utils.OMSUtil;
import com.webank.connector.rocketmq.domain.ConsumeRequest;
import com.webank.connector.rocketmq.utils.BeanUtils;
import com.webank.connector.rocketmq.utils.OMSUtil;
import com.webank.connector.rocketmq.config.ClientConfig;
import io.openmessaging.KeyValue;
import io.openmessaging.Message;
import io.openmessaging.OMSBuiltinKeys;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package connector.rocketmq.consumer;
package com.webank.connector.rocketmq.consumer;

import connector.rocketmq.config.ClientConfig;
import connector.rocketmq.domain.NonStandardKeys;
import connector.rocketmq.utils.BeanUtils;
import connector.rocketmq.utils.OMSUtil;
import com.webank.connector.rocketmq.domain.NonStandardKeys;
import com.webank.connector.rocketmq.utils.BeanUtils;
import com.webank.connector.rocketmq.utils.OMSUtil;
import com.webank.connector.rocketmq.config.ClientConfig;
import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
import io.openmessaging.OMS;
Expand All @@ -34,6 +34,7 @@
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.protocol.LanguageCode;

import java.util.List;
Expand Down Expand Up @@ -71,13 +72,14 @@ public PushConsumerImpl(final KeyValue properties) {
this.rocketmqPushConsumer.setConsumeTimeout(clientConfig.getRmqMessageConsumeTimeout());
this.rocketmqPushConsumer.setConsumeThreadMax(clientConfig.getRmqMaxConsumeThreadNums());
this.rocketmqPushConsumer.setConsumeThreadMin(clientConfig.getRmqMinConsumeThreadNums());
this.rocketmqPushConsumer.setMessageModel(MessageModel.valueOf(clientConfig.getMessageModel()));

String consumerId = OMSUtil.buildInstanceName();
this.rocketmqPushConsumer.setInstanceName(consumerId);
properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId);
this.rocketmqPushConsumer.setLanguage(LanguageCode.OMS);

this.rocketmqPushConsumer.registerMessageListener(new MessageListenerImpl());
// this.rocketmqPushConsumer.registerMessageListener(new MessageListenerImpl());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,20 @@
* limitations under the License.
*/

package connector.rocketmq.consumer;
package com.webank.connector.rocketmq.consumer;

import com.webank.api.consumer.MeshMQPushConsumer;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
import com.webank.eventmesh.common.config.CommonConfiguration;
import io.openmessaging.*;
import io.openmessaging.consumer.MessageListener;
import io.openmessaging.consumer.PushConsumer;
import io.openmessaging.interceptor.ConsumerInterceptor;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageService;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,18 +40,25 @@ public class RocketMQConsumerImpl implements MeshMQPushConsumer {

public Logger messageLogger = LoggerFactory.getLogger("message");

public final String DEFAULT_ACCESS_DRIVER = "connector.rocketmq.MessagingAccessPointImpl";
public final String DEFAULT_ACCESS_DRIVER = "com.webank.connector.rocketmq.MessagingAccessPointImpl";

private PushConsumerImpl pushConsumer;

@Override
public synchronized void init(boolean isBroadcast, CommonConfiguration commonConfiguration,
String consumerGroup) throws Exception {
String omsNamesrv = "oms:rocketmq://" + commonConfiguration.namesrvAddr + "/namespace";
KeyValue properties = OMS.newKeyValue().put(OMSBuiltinKeys.DRIVER_IMPL, DEFAULT_ACCESS_DRIVER);
properties.put("ACCESS_POINTS", commonConfiguration.namesrvAddr)

properties.put("ACCESS_POINTS", omsNamesrv)
.put("REGION", "namespace")
.put(OMSBuiltinKeys.CONSUMER_ID, consumerGroup);
MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint(commonConfiguration.namesrvAddr, properties);
if (isBroadcast){
properties.put("MESSAGE_MODEL", MessageModel.BROADCASTING.name());
}else {
properties.put("MESSAGE_MODEL", MessageModel.CLUSTERING.name());
}
MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint(omsNamesrv, properties);
pushConsumer = (PushConsumerImpl)messagingAccessPoint.createPushConsumer();
}

Expand All @@ -59,7 +69,8 @@ public void setInstanceName(String instanceName) {

@Override
public void registerMessageListener(MessageListenerConcurrently listener) {
pushConsumer.getRocketmqPushConsumer().setMessageListener(listener);
// pushConsumer.getRocketmqPushConsumer().setMessageListener(listener);
pushConsumer.getRocketmqPushConsumer().registerMessageListener(listener);
}

@Override
Expand All @@ -69,12 +80,18 @@ public synchronized void start() throws Exception {

@Override
public void subscribe(String topic) throws Exception {
if (pushConsumer.getRocketmqPushConsumer().getMessageListener() == null){
logger.error("no messageListener has been registered");
throw new Exception("no messageListener has been registered");
}
pushConsumer.attachQueue(topic, new MessageListener() {
@Override
public void onReceived(Message message, Context context) {
System.out.printf("Received one message: %s%n", message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID));
context.ack();
}
});

}

@Override
Expand Down Expand Up @@ -104,8 +121,8 @@ public synchronized void shutdown() {

@Override
public void updateOffset(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageListenerConcurrently pushConsumerMessageListener = (MessageListenerConcurrently) pushConsumer.getRocketmqPushConsumer().getMessageListener();
pushConsumerMessageListener.consumeMessage(msgs, context);
ConsumeMessageService consumeMessageService = pushConsumer.getRocketmqPushConsumer().getDefaultMQPushConsumerImpl().getConsumeMessageService();
((ConsumeMessageConcurrentlyService) consumeMessageService).updateOffset(msgs, context);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package connector.rocketmq.domain;
package com.webank.connector.rocketmq.domain;

import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package connector.rocketmq.domain;
package com.webank.connector.rocketmq.domain;

import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.common.message.MessageExt;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package connector.rocketmq.domain;
package com.webank.connector.rocketmq.domain;

public interface NonStandardKeys {
String CONSUMER_GROUP = "rmq.consumer.group";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package connector.rocketmq.domain;
package com.webank.connector.rocketmq.domain;

public interface RocketMQConstants {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package connector.rocketmq.domain;
package com.webank.connector.rocketmq.domain;

import io.openmessaging.KeyValue;
import io.openmessaging.producer.SendResult;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.webank.connector.rocketmq.patch;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.common.message.MessageQueue;

public class ProxyConsumeConcurrentlyContext extends ConsumeConcurrentlyContext {
private final ProcessQueue processQueue;
private boolean manualAck = true;

public ProxyConsumeConcurrentlyContext(MessageQueue messageQueue, ProcessQueue processQueue) {
super(messageQueue);
this.processQueue = processQueue;
}

public ProcessQueue getProcessQueue() {
return processQueue;
}

public boolean isManualAck() {
return manualAck;
}

public void setManualAck(boolean manualAck) {
this.manualAck = manualAck;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.webank.connector.rocketmq.patch;

public enum ProxyConsumeConcurrentlyStatus {
/**
* Success consumption
*/
CONSUME_SUCCESS,
/**
* Failure consumption,later try to consume
*/
RECONSUME_LATER,
/**
* Success consumption but ack later manually
*/
CONSUME_FINISH;
}
Loading

0 comments on commit f4f3210

Please sign in to comment.