Skip to content

Commit

Permalink
STORM-2793 Track network data metrics
Browse files Browse the repository at this point in the history
* This closes apache#2399
  • Loading branch information
Joshua Martell authored and HeartSaVioR committed Nov 6, 2017
1 parent 89b48d2 commit 82e00aa
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 18 deletions.
1 change: 1 addition & 0 deletions conf/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ topology.disable.loadaware.messaging: false
topology.state.checkpoint.interval.ms: 1000
topology.localityaware.higher.bound.percent: 0.8
topology.localityaware.lower.bound.percent: 0.2
topology.serialized.message.size.metrics: false

# Configs for Resource Aware Scheduler
# Topology priority describing the importance of the topology; 0 is the highest priority, and importance decreases as the priority number increases.
Expand Down
7 changes: 7 additions & 0 deletions storm-client/src/jvm/org/apache/storm/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,13 @@ public class Config extends HashMap<String, Object> {
@isListEntryCustom(entryValidatorClasses={MetricRegistryValidator.class})
public static final String TOPOLOGY_METRICS_CONSUMER_REGISTER = "topology.metrics.consumer.register";

/**
* Enable tracking of network message byte counts per source-destination task. This is off by default as it
* creates tasks^2 metric values, but is useful for debugging as it exposes data skew when tuple sizes are uneven.
*/
@isBoolean
public static final String TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS = "topology.serialized.message.size.metrics";

/**
* A map of metric name to class name implementing IMetric that will be created once per worker JVM
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,44 +17,96 @@
*/
package org.apache.storm.messaging;

import org.apache.storm.Config;
import org.apache.storm.daemon.worker.WorkerState;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.serialization.KryoTupleDeserializer;
import org.apache.storm.task.GeneralTopologyContext;
import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.serialization.KryoTupleDeserializer;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.ObjectReader;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;


/**
* A class that is called when a TaskMessage arrives.
*/
public class DeserializingConnectionCallback implements IConnectionCallback {
private final WorkerState.ILocalTransferCallback _cb;
private final Map _conf;
private final GeneralTopologyContext _context;
public class DeserializingConnectionCallback implements IConnectionCallback, IMetric {
private final WorkerState.ILocalTransferCallback cb;
private final Map conf;
private final GeneralTopologyContext context;

private final ThreadLocal<KryoTupleDeserializer> _des =
new ThreadLocal<KryoTupleDeserializer>() {
@Override
protected KryoTupleDeserializer initialValue() {
return new KryoTupleDeserializer(_conf, _context);
}
};

public DeserializingConnectionCallback(final Map<String, Object> conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
_conf = conf;
_context = context;
_cb = callback;
new ThreadLocal<KryoTupleDeserializer>() {
@Override
protected KryoTupleDeserializer initialValue() {
return new KryoTupleDeserializer(conf, context);
}
};

// Track serialized size of messages.
private final boolean sizeMetricsEnabled;
private final ConcurrentHashMap<String, AtomicLong> byteCounts = new ConcurrentHashMap<>();


public DeserializingConnectionCallback(final Map conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
this.conf = conf;
this.context = context;
cb = callback;
sizeMetricsEnabled = ObjectReader.getBoolean(conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS), false);

}

@Override
public void recv(List<TaskMessage> batch) {
KryoTupleDeserializer des = _des.get();
ArrayList<AddressedTuple> ret = new ArrayList<>(batch.size());
for (TaskMessage message: batch) {
ret.add(new AddressedTuple(message.task(), des.deserialize(message.message())));
Tuple tuple = des.deserialize(message.message());
AddressedTuple addrTuple = new AddressedTuple(message.task(), tuple);
updateMetrics(tuple.getSourceTask(), message);
ret.add(addrTuple);
}
cb.transfer(ret);
}

/**
* Returns serialized byte count traffic metrics.
* @return Map of metric counts, or null if disabled
*/
@Override
public Object getValueAndReset() {
if (!sizeMetricsEnabled) {
return null;
}
HashMap<String, Long> outMap = new HashMap<>();
for (Map.Entry<String, AtomicLong> ent : byteCounts.entrySet()) {
AtomicLong count = ent.getValue();
if (count.get() > 0) {
outMap.put(ent.getKey(), count.getAndSet(0L));
}
}
return outMap;
}

/**
* Update serialized byte counts for each message.
* @param sourceTaskId source task
* @param message serialized message
*/
protected void updateMetrics(int sourceTaskId, TaskMessage message) {
if (sizeMetricsEnabled) {
int dest = message.task();
int len = message.message().length;
String key = Integer.toString(sourceTaskId) + "-" + Integer.toString(dest);
byteCounts.computeIfAbsent(key, k -> new AtomicLong(0L)).addAndGet(len);
}
_cb.transfer(ret);
}

}
10 changes: 10 additions & 0 deletions storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.storm.messaging.ConnectionWithStatus;
import org.apache.storm.messaging.IConnectionCallback;
import org.apache.storm.messaging.TaskMessage;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.metric.api.IStatefulObject;
import org.apache.storm.serialization.KryoValuesSerializer;
import org.apache.storm.utils.ObjectReader;
Expand Down Expand Up @@ -249,6 +250,15 @@ public Object getState() {
}
}
ret.put("enqueued", enqueued);

// Report messageBytes metric, if enabled (non-null).
if (_cb instanceof IMetric) {
Object metrics = ((IMetric) _cb).getValueAndReset();
if (metrics instanceof Map) {
ret.put("messageBytes", metrics);
}
}

return ret;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.messaging;

import org.apache.storm.Config;
import org.apache.storm.daemon.worker.WorkerState;
import org.apache.storm.task.GeneralTopologyContext;
import org.junit.Before;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * Tests the serialized-message-size metrics of {@link DeserializingConnectionCallback} with the
 * {@link Config#TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS} switch both off and on.
 */
public class DeserializingConnectionCallbackTest {
    // Payload of the mocked message; only its length (3 bytes) matters to the metric.
    private static final byte[] MESSAGE_BYTES = new byte[3];

    // Rebuilt before every test, so it is per-instance state rather than a shared static.
    private TaskMessage message;

    @Before
    public void setUp() throws Exception {
        // Setup a test message
        message = mock(TaskMessage.class);
        when(message.task()).thenReturn(456); // destination taskId
        when(message.message()).thenReturn(MESSAGE_BYTES);
    }

    /** With the switch off, the metric is always null, before and after messages are recorded. */
    @Test
    public void testUpdateMetricsConfigOff() {
        Map<String, Object> config = new HashMap<>();
        config.put(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS, Boolean.FALSE);
        DeserializingConnectionCallback withoutMetrics =
            new DeserializingConnectionCallback(config, mock(GeneralTopologyContext.class),
                mock(WorkerState.ILocalTransferCallback.class));

        // Metrics are off, verify null
        assertNull(withoutMetrics.getValueAndReset());

        // Add our messages and verify no metrics are recorded
        withoutMetrics.updateMetrics(123, message);
        assertNull(withoutMetrics.getValueAndReset());
    }

    /** With the switch on, byte counts accumulate per "source-destination" key and are cleared on read. */
    @Test
    public void testUpdateMetricsConfigOn() {
        Map<String, Object> config = new HashMap<>();
        config.put(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS, Boolean.TRUE);
        DeserializingConnectionCallback withMetrics =
            new DeserializingConnectionCallback(config, mock(GeneralTopologyContext.class),
                mock(WorkerState.ILocalTransferCallback.class));

        // Starting empty
        Object metrics = withMetrics.getValueAndReset();
        assertTrue(metrics instanceof Map);
        assertTrue(((Map<?, ?>) metrics).isEmpty());

        // Add messages
        withMetrics.updateMetrics(123, message);
        withMetrics.updateMetrics(123, message);

        // Verify recorded messages size metrics: two 3-byte messages from task 123 to task 456
        metrics = withMetrics.getValueAndReset();
        assertTrue(metrics instanceof Map);
        assertEquals(6L, ((Map<?, ?>) metrics).get("123-456"));

        // Reading the metric resets the counters, so nothing is reported the second time
        metrics = withMetrics.getValueAndReset();
        assertTrue(metrics instanceof Map);
        assertTrue(((Map<?, ?>) metrics).isEmpty());
    }
}

0 comments on commit 82e00aa

Please sign in to comment.