Skip to content

Commit

Permalink
Add streaming algorithms and SLA to check for outliers
Browse files Browse the repository at this point in the history
In the common.streaming_algorithms module, we add the base for the streaming
algorithms that take values from the input stream and compute certain
quantities needed for the benchmark results processing. These algorithms
should use constant memory and be able to scale.

Two streaming algorithms for computing mean and standard deviation are implemented.

There is also a new "max_outliers" SLA that checks for the maximum number of
outliers based on the mean and standard deviation of the durations, computed
using these new streaming algorithms. By default, the outliers are detected
only when the total number of iterations reaches 10 (can be configured).

Example:
  3.1 4.2 3.6 4.5 2.8 3.3 4.1 3.8 4.3 2.9 10.2 11.2 3.4

  has 2 outliers (10.2 and 11.2), so:
    {"outliers": {"max": 2}} -> PASS
    {"outliers": {"max": 1}} -> FAIL

Bonus:
  * Add gate testing of different SLAs with the Dummy scenario
  * Add samples for all SLAs
  * Fix detailed message for max_avg_duration

Change-Id: I7c4f77c418c7b61f71b43216110fa4c7aaccc2f5
  • Loading branch information
mikhaildubov committed Apr 29, 2015
1 parent e7f5d79 commit 49c5056
Show file tree
Hide file tree
Showing 7 changed files with 298 additions and 6 deletions.
6 changes: 6 additions & 0 deletions rally-jobs/rally.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,12 @@
sla:
failure_rate:
max: 0
max_seconds_per_iteration: 1.0
max_avg_duration: 0.5
outliers:
max: 1
min_iterations: 10
sigmas: 10

-
args:
Expand Down
61 changes: 59 additions & 2 deletions rally/benchmark/sla/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import six

from rally.common.i18n import _
from rally.common import streaming_algorithms
from rally.common import utils
from rally import consts
from rally import exceptions
Expand Down Expand Up @@ -228,5 +229,61 @@ def add_iteration(self, iteration):
return self.success

def details(self):
return (_("Maximum average duration of one iteration %.2fs <= %.2fs - "
"%s") % (self.avg, self.criterion_value, self.status()))
return (_("Average duration of one iteration %.2fs <= %.2fs - %s") %
(self.avg, self.criterion_value, self.status()))


class Outliers(SLA):
"""Limit the number of outliers (iterations that take too much time).
The outliers are detected automatically using the computation of the mean
and standard deviation (std) of the data.
"""
OPTION_NAME = "outliers"
CONFIG_SCHEMA = {
"type": "object",
"$schema": consts.JSON_SCHEMA,
"properties": {
"max": {"type": "integer", "minimum": 0},
"min_iterations": {"type": "integer", "minimum": 3},
"sigmas": {"type": "number", "minimum": 0.0,
"exclusiveMinimum": True}
}
}

def __init__(self, criterion_value):
super(Outliers, self).__init__(criterion_value)
self.max_outliers = self.criterion_value.get("max", 0)
# NOTE(msdubov): Having 3 as default is reasonable (need enough data).
self.min_iterations = self.criterion_value.get("min_iterations", 3)
self.sigmas = self.criterion_value.get("sigmas", 3.0)
self.iterations = 0
self.outliers = 0
self.threshold = None
self.mean_comp = streaming_algorithms.MeanStreamingComputation()
self.std_comp = streaming_algorithms.StdDevStreamingComputation()

def add_iteration(self, iteration):
if not iteration.get("error"):
duration = iteration["duration"]
self.iterations += 1

# NOTE(msdubov): First check if the current iteration is an outlier
if ((self.iterations >= self.min_iterations and self.threshold and
duration > self.threshold)):
self.outliers += 1

# NOTE(msdubov): Then update the threshold value
self.mean_comp.add(duration)
self.std_comp.add(duration)
if self.iterations >= 2:
mean = self.mean_comp.result()
std = self.std_comp.result()
self.threshold = mean + self.sigmas * std

self.success = self.outliers <= self.max_outliers
return self.success

def details(self):
return (_("Maximum number of outliers %i <= %i - %s") %
(self.outliers, self.max_outliers, self.status()))
82 changes: 82 additions & 0 deletions rally/common/streaming_algorithms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Copyright 2015: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import abc
import math

import six

from rally.common.i18n import _
from rally import exceptions


@six.add_metaclass(abc.ABCMeta)
class StreamingAlgorithm(object):
"""Base class for streaming computations that scale."""

@abc.abstractmethod
def add(self, value):
"""Process a single value from the input stream."""

@abc.abstractmethod
def result(self):
"""Return the result based on the values processed so far."""


class MeanStreamingComputation(StreamingAlgorithm):
"""Computes mean for a stream of numbers."""

def __init__(self):
self.total = 0.0
self.count = 0

def add(self, value):
self.count += 1
self.total += value

def result(self):
if self.count == 0:
message = _("Unable to calculate the mean: "
"no values processed so far.")
raise exceptions.RallyException(message)
return self.total / self.count


class StdDevStreamingComputation(StreamingAlgorithm):
"""Computes the standard deviation for a stream of numbers."""

def __init__(self):
self.count = 0
# NOTE(msdubov): To compute std, we need the auxiliary variables below.
self.dev_sum = 0.0
self.mean_computation = MeanStreamingComputation()
self.mean = 0.0

def add(self, value):
# NOTE(msdubov): This streaming method for std computation appears
# in "The Art of Computer Programming" by D. Knuth,
# Vol 2, p. 232, 3rd edition.
self.count += 1
mean_prev = self.mean
self.mean_computation.add(value)
self.mean = self.mean_computation.result()
self.dev_sum = self.dev_sum + (value - mean_prev) * (value - self.mean)

def result(self):
if self.count < 2:
message = _("Unable to calculate the standard deviation: "
"need at least two values to be processed.")
raise exceptions.RallyException(message)
return math.sqrt(self.dev_sum / (self.count - 1))
10 changes: 7 additions & 3 deletions samples/tasks/sla/create-and-delete-user.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@
"concurrency": 10
},
"sla": {
"max_seconds_per_iteration": 4,
"failure_rate": {
"max": 1
"max_seconds_per_iteration": 4.0,
"failure_rate": {"max": 1},
"max_avg_duration": 3.0,
"outliers": {
"max": 1,
"min_iterations": 10,
"sigmas": 10
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion samples/tasks/sla/create-and-delete-user.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@
times: 100
concurrency: 10
sla:
max_seconds_per_iteration: 4
max_seconds_per_iteration: 4.0
failure_rate:
max: 1
max_avg_duration: 3.0
outliers:
max: 1
min_iterations: 10
sigmas: 10
70 changes: 70 additions & 0 deletions tests/unit/benchmark/sla/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,73 @@ def test_add_iteration(self):
self.assertTrue(sla.add_iteration({"duration": 5.0})) # avg = 3.667
self.assertFalse(sla.add_iteration({"duration": 7.0})) # avg = 4.5
self.assertTrue(sla.add_iteration({"duration": 1.0})) # avg = 3.8


class OutliersTestCase(test.TestCase):

def test_config_schema(self):
base.Outliers.validate({"outliers": {"max": 0, "min_iterations": 5,
"sigmas": 2.5}})
self.assertRaises(jsonschema.ValidationError,
base.Outliers.validate,
{"outliers": {"max": -1}})
self.assertRaises(jsonschema.ValidationError,
base.Outliers.validate,
{"outliers": {"max": 0, "min_iterations": 2}})
self.assertRaises(jsonschema.ValidationError,
base.Outliers.validate,
{"outliers": {"max": 0, "sigmas": 0}})

def test_result(self):
sla1 = base.Outliers({"max": 1})
sla2 = base.Outliers({"max": 2})
iteration_durations = [3.1, 4.2, 3.6, 4.5, 2.8, 3.3, 4.1, 3.8, 4.3,
2.9, 10.2, 11.2, 3.4] # outliers: 10.2, 11.2
for sla in [sla1, sla2]:
for d in iteration_durations:
sla.add_iteration({"duration": d})
self.assertFalse(sla1.result()["success"]) # 2 outliers > 1
self.assertTrue(sla2.result()["success"]) # 2 outliers <= 2
self.assertEqual("Failed", sla1.status())
self.assertEqual("Passed", sla2.status())

def test_result_large_sigmas(self):
sla = base.Outliers({"max": 1, "sigmas": 5})
iteration_durations = [3.1, 4.2, 3.6, 4.5, 2.8, 3.3, 4.1, 3.8, 4.3,
2.9, 10.2, 11.2, 3.4]
for d in iteration_durations:
sla.add_iteration({"duration": d})
# NOTE(msdubov): No outliers registered since sigmas = 5 (not 2)
self.assertTrue(sla.result()["success"])
self.assertEqual("Passed", sla.status())

def test_result_no_iterations(self):
sla = base.Outliers({"max": 0})
self.assertTrue(sla.result()["success"])

def test_result_few_iterations_large_min_iterations(self):
sla = base.Outliers({"max": 0, "min_iterations": 10})
iteration_durations = [3.1, 4.2, 4.7, 3.6, 15.14, 2.8]
for d in iteration_durations:
sla.add_iteration({"duration": d})
# NOTE(msdubov): SLA doesn't fail because it hasn't iterations < 10
self.assertTrue(sla.result()["success"])

def test_result_few_iterations_small_min_iterations(self):
sla = base.Outliers({"max": 0, "min_iterations": 5})
iteration_durations = [3.1, 4.2, 4.7, 3.6, 15.14, 2.8]
for d in iteration_durations:
sla.add_iteration({"duration": d})
# NOTE(msdubov): Now this SLA can fail with >= 5 iterations
self.assertFalse(sla.result()["success"])

def test_add_iteration(self):
sla = base.Outliers({"max": 1})
# NOTE(msdubov): One outlier in the first 11 iterations
first_iterations = [3.1, 4.2, 3.6, 4.5, 2.8, 3.3, 4.1, 3.8, 4.3,
2.9, 10.2]
for d in first_iterations:
self.assertTrue(sla.add_iteration({"duration": d}))
# NOTE(msdubov): 12th iteration makes the SLA always failed
self.assertFalse(sla.add_iteration({"duration": 11.2}))
self.assertFalse(sla.add_iteration({"duration": 3.4}))
68 changes: 68 additions & 0 deletions tests/unit/common/test_streaming_algorithms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Copyright 2015: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import math

from rally.common import streaming_algorithms as algo
from rally import exceptions
from tests.unit import test


class MeanStreamingComputationTestCase(test.TestCase):

def test_empty_stream(self):
mean_computation = algo.MeanStreamingComputation()
self.assertRaises(exceptions.RallyException, mean_computation.result)

def test_one_value(self):
mean_computation = algo.MeanStreamingComputation()
mean_computation.add(10.0)
self.assertEqual(10.0, mean_computation.result())

def test_stream(self):
stream = range(10)
mean_computation = algo.MeanStreamingComputation()
for value in stream:
mean_computation.add(value)
excepted_mean = float(sum(stream)) / len(stream)
self.assertEqual(excepted_mean, mean_computation.result())


class StdDevStreamingComputationTestCase(test.TestCase):

def test_empty_stream(self):
std_computation = algo.StdDevStreamingComputation()
self.assertRaises(exceptions.RallyException, std_computation.result)

def test_one_value(self):
std_computation = algo.StdDevStreamingComputation()
std_computation.add(10.0)
self.assertRaises(exceptions.RallyException, std_computation.result)

def test_two_values(self):
std_computation = algo.StdDevStreamingComputation()
std_computation.add(10.0)
std_computation.add(10.0)
self.assertEqual(0.0, std_computation.result())

def test_stream(self):
stream = range(10)
std_computation = algo.StdDevStreamingComputation()
for value in stream:
std_computation.add(value)
mean = float(sum(stream)) / len(stream)
excepted_std = math.sqrt(sum((x - mean) ** 2 for x in stream) /
(len(stream) - 1))
self.assertEqual(excepted_std, std_computation.result())

0 comments on commit 49c5056

Please sign in to comment.