Skip to content

Instantly share code, notes, and snippets.

@oldpatricka
Created November 29, 2011 17:53
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save oldpatricka/1405711 to your computer and use it in GitHub Desktop.
Save oldpatricka/1405711 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import os
import sys
import json
import string
import base64
import argparse
import threading
import logging
import textwrap
import signal
import time
import httplib
import curses
import curses.textpad
import curses.ascii
LOGGING = True
class MessagePad(curses.textpad.Textbox):
    """Single-message editor built on ``curses.textpad.Textbox``.

    Differences from the stock Textbox:

    * Enter (CR or LF) terminates editing.
    * DEL (127) is treated as backspace.
    * ``edit`` can be pre-filled with text and aborts as soon as the
      ``winched`` flag is raised on the pad.
    """

    # Raised from outside the pad (the view's resize handler sets it) to make
    # a running edit() return early.
    winched = False

    def do_command(self, ch):
        """Process one keystroke.

        Overridden so that Enter ends the edit and DEL behaves as backspace.

        Returns a false value when editing should stop; otherwise whatever
        ``Textbox.do_command`` returns.
        """
        if ch in (curses.ascii.CR, curses.ascii.LF):
            return 0
        if ch == curses.ascii.DEL:
            ch = curses.ascii.BS
        return curses.textpad.Textbox.do_command(self, ch)

    def edit(self, validate=None, text=""):
        """Edit the pad until Enter is pressed or ``winched`` is raised.

        validate -- optional callable applied to every keystroke; a false
                    result drops the keystroke.
        text     -- characters replayed into the pad before reading the
                    keyboard (used to restore a half-typed message).

        Returns the stripped printable contents, or "" when the edit was
        interrupted through the ``winched`` flag.
        """
        self.winched = False

        # Replay the pre-filled text as if it had been typed.
        for char in text:
            self.win.refresh()
            ch = ord(char)
            if validate:
                ch = validate(ch)
            if not ch:
                continue
            self.do_command(ch)

        # Poll with a short timeout so the winched flag is noticed promptly
        # even when no key is pressed.
        self.win.timeout(100)
        while True:
            if self.winched:
                return ""
            ch = self.win.getch()
            if ch == -1:
                # getch() timed out without input.
                continue
            if validate:
                ch = validate(ch)
            if not ch:
                continue
            if not self.do_command(ch):
                break
            self.win.refresh()

        printable = self.gather_printable()
        self.win.erase()
        return printable

    def gather_printable(self):
        """Return the pad contents restricted to printable characters.

        Leading and trailing whitespace is stripped.  The result is built
        with ``str.join`` so it is a string regardless of whether ``filter``
        returns a string or an iterator on the running interpreter.
        """
        chars = self.gather()
        return "".join(c for c in chars if curses.ascii.isprint(c)).strip()
class CampfriendView():
    """Curses front end for one chat room.

    The view owns three windows (chat log, sidebar with the user list, and
    the message input), polls the supplied callbacks on background timers
    and runs the input loop.  Note that constructing the view does not
    return: ``__init__`` enters the input loop, which ends by calling
    ``sys.exit()`` on Ctrl-C.

    Callbacks (each defaults to a no-op returning None):
        send_message_callback(room, text)
        get_messages_callback(room) -> list of (name, text, timestamp)
        join_room_callback(room)
        leave_room_callback(room)
        get_users_callback(room)    -> list of user names
    """

    # Seconds between two polls of the messages / users callbacks.
    POLL_INTERVAL = 2

    # Immutable class-level defaults; the per-instance lists are created in
    # __init__ so that instances never share message or user state.
    messages = ()
    users = ()
    message_win = None
    message = None
    chatlog = None
    sidebar = None
    stdscr = None
    winched_data = ""
    room = ""
    update_timer = None
    update_users_timer = None

    def __init__(self, stdscr, room="", user="",
                 send_message_callback=None,
                 get_messages_callback=None,
                 join_room_callback=None,
                 leave_room_callback=None,
                 get_users_callback=None):
        """Join ``room``, draw the UI, start polling and run the input loop."""
        # Setup callbacks
        self.send_message = send_message_callback or self._dummy_callback
        self.get_messages = get_messages_callback or self._dummy_callback
        self.join_room = join_room_callback or self._dummy_callback
        self.leave_room = leave_room_callback or self._dummy_callback
        self.get_users = get_users_callback or self._dummy_callback

        # Setup model
        self.messages = []
        self.users = []
        self._stopping = threading.Event()
        self.username = user
        if room:
            self.room = room
            self.join_room(self.room)

        # Setup drawing
        self.stdscr = stdscr
        self.draw_window()
        self.draw_content()
        signal.signal(signal.SIGWINCH, self._resize_signal_handler)

        # Each update_* call reschedules itself on a timer.
        self.update_messages(self.room)
        self.update_users(self.room)

        self._run()

    def _run(self):
        """Read messages from the user forever; Ctrl-C leaves the room and exits."""
        while True:
            try:
                # Restore whatever was typed before a terminal resize.
                old_text = self.winched_data
                self.winched_data = ""
                typed = self.message.edit(text=old_text)
            except KeyboardInterrupt:
                self._shutdown()
                sys.exit()
            except Exception:
                # Best effort: keep the UI alive, but leave a trace and do
                # not spin at full speed if the error is persistent.
                logging.exception("error while reading a message")
                time.sleep(0.1)
                continue
            if typed != "" and not self.message.winched:
                self.send_message_async(self.room, typed)
            self.message_win.erase()

    def _shutdown(self):
        """Stop both polling timers and leave the room."""
        # The flag stops a poll that is running right now from rescheduling
        # itself after its timer has been cancelled.
        self._stopping.set()
        for timer in (self.update_timer, self.update_users_timer):
            if timer is not None:
                timer.cancel()
        if self.room:
            self.leave_room(self.room)

    def _schedule(self, callback):
        """Start a daemon timer calling ``callback(self.room)``; None when stopping."""
        if self._stopping.is_set():
            return None
        timer = threading.Timer(self.POLL_INTERVAL, callback, [self.room])
        # Daemon threads do not keep the process alive after sys.exit().
        timer.daemon = True
        timer.start()
        return timer

    def draw_content(self):
        """Redraw the whole screen: chat log, sidebar and input window."""
        self.stdscr.refresh()
        self.draw_chatlog()
        self.draw_sidebar()
        self.message_win.refresh()

    def draw_chatlog(self):
        """Redraw the chat log window and its border."""
        self.chatlog.erase()
        self.draw_messages()
        self.chatlog.border(0)
        self.chatlog.refresh()

    def _resize_signal_handler(self, signum, frame):
        """SIGWINCH handler: save the half-typed message and rebuild the windows."""
        self.message.winched = True
        # TODO optimize this
        # NOTE(review): the pause presumably debounces bursts of resize
        # events -- confirm before removing.
        time.sleep(0.5)
        printable = self.message.gather_printable()
        if printable != "":
            self.winched_data = printable
        # endwin() followed by refresh() is the usual way to make curses
        # pick up the new terminal size before getmaxyx() is consulted.
        curses.endwin()
        self.stdscr.refresh()
        self.draw_window()
        self.draw_content()

    def draw_window(self):
        """(Re)create the chat log, sidebar and input windows for the current screen size."""
        (screen_height, screen_width) = self.stdscr.getmaxyx()

        right_margin_width = 15
        chatlog_x = 0
        chatlog_y = 0
        chatlog_h = screen_height - 5
        chatlog_w = screen_width - right_margin_width
        self.chatlog = curses.newwin(chatlog_h, chatlog_w,
                                     chatlog_y, chatlog_x)

        # The sidebar fills the right margin next to the chat log.
        sidebar_x = chatlog_x + chatlog_w
        sidebar_y = chatlog_y
        sidebar_h = chatlog_h
        sidebar_w = screen_width - chatlog_w
        self.sidebar = curses.newwin(sidebar_h, sidebar_w,
                                     sidebar_y, sidebar_x)

        # The input window takes the rows below the chat log.
        left_margin = 1
        message_x = chatlog_x + left_margin
        message_y = chatlog_h
        message_h = screen_height - chatlog_h
        message_w = screen_width - left_margin
        self.message_win = curses.newwin(message_h, message_w,
                                         message_y, message_x)
        self.message = MessagePad(self.message_win)

    def draw_sidebar(self):
        """Draw the room name and the list of users currently in the room."""
        # Start from a blank window so users who left do not linger.
        self.sidebar.erase()
        height, width = self.sidebar.getmaxyx()
        usable_width = max(width - 2, 0)
        userlist_start_y = 4
        try:
            self.sidebar.addnstr(1, 1, self.room.upper(), usable_width,
                                 curses.A_BOLD)
            self.sidebar.addnstr(3, 1, "Who's here?", usable_width,
                                 curses.A_BOLD)
            for y, user in enumerate(self.users, userlist_start_y):
                if y >= height:
                    # No more rows; the remaining users are not shown.
                    break
                self.sidebar.addnstr(y, 1, self.format_name(user),
                                     usable_width)
        except curses.error:
            # The sidebar is too small for its content; show what fitted.
            pass
        self.sidebar.refresh()

    def draw_messages(self):
        """Draw the messages into the chat log, newest at the bottom."""
        log_height, log_width = self.chatlog.getmaxyx()
        # adjust for borders
        log_height -= 2
        log_width -= 2

        name_position = 1
        log_posn = log_height
        for name, text, _ in reversed(self.messages):
            # A message without a known author is drawn with an empty name.
            name = self.format_name(name) or ""

            # calculate positions for name and message text
            text_position = len(name) + 2
            text_width = log_width - text_position

            # wrap message text to fit between name and edge of field
            try:
                text = textwrap.fill(text or "", text_width)
            except (ValueError, TypeError, AttributeError):
                # Couldn't wrap to that size, don't bother
                break
            text = text.replace("\n", "\n" + " " * text_position)

            # Move up by the number of continuation lines so the wrapped
            # message ends on the current row.
            log_posn -= text.count("\n")
            if log_posn <= 0:
                break
            try:
                self.chatlog.addstr(log_posn, name_position, name,
                                    curses.A_BOLD)
                self.chatlog.addstr(log_posn, text_position, text)
            except (curses.error, UnicodeError, TypeError):
                # Could not render this message; stop drawing older ones.
                break
            log_posn -= 1

    def format_name(self, name):
        """Abbreviate "First Last" to "First L.".

        Any value that is not exactly two whitespace-separated words
        (including non-strings such as None) is returned unchanged.
        """
        try:
            first, last = name.split()
        except (AttributeError, ValueError):
            return name
        return "%s %s." % (first, last[0])

    def send_message_async(self, room, message):
        """Send ``message`` on a background thread and echo it into the chat log."""
        def _send_message(room, message):
            self.send_message(room, message)
            self.messages.append((self.username, message, None))
            self.draw_chatlog()

        send_timer = threading.Timer(0, _send_message, [room, message])
        send_timer.start()

    def update_messages(self, room):
        """Fetch the messages of ``room``, redraw, and schedule the next poll."""
        fetched = self.get_messages(room)
        # The no-op callback returns None; keep the current log in that case.
        if fetched is not None:
            self.messages = fetched
        self.draw_content()
        self.update_timer = self._schedule(self.update_messages)

    def update_users(self, room):
        """Fetch the users of ``room``, redraw, and schedule the next poll."""
        new_users = self.get_users(room)
        if new_users:
            self.users = new_users
        self.draw_content()
        self.update_users_timer = self._schedule(self.update_users)

    def _dummy_callback(self, *args):
        """No-op used for every callback the caller did not supply."""
        return None
class Campfire():
    """Minimal client for the Campfire JSON API.

    Authenticates with an API key over HTTP basic auth, switches to HTTPS
    when the server answers the first request with a 301, and looks up the
    name of the authenticated user.

    Most failures terminate the calling thread through ``sys.exit`` after
    printing a short message, as the original implementation did.
    """

    username = ""
    apikey = ""
    server = ""
    https = False

    def __init__(self, apikey, server):
        """Connect to ``server`` with ``apikey`` and fetch the own user name."""
        self.apikey = apikey
        self.server = server
        # Per-instance caches (never shared between clients).
        self.users = {}       # user id -> user name
        self.messages = {}    # room name -> most recently fetched messages
        self._room_ids = {}   # lower-cased room name -> room id
        self.headers = {"Authorization": self._auth_header(self.apikey),
                        "Content-Type": "application/json"}

        # A 301 on the plain-HTTP probe means the account requires HTTPS.
        status, _, _ = self._fetch("GET", "/users/me.xml")
        if status == 301:
            self.https = True

        data = self._get_json("/users/me.json")
        self.username = data["user"]["name"]

    def _auth_header(self, apikey):
        """Return the HTTP basic-auth header value for ``apikey`` (empty password)."""
        token = base64.b64encode(("%s:" % apikey).encode("ascii"))
        if not isinstance(token, str):
            # b64encode returns bytes on interpreters where str is text.
            token = token.decode("ascii")
        return "Basic " + token

    def _connection(self):
        """Open a new HTTP or HTTPS connection to the server."""
        if self.https:
            return httplib.HTTPSConnection(self.server)
        return httplib.HTTPConnection(self.server)

    def _die(self, message, code=1):
        """Print ``message`` and raise SystemExit with ``code``."""
        print(message)
        sys.exit(code)

    def _fetch(self, method, path, body=""):
        """Perform one request and return ``(status, reason, payload)``.

        The response is read completely and the connection is closed
        before returning, so no socket is left open.
        """
        connection = self._connection()
        try:
            connection.request(method, path, body, self.headers)
            response = connection.getresponse()
            return response.status, response.reason, response.read()
        finally:
            connection.close()

    def _get_json(self, path):
        """GET ``path`` and return the decoded JSON document.

        Exits when the status is not 200 or the payload is not valid JSON.
        """
        status, reason, payload = self._fetch("GET", path)
        if status != 200:
            self._die("Problem connecting to campfire. Error: '%s'" % reason)
        try:
            return json.loads(payload)
        except ValueError:
            self._die("Couldn't load data from campfire")

    def _room_action(self, room, action, body=""):
        """POST ``body`` to ``/room/<id>/<action>.json``; exit on a non-2xx status."""
        room_id = self.get_room_id(room)
        status, _, _ = self._fetch(
            "POST", "/room/%s/%s.json" % (room_id, action), body)
        # Floor division keeps the 2xx check correct with true division too.
        if status // 100 != 2:
            sys.exit(status)

    def get_rooms(self):
        """Return the list of room dicts from ``/rooms.json``."""
        return self._get_json("/rooms.json")["rooms"]

    def get_room_id(self, room_name):
        """Return the id of the room called ``room_name`` (case-insensitive).

        Ids are cached after the first lookup so that the periodic polls do
        not download the room list again each time.  Exits when no room
        with that name exists.
        """
        wanted = room_name.lower()
        room_id = self._room_ids.get(wanted)
        if room_id:
            return room_id
        for room in self.get_rooms():
            if room["name"].lower() == wanted:
                room_id = room["id"]
                break
        if not room_id:
            self._die("Couldn't find room id")
        self._room_ids[wanted] = room_id
        return room_id

    def get_username(self, id):
        """Return the name of user ``id`` (cached); None for a false id."""
        if not id:
            return None
        if id in self.users:
            return self.users[id]
        name = self._get_json("/users/%s.json" % id)["user"]["name"]
        self.users[id] = name
        return name

    def get_users(self, room):
        """Return the names of the users currently in ``room``."""
        room_id = self.get_room_id(room)
        roomdata = self._get_json("/room/%s.json" % room_id)["room"]
        return [user["name"] for user in roomdata["users"]]

    def get_recent_messages(self, room):
        """Return recent text/paste messages as (user name, body, created_at) tuples."""
        room_id = self.get_room_id(room)
        data = self._get_json("/room/%s/recent.json" % room_id)
        messages = []
        for msg in data["messages"]:
            # Other message types (joins, leaves, ...) are not displayed.
            if msg["type"] not in ("TextMessage", "PasteMessage"):
                continue
            user = self.get_username(msg["user_id"])
            messages.append((user, msg["body"], msg["created_at"]))
        return messages

    def send_message(self, room, message):
        """Post ``message`` to ``room`` as a TextMessage."""
        self._room_action(room, "speak", self._format_message(message))

    def join_room(self, room):
        """Join ``room``."""
        self._room_action(room, "join")

    def leave_room(self, room):
        """Leave ``room``."""
        self._room_action(room, "leave")

    def get_messages(self, room):
        """Refresh and return the stored recent messages of ``room``."""
        self.messages[room] = self.get_recent_messages(room)
        return self.messages[room]

    def _format_message(self, message):
        """Return the JSON request body for posting ``message``."""
        return json.dumps({"message": {"type": "TextMessage",
                                       "body": message}})

    def _get(self, path, params, headers):
        """
        wraps a get request to Campfire

        Returns the live response object; the caller must read it.
        """
        connection = self._connection()
        connection.request("GET", path, params, headers)
        return connection.getresponse()

    def _post(self, path, params, headers):
        """
        wraps a post request to Campfire

        Returns the live response object; the caller must read it.
        """
        connection = self._connection()
        connection.request("POST", path, params, headers)
        return connection.getresponse()
# Write the debug log only when the module-level LOGGING switch is on.
if LOGGING:
    logging.basicConfig(filename='log.log', level=logging.DEBUG)

# Get APIKey: the key is the stripped content of ~/.campfriend
config_path = "~/.campfriend"
try:
    with open(os.path.expanduser(config_path)) as config_file:
        config = config_file.read()
        apikey = config.strip()
except (IOError, OSError):
    # Missing or unreadable file: report it below instead of a traceback.
    apikey = ""
if not apikey:
    print("Sorry, couldn't read your api key")
    sys.exit(1)

# Get command line args
description = "Terminal Campfire client"
argparser = argparse.ArgumentParser(description=description)
argparser.add_argument("--server", metavar="example.campfirenow.com")
argparser.add_argument("--room", metavar="YourRoom")
args = argparser.parse_args()

if not args.server:
    print("You must specify a server to connect to")
    argparser.print_help()
    sys.exit(1)

if not args.room:
    print("You must specify a room to join")
    argparser.print_help()
    sys.exit(1)

# Wire everything up
campfire = Campfire(apikey, args.server)


def main(stdscr):
    """Entry point for curses.wrapper: build the view, which runs the UI loop."""
    CampfriendView(stdscr, room=args.room, user=campfire.username,
                   send_message_callback=campfire.send_message,
                   get_messages_callback=campfire.get_messages,
                   join_room_callback=campfire.join_room,
                   leave_room_callback=campfire.leave_room,
                   get_users_callback=campfire.get_users)


# curses.wrapper restores the terminal even when main() raises or exits.
curses.wrapper(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment