Skip to content

Instantly share code, notes, and snippets.

@Derfirm
Last active May 3, 2017 07:46
Show Gist options
  • Save Derfirm/ba14448d1be82bc5363ed5c097d14233 to your computer and use it in GitHub Desktop.
Save Derfirm/ba14448d1be82bc5363ed5c097d14233 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
from __future__ import division
import random
import itertools
from collections import OrderedDict, defaultdict
def flip_coin():
    """Toss an unbiased coin.

    Returns:
        int: Either 0 or 1, as produced by ``random.randint(0, 1)``.
    """
    outcome = random.randint(0, 1)
    return outcome
class RandomWithCoefficient(object):
    """Boolean roller whose win chance is re-derived from the current streak.

    While a run of identical results is longer than ``min_streak_to_start``
    the chance is recomputed from the streak length:

    * win streak:   ``base_chance * coefficient ** (win_streak - 1)``
    * loose streak: ``1 - base_chance * coefficient ** (loose_streak - 1)``

    As soon as a roll differs from the previous one the chance goes back to
    ``base_chance``.  The chance is always kept inside ``[0, 1]``.

    Args:
        coefficient: Base of the power applied to the streak length.
        base_chance: Initial win chance, also the value restored on reset.
        min_streak_to_start: Streak length that must be exceeded before the
            chance starts being recomputed.
    """

    def __init__(self, coefficient, base_chance, min_streak_to_start):
        self._default_chance = base_chance
        self._min_streak_to_start = min_streak_to_start
        self._last_value = None  # result of the previous roll, if any
        self.coefficient = coefficient
        self.chance = base_chance
        self.win_streak = 0
        self.loose_streak = 0

    def make_roll(self):
        """Roll once, update both streak counters and the chance.

        Returns:
            bool: True when the random draw fell below the current chance.
        """
        won = random.random() < self.chance
        if won:
            self.win_streak, self.loose_streak = self.win_streak + 1, 0
        else:
            self.loose_streak, self.win_streak = self.loose_streak + 1, 0
        self.recalculate(won)
        return won

    def recalculate(self, result):
        """Derive the chance for the next roll from ``result`` and the streaks.

        Args:
            result: Outcome of the roll that has just been made.
        """
        previous = self._last_value
        threshold = self._min_streak_to_start
        if previous is not None and previous != result:
            # The streak has just been broken: start over from the default.
            self.reset_chance()
        elif self.win_streak > threshold:
            self.chance = self._default_chance * (self.coefficient ** (self.win_streak-1))
        elif self.loose_streak > threshold:
            self.chance = 1- self._default_chance * (self.coefficient ** (self.loose_streak-1))
        self._last_value = result
        self.chance = self.clamp(self.chance, 0, 1)

    def reset_chance(self):
        """Restore the chance that was passed to the constructor."""
        self.chance = self._default_chance

    @staticmethod
    def clamp(n, minn, maxn):
        """Return ``n`` limited to the closed interval ``[minn, maxn]``."""
        capped = min(maxn, n)
        return max(capped, minn)
class CleverRandom(object):
    """Coin whose win chance for the next flip is fed by the current draw.

    Every flip compares a fresh random draw against ``base_chance`` and then
    stores ``base_chance - (base_chance - draw)`` (algebraically the draw
    itself) as the new ``base_chance``.

    Args:
        base_chance: Win chance used for the first flip.
    """

    def __init__(self, base_chance):
        self.delta = None
        self.base_chance = base_chance

    def flip_clever_coin(self):
        """Flip once and update ``base_chance`` for the following flip.

        Returns:
            bool: True when the draw was below the chance in effect.

        Raises:
            AssertionError: If the chance in effect is outside ``[0, 1]``.
        """
        draw = random.random()
        chance = self.base_chance
        offset = chance - draw
        # Stored before the sanity check, so the state moves on even when
        # the assertion below fires.
        self.base_chance = chance - offset
        assert 0 <= chance <=1, 'invalid chance %f' % chance
        return draw < chance
