Skip to content

Commit

Permalink
[ISSUE apache#5020] Optimize unit tests and code (apache#5023)
Browse files Browse the repository at this point in the history
  • Loading branch information
cnzakii authored Jul 9, 2024
1 parent c615f97 commit 3436e56
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 163 deletions.
4 changes: 3 additions & 1 deletion eventmesh-connectors/eventmesh-connector-http/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ dependencies {
implementation 'io.vertx:vertx-web-client:4.5.8'
implementation 'dev.failsafe:failsafe:3.3.2'


testImplementation 'org.apache.httpcomponents.client5:httpclient5:5.3.1'
testImplementation 'org.apache.httpcomponents.client5:httpclient5-fluent:5.3.1'
testImplementation 'org.mock-server:mockserver-netty:5.15.0'
testImplementation 'com.squareup.okhttp3:okhttp:4.12.0'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,14 @@ public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConne

// get timestamp and offset
Long timestamp = httpConnectRecord.getData().getTimestamp();
Map<String, ?> offset = ((HttpRecordOffset) httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap();
Map<String, ?> offset = null;
try {
// May throw NullPointerException.
offset = ((HttpRecordOffset) httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap();
} catch (NullPointerException e) {
// ignore null pointer exception
}
final Map<String, ?> finalOffset = offset;

// send the request
return this.webClient.post(url.getPath())
Expand All @@ -143,26 +150,28 @@ public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConne
.ssl(Objects.equals(url.getScheme(), "https"))
.sendJson(httpConnectRecord)
.onSuccess(res -> {
log.info("Request sent successfully. Record: timestamp={}, offset={}", timestamp, offset);
log.info("Request sent successfully. Record: timestamp={}, offset={}", timestamp, finalOffset);
// log the response
if (HttpUtils.is2xxSuccessful(res.statusCode())) {
if (log.isDebugEnabled()) {
log.debug("Received successful response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}",
res.statusCode(), timestamp, offset, res.bodyAsString());
res.statusCode(), timestamp, finalOffset, res.bodyAsString());
} else {
log.info("Received successful response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp, offset);
log.info("Received successful response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp,
finalOffset);
}
} else {
if (log.isDebugEnabled()) {
log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}",
res.statusCode(), timestamp, offset, res.bodyAsString());
res.statusCode(), timestamp, finalOffset, res.bodyAsString());
} else {
log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp, offset);
log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp,
finalOffset);
}
}

})
.onFailure(err -> log.error("Request failed to send. Record: timestamp={}, offset={}", timestamp, offset, err));
.onFailure(err -> log.error("Request failed to send. Record: timestamp={}, offset={}", timestamp, finalOffset, err));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,18 @@ public class WebhookHttpSinkHandler extends CommonHttpSinkHandler {
// store the received data, when webhook is enabled
private final SynchronizedCircularFifoQueue<HttpExportRecord> receivedDataQueue;

private volatile boolean exportStarted = false;

private volatile boolean exportDestroyed = false;

public boolean isExportStarted() {
return exportStarted;
}

public boolean isExportDestroyed() {
return exportDestroyed;
}

public WebhookHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
super(sinkConnectorConfig);
this.sinkConnectorConfig = sinkConnectorConfig;
Expand Down Expand Up @@ -179,10 +191,15 @@ public void start() {
// start the webclient
super.start();
// start the export server
Throwable t = this.exportServer.listen().cause();
if (t != null) {
throw new EventMeshException("Failed to start Vertx server. ", t);
}
this.exportServer.listen(res -> {
if (res.succeeded()) {
this.exportStarted = true;
log.info("WebhookHttpExportServer started on port: {}", this.webhookConfig.getPort());
} else {
log.error("WebhookHttpExportServer failed to start on port: {}", this.webhookConfig.getPort());
throw new EventMeshException("Failed to start Vertx server. ", res.cause());
}
});
}

/**
Expand Down Expand Up @@ -250,10 +267,15 @@ public void stop() {
super.stop();
// stop the export server
if (this.exportServer != null) {
Throwable t = this.exportServer.close().cause();
if (t != null) {
throw new EventMeshException("Failed to stop Vertx server. ", t);
}
this.exportServer.close(res -> {
if (res.succeeded()) {
this.exportDestroyed = true;
log.info("WebhookHttpExportServer stopped on port: {}", this.webhookConfig.getPort());
} else {
log.error("WebhookHttpExportServer failed to stop on port: {}", this.webhookConfig.getPort());
throw new EventMeshException("Failed to stop Vertx server. ", res.cause());
}
});
} else {
log.warn("Callback server is null, ignore.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,18 @@ public class HttpSourceConnector implements Source {

private HttpServer server;

private volatile boolean started = false;

private volatile boolean destroyed = false;

public boolean isStarted() {
return started;
}

public boolean isDestroyed() {
return destroyed;
}


@Override
public Class<? extends Config> configClass() {
Expand Down Expand Up @@ -105,10 +117,15 @@ private void doInit() {

@Override
public void start() {
Throwable t = this.server.listen().cause();
if (t != null) {
throw new EventMeshException("failed to start Vertx server", t);
}
this.server.listen(res -> {
if (res.succeeded()) {
this.started = true;
log.info("HttpSourceConnector started on port: {}", this.sourceConfig.getConnectorConfig().getPort());
} else {
log.error("HttpSourceConnector failed to start on port: {}", this.sourceConfig.getConnectorConfig().getPort());
throw new EventMeshException("failed to start Vertx server", res.cause());
}
});
}

@Override
Expand All @@ -123,9 +140,19 @@ public String name() {

@Override
public void stop() {
Throwable t = this.server.close().cause();
if (t != null) {
throw new EventMeshException("failed to stop Vertx server", t);
if (this.server != null) {
this.server.close(res -> {
if (res.succeeded()) {
this.destroyed = true;
log.info("HttpSourceConnector stopped on port: {}", this.sourceConfig.getConnectorConfig().getPort());
} else {
log.error("HttpSourceConnector failed to stop on port: {}", this.sourceConfig.getConnectorConfig().getPort());
throw new EventMeshException("failed to stop Vertx server", res.cause());
}
}
);
} else {
log.warn("HttpSourceConnector server is null, ignore.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.eventmesh.connector.http.source.connector;


import static org.mockserver.model.HttpRequest.request;

import org.apache.eventmesh.connector.http.sink.HttpSinkConnector;
Expand All @@ -25,59 +26,64 @@
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.util.ConfigUtil;

import org.apache.hc.client5.http.fluent.Request;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.net.URIBuilder;

import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockserver.integration.ClientAndServer;
import org.mockserver.model.HttpRequest;
import org.mockserver.model.HttpResponse;
import org.mockserver.model.MediaType;


import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;

import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;

public class HttpSinkConnectorTest {

private HttpSinkConnector sinkConnector;

private HttpSinkConfig sinkConfig;

private URI severUri;
private URL url;

private ClientAndServer mockServer;

private static final AtomicInteger counter = new AtomicInteger(0);

@BeforeEach
void before() throws Exception {
// init sinkConnector
this.sinkConnector = new HttpSinkConnector();
this.sinkConfig = (HttpSinkConfig) ConfigUtil.parse(sinkConnector.configClass());
this.sinkConnector.init(this.sinkConfig);
this.sinkConnector.start();
sinkConnector = new HttpSinkConnector();
sinkConfig = (HttpSinkConfig) ConfigUtil.parse(sinkConnector.configClass());
sinkConnector.init(this.sinkConfig);
sinkConnector.start();

this.severUri = URI.create(sinkConfig.connectorConfig.getUrls()[0]);
url = new URL(sinkConfig.connectorConfig.getUrls()[0]);
// start mockServer
mockServer = ClientAndServer.startClientAndServer(severUri.getPort());
mockServer = ClientAndServer.startClientAndServer(url.getPort());
mockServer.reset()
.when(
request()
.withMethod("POST")
.withPath(severUri.getPath())
.withPath(url.getPath())
)
.respond(
httpRequest -> {
// Increase the number of requests received
counter.incrementAndGet();
JSONObject requestBody = JSON.parseObject(httpRequest.getBodyAsString());
return HttpResponse.response()
.withContentType(MediaType.APPLICATION_JSON)
Expand All @@ -90,6 +96,7 @@ void before() throws Exception {
); // .withDelay(TimeUnit.SECONDS, 10);
}
);

}

@AfterEach
Expand All @@ -101,62 +108,57 @@ void after() throws Exception {
@Test
void testPut() throws Exception {
// Create a list of ConnectRecord
final int times = 10;
final int size = 10;
List<ConnectRecord> connectRecords = new ArrayList<>();
for (int i = 0; i < times; i++) {
for (int i = 0; i < size; i++) {
ConnectRecord record = createConnectRecord();
connectRecords.add(record);
}
// Put ConnectRecord
sinkConnector.put(connectRecords);

// sleep 5s
Thread.sleep(5000);

// verify request
HttpRequest[] recordedRequests = mockServer.retrieveRecordedRequests(null);
// assert recordedRequests.length == times;
// wait for receiving request
final int times = 5000; // 5 seconds
long start = System.currentTimeMillis();
while (counter.get() < size) {
if (System.currentTimeMillis() - start > times) {
// timeout
Assertions.fail("The number of requests received=" + counter.get() + " is less than the number of ConnectRecord=" + size);
} else {
Thread.sleep(100);
}
}

// verify response
HttpWebhookConfig webhookConfig = sinkConfig.connectorConfig.getWebhookConfig();
String url = new HttpUrl.Builder()
.scheme("http")
.host(severUri.getHost())
.port(webhookConfig.getPort())
.addPathSegments(webhookConfig.getExportPath())
.addQueryParameter("pageNum", "1")
.addQueryParameter("pageSize", "10")
.addQueryParameter("type", "poll")
.build().toString();

// build request
Request request = new Request.Builder()
.url(url)
.addHeader("Content-Type", "application/json")

URI exportUrl = new URIBuilder()
.setScheme("http")
.setHost(url.getHost())
.setPort(webhookConfig.getPort())
.setPath(webhookConfig.getExportPath())
.addParameter("pageNum", "1")
.addParameter("pageSize", "10")
.addParameter("type", "poll")
.build();

OkHttpClient client = new OkHttpClient();
try (Response response = client.newCall(request).execute()) {
// check response code
if (!response.isSuccessful()) {
throw new RuntimeException("Unexpected response code: " + response);
}
// check response body
ResponseBody responseBody = response.body();
if (responseBody != null) {
JSONObject jsonObject = JSON.parseObject(responseBody.string());
Request.get(exportUrl)
.execute()
.handleResponse(response -> {
// check response code
Assertions.assertEquals(HttpStatus.SC_OK, response.getCode());
// check response body
JSONObject jsonObject = JSON.parseObject(response.getEntity().getContent());
JSONArray pageItems = jsonObject.getJSONArray("pageItems");

assert pageItems != null && pageItems.size() == times;

for (int i = 0; i < times; i++) {
Assertions.assertNotNull(pageItems);
Assertions.assertEquals(size, pageItems.size());
for (int i = 0; i < size; i++) {
JSONObject pageItem = pageItems.getJSONObject(i);
assert pageItem != null;
// assert pageItem.getJSONObject("data") != null;
// assert pageItem.getJSONObject("metadata") != null;
Assertions.assertNotNull(pageItem);
}
}
}
return null;
});
}

private ConnectRecord createConnectRecord() {
Expand Down
Loading

0 comments on commit 3436e56

Please sign in to comment.