@aneessh18
Created August 19, 2022 08:47

import csv
import datetime
from collections import defaultdict
SUPPORTED_OPERATIONS = ['CREATE', 'UPDATE', 'DELETE', 'READ']
OPERATION_HEADER = 'Operation'
START_TIME_HEADER = 'Start'
END_TIME_HEADER = 'End'
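
# Expected input: a CSV whose header row contains Operation, Start and End
# columns (matching the constants above), with times in 24-hour HH:MM format,
# e.g. a row such as CREATE,09:00,09:30.
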
def validate_time(time):
    """Raise if `time` is not in 24-hour HH:MM format."""
    time_format = "%H:%M"
    try:
        datetime.datetime.strptime(time, time_format)
    except ValueError:
        raise ValueError(f"Time format is incorrect: {time!r} (expected HH:MM)")

def validate_row(row):
    """Ensure a row has the expected headers, a supported operation and valid times."""
    # A set supports repeated membership checks; a plain `map` iterator would be
    # consumed by the first check and make the later ones unreliable.
    row_headers = {key.upper() for key in row.keys()}
    required_headers = {
        OPERATION_HEADER.upper(),
        START_TIME_HEADER.upper(),
        END_TIME_HEADER.upper()
    }
    if not required_headers.issubset(row_headers):
        raise ValueError('CSV data file does not have the expected headers')
    if row.get(OPERATION_HEADER).upper() not in SUPPORTED_OPERATIONS:
        raise ValueError(f'{row.get(OPERATION_HEADER)} operation is not supported')
    validate_time(row.get(START_TIME_HEADER))
    validate_time(row.get(END_TIME_HEADER))

def read_and_structure_input(filename):
    """Read the CSV log and return rows of [operation, start_time, end_time]."""
    log_data = []
    with open(filename, newline='') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            validate_row(row)
            log_data.append([
                row.get(OPERATION_HEADER),
                row.get(START_TIME_HEADER),
                row.get(END_TIME_HEADER)
            ])
    return log_data

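# For example, given the sample row shown near the top of this file, the first
# element of the returned list would be ['CREATE', '09:00', '09:30'].
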
def convert_time_into_mins(time):
    hours, minutes = map(int, time.split(":"))
    return hours * 60 + minutes

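# For example, convert_time_into_mins("01:30") returns 90 and
# convert_time_into_mins("10:05") returns 605.
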
def group_data_according_to_operation(log_data):
    """Group [operation, start, end] rows into {operation: [duration_in_minutes, ...]}."""
    log_data_grouped_by_operation = defaultdict(list)
    for operation, start_time, end_time in log_data:
        duration = convert_time_into_mins(end_time) - convert_time_into_mins(start_time)
        log_data_grouped_by_operation[operation].append(duration)
    return log_data_grouped_by_operation

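# For example, rows [['READ', '10:00', '10:30'], ['READ', '11:00', '11:10']]
# group into {'READ': [30, 10]}.
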
def compute_average_time_of_operation(log_data):
    """Print the average duration in minutes for each operation type."""
    log_data_grouped_by_operation = group_data_according_to_operation(log_data)
    print('\n*******Average Times of Operations*******')
    print("{:<15} {:<20}".format('Operation Type', 'Average Time Taken'))
    for operation, durations in log_data_grouped_by_operation.items():
        # Integer division keeps the output in whole minutes.
        print("{:<15} {:<20}".format(operation, sum(durations) // len(durations)))

def find_operation_with_maximum_time(log_data):
    """Print the operation type whose intervals add up to the most total minutes."""
    log_data_grouped_by_operation = group_data_according_to_operation(log_data)
    total_time_per_operation = [
        [operation, sum(durations)]
        for operation, durations in log_data_grouped_by_operation.items()
    ]
    operation_with_max_time = max(total_time_per_operation, key=lambda data: data[1])
    print('\n********Operation with Maximum Time******')
    print("{:<15} {:<15}".format('Operation Type', 'Max Time Taken'))
    print("{:<15} {:<15}".format(operation_with_max_time[0], operation_with_max_time[1]))

def do_intervals_overlap(interval1, interval2):
    """Check whether interval2 starts inside interval1 (callers pass intervals sorted by start)."""
    start_time_1, end_time_1 = map(convert_time_into_mins, interval1)
    start_time_2, end_time_2 = map(convert_time_into_mins, interval2)
    return start_time_1 <= start_time_2 <= end_time_1

def max_time(time1, time2):
    """Return whichever of two HH:MM strings is later."""
    return time1 if convert_time_into_mins(time1) > convert_time_into_mins(time2) else time2

def merge_intervals(intervals):
    """Merge overlapping [start, end] intervals with a stack over the sorted list."""
    if not intervals:
        return []
    # Sort by start time so any overlapping intervals become adjacent.
    intervals.sort(key=lambda interval: convert_time_into_mins(interval[0]))
    stack = [intervals[0]]
    for i in range(1, len(intervals)):
        top = stack[-1]
        if do_intervals_overlap(top, intervals[i]):
            # Merge by extending the interval on top of the stack to the later end time.
            stack.pop()
            stack.append([top[0], max_time(top[1], intervals[i][1])])
        else:
            stack.append(intervals[i])
    return stack

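# For example, [['10:00', '10:30'], ['10:15', '11:00'], ['12:00', '12:30']]
# merges to [['10:00', '11:00'], ['12:00', '12:30']].
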
def merge_data(log_data):
    """Print, per operation type, its time intervals after merging any overlaps."""
    log_data_grouped_by_operation = defaultdict(list)
    for operation, start_time, end_time in log_data:
        log_data_grouped_by_operation[operation].append([start_time, end_time])
    print('\n********Operation with Merged Intervals******')
    for operation, intervals in log_data_grouped_by_operation.items():
        print(f'\nOperation Type: {operation}')
        print('Merged Intervals')
        print(merge_intervals(intervals))

if __name__ == "__main__":
    log_data = read_and_structure_input('log_data.csv')
    compute_average_time_of_operation(log_data)
    find_operation_with_maximum_time(log_data)
    merge_data(log_data)
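
A minimal log_data.csv this script would accept (the column names and HH:MM
format follow the constants above; the rows themselves are made-up samples):

Operation,Start,End
CREATE,09:00,09:30
READ,09:15,09:45
READ,10:00,10:20
UPDATE,11:00,11:50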