Skip to content

Commit

Permalink
Merge pull request #1 from Tumblr/staging
Browse files Browse the repository at this point in the history
Anomaly Detection Algorithm nlp-BI-2594
  • Loading branch information
Niger Little-Poole committed Dec 3, 2015
2 parents b77bba0 + 9503ee5 commit fea2457
Showing 3 changed files with 331 additions and 0 deletions.
11 changes: 11 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from setuptools import setup

setup(name='thermometr',
version='0.1',
description='The Anomaly Detection Package',
url='https://github.ewr01.tumblr.net/nigerlittlepoole/thermometr',
author='Niger Little-Poole',
author_email='nigerlittlepoole@tumblr.com',
license='MIT',
packages=['thermometr'],
zip_safe=False)
1 change: 1 addition & 0 deletions thermometr/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from thermometr import Thermometr
319 changes: 319 additions & 0 deletions thermometr/thermometr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,319 @@
import statsmodels.api as sm
import math
import random
import pandas as pd
from scipy.stats import t
import scipy.stats as st
import numpy as np
import collections

from numbers import Number


class Thermometr():
"""
Thermometr objects generate anomoly measurements from input series
"""


def __init__(self, series, dates =None):
"""
Note:
list comprehension standardizes series to np.arrays
Args:
series (Optional[int] or DataFrame or Dict): 1D or 2D group of numerical values or Dataframe
dates [str]: List of dates corresponding to series. Multiple formats are supported
"""
self.series = [np.array(x) for x in Thermometr.standardize_data(series)]
self.dates = dates

@staticmethod
def standardize_data(series):
"""
Static method that regularlizes series to 2D list of numpy arrays
Note:
Type errors that aren't in the first position of the iterable won't be caught
Args:
series (Optional[int] or DataFrame or Dict): 1D or 2D group of numerical values or Dataframe
Returns:
List of numpy arrays containing series, raises ValueError if problem
"""

lists = []

if len(series) == 0:
raise ValueError('Empty series not allowed')
# python strings will pass iterable so need to explicitely check
if isinstance(series,str):
raise ValueError('Series could not be parsed, must be iterable')
# DataFrame isn't a standard iterable, .values returns np.array()
if isinstance(series, pd.DataFrame):
for i in df.columns:
lists.append(df[i].values)
return lists
# can't select from dict with [0] index so need seperate case
elif isinstance(series, dict):
for sub in series.values():
lists.extend(Thermometr.standardize_data(sub))
return lists
# standard use case
elif isinstance(series, collections.Iterable):
if isinstance(series[0], Number):
lists.append(series)
return lists
# if it is a list of lists, then it recurses into this function again
elif isinstance(series[0], collections.Iterable):
for sub in series:
lists.extend(Thermometr.standardize_data(sub))
return lists

raise ValueError('Series could not be parsed, must be iterable')

@staticmethod
def seasonal_esd(inputs, a =.025, frequency = 3, start= None, end =None):
"""
Static method that generates anomolies from the given series
using S-H-ESD algorithm developed by twitter
Note:
Based on frequency, the decompisition trend values can become NULL near the bounds of the series
this is a stats_model limitation and has been dealt with using an imputation
Args:
inputs (np.array[int]): time series of numerical values
a (float): a confidence level(alpha) for the algorithm to use to determine outliers
frequency (int): the frequency of season of the data (instances to complete cycle)
start (int): the first index in the series to check for anomalies
end (int): the last index in the series to check for anomalies
Returns:
List of tuple pairs (anomoly,index) indicating the anomolies for input series
"""

outliers = []
raw = np.copy(inputs) # copy so that you keep inputs immutable
data = sm.tsa.seasonal_decompose(raw, freq=frequency) # STL decomposition algorithm from stats_model

trend = data.trend
for i in range(len(trend)):
substitute = trend[i-frequency] if i > frequency else trend[i + frequency]
trend[i] = substitute if np.isnan(trend[i]) else trend[i]
vals = data.observed - data.trend - data.seasonal # distance from STL curve for each point in series
mean = np.nanmean(vals) # mean of the residuals

# need to impute NULL residuals to mean
pairs = []
for i in range(len(vals)):
v = mean if np.isnan(vals[i]) else vals[i]
pairs.append((v,i))
return Thermometr.grubbs_test(inputs, pairs, start,end,a)

@staticmethod
def grubbs_test( inputs,errs, start=None,end=None, a = .025):
"""
Static method that finds anomalies in a sample of normally distributed random variables
Note:
An assumption is made that the input data is normal. Grubb's test uses the chi squared
distribution and n degrees of freedom to account for small sample sizes
Args:
inputs (np.array[int]): time series of numerical values
start (int): the first index in the series to check for anomalies
end (int): the last index in the series to check for anomalies
a (float): a confidence level(alpha) for the algorithm to use to determine outliers
"""
outliers = []
vals = errs
start = 0 if start is None else start
end = len(vals) -1 if end is None else end
check = True

# run grubbs test until the furthest remaining point fails
while check == True:
g = 0
val = 0
n = len(vals)
index = 0
ind = 0
series = [ x[0] for x in vals]
series = np.array(series)
u = np.nanmean(series)
s = np.nanstd(series)
# find residual with largest z value, or distance from mean
for j in range(len(vals)):
if j >= start and j <= end:
v = vals[j][0]
k = vals[j][1]
val = v if abs( (u - v )/s)> g else val
index = int(j) if abs( (u - v) /s)> g else index
ind = k if abs( (u - v) /s)> g else ind
g = abs( (u - v) /s) if abs( (u - v) /s)> g else g

# generate critical value for grubb's test
critical = ( (n -1) /math.sqrt(n))*math.sqrt(math.pow(t.ppf(a/(2*n), n -2),2)/ (n -2 + math.pow(t.ppf(a/(2*n),n-2),2) ) )
if g > critical:
outliers.append((inputs[ind],ind, 1 - critical/g))
else:
check = False
# remove value for next iteration of the test by imputing to new mean
vals[index] = (u*n - vals[index])/ (n-1)
return outliers

@staticmethod
def arima_test(values, clean,start= None, end=1, strict =True ):
"""
Static method that is used for finding anomalies with ARIMA and not STL
Note:
ARIMA doesn't work on small series so in strict mode validation defaults to false
and in non strict defaults to true
Args:
values (np.array[int]): time series values
start (int): the first index in the series to check for anomalies
end (int): the last index in the series to check for anomalies
strict (binary): determines the default return value for when a comparison cannot be completed
Returns:
List of tuple pairs (anomoly,index) indicating the anomolies for input series
"""
try:
n = len(values)
# ARIMA doesn't work on series less than 13
if n < 13:
return not strict
# Build the ARIMA model and generate projections
diffs = np.diff(clean)
clean = np.array([0])
clean = np.concatenate((clean,diffs))

model = sm.tsa.AR(clean).fit()
base = 12 # ARIMA requires starting the projections at an index less than the maximum
fits = model.predict(base, n)
errs = []
# generates all the differences between arima projections and observed values
for i in range(base,n):
errs.append((fits[i-base] - values[i],i))

return Thermometr.grubbs_test(values, errs)
except Exception as e:
print e
return []


@staticmethod
def derivative(y):
"""
Returns deriviative of series y
Args:
y (np.array[float]): series of values
Returns
np.array[float] of changes between values
"""
dx = 0.001
dy = np.diff(y)/dx
return np.multiply(dy,.001)

@staticmethod
def seasonality(y):
"""
Generates the frequency of a timeseries
Notes:
computes frequency by checking average number of times
for the derivative to change sign 3 times
Args:
y (np.array[float]): series of values
Returns
int representing frequency
"""
vals = Thermometr.derivative(y)
sign = 1 if vals[0] >=0 else -1
changes = 0
count = 0
counts = []
for val in vals:
count+=1
if val * sign < 0:
sign = -1 if sign >=0 else 1
changes +=1
if changes ==3:
changes =0
counts.append(count)
count = 0
try:
return int(np.nanmean(np.array(counts[1:]) ) )
except Exception as e:
print e
return 3

@staticmethod
def detect_series(series,start,end, strict = False , a=0.025):
"""
Finds anomalies in a series using ESD and ARIMA validation
Args:
series (np.array[float]): the time series values
start (int): the first index in the series to check for anomalies
end (int): the last index in the series to check for anomalies
Returns [tuples] containing anomalies, their index, their value, and their ESD score
"""
vals = np.copy(series)
potential = Thermometr.seasonal_esd(vals,a=a,frequency= Thermometr.seasonality(vals), start=start, end=end)
potential = sorted(potential, key=lambda x: x[1])
indices = [x[1] for x in potential]

clean = np.copy(vals)
for i in range(len(vals)):
if i in indices and i >0:
clean[i] = clean[i-1]

# check against ARIMA results, if strict mode only take intersection of anomaly sets
others = Thermometr.arima_test(vals,clean)
if strict:
confirmed = []
indices = [x[1] for x in others]
for x in potential:
if x[1] < 12 or x[1] in indices:
confirmed.append(x)
potential = confirmed
else:
potential.extend(others)
potential = {v[1]:v for v in potential}.values()
anomalies = sorted(potential, key=lambda x: x[1])
return anomalies

def detect(self, start = None, end = None, strict = False, a =0.025):
"""
Finds anomalies in each series of Thermometr
Args:
start (int): the first index in the series to check for anomalies
end (int): the last index in the series to check for anomalies
Returns:
list or list of lists contianing anomalies for Thermometr
"""
results = []

for sub_series in self.series:
n = len(sub_series)
s= eval(start) if type(start) == str else start
e = eval(end) if type(end) == str else end
results.append(Thermometr.detect_series(sub_series,s,e,strict,a))
if self.dates is not None:
temp = []
for sub_series in results:
temp.append ([ {"value":x[0], "index":self.dates[x[1]], "ESD":x[2]} for x in sub_series ])
results = temp
else:
temp = []
for sub_series in results:
temp.append ([ {"value":x[0], "index":x[1], "ESD":x[2]} for x in sub_series ])
results = temp
if len(results) ==1:
results = results[0]
return results

def detect_latest(self,strict = False, a=0.025):
"""
Finds last index anomalies in each series of Thermometr
Notes:
same as detect() except sets start = n-1 and end = n given n = len(series)
Returns:
list or list of lists contianing anomalies for Thermometr
"""
return self.detect("n-1","n", strict,a)

0 comments on commit fea2457

Please sign in to comment.