Created
September 20, 2017 08:03
-
-
Save faniska/d6adcc918d0d338a184b4eb1bf357f29 to your computer and use it in GitHub Desktop.
Time Planner: Calculating end date or task duration with given working days, hours, holidays, etc.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import datetime | |
from dateutil import parser as dt_parser | |
class TimePlanner:
    """Plan task end dates and durations against a working calendar.

    Attributes:
        working_hours: ordered sequence of ``[start, end]`` pairs, each an
            ``"HH:MM"`` string, e.g. ``[["09:00", "13:00"], ["14:00", "18:00"]]``.
            Gaps between consecutive pairs are treated as breaks.
        weekend: container of weekday numbers as produced by
            ``strftime('%w')`` (0 is Sunday, 6 is Saturday).
        holidays: container of date strings, either ``"YYYY-MM-DD"`` for a
            single date or ``"*-MM-DD"`` for a date repeated every year.
        min_part: chunk size, in minutes, used when stepping through time.
    """

    holidays = None
    weekend = None
    working_hours = None
    min_part = 5  # Number of minutes per chunk when splitting an interval

    def __init__(self, working_hours, weekend, holidays):
        self.working_hours = working_hours
        self.weekend = weekend
        self.holidays = holidays

    @staticmethod
    def hm_to_seconds(time_str):
        """Convert an ``"HH:MM"`` string to seconds since midnight.

        Returns ``None`` when the string has no ``:``-separated minute part.
        Anything after the minutes (e.g. a seconds field) is ignored.
        """
        time_arr = time_str.split(':')
        # Check the number of fields, not the number of characters in the
        # string: "09" has two characters but no minute field.
        if len(time_arr) > 1:
            return int(time_arr[0]) * 3600 + int(time_arr[1]) * 60
        return None

    @staticmethod
    def parse(date_time):
        """Return ``date_time`` as a datetime truncated to whole minutes.

        Non-datetime values are handed to ``dt_parser.parse``.
        """
        if not isinstance(date_time, datetime.datetime):
            date_time = dt_parser.parse(date_time)
        # Drop microseconds as well as seconds so that values compare equal
        # at minute granularity.
        return date_time.replace(second=0, microsecond=0)

    @staticmethod
    def sec_to_hours(seconds):
        """Convert seconds to (fractional) hours."""
        return float(seconds) / float(3600)

    def get_working_hours_number(self):
        """Return the length of one working day in hours (breaks excluded)."""
        total = 0
        for part in self.working_hours:
            part_end = self.sec_to_hours(self.hm_to_seconds(part[1]))
            part_start = self.sec_to_hours(self.hm_to_seconds(part[0]))
            total += part_end - part_start
        return total

    def get_break_times(self):
        """Return the gaps between consecutive working periods.

        Each break is a ``[start, end]`` pair of ``"HH:MM"`` strings: the end
        of one working period and the start of the next one.
        """
        periods = list(self.working_hours)
        return [[previous[1], following[0]]
                for previous, following in zip(periods, periods[1:])]

    def is_weekend(self, date):
        """Return True when the weekday number of ``date`` is in ``weekend``."""
        return int(date.strftime('%w')) in self.weekend

    def is_holiday(self, date):
        """Return True when ``date`` matches a yearly or a dated holiday."""
        return (date.strftime("*-%m-%d") in self.holidays
                or date.strftime("%Y-%m-%d") in self.holidays)

    def is_working_time(self, current_time):
        """Return True when the clock time lies in a working period.

        Period starts are inclusive, period ends are exclusive.
        """
        current_time_hm_sec = self.hm_to_seconds(current_time.strftime("%H:%M"))
        for part in self.working_hours:
            if self.hm_to_seconds(part[0]) <= current_time_hm_sec < self.hm_to_seconds(part[1]):
                return True
        return False

    def is_break_time(self, current_time, return_minutes=False):
        """Tell whether the clock time lies in a break between working periods.

        Returns False when it does not. When it does, returns True, or the
        length of that break in minutes if ``return_minutes`` is set.
        """
        current_time_hm_sec = self.hm_to_seconds(current_time.strftime("%H:%M"))
        for part in self.get_break_times():
            break_start = self.hm_to_seconds(part[0])
            break_end = self.hm_to_seconds(part[1])
            if break_start <= current_time_hm_sec < break_end:
                if return_minutes:
                    return int((break_end - break_start) / 60)
                return True
        return False

    @staticmethod
    def get_rest_minutes(hours):
        """Return the fractional part of ``hours`` as whole minutes.

        The value is rounded rather than truncated, because float noise such
        as ``(2.3 - 2) * 60 == 17.99999...`` would otherwise lose a minute.
        """
        return int(round((hours - int(hours)) * 60))

    def increment_by_days(self, date_time, days, check_off_time=True):
        """Advance ``date_time`` by ``days`` days.

        With ``check_off_time`` set, weekend days and holidays are stepped
        over without being counted.
        """
        days = int(days)
        while days > 0:
            date_time += datetime.timedelta(days=1)
            if check_off_time and (self.is_weekend(date_time) or self.is_holiday(date_time)):
                continue
            days -= 1
        return date_time

    def increment_by_hours(self, date_time, hours, check_off_time=True):
        """Advance ``date_time`` by the whole-hour part of ``hours``.

        With ``check_off_time`` set, a step that lands inside a break is not
        counted.
        """
        hours = int(hours)
        while hours > 0:
            date_time += datetime.timedelta(hours=1)
            if check_off_time and self.is_break_time(date_time):
                continue
            hours -= 1
        return date_time

    def increment_by_minutes(self, date_time, minutes, check_off_time=True):
        """Advance ``date_time`` by ``minutes`` minutes.

        The minutes are applied in chunks of ``min_part``; with
        ``check_off_time`` set, chunks that land inside a break are not
        counted. The remainder smaller than a chunk is added as is.
        """
        minutes = int(minutes)
        if minutes > self.min_part:
            # More minutes than min_part: split them into chunks to reduce
            # the number of iterations.
            min_parts = minutes // self.min_part
            minutes = minutes % self.min_part
            while min_parts:
                date_time += datetime.timedelta(minutes=self.min_part)
                if check_off_time and self.is_break_time(date_time):
                    # Landed in a break: keep adding min_part minutes until
                    # we are out of it.
                    continue
                min_parts -= 1
        if minutes > 0:
            # Leftover minutes are added without looking at breaks.
            date_time += datetime.timedelta(minutes=minutes)
        return date_time

    def calc_date_planned_end(self, planned_start, duration):
        """Return the datetime at which a task of ``duration`` hours ends.

        Args:
            planned_start: datetime (or a value ``parse`` accepts) at which
                the task starts.
            duration: amount of work in working hours.
        """
        planned_start = self.parse(planned_start)
        day_start_hm = self.working_hours[0][0].split(':')
        day_start_hour = int(day_start_hm[0])
        day_start_minute = int(day_start_hm[1])
        first_day_working_hours = self.calc_working_hours_for_first_day(planned_start)
        working_hours_number = self.get_working_hours_number()

        if first_day_working_hours < duration:
            # The task does not fit into the rest of the first day: use that
            # day up and continue from the start of the next working day.
            duration -= first_day_working_hours
            planned_start = self.increment_by_days(planned_start, 1)
            planned_start = planned_start.replace(hour=day_start_hour, minute=day_start_minute)

        days = int(duration / working_hours_number)
        hours = duration - days * working_hours_number
        minutes = self.get_rest_minutes(hours)

        start_hm_sec = self.hm_to_seconds(planned_start.strftime("%H:%M"))
        day_end_sec = self.hm_to_seconds(self.working_hours[-1][1])
        difference = (start_hm_sec + hours * 3600) - day_end_sec
        # If, given the working hours and the duration, the task spills over
        # to another day, add 1 to days and reset the clock to the start of
        # the working day.
        if difference > 0:
            days += 1
            hours = self.sec_to_hours(difference)
            minutes = self.get_rest_minutes(hours)
            planned_start = planned_start.replace(hour=day_start_hour, minute=day_start_minute)

        if hours == 0 and days > 0:
            # A whole number of days ends at the close of the last working
            # day, not at the opening of the following one. Use the
            # configured day length, and leave a zero duration untouched.
            days -= 1
            hours = working_hours_number
            minutes = self.get_rest_minutes(hours)

        planned_end = self.increment_by_days(planned_start, days)
        planned_end = self.increment_by_hours(planned_end, hours)
        planned_end = self.increment_by_minutes(planned_end, minutes)
        return planned_end

    def calc_duration(self, planned_start, planned_end):
        """Return the working hours between two datetimes, rounded to 0.1.

        The first and the last day are measured by clock time only (see
        ``calc_working_minutes``); the days in between are filtered by
        weekend and holidays.
        """
        planned_start = self.parse(planned_start)
        planned_end = self.parse(planned_end)
        if planned_start.strftime("%Y-%m-%d") == planned_end.strftime("%Y-%m-%d"):
            # The task is planned within a single day.
            duration = self.calc_working_hours_for_day(planned_start, planned_end)
        else:
            first_day_working_hours = self.calc_working_hours_for_first_day(planned_start)
            last_day_working_hours = self.calc_working_hours_for_last_day(planned_end)
            working_days = self.calc_working_days(planned_start, planned_end)
            duration = (first_day_working_hours + last_day_working_hours
                        + working_days * self.get_working_hours_number())
        return round(duration, 1)

    def calc_working_hours_for_first_day(self, planned_start):
        """Return working hours from ``planned_start`` to the end of that day."""
        planned_start = self.parse(planned_start)
        end_hm = self.working_hours[-1][1].split(':')
        day_end = planned_start.replace(hour=int(end_hm[0]), minute=int(end_hm[1]))
        working_minutes = self.calc_working_minutes(planned_start, day_end)
        return float(working_minutes) / 60

    def calc_working_hours_for_last_day(self, planned_end):
        """Return working hours from the start of the day to ``planned_end``."""
        planned_end = self.parse(planned_end)
        start_hm = self.working_hours[0][0].split(':')
        day_start = planned_end.replace(hour=int(start_hm[0]), minute=int(start_hm[1]))
        working_minutes = self.calc_working_minutes(day_start, planned_end)
        return float(working_minutes) / 60

    def calc_working_hours_for_day(self, planned_start, planned_end):
        """Return working hours between two datetimes on the same day.

        Returns False when the two values fall on different days.
        """
        planned_start = self.parse(planned_start)
        planned_end = self.parse(planned_end)
        if planned_start.strftime("%Y-%m-%d") != planned_end.strftime("%Y-%m-%d"):
            # planned_start and planned_end are on different days, so this
            # function does not apply.
            return False
        working_minutes = self.calc_working_minutes(planned_start, planned_end)
        return float(working_minutes) / 60

    def calc_working_minutes(self, start, end):
        """Return the whole working minutes inside ``[start, end)``.

        The interval is intersected with the working periods of every
        calendar day it touches, so breaks and off-hours contribute nothing.
        Weekends and holidays are not considered here. Returns 0 when
        ``end`` is not after ``start``.
        """
        if end <= start:
            return 0
        nothing = datetime.timedelta()
        worked = datetime.timedelta()
        midnight = start.replace(hour=0, minute=0, second=0, microsecond=0)
        while midnight < end:
            for part in self.working_hours:
                part_start = midnight + datetime.timedelta(seconds=self.hm_to_seconds(part[0]))
                part_end = midnight + datetime.timedelta(seconds=self.hm_to_seconds(part[1]))
                overlap = min(end, part_end) - max(start, part_start)
                if overlap > nothing:
                    worked += overlap
            midnight += datetime.timedelta(days=1)
        return int(worked.total_seconds() // 60)

    def calc_working_days(self, planned_start, planned_end):
        """Return the number of working days strictly between two dates.

        The day of ``planned_start`` and the day of ``planned_end`` are not
        counted; weekend days and holidays in between are skipped.
        """
        working_days = 0
        first_date = self.parse(planned_start).date()
        last_date = self.parse(planned_end).date()
        one_day = datetime.timedelta(days=1)
        # Walk calendar dates rather than elapsed 24-hour spans, so the
        # result does not depend on the clock times of the two endpoints.
        current = first_date + one_day
        while current < last_date:
            if not (self.is_weekend(current) or self.is_holiday(current)):
                working_days += 1
            current += one_day
        return working_days
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment