Skip to content

Instantly share code, notes, and snippets.

@muromec
Last active December 15, 2015 16:59
Show Gist options
  • Save muromec/5293195 to your computer and use it in GitHub Desktop.
Save muromec/5293195 to your computer and use it in GitHub Desktop.
import os
import msgpack
import socket
def cmd(name):
    """Return a decorator that tags a function with the command *name*.

    The decorated function is handed back unchanged except for a new
    ``_cmd_name`` attribute set to *name*; ``Texr.find_commands`` looks for
    that attribute when building its handler table.
    """
    def tag(handler):
        # Only annotate; never wrap, so the original callable is preserved.
        setattr(handler, "_cmd_name", name)
        return handler

    return tag
class User(object):
    """Simple record holding a user's ``login`` and ``name``."""

    def __init__(self, login, name):
        # Both values are stored as given; no validation is performed.
        self.login, self.name = login, name
class Call(object):
    """Record describing one call, with accept / hang-up controls.

    The constructor does not set ``_ipc``; ``Texr.add_call`` assigns it after
    construction.  ``accept`` and ``hang`` need it to be an object exposing
    ``control_call(cid, op)``.
    """

    # Operation codes passed to ``control_call``.
    _OP_ACCEPT = 0
    _OP_HANG = 1

    def __init__(self, cid, cdir, cstate, ts, remote):
        """Store the call id, direction flag, state, timestamp and remote user."""
        self.cid = cid
        self.dir, self.state = cdir, cstate
        self.ts = ts
        self.remote = remote

    def _control(self, op):
        """Forward control operation *op* for this call to the IPC object."""
        ipc = self._ipc
        ipc.control_call(self.cid, op)

    def accept(self):
        """Send control operation 0 for this call."""
        self._control(self._OP_ACCEPT)

    def hang(self):
        """Send control operation 1 for this call."""
        self._control(self._OP_HANG)

    def established(self):
        """Hook invoked when the call is established; does nothing here."""
        return None

    def __repr__(self):
        # A truthy ``dir`` renders as "to", a falsy one as "from"
        # (presumably outgoing vs. incoming -- confirm against the daemon).
        if self.dir:
            direction = "to"
        else:
            direction = "from"
        return "<Call %s %s>" % (direction, self.remote.login)
class Texr(object):
    """Client for the msgpack IPC endpoint at ``~/.texr.sock``.

    Outgoing requests are msgpack-encoded lists ``[name, arg, ...]``.
    Incoming events are expected in the same shape and are dispatched to the
    methods tagged with ``@cmd(name)``.

    Attributes:
        commands: dict mapping event name -> bound handler method.
        calls: dict mapping call id -> ``Call`` for calls currently known.
        sock: the connected UNIX stream socket.
    """

    def __init__(self):
        """Connect to ``~/.texr.sock`` and build the handler table.

        Raises whatever ``socket.connect`` raises; the socket is closed
        before re-raising so a failed connection does not leak a descriptor.
        """
        # expanduser() falls back to the password database when HOME is
        # unset, instead of producing the bogus path "None/.texr.sock".
        path = os.path.join(os.path.expanduser("~"), ".texr.sock")
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.connect(path)
        except Exception:
            sock.close()
            raise
        self.commands = self.find_commands()
        self.calls = {}
        self.sock = sock

    def find_commands(self):
        """Return a dict of event name -> bound method for ``@cmd`` handlers.

        ``dir()`` is used so that handlers inherited from base classes are
        found too; looking only at the instance's own class ``__dict__``
        would leave a subclass without any of the handlers defined here.
        """
        handlers = {}
        for attr_name in dir(self):
            attr = getattr(self, attr_name, None)
            if hasattr(attr, '_cmd_name'):
                handlers[attr._cmd_name] = attr
        return handlers

    def send(self, *args):
        """Pack *args* as one msgpack list and write all of it to the socket."""
        buf = msgpack.packb(list(args))
        # sendall() retries until the whole buffer is written; a short
        # send() would truncate the message and corrupt the stream.
        self.sock.sendall(buf)

    def loop(self):
        """Read events from the socket and dispatch them until the peer closes.

        Events that are empty or not lists are ignored.  Events with no
        registered handler are printed.
        """
        unpacker = msgpack.Unpacker()
        while True:
            chunk = self.sock.recv(4096)
            if not chunk:
                # Empty read means the peer closed the connection; without
                # this check the loop would spin forever on empty reads.
                break
            unpacker.feed(chunk)
            for event in unpacker:
                if not event or not isinstance(event, list):
                    continue
                handler = self.get_handler(event[0])
                if handler:
                    handler(*event[1:])
                else:
                    print('event unhandled %s' % (event,))

    def get_handler(self, name):
        """Return the handler registered for event *name*, or ``None``."""
        return self.commands.get(name)

    def set_online(self, state):
        """Send ``sip.online`` with the integer *state*."""
        assert isinstance(state, int)
        self.send("sip.online", state)

    def send_login(self, login, password):
        """Send ``cert.get`` with non-empty string *login* and *password*."""
        assert isinstance(login, str) and login
        assert isinstance(password, str) and password
        self.send("cert.get", login, password)

    def control_call(self, cid, op):
        """Send ``sip.call.control`` for call *cid* with integer operation *op*."""
        assert isinstance(cid, str) and cid
        assert isinstance(op, int)
        self.send("sip.call.control", cid, op)

    @cmd("cert.ok")
    def cert_ok(self, code, name=None):
        """Handle ``cert.ok``: go online when *code* is 0, else prompt for login."""
        if code == 0:
            print('logged in ok %s' % (name,))
            self.set_online(1)
        else:
            self.interactive_login()

    @cmd("sip.reg")
    def reg_state(self, state):
        """Handle ``sip.reg`` by printing the registration *state*."""
        if state == 0:
            print('reg failed, offline now')
        elif state == 5:
            print('reg in progress...')
        else:
            print('reg %d' % (state,))

    @cmd("sip.call.add")
    def add_call(self, cid, cdir, cstate, timestamp, name, login):
        """Handle ``sip.call.add``: record the new call and run ``handle_add``."""
        remote = User(login=login, name=name)
        call_ob = Call(cid, cdir, cstate, timestamp, remote)
        # Call.accept()/hang() send their control messages through this object.
        call_ob._ipc = self
        self.calls[cid] = call_ob
        print('add call %s' % (call_ob,))
        self.handle_add(call_ob)

    @cmd("sip.call.del")
    def del_call(self, cid, reason):
        """Handle ``sip.call.del``: forget call *cid* (*reason* is not used)."""
        call_ob = self.calls.pop(cid, None)
        if call_ob is None:
            return
        print('call end %s' % (call_ob,))

    @cmd("sip.call.est")
    def est_call(self, cid):
        """Handle ``sip.call.est``: notify the known call *cid* it is established."""
        call_ob = self.calls.get(cid)
        if call_ob is None:
            return
        call_ob.established()
        print('call established %s' % (call_ob,))

    def handle_add(self, call_ob):
        """Hook run for every added call; accepts calls whose remote login
        is ``'ilya.muromec'`` and leaves all others untouched."""
        if call_ob.remote.login == 'ilya.muromec':
            call_ob.accept()

    def interactive_login(self):
        """Prompt on the terminal for login and password and send them."""
        import getpass
        try:
            read_line = raw_input  # Python 2
        except NameError:
            read_line = input  # Python 3
        login = read_line("login: ")
        password = getpass.getpass("password: ")
        self.send_login(login, password)
def main():
    """Connect to the IPC socket, send ``sip.me`` for the hard-coded login
    and run the event loop."""
    client = Texr()
    client.send("sip.me", "ilya.muromec")
    client.loop()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment