Skip to content

Instantly share code, notes, and snippets.

@Josh5
Last active September 2, 2018 20:35
Show Gist options
  • Save Josh5/62ef0947b084eba65e7800b02b294e87 to your computer and use it in GitHub Desktop.
Save Josh5/62ef0947b084eba65e7800b02b294e87 to your computer and use it in GitHub Desktop.
Tasks2IssueTracker

Install

Install deps:

# System packages that provide pip/setuptools for both Python 2 and Python 3.
sudo apt install python-pip python3-pip python3-setuptools python-setuptools;
# Python modules the hook imports: lxml (HTML parsing) and requests (HTTP).
sudo -H python3 -m pip install lxml requests;

Install the script into the Taskwarrior hooks directory (i.e. ~/.task/hooks/)

Option A)

Install a bash script that fetches the hook from gist every time.

Open ~/.task/hooks/on-add-Tasks2IssueTracker.sh and add the following:

#!/bin/bash
#
# Taskwarrior on-add hook wrapper.
# Reads the new task's JSON from stdin, downloads the current
# Tasks2IssueTracker.py from the gist, and pipes the JSON into it.

# -r keeps backslashes in the JSON intact and IFS= keeps surrounding
# whitespace; a plain `read` would strip backslash escapes from the task.
IFS= read -r JSON_STRING;

NOW="$(date +'%Y-%m-%d')";
SCRIPT_SOURCE=https://gist.githubusercontent.com/Josh5/62ef0947b084eba65e7800b02b294e87/raw/Tasks2IssueTracker.py;
TEMP_SCRIPT_DATE="/tmp/${NOW}-Tasks2IssueTracker.py";
TEMP_SCRIPT=/tmp/Tasks2IssueTracker.py;

# The path string is never empty, so the download runs on every invocation.
if [[ -n "${TEMP_SCRIPT_DATE}" ]]; then
    curl -sSL "${SCRIPT_SOURCE}" > "${TEMP_SCRIPT_DATE}";
    line=$(head -n 1 "${TEMP_SCRIPT_DATE}");
    # Only replace the working copy when the download starts with the
    # expected python shebang (i.e. it is not an error page or empty file).
    if [[ ${line} =~ '/usr/bin/python' ]]; then
        cp -f "${TEMP_SCRIPT_DATE}" "${TEMP_SCRIPT}";
    fi
fi
chmod +x "${TEMP_SCRIPT}";
# printf with a quoted argument passes the JSON through byte-for-byte;
# unquoted echo would collapse whitespace and expand glob characters.
printf '%s\n' "${JSON_STRING}" | "${TEMP_SCRIPT}";
Option B)

Install/update the actual script to the hooks dir:

curl -sSL https://gist.githubusercontent.com/Josh5/62ef0947b084eba65e7800b02b294e87/raw/Tasks2IssueTracker.py > ~/.task/hooks/Tasks2IssueTracker.py;
# Make the downloaded file executable (same path the download was written to).
chmod +x ~/.task/hooks/Tasks2IssueTracker.py
#!/usr/bin/python3
#
import requests
import os
import sys
import json
import re
import codecs
import datetime
from lxml import html
# Send every log record (DEBUG and above) to a fixed file under /tmp.
import logging
logging.basicConfig(
    filename='/tmp/task_Tasks2IssueTracker.log',
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s: %(message)s',
)

# Load the settings from ~/.task/Tasks2IssueTracker_config.json.
# Any failure to open or parse the file is logged and ends the process with
# exit status 1.
config = {}
try:
    _config_path = os.path.join(
        os.path.expanduser('~'), '.task', 'Tasks2IssueTracker_config.json')
    with open(_config_path, 'r') as configFile:
        config = json.load(configFile)
except Exception:
    logging.error('[MAIN]: Failed to load config.')
    sys.exit(1)
class HANDLE():
    """Access to one section of the loaded config, plus a debug logger.

    ``title`` is the key of the top-level config section this instance reads
    (the file instantiates it with ``'mantis'`` and ``'harvest'``).
    """

    def __init__(self, debug=False, title=None, config=None):
        """Remember the section title and config and resolve the debug flag.

        debug  -- fallback debug flag, used when ``config`` is empty/None or
                  has no ``'debugging'`` key.
        title  -- name of the config section to read from.
        config -- the full configuration dict.
        """
        self.title = title
        self.config = config
        # The config value wins when present; previously ``debug`` was ignored
        # and a missing config or key raised instead of falling back.
        self.debug = config.get('debugging', debug) if config else debug

    def getConfig(self, config_id):
        """Return the value stored under ``config_id`` in this section.

        Returns None when the config, the section, or the key is missing.
        """
        section = (self.config or {}).get(self.title)
        key = str(config_id)
        if section and key in section:
            return section[key]
        return None

    def getProject(self, project):
        """Return the settings dict for ``project``, or ``{}`` if unknown.

        The lookup lower-cases ``project`` before searching the section's
        ``'projects'`` mapping. The returned dict is the one held by the
        config, not a copy.
        """
        project = project.lower()
        project_dict = self.getConfig('projects')
        if project_dict and project in project_dict:
            return project_dict[project]
        return {}

    def log(self, string):
        """Write ``string`` to the debug log when debugging is enabled.

        Messages are prefixed with ``[<title>]: `` when a title is set.
        Logging is best-effort: formatting/encoding problems never propagate.
        Always returns False.
        """
        if self.debug:
            if not self.title:
                template = '%s'
            else:
                template = '[' + self.title + ']: %s'
            try:
                logging.debug(template % string)
            except UnicodeEncodeError:
                # we can't anticipate everything in unicode they might throw at
                # us, but we can handle a simple BOM.
                # ``unicode()`` does not exist on Python 3; decode the BOM bytes.
                bom = codecs.BOM_UTF8.decode('utf8')
                logging.debug(template % string.replace(bom, ''))
            except Exception:
                # Deliberate: a log-formatting failure must not break the hook.
                pass
        return False
class REMOTE():
    """Small HTTP client built around a single ``requests.Session``."""

    def __init__(self, timeout=20, debug=False):
        """Create the session.

        timeout -- value passed as ``timeout`` to every request.
        debug   -- when true, ``log`` writes request/response details.
        """
        self.timeout = timeout
        self.debug = debug
        # Configure session and cookies
        self.http_session = requests.Session()
        self.cookies = None
        self.use_cookie = False

    def log(self, string):
        """Write ``string`` to the debug log, prefixed with ``[REMOTE]: ``.

        Does nothing unless debugging is enabled; logging is best-effort.
        """
        if self.debug:
            try:
                logging.debug('[REMOTE]: %s' % string)
            except UnicodeEncodeError:
                # we can't anticipate everything in unicode they might throw at
                # us, but we can handle a simple BOM.
                # ``unicode()`` does not exist on Python 3; decode the BOM bytes.
                bom = codecs.BOM_UTF8.decode('utf8')
                logging.debug('[REMOTE]: %s' % string.replace(bom, ''))
            except Exception:
                # Deliberate: a log-formatting failure must not break a request.
                pass

    def make_request(self, url, method, payload=None, headers=None, allow_redirects=True, files=None):
        """Make an http request. Return the response.

        ``method`` selects how the request is sent:
          'get'     -- GET with ``payload`` as query parameters
          'files'   -- POST with ``payload`` as form data plus ``files``
          'getfile' -- GET with ``stream=True``
          'json'    -- POST with ``payload`` as the JSON body
          anything else -- POST with ``payload`` as form data

        Raises whatever ``requests`` raises (including ``HTTPError`` from
        ``raise_for_status``); the error is logged and then re-raised.
        """
        # NOTE(review): headers (which carry cookies/tokens in this file) and
        # payloads are written to the debug log when debugging is on.
        self.log('Request URL: %s' % (url,))
        self.log('Headers: %s' % (headers,))
        self.log('Payload: %s' % (payload,))
        common = {
            'headers': headers,
            'allow_redirects': allow_redirects,
            'timeout': self.timeout,
        }
        try:
            if method == 'get':
                req = self.http_session.get(url, params=payload, **common)
            elif method == 'files':
                req = self.http_session.post(url, data=payload, files=files, **common)
            elif method == 'getfile':
                req = self.http_session.get(url, params=payload, stream=True, **common)
            elif method == 'json':
                req = self.http_session.post(url, json=payload, **common)
            else:  # post
                req = self.http_session.post(url, data=payload, **common)
            req.raise_for_status()
            self.log('Response code: %s' % req.status_code)
            self.log('Response: %s' % req.content)
            return req
        except requests.exceptions.HTTPError as error:
            self.log('An HTTP error occurred: %s' % error)
            raise
        except requests.exceptions.ProxyError:
            self.log('Error connecting to proxy server')
            raise
        except requests.exceptions.ConnectionError as error:
            # Python 3 exceptions have no ``.message``; format the error itself
            # so the original exception is the one that propagates.
            self.log('Connection Error: - %s' % error)
            raise
        except requests.exceptions.RequestException as error:
            # Likewise there is no ``.value`` attribute on these exceptions.
            self.log('Error: - %s' % error)
            raise
class Harvest(object):
    """Creates a Harvest time entry for a newly added task."""

    def __init__(self, *args, **kwargs):
        """Wire up the 'harvest' config section and an HTTP client.

        Accepts the full configuration dict via the ``config`` keyword.
        """
        self.config = kwargs.get("config")
        self.handle = HANDLE(title='harvest', config=self.config)
        self.debug = self.handle.debug
        self.req = REMOTE(debug=self.debug)
        self.headers = []
        self.cookie = self.handle.getConfig("cookie")

    def setHeaders(self):
        """Build the request headers, taking the CSRF token and cookie from config."""
        csrf_token = self.handle.getConfig("x-csrf-token")
        cookie = self.handle.getConfig("cookie")
        return {
            'content-type': 'application/json',
            'accept': 'application/json, text/javascript, */*; q=0.01',
            "x-requested-with": "XMLHttpRequest",
            'x-csrf-token': csrf_token,
            'cookie': cookie,
        }

    def get_project_config(self, project, notes):
        """Return the payload for ``project``, or False when it is not configured.

        The project's settings dict is filled in with the configured
        ``user_id``, the given ``notes`` and today's date as ``spent_at``.
        """
        today = datetime.datetime.now().strftime("%Y-%m-%d")
        entry = self.handle.getProject(project)
        if not entry:
            return False
        entry['user_id'] = self.handle.getConfig("user_id")
        entry['notes'] = notes
        entry['spent_at'] = str(today)
        return entry

    def create_entry(self, task_dict):
        """Send a time entry for the task and record the outcome on the task.

        Sets ``task_dict['Tasks2IssueTracker_harvest']`` to ``'success'`` after
        the request returns, or ``'failed'`` when the task's project has no
        configuration. Returns ``task_dict``.
        """
        project = task_dict['project']
        started_at = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # The first all-digit tag other than '1' is reported as the issue id.
        issue_prefix = ''
        if 'tags' in task_dict and task_dict['tags']:
            issue_prefix = next(
                ("ISSUE #{} \n".format(str(tag))
                 for tag in task_dict['tags']
                 if tag.isdigit() and tag != '1'),
                '',
            )
        notes = "{}STARTED: {} \nDESCRIPTION: {}".format(
            issue_prefix, started_at, task_dict['description'])

        entry = self.get_project_config(project, notes)
        if not entry:
            task_dict['Tasks2IssueTracker_harvest'] = 'failed'
            return task_dict

        headers = self.setHeaders()
        endpoint = self.handle.getConfig("harvest_url")
        response = self.req.make_request(endpoint, 'json', payload=entry, headers=headers)
        self.handle.log(response.content)
        task_dict['Tasks2IssueTracker_harvest'] = 'success'
        return task_dict
class Mantis(object):
    """Creates Mantis issues for new tasks, or looks up an existing issue."""

    def __init__(self, *args, **kwargs):
        """Wire up the 'mantis' config section and an HTTP client.

        Accepts the full configuration dict via the ``config`` keyword.
        """
        self.config = kwargs.get("config")
        self.handle = HANDLE(title='mantis', config=self.config)
        self.debug = self.handle.debug
        self.req = REMOTE(debug=self.debug)
        self.headers = []
        self.cookie = self.handle.getConfig("cookie")

    def _base_url(self):
        """Return the configured ``mantis_url`` without trailing slashes.

        The page paths appended by callers start with '/', so stripping here
        avoids a double slash when the configured URL already ends in one.
        """
        return self.handle.getConfig("mantis_url").rstrip('/')

    def fetch_form_data(self, form_data=None):
        """Add the bug-report form token to ``form_data`` and return it.

        Requests ``/bug_report_page.php`` and copies the value of the
        ``bug_report_token`` input into ``form_data`` (a new dict is used when
        none is given). Returns False when the page does not answer 200 or
        contains no token.
        """
        # A ``{}`` default would be shared between calls; create it per call.
        if form_data is None:
            form_data = {}
        _url = self._base_url() + "/bug_report_page.php"
        headers = {'Cookie': self.cookie}
        ret = self.req.make_request(_url, 'get', headers=headers)
        if ret.status_code != 200:
            return False
        tree = html.fromstring(ret.text)
        tokens = tree.xpath('//input[@name="bug_report_token"]/@value')
        if not tokens:
            self.handle.log("Bug report form did not contain a bug_report_token")
            return False
        form_data['bug_report_token'] = tokens[0]
        return form_data

    def raise_issue(self, task_dict):
        """Create a Mantis issue from the task and tag the task with its id.

        The task description is used as both summary and description. On a
        200 response ``task_dict['Tasks2IssueTracker_mantis']`` is set to
        ``'success'`` and, when the response contains a
        "View Submitted Issue <n>" link, ``<n>`` is appended to the task's
        tags. When the project is not configured the key is set to
        ``'failed'``. Returns ``task_dict`` in every case.
        """
        # Missing OR empty description: nothing to report. (The original
        # ``and`` raised KeyError when missing and never matched when empty.)
        if not task_dict.get('description'):
            self.handle.log("No description provided, not creating a Mantis issue")
            return task_dict
        project = task_dict['project']
        string = task_dict['description']
        _url = self._base_url() + "/bug_report.php?posted=1"
        project_config = self.handle.getProject(project)
        if not project_config:
            self.handle.log("Manits is not configured to create issues")
            task_dict['Tasks2IssueTracker_mantis'] = 'failed'
            return task_dict

        # Work on a copy so per-task values do not leak into the shared config.
        form = dict(project_config)
        form['summary'] = string
        form['description'] = string
        # A section-level handler_id overrides the project's; when none is
        # configured the project's own value is kept instead of being erased.
        handler_id = self.handle.getConfig("handler_id")
        if handler_id is not None:
            form['handler_id'] = handler_id

        send_dict = self.fetch_form_data(form)
        if not send_dict:
            self.handle.log("Failed to fetch bug report form")
            return task_dict

        headers = {'Cookie': self.cookie}
        ret = self.req.make_request(_url, 'post', payload=send_dict, headers=headers)
        issue_number = False
        if ret.status_code == 200:
            task_dict['Tasks2IssueTracker_mantis'] = 'success'
            self.handle.log("ret status code is 200")
            if self.debug:
                # Keep the last response on disk for troubleshooting.
                with open('/tmp/task_last_mantis.html', 'w') as html_response:
                    html_response.write(ret.text)
            match = re.search(r"View Submitted Issue ([0-9]+)</a>", ret.text)
            if match:
                issue_number = match.group(1)
                task_dict.setdefault('tags', []).append(issue_number)
                self.handle.log("Successfully created issue: {}".format(issue_number))
        if not issue_number:
            self.handle.log("Failed to create issue")
        return task_dict

    def fetch_issue(self, mantis_issue_id, task_dict):
        """Replace the task description with the summary of an existing issue.

        Requests ``/view.php?id=<mantis_issue_id>`` and, on a 200 response,
        stores the text of the ``td.bug-summary`` cell (leading zeros
        stripped) as the task description. When the project is not
        configured, ``task_dict['Tasks2IssueTracker_mantis']`` is set to
        ``'failed'``. Returns ``task_dict``.
        """
        project = task_dict['project']
        project_config = self.handle.getProject(project)
        if not project_config:
            self.handle.log("Manits is not configured to create issues")
            task_dict['Tasks2IssueTracker_mantis'] = 'failed'
            return task_dict

        _url = self._base_url() + '/view.php?id={}'.format(mantis_issue_id)
        headers = {'Cookie': self.cookie}
        ret = self.req.make_request(_url, 'get', headers=headers)
        if ret.status_code == 200:
            tree = html.fromstring(ret.text)
            try:
                task_dict['description'] = tree.xpath(
                    '//td[@class="bug-summary"]/text()')[0].lstrip("0")
            except Exception:
                # Best-effort: keep the original description if the summary
                # cell is missing. (Was a bare ``except:``, which also
                # swallowed SystemExit/KeyboardInterrupt.)
                self.handle.log("Failed to fetch mantis issue summary")
        return task_dict
def handle_mantis(task_dict):
    """Link the task to Mantis and return the (possibly updated) task.

    Tasks without a project are returned untouched. If the task carries an
    all-digit tag, that tag is treated as an existing issue id and the issue
    is fetched; otherwise a new issue is raised.
    """
    # Missing OR empty project: nothing to do. (The original ``and`` raised
    # KeyError when the key was missing and never matched when it was empty.)
    if not task_dict.get('project'):
        logging.debug('[HANDLE_TASK]: No project provided, not creating a Mantis issue')
        return task_dict
    mantis = Mantis(config=config)
    mantis_issue = False
    # No break: when several tags are numeric, the last one is used.
    for tag in task_dict.get('tags') or []:
        if tag.isdigit():
            mantis_issue = str(tag)
    if mantis_issue:
        return mantis.fetch_issue(mantis_issue, task_dict)
    return mantis.raise_issue(task_dict)
def handle_harvest(task_dict):
    """Create a Harvest entry for the task and return the updated task."""
    logging.debug('[HANDLE_TASK]: Running harvest handler')
    return Harvest(config=config).create_entry(task_dict)
def handle_task(task_dict):
    """Run every enabled integration over the task and return the result.

    Mantis runs first, then Harvest. A handler that raises is logged (with
    the exception type) and skipped; the task keeps whatever state it had
    before that handler ran.
    """
    mantis_enabled = 'mantis' in config and config['mantis']['enabled']
    if mantis_enabled:
        try:
            task_dict = handle_mantis(task_dict)
        except Exception:
            exc_type = sys.exc_info()[0]
            logging.error('[HANDLE_TASK]: Failed to run mantis handle - {}'.format(exc_type))

    harvest_enabled = 'harvest' in config and config['harvest']['enabled']
    if harvest_enabled:
        try:
            task_dict = handle_harvest(task_dict)
        except Exception:
            exc_type = sys.exc_info()[0]
            logging.error('[HANDLE_TASK]: Failed to run harvest handle - {}'.format(exc_type))

    return task_dict
# Entry point: read one line of task JSON from stdin, run the handlers, and
# print the resulting JSON. If anything fails, the line that was read is
# printed back unchanged. The process always exits with status 0.
task_json = sys.stdin.readline()
logging.debug('[MAIN]: Received input - {}'.format(task_json))
try:
    task_json = json.dumps(handle_task(json.loads(task_json)))
except Exception:
    failure_type = sys.exc_info()[0]
    logging.error('[MAIN]: Failed to handle task passed by taskwarrior - {}'.format(failure_type))
print(task_json)
sys.exit(0)
{
"debugging": true,
"mantis": {
"enabled" : true,
"cookie" : "<xxxx>",
"mantis_url" : "https://www.<xxxx>.org/mantis/",
"projects": {
"myproject": {
"m_id" : "<xxxx>",
"project_id" : "<xxxx>",
"category_id" : "<xxxx>",
"reproducibility" : "<xxxx>",
"severity" : "<xxxx>",
"priority" : "<xxxx>",
"handler_id" : "<xxxx>",
"steps_to_reproduce" : "",
"additional_info" : "",
"tag_string" : "",
"tag_select" : "<xxxx>",
"custom_field_4_year" : "<xxxx>",
"custom_field_4_month" : "<xxxx>",
"custom_field_4_day" : "<xxxx>",
"custom_field_4_presence" : "<xxxx>",
"custom_field_1" : "<xxxx>",
"custom_field_1_presence" : "<xxxx>",
"custom_field_3" : "<xxxx>",
"custom_field_3_presence" : "<xxxx>",
"view_state" : "<xxxx>"
}
}
},
"harvest": {
"enabled" : true,
"user_id" : "<xxxx>",
"x-csrf-token" : "<xxxx>",
"cookie" : "<xxxx>",
"harvest_url" : "https://<xxxx>.harvestapp.com/time/api",
"projects": {
"myproject": {
"notes" : "",
"spent_at" : "",
"hours" : "0.02",
"project_id" : "<xxxx>",
"task_id" : "<xxxx>"
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment