'''
er_pass_key.py :: various tools to analyze EasyRedmine with its REST API
- grabs log times from EasyRedmine and generates reports.
- lists issues in which the user made any changes in the last week.
API documentation:
https://easyredmine.docs.apiary.io/
https://www.redmine.org/projects/redmine/wiki/Rest_api
Info: requires Python 3.7. You must also create an er_pass_key_settings.py file with the correct
S_AUTH_USER, S_AUTH_PASS, S_API_KEY, S_SERVER_HOST, S_TIMEOUT_SECONDS variables (example below).
'''
import concurrent
import datetime
import functools
import http.client
import os
import sys
import getopt
import time
import xml.etree.ElementTree as ElementTree
from base64 import b64encode
from concurrent.futures import ThreadPoolExecutor
from contextlib import closing
from time import strftime, gmtime
import sqlite3
import numpy as np
usage_text = '''
usage: er_pass_key.py [-r|--report] [-t|--this_week_report] [-h|--history] [-u|--user NAME] [-d|--days_period N] [-e|--reference_date YYYY-MM-DD]
example, to print the last 7 days of history for Tom Kelly: er_pass_key.py -h -u "Tom Kelly" -d 7
'''
# Holidays excluded from the work-day count (np.busday_count) in get_log_time_report_for_user
holidays = ['2018-11-01', '2018-11-02', '2018-11-12', '2018-12-31', '2019-01-01']
# Example contents of er_pass_key_settings.py:
'''
S_API_KEY = "0123456789001234567890012345678900123456"
S_AUTH_USER = "" # optional
S_AUTH_PASS = "" # optional
S_SERVER_HOST = "er.someserver.com"
S_TIMEOUT_SECONDS = 10
'''
from er_pass_key_settings import S_AUTH_USER, S_AUTH_PASS, S_API_KEY, S_SERVER_HOST, S_TIMEOUT_SECONDS
headers = {
'cache-control': 'no-cache',
}
if S_AUTH_USER != "":
userAndPass = b64encode(bytes(S_AUTH_USER + ":" + S_AUTH_PASS, "utf-8")).decode("ascii")
headers['Authorization'] = 'Basic ' + userAndPass
request_response_cache = {}
issueNameDict = {}
# key-value db based on sqlite3
class KeyValueDB(object):
def __init__(self, name="er_cache.db"):
self.db_conn = sqlite3.connect(name, check_same_thread=False)
def put(self, table, main_id, key, value):
""" id is unique id associated with this key-value pair """
self.__verify(table, key)
self.db_conn.execute('''INSERT INTO {0} (main_id, {1})
VALUES (?, ?)
ON CONFLICT (main_id)
DO UPDATE SET {1} = excluded.{1}
'''
.format(table, key),
(main_id, value))
self.db_conn.commit()
    def get(self, table, main_id, key, def_value=""):
        self.__verify(table, key)
        result = def_value
        # main_id is passed as a bound parameter; note that sqlite3 reports rowcount == -1
        # for SELECT statements, so fetchone() is the reliable way to detect a hit
        with closing(self.db_conn.execute(
                '''SELECT {0} from {1} where main_id = ?'''.format(key, table), (main_id,))) as cursor:
            row_one = cursor.fetchone()
            if row_one and row_one[0] is not None:
                result = row_one[0]
        return result
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.db_conn.close()
def __verify(self, table, column):
self.__add_table_if_not_exists(table)
self.__add_column_if_not_exists(table, column)
def __add_table_if_not_exists(self, table):
self.db_conn.execute('''CREATE TABLE IF NOT EXISTS {0} (
main_id TEXT PRIMARY KEY
)'''.format(table))
def __add_column_if_not_exists(self, table, column):
if self.__column_exists(table, column):
return
self.db_conn.execute('''ALTER TABLE {0} ADD COLUMN {1} TEXT'''
.format(table, column))
def __table_exists(self, table):
with closing(self.db_conn.cursor()) as c:
c.execute(''' SELECT count(name) FROM sqlite_master WHERE type='table' AND name='{0}' '''.format(table))
return c.fetchone()[0] == 1
def __column_exists(self, table, column_name):
with closing(self.db_conn.cursor()) as c:
data = c.execute('''select * from {0}'''.format(table))
return len(list(filter(lambda x: x[0] == column_name, data.description))) > 0
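# Storage layout sketch: one sqlite table per "table" argument, one TEXT column per key,
# both created lazily on first use; rows are keyed by main_id.
# Minimal example, kept as comments so importing the module has no side effects
# (hypothetical file name and values; see test_data_cache() below for a fuller example):
#   with KeyValueDB("example.db") as db:
#       db.put("issues", "123", "updated_on", "2019-01-01T00:00:00Z")
#       db.get("issues", "123", "updated_on")   # -> "2019-01-01T00:00:00Z"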
def test_data_cache():
test_db = "test_cache.db"
if os.path.exists(test_db):
os.remove(test_db)
try:
with KeyValueDB(test_db) as dc:
dc.put("students", "1000", "name", "tim")
dc.put("students", "1000", "surname", "cook")
dc.put("grades", "1000", "math", "4")
assert dc.get("students", "1000", "name") == "tim"
assert dc.get("students", "1000", "surname") == "cook"
assert dc.get("grades", "1000", "math") == "4"
dc.put("students", "1000", "age", "18")
assert dc.get("students", "1000", "age") == "18"
assert dc.get("students", "1001", "age", "?") == "?"
with KeyValueDB(test_db) as dc:
assert dc.get("students", "1000", "name") == "tim"
assert dc.get("students", "1000", "surname") == "cook"
assert dc.get("grades", "1000", "math") == "4"
assert dc.get("students", "1000", "age") == "18"
assert dc.get("students", "1001", "age", "?") == "?"
dc.put("students", "1000", "name", "timmi")
assert dc.get("students", "1000", "name") == "timmi"
finally:
os.remove(test_db)
def get_issue_name(id):
'''
    Returns the name (subject) of an issue given its ID
Args:
id:
ID of issue
Returns:
Name of issue
'''
if id in issueNameDict:
return issueNameDict[id]
if id == -1:
return "out of office/no log times"
if id == -2:
return "(out of case entry)"
conn = http.client.HTTPSConnection(S_SERVER_HOST)
conn.request("GET", "/issues/" + id + ".xml?key=" + S_API_KEY, headers=headers)
res = conn.getresponse()
data = res.read()
root = ElementTree.fromstring(data)
issueNameDict[id] = root.find("subject").text
return issueNameDict[id]
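# Usage sketch (hypothetical issue id and subject; performs a live API call on a cache miss):
#   get_issue_name("12345")   # -> "Some issue subject", memoized in issueNameDict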
# This will not work: the API does not allow retrieving journals for many issues in one
# request; each issue must be fetched separately, and that response contains its journals.
'''
def get_issues_journals(issues_ids, for_user_name, create_after_date_time):
print("issue:{0}, {1}, {2}".format(str(",".join(issues_ids)), for_user_name, create_after_date_time))
found_journals = []
while True:
conn = http.client.HTTPSConnection(S_SERVER_HOST)
# request = "/issues.xml?key={0}&issue_id={1}&include=journals".format(S_API_KEY, ','.join(ids))
url = "/issues.xml?key={0}&issue_id={1}&include=journals".format(S_API_KEY, ",".join(issues_ids))
conn.request("GET", url, headers=headers)
res = conn.getresponse()
data = res.read()
try:
root = ElementTree.fromstring(data)
except ElementTree.ParseError as e:
# print(e)
print(str(data) + " .. waiting 5s")
# sys.exit(1)
time.sleep(5)
continue
# TODO: journal iteration is not yet finished. API does not support it and probably never will
issue_id = root.find("id").text
subject = root.find("subject").text
journalsElement = root.find("journals")
for journal in journalsElement.iter("journal"):
user = journal.find("user")
user_name = user.attrib['name']
user_id = user.attrib['id']
if user_name != for_user_name:
continue
created_on = journal.find('created_on').text
created_on_date_time = datetime.datetime.strptime(created_on, "%Y-%m-%dT%H:%M:%SZ")
if created_on_date_time.date() >= create_after_date_time.date():
found_journals.append(
{"issue_id": issue_id, "subject": subject, "journal": journal, "created_on": created_on})
break
return found_journals
'''
def get_issue_journals(id, updated_on, for_user_name, create_after_date_time, db):
#print("issue:{0}, {1}, {2}".format(id, for_user_name, create_after_date_time))
found_journals = []
while True:
data = ""
db_updated_on = db.get("issues", id, "updated_on", "")
if db_updated_on and db_updated_on == updated_on:
data = db.get("issues", id, "server_data", "")
if not data:
conn = http.client.HTTPSConnection(S_SERVER_HOST)
conn.request("GET", "/issues/{0}.xml?key={1}&include=journals".format(id, S_API_KEY), headers=headers)
res = conn.getresponse()
data = res.read().decode('utf-8')
        try:
            root = ElementTree.fromstring(data)
        except ElementTree.ParseError:
            print(str(data) + " .. waiting {0}s".format(S_TIMEOUT_SECONDS))
            # invalidate any cached copy so the next attempt refetches from the server
            db.put("issues", id, "updated_on", "")
            time.sleep(S_TIMEOUT_SECONDS)
            continue
issue_id = root.find("id").text
subject = root.find("subject").text
journalsElement = root.find("journals")
for journal in journalsElement.iter("journal"):
user = journal.find("user")
user_name = user.attrib['name']
user_id = user.attrib['id']
if user_name != for_user_name:
continue
created_on = journal.find('created_on').text
created_on_date_time = datetime.datetime.strptime(created_on, "%Y-%m-%dT%H:%M:%SZ")
if created_on_date_time.date() >= create_after_date_time.date():
found_journals.append(
{"issue_id": issue_id, "subject": subject, "journal": journal, "created_on": created_on})
db.put("issues", id, "updated_on", updated_on)
db.put("issues", id, "server_data", data)
break
return found_journals
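# Usage sketch (hypothetical values): fetch journals added by a user since a given date,
# reusing the sqlite cache when the issue's updated_on timestamp has not changed:
#   with KeyValueDB("er_cache.db") as cache_db:
#       since = datetime.datetime(2019, 1, 1)
#       journals = get_issue_journals("12345", "2019-01-05T10:00:00Z", "Tom Kelly", since, cache_db)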
def init_log_time_reports(from_date, to_date, for_user_name=""):
'''
    Retrieves the time entries array in XML form from the server.
Args:
from_date: start date
to_date: end date
for_user_name: optional, filter by user name
Returns:
array of matching xml.etree.ElementTree.Element types
'''
time_entries = []
current_offset = 0
while True:
path = "/time_entries.xml?key={0}&limit={1}&period_type={2}&from={3}&to={4}&offset={5}".format(
S_API_KEY,
400, # limit
2,
from_date,
to_date,
str(current_offset)
)
if path in request_response_cache:
data = request_response_cache[path]
else:
conn = http.client.HTTPSConnection(S_SERVER_HOST)
conn.request("GET", path, headers=headers)
res = conn.getresponse()
data = res.read()
try:
root = ElementTree.fromstring(data)
except ElementTree.ParseError as e:
print(str(data) + " .. waiting 2s")
time.sleep(2)
continue
request_response_cache[path] = data
total_count = root.attrib['total_count']
limit = root.attrib['limit']
offset = root.attrib['offset']
for child in root:
current_offset = current_offset + 1
percent = (float(current_offset) / float(total_count)) * 100.0
sys.stdout.write('\r')
sys.stdout.write(
"Getting time entries: [%-20s] %d%%" % ('=' * int(int(percent + 0.5) / 5.0), int(percent + 0.5)))
sys.stdout.flush()
if for_user_name != "":
entry_user_name = child.find('user').attrib['name']
if entry_user_name != for_user_name:
continue
time_entries.append(child)
if current_offset >= int(total_count):
break
return time_entries
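# Usage sketch (hypothetical dates and user name):
#   entries = init_log_time_reports("2019-01-07", "2019-01-13", "Tom Kelly")
#   # each element is an xml.etree.ElementTree.Element for one <time_entry>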
def get_user_list_from_time_entries(time_entries):
'''
List users from the time entries array
Args:
time_entries:
must be generated using init_log_time_reports
Returns:
array of tuples (username, user_id)
'''
user_list = []
for entry in time_entries:
user_name = entry.find('user').attrib['name']
user_id = entry.find('user').attrib['id']
if [user_name, user_id] not in user_list:
user_list.append([user_name, user_id])
return user_list
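# Usage sketch (hypothetical data):
#   get_user_list_from_time_entries(entries)   # -> [["Tom Kelly", "42"], ...]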
def get_log_time_report_for_user(time_entries, reportUserName, reportUserId, hours_per_day, from_date, to_date):
'''
Generates report in string format for provided time entries.
Args:
time_entries: collection of time entries, must be generated using init_log_time_reports
reportUserName: optional, you may specify user name for whom to generate this report
        reportUserId: user id, mandatory - used in the output report
hours_per_day: how many hours a day does this person work (we assume 5 days work week)
from_date: start date
to_date: end date
Returns:
dictionary containing three parts of the report:
- header
- daily work
- percentage work on various projects
:{"header": header_report_str, "daily": daily_report_str, "by_case_percentage": by_case_report_str}
'''
daily_report_str = ""
index = 1
loop_index = 0
total_day_hours = {}
total_by_case_in_week_hours = {}
last_date = ""
total_week_hours = 0
total_entries_for_user = 0
d_start = datetime.datetime.strptime(from_date, '%Y-%m-%d').date()
d_end = datetime.datetime.strptime(to_date, '%Y-%m-%d').date()
work_days = np.busday_count(d_start, d_end, holidays=holidays)
print("\n")
for child in reversed(time_entries):
loop_index += 1
percent = (float(loop_index) / float(len(time_entries))) * 100.0
sys.stdout.write('\r')
sys.stdout.write("Generating report: [%-20s] %d%% - %s" % (
'=' * int(int(percent + 0.5) / 5.0), int(percent + 0.5), reportUserName))
sys.stdout.flush()
user_name = child.find('user').attrib['name']
if user_name != reportUserName:
continue
total_entries_for_user += 1
spent_on_str = child.find('spent_on').text
spent_on = datetime.datetime.strptime(spent_on_str, '%Y-%m-%d')
spent_on_date = spent_on.date()
spent_on_week_day_name = spent_on_date.strftime("%A")
if last_date != spent_on_str:
if last_date != "":
daily_report_str += "total: {0}h\n".format(total_day_hours[last_date])
daily_report_str += "\n{0} {1}\n".format(spent_on_week_day_name, spent_on_str)
hours_float = float(child.find('hours').text)
total_week_hours += hours_float
if spent_on_str not in total_day_hours:
total_day_hours[spent_on_str] = hours_float
else:
total_day_hours[spent_on_str] += hours_float
last_date = spent_on_str
if child.find('issue') is not None:
case_id = child.find('issue').attrib['id']
else:
case_id = -2
if case_id not in total_by_case_in_week_hours:
total_by_case_in_week_hours[case_id] = hours_float
else:
total_by_case_in_week_hours[case_id] += hours_float
if case_id == -1 or case_id == -2:
case_field = "{0}".format(get_issue_name(case_id))
else:
case_field = "case {0}: {1}".format(case_id, get_issue_name(case_id))
daily_report_str += "- {3} [{1} - https://{6}/issues/{2}]\n {4}h - {5}\n".format(
index,
child.find('project').attrib['name'],
case_id, case_field,
hours_float,
child.find('comments').text,
S_SERVER_HOST
)
index += 1
daily_report_str += "total: {0}h".format(total_day_hours[last_date])
daily_report_str += "\n"
if total_week_hours < work_days * hours_per_day:
        total_by_case_in_week_hours[-1] = (work_days * hours_per_day) - total_week_hours
by_case_report_str = ""
percent_by_case_id = {}
    for case_id, hours in total_by_case_in_week_hours.items():
        # the same formula applies to regular cases and to the synthetic
        # "out of office/no log times" entry (case_id == -1)
        percent_by_case_id[case_id] = (hours / (hours_per_day * work_days)) * 100.0
for elem in sorted(percent_by_case_id.items(), key=lambda x: x[1], reverse=True):
if elem[0] == -1 or elem[0] == -2:
case_field = "{0}".format(get_issue_name(elem[0]))
else:
case_field = "case {0}: {1}".format(elem[0], get_issue_name(elem[0]))
by_case_report_str += "{0:4.1f}% {1:4.1f}h - {2} \n".format(elem[1], total_by_case_in_week_hours[elem[0]],
case_field)
header_report_str = "Report for: {0} (ID={1})\n".format(reportUserName, reportUserId)
header_report_str += "Report date: {0}\n".format(strftime("%Y-%m-%d %H:%M:%S", gmtime()))
header_report_str += "Report span: od {0} do {1}\n".format(from_date, to_date)
header_report_str += "Total work days: {0}\n".format(work_days)
header_report_str += "Number of entries: {0}\n".format(total_entries_for_user)
header_report_str += "Work week: {0}h/day\n".format(hours_per_day)
return {"header": header_report_str, "daily": daily_report_str, "by_case_percentage": by_case_report_str}
def generate_report(from_date, to_date, db, user_name=""):
    '''
    Args:
        from_date: start date
        to_date: end date
        db: KeyValueDB cache instance (not used directly in this function)
        user_name: name of the user; if empty, all users found in the time entries are reported
    Returns: report in string form
    '''
report_str = ""
time_entries = init_log_time_reports(from_date, to_date, user_name)
user_list = get_user_list_from_time_entries(time_entries)
for userName in user_list:
report = get_log_time_report_for_user(time_entries, userName[0], userName[1], 8, from_date, to_date)
report_str += "\n*** *** *** *** *** *** *** *** *** *** *** *** *** *** *** *** *** *** *** *** ***\n\n{0}\n{2}{1}".format(
report["header"], report["daily"], report["by_case_percentage"])
report_str += "\n*** *** *** *** *** *** *** *** *** *** *** *** *** *** *** *** *** *** *** *** ***"
return report_str
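# Usage sketch (hypothetical dates; db is a KeyValueDB instance):
#   with KeyValueDB("er_" + S_SERVER_HOST + ".db") as db:
#       print(generate_report("2019-01-07", "2019-01-13", db, "Tom Kelly"))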
def generate_activity_history(after_date, for_user_name, db):
# GET /issues.xml?updated_on=%3E%3D2014-01-02T00:00:00Z
after_date_date_time = datetime.datetime.strptime(after_date, "%Y-%m-%d")
journal_entries = []
current_offset = 0
total_count = 0
total_found = 0
def journal_get_callback(after_date, fut):
nonlocal total_found
nonlocal current_offset
total_found += len(fut.result())
current_offset = current_offset + 1
percent = (float(current_offset) / float(total_count)) * 100.0
sys.stdout.write(
"Analyzing journals: [%-20s] %d%% - Found %d journal entries since %s\r"
% ('=' * int(int(percent + 0.5) / 5.0), int(percent + 0.5), total_found, after_date))
sys.stdout.flush()
with ThreadPoolExecutor(max_workers=1) as pool:
while True:
path = "/issues.xml?key={0}&limit={1}&offset={2}&updated_on=%3E%3D{3}T00:00:00Z".format(
S_API_KEY,
400, # limit
str(current_offset),
after_date
)
conn = http.client.HTTPSConnection(S_SERVER_HOST)
conn.request("GET", path, headers=headers)
res = conn.getresponse()
data = res.read()
try:
root = ElementTree.fromstring(data)
except ElementTree.ParseError as e:
# print(e)
print(str(data) + " .. waiting 1s")
# sys.exit(1)
time.sleep(1)
continue
request_response_cache[path] = data
total_count = int(root.attrib['total_count'])
limit = root.attrib['limit']
offset = root.attrib['offset']
issue_entries = []
for child in root.iter("issue"):
issue_id = child.find('id').text
issue_subject = child.find('subject').text
updated_on = child.find('updated_on').text
issue_entries.append({"id": issue_id, "subject": issue_subject, "updated_on": updated_on})
'''
            # Journals would be fetched in batches of 5, but this is not working as the ER API does not allow it.
window = 5
windowed_issues_entries = [[id[0] for id in issue_entries[i:i + window]] for i in range(len(issue_entries)) if i % window == 0]
futures = {pool.submit(get_issues_journals, id_entries, for_user_name, after_date_date_time) for id_entries in
windowed_issues_entries}
'''
futures = [pool.submit(get_issue_journals, identry["id"], identry["updated_on"],
for_user_name, after_date_date_time, db) for identry in
issue_entries]
for fut in futures:
fut.add_done_callback(functools.partial(journal_get_callback, after_date))
concurrent.futures.wait(futures)
for fut in futures:
for entry in fut.result():
journal_entries.append({
"issue_id": entry['issue_id'],
"issue_name": entry['subject'],
"journal_entry_time": entry['created_on']
})
if current_offset >= total_count:
                break
return journal_entries
def generate_history_report(after_date, for_user_name, db):
journal_entries = generate_activity_history(after_date, for_user_name, db)
print('\n')
journal_entries.sort(
key=lambda x: datetime.datetime.strptime(x['journal_entry_time'], "%Y-%m-%dT%H:%M:%SZ").timestamp())
report = ""
current_issue_id = -1
for entry in journal_entries:
if entry['issue_id'] != current_issue_id:
current_issue_id = entry['issue_id']
report += 'Issue: https://{0}/issues/{1} - {2}\n'.format(S_SERVER_HOST, current_issue_id,
entry['issue_name'])
dt = datetime.datetime.strptime(entry['journal_entry_time'], "%Y-%m-%dT%H:%M:%SZ")
report += ' : {0} {1}\n'.format(str(dt.date()), str(dt.time()))
return report
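# Usage sketch (hypothetical date and user; db is a KeyValueDB instance):
#   print(generate_history_report("2019-01-07", "Tom Kelly", db))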
def main():
argv = sys.argv[1:]
print_report = False
print_history = False
this_week_report = False
history_days_period = 7
report_for_user = ''
reference_date = datetime.datetime.now()
try:
# Define the getopt parameters
        opts, args = getopt.getopt(argv, 'rthu:d:e:',
                                   ['report', 'this_week_report', 'history', 'user=', 'days_period=', 'reference_date='])
        if len(opts) == 0:
print(usage_text)
else:
# Iterate the options and get the corresponding values
for o, a in opts:
if o in ('-r', '--report'):
print_report = True
elif o in ('-t', '--this_week_report'):
this_week_report = True
elif o in ('-h', '--history'):
print_history = True
elif o in ('-u', '--user'):
report_for_user = a
elif o in ('-d', '--days_period'):
history_days_period = int(a)
elif o in ('-e', '--reference_date'):
reference_date = datetime.datetime.strptime(a, '%Y-%m-%d')
except getopt.GetoptError:
# Print something useful
print(usage_text)
sys.exit(2)
with KeyValueDB("er_" + S_SERVER_HOST + ".db") as db:
if print_report:
date_time_days_ago = reference_date
if not this_week_report:
date_time_days_ago = date_time_days_ago - datetime.timedelta(days=history_days_period)
start = date_time_days_ago - datetime.timedelta(days=date_time_days_ago.weekday())
end = start + datetime.timedelta(days=history_days_period - 1)
            print(generate_report(str(start.date()), str(end.date()), db, report_for_user))
if print_history:
if print_report:
print("\n\n")
# from_date_time = reference_date - datetime.timedelta(days=history_days_period)
            # fall back to reference_date when no report was generated above (date_time_days_ago would be undefined)
            weekday_anchor = date_time_days_ago if print_report else reference_date
            from_date_time = reference_date - datetime.timedelta(days=weekday_anchor.weekday())
            date_time_days_ago = str(from_date_time.date())
print(generate_history_report(date_time_days_ago, report_for_user, db))
if __name__ == '__main__':
#test_data_cache()
main()