Skip to content

Instantly share code, notes, and snippets.

@yashh
Created February 12, 2012 00:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yashh/1805221 to your computer and use it in GitHub Desktop.
Save yashh/1805221 to your computer and use it in GitHub Desktop.
remember the milk cmd line
#!/usr/bin/python
"""Command line rtm interface
requirements: sudo pip install simplejson texttable
"""
import os, sys, urllib, urllib2, json, ConfigParser, datetime
from hashlib import md5
from texttable import Texttable
FILE_PATH = "~/.rtm"
SECTION = "rtm_data"
INBOX=XXXX
class RTM(object):
BASE_URL = "https://api.rememberthemilk.com/services/rest/"
AUTH_URL = "http://www.rememberthemilk.com/services/auth/"
def __init__(self, api_key, secret, token):
self.api_key = api_key
self.secret = secret
self.token = token
self._reset_params()
def _reset_params(self):
self.params = {
"format": "json",
"api_key": self.api_key
}
def _sort_params(self):
""" Return a list of (key, value) sorted based on keys
>>> list(sortedItems({'a': 1, 1: 55, 'key': 7}))
[(1, 55), ('a', 1), ('key', 7)]
"""
keys = list(self.params.keys())
keys.sort(key=str)
for key in keys:
yield key, self.params[key]
def _sign(self):
pairs = ''.join(['%s%s' % (k, v) for k, v in self._sort_params()])
return md5((self.secret + pairs).encode('utf-8')).hexdigest()
def get(self, key_to_return):
self.params["api_sig"] = self._sign()
rsp = urllib2.urlopen(self.BASE_URL+"?"+urllib.urlencode(self.params)).read()
data = json.loads(rsp.decode("utf-8"))["rsp"]
if data['stat'] == "fail":
raise Exception(data)
return data.get(key_to_return)
def getFrob(self):
self.params.update({"method": "rtm.auth.getFrob"})
return self.get("frob")
def get_and_save_token(self, frob):
self._reset_params()
self.params["method"] = "rtm.auth.getToken"
self.params["frob"] = frob
auth_data = self.get("auth")
token = auth_data['token']
set_config({"api_key": self.api_key,
"secret": self.secret,
"token": token})
return token
def auth(self):
frob = self.getFrob()
self._reset_params()
self.params["perms"] = "write"
self.params["frob"] = frob
self.params["api_sig"] = self._sign()
url = self.AUTH_URL + "?" + urllib.urlencode(self.params)
print "Visit this url: %s\n" % url
raw_input("enter once you gave access")
self.get_and_save_token(frob)
def format_tasks(self, items):
table = Texttable(max_width=0)
table.set_deco(Texttable.VLINES)
table.set_cols_dtype(['i', 't', 't'])
rows = [["id", "name", "due"]]
for counter, item in enumerate(items):
if type(item["task"]) == list or item["task"]["completed"]:
continue
priority = item["task"]["priority"]
due_time = item["task"].get("due", "")
if due_time:
due_time = humanize_timestamp(due_time)
item_name = item["name"]
if priority != "N":
item_name = item_name + "(*)"
rows.append([counter, item_name, due_time])
table.add_rows(rows)
print table.draw()
def _getList(self):
self.params["method"] = "rtm.tasks.getList"
self.params["auth_token"] = self.token
self.params["list_id"] = INBOX
tasks = self.get("tasks")["list"]
return tasks["taskseries"]
def getList(self):
items = self._getList()
return self.format_tasks(items)
def get_timeline(self):
self.params["method"] = "rtm.timelines.create"
self.params["auth_token"] = self.token
return self.get("timeline")
def addTask(self):
name = " ".join(sys.argv[2:])
if not name:
print "Enter task"
return
timeline = self.get_timeline()
self._reset_params()
self.params["method"] = "rtm.tasks.add"
self.params["auth_token"] = self.token
self.params["timeline"] = timeline
self.params["list_id"] = INBOX
self.params["name"] = name
self.params["parse"] = "1"
self.get("some")
print "Added"
def completeTask(self):
if len(sys.argv) < 3:
print "Usage: t c 23"
return
task_id = int(sys.argv[2])
items = self._getList()
taskseries_id = items[task_id]["id"]
internal_task_id = items[task_id]["task"]["id"]
self._reset_params()
timeline = self.get_timeline()
self._reset_params()
self.params["method"] = "rtm.tasks.complete"
self.params["auth_token"] = self.token
self.params["timeline"] = timeline
self.params["list_id"] = INBOX
self.params["taskseries_id"] = taskseries_id
self.params["task_id"] = internal_task_id
if self.get("stat") == "ok":
print "Completed"
def not_found(self):
print "invalid option"
# HELPERS
def get_config():
config = ConfigParser.ConfigParser()
full_file_path = os.path.expanduser(FILE_PATH)
config.read(full_file_path)
try:
defaults = config.items(SECTION)
return dict(defaults)
except ConfigParser.NoSectionError:
print """Create %s file with:
[%s]
api_key=xxx
secret=xxx
""" % (full_file_path, SECTION)
sys.exit(0)
def set_config(data):
config = ConfigParser.ConfigParser()
full_file_path = os.path.expanduser(FILE_PATH)
cfgfile = open(full_file_path,'w')
config.add_section(SECTION)
config.set(SECTION,'api_key',data['api_key'])
config.set(SECTION,'secret', data['secret'])
config.set(SECTION,'token', data['token'])
config.write(cfgfile)
cfgfile.close()
def humanize_timestamp(date_object):
if type(date_object) == unicode:
date_object = datetime.datetime.strptime(date_object, "%Y-%m-%dT%H:%M:%SZ")
delta = datetime.datetime.now() - date_object
plural = lambda x: 's' if x != 1 else ''
num_weeks = delta.days / 7
if (num_weeks > 1):
return "%d week%s ago" % (num_weeks, plural(num_weeks))
elif (num_weeks < -1):
return " in %d week%s" % (abs(num_weeks), plural(num_weeks))
if (delta.days > 0):
if delta.days < 2:
return "1 day ago"
return "%d day%s ago" % (delta.days, plural(delta.days))
elif (delta.days < 0):
return "in %d day%s" % (abs(delta.days), plural(delta.days))
num_hours = delta.seconds / 3600
if (num_hours > 0):
return "%d hour%s ago" % (num_hours, plural(num_hours))
elif (num_hours < 0):
return "in %d hour%s" % (abs(num_hours), plural(num_hours))
num_minutes = delta.seconds / 60
if (num_minutes > 0):
return "%d minute%s ago" % (num_minutes, plural(num_minutes))
elif (num_minutes < 0):
return "in %d minute%s" % (abs(num_hours), plural(num_minutes))
def get_action(rtm, abbr):
action_directory = {"l": rtm.getList,
"a": rtm.addTask,
"c": rtm.completeTask}
return action_directory.get(abbr, rtm.not_found)()
def main():
try:
args = sys.argv[1]
except IndexError:
args = "l"
config = get_config()
rtm = RTM(config["api_key"], config["secret"], config.get("token"))
return get_action(rtm, args)
if __name__ == '__main__': main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment