From 8ead1ee8ba2c525e12de03afe1fbd822614e49d4 Mon Sep 17 00:00:00 2001 From: jinrongluo Date: Tue, 8 Feb 2022 21:28:49 -0500 Subject: [PATCH 1/3] [Issue #655] Adding send message constraints for message size and batch size --- .../demo/pub/cloudevents/AsyncPublish.java | 2 +- eventmesh-runtime/conf/eventmesh.properties | 3 +++ .../EventMeshHTTPConfiguration.java | 18 +++++++++++++ .../EventMeshTCPConfiguration.java | 10 +++++++ .../processor/BatchSendMessageProcessor.java | 26 +++++++++++++++++++ .../BatchSendMessageV2Processor.java | 14 ++++++++++ .../http/processor/ReplyMessageProcessor.java | 14 ++++++++++ .../processor/SendAsyncMessageProcessor.java | 14 ++++++++++ .../processor/SendSyncMessageProcessor.java | 14 ++++++++++ .../tcp/client/task/MessageTransferTask.java | 7 +++++ 10 files changed, 121 insertions(+), 1 deletion(-) diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java index df96422456..76218851d0 100644 --- a/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java +++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/tcp/demo/pub/cloudevents/AsyncPublish.java @@ -55,7 +55,7 @@ public static void main(String[] args) throws Exception { EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, CloudEvent.class); client.init(); - for (int i = 0; i < 5; i++) { + for (int i = 0; i < 2; i++) { CloudEvent event = EventMeshTestUtils.generateCloudEventV1Async(); logger.info("begin send async msg[{}]==================={}", i, event); client.publish(event, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties index 4a4b9ddd64..31d643aa79 100644 --- a/eventmesh-runtime/conf/eventmesh.properties +++ b/eventmesh-runtime/conf/eventmesh.properties @@ -39,6 +39,9 @@ eventMesh.server.tcp.msgReqnumPerSecond=15000 eventMesh.server.http.msgReqnumPerSecond=15000 eventMesh.server.session.upstreamBufferSize=20 +eventmesh.server.eventSize=1000 +eventmesh.server.eventBatchSize=10 + # thread number about global scheduler eventMesh.server.global.scheduler=5 eventMesh.server.tcp.taskHandleExecutorPoolSize=8 diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java index d8cb88246f..4ea1ce60d0 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java @@ -70,6 +70,10 @@ public class EventMeshHTTPConfiguration extends CommonConfiguration { public int eventMeshBatchMsgRequestNumPerSecond = 20000; + public int eventMeshEventSize = 1000; + + public int eventMeshEventBatchSize = 10; + public EventMeshHTTPConfiguration(ConfigurationWrapper configurationWrapper) { super(configurationWrapper); } @@ -248,6 +252,16 @@ public void init() { eventMeshHttpMsgReqNumPerSecond = Integer.parseInt(eventMeshHttpMsgReqNumPerSecondStr); } + + String eventSize = configurationWrapper.getProp(ConfKeys.KEY_EVENTMESH_SERVER_EVENTSIZE); + if (StringUtils.isNotEmpty(eventSize) && StringUtils.isNumeric(eventSize)) { + eventMeshEventSize = Integer.parseInt(eventSize); + } + + String eventBatchSize = configurationWrapper.getProp(ConfKeys.KEY_EVENTMESH_SERVER_EVENT_BATCHSIZE); + if (StringUtils.isNotEmpty(eventBatchSize) && StringUtils.isNumeric(eventBatchSize)) { + eventMeshEventBatchSize = Integer.parseInt(eventBatchSize); + } } } @@ -296,5 +310,9 @@ static class ConfKeys { public static String KEY_EVENTMESH_HTTPS_ENABLED = "eventMesh.server.useTls.enabled"; public static String KEY_EVENTMESH_SERVER_MSG_REQ_NUM_PER_SECOND = "eventMesh.server.http.msgReqnumPerSecond"; + + public static String KEY_EVENTMESH_SERVER_EVENTSIZE = "eventmesh.server.eventSize"; + + public static String KEY_EVENTMESH_SERVER_EVENT_BATCHSIZE = "eventmesh.server.eventBatchSize"; } } \ No newline at end of file diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java index 0ab59cb918..d4bd4fa76d 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java @@ -76,6 +76,10 @@ public class EventMeshTCPConfiguration extends CommonConfiguration { public int sleepIntervalInRebalanceRedirectMills = 200; + public int eventMeshEventSize = 1000; + + public int eventMeshEventBatchSize = 10; + private TrafficShapingConfig gtc = new TrafficShapingConfig(0, 10_000, 1_000, 2000); private TrafficShapingConfig ctc = new TrafficShapingConfig(0, 2_000, 1_000, 10_000); @@ -155,6 +159,10 @@ public void init() { sleepIntervalInRebalanceRedirectMills = configurationWrapper.getIntProp( ConfKeys.KEYS_EVENTMESH_SERVER_REBALANCE_REDIRECT_SLEEP_TIME, sleepIntervalInRebalanceRedirectMills); + eventMeshEventSize = configurationWrapper.getIntProp(ConfKeys.KEYS_EVENTMESH_SERVER_EVENTSIZE, eventMeshEventSize); + + eventMeshEventBatchSize = configurationWrapper.getIntProp( + ConfKeys.KEYS_EVENTMESH_SERVER_EVENT_BATCHSIZE, eventMeshEventBatchSize); } public TrafficShapingConfig getGtc() { @@ -191,6 +199,8 @@ static class ConfKeys { public static String KEYS_EVENTMESH_SERVER_PUSH_FAIL_ISOLATE_TIME = "eventMesh.server.tcp.pushFailIsolateTimeInMills"; public static String KEYS_EVENTMESH_SERVER_GRACEFUL_SHUTDOWN_SLEEP_TIME = "eventMesh.server.gracefulShutdown.sleepIntervalInMills"; public static String KEYS_EVENTMESH_SERVER_REBALANCE_REDIRECT_SLEEP_TIME = "eventMesh.server.rebalanceRedirect.sleepIntervalInM"; + public static String KEYS_EVENTMESH_SERVER_EVENTSIZE = "eventmesh.server.eventSize"; + public static String KEYS_EVENTMESH_SERVER_EVENT_BATCHSIZE = "eventmesh.server.eventBatchSize"; } public static class TrafficShapingConfig { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java index 2f2103b450..13c0605f7c 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java @@ -46,6 +46,7 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -111,6 +112,18 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext String producerGroup = ""; int eventSize = eventList.size(); + if (eventSize > eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventBatchSize) { + batchMessageLogger.error("Event batch size exceeds the limit: {}", + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventBatchSize); + + responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse( + sendMessageBatchResponseHeader, + SendMessageBatchResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(), + "Event batch size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventBatchSize)); + asyncContext.onComplete(responseEventMeshCommand); + return; + } + for (CloudEvent event : eventList) { //validate event if (StringUtils.isBlank(event.getId()) @@ -126,6 +139,19 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext return; } + String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8); + if (content.length() > eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize) { + batchMessageLogger.error("Event size exceeds the limit: {}", + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize); + + responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse( + sendMessageBatchResponseHeader, + SendMessageBatchResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(), + "Event size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize)); + asyncContext.onComplete(responseEventMeshCommand); + return; + } + String idc = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.IDC)).toString(); String pid = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.PID)).toString(); String sys = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.SYS)).toString(); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java index 9b9f44e9f3..2b23d786b6 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java @@ -46,6 +46,7 @@ import org.apache.commons.lang3.StringUtils; +import java.nio.charset.StandardCharsets; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -158,6 +159,19 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext return; } + String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8); + if (content.length() > eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize) { + batchMessageLogger.error("Event size exceeds the limit: {}", + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize); + + responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse( + sendMessageBatchV2ResponseHeader, + SendMessageBatchV2ResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(), + "Event size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize)); + asyncContext.onComplete(responseEventMeshCommand); + return; + } + //do acl check if (eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerSecurityEnable) { String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java index 45ec636405..f35eb0e2bf 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/ReplyMessageProcessor.java @@ -46,6 +46,7 @@ import org.apache.commons.lang3.StringUtils; +import java.nio.charset.StandardCharsets; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -152,6 +153,19 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext return; } + String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8); + if (content.length() > eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize) { + httpLogger.error("Event size exceeds the limit: {}", + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize); + + responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse( + replyMessageResponseHeader, + ReplyMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(), + "Event size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize)); + asyncContext.onComplete(responseEventMeshCommand); + return; + } + EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup); if (!eventMeshProducer.getStarted().get()) { diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java index 826438505e..07fb4df693 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java @@ -45,6 +45,7 @@ import org.apache.commons.lang3.StringUtils; +import java.nio.charset.StandardCharsets; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -195,6 +196,19 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext event = CloudEventBuilder.from(event).withExtension(SendMessageRequestBody.TTL, ttl).build(); } + String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8); + if (content.length() > eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize) { + httpLogger.error("Event size exceeds the limit: {}", + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize); + + responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse( + sendMessageResponseHeader, + SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(), + "Event size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize)); + asyncContext.onComplete(responseEventMeshCommand); + return; + } + try { // body //omsMsg.setBody(sendMessageRequestBody.getContent().getBytes(EventMeshConstants.DEFAULT_CHARSET)); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java index b80fec49a2..529d3ce94e 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java @@ -43,6 +43,7 @@ import org.apache.commons.lang3.StringUtils; +import java.nio.charset.StandardCharsets; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -191,6 +192,19 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext return; } + String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8); + if (content.length() > eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize) { + httpLogger.error("Event size exceeds the limit: {}", + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize); + + responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse( + sendMessageResponseHeader, + SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(), + "Event size exceeds the limit: " + eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEventSize)); + asyncContext.onComplete(responseEventMeshCommand); + return; + } + EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java index ffcd199d6f..7c700cdcae 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java @@ -41,6 +41,7 @@ import org.apache.commons.lang3.StringUtils; +import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -86,6 +87,12 @@ public void run() { throw new Exception("event is null"); } + String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8); + if (content.length() > eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshEventSize) { + throw new Exception("event size exceeds the limit: " + + eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshEventSize); + } + //do acl check in sending msg if (eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerSecurityEnable) { String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); From 0761ae535a90b275a7d637658eba48e444bc039b Mon Sep 17 00:00:00 2001 From: jinrongluo Date: Tue, 8 Feb 2022 21:56:00 -0500 Subject: [PATCH 2/3] [Issue #655] fix style check issue. --- .../core/protocol/tcp/client/task/MessageTransferTask.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java index 7c700cdcae..af4d26a250 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java @@ -89,8 +89,8 @@ public void run() { String content = new String(event.getData().toBytes(), StandardCharsets.UTF_8); if (content.length() > eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshEventSize) { - throw new Exception("event size exceeds the limit: " + - eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshEventSize); + throw new Exception("event size exceeds the limit: " + + eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshEventSize); } //do acl check in sending msg From 868ba83845a21a72371077e56cac55c915a28e40 Mon Sep 17 00:00:00 2001 From: jinrongluo Date: Fri, 11 Feb 2022 12:25:23 -0500 Subject: [PATCH 3/3] [Issue #655] Fix eventsize and batch eventsize property names --- eventmesh-runtime/conf/eventmesh.properties | 10 ++++++---- .../configuration/EventMeshHTTPConfiguration.java | 8 ++++---- .../configuration/EventMeshTCPConfiguration.java | 4 ++-- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties index 031a262641..84cedc913c 100644 --- a/eventmesh-runtime/conf/eventmesh.properties +++ b/eventmesh-runtime/conf/eventmesh.properties @@ -39,8 +39,10 @@ eventMesh.server.tcp.msgReqnumPerSecond=15000 eventMesh.server.http.msgReqnumPerSecond=15000 eventMesh.server.session.upstreamBufferSize=20 -eventmesh.server.eventSize=1000 -eventmesh.server.eventBatchSize=10 +# for single event publish, maximum size allowed per event +eventMesh.server.maxEventSize=1000 +# for batch event publish, maximum number of events allowed in one batch +eventMesh.server.maxEventBatchSize=10 # thread number about global scheduler eventMesh.server.global.scheduler=5 @@ -64,8 +66,8 @@ eventMesh.server.gracefulShutdown.sleepIntervalInMills=1000 eventMesh.server.rebalanceRedirect.sleepIntervalInMills=200 #ip address blacklist -eventmesh.server.blacklist.ipv4=0.0.0.0/8,127.0.0.0/8,169.254.0.0/16,255.255.255.255/32 -eventmesh.server.blacklist.ipv6=::/128,::1/128,ff00::/8 +eventMesh.server.blacklist.ipv4=0.0.0.0/8,127.0.0.0/8,169.254.0.0/16,255.255.255.255/32 +eventMesh.server.blacklist.ipv6=::/128,::1/128,ff00::/8 #connector plugin eventMesh.connector.plugin.type=standalone diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java index d8eb63ee31..c0699c9dd8 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java @@ -353,12 +353,12 @@ static class ConfKeys { public static String KEY_EVENTMESH_SERVER_MSG_REQ_NUM_PER_SECOND = "eventMesh.server.http.msgReqnumPerSecond"; - public static String KEY_EVENTMESH_SERVER_EVENTSIZE = "eventmesh.server.eventSize"; + public static String KEY_EVENTMESH_SERVER_EVENTSIZE = "eventMesh.server.maxEventSize"; - public static String KEY_EVENTMESH_SERVER_EVENT_BATCHSIZE = "eventmesh.server.eventBatchSize"; + public static String KEY_EVENTMESH_SERVER_EVENT_BATCHSIZE = "eventMesh.server.maxEventBatchSize"; - public static String KEY_EVENTMESH_SERVER_IPV4_BLACK_LIST = "eventmesh.server.blacklist.ipv4"; + public static String KEY_EVENTMESH_SERVER_IPV4_BLACK_LIST = "eventMesh.server.blacklist.ipv4"; - public static String KEY_EVENTMESH_SERVER_IPV6_BLACK_LIST = "eventmesh.server.blacklist.ipv6"; + public static String KEY_EVENTMESH_SERVER_IPV6_BLACK_LIST = "eventMesh.server.blacklist.ipv6"; } } \ No newline at end of file diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java index d4bd4fa76d..5544b0bb91 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshTCPConfiguration.java @@ -199,8 +199,8 @@ static class ConfKeys { public static String KEYS_EVENTMESH_SERVER_PUSH_FAIL_ISOLATE_TIME = "eventMesh.server.tcp.pushFailIsolateTimeInMills"; public static String KEYS_EVENTMESH_SERVER_GRACEFUL_SHUTDOWN_SLEEP_TIME = "eventMesh.server.gracefulShutdown.sleepIntervalInMills"; public static String KEYS_EVENTMESH_SERVER_REBALANCE_REDIRECT_SLEEP_TIME = "eventMesh.server.rebalanceRedirect.sleepIntervalInM"; - public static String KEYS_EVENTMESH_SERVER_EVENTSIZE = "eventmesh.server.eventSize"; - public static String KEYS_EVENTMESH_SERVER_EVENT_BATCHSIZE = "eventmesh.server.eventBatchSize"; + public static String KEYS_EVENTMESH_SERVER_EVENTSIZE = "eventMesh.server.maxEventSize"; + public static String KEYS_EVENTMESH_SERVER_EVENT_BATCHSIZE = "eventMesh.server.maxEventBatchSize"; } public static class TrafficShapingConfig {