Created
November 29, 2011 17:53
-
-
Save oldpatricka/1405711 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
import os | |
import sys | |
import json | |
import string | |
import base64 | |
import argparse | |
import threading | |
import logging | |
import textwrap | |
import signal | |
import time | |
import httplib | |
import curses | |
import curses.textpad | |
import curses.ascii | |
# Module-wide logging switch. Nothing in the visible code reads it.
# NOTE(review): presumably meant to gate the logging.basicConfig() call made
# at start-up -- confirm before relying on it.
LOGGING = True
class MessagePad(curses.textpad.Textbox):
    """Message input box built on ``curses.textpad.Textbox``.

    Differences from the stock Textbox:

    * Enter (CR or LF) finishes the edit instead of inserting a newline.
    * The DEL key is treated as backspace.
    * ``edit()`` polls the keyboard with a timeout so that it can be
      interrupted by setting ``winched`` (done by the view when the
      terminal is resized) and can be pre-loaded with text.
    """

    # Set to True from outside (CampfriendView's SIGWINCH handler does this)
    # to make a running edit() return early with an empty string.
    winched = False

    # How long, in milliseconds, getch() blocks on each poll for a key.
    POLL_INTERVAL_MS = 100

    def do_command(self, ch):
        """Process one keystroke.

        do_command is overridden to handle special cases so we can detect
        the backspace key and when someone tries to send a message.

        Returns 0 when the edit should end (Enter was pressed); otherwise
        whatever ``Textbox.do_command`` returns.
        """
        if ch in (curses.ascii.CR, curses.ascii.LF):
            return 0
        if ch == curses.ascii.DEL:
            # Some terminals send DEL for the backspace key.
            ch = curses.ascii.BS
        return curses.textpad.Textbox.do_command(self, ch)

    def edit(self, validate=None, text=""):
        """Collect one message from the user and return it.

        validate -- optional callable applied to every key code; a falsy
                    result drops the key.
        text     -- characters fed into the box before reading the
                    keyboard (used to restore input after a resize).

        Returns the printable text that was entered, or "" if the edit was
        aborted because ``winched`` was set.
        """
        self.winched = False

        # Replay the pre-loaded text as if it had been typed.
        for char in text:
            self.win.refresh()
            ch = ord(char)
            if validate:
                ch = validate(ch)
            if ch:
                self.do_command(ch)

        # The timeout is a property of the window, so set it once rather
        # than on every pass through the polling loop.
        self.win.timeout(self.POLL_INTERVAL_MS)
        while True:
            if self.winched:
                return ""
            ch = self.win.getch()
            if ch == -1:
                # Poll timed out with no key pressed.
                continue
            if validate:
                ch = validate(ch)
            if not ch:
                continue
            if not self.do_command(ch):
                break
            self.win.refresh()

        printable = self.gather_printable()
        self.win.erase()
        return printable

    def gather_printable(self):
        """Return the box contents with non-printable characters removed
        and surrounding whitespace stripped."""
        chars = self.gather()
        # Build a real string: filter() only returns a str on Python 2, so
        # calling .strip() on its result is not portable.
        return "".join(c for c in chars if curses.ascii.isprint(c)).strip()
class CampfriendView(object):
    """Curses front end: chat log, user sidebar and a message input box.

    The view does no network work itself; sending, fetching, joining and
    leaving are delegated to the callbacks passed to the constructor.
    Constructing a view draws the screen, starts the polling timers and
    then blocks in the input loop until the user presses Ctrl-C.
    """

    # Seconds between two polls of the message / user callbacks.
    POLL_SECONDS = 2
    # Columns reserved on the right-hand side for the sidebar.
    SIDEBAR_WIDTH = 15
    # Rows reserved at the bottom of the screen for the input box.
    INPUT_HEIGHT = 5

    message_win = None
    message = None
    chatlog = None
    sidebar = None
    stdscr = None
    winched_data = ""
    room = ""
    # Set during shutdown so the polling methods stop rescheduling themselves.
    _closing = False

    def __init__(self, stdscr, room="", user="",
                 send_message_callback=None,
                 get_messages_callback=None,
                 join_room_callback=None,
                 leave_room_callback=None,
                 get_users_callback=None):
        """Build the UI, start polling and run the input loop.

        stdscr -- the curses screen (as supplied by ``curses.wrapper``).
        room   -- room name; when non-empty the join callback is invoked.
        user   -- display name used for locally echoed messages.
        *_callback -- callables doing the actual work; any that is omitted
                      is replaced by a no-op returning None.

        Does not return: the input loop ends with ``sys.exit()`` on Ctrl-C.
        """
        # Per-instance state. Mutable class-level lists would be shared by
        # every view.
        self.messages = []
        self.users = []

        # Setup callbacks
        self.send_message = send_message_callback or self._dummy_callback
        self.get_messages = get_messages_callback or self._dummy_callback
        self.join_room = join_room_callback or self._dummy_callback
        self.leave_room = leave_room_callback or self._dummy_callback
        self.get_users = get_users_callback or self._dummy_callback

        # Setup model
        self.username = user
        if room:
            self.room = room
            self.join_room(self.room)

        # Setup drawing
        self.stdscr = stdscr
        self.draw_window()
        self.draw_content()
        signal.signal(signal.SIGWINCH, self._resize_signal_handler)

        self.update_messages(self.room)
        self.update_users(self.room)

        self._input_loop()

    def _input_loop(self):
        """Read messages from the user and send them until Ctrl-C."""
        while True:
            # Get message from user
            try:
                # Text typed before a resize is replayed into the new box.
                pending = self.winched_data
                self.winched_data = ""
                typed = self.message.edit(text=pending)
            except KeyboardInterrupt:
                self._shutdown()
            except Exception:
                # Best effort: a failed edit just restarts the prompt.
                continue
            if typed != "" and not self.message.winched:
                self.send_message_async(self.room, typed)
                self.message_win.erase()

    def _shutdown(self):
        """Stop polling, leave the room and exit the process."""
        self._closing = True
        # Cancel *both* timers; a live non-daemon Timer would keep the
        # interpreter running after sys.exit().
        for attr in ("update_timer", "update_users_timer"):
            timer = getattr(self, attr, None)
            if timer is not None:
                timer.cancel()
        if self.room:
            self.leave_room(self.room)
        sys.exit()

    def draw_content(self):
        """Redraw the whole screen: chat log, sidebar and input box."""
        self.stdscr.refresh()
        self.draw_chatlog()
        self.draw_sidebar()
        self.message_win.refresh()

    def draw_chatlog(self):
        """Clear and redraw the bordered chat log window."""
        self.chatlog.erase()
        self.draw_messages()
        self.chatlog.border(0)
        self.chatlog.refresh()

    def _resize_signal_handler(self, signum, frame):
        """SIGWINCH handler: rebuild all windows for the new terminal size.

        Flags the running edit so it returns, saves whatever was typed so
        the input loop can replay it, then recreates and redraws the
        windows.
        """
        self.message.winched = True
        # TODO optimize this
        # NOTE(review): presumably gives the edit loop / terminal time to
        # settle before the windows are rebuilt -- confirm.
        time.sleep(0.5)
        printable = self.message.gather_printable()
        if printable != "":
            self.winched_data = printable
        curses.endwin()
        self.draw_window()
        self.draw_content()

    def draw_window(self):
        """Create the chat log, sidebar and input windows from the current
        screen size and attach a fresh MessagePad to the input window."""
        screen_height, screen_width = self.stdscr.getmaxyx()

        # Chat log: top-left, everything except the sidebar columns and the
        # input rows.
        chatlog_x = 0
        chatlog_y = 0
        chatlog_h = screen_height - self.INPUT_HEIGHT
        chatlog_w = screen_width - self.SIDEBAR_WIDTH
        self.chatlog = curses.newwin(chatlog_h, chatlog_w,
                                     chatlog_y, chatlog_x)

        # Sidebar: to the right of the chat log, same height.
        sidebar_x = chatlog_x + chatlog_w
        sidebar_y = chatlog_y
        sidebar_h = chatlog_h
        sidebar_w = screen_width - chatlog_w
        self.sidebar = curses.newwin(sidebar_h, sidebar_w,
                                     sidebar_y, sidebar_x)

        # Input box: the remaining rows below the chat log.
        left_margin = 1
        message_x = chatlog_x + left_margin
        message_y = chatlog_h
        message_h = screen_height - chatlog_h
        message_w = screen_width - left_margin
        self.message_win = curses.newwin(message_h, message_w,
                                         message_y, message_x)
        self.message = MessagePad(self.message_win)

    def draw_sidebar(self):
        """Redraw the sidebar: room name, heading and one user per row."""
        # Erase first so users who have left do not linger on screen.
        self.sidebar.erase()
        userlist_start_y = 4
        try:
            self.sidebar.addstr(1, 1, self.room.upper(), curses.A_BOLD)
            self.sidebar.addstr(3, 1, "Who's here?", curses.A_BOLD)
            for y, user in enumerate(self.users, userlist_start_y):
                self.sidebar.addstr(y, 1, self.format_name(user))
        except curses.error:
            # Ran out of room in the window; show what fitted rather than
            # letting the error kill the polling thread.
            pass
        self.sidebar.refresh()

    def draw_messages(self):
        """Write the messages into the chat log, newest at the bottom,
        working upwards until the window is full."""
        log_height, log_width = self.chatlog.getmaxyx()
        # adjust for borders
        log_height -= 2
        log_width -= 2

        name_position = 1
        row = log_height
        for name, text, _ in reversed(self.messages):
            # format_name() hands back its input on failure, which may be
            # None; fall back to an empty name so len() below is safe.
            name = self.format_name(name) or ""

            # calculate positions for name and message text
            text_position = len(name) + 2
            text_width = log_width - text_position

            # wrap message text to fit between name and edge of field
            try:
                text = textwrap.fill(text, text_width)
            except Exception:
                # Couldn't wrap to that size, don't bother
                break
            # Indent continuation lines so they line up under the text.
            text = text.replace("\n", "\n" + " " * text_position)

            # A wrapped message starts higher up by its extra line count.
            row -= text.count("\n")
            if row <= 0:
                break
            try:
                self.chatlog.addstr(row, name_position, name, curses.A_BOLD)
                self.chatlog.addstr(row, text_position, text)
            except Exception:
                # Drawing is best effort; stop at the first failure.
                break
            row -= 1

    def format_name(self, name):
        """Shorten "First Last" to "First L.".

        Anything that is not exactly two whitespace-separated words
        (including non-strings) is returned unchanged.
        """
        try:
            first, last = name.split()
        except (AttributeError, ValueError):
            return name
        return "%s %s." % (first, last[0])

    def send_message_async(self, room, message):
        """Send ``message`` on a background thread, then echo it locally
        in the chat log."""
        def _send_message(room, message):
            self.send_message(room, message)
            self.messages.append((self.username, message, None))
            self.draw_chatlog()

        sender = threading.Thread(target=_send_message, args=(room, message))
        sender.start()

    def update_messages(self, room):
        """Fetch the messages for ``room``, redraw, and schedule the next
        poll in POLL_SECONDS seconds."""
        # A callback returning nothing (e.g. the dummy) means "no messages".
        self.messages = self.get_messages(room) or []
        if self._closing:
            return
        self.draw_content()
        self.update_timer = threading.Timer(self.POLL_SECONDS,
                                            self.update_messages, [self.room])
        self.update_timer.start()

    def update_users(self, room):
        """Fetch the users in ``room``, redraw, and schedule the next poll
        in POLL_SECONDS seconds.  An empty result keeps the old list."""
        new_users = self.get_users(room)
        if new_users:
            self.users = new_users
        if self._closing:
            return
        self.draw_content()
        self.update_users_timer = threading.Timer(self.POLL_SECONDS,
                                                  self.update_users,
                                                  [self.room])
        self.update_users_timer.start()

    def _dummy_callback(self, *args):
        """Stand-in for a callback that was not supplied; does nothing."""
        return None
class Campfire(object):
    """Small client for the Campfire HTTP/JSON API.

    Every call opens a new connection to ``server`` and authenticates with
    the API key via HTTP basic auth.  Errors are not raised to the caller:
    on an unexpected status or unreadable body the methods call
    ``sys.exit()``.
    """

    username = ""
    apikey = ""
    server = ""
    https = False
    # Not referenced anywhere in the visible code; kept so the class keeps
    # the same attributes as before.
    i = 0

    def __init__(self, apikey, server):
        """Connect to ``server`` and look up the name of the key's owner.

        Exits the process if the account cannot be fetched.
        """
        self.apikey = apikey
        self.server = server
        # Per-instance caches. As class attributes these dicts were shared
        # by every Campfire object.
        self.users = {}      # user id -> display name
        self.messages = {}   # room name -> last fetched messages
        self.headers = {"Authorization": self._auth_header(self.apikey),
                        "Content-Type": "application/json"}

        # A 301 on the plain-HTTP probe is taken to mean the account
        # requires HTTPS.
        probe = self._get("/users/me.xml", "", self.headers)
        if probe.status == 301:
            self.https = True

        response = self._get("/users/me.json", "", self.headers)
        if response.status != 200:
            print("Problem connecting to campfire. Error: '%s'" % response.reason)
            sys.exit()
        data = self._load_json(response)
        self.username = data["user"]["name"]

    def _auth_header(self, apikey):
        """Return the basic-auth header value for ``apikey`` (the key is
        the user name, the password is empty)."""
        # b64encode works on bytes; encode/decode explicitly so this does
        # not depend on str being a byte string.
        token = base64.b64encode(("%s:" % apikey).encode("utf-8"))
        return "Basic " + token.decode("ascii")

    def _connection(self):
        """Return a new, unopened connection to the server."""
        if self.https:
            return httplib.HTTPSConnection(self.server)
        return httplib.HTTPConnection(self.server)

    def _load_json(self, response):
        """Decode the JSON body of ``response``; exit if that fails."""
        try:
            body = response.read()
            if isinstance(body, bytes):
                body = body.decode("utf-8")
            return json.loads(body)
        except Exception:
            print("Couldn't load data from campfire")
            sys.exit()

    def _get_json(self, path):
        """GET ``path`` and return the decoded JSON body.

        Exits the process on any status other than 200 or on a body that
        cannot be decoded.
        """
        response = self._get(path, "", self.headers)
        if response.status != 200:
            sys.exit()
        return self._load_json(response)

    def _post_checked(self, path, body):
        """POST ``body`` to ``path``; exit with the HTTP status as exit
        code unless the status is 2xx."""
        response = self._post(path, body, self.headers)
        # Floor division: with true division 201 / 100 is 2.01, which
        # would treat "201 Created" as a failure.
        if response.status // 100 != 2:
            sys.exit(response.status)

    def get_rooms(self):
        """Return the list of room dicts visible to this account."""
        return self._get_json("/rooms.json")["rooms"]

    def get_room_id(self, room_name):
        """Return the id of the room called ``room_name`` (compared
        case-insensitively); exit if there is no such room."""
        wanted = room_name.lower()
        room_id = None
        for room in self.get_rooms():
            if room["name"].lower() == wanted:
                room_id = room["id"]
                break
        if not room_id:
            print("Couldn't find room id")
            sys.exit()
        return room_id

    def get_username(self, id):
        """Return the display name for user ``id``, or None for a falsy
        id.  Names are cached after the first lookup."""
        if not id:
            return None
        if id in self.users:
            return self.users[id]
        data = self._get_json("/users/%s.json" % id)
        name = data["user"]["name"]
        self.users[id] = name
        return name

    def get_users(self, room):
        """Return the names of the users currently in ``room``."""
        room_id = self.get_room_id(room)
        data = self._get_json("/room/%s.json" % room_id)
        return [user["name"] for user in data["room"]["users"]]

    def get_recent_messages(self, room):
        """Return the recent text and paste messages of ``room`` as a list
        of ``(user name, body, created_at)`` tuples, in server order."""
        room_id = self.get_room_id(room)
        data = self._get_json("/room/%s/recent.json" % room_id)
        messages = []
        for msg in data["messages"]:
            # Skip everything that is not something a user said.
            if msg["type"] not in ("TextMessage", "PasteMessage"):
                continue
            user = self.get_username(msg["user_id"])
            messages.append((user, msg["body"], msg["created_at"]))
        return messages

    def send_message(self, room, message):
        """Post ``message`` to ``room`` as a text message."""
        room_id = self.get_room_id(room)
        payload = self._format_message(message)
        self._post_checked("/room/%s/speak.json" % room_id, payload)

    def join_room(self, room):
        """Join ``room``."""
        room_id = self.get_room_id(room)
        self._post_checked("/room/%s/join.json" % room_id, "")

    def leave_room(self, room):
        """Leave ``room``."""
        room_id = self.get_room_id(room)
        self._post_checked("/room/%s/leave.json" % room_id, "")

    def get_messages(self, room):
        """Fetch the recent messages of ``room``, remember them under the
        room name and return them."""
        self.messages[room] = self.get_recent_messages(room)
        return self.messages[room]

    def _format_message(self, message):
        """Return the JSON request body for speaking ``message``."""
        return json.dumps({"message": {"type": "TextMessage", "body": message}})

    def _request(self, method, path, params, headers):
        """Send one request on a fresh connection and return the response.

        The connection is not closed here because the caller still has to
        read the response body.
        """
        connection = self._connection()
        connection.request(method, path, params, headers)
        return connection.getresponse()

    def _get(self, path, params, headers):
        """
        wraps a get request to Campfire
        """
        return self._request("GET", path, params, headers)

    def _post(self, path, params, headers):
        """
        wraps a post request to Campfire
        """
        return self._request("POST", path, params, headers)
# Script entry point: parse the command line, read the API key, connect to
# Campfire and hand control to the curses UI.
logging.basicConfig(filename='log.log', level=logging.DEBUG)

# Get command line args.  This happens before the API key is read so that
# --help and usage errors work even when no key file exists yet.
description = "Terminal Campfire client"
argparser = argparse.ArgumentParser(description=description)
argparser.add_argument("--server", metavar="example.campfirenow.com")
argparser.add_argument("--room", metavar="YourRoom")
args = argparser.parse_args()

if not args.server:
    print("You must specify a server to connect to")
    argparser.print_help()
    sys.exit(1)

if not args.room:
    print("You must specify a room to join")
    argparser.print_help()
    sys.exit(1)

# Get APIKey: the key file holds nothing but the key itself.
config_path = "~/.campfriend"
try:
    with open(os.path.expanduser(config_path)) as config_file:
        config = config_file.read()
    apikey = config.strip()
except (IOError, OSError):
    # Previously a bare `raise` here made this message unreachable and the
    # user got a traceback instead.
    print("Sorry, couldn't read your api key")
    sys.exit(1)

# Wire everything up
campfire = Campfire(apikey, args.server)


def main(stdscr):
    """Run the UI on the screen provided by curses.wrapper().

    The view's constructor blocks in its input loop until the user quits.
    """
    CampfriendView(stdscr, room=args.room, user=campfire.username,
                   send_message_callback=campfire.send_message,
                   get_messages_callback=campfire.get_messages,
                   join_room_callback=campfire.join_room,
                   leave_room_callback=campfire.leave_room,
                   get_users_callback=campfire.get_users)


# curses.wrapper sets up the terminal, calls main and restores the terminal
# afterwards, even if main raises.
curses.wrapper(main)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment