Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #366 ] remove custom-format topic concept #388

Merged
merged 8 commits into from
Jun 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class IPUtil {
public static String getLocalAddress() {
// if the progress works under docker environment
// return the host ip about this docker located from environment value
String dockerHostIp = System.getenv("webank_docker_host_ip");
String dockerHostIp = System.getenv("docker_host_ip");
if (dockerHostIp != null && !"".equals(dockerHostIp))
return dockerHostIp;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,29 @@
* limitations under the License.
*/

package org.apache.eventmesh.runtime.core.consumergroup.event;
package org.apache.eventmesh.common.protocol;

public class ConsumerGroupInstanceChangeEvent {
public enum SubcriptionType {
/**
* SYNC
*/
SYNC("SYNC"),
/**
* ASYNC
*/
ASYNC("ASYNC");

private String type;

SubcriptionType(String type) {
this.type = type;
}

public String getType() {
return type;
}

public void setType(String type) {
this.type = type;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.protocol;

public class SubscriptionItem {

private String topic;

private SubscriptionMode mode;

private SubcriptionType type;

public SubscriptionItem() {
}

public SubscriptionItem(String topic, SubscriptionMode mode, SubcriptionType type) {
this.topic = topic;
this.mode = mode;
this.type = type;
}

public SubcriptionType getType() {
return type;
}

public void setType(SubcriptionType type) {
this.type = type;
}

public String getTopic() {
return topic;
}

public void setTopic(String topic) {
this.topic = topic;
}

public SubscriptionMode getMode() {
return mode;
}

public void setMode(SubscriptionMode mode) {
this.mode = mode;
}

@Override
public String toString() {
return "SubscriptionItem{" +
"topic=" + topic +
", mode=" + mode +
", type=" + type +
'}';
}
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.protocol;

public enum SubscriptionMode {

/**
* broadcast
*/
BROADCASTING("BROADCASTING"),
/**
* clustering
*/
CLUSTERING("CLUSTERING");

private String mode;

SubscriptionMode(String mode) {
this.mode = mode;
}

public String getMode() {
return mode;
}

public void setMode(String mode) {
this.mode = mode;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public class HeartbeatRequestBody extends Body {
public static final String CLIENTTYPE = "clientType";
public static final String HEARTBEATENTITIES = "heartbeatEntities";


private String clientType;

private List<HeartbeatEntity> heartbeatEntities;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.alibaba.fastjson.JSONArray;

import org.apache.commons.collections4.MapUtils;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.http.body.Body;

public class RegRequestBody extends Body {
Expand All @@ -39,13 +40,13 @@ public class RegRequestBody extends Body {

private String endPoint;

private List<String> topics;
private List<SubscriptionItem> topics;

public List<String> getTopics() {
public List<SubscriptionItem> getTopics() {
return topics;
}

public void setTopics(List<String> topics) {
public void setTopics(List<SubscriptionItem> topics) {
this.topics = topics;
}

Expand All @@ -69,7 +70,7 @@ public static RegRequestBody buildBody(Map<String, Object> bodyParam) {
RegRequestBody body = new RegRequestBody();
body.setClientType(MapUtils.getString(bodyParam, CLIENTTYPE));
body.setEndPoint(MapUtils.getString(bodyParam, ENDPOINT));
body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPICS), String.class));
body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPICS), SubscriptionItem.class));
return body;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.alibaba.fastjson.JSONArray;

import org.apache.commons.collections4.MapUtils;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.http.body.Body;

public class SubscribeRequestBody extends Body {
Expand All @@ -33,20 +34,18 @@ public class SubscribeRequestBody extends Body {

public static final String URL = "url";

private List<String> topics;
private List<SubscriptionItem> topics;

private String url;

private String topic;

public List<String> getTopics() {
public List<SubscriptionItem> getTopics() {
return topics;
}

public void setTopics(List<String> topics) {
public void setTopics(List<SubscriptionItem> topics) {
this.topics = topics;
}

private String url;

public String getUrl() {
return url;
}
Expand All @@ -58,7 +57,7 @@ public void setUrl(String url) {
public static SubscribeRequestBody buildBody(Map<String, Object> bodyParam) {
SubscribeRequestBody body = new SubscribeRequestBody();
body.setUrl(MapUtils.getString(bodyParam, URL));
body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPIC), String.class));
body.setTopics(JSONArray.parseArray(MapUtils.getString(bodyParam, TOPIC), SubscriptionItem.class));
return body;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,27 @@

package org.apache.eventmesh.common.protocol.tcp;

import org.apache.eventmesh.common.protocol.SubscriptionItem;

import java.util.LinkedList;
import java.util.List;

public class Subscription {

private List<String> topicList = new LinkedList<>();
private List<SubscriptionItem> topicList = new LinkedList<>();

public Subscription() {
}

public Subscription(List<String> topicList) {
public Subscription(List<SubscriptionItem> topicList) {
this.topicList = topicList;
}

public List<String> getTopicList() {
public List<SubscriptionItem> getTopicList() {
return topicList;
}

public void setTopicList(List<String> topicList) {
public void setTopicList(List<SubscriptionItem> topicList) {
this.topicList = topicList;
}

Expand All @@ -45,6 +47,4 @@ public String toString() {
"topicList=" + topicList +
'}';
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ public class EventMeshHTTPServer extends AbrstractHTTPServer {

private EventMeshHTTPConfiguration eventMeshHttpConfiguration;

public final ConcurrentHashMap<String, ConsumerGroupConf> localConsumerGroupMapping = new ConcurrentHashMap<>();
public final ConcurrentHashMap<String /**group*/, ConsumerGroupConf> localConsumerGroupMapping = new ConcurrentHashMap<>();

public final ConcurrentHashMap<String, List<Client>> localClientInfoMapping = new ConcurrentHashMap<>();
public final ConcurrentHashMap<String /**group@topic*/, List<Client>> localClientInfoMapping = new ConcurrentHashMap<>();

public EventMeshHTTPServer(EventMeshServer eventMeshServer,
EventMeshHTTPConfiguration eventMeshHttpConfiguration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public void init() throws Exception {

String eventstore = System.getProperty(EventMeshConstants.EVENT_STORE_PROPERTIES, System.getenv(EventMeshConstants.EVENT_STORE_ENV));
logger.info("eventstore : {}", eventstore);
// logger.info("load custom {} class for eventMesh", ConsumeMessageConcurrentlyService.class.getCanonicalName());

serviceState = ServiceState.INITED;
logger.info("server state:{}", serviceState);
Expand Down

This file was deleted.

Loading