Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #366 ] remove custom-format topic concept #388

Merged
merged 8 commits into from
Jun 17, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
remove custom-format topic concept
  • Loading branch information
iNanos committed Jun 16, 2021
commit 78c6137a0d156ea62ae261cfd38b720b360cf8f0
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class IPUtil {
public static String getLocalAddress() {
// if the progress works under docker environment
// return the host ip about this docker located from environment value
String dockerHostIp = System.getenv("webank_docker_host_ip");
String dockerHostIp = System.getenv("docker_host_ip");
if (dockerHostIp != null && !"".equals(dockerHostIp))
return dockerHostIp;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.protocol;

public enum SubcriptionType {
/**
* SYNC
*/
SYNC("SYNC"),
/**
* ASYNC
*/
ASYNC("ASYNC");

private String type;

SubcriptionType(String type) {
this.type = type;
}

public String getType() {
return type;
}

public void setType(String type) {
this.type = type;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,23 @@ public class SubscriptionItem {

private SubscriptionMode mode;

public SubscriptionItem(String topic, SubscriptionMode mode) {
private SubcriptionType type;

public SubscriptionItem() {
}

public SubscriptionItem(String topic, SubscriptionMode mode, SubcriptionType type) {
this.topic = topic;
this.mode = mode;
this.type = type;
}

public SubcriptionType getType() {
return type;
}

public void setType(SubcriptionType type) {
this.type = type;
}

public String getTopic() {
Expand All @@ -49,6 +63,7 @@ public String toString() {
return "SubscriptionItem{" +
"topic=" + topic +
", mode=" + mode +
", type=" + type +
'}';
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ public class EventMeshConstants {

public static final String BROADCAST_PREFIX = "broadcast-";

public static final String IS_SYNC_MESSAGE = "sync";

public final static String CONSUMER_GROUP_NAME_PREFIX = "ConsumerGroup-";

public final static String PRODUCER_GROUP_NAME_PREFIX = "ProducerGroup-";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -38,9 +38,9 @@ public class ConsumerGroupTopicConf {
private String topic;

/**
* @see org.apache.eventmesh.common.protocol.SubscriptionMode
* @see org.apache.eventmesh.common.protocol.SubscriptionItem
*/
private SubscriptionMode subscriptionMode;
private SubscriptionItem subscriptionItem;

/**
* PUSH URL
Expand All @@ -59,21 +59,21 @@ public boolean equals(Object o) {
ConsumerGroupTopicConf that = (ConsumerGroupTopicConf) o;
return consumerGroup.equals(that.consumerGroup) &&
Objects.equals(topic, that.topic) &&
Objects.equals(subscriptionMode, that.subscriptionMode) &&
Objects.equals(subscriptionItem, that.subscriptionItem) &&
Objects.equals(idcUrls, that.idcUrls);
}

@Override
public int hashCode() {
return Objects.hash(consumerGroup, topic, subscriptionMode, idcUrls);
return Objects.hash(consumerGroup, topic, subscriptionItem, idcUrls);
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("consumeTopicConfig={consumerGroup=").append(consumerGroup)
.append(",topic=").append(topic)
.append(",subscriptionMode=").append(subscriptionMode)
.append(",subscriptionMode=").append(subscriptionItem)
.append(",idcUrls=").append(idcUrls).append("}");
return sb.toString();
}
Expand All @@ -94,12 +94,12 @@ public void setTopic(String topic) {
this.topic = topic;
}

public SubscriptionMode getSubscriptionMode() {
return subscriptionMode;
public SubscriptionItem getSubscriptionItem() {
return subscriptionItem;
}

public void setSubscriptionMode(SubscriptionMode subscriptionMode) {
this.subscriptionMode = subscriptionMode;
public void setSubscriptionItem(SubscriptionItem subscriptionItem) {
this.subscriptionItem = subscriptionItem;
}

public Map<String, List<String>> getIdcUrls() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public synchronized void start() throws Exception {

private synchronized void setupEventMeshConsumer(ConsumerGroupConf consumerGroupConfig) throws Exception {
for (Map.Entry<String, ConsumerGroupTopicConf> conf : consumerGroupConfig.getConsumerGroupTopicConf().entrySet()) {
eventMeshConsumer.subscribe(conf.getKey(), conf.getValue().getSubscriptionMode());
eventMeshConsumer.subscribe(conf.getKey(), conf.getValue().getSubscriptionItem());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void run() {
ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf();
latestTopicConf.setConsumerGroup(consumerGroup);
latestTopicConf.setTopic(topic);
latestTopicConf.setSubscriptionMode(map.get(topicKey).getSubscriptionMode());
latestTopicConf.setSubscriptionItem(map.get(topicKey).getSubscriptionItem());
latestTopicConf.setUrls(clientUrls);

latestTopicConf.setIdcUrls(idcUrls);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.api.MeshAsyncConsumeContext;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
Expand Down Expand Up @@ -109,9 +110,9 @@ public synchronized void start() throws Exception {
started4Broadcast.compareAndSet(false, true);
}

public void subscribe(String topic, SubscriptionMode subscriptionMode) throws Exception {
public void subscribe(String topic, SubscriptionItem subscriptionItem) throws Exception {
AsyncMessageListener listener = null;
if (!SubscriptionMode.BROADCASTING.equals(subscriptionMode)) {
if (!SubscriptionMode.BROADCASTING.equals(subscriptionItem.getMode())) {
listener = new AsyncMessageListener() {
@Override
public void consume(Message message, AsyncConsumeContext context) {
Expand Down Expand Up @@ -140,8 +141,7 @@ public void consume(Message message, AsyncConsumeContext context) {
}
}
HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(), consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
topic, message, ((MeshAsyncConsumeContext)context).getContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
topic, message, SubscriptionMode.CLUSTERING, persistentMqConsumer.getContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
topic, message, subscriptionItem, ((MeshAsyncConsumeContext)context).getContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);

if (httpMessageHandler.handle(handleMsgContext)) {
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
Expand Down Expand Up @@ -190,8 +190,7 @@ public void consume(Message message, AsyncConsumeContext context) {
}
}
HandleMsgContext handleMsgContext = new HandleMsgContext(EventMeshUtil.buildPushMsgSeqNo(), consumerGroupConf.getConsumerGroup(), EventMeshConsumer.this,
topic, message, ((MeshAsyncConsumeContext)context).getContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
topic, message, SubscriptionMode.BROADCASTING, broadcastMqConsumer.getContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);
topic, message, subscriptionItem, ((MeshAsyncConsumeContext)context).getContext(), consumerGroupConf, eventMeshHTTPServer, bizSeqNo, uniqueId, currentTopicConfig);

if (httpMessageHandler.handle(handleMsgContext)) {
// context.attributes().put(NonStandardKeys.MESSAGE_CONSUME_STATUS, EventMeshConsumeConcurrentlyStatus.CONSUME_FINISH.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
Expand All @@ -51,7 +52,7 @@ public class HandleMsgContext {

private String topic;

private SubscriptionMode subscriptionMode;
private SubscriptionItem subscriptionItem;

private Message msg;

Expand All @@ -70,15 +71,15 @@ public class HandleMsgContext {
private Map<String, String> props;

public HandleMsgContext(String msgRandomNo, String consumerGroup, EventMeshConsumer eventMeshConsumer,
String topic, Message msg, SubscriptionMode subscriptionMode,
String topic, Message msg, SubscriptionItem subscriptionItem,
AbstractContext context, ConsumerGroupConf consumerGroupConfig,
EventMeshHTTPServer eventMeshHTTPServer, String bizSeqNo, String uniqueId, ConsumerGroupTopicConf consumeTopicConfig) {
this.msgRandomNo = msgRandomNo;
this.consumerGroup = consumerGroup;
this.eventMeshConsumer = eventMeshConsumer;
this.topic = topic;
this.msg = msg;
this.subscriptionMode = subscriptionMode;
this.subscriptionItem = subscriptionItem;
this.context = context;
this.consumerGroupConfig = consumerGroupConfig;
this.eventMeshHTTPServer = eventMeshHTTPServer;
Expand Down Expand Up @@ -156,12 +157,12 @@ public void setMsg(Message msg) {
this.msg = msg;
}

public SubscriptionMode getSubscriptionMode() {
return subscriptionMode;
public SubscriptionItem getSubscriptionItem() {
return subscriptionItem;
}

public void setSubscriptionMode(SubscriptionMode subscriptionMode) {
this.subscriptionMode = subscriptionMode;
public void setSubscriptionItem(SubscriptionItem subscriptionItem) {
this.subscriptionItem = subscriptionItem;
}

public long getCreateTime() {
Expand Down Expand Up @@ -200,7 +201,7 @@ public void finish() {
// msg.getProperty(DeFiBusConstant.PROPERTY_MESSAGE_BROKER),
// msg.getQueueId(), msg.getQueueOffset());
}
eventMeshConsumer.updateOffset(topic, subscriptionMode, Arrays.asList(msg), context);
eventMeshConsumer.updateOffset(topic, subscriptionItem.getMode(), Arrays.asList(msg), context);
}
}

Expand All @@ -226,7 +227,7 @@ public String toString() {
sb.append("handleMsgContext={")
.append("consumerGroup=").append(consumerGroup)
.append(",topic=").append(topic)
.append(",subscriptionMode=").append(subscriptionMode)
.append(",subscriptionItem=").append(subscriptionItem)
.append(",consumeTopicConfig=").append(consumeTopicConfig)
.append(",bizSeqNo=").append(bizSeqNo)
.append(",uniqueId=").append(uniqueId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
ConsumerGroupTopicConf consumeTopicConfig = new ConsumerGroupTopicConf();
consumeTopicConfig.setConsumerGroup(consumerGroup);
consumeTopicConfig.setTopic(subTopic.getTopic());
consumeTopicConfig.setSubscriptionMode(subTopic.getMode());
consumeTopicConfig.setSubscriptionItem(subTopic);
consumeTopicConfig.setUrls(new HashSet<>(Arrays.asList(url)));

consumeTopicConfig.setIdcUrls(idcUrls);
Expand All @@ -150,7 +150,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
ConsumerGroupTopicConf currentTopicConf = map.get(key);
latestTopicConf.setConsumerGroup(consumerGroup);
latestTopicConf.setTopic(subTopic.getTopic());
latestTopicConf.setSubscriptionMode(subTopic.getMode());
latestTopicConf.setSubscriptionItem(subTopic);
latestTopicConf.setUrls(new HashSet<>(Arrays.asList(url)));
latestTopicConf.getUrls().addAll(currentTopicConf.getUrls());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public void onResponse(HttpCommand httpCommand) {
ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf();
latestTopicConf.setConsumerGroup(consumerGroup);
latestTopicConf.setTopic(unSubTopic);
latestTopicConf.setSubscriptionMode(map.get(topicKey).getSubscriptionMode());
latestTopicConf.setSubscriptionItem(map.get(topicKey).getSubscriptionItem());
latestTopicConf.setUrls(clientUrls);

latestTopicConf.setIdcUrls(idcUrls);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.commons.text.RandomStringGenerator;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.IPUtil;
import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.http.body.message.PushMessageRequestBody;
import org.apache.eventmesh.common.protocol.http.common.ClientRetCode;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
Expand Down Expand Up @@ -88,7 +89,7 @@ public void tryHTTPRequest() {

String requestCode = "";

if (Boolean.valueOf(handleMsgContext.getMsg().getSystemProperties(EventMeshConstants.IS_SYNC_MESSAGE))) {
if (SubcriptionType.SYNC.equals(handleMsgContext.getSubscriptionItem().getType())) {
requestCode = String.valueOf(RequestCode.HTTP_PUSH_CLIENT_SYNC.getRequestCode());
} else {
requestCode = String.valueOf(RequestCode.HTTP_PUSH_CLIENT_ASYNC.getRequestCode());
Expand Down
Loading