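# Parses an operation log from a CSV file and reports the average time per
# operation, the operation with the maximum total time, and the merged time
# intervals per operation.
#
# The CSV is expected to carry the headers Operation, Start and End, with
# times in 24-hour HH:MM format. A hypothetical input file:
#
#   Operation,Start,End
#   READ,09:00,09:30
#   READ,09:15,09:45
#   CREATE,10:00,10:20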
import csv
import datetime
from collections import defaultdict

supported_type_of_operations = ['CREATE', 'UPDATE', 'DELETE', 'READ']
OPERATION_HEADER = 'Operation'
START_TIME_HEADER = 'Start'
END_TIME_HEADER = 'End'
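
# Raises an error if the given time string is not in 24-hour "HH:MM" format.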
def validate_time(time):
    time_format = "%H:%M"
    try:
        datetime.datetime.strptime(time, time_format)
    except ValueError:
        raise ValueError(f"Time format is incorrect: {time!r}")
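
# Ensures the row has the required headers, a supported operation type,
# and well-formed start and end times.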
def validate_row(row):
    # Collect the headers into a set so membership can be tested more than
    # once (a one-shot `map` iterator would be exhausted after the first check).
    row_headers = {key.upper() for key in row.keys()}
    required_headers = {
        OPERATION_HEADER.upper(),
        START_TIME_HEADER.upper(),
        END_TIME_HEADER.upper(),
    }
    if not required_headers.issubset(row_headers):
        raise Exception('CSV data file does not contain the expected headers')
    if row.get(OPERATION_HEADER).upper() not in supported_type_of_operations:
        raise Exception(f'{row.get(OPERATION_HEADER)} Operation not supported')
    validate_time(row.get(START_TIME_HEADER))
    validate_time(row.get(END_TIME_HEADER))
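
# Reads the CSV file and returns a list of [operation, start, end] rows,
# validating each row along the way.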
def read_and_structure_input(filename):
    log_data = []
    with open(filename, newline='') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            validate_row(row)
            log_data.append([
                row.get(OPERATION_HEADER),
                row.get(START_TIME_HEADER),
                row.get(END_TIME_HEADER)
            ])
    return log_data
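
# Converts an "HH:MM" string into minutes past midnight.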
def convert_time_into_mins(time):
    hours, minutes = map(int, time.split(":"))
    return hours * 60 + minutes
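
# Maps each operation type to the list of durations (in minutes) of its runs.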
def group_data_according_to_operation(log_data):
    log_data_grouped_by_operation = defaultdict(list)
    for operation, start_time, end_time in log_data:
        duration = convert_time_into_mins(end_time) - convert_time_into_mins(start_time)
        log_data_grouped_by_operation[operation].append(duration)
    return log_data_grouped_by_operation
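
# Prints the average duration per operation type. Averages are floored to
# whole minutes by the integer division.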
def compute_average_time_of_operation(log_data):
    averages = [
        (operation, sum(durations) // len(durations))
        for operation, durations in group_data_according_to_operation(log_data).items()
    ]
    print('\n*******Average Times of Operations*******')
    print("{:<15} {:<20}".format('Operation Type', 'Average Time Taken'))
    for operation, average in averages:
        print("{:<15} {:<20}".format(operation, average))
def find_operation_with_maximum_time(log_data):
    log_data_grouped_by_operation = group_data_according_to_operation(log_data)
    total_times = [(operation, sum(durations))
                   for operation, durations in log_data_grouped_by_operation.items()]
    operation, total_time = max(total_times, key=lambda item: item[1])
    print('\n********Operation with Maximum Time******')
    print("{:<15} {:<15}".format('Operation Type', 'Max Time Taken'))
    print("{:<15} {:<15}".format(operation, total_time))
def do_intervals_overlap(interval1, interval2):
    # Assumes interval1 starts no later than interval2, which holds because
    # merge_intervals processes intervals sorted by start time.
    start_time_1, end_time_1 = map(convert_time_into_mins, interval1)
    start_time_2, _ = map(convert_time_into_mins, interval2)
    return start_time_1 <= start_time_2 <= end_time_1
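
# Returns the later of two "HH:MM" times.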
def max_time(time1, time2):
    return time1 if convert_time_into_mins(time1) > convert_time_into_mins(time2) else time2
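
# Merges overlapping [start, end] intervals using a stack: intervals are
# sorted by start time, and each interval either extends the interval on
# top of the stack or starts a new one.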
def merge_intervals(intervals):
    if not intervals:
        return []
    # Sort by start time so that overlapping intervals become adjacent.
    intervals.sort(key=lambda interval: convert_time_into_mins(interval[0]))
    stack = [intervals[0]]
    for interval in intervals[1:]:
        top = stack[-1]
        if do_intervals_overlap(top, interval):
            # Merge by keeping the earlier start and the later end time.
            stack[-1] = [top[0], max_time(top[1], interval[1])]
        else:
            stack.append(interval)
    return stack
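
# Groups the raw [start, end] intervals by operation type and prints the
# merged intervals for each.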
def merge_data(log_data):
    log_data_grouped_by_operation = defaultdict(list)
    for operation, start_time, end_time in log_data:
        log_data_grouped_by_operation[operation].append([start_time, end_time])
    print('\n********Operation with Merged Intervals******')
    for operation, intervals in log_data_grouped_by_operation.items():
        print(f'\nOperation Type: {operation}')
        print('Merged Intervals')
        print(merge_intervals(intervals))
if __name__ == "__main__":
    log_data = read_and_structure_input('log_data.csv')
    compute_average_time_of_operation(log_data)
    find_operation_with_maximum_time(log_data)
    merge_data(log_data)
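
# Run the script with a log_data.csv in the working directory; it prints
# three reports: average duration per operation, the operation with the
# largest total time, and the merged busy intervals per operation.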