shepherd.py (gist by @marcelrf, created May 28, 2018)
#!/usr/bin/python3
# -*- coding: utf-8 -*-
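"""Find anomalies in numerical series.

Reads one named series per line (or per column with -c), applies a battery of
transformations to each series (first differences, aggregation, cherry-picking,
linear detrending), measures how far the latest value diverges from
bootstrapped percentiles of the history, and reports every series whose
weighted divergence exceeds the threshold.
"""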
from collections import defaultdict
import argparse
import itertools
import random
import sys

BOOTSTRAP_SAMPLES = 100       # resamples used to estimate percentiles
FACTOR_DECAY = 2              # exponent sharpening per-transformation weights
LENGTH_FACTOR_WEIGHT = 0.2    # how quickly longer series approach full weight
MAX_DIVERGENCE = 100          # cap on any single divergence score
MIN_DATA_SIZE = 10            # minimum points required to analyze a series
PERCENTS = [0.25, 0.5, 0.75]  # lower quartile, median, upper quartile

def get_arguments():
    parser = argparse.ArgumentParser(description='Find anomalies in numerical series.')
    parser.add_argument('-i', '--input-path', type=str,
                        help='path where to read from (default: stdin)')
    parser.add_argument('-d', '--input-delimiter', type=str, default='\t',
                        help='input delimiter character (default: tab)')
    parser.add_argument('-c', '--input-columnar', action='store_true',
                        help='consider each input column as a series')
    parser.add_argument('-o', '--output-path', type=str,
                        help='path where to write to (default: stdout)')
    parser.add_argument('-D', '--output-delimiter', type=str, default='\t',
                        help='output delimiter character (default: tab)')
    parser.add_argument('-H', '--output-human', action='store_true',
                        help='additionally output divergence in human-readable form')
    parser.add_argument('-t', '--threshold', type=float, default=3.0,
                        help='only output anomalies greater than this (default: 3.0)')
    return parser.parse_args()

def read_input(args):
    input_stream = open(args.input_path) if args.input_path else sys.stdin
    input_matrix = [
        line.split(args.input_delimiter)
        for line in input_stream
    ]
    if args.input_path:
        input_stream.close()
    if args.input_columnar:
        # Transpose so each input column becomes a series.
        input_matrix = list(map(list, zip(*input_matrix)))
    # The first field of each series is its name; the rest are parsed as
    # floats, skipping empty fields.
    input_matrix = [
        [name.strip()] + [
            float(x.strip())
            for x in data if x.strip() != ''
        ]
        for (name, *data) in input_matrix
    ]
    return input_matrix

def write_output(results, args):
    # Report the most divergent series first.
    results.sort(key=lambda x: -abs(x[1]))
    output_stream = open(args.output_path, 'w') if args.output_path else sys.stdout
    for (name, divergence) in results:
        if abs(divergence) > args.threshold:
            human_divergence = ''
            if args.output_human:
                if abs(divergence) < 3: human_divergence = 'Within normality'
                elif divergence > 0 and divergence < 9: human_divergence = 'Slightly above'
                elif divergence < 0 and divergence > -9: human_divergence = 'Slightly below'
                elif divergence > 0 and divergence < 27: human_divergence = 'Considerably above'
                elif divergence < 0 and divergence > -27: human_divergence = 'Considerably below'
                elif divergence > 0: human_divergence = 'Extremely above'
                elif divergence < 0: human_divergence = 'Extremely below'
                human_divergence = args.output_delimiter + human_divergence
            print(
                name + args.output_delimiter + str(round(divergence, 2)) + human_divergence,
                file=output_stream
            )
    if args.output_path:
        output_stream.close()

def generate_transformations_derive(data):
    # Generate transformations of both the raw series and its first
    # differences.
    derived_data = derive(data)
    return {
        **generate_transformations_periodicize(data, ''),
        'derived': derived_data,
        **generate_transformations_periodicize(derived_data, 'derived+')
    }

def generate_transformations_periodicize(data, prefix):
    return {
        **generate_transformations_aggregate(data, prefix),
        **generate_transformations_cherrypick(data, prefix)
    }

def generate_transformations_aggregate(data, prefix):
    # Average the series over growing group sizes, for as long as enough
    # points remain.
    transformations = {
        **generate_transformations_regress(data, prefix)
    }
    group_size = 2
    while len(data) // group_size >= MIN_DATA_SIZE:
        aggregated_data = aggregate(data, group_size)
        transformation_name = prefix + 'aggregated(%s)' % group_size
        transformations[transformation_name] = aggregated_data
        transformations.update(
            generate_transformations_regress(aggregated_data, transformation_name + '+')
        )
        group_size += 1
    return transformations

def generate_transformations_cherrypick(data, prefix):
    # Subsample the series at growing strides, for as long as enough points
    # remain.
    transformations = {
        **generate_transformations_regress(data, prefix)
    }
    pick_every = 2
    while (len(data) + pick_every - 1) // pick_every >= MIN_DATA_SIZE:
        cherrypicked_data = cherrypick(data, pick_every)
        transformation_name = prefix + 'cherrypicked(%s)' % pick_every
        transformations[transformation_name] = cherrypicked_data
        transformations.update(
            generate_transformations_regress(cherrypicked_data, transformation_name + '+')
        )
        pick_every += 1
    return transformations

def generate_transformations_regress(data, prefix):
    return {prefix + 'regressed': regress(data)}

def derive(data):
    # First differences of consecutive values.
    return [data[i + 1] - data[i] for i in range(len(data) - 1)]

def aggregate(data, group_size):
    # Trim the leading remainder so groups align with the end of the series,
    # then average each group of group_size consecutive values.
    remainder = len(data) % group_size
    trimmed_data = data[remainder:]
    aggregated_data = []
    aggregated_value = 0
    for i in range(len(trimmed_data)):
        aggregated_value += trimmed_data[i]
        if (i + 1) % group_size == 0:
            aggregated_data.append(aggregated_value / group_size)
            aggregated_value = 0
    return aggregated_data

def cherrypick(data, pick_every):
    # Keep every pick_every-th value, counting from the end of the series,
    # e.g. cherrypick([1, 2, 3, 4, 5, 6, 7], 3) -> [1, 4, 7].
    compress_mask = itertools.cycle([1] + [0] * (pick_every - 1))
    return list(reversed(list(itertools.compress(reversed(data), compress_mask))))

def regress(data):
    # Fit a straight line through the mean points of the two halves of the
    # history (excluding the last value) and return the residuals, i.e. the
    # series with its linear trend removed.
    reg_data = data[:-1]
    part1 = reg_data[:len(reg_data) // 2]
    part2 = reg_data[len(reg_data) // 2:]
    x1 = len(part1) / 2
    y1 = sum(part1) / len(part1)
    x2 = len(part1) + len(part2) / 2
    y2 = sum(part2) / len(part2)
    m = (y2 - y1) / (x2 - x1)
    b = y1 - m * x1
    return [data[i] - (i * m + b) for i in range(len(data))]
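
# Worked example with made-up numbers: for data = [1, 2, 3, 4, 5, 6] the two
# halves of the history are [1, 2] and [3, 4, 5], with mean points
# (x1, y1) = (1.0, 1.5) and (x2, y2) = (3.5, 4.0), so m = 1.0 and b = 0.5;
# the residuals are the constant series [0.5] * 6, i.e. the trend is gone.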

def get_bootstrap_percentiles(data, percents):
    # Average each percentile over BOOTSTRAP_SAMPLES resamples (drawn with
    # replacement) to smooth out small-sample noise.
    sample_percentiles = []
    for _ in range(BOOTSTRAP_SAMPLES):
        sample = [random.choice(data) for _ in range(len(data))]
        sample_percentiles.append(get_percentiles(sample, percents))
    return [
        sum(percentile_collection) / len(percentile_collection)
        for percentile_collection in zip(*sample_percentiles)
    ]

def get_percentiles(data, percents):
    # Walk the empirical distribution and record the value at which the
    # cumulative relative frequency first crosses each requested percent.
    freqs = defaultdict(int)
    for value in data:
        freqs[value] += 1
    relative_freqs = [
        (value, freqs[value] / len(data))
        for value in sorted(freqs.keys())
    ]
    accum_freq = 0
    percentiles = [0] * len(percents)
    for value, rel_freq in relative_freqs:
        new_accum_freq = accum_freq + rel_freq
        for i, percent in enumerate(percents):
            if accum_freq < percent and new_accum_freq >= percent:
                percentiles[i] = value
        accum_freq = new_accum_freq
    return percentiles

def get_accuracy_factor(data, percentiles):
    # Fraction of the data range not covered by the interquartile range: the
    # tighter the distribution relative to the full range, the more the
    # resulting divergence can be trusted.
    data_gap = max(data) - min(data)
    if data_gap == 0: return 1
    percentile_gap = percentiles[2] - percentiles[0]
    if percentile_gap >= data_gap: return 0
    return (data_gap - percentile_gap) / data_gap

def get_length_factor(data):
    # Grows from 0 towards 1 as the series gets longer, so short series carry
    # less weight.
    return -1 / (LENGTH_FACTOR_WEIGHT * len(data) + 1) + 1

def get_divergence(percentiles, last_value):
    # Distance of the last value from the median, measured in units of the
    # median-to-quartile span on that side, capped at MAX_DIVERGENCE.
    low, median, high = percentiles
    if last_value == median:
        divergence = 0
    elif last_value > median:
        base = high - median
        if base == 0:
            divergence = MAX_DIVERGENCE
        else:
            divergence = min((last_value - median) / base, MAX_DIVERGENCE)
    else:
        base = low - median
        if base == 0:
            divergence = -MAX_DIVERGENCE
        else:
            divergence = -min((last_value - median) / base, MAX_DIVERGENCE)
    return divergence
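
# Worked example with made-up numbers: for percentiles (low, median, high) =
# (8, 10, 12) and last_value = 16, base = 12 - 10 = 2 and divergence =
# (16 - 10) / 2 = 3.0: the last value sits three upper-quartile spans above
# the median, right at the default output threshold.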

def get_results(name, data, transformed_data):
    # Score one transformation: how far its latest value diverges from the
    # bootstrapped percentiles of its history, and how much weight that
    # divergence deserves.
    percentiles = get_bootstrap_percentiles(transformed_data[:-1], PERCENTS)
    accuracy_factor = get_accuracy_factor(data[:-1], percentiles)
    length_factor = get_length_factor(transformed_data[:-1])
    final_factor = (accuracy_factor * length_factor) ** FACTOR_DECAY
    divergence = get_divergence(percentiles, transformed_data[-1])
    return [final_factor, divergence]

def analyze(data):
    if len(data) < MIN_DATA_SIZE:
        return 0
    transformations = generate_transformations_derive(data)
    transformations['identity'] = data
    results = [
        get_results(name, data, transformed_data)
        for name, transformed_data in transformations.items()
    ]
    # Combine the per-transformation divergences, weighted by their factors;
    # fall back to a plain average when every factor is zero.
    divergence_acc, divergence_fac, factor_acc = 0, 0, 0
    for factor, divergence in results:
        divergence_acc += divergence
        divergence_fac += divergence * factor
        factor_acc += factor
    if factor_acc == 0:
        return divergence_acc / len(results)
    return divergence_fac / factor_acc

if __name__ == '__main__':
    args = get_arguments()
    sequences = read_input(args)
    results = [(name, analyze(data)) for (name, *data) in sequences]
    write_output(results, args)
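
A minimal usage sketch, assuming shepherd.py is importable and using made-up
series values; the last value spikes well above the rest of the history, so
analyze should return a large positive divergence:

from shepherd import analyze

series = [10.0, 11.0, 9.5, 10.2, 10.8, 9.9, 10.1, 10.4, 9.7, 10.3, 25.0]
print(analyze(series))  # expect a large positive divergence for the spike

# CLI equivalent: feed 'name<TAB>value<TAB>value...' lines on stdin, e.g.
#   python3 shepherd.py -H -t 1.0 < series.tsv   (series.tsv is hypothetical)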