Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #340]Add http trace http point #527

Merged
merged 49 commits into from
Dec 16, 2021
Merged
Changes from 1 commit
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
dde6ab5
tracing in AbstractHTTPServer
Roc-00 Sep 15, 2021
2ed29e2
tracing in AbstractHTTPServer
Roc-00 Sep 17, 2021
2293b9c
tracing in AbstractHTTPServer
Roc-00 Sep 20, 2021
2c5cc3d
trace in AbstractHTTPServer
Roc-00 Sep 20, 2021
60e9d69
add licence
Roc-00 Sep 20, 2021
744b16c
add licence
Roc-00 Sep 20, 2021
7479fd9
the span exporter
Roc-00 Sep 20, 2021
53a3fdf
design docs
Roc-00 Sep 20, 2021
40bb928
fix the error on text
Roc-00 Sep 22, 2021
69c62bc
delete the useless dependence
Roc-00 Sep 22, 2021
da9ed30
delete the useless dependence
Roc-00 Sep 22, 2021
3ceb7ee
add license
Roc-00 Sep 22, 2021
eea9212
add license
Roc-00 Sep 22, 2021
3d7b605
remove the unused code
Roc-00 Sep 23, 2021
4a29456
fix the different spanExporter
Roc-00 Sep 24, 2021
66bba23
change the class name
Roc-00 Sep 24, 2021
ac5d236
Merge branch 'develop' into trace3
Roc-00 Sep 24, 2021
d22bb45
fix gradle -build problem
Roc-00 Sep 24, 2021
70499ad
design docs improve
Roc-00 Sep 25, 2021
affc589
Merge branch 'develop' into trace3
Roc-00 Sep 25, 2021
78e2f62
fix the gradle.build error problem
Roc-00 Sep 25, 2021
c1e38d7
fixed
Roc-00 Oct 1, 2021
1e874ca
unsure fix
Roc-00 Oct 1, 2021
5261c1a
fixed
Roc-00 Oct 1, 2021
0cd4fc4
fix the path name
Roc-00 Oct 4, 2021
7de65fc
Merge branch 'develop' into trace3
Roc-00 Oct 22, 2021
51ff1e2
fix the path name
Roc-00 Oct 22, 2021
be96e82
fix check error
Roc-00 Oct 22, 2021
0440060
fix check error
Roc-00 Oct 22, 2021
4da95e7
fix check error
Roc-00 Oct 22, 2021
e3977fa
change the max size of one line from 100 to 120
Roc-00 Oct 22, 2021
d537406
Wrap lines longer than 120 characters
Roc-00 Oct 22, 2021
9578c73
Wrap lines longer than 120 characters
Roc-00 Oct 22, 2021
31d6a61
Wrap lines longer than 120 characters
Roc-00 Oct 22, 2021
9b6e253
format code
Roc-00 Oct 23, 2021
4fca695
format code
Roc-00 Oct 23, 2021
98d7a3e
format code
Roc-00 Oct 23, 2021
037d371
format code
Roc-00 Oct 23, 2021
bb0971d
format code
Roc-00 Oct 23, 2021
6642680
add javadoc
Roc-00 Oct 23, 2021
76b42fd
checkstyle fix
Roc-00 Oct 23, 2021
1ad5545
unversioned files
Roc-00 Nov 22, 2021
71d3bbd
Merge branch 'develop' into trace3
Roc-00 Nov 22, 2021
96b54b8
put context into channel in advance
Roc-00 Nov 22, 2021
6e883b0
Merge branch 'develop' into trace3
Roc-00 Dec 10, 2021
d48de99
Merge branch 'apache:develop' into trace3
Roc-00 Dec 14, 2021
836b13c
fix error
Roc-00 Dec 14, 2021
977b942
fix error
Roc-00 Dec 15, 2021
da634ea
fix error
Roc-00 Dec 15, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix error
  • Loading branch information
Roc-00 committed Dec 14, 2021
commit 836b13c7510755acd2ac7a051109d4a6ed84b1a1
Original file line number Diff line number Diff line change
Expand Up @@ -30,38 +30,17 @@
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer;
import org.apache.eventmesh.runtime.util.RemotingHelper;

import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import org.apache.eventmesh.common.ThreadPoolFactory;
import org.apache.eventmesh.common.command.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.Body;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
import org.apache.eventmesh.common.protocol.http.common.ProtocolVersion;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.http.header.Header;
import org.apache.eventmesh.runtime.common.Pair;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer;
import org.apache.eventmesh.runtime.trace.AttributeKeys;
import org.apache.eventmesh.runtime.trace.OpenTelemetryTraceFactory;
import org.apache.eventmesh.runtime.trace.SpanKey;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;

import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -109,10 +88,6 @@
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
Expand All @@ -122,7 +97,6 @@
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;


public abstract class AbstractHTTPServer extends AbstractRemotingServer {

public Logger httpServerLogger = LoggerFactory.getLogger(this.getClass());
Expand All @@ -137,14 +111,14 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {

private boolean useTLS;

public OpenTelemetryTraceFactory openTelemetryTraceFactory;

public Tracer tracer;

private Boolean useTrace = true; //Determine whether trace is enabled

public TextMapPropagator textMapPropagator;

public OpenTelemetryTraceFactory openTelemetryTraceFactory;

public Tracer tracer;

public ThreadPoolExecutor asyncContextCompleteHandler =
ThreadPoolFactory.createThreadPoolExecutor(10, 10, "EventMesh-http-asyncContext-");

Expand Down Expand Up @@ -249,6 +223,87 @@ public void registerProcessor(Integer requestCode, HttpRequestProcessor processo
this.processorTable.put(requestCode.toString(), pair);
}

private Map<String, Object> parseHTTPHeader(HttpRequest fullReq) {
Map<String, Object> headerParam = new HashMap<>();
for (String key : fullReq.headers().names()) {
if (StringUtils.equalsIgnoreCase(HttpHeaderNames.CONTENT_TYPE.toString(), key)
|| StringUtils.equalsIgnoreCase(HttpHeaderNames.ACCEPT_ENCODING.toString(), key)
|| StringUtils.equalsIgnoreCase(HttpHeaderNames.CONTENT_LENGTH.toString(), key)) {
continue;
}
headerParam.put(key, fullReq.headers().get(key));
}
return headerParam;
}

/**
* Validate request, return error status.
*
* @param httpRequest
* @return if request is validated return null else return error status
*/
private HttpResponseStatus validateHTTPRequest(HttpRequest httpRequest) {
if (!started.get()) {
return HttpResponseStatus.SERVICE_UNAVAILABLE;
}
if (!httpRequest.decoderResult().isSuccess()) {
return HttpResponseStatus.BAD_REQUEST;
}
if (!HttpMethod.GET.equals(httpRequest.method()) && !HttpMethod.POST.equals(httpRequest.method())) {
return HttpResponseStatus.METHOD_NOT_ALLOWED;
}
final String protocolVersion = httpRequest.headers().get(ProtocolKey.VERSION);
if (!ProtocolVersion.contains(protocolVersion)) {
return HttpResponseStatus.BAD_REQUEST;
}
return null;
}

/**
* Inject ip and protocol version, if the protocol version is empty, set default to {@link ProtocolVersion#V1}.
*
* @param ctx
* @param httpRequest
*/
private void preProcessHTTPRequestHeader(ChannelHandlerContext ctx, HttpRequest httpRequest) {
HttpHeaders requestHeaders = httpRequest.headers();
requestHeaders.set(ProtocolKey.ClientInstanceKey.IP,
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

String protocolVersion = httpRequest.headers().get(ProtocolKey.VERSION);
if (StringUtils.isBlank(protocolVersion)) {
requestHeaders.set(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion());
}
}

/**
* Parse request body to map
*
* @param httpRequest
* @return
*/
private Map<String, Object> parseHttpRequestBody(HttpRequest httpRequest) throws IOException {
final long bodyDecodeStart = System.currentTimeMillis();
Map<String, Object> httpRequestBody = new HashMap<>();

if (HttpMethod.GET.equals(httpRequest.method())) {
QueryStringDecoder getDecoder = new QueryStringDecoder(httpRequest.uri());
getDecoder.parameters().forEach((key, value) -> httpRequestBody.put(key, value.get(0)));
} else if (HttpMethod.POST.equals(httpRequest.method())) {
HttpPostRequestDecoder decoder =
new HttpPostRequestDecoder(defaultHttpDataFactory, httpRequest);
for (InterfaceHttpData parm : decoder.getBodyHttpDatas()) {
if (parm.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
Attribute data = (Attribute) parm;
httpRequestBody.put(data.getName(), data.getValue());
}
}
decoder.destroy();
}
metrics.summaryMetrics.recordDecodeTimeCost(System.currentTimeMillis() - bodyDecodeStart);
return httpRequestBody;
}

class HTTPHandler extends SimpleChannelInboundHandler<HttpRequest> {

@Override
Expand Down Expand Up @@ -342,11 +397,6 @@ public String get(HttpRequest carrier, String key) {
span.recordException(ex); //record this exception
span.end(); // closing the scope does not end the span, this has to be done manually
}
} finally {
try {
decoder.destroy();
} catch (Exception e) {
}
}
}

Expand Down Expand Up @@ -494,85 +544,4 @@ protected void initChannel(SocketChannel channel) {
}
}

private Map<String, Object> parseHTTPHeader(HttpRequest fullReq) {
Map<String, Object> headerParam = new HashMap<>();
for (String key : fullReq.headers().names()) {
if (StringUtils.equalsIgnoreCase(HttpHeaderNames.CONTENT_TYPE.toString(), key)
|| StringUtils.equalsIgnoreCase(HttpHeaderNames.ACCEPT_ENCODING.toString(), key)
|| StringUtils.equalsIgnoreCase(HttpHeaderNames.CONTENT_LENGTH.toString(), key)) {
continue;
}
headerParam.put(key, fullReq.headers().get(key));
}
return headerParam;
}

/**
* Validate request, return error status.
*
* @param httpRequest
* @return if request is validated return null else return error status
*/
private HttpResponseStatus validateHTTPRequest(HttpRequest httpRequest) {
if (!started.get()) {
return HttpResponseStatus.SERVICE_UNAVAILABLE;
}
if (!httpRequest.decoderResult().isSuccess()) {
return HttpResponseStatus.BAD_REQUEST;
}
if (!HttpMethod.GET.equals(httpRequest.method()) && !HttpMethod.POST.equals(httpRequest.method())) {
return HttpResponseStatus.METHOD_NOT_ALLOWED;
}
final String protocolVersion = httpRequest.headers().get(ProtocolKey.VERSION);
if (!ProtocolVersion.contains(protocolVersion)) {
return HttpResponseStatus.BAD_REQUEST;
}
return null;
}

/**
* Inject ip and protocol version, if the protocol version is empty, set default to {@link ProtocolVersion#V1}.
*
* @param ctx
* @param httpRequest
*/
private void preProcessHTTPRequestHeader(ChannelHandlerContext ctx, HttpRequest httpRequest) {
HttpHeaders requestHeaders = httpRequest.headers();
requestHeaders.set(ProtocolKey.ClientInstanceKey.IP,
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

String protocolVersion = httpRequest.headers().get(ProtocolKey.VERSION);
if (StringUtils.isBlank(protocolVersion)) {
requestHeaders.set(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion());
}
}

/**
* Parse request body to map
*
* @param httpRequest
* @return
*/
private Map<String, Object> parseHttpRequestBody(HttpRequest httpRequest) throws IOException {
final long bodyDecodeStart = System.currentTimeMillis();
Map<String, Object> httpRequestBody = new HashMap<>();

if (HttpMethod.GET.equals(httpRequest.method())) {
QueryStringDecoder getDecoder = new QueryStringDecoder(httpRequest.uri());
getDecoder.parameters().forEach((key, value) -> httpRequestBody.put(key, value.get(0)));
} else if (HttpMethod.POST.equals(httpRequest.method())) {
HttpPostRequestDecoder decoder =
new HttpPostRequestDecoder(defaultHttpDataFactory, httpRequest);
for (InterfaceHttpData parm : decoder.getBodyHttpDatas()) {
if (parm.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
Attribute data = (Attribute) parm;
httpRequestBody.put(data.getName(), data.getValue());
}
}
decoder.destroy();
}
metrics.summaryMetrics.recordDecodeTimeCost(System.currentTimeMillis() - bodyDecodeStart);
return httpRequestBody;
}

}