From d638ec44b83f0f6b79e74077b5c24b63c57f30bf Mon Sep 17 00:00:00 2001 From: j00441484 Date: Mon, 10 May 2021 17:30:47 -0400 Subject: [PATCH 01/10] [Issue #337] Fix HttpSubscriber startup issue --- .../eventmesh/http/demo/sub/service/SubService.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java index e20fea98be..84432ca003 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java @@ -75,12 +75,15 @@ public void afterPropertiesSet() throws Exception { })); Thread stopThread = new Thread(() -> { + try { + Thread.sleep(5 * 60 * 1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } logger.info("stopThread start...."); System.exit(0); }); - Thread.sleep(5 * 60 * 1000); - -// stopThread.start(); + stopThread.start(); } } From 5ebfb54ece6b2020fb4d1b5fe2be26a3a2a6bbd8 Mon Sep 17 00:00:00 2001 From: jinrongluo Date: Mon, 10 May 2021 17:46:45 -0400 Subject: [PATCH 02/10] [Issue #337] test commit --- .../org/apache/eventmesh/http/demo/sub/service/SubService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java index 84432ca003..c75cef7c51 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java @@ -84,6 +84,6 @@ public void afterPropertiesSet() throws Exception { System.exit(0); }); - stopThread.start(); + // stopThread.start(); } } From a3afff3582a3a0e9baf600276116f34d9e95e191 Mon Sep 17 00:00:00 2001 From: jinrongluo Date: Mon, 10 May 2021 17:49:04 -0400 Subject: [PATCH 03/10] [Issue #337] revert test commit --- .../org/apache/eventmesh/http/demo/sub/service/SubService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java index c75cef7c51..84432ca003 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java @@ -84,6 +84,6 @@ public void afterPropertiesSet() throws Exception { System.exit(0); }); - // stopThread.start(); + stopThread.start(); } } From 50f959d6c7416da01d51da0b33e8c9b47cee0611 Mon Sep 17 00:00:00 2001 From: jinrongluo Date: Tue, 11 May 2021 11:40:24 -0400 Subject: [PATCH 04/10] [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook --- .../http/demo/AsyncPublishInstance.java | 5 +- .../demo/sub/controller/SubController.java | 9 ++- .../http/demo/sub/service/SubService.java | 57 ++++++++++++------- 3 files changed, 50 insertions(+), 21 deletions(-) diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/AsyncPublishInstance.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/AsyncPublishInstance.java index b718bcc2e0..558773fc56 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/AsyncPublishInstance.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/AsyncPublishInstance.java @@ -35,6 +35,9 @@ public class AsyncPublishInstance { public static Logger logger = LoggerFactory.getLogger(AsyncPublishInstance.class); + // This messageSize is also used in SubService.java (Subscriber) + public static int messageSize = 5; + public static void main(String[] args) throws Exception { Properties properties = Utils.readPropertiesFile("application.properties"); final String eventMeshIp = properties.getProperty("eventmesh.ip"); @@ -62,7 +65,7 @@ public static void main(String[] args) throws Exception { liteProducer = new LiteProducer(eventMeshClientConfig); liteProducer.start(); - for (int i = 0; i < 5; i++) { + for (int i = 0; i < messageSize; i++) { LiteMessage liteMessage = new LiteMessage(); liteMessage.setBizSeqNo(RandomStringUtils.randomNumeric(30)) // .setContent("contentStr with special protocal") diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java index 8f8a7a7f9b..a3b9f4ede4 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java @@ -19,8 +19,10 @@ import com.alibaba.fastjson.JSONObject; +import org.apache.eventmesh.http.demo.sub.service.SubService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; @@ -33,12 +35,17 @@ public class SubController { public static Logger logger = LoggerFactory.getLogger(SubController.class); + @Autowired + private SubService subService; + @RequestMapping(value = "/test", method = RequestMethod.POST) public String subTest(@RequestBody String message) { logger.info("=======receive message======= {}", JSONObject.toJSONString(message)); JSONObject result = new JSONObject(); result.put("retCode", 1); - return result.toJSONString(); + String strResult = result.toJSONString(); + subService.consumeMessage(strResult); + return strResult; } } diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java index 84432ca003..8fb4746cef 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java @@ -3,18 +3,22 @@ import java.util.Arrays; import java.util.List; import java.util.Properties; - +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.client.http.conf.LiteClientConfig; import org.apache.eventmesh.client.http.consumer.LiteConsumer; import org.apache.eventmesh.common.EventMeshException; import org.apache.eventmesh.common.IPUtil; import org.apache.eventmesh.common.ThreadUtil; +import org.apache.eventmesh.http.demo.AsyncPublishInstance; import org.apache.eventmesh.util.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; +import javax.annotation.PreDestroy; @Component public class SubService implements InitializingBean { @@ -38,6 +42,11 @@ public class SubService implements InitializingBean { final String dcn = "FT0"; final String subsys = "1234"; + // CountDownLatch size is the same as messageSize in AsyncPublishInstance.java (Publisher) + private CountDownLatch countDownLatch = new CountDownLatch(AsyncPublishInstance.messageSize); + + private ExecutorService executorService = Executors.newFixedThreadPool(5); + @Override public void afterPropertiesSet() throws Exception { @@ -59,31 +68,41 @@ public void afterPropertiesSet() throws Exception { liteConsumer.heartBeat(topicList, url); liteConsumer.subscribe(topicList, url); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - logger.info("start destory ...."); - try { - liteConsumer.unsubscribe(topicList, url); - } catch (EventMeshException e) { - e.printStackTrace(); - } + // Wait for all messaged to be consumed + executorService.submit(() ->{ try { - liteConsumer.shutdown(); - } catch (Exception e) { - e.printStackTrace(); - } - logger.info("end destory."); - })); - - Thread stopThread = new Thread(() -> { - try { - Thread.sleep(5 * 60 * 1000); + countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } logger.info("stopThread start...."); System.exit(0); }); + } + + @PreDestroy + public void cleanup() { + logger.info("start destory ...."); + try { + liteConsumer.unsubscribe(topicList, url); + } catch (EventMeshException e) { + e.printStackTrace(); + } + try { + liteConsumer.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + } + executorService.shutdown(); + logger.info("end destory."); + } - stopThread.start(); + /** + * Count the message already consumed + */ + public void consumeMessage(String msg) { + logger.info("consume message {}", msg); + countDownLatch.countDown(); + logger.info("remaining number of messages to be consumed {}", countDownLatch.getCount()); } } From d48ead5aac61d602880a6d8715e7bc131c908bf8 Mon Sep 17 00:00:00 2001 From: jinrongluo Date: Tue, 11 May 2021 12:11:20 -0400 Subject: [PATCH 05/10] [Issue #337] Enhance Http Demo Subscriber by using ExecutorService, CountDownLatch and PreDestroy hook --- .../http/demo/sub/service/SubService.java | 57 +++++++++++++------ 1 file changed, 39 insertions(+), 18 deletions(-) diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java index 84432ca003..213ea40078 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java @@ -3,6 +3,9 @@ import java.util.Arrays; import java.util.List; import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.client.http.conf.LiteClientConfig; @@ -10,12 +13,15 @@ import org.apache.eventmesh.common.EventMeshException; import org.apache.eventmesh.common.IPUtil; import org.apache.eventmesh.common.ThreadUtil; +import org.apache.eventmesh.http.demo.AsyncPublishInstance; import org.apache.eventmesh.util.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; +import javax.annotation.PreDestroy; + @Component public class SubService implements InitializingBean { @@ -38,6 +44,11 @@ public class SubService implements InitializingBean { final String dcn = "FT0"; final String subsys = "1234"; + // CountDownLatch size is the same as messageSize in AsyncPublishInstance.java (Publisher) + private CountDownLatch countDownLatch = new CountDownLatch(AsyncPublishInstance.messageSize); + + private ExecutorService executorService = Executors.newFixedThreadPool(5); + @Override public void afterPropertiesSet() throws Exception { @@ -59,31 +70,41 @@ public void afterPropertiesSet() throws Exception { liteConsumer.heartBeat(topicList, url); liteConsumer.subscribe(topicList, url); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - logger.info("start destory ...."); - try { - liteConsumer.unsubscribe(topicList, url); - } catch (EventMeshException e) { - e.printStackTrace(); - } + // Wait for all messaged to be consumed + executorService.submit(() ->{ try { - liteConsumer.shutdown(); - } catch (Exception e) { - e.printStackTrace(); - } - logger.info("end destory."); - })); - - Thread stopThread = new Thread(() -> { - try { - Thread.sleep(5 * 60 * 1000); + countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } logger.info("stopThread start...."); System.exit(0); }); + } + + @PreDestroy + public void cleanup() { + logger.info("start destory ...."); + try { + liteConsumer.unsubscribe(topicList, url); + } catch (EventMeshException e) { + e.printStackTrace(); + } + try { + liteConsumer.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + } + executorService.shutdown(); + logger.info("end destory."); + } - stopThread.start(); + /** + * Count the message already consumed + */ + public void consumeMessage(String msg) { + logger.info("consume message {}", msg); + countDownLatch.countDown(); + logger.info("remaining number of messages to be consumed {}", countDownLatch.getCount()); } } From c9021fea94148c118ab27608fc8d34404a0c9a3f Mon Sep 17 00:00:00 2001 From: jinrongluo Date: Wed, 12 May 2021 11:25:11 -0400 Subject: [PATCH 06/10] [Issue #337] Address code review comment for Subscriber Demo App --- .../http/demo/sub/controller/SubController.java | 6 +++--- .../eventmesh/http/demo/sub/service/SubService.java | 10 ++-------- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java index a3b9f4ede4..92ca09d700 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/controller/SubController.java @@ -41,11 +41,11 @@ public class SubController { @RequestMapping(value = "/test", method = RequestMethod.POST) public String subTest(@RequestBody String message) { logger.info("=======receive message======= {}", JSONObject.toJSONString(message)); + subService.consumeMessage(message); + JSONObject result = new JSONObject(); result.put("retCode", 1); - String strResult = result.toJSONString(); - subService.consumeMessage(strResult); - return strResult; + return result.toJSONString(); } } diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java index 213ea40078..9a51e4d2fa 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java @@ -4,9 +4,6 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.client.http.conf.LiteClientConfig; import org.apache.eventmesh.client.http.consumer.LiteConsumer; @@ -19,7 +16,6 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; - import javax.annotation.PreDestroy; @Component @@ -47,8 +43,6 @@ public class SubService implements InitializingBean { // CountDownLatch size is the same as messageSize in AsyncPublishInstance.java (Publisher) private CountDownLatch countDownLatch = new CountDownLatch(AsyncPublishInstance.messageSize); - private ExecutorService executorService = Executors.newFixedThreadPool(5); - @Override public void afterPropertiesSet() throws Exception { @@ -71,7 +65,7 @@ public void afterPropertiesSet() throws Exception { liteConsumer.subscribe(topicList, url); // Wait for all messaged to be consumed - executorService.submit(() ->{ + Thread stopThread = new Thread(() -> { try { countDownLatch.await(); } catch (InterruptedException e) { @@ -80,6 +74,7 @@ public void afterPropertiesSet() throws Exception { logger.info("stopThread start...."); System.exit(0); }); + stopThread.start(); } @PreDestroy @@ -95,7 +90,6 @@ public void cleanup() { } catch (Exception e) { e.printStackTrace(); } - executorService.shutdown(); logger.info("end destory."); } From cb2cd82689ea57d62de427af9ffab4bb41cbfd01 Mon Sep 17 00:00:00 2001 From: jinrongluo Date: Wed, 26 May 2021 16:56:00 -0400 Subject: [PATCH 07/10] [Issue #368] Fix Racing condition and memory leak issue in EventMesh SDK LiteConsumer and LiteProducer --- .../client/http/AbstractLiteClient.java | 25 ++++++++++++++ .../client/http/consumer/LiteConsumer.java | 20 ++++++++--- .../client/http/producer/LiteProducer.java | 33 +++++-------------- 3 files changed, 49 insertions(+), 29 deletions(-) diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/AbstractLiteClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/AbstractLiteClient.java index 1710149f83..b538f4bc4a 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/AbstractLiteClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/AbstractLiteClient.java @@ -18,10 +18,17 @@ package org.apache.eventmesh.client.http; import org.apache.eventmesh.client.http.conf.LiteClientConfig; +import org.apache.eventmesh.client.http.ssl.MyX509TrustManager; import org.apache.eventmesh.client.http.util.HttpLoadBalanceUtils; import org.apache.eventmesh.common.loadbalance.LoadBalanceSelector; +import org.apache.http.conn.ssl.DefaultHostnameVerifier; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import java.security.SecureRandom; public abstract class AbstractLiteClient { @@ -46,4 +53,22 @@ public LiteClientConfig getLiteClientConfig() { public void shutdown() throws Exception { logger.info("AbstractLiteClient shutdown"); } + + public CloseableHttpClient setHttpClient() throws Exception { + if (!liteClientConfig.isUseTls()) { + return HttpClients.createDefault(); + } + SSLContext sslContext = null; + try { + String protocol = System.getProperty("ssl.client.protocol", "TLSv1.1"); + TrustManager[] tm = new TrustManager[]{new MyX509TrustManager()}; + sslContext = SSLContext.getInstance(protocol); + sslContext.init(null, tm, new SecureRandom()); + return HttpClients.custom().setSSLContext(sslContext) + .setSSLHostnameVerifier(new DefaultHostnameVerifier()).build(); + } catch (Exception e) { + logger.error("Error in creating HttpClient.", e); + throw e; + } + } } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java index 7c0ef10777..84796dc12d 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java @@ -65,15 +65,13 @@ public class LiteConsumer extends AbstractLiteClient { private ThreadPoolExecutor consumeExecutor; - private static CloseableHttpClient httpClient = HttpClients.createDefault(); - protected LiteClientConfig eventMeshClientConfig; private List subscription = Lists.newArrayList(); private LiteMessageListener messageListener; - protected static final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(4, new EventMeshThreadFactoryImpl("TCPClientScheduler", true)); + protected final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(4, new EventMeshThreadFactoryImpl("TCPClientScheduler", true)); public LiteConsumer(LiteClientConfig liteClientConfig) throws Exception { super(liteClientConfig); @@ -110,7 +108,10 @@ public void start() throws Exception { public void shutdown() throws Exception { logger.info("LiteConsumer shutting down"); super.shutdown(); - httpClient.close(); + if (consumeExecutor != null) { + consumeExecutor.shutdown(); + } + scheduler.shutdown(); started.compareAndSet(true, false); logger.info("LiteConsumer shutdown"); } @@ -126,10 +127,13 @@ public boolean subscribe(List topicList, String url) throws Exception { long startTime = System.currentTimeMillis(); String target = selectEventMesh(); String subRes = ""; + CloseableHttpClient httpClient = setHttpClient(); try { subRes = HttpUtil.post(httpClient, target, subscribeParam); } catch (Exception ex) { throw new EventMeshException(ex); + } finally { + httpClient.close(); } if (logger.isDebugEnabled()) { @@ -211,10 +215,13 @@ public void run() { long startTime = System.currentTimeMillis(); String target = selectEventMesh(); String res = ""; + CloseableHttpClient httpClient = setHttpClient(); try { res = HttpUtil.post(httpClient, target, requestParam); } catch (Exception ex) { throw new EventMeshException(ex); + } finally { + httpClient.close(); } if (logger.isDebugEnabled()) { @@ -234,17 +241,20 @@ public void run() { }, EventMeshCommon.HEATBEAT, EventMeshCommon.HEATBEAT, TimeUnit.MILLISECONDS); } - public boolean unsubscribe(List topicList, String url) throws EventMeshException { + public boolean unsubscribe(List topicList, String url) throws Exception { subscription.removeAll(topicList); RequestParam unSubscribeParam = generateUnSubscribeRequestParam(topicList, url); long startTime = System.currentTimeMillis(); String target = selectEventMesh(); String unSubRes = ""; + CloseableHttpClient httpClient = setHttpClient(); try { unSubRes = HttpUtil.post(httpClient, target, unSubscribeParam); } catch (Exception ex) { throw new EventMeshException(ex); + } finally { + httpClient.close(); } if (logger.isDebugEnabled()) { diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java index 5e81579b74..eb57908b30 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java @@ -61,13 +61,8 @@ public class LiteProducer extends AbstractLiteClient { public Logger logger = LoggerFactory.getLogger(LiteProducer.class); - private static CloseableHttpClient httpClient = HttpClients.createDefault(); - public LiteProducer(LiteClientConfig liteClientConfig) { super(liteClientConfig); - if (liteClientConfig.isUseTls()) { - setHttpClient(); - } } private AtomicBoolean started = new AtomicBoolean(Boolean.FALSE); @@ -92,7 +87,6 @@ public void shutdown() throws Exception { } logger.info("LiteProducer shutting down"); super.shutdown(); - httpClient.close(); started.compareAndSet(true, false); logger.info("LiteProducer shutdown"); } @@ -132,10 +126,13 @@ public boolean publish(LiteMessage message) throws Exception { long startTime = System.currentTimeMillis(); String target = selectEventMesh(); String res = ""; + CloseableHttpClient httpClient = setHttpClient(); try { res = HttpUtil.post(httpClient, target, requestParam); } catch (Exception ex) { throw new EventMeshException(ex); + } finally { + httpClient.close(); } if (logger.isDebugEnabled()) { @@ -191,10 +188,13 @@ public LiteMessage request(LiteMessage message, long timeout) throws Exception { long startTime = System.currentTimeMillis(); String target = selectEventMesh(); String res = ""; + CloseableHttpClient httpClient = setHttpClient(); try { res = HttpUtil.post(httpClient, target, requestParam); } catch (Exception ex) { throw new EventMeshException(ex); + } finally { + httpClient.close(); } if (logger.isDebugEnabled()) { @@ -246,32 +246,17 @@ public void request(LiteMessage message, RRCallback rrCallback, long timeout) th long startTime = System.currentTimeMillis(); String target = selectEventMesh(); + CloseableHttpClient httpClient = setHttpClient(); try { HttpUtil.post(httpClient, null, target, requestParam, new RRCallbackResponseHandlerAdapter(message, rrCallback, timeout)); } catch (Exception ex) { throw new EventMeshException(ex); + } finally { + httpClient.close(); } if (logger.isDebugEnabled()) { logger.debug("publish sync message by async, target:{}, cost:{}, message:{}", target, System.currentTimeMillis() - startTime, message); } } - - public static void setHttpClient() { - SSLContext sslContext = null; - try { - String protocol = System.getProperty("ssl.client.protocol", "TLSv1.1"); - TrustManager[] tm = new TrustManager[]{new MyX509TrustManager()}; - sslContext = SSLContext.getInstance(protocol); - sslContext.init(null, tm, new SecureRandom()); - httpClient = HttpClients.custom().setSSLContext(sslContext) - .setSSLHostnameVerifier(new DefaultHostnameVerifier()).build(); - } catch (NoSuchAlgorithmException e) { - e.printStackTrace(); - } catch (KeyManagementException e) { - e.printStackTrace(); - } catch (Exception e) { - e.printStackTrace(); - } - } } From 1f989322dd9279efddd2cb1ff5f232353e25beee Mon Sep 17 00:00:00 2001 From: jinrongluo Date: Wed, 26 May 2021 17:14:21 -0400 Subject: [PATCH 08/10] [Issue #368] fix build issue --- .../org/apache/eventmesh/http/demo/sub/service/SubService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java index 9a51e4d2fa..e6b92afc99 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/sub/service/SubService.java @@ -82,7 +82,7 @@ public void cleanup() { logger.info("start destory ...."); try { liteConsumer.unsubscribe(topicList, url); - } catch (EventMeshException e) { + } catch (Exception e) { e.printStackTrace(); } try { From 7fd537359293b93474ae1601fa60003bd4c7f2eb Mon Sep 17 00:00:00 2001 From: jinrongluo Date: Thu, 27 May 2021 10:13:21 -0400 Subject: [PATCH 09/10] [Issue #368] use try with resource statement for HttpClient --- .../client/http/consumer/LiteConsumer.java | 24 +++++-------------- .../client/http/producer/LiteProducer.java | 24 +++++-------------- 2 files changed, 12 insertions(+), 36 deletions(-) diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java index 84796dc12d..ede5ef0743 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java @@ -127,13 +127,9 @@ public boolean subscribe(List topicList, String url) throws Exception { long startTime = System.currentTimeMillis(); String target = selectEventMesh(); String subRes = ""; - CloseableHttpClient httpClient = setHttpClient(); - try { + + try (CloseableHttpClient httpClient = setHttpClient()){ subRes = HttpUtil.post(httpClient, target, subscribeParam); - } catch (Exception ex) { - throw new EventMeshException(ex); - } finally { - httpClient.close(); } if (logger.isDebugEnabled()) { @@ -215,13 +211,9 @@ public void run() { long startTime = System.currentTimeMillis(); String target = selectEventMesh(); String res = ""; - CloseableHttpClient httpClient = setHttpClient(); - try { + + try (CloseableHttpClient httpClient = setHttpClient()) { res = HttpUtil.post(httpClient, target, requestParam); - } catch (Exception ex) { - throw new EventMeshException(ex); - } finally { - httpClient.close(); } if (logger.isDebugEnabled()) { @@ -248,13 +240,9 @@ public boolean unsubscribe(List topicList, String url) throws Exception long startTime = System.currentTimeMillis(); String target = selectEventMesh(); String unSubRes = ""; - CloseableHttpClient httpClient = setHttpClient(); - try { + + try (CloseableHttpClient httpClient = setHttpClient()) { unSubRes = HttpUtil.post(httpClient, target, unSubscribeParam); - } catch (Exception ex) { - throw new EventMeshException(ex); - } finally { - httpClient.close(); } if (logger.isDebugEnabled()) { diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java index eb57908b30..d16b8f91ac 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java @@ -126,13 +126,9 @@ public boolean publish(LiteMessage message) throws Exception { long startTime = System.currentTimeMillis(); String target = selectEventMesh(); String res = ""; - CloseableHttpClient httpClient = setHttpClient(); - try { + + try (CloseableHttpClient httpClient = setHttpClient()) { res = HttpUtil.post(httpClient, target, requestParam); - } catch (Exception ex) { - throw new EventMeshException(ex); - } finally { - httpClient.close(); } if (logger.isDebugEnabled()) { @@ -188,13 +184,9 @@ public LiteMessage request(LiteMessage message, long timeout) throws Exception { long startTime = System.currentTimeMillis(); String target = selectEventMesh(); String res = ""; - CloseableHttpClient httpClient = setHttpClient(); - try { + + try (CloseableHttpClient httpClient = setHttpClient()) { res = HttpUtil.post(httpClient, target, requestParam); - } catch (Exception ex) { - throw new EventMeshException(ex); - } finally { - httpClient.close(); } if (logger.isDebugEnabled()) { @@ -246,13 +238,9 @@ public void request(LiteMessage message, RRCallback rrCallback, long timeout) th long startTime = System.currentTimeMillis(); String target = selectEventMesh(); - CloseableHttpClient httpClient = setHttpClient(); - try { + + try (CloseableHttpClient httpClient = setHttpClient()) { HttpUtil.post(httpClient, null, target, requestParam, new RRCallbackResponseHandlerAdapter(message, rrCallback, timeout)); - } catch (Exception ex) { - throw new EventMeshException(ex); - } finally { - httpClient.close(); } if (logger.isDebugEnabled()) { From 378454ea618075b82bd8e6df7f2953fa178527f6 Mon Sep 17 00:00:00 2001 From: jinrongluo Date: Thu, 27 May 2021 16:15:23 -0400 Subject: [PATCH 10/10] [Issue #368] fix TLS1.1 and use TLS1.2 in HttpClient --- .../org/apache/eventmesh/client/http/AbstractLiteClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/AbstractLiteClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/AbstractLiteClient.java index b538f4bc4a..20302af8e6 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/AbstractLiteClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/AbstractLiteClient.java @@ -60,7 +60,7 @@ public CloseableHttpClient setHttpClient() throws Exception { } SSLContext sslContext = null; try { - String protocol = System.getProperty("ssl.client.protocol", "TLSv1.1"); + String protocol = System.getProperty("ssl.client.protocol", "TLSv1.2"); TrustManager[] tm = new TrustManager[]{new MyX509TrustManager()}; sslContext = SSLContext.getInstance(protocol); sslContext.init(null, tm, new SecureRandom());