Skip to content

Commit

Permalink
Add support for framework latency instrumentation
Browse files Browse the repository at this point in the history
As part of the SI instrumentation initiative, this commit adds timing
markers in a breadth of Rest.li-layer and R2-layer codepaths in order
to track framework latency. Client timings are temporarily disabled
for scatter-gather requests, and server timings are disabled for
multiplexed requests.

RB=1745918
G=sf-reviewers
R=ssheng,crzhang,bsoetarm,fcapponi,dmessink
A=crzhang,mnchen,kbalasub
  • Loading branch information
evanw555 committed Dec 10, 2019
1 parent cbd0408 commit 601a5ed
Show file tree
Hide file tree
Showing 34 changed files with 960 additions and 87 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
28.0.13
-------
(RB=1745918)
Add support for framework latency instrumentation

(RB=1874221)
Mark data schema directories as resource roots

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import com.linkedin.r2.message.rest.RestResponse;
import com.linkedin.r2.message.stream.StreamRequest;
import com.linkedin.r2.message.stream.StreamResponse;
import com.linkedin.r2.message.timing.FrameworkTimingKeys;
import com.linkedin.r2.message.timing.TimingContextUtil;
import com.linkedin.r2.transport.common.WireAttributeHelper;
import com.linkedin.r2.transport.common.bridge.client.TransportClient;
import com.linkedin.r2.transport.common.bridge.common.TransportCallback;
Expand Down Expand Up @@ -59,6 +61,8 @@ public void onRestRequest(RestRequest req, final RequestContext requestContext,
Map<String, String> wireAttrs,
final NextFilter<RestRequest, RestResponse> nextFilter)
{
markOnRequestTimings(requestContext);

try
{
_client.restRequest(req, requestContext, wireAttrs, createCallback(requestContext, nextFilter));
Expand All @@ -74,6 +78,8 @@ public void onStreamRequest(StreamRequest req, final RequestContext requestConte
Map<String, String> wireAttrs,
final NextFilter<StreamRequest, StreamResponse> nextFilter)
{
markOnRequestTimings(requestContext);

try
{
_client.streamRequest(req, requestContext, wireAttrs, createCallback(requestContext, nextFilter));
Expand All @@ -89,6 +95,7 @@ private <REQ extends Request, RES extends Response> TransportCallback<RES> creat
final NextFilter<REQ, RES> nextFilter)
{
return res -> {
markOnResponseTimings(requestContext);
final Map<String, String> wireAttrs = res.getWireAttributes();
if (res.hasError())
{
Expand All @@ -100,4 +107,18 @@ private <REQ extends Request, RES extends Response> TransportCallback<RES> creat
}
};
}

private static void markOnRequestTimings(RequestContext requestContext)
{
TimingContextUtil.endTiming(requestContext, FrameworkTimingKeys.CLIENT_REQUEST_R2_FILTER_CHAIN.key());
TimingContextUtil.endTiming(requestContext, FrameworkTimingKeys.CLIENT_REQUEST_R2.key());
TimingContextUtil.endTiming(requestContext, FrameworkTimingKeys.CLIENT_REQUEST.key());
}

private static void markOnResponseTimings(RequestContext requestContext)
{
TimingContextUtil.beginTiming(requestContext, FrameworkTimingKeys.CLIENT_RESPONSE.key());
TimingContextUtil.beginTiming(requestContext, FrameworkTimingKeys.CLIENT_RESPONSE_R2.key());
TimingContextUtil.beginTiming(requestContext, FrameworkTimingKeys.CLIENT_RESPONSE_R2_FILTER_CHAIN.key());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import com.linkedin.r2.message.stream.entitystream.BaseConnector;
import com.linkedin.r2.message.stream.entitystream.EntityStream;
import com.linkedin.r2.message.stream.entitystream.EntityStreams;
import com.linkedin.r2.message.timing.TimingContextUtil;
import com.linkedin.r2.message.timing.FrameworkTimingKeys;
import com.linkedin.r2.transport.common.bridge.common.TransportCallback;
import com.linkedin.r2.transport.common.bridge.server.TransportDispatcher;
import java.util.HashMap;
Expand Down Expand Up @@ -62,6 +64,7 @@ public void onRestRequest(RestRequest req, RequestContext requestContext,
Map<String, String> wireAttrs,
NextFilter<RestRequest, RestResponse> nextFilter)
{
markOnRequestTimings(requestContext);
try
{
_dispatcher.handleRestRequest(req, wireAttrs, requestContext,
Expand All @@ -79,6 +82,7 @@ private <REQ extends Request, RES extends Response> TransportCallback<RES> creat
final NextFilter<REQ, RES> nextFilter)
{
return res -> {
markOnResponseTimings(requestContext);
final Map<String, String> wireAttrs = res.getWireAttributes();
if (res.hasError())
{
Expand All @@ -96,6 +100,7 @@ public void onStreamRequest(StreamRequest req, RequestContext requestContext,
Map<String, String> wireAttrs,
NextFilter<StreamRequest, StreamResponse> nextFilter)
{
markOnRequestTimings(requestContext);
Connector connector = null;
try
{
Expand Down Expand Up @@ -124,6 +129,7 @@ private <REQ extends Request, RES extends Response> TransportCallback<RES> creat
return res -> {
if (responded.compareAndSet(false, true))
{
markOnResponseTimings(requestContext);
final Map<String, String> wireAttrs = res.getWireAttributes();
if (res.hasError())
{
Expand All @@ -137,6 +143,20 @@ private <REQ extends Request, RES extends Response> TransportCallback<RES> creat
};
}

private static void markOnRequestTimings(RequestContext requestContext)
{
TimingContextUtil.endTiming(requestContext, FrameworkTimingKeys.SERVER_REQUEST_R2_FILTER_CHAIN.key());
TimingContextUtil.endTiming(requestContext, FrameworkTimingKeys.SERVER_REQUEST_R2.key());
TimingContextUtil.beginTiming(requestContext, FrameworkTimingKeys.SERVER_REQUEST_RESTLI.key());
}

private static void markOnResponseTimings(RequestContext requestContext)
{
TimingContextUtil.endTiming(requestContext, FrameworkTimingKeys.SERVER_RESPONSE_RESTLI.key());
TimingContextUtil.beginTiming(requestContext, FrameworkTimingKeys.SERVER_RESPONSE_R2.key());
TimingContextUtil.beginTiming(requestContext, FrameworkTimingKeys.SERVER_RESPONSE_R2_FILTER_CHAIN.key());
}

private static class Connector extends BaseConnector
{

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@
import com.linkedin.common.util.None;
import com.linkedin.r2.filter.FilterChain;
import com.linkedin.r2.message.RequestContext;
import com.linkedin.r2.message.Response;
import com.linkedin.r2.message.rest.RestRequest;
import com.linkedin.r2.message.rest.RestResponse;
import com.linkedin.r2.message.stream.StreamRequest;
import com.linkedin.r2.message.stream.StreamResponse;
import com.linkedin.r2.message.timing.FrameworkTimingKeys;
import com.linkedin.r2.message.timing.TimingContextUtil;
import com.linkedin.r2.transport.common.bridge.client.TransportClient;
import com.linkedin.r2.transport.common.bridge.common.TransportCallback;

import com.linkedin.r2.transport.common.bridge.common.TransportResponse;
import java.util.Map;

/**
Expand Down Expand Up @@ -70,7 +74,8 @@ public void restRequest(RestRequest request,
Map<String, String> wireAttrs,
TransportCallback<RestResponse> callback)
{
ResponseFilter.registerCallback(callback, requestContext);
ResponseFilter.registerCallback(createWrappedClientTimingCallback(requestContext, callback), requestContext);
markOnRequestTimings(requestContext);
_filters.onRestRequest(request, requestContext, wireAttrs);
}

Expand All @@ -80,7 +85,8 @@ public void streamRequest(StreamRequest request,
Map<String, String> wireAttrs,
TransportCallback<StreamResponse> callback)
{
ResponseFilter.registerCallback(callback, requestContext);
ResponseFilter.registerCallback(createWrappedClientTimingCallback(requestContext, callback), requestContext);
markOnRequestTimings(requestContext);
_filters.onStreamRequest(request, requestContext, wireAttrs);
}

Expand All @@ -89,4 +95,27 @@ public void shutdown(Callback<None> callback)
{
_client.shutdown(callback);
}

/**
* Creates a thin wrapper around the given callback which simply marks the end of the R2 client response filter chain
* before executing the wrapped callback.
*
* @param requestContext request context
* @param callback callback to wrap
* @param <T> callback value type (rest or stream response)
* @return wrapped callback
*/
private static <T extends Response> TransportCallback<T> createWrappedClientTimingCallback(RequestContext requestContext,
TransportCallback<T> callback)
{
return (TransportResponse<T> response) -> {
TimingContextUtil.endTiming(requestContext, FrameworkTimingKeys.CLIENT_RESPONSE_R2_FILTER_CHAIN.key());
callback.onResponse(response);
};
}

private static void markOnRequestTimings(RequestContext requestContext)
{
TimingContextUtil.beginTiming(requestContext, FrameworkTimingKeys.CLIENT_REQUEST_R2_FILTER_CHAIN.key());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@

import com.linkedin.r2.filter.FilterChain;
import com.linkedin.r2.message.RequestContext;
import com.linkedin.r2.message.Response;
import com.linkedin.r2.message.rest.RestRequest;
import com.linkedin.r2.message.rest.RestResponse;
import com.linkedin.r2.message.stream.StreamRequest;
import com.linkedin.r2.message.stream.StreamResponse;
import com.linkedin.r2.message.timing.FrameworkTimingKeys;
import com.linkedin.r2.message.timing.TimingContextUtil;
import com.linkedin.r2.transport.common.bridge.common.TransportCallback;
import com.linkedin.r2.transport.common.bridge.common.TransportResponse;
import com.linkedin.r2.transport.common.bridge.server.TransportDispatcher;

import java.util.Map;
Expand Down Expand Up @@ -66,7 +70,8 @@ public FilterChainDispatcher(TransportDispatcher dispatcher,
public void handleRestRequest(RestRequest req, Map<String, String> wireAttrs,
RequestContext requestContext, TransportCallback<RestResponse> callback)
{
ResponseFilter.registerCallback(callback, requestContext);
ResponseFilter.registerCallback(createWrappedServerTimingCallback(requestContext, callback), requestContext);
TimingContextUtil.beginTiming(requestContext, FrameworkTimingKeys.SERVER_REQUEST_R2_FILTER_CHAIN.key());
_filters.onRestRequest(req, requestContext, wireAttrs);
}

Expand All @@ -75,7 +80,26 @@ public void handleStreamRequest(StreamRequest req, Map<String, String> wireAttrs
RequestContext requestContext,
TransportCallback<StreamResponse> callback)
{
ResponseFilter.registerCallback(callback, requestContext);
ResponseFilter.registerCallback(createWrappedServerTimingCallback(requestContext, callback), requestContext);
TimingContextUtil.beginTiming(requestContext, FrameworkTimingKeys.SERVER_REQUEST_R2_FILTER_CHAIN.key());
_filters.onStreamRequest(req, requestContext, wireAttrs);
}

/**
* Creates a thin wrapper around the given callback which simply marks the end of the R2 server response filter chain
* before executing the wrapped callback.
*
* @param requestContext request context
* @param callback callback to wrap
* @param <T> callback value type (rest or stream response)
* @return wrapped callback
*/
private static <T extends Response> TransportCallback<T> createWrappedServerTimingCallback(RequestContext requestContext,
TransportCallback<T> callback)
{
return (TransportResponse<T> response) -> {
TimingContextUtil.endTiming(requestContext, FrameworkTimingKeys.SERVER_RESPONSE_R2_FILTER_CHAIN.key());
callback.onResponse(response);
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
Copyright (c) 2019 LinkedIn Corp.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.linkedin.r2.message.timing;

/**
* A collection of predefined {@link TimingKey} objects that represent various Rest.li framework code paths.
*
* @author Evan Williams
*/
public enum FrameworkTimingKeys
{
// High-level metrics
RESOURCE("resource", TimingImportance.HIGH),
SERVER_REQUEST("server/request", TimingImportance.HIGH),
SERVER_RESPONSE("server/response", TimingImportance.HIGH),
CLIENT_REQUEST("client/request", TimingImportance.HIGH),
CLIENT_RESPONSE("client/response", TimingImportance.HIGH),

// Layer-specific metrics
SERVER_REQUEST_R2("server/request/r2", TimingImportance.MEDIUM),
SERVER_REQUEST_RESTLI("server/request/restli", TimingImportance.MEDIUM),
SERVER_RESPONSE_R2("server/response/r2", TimingImportance.MEDIUM),
SERVER_RESPONSE_RESTLI("server/response/restli", TimingImportance.MEDIUM),
CLIENT_REQUEST_R2("client/request/r2", TimingImportance.MEDIUM),
CLIENT_REQUEST_RESTLI("client/request/restli", TimingImportance.MEDIUM),
CLIENT_RESPONSE_R2("client/response/r2", TimingImportance.MEDIUM),
CLIENT_RESPONSE_RESTLI("client/response/restli", TimingImportance.MEDIUM),

// Filter chain metrics
SERVER_REQUEST_R2_FILTER_CHAIN("server/request/r2/filter_chain", TimingImportance.LOW),
SERVER_REQUEST_RESTLI_FILTER_CHAIN("server/request/restli/filter_chain", TimingImportance.LOW),
SERVER_RESPONSE_R2_FILTER_CHAIN("server/response/r2/filter_chain", TimingImportance.LOW),
SERVER_RESPONSE_RESTLI_FILTER_CHAIN("server/response/restli/filter_chain", TimingImportance.LOW),
CLIENT_REQUEST_R2_FILTER_CHAIN("client/request/r2/filter_chain", TimingImportance.LOW),
CLIENT_RESPONSE_R2_FILTER_CHAIN("client/response/r2/filter_chain", TimingImportance.LOW),

// Serialization/Deserialization metrics
SERVER_REQUEST_RESTLI_DESERIALIZATION("server/request/restli/deserialization", TimingImportance.LOW),
SERVER_RESPONSE_RESTLI_SERIALIZATION("server/response/restli/serialization", TimingImportance.LOW),
SERVER_RESPONSE_RESTLI_ERROR_SERIALIZATION("server/response/restli/error_serialization", TimingImportance.LOW),
CLIENT_REQUEST_RESTLI_SERIALIZATION("client/request/restli/serialization", TimingImportance.LOW),
CLIENT_RESPONSE_RESTLI_DESERIALIZATION("client/response/restli/deserialization", TimingImportance.LOW),
CLIENT_RESPONSE_RESTLI_ERROR_DESERIALIZATION("client/response/restli/error_deserialization", TimingImportance.LOW),

// URI operation metrics (numbered suffixes correspond to protocol-specific code paths)
SERVER_REQUEST_RESTLI_URI_PARSE_1("server/request/restli/uri_parse_1", TimingImportance.LOW),
SERVER_REQUEST_RESTLI_URI_PARSE_2("server/request/restli/uri_parse_2", TimingImportance.LOW),
CLIENT_REQUEST_RESTLI_URI_ENCODE("client/request/restli/uri_encode", TimingImportance.LOW),

// Projection operation metrics
SERVER_REQUEST_RESTLI_PROJECTION_DECODE("server/request/restli/projection_decode", TimingImportance.LOW),
SERVER_RESPONSE_RESTLI_PROJECTION_APPLY("server/request/restli/projection_apply", TimingImportance.LOW),

// Misc. metrics
CLIENT_REQUEST_RESTLI_GET_PROTOCOL("client/request/restli/get_protocol", TimingImportance.LOW);

public final static String KEY_PREFIX = "fwk/";

private final TimingKey _timingKey;

FrameworkTimingKeys(String name, TimingImportance timingImportance)
{
_timingKey = TimingKey.registerNewKey(KEY_PREFIX + name, timingImportance);
}

public TimingKey key()
{
return _timingKey;
}
}
Loading

0 comments on commit 601a5ed

Please sign in to comment.