Last active
May 3, 2017 07:46
-
-
Save Derfirm/ba14448d1be82bc5363ed5c097d14233 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from __future__ import division | |
import random | |
import itertools | |
from collections import OrderedDict, defaultdict | |
def flip_coin():
    """Simulate a fair coin toss.

    Returns:
        int: ``0`` or ``1``, each drawn with equal probability.
    """
    # randrange excludes its upper bound, so (0, 2) yields exactly {0, 1}.
    return random.randrange(0, 2)
class RandomWithCoefficient(object):
    """Boolean random generator that damps long streaks.

    Each roll succeeds with probability ``chance``. Once a run of equal
    results grows longer than ``min_streak_to_start``, the probability of
    extending that run is multiplied by ``coefficient`` for every extra
    result in the run. As soon as the result flips, ``chance`` returns to
    the base value.

    Attribute names use the original spelling ``loose`` (for "lose"); they
    are kept unchanged so existing callers keep working.

    Args:
        coefficient: Multiplier applied per streak step.
        base_chance: Probability of a win when no streak correction applies.
        min_streak_to_start: Streak length that must be exceeded before the
            correction kicks in.
    """

    def __init__(self, coefficient, base_chance, min_streak_to_start):
        self.coefficient = coefficient
        self.win_streak = 0
        self.loose_streak = 0
        self.chance = base_chance
        self._default_chance = base_chance
        self._last_value = None
        self._min_streak_to_start = min_streak_to_start

    def make_roll(self):
        """Roll once, update the streak counters and return the result.

        Returns:
            bool: ``True`` for a win, ``False`` for a loss.
        """
        result = random.random() < self.chance
        if result:
            self.win_streak += 1
            self.loose_streak = 0
        else:
            self.loose_streak += 1
            self.win_streak = 0
        self.recalculate(result)
        return result

    def recalculate(self, result):
        """Update ``chance`` for the next roll given the latest ``result``.

        Args:
            result: Outcome of the roll that was just made.
        """
        if self._last_value is not None and self._last_value != result:
            # The streak was broken: start over from the base chance.
            self.reset_chance()
        elif self.win_streak > self._min_streak_to_start:
            # Damp the probability of yet another win.
            self.chance = self._default_chance * (
                self.coefficient ** (self.win_streak - 1)
            )
        elif self.loose_streak > self._min_streak_to_start:
            # Damp the probability of yet another loss. The base probability
            # of a loss is (1 - base chance), so that is the quantity to
            # scale; scaling the win chance here only gave the right answer
            # for base_chance == 0.5 and made a streak of length 1 jump to
            # (1 - base chance) instead of staying at the base chance.
            lose_chance = (1 - self._default_chance) * (
                self.coefficient ** (self.loose_streak - 1)
            )
            self.chance = 1 - lose_chance
        self._last_value = result
        self.chance = self.clamp(self.chance, 0, 1)

    def reset_chance(self):
        """Restore ``chance`` to the base chance given at construction."""
        self.chance = self._default_chance

    @staticmethod
    def clamp(n, minn, maxn):
        """Return ``n`` limited to the closed interval ``[minn, maxn]``."""
        return max(min(maxn, n), minn)
class CleverRandom(object):
    """Coin whose win threshold for the next flip follows the last draw.

    Every flip compares a fresh uniform draw against the current
    ``base_chance`` and then moves ``base_chance`` by the distance between
    the two, so the next flip is judged against (approximately) the value
    that was just drawn.

    Args:
        base_chance: Threshold used for the first flip; must lie in
            ``[0, 1]`` or the first flip fails its assertion.
    """

    def __init__(self, base_chance):
        self.delta = None
        self.base_chance = base_chance

    def flip_clever_coin(self):
        """Flip once and shift ``base_chance`` for the following flip.

        Returns:
            bool: ``True`` when the draw is below the threshold that was in
            effect before this call.

        Raises:
            AssertionError: If that threshold is outside ``[0, 1]``.
        """
        draw = random.random()
        threshold = self.base_chance
        offset = threshold - draw
        # Stored before the sanity check, exactly as the threshold is
        # updated even when the check below fails.
        self.base_chance = threshold - offset
        assert 0 <= threshold <= 1, 'invalid chance %f' % threshold
        return draw < threshold
class CleverRandomWithCorrections(CleverRandom):
    """``CleverRandom`` that snaps back to its starting chance on repeats.

    Whenever a flip produces the same result as the flip before it,
    ``base_chance`` is reset to the value given at construction.

    Args:
        base_chance: Initial threshold, also remembered as
            ``default_chance`` for later resets.
    """

    def __init__(self, base_chance):
        super(CleverRandomWithCorrections, self).__init__(base_chance)
        self.default_chance = base_chance
        self._last_value = None

    def flip_clever_coin(self):
        """Flip once, resetting ``base_chance`` if the result repeated.

        Returns:
            bool: The result produced by the parent class flip.
        """
        outcome = super(CleverRandomWithCorrections, self).flip_clever_coin()
        previous, self._last_value = self._last_value, outcome
        # No previous result exists on the very first flip, so nothing can
        # repeat yet.
        repeated = previous is not None and previous == outcome
        if repeated:
            self.base_chance = self.default_chance
        return outcome
class AnomalyCalculator(object):
    """Collect win/loss and streak statistics for boolean generators.

    The calculator feeds on the values returned by zero-argument callbacks.
    For every run it tracks how many wins and losses occurred and how often
    a series of a given length appeared for each attribute listed in
    ``watchable_attrs``: consecutive wins, consecutive losses, and
    consecutive alternations between the two.

    Per-run state is (re)initialised by ``install``; totals across the runs
    of one callback live in the ``cached_*`` attributes initialised by
    ``setup_storage``.

    Attribute names use the original spelling ``loose`` (for "lose"); they
    are kept unchanged so existing callers keep working.
    """

    def __init__(self):
        self.install()
        self.setup_storage()

    def setup_storage(self):
        """Reset the totals accumulated across runs."""
        self.cached_win_count = 0
        self.cached_loose_count = 0
        self.cached_win_streak = 0
        self.cached_loose_streak = 0
        self.cached_dual_switch_count = 0
        self._cached_anomaly_map = defaultdict(dict)

    def update_storage(self):
        """Add the current run's counters and series to the totals."""
        self.cached_win_count += self.win_count
        self.cached_loose_count += self.loose_count
        self.cached_win_streak += self.win_streak
        self.cached_loose_streak += self.loose_streak
        self.cached_dual_switch_count += self.dual_switch_count
        for attr_name, series in self._anomaly_map.items():
            cached_series = self._cached_anomaly_map[attr_name]
            for freq, count in series.items():
                cached_series[freq] = cached_series.get(freq, 0) + count

    def install(self):
        """Reset all per-run counters to their initial state."""
        self.loose_count = 0
        self.win_count = 0
        self.win_streak = 0
        self.loose_streak = 0
        self.dual_switch_count = 0
        self._anomaly_map = defaultdict(dict)
        self._last_value = None
        self.watchable_attrs = ('win_streak', 'loose_streak', 'dual_switch_count')

    def _calc_switch(self, value):
        """Track how many times in a row the result has alternated."""
        if self._last_value is None:
            # First value of the run: there is nothing to compare against.
            return
        if value != self._last_value:
            self.dual_switch_count += 1
        else:
            # The alternation ended; record its length and start over.
            self._fill_report(attrs=['dual_switch_count'])
            self.dual_switch_count = 0

    def _calc_loose(self):
        """Register a loss and close any win streak that was open."""
        self.loose_count += 1
        self.loose_streak += 1
        self._fill_report(attrs=['win_streak'])
        self.win_streak = 0

    def _calc_win(self):
        """Register a win and close any losing streak that was open."""
        self.win_count += 1
        self.win_streak += 1
        self._fill_report(attrs=['loose_streak'])
        self.loose_streak = 0

    def count(self, value, debug=False):
        """Feed one generated value into the statistics.

        Args:
            value: Generated value; its truthiness decides win or loss.
            debug: When true, dump the internal state after the update.
        """
        value = bool(value)
        if value:
            self._calc_win()
        else:
            self._calc_loose()
        self._calc_switch(value)
        if debug:
            self.debug(value)
        self._last_value = value

    def fill_final_report(self):
        """Record the series that are still open when a run ends."""
        # A win streak and a losing streak can never be open together.
        if self.win_streak > 0:
            self._fill_report(attrs=['win_streak'])
        elif self.loose_streak > 0:
            self._fill_report(attrs=['loose_streak'])
        # An alternation series can be open alongside either streak, so it
        # must be checked independently; as a trailing ``elif`` it was never
        # reached once any value had been counted.
        if self.dual_switch_count > 0:
            self._fill_report(attrs=['dual_switch_count'])

    def _fill_report(self, attrs=None):
        """Count one more occurrence of the current length of each series.

        Args:
            attrs: Names of the counters to record; defaults to all of
                ``watchable_attrs``. Counters that are zero are skipped.
        """
        attrs_ = attrs or self.watchable_attrs
        for attr_name in attrs_:
            length = getattr(self, attr_name)
            if length > 0:
                series = self._anomaly_map[attr_name]
                series[length] = series.get(length, 0) + 1

    def debug(self, value):
        """Print the internal state after ``value`` has been counted."""
        separator = '-' * 30
        state = (
            ('current value is', value),
            ('loose_count', self.loose_count),
            ('win_count', self.win_count),
            ('win_streak', self.win_streak),
            ('loose_streak', self.loose_streak),
            ('dual_switch_count', self.dual_switch_count),
            ('_last_value', self._last_value),
            ('anomaly map', self._anomaly_map),
        )
        print(separator)
        for label, current in state:
            # Single-argument print() gives the same output on Python 2 and 3.
            print('{0} {1}'.format(label, current))
        print(separator)

    @staticmethod
    def _percent(part, total):
        """Return ``part`` as a percentage of ``total`` (0.0 if total is 0)."""
        if not total:
            return 0.0
        return float(part) / total * 100

    def _print_series(self, anomaly_map):
        """Print every recorded series of ``anomaly_map`` ordered by length."""
        for attr_name in self.watchable_attrs:
            if attr_name not in anomaly_map:
                continue
            print(attr_name)
            for freq, count in sorted(anomaly_map[attr_name].items()):
                print(' - series {0} was repeated {1} times'.format(freq, count))

    def create_report(self, callback, run, iterations):
        """Print the statistics of the current run.

        Args:
            callback: Generator function the run was made with.
            run: Number of the run, used only in the heading.
            iterations: Number of values generated in the run.
        """
        print('Report for {} function for {} run and {} iterations'.format(callback.__name__, run, iterations))
        print('*' * 33)
        print('win percent {0:.2f}% ({1} from {2})'.format(
            self._percent(self.win_count, iterations), self.win_count, iterations))
        print('loose percent {0:.0f}% ({1} from {2})'.format(
            self._percent(self.loose_count, iterations), self.loose_count, iterations))
        print('_' * 33)
        self._print_series(self._anomaly_map)
        print('-' * 33)

    def create_new_report(self, callback, runs, iterations):
        """Print the statistics accumulated over all runs of ``callback``.

        Args:
            callback: Generator function the runs were made with.
            runs: Number of runs that were accumulated.
            iterations: Number of values generated per run.
        """
        total = iterations * runs
        print('Global Report for {}'.format(callback.__name__))
        print('#' * 33)
        print('win percent {0:.2f}% ({1} from {2})'.format(
            self._percent(self.cached_win_count, total), self.cached_win_count, total))
        print('loose percent {0:.0f}% ({1} from {2})'.format(
            self._percent(self.cached_loose_count, total), self.cached_loose_count, total))
        print('_' * 33)
        self._print_series(self._cached_anomaly_map)
        print('@' * 33)

    def run(self, runs, iterations, callbacks, debug=False):
        """Evaluate every callback and print one global report for each.

        Args:
            runs: Number of independent runs per callback.
            iterations: Number of values generated in each run.
            callbacks: Iterable of zero-argument callables returning a value
                whose truthiness means win or loss.
            debug: Forwarded to ``count`` for every generated value.
        """
        for callback in callbacks:
            # range (not xrange) keeps this working on Python 2 and 3.
            for _ in range(runs):
                for _ in range(iterations):
                    self.count(callback(), debug=debug)
                self.fill_final_report()
                self.update_storage()
                # create_report() could be called here for a per-run report.
                self.install()
            self.create_new_report(callback, runs, iterations)
            self.setup_storage()
def main():
    """Compare the streak statistics of the four coin generators.

    Builds one instance of each stateful generator, then runs the anomaly
    calculator over all generators with 10000 runs of 10 flips each.
    """
    plain = CleverRandom(0.50)
    corrected = CleverRandomWithCorrections(0.50)
    damped = RandomWithCoefficient(0.75, 0.5, 1)
    generators = (
        flip_coin,
        plain.flip_clever_coin,
        corrected.flip_clever_coin,
        damped.make_roll,
    )
    calculator = AnomalyCalculator()
    calculator.run(10000, 10, generators)
# Run the comparison only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment