Created
August 2, 2016 17:36
-
-
Save lostinplace/4dd4ed527239edd6bd32cebcc13c2b7b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import math | |
import re | |
import dateutil.parser | |
class BitVectorTools: | |
bitvector_resolution_minutes = 15 | |
bitvector_slots = int((24*60) / bitvector_resolution_minutes) | |
bv_tz_name = 'UTC' | |
bv_format = '{0:0b}' | |
@staticmethod | |
def datetimestring_range_to_bv_dict(start_datetime_string, end_datetime_string): | |
start_datetime = dateutil.parser.parse(start_datetime_string) | |
end_datetime = dateutil.parser.parse(end_datetime_string) | |
return BitVectorTools.datetime_range_to_bv_dict(start_datetime, end_datetime) | |
@staticmethod | |
def datetime_range_to_bv_dict(start_datetime, end_datetime): | |
start_date = start_datetime.date() | |
end_date = end_datetime.date() | |
timeline = end_date - start_date | |
day_delta = datetime.timedelta(days=1) | |
bv_dict = {} | |
for i in range(timeline.days + 1): | |
tmp_start_position = 0 | |
tmp_end_position = BitVectorTools.bitvector_slots | |
if i == 0: | |
tmp_start_position = BitVectorTools.time_to_bv_position(start_datetime.time()) | |
if i == timeline.days: tmp_end_position = BitVectorTools.time_to_bv_position(end_datetime.time()) | |
tmp_date = start_date + day_delta * i | |
tmp_bv = BitVectorTools.generate_bv_from_positions(tmp_start_position, tmp_end_position) | |
bv_dict[tmp_date.isoformat()] = tmp_bv | |
return bv_dict | |
@staticmethod | |
def detect_collision_in_bv_dicts(dict_a, dict_b): | |
a_keys = set(dict_a) | |
b_keys = set(dict_b) | |
relevant_keys = a_keys.intersection(b_keys) | |
collisions = {} | |
for k in relevant_keys: | |
a = dict_a[k] | |
b = dict_b[k] | |
collision = BitVectorTools.generate_bv_collision(a, b) | |
if collision: | |
collisions[k] = collision | |
return collisions | |
@staticmethod | |
def combine_bv_dicts(dict_a, dict_b): | |
a_keys = set(dict_a) | |
b_keys = set(dict_b) | |
all_keys = a_keys.union(b_keys) | |
result = {} | |
for k in all_keys: | |
a = dict_a.get(k) or 0 | |
b = dict_b.get(k) or 0 | |
combination = a | b | |
result[k] = combination | |
return result | |
@staticmethod | |
def time_to_bv_position(a_time): | |
minutes_from_start = a_time.hour * 60 + a_time.minute | |
slots_from_start = math.floor(minutes_from_start / BitVectorTools.bitvector_resolution_minutes) | |
return slots_from_start | |
@staticmethod | |
def datetime_to_bv_position(a_datetime): | |
if a_datetime.tzname() != BitVectorTools.bv_tz_name: | |
raise ValueError('datetime must be in UTC') | |
daystring = a_datetime.date().isoformat() | |
return daystring, BitVectorTools.time_to_bv_position(a_datetime.time()) | |
@staticmethod | |
def generate_bv_from_timerange_strings(start_time_string, end_time_string): | |
start_time = dateutil.parser.parse(start_time_string) | |
end_time = dateutil.parser.parse(end_time_string) | |
return BitVectorTools.generate_bv_from_timerange(start_time, end_time) | |
@staticmethod | |
def generate_bv_from_timerange(start_time, end_time): | |
start_position = BitVectorTools.time_to_bv_position(start_time) | |
end_position = BitVectorTools.time_to_bv_position(end_time) if end_time else BitVectorTools.bitvector_slots | |
return BitVectorTools.generate_bv_from_positions(start_position, end_position) | |
@staticmethod | |
def generate_bv_from_positions(start_position, stop_position): | |
bv_length = BitVectorTools.bitvector_slots - start_position | |
number_of_ones = stop_position - start_position | |
ones = pow(2, number_of_ones) - 1 | |
number_of_zeroes = bv_length-number_of_ones | |
out = ones << number_of_zeroes | |
return out | |
@staticmethod | |
def generate_bv_collision(usage_request_bv, usage_bv): | |
""" | |
:param usage_request_bv: bv that describes the usage being requested | |
:param usage_bv: bv that describes the existing usage of the asset | |
:return: true if there is a collision, false if clear | |
>>> BitVectorTools.generate_bv_collision(24,12) | |
4 | |
>>> BitVectorTools.generate_bv_collision(24,3) | |
0 | |
""" | |
collision = usage_request_bv & usage_bv | |
return collision | |
@staticmethod | |
def update_usage_bv(bv_a, bv_b): | |
""" | |
A simple wrapper around bitwise OR | |
:param bv_a: operand a | |
:param bv_b: operand b | |
:return: the union of both operands | |
""" | |
return bv_a | bv_b | |
@staticmethod | |
def datetimeranges_to_bv_dict(timerange_collection): | |
""" | |
accepts a collection of datetimes and converts them to a bitvector dictionary | |
:param timerange_collection: a collection of tuples where the first element is the start time string | |
and the second element is the end time string | |
:return: a bv_dictionary | |
""" | |
ranges = map(lambda x: list(map(dateutil.parser.parse, x)), timerange_collection) | |
dicts = map(lambda x: BitVectorTools.datetime_range_to_bv_dict(*x), ranges) | |
out_dict = {} | |
for a_dict in dicts: | |
out_dict = BitVectorTools.combine_bv_dicts(out_dict, a_dict) | |
return out_dict | |
@staticmethod | |
def bv_dict_to_datetimeranges(a_bv_dict): | |
""" | |
accepts a bitvector dictionary and converts it to a collection of datetimes | |
:param a_bv_dict: a bv_dictionary | |
:return: a collection of tuples where the first element is the start time string | |
and the second element is the end time string | |
""" | |
out_list = [] | |
for k in a_bv_dict: | |
bv = a_bv_dict[k] | |
day = dateutil.parser.parse(k) | |
timeranges = BitVectorTools._complex_bv_to_timeranges(bv) | |
for timerange in timeranges: | |
start_datetime = datetime.datetime.combine(day, timerange[0]) | |
end_day = day | |
end_time = timerange[1] | |
if not end_time: | |
end_day = day + datetime.timedelta(days=1) | |
end_time = datetime.time(0, 0) | |
end_datetime = datetime.datetime.combine(end_day, end_time) | |
result = (start_datetime, end_datetime) | |
out_list.append(result) | |
return out_list | |
@staticmethod | |
def compute_bv_order(a_bv): | |
num_log = math.log(a_bv, 2) | |
order = math.floor(num_log + 1) | |
return order | |
@staticmethod | |
def count_set_bits(a_bv): | |
""" | |
Counts the number of set bits in an integer, also evaluates the largest number of contiguous bits | |
:param a_bv: any integer | |
:return: tuple of number of set bits and the size of the largest cluster | |
""" | |
tmp = a_bv | |
tmp_diff = 0 | |
set_bits_count = 0 | |
most_contiguous_set_bits = 0 | |
current_contiguous_set_bits = 0 | |
while tmp: | |
next_i = tmp & (tmp - 1) | |
last_diff = tmp_diff | |
tmp_diff = tmp - next_i | |
if not tmp_diff >> 1 == last_diff: | |
most_contiguous_set_bits = current_contiguous_set_bits | |
current_contiguous_set_bits = 0 | |
current_contiguous_set_bits += 1 | |
tmp = next_i | |
set_bits_count += 1 | |
if current_contiguous_set_bits > most_contiguous_set_bits: | |
most_contiguous_set_bits = current_contiguous_set_bits | |
return set_bits_count, most_contiguous_set_bits | |
@staticmethod | |
def _simple_bv_to_timerange(simple_bv): | |
""" | |
consumes a simple BV which is a number whose binary representation is n ones and m zeroes where n>0 and m>=0 | |
:param simple_bv: bitvector to translate | |
:return: starttime, endtime | |
""" | |
bv_string = simple_bv | |
if isinstance(simple_bv, int): | |
bv_string = BitVectorTools.bv_format.format(simple_bv) | |
bv_chunk_length = bv_string.count('1') | |
start_position = BitVectorTools.bitvector_slots - len(bv_string) | |
start_time = BitVectorTools._bv_position_to_time(start_position) | |
end_position = start_position + bv_chunk_length | |
end_time = BitVectorTools._bv_position_to_time(end_position) | |
return start_time, end_time | |
@staticmethod | |
def _bv_position_to_time(a_position): | |
""" | |
converts a bitvector position to a time | |
:param a_position: a position in a bitvector | |
:return: the time object represented by that position or None if it goes to end of day | |
""" | |
minutes_from_day_start = a_position * BitVectorTools.bitvector_resolution_minutes | |
time_hours = int(minutes_from_day_start / 60) | |
if time_hours >= 24: | |
return None | |
time_minutes = minutes_from_day_start % 60 | |
result = datetime.time(time_hours, time_minutes, 0) | |
return result | |
@staticmethod | |
def _complex_bv_to_timeranges(complex_bv): | |
""" | |
accepts a bitvector that describes a day with gaps in the usage, | |
and returns the timeranges of all distinct usages | |
:param complex_bv: a bv that describes a day's usage pattern | |
:return: list of timeranges | |
""" | |
bv_string = complex_bv | |
if isinstance(complex_bv, int): | |
bv_string = BitVectorTools.bv_format.format(complex_bv) | |
tmp_string = bv_string | |
matches = re.findall('(1*)', tmp_string) | |
timeranges = [] | |
for i in matches: | |
if not bool(i): | |
continue | |
bv_length = len(tmp_string) | |
match_length = len(i) | |
match_index = tmp_string.index(i) | |
bv_string_length = bv_length - match_index | |
match_bv_string = i.ljust(bv_string_length, '0') | |
tmp_timerange = BitVectorTools._simple_bv_to_timerange(match_bv_string) | |
timeranges.append(tmp_timerange) | |
tmp_string = tmp_string.replace(i, '0' * match_length, 1) | |
return timeranges | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment