# Telepathy/desktopcouch logger POC
# (Copied from a GitHub gist; the page's navigation text was dropped.)
#!/usr/bin/env python
"""Telepathy CouchDB Logger"""
from dbus.mainloop.glib import DBusGMainLoop
import gobject
import dbus
#for observer thing
import telepathy
import telepathy.server
# for datetime format
from datetime import datetime
from telepathy.interfaces import CLIENT, \
CLIENT_OBSERVER, \
CHANNEL_INTERFACE, \
CHANNEL_INTERFACE_MESSAGES, \
CONNECTION, \
CONNECTION_INTERFACE_CONTACTS, \
CONNECTION_INTERFACE_ALIASING
from telepathy.server.channel import CHANNEL_TYPE_TEXT
from desktopcouch.records.server import CouchDatabase
from desktopcouch.records.record import Record
from telepathy.server import DBusProperties
from telepathy._generated.Client_Observer import ClientObserver
# Record type URI attached to every stored message record.
RECORD_TYPE = 'http://www.rtg.in.ua/empathy-im-couchdb'

# Fully qualified names of the two channel properties this script reads.
CHANNEL_INTERFACE_CHANNEL_TYPE = '.'.join([CHANNEL_INTERFACE, 'ChannelType'])
CHANNEL_INTERFACE_TARGET_HANDLE = '.'.join([CHANNEL_INTERFACE, 'TargetHandle'])
class InstantMessage(object):
    """Plain value holder describing a single instant message.

    Every field defaults to None, so ``InstantMessage()`` still produces
    the blank object that callers fill in attribute by attribute; the
    fields may also be supplied directly as keyword arguments.

    Attributes:
        sender: identifier of the party that sent the message.
        recepient: identifier of the party the message was sent to.  The
            spelling is part of the attribute name other code uses, so it
            must not be "corrected".
        time: POSIX timestamp of the message (InstantMessageStorage.put
            feeds it to datetime.fromtimestamp).
        message: text of the message.
        protocol: name of the protocol the message travelled over.
    """

    def __init__(self, sender=None, recepient=None, time=None,
                 message=None, protocol=None):
        self.sender = sender
        self.recepient = recepient
        self.time = time
        self.message = message
        self.protocol = protocol

    def __repr__(self):
        return ("InstantMessage(sender={0!r}, recepient={1!r}, time={2!r}, "
                "message={3!r}, protocol={4!r})").format(
                    self.sender, self.recepient, self.time,
                    self.message, self.protocol)
class InstantMessageStorage():
    """Minimal message sink backed by a CouchDB database"""

    def __init__(self):
        # The database is called "im"; create=True is handed to CouchDatabase.
        self.couchdb = CouchDatabase("im", create=True)

    def put(self, message):
        """Stores one message as a record and returns the new record id"""
        fields = {
            "from": message.sender,
            "to": message.recepient,
            # message.time is a POSIX timestamp; it is stored as an
            # ISO 8601 string built from the local-time datetime.
            "time": datetime.fromtimestamp(message.time).isoformat(),
            "message": message.message,
            "protocol": message.protocol,
        }
        return self.couchdb.put_record(Record(fields,
                                              record_type=RECORD_TYPE))
# Debug trace printed just before the CouchdbLogger class is defined.
# NOTE(review): indentation is missing in this copy of the file; this is
# assumed to be a module-level statement rather than leftover code inside
# InstantMessageStorage.put() - confirm.
print("CouchdbLogger()")
class CouchdbLogger(ClientObserver, DBusProperties):
    """Telepathy observer that stores text-channel messages.

    The object advertises itself as a Client.Observer interested in text
    channels, remembers who is talking on every channel it is told about
    (see register_channel) and hands each sent or received message to an
    InstantMessageStorage.

    The implementation is not tied to CouchDB so it is probably a bad name.
    """

    def __init__(self, *args):
        """Sets up D-Bus properties, the storage and the signal receivers.

        *args is forwarded unchanged to ClientObserver.__init__; publish()
        passes the owned bus name and the object path.
        """
        print("Calling parent ClientObserver methods")
        ClientObserver.__init__(self, *args)

        # channel object path -> dict with the keys 'protocol',
        # 'self-handle' and 'target-handle' (filled by register_channel)
        self.channel_cache = {}

        # FIXME: is it ok to get session bus again?
        # some parent class should define that as well
        self.bus = dbus.SessionBus()

        # _interfaces is populated before DBusProperties.__init__ runs,
        # exactly as in the original statement order.
        self._interfaces = set()
        self._interfaces.add(CLIENT)
        self._interfaces.add(CLIENT_OBSERVER)
        telepathy.server.DBusProperties.__init__(self)

        self._implement_property_get(CLIENT, {
            'Interfaces': lambda: [CLIENT_OBSERVER],
        })
        # Only channels whose ChannelType is Text are requested.
        self._implement_property_get(CLIENT_OBSERVER, {
            'ObserverChannelFilter': lambda: dbus.Array([
                dbus.Dictionary({
                    CHANNEL_INTERFACE_CHANNEL_TYPE: CHANNEL_TYPE_TEXT
                }, signature='sv')
            ], signature='a{sv}')
        })

        self.storage = InstantMessageStorage()

        # 1. set up the signal handlers; path_keyword makes dbus-python pass
        #    the emitting object's path to the handler as 'path'
        self.bus.add_signal_receiver(
            handler_function=self.message_sent_handler,
            dbus_interface=CHANNEL_INTERFACE_MESSAGES,
            signal_name='MessageSent',
            path_keyword='path')
        self.bus.add_signal_receiver(
            handler_function=self.message_received_handler,
            dbus_interface=CHANNEL_INTERFACE_MESSAGES,
            signal_name='MessageReceived',
            path_keyword='path')
        # 2. let it run

    def _store_message(self, channel_info, header, parts,
                       sender_key, recepient_key, time_key):
        """Builds an InstantMessage from message parts and stores it.

        channel_info  -- cache entry of the channel the message belongs to
        header        -- first part of the message (the header mapping)
        parts         -- remaining parts of the message (the body)
        sender_key    -- channel_info key describing the sending party
        recepient_key -- channel_info key describing the receiving party
        time_key      -- header key that carries the message timestamp
        """
        contact_id_key = CONNECTION + '/contact-id'
        sender_id = channel_info[sender_key][contact_id_key]
        recepient_id = channel_info[recepient_key][contact_id_key]

        # Only text/plain parts are logged; parts without a content type
        # are skipped.
        text = ''.join(part["content"] for part in parts
                       if part.get("content-type") == "text/plain")

        # The timestamp header may be absent.  Fall back to "now" instead
        # of raising KeyError inside the signal handler.
        # NOTE(review): for pending messages replayed by register_channel
        # this is later than the real time of arrival.
        timestamp = header.get(time_key)
        if timestamp is None:
            import time
            timestamp = time.time()

        im = InstantMessage()
        im.sender = sender_id
        im.recepient = recepient_id
        im.time = timestamp
        im.protocol = channel_info['protocol']
        im.message = text

        record_id = self.storage.put(im)
        print("{0} -> {1}\n\tRecord Id: {2}".format(sender_id,
                                                    recepient_id,
                                                    record_id))

    def message_sent_handler(self, content=None, flags=0,
                             message_token='', path=None):
        """Called for every MessageSent signal received"""
        header = content[0]
        parts = content[1:]
        if path not in self.channel_cache:
            # TODO: request channel details since we
            # may join the conversation later than it
            # was started
            print("MessageSent: path {0} is not in channel cache".format(path))
            return
        # We are the sender, the channel target is the recipient.
        self._store_message(self.channel_cache[path], header, parts,
                            sender_key='self-handle',
                            recepient_key='target-handle',
                            time_key='message-sent')

    def message_received_handler(self, message=None, path=None):
        """Called for every MessageReceived signal received"""
        header = message[0]
        parts = message[1:]
        if header.get("delivery-error") == 1:
            print("Delivery errors are not logged")  # FIXME: or should they?
            return
        if path not in self.channel_cache:
            print('MessageReceived: path {0} is '
                  'not in channel_cache'.format(path))
            return
        # The channel target is the sender, we are the recipient.
        self._store_message(self.channel_cache[path], header, parts,
                            sender_key='target-handle',
                            recepient_key='self-handle',
                            time_key='message-received')

    def ObserveChannels(self, account, connection, channels,
                        dispatch_operation, requests_satisfied,
                        observer_info):
        """DBus method called by Telepathy"""
        print("Incoming channels on {0}".format(connection))
        for object_path, props in channels:
            self.register_channel(account, connection, object_path, props)

    def register_channel(self, account, connection, channel, props):
        """Registers the channel in internal cache, handles pending messages

        account    -- account object path (not used here)
        connection -- connection object path
        channel    -- channel object path, used as the cache key
        props      -- channel properties; must contain the TargetHandle
        """
        print("Registering channel: {0}".format(channel))

        # The entry is assembled locally and published to the cache only
        # when it is complete, so a failing D-Bus call below cannot leave a
        # half-filled entry behind for the message handlers to trip over.
        channel_info = {}

        # Clients MAY parse the object path to determine
        # the connection manager name and the protocol,
        # but MUST NOT attempt to parse the account part.
        # Connection managers MAY use any unique string for this part.
        # /org/freedesktop/Telepathy/Connection/gabble/jabber/a@example.com
        # 0 1   2           3         4          5      6      7
        #                       Connection Manager -^   ^- Protocol
        channel_info['protocol'] = connection.split('/')[6]

        # Bus name is the object path without the leading '/', dots for '/'.
        connection_bus_name = connection[1:].replace("/", ".")
        print("Connection Bus Name: {0}".format(connection_bus_name))

        # We need to find out who is who in this channel
        # the local party handle is SelfHandle,
        # the remote party handle arrived in TargetHandle property
        connection_proxy = self.bus.get_object(connection_bus_name,
                                               connection)
        self_handle = connection_proxy.Get(CONNECTION, 'SelfHandle')
        target_handle = props[CHANNEL_INTERFACE_TARGET_HANDLE]

        # FIXME: we might not need to call GetContactAttributes
        # immediately since some of the relevant bits are already
        # available in props
        contacts_iface = dbus.Interface(
            connection_proxy,
            dbus_interface=CONNECTION_INTERFACE_CONTACTS)
        handle_info = contacts_iface.GetContactAttributes(
            # Handles
            [self_handle, target_handle],
            # Interfaces
            [CONNECTION_INTERFACE_ALIASING],
            # Hold
            #  If true, all handles in the result
            #  have been held on behalf of the
            #  calling process, as if by a call
            #  to Connection.HoldHandles.
            True)

        # not intercepting KeyError for now
        # do we really need to carry the int handle around?
        channel_info['self-handle'] = handle_info[self_handle]
        channel_info['target-handle'] = handle_info[target_handle]

        # The entry must be in the cache before pending messages are
        # replayed: message_received_handler looks the channel up by path.
        self.channel_cache[channel] = channel_info

        # Getting channel proxy
        channel_proxy = self.bus.get_object(connection_bus_name, channel)
        pending_messages = channel_proxy.Get(CHANNEL_INTERFACE_MESSAGES,
                                             'PendingMessages')
        # Now we have something to do with these pending messages
        for message in pending_messages:
            print("Processing pending message {0}".format(message))
            self.message_received_handler(message, path=channel)
def publish():
    """Claims the well-known bus name and starts the logger on it"""
    print("Connecting to session bus...")
    session_bus = dbus.SessionBus()

    # <CLIENT>.CouchdbLogger, and the same name as a path with '/' for '.'
    well_known_name = '.'.join([CLIENT, 'CouchdbLogger'])
    object_path = '/' + well_known_name.replace('.', '/')

    print("Registering bus name {0}".format(well_known_name))
    owned_name = dbus.service.BusName(well_known_name, bus=session_bus)

    print("Starting CouchdbLogger()")
    CouchdbLogger(owned_name, object_path)
def main():
    """Installs the GLib D-Bus main loop, schedules publish() and runs"""
    print("Initializing Telepathy CouchDB logger...")
    DBusGMainLoop(set_as_default=True)
    # publish() is queued with a zero timeout instead of being called
    # directly.
    gobject.timeout_add(0, publish)
    gobject.MainLoop().run()


if __name__ == "__main__":
    main()
# (GitHub page footer text dropped; it is not part of the script.)