class CleverRandomWithCorrections(CleverRandom):
    """``CleverRandom`` that falls back to its initial chance on a repeat.

    Whenever a flip yields the same result as the flip before it,
    ``base_chance`` is set back to the value given at construction time.

    Args:
        base_chance: Win chance for the first flip and the value restored
            after two equal results in a row.
    """

    def __init__(self, base_chance):
        super(CleverRandomWithCorrections, self).__init__(base_chance)
        self.default_chance = base_chance
        self._last_value = None  # result of the previous flip, if any

    def flip_clever_coin(self):
        """Flip once, restoring the default chance if the result repeated.

        Returns:
            bool: Result of the underlying ``CleverRandom`` flip.
        """
        outcome = super(CleverRandomWithCorrections, self).flip_clever_coin()
        previous, self._last_value = self._last_value, outcome
        repeated = previous is not None and previous == outcome
        if repeated:
            self.base_chance = self.default_chance
        return outcome
class AnomalyCalculator(object):
    """Collect streak statistics for callables that return truthy/falsy values.

    A truthy value counts as a win, a falsy one as a loss.  Three kinds of
    series are tracked (see ``watchable_attrs``): consecutive wins,
    consecutive losses and consecutive alternations ("dual switches").  For
    each kind ``_anomaly_map`` stores ``{series_length: times_seen}``.

    Per-run state is reset by ``install``; totals across all runs of one
    callback live in the ``cached_*`` attributes reset by ``setup_storage``.

    All output goes through single-argument ``print(...)`` calls so the class
    behaves the same on Python 2 and Python 3.
    """

    def __init__(self):
        self.install()
        self.setup_storage()

    def setup_storage(self):
        """Reset the totals accumulated over the runs of one callback."""
        self.cached_win_count = 0
        self.cached_loose_count = 0
        self.cached_win_streak = 0
        self.cached_loose_streak = 0
        self.cached_dual_switch_count = 0
        self._cached_anomaly_map = defaultdict(dict)

    def update_storage(self):
        """Add the counters and series of the current run to the totals."""
        self.cached_win_count += self.win_count
        self.cached_loose_count += self.loose_count
        self.cached_win_streak += self.win_streak
        self.cached_loose_streak += self.loose_streak
        self.cached_dual_switch_count += self.dual_switch_count
        for attr_name, series in self._anomaly_map.items():
            cached = self._cached_anomaly_map[attr_name]
            for freq, count in series.items():
                cached[freq] = cached.get(freq, 0) + count

    def install(self):
        """Reset all per-run counters and the per-run series map."""
        self.loose_count = 0
        self.win_count = 0
        self.win_streak = 0
        self.loose_streak = 0
        self.dual_switch_count = 0
        self._anomaly_map = defaultdict(dict)
        self._last_value = None
        self.watchable_attrs = ('win_streak', 'loose_streak', 'dual_switch_count')

    def _calc_switch(self, value):
        """Track how many times in a row the result has alternated."""
        if self._last_value is None:
            # First value of the run: nothing to compare against yet.
            return
        if value != self._last_value:
            self.dual_switch_count += 1
        else:
            # The alternation ended: record its length and start over.
            self._fill_report(attrs=['dual_switch_count'])
            self.dual_switch_count = 0

    def _calc_loose(self):
        """Register a loss and close the win streak that preceded it."""
        self.loose_count += 1
        self.loose_streak += 1
        self._fill_report(attrs=['win_streak'])
        self.win_streak = 0

    def _calc_win(self):
        """Register a win and close the loose streak that preceded it."""
        self.win_count += 1
        self.win_streak += 1
        self._fill_report(attrs=['loose_streak'])
        self.loose_streak = 0

    def count(self, value, debug=False):
        """Feed one result into the statistics.

        Args:
            value: Result of a roll; converted with ``bool``.
            debug: When true, dump the internal state after the update.
        """
        value = bool(value)
        if value:
            self._calc_win()
        else:
            self._calc_loose()
        self._calc_switch(value)
        if debug:
            self.debug(value)
        self._last_value = value

    def fill_final_report(self):
        """Record the series that are still open when a run ends.

        The win/loose streaks are mutually exclusive (``count`` zeroes one
        whenever it bumps the other), but an alternation series can be open
        at the same time as either of them, so it is checked independently
        instead of as a further ``elif`` that could never be reached.
        """
        if self.win_streak > 0:
            self._fill_report(attrs=['win_streak'])
        elif self.loose_streak > 0:
            self._fill_report(attrs=['loose_streak'])
        if self.dual_switch_count > 0:
            self._fill_report(attrs=['dual_switch_count'])

    def _fill_report(self, attrs=None):
        """Count the current value of each named attribute as one finished series.

        Args:
            attrs: Attribute names to record; all of ``watchable_attrs`` when
                omitted or empty.  Attributes whose value is 0 are skipped.
        """
        for attr_name in attrs or self.watchable_attrs:
            length = getattr(self, attr_name)
            if length > 0:
                series = self._anomaly_map[attr_name]
                series[length] = series.get(length, 0) + 1

    @staticmethod
    def _percent(count, total):
        """Return ``count`` as a percentage of ``total`` (0.0 when total is 0)."""
        if not total:
            return 0.0
        return float(count) / total * 100

    def _print_series(self, anomaly_map):
        """Print every recorded series of ``anomaly_map``, shortest first."""
        for attr_name in self.watchable_attrs:
            if attr_name not in anomaly_map:
                continue
            print(attr_name)
            for freq, count in sorted(anomaly_map[attr_name].items()):
                print(' - series {0} was repeated {1} times'.format(freq, count))

    def debug(self, value):
        """Print the internal state after ``value`` has been counted."""
        print('-' * 30)
        print('current value is {}'.format(value))
        print('loose_count {}'.format(self.loose_count))
        print('win_count {}'.format(self.win_count))
        print('win_streak {}'.format(self.win_streak))
        print('loose_streak {}'.format(self.loose_streak))
        print('dual_switch_count {}'.format(self.dual_switch_count))
        print('_last_value {}'.format(self._last_value))
        print('anomaly map {}'.format(self._anomaly_map))
        print('-' * 30)

    def create_report(self, callback, run, iterations):
        """Print the statistics of the current run.

        Args:
            callback: The callable that was sampled (its ``__name__`` is shown).
            run: Ordinal of the run, for display only.
            iterations: Number of rolls made in the run.
        """
        print('Report for {} function for {} run and {} iterations'.format(
            callback.__name__, run, iterations))
        print('*' * 33)
        print('win percent {0:.2f}% ({1} from {2})'.format(
            self._percent(self.win_count, iterations), self.win_count, iterations))
        print('loose percent {0:.0f}% ({1} from {2})'.format(
            self._percent(self.loose_count, iterations), self.loose_count, iterations))
        print('_' * 33)
        self._print_series(self._anomaly_map)
        print('-' * 33)

    def create_new_report(self, callback, runs, iterations):
        """Print the statistics accumulated over all runs of ``callback``.

        Args:
            callback: The callable that was sampled (its ``__name__`` is shown).
            runs: Number of runs that were accumulated.
            iterations: Number of rolls per run.
        """
        total = iterations * runs
        print('Global Report for {}'.format(callback.__name__))
        print('#' * 33)
        print('win percent {0:.2f}% ({1} from {2})'.format(
            self._percent(self.cached_win_count, total), self.cached_win_count, total))
        print('loose percent {0:.0f}% ({1} from {2})'.format(
            self._percent(self.cached_loose_count, total), self.cached_loose_count, total))
        print('_' * 33)
        self._print_series(self._cached_anomaly_map)
        print('@' * 33)

    def run(self, runs, iterations, callbacks, debug=False):
        """Sample every callback and print one global report per callback.

        Args:
            runs: Number of independent runs per callback.
            iterations: Number of rolls per run.
            callbacks: Iterable of zero-argument callables returning a
                truthy (win) or falsy (loss) value.
            debug: Forwarded to ``count``.
        """
        for callback in callbacks:
            # ``range`` exists on both Python 2 and 3, unlike ``xrange``.
            for _run in range(runs):
                for _ in range(iterations):
                    self.count(callback(), debug=debug)
                self.fill_final_report()
                self.update_storage()
                # A per-run report is available here through
                # self.create_report(callback, _run + 1, iterations).
                self.install()
            self.create_new_report(callback, runs, iterations)
            self.setup_storage()
def main():
    """Print global streak reports for every coin implementation.

    Each implementation is sampled for 10000 runs of 10 rolls.
    """
    plain_clever = CleverRandom(0.50)
    corrected_clever = CleverRandomWithCorrections(0.50)
    weighted = RandomWithCoefficient(0.75, 0.5, 1)
    random_func_seq = (
        flip_coin,
        plain_clever.flip_clever_coin,
        corrected_clever.flip_clever_coin,
        weighted.make_roll,
    )
    calculator = AnomalyCalculator()
    calculator.run(10000, 10, random_func_seq)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment