Created
November 6, 2012 10:36
-
-
Save marianoguerra/4023941 to your computer and use it in GitHub Desktop.
otr support for sleekxmpp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Module preamble: imports grouped stdlib / third-party / project-local,
# followed by the module-level logger.
import logging

import pyotr
from sleekxmpp.xmlstream.stanzabase import JID

import tabs
from plugin import BasePlugin
from tabs import ConversationTab

log = logging.getLogger(__name__)
class Plugin(BasePlugin):
    """OTR (Off-the-Record) encryption plugin for conversation tabs.

    Message bodies sent from or received in a ``ConversationTab`` are passed
    through a per-tab ``pyotr.OTR`` object, and an ``/otr`` tab command lets
    the user start or end a session or display the key fingerprints.
    """

    def init(self):
        """Register the event handlers, the ``/otr`` command and the info element."""
        # Maps a ConversationTab to its pyotr.OTR object (filled by get_otr).
        self.contacts = {}
        self.add_event_handler('conversation_say_after',
                               self.on_conversation_say)
        self.add_event_handler('conversation_msg', self.on_conversation_msg)
        self.add_tab_command(
            ConversationTab, 'otr', self.command_otr,
            "Usage: /otr <start|end|fpr>\n"
            "otr: Start or stop OTR for the current conversation, "
            "or show the key fingerprints",
            self.otr_completion)
        ConversationTab.add_information_element(
            'otr', self.display_encryption_status)

    def cleanup(self):
        """Remove the info element and the ``/otr`` command added by init()."""
        ConversationTab.remove_information_element('otr')
        self.del_tab_command(ConversationTab, 'otr')

    def otr_special(self, tab, typ):
        """Return a callback that shows ``'<typ>: <msg>'`` in *tab*.

        The returned callable takes one bytes-like *msg* and decodes it
        before display. It is used as the OTR error and warning callback.
        """
        def helper(msg):
            tab.add_message('%s: %s' % (typ, msg.decode()))
        return helper

    def otr_on_state_change(self, tab):
        """Return a callback that reports an OTR state transition in *tab*.

        The returned callable takes the old and new state constants and
        displays their names as given by otr_state().
        """
        def helper(old, new):
            old = self.otr_state(old)
            new = self.otr_state(new)
            tab.add_message(
                'OTR state has changed from %s to %s' % (old, new))
        return helper

    def get_otr(self, tab):
        """Return the OTR object bound to *tab*, creating it on first use."""
        if tab not in self.contacts:
            self.contacts[tab] = pyotr.OTR(
                on_error=self.otr_special(tab, 'Error'),
                on_warn=self.otr_special(tab, 'Warn'),
                on_state_change=self.otr_on_state_change(tab))
        return self.contacts[tab]

    def on_conversation_say(self, message, tab):
        """Feed an outgoing message through the OTR filter.

        The body is replaced by the transformed text, or removed when the
        OTR object returns ``None``.
        """
        if not message['body']:
            # There is nothing to encrypt if this is a chatstate, for example.
            return
        otr = self.get_otr(tab)
        # XHTML bodies are not handled by the filter, so the XHTML payload
        # is removed from the stanza.
        del message['xhtml_im']
        say = otr.transform_msg(message['body'].encode())
        if say is not None:
            message['body'] = say.decode()
        else:
            del message['body']

    def on_conversation_msg(self, message, tab):
        """Feed an incoming message through the OTR filter.

        The body is replaced by the text to display, or removed when there
        is none. If the OTR object produces a reply, it is sent back to the
        contact of *tab*.
        """
        if not message['body']:
            # There is nothing to decrypt if this is a chatstate, for example.
            return
        otr = self.get_otr(tab)
        # XHTML bodies are not handled by the filter, so the XHTML payload
        # is removed from the stanza.
        del message['xhtml_im']
        display, reply = otr.handle_msg(message['body'].encode())
        if display is not None:
            message['body'] = display.decode()
        else:
            del message['body']
        if reply is not None:
            self.otr_say(tab, reply.decode())

    @staticmethod
    def otr_state(state):
        """Return a readable name for a pyotr message-state constant.

        Returns ``None`` when *state* is not one of the three known states.
        """
        if state == pyotr.MSG_STATE_PLAINTEXT:
            return 'plaintext'
        elif state == pyotr.MSG_STATE_ENCRYPTED:
            return 'encrypted'
        elif state == pyotr.MSG_STATE_FINISHED:
            return 'finished'
        return None

    @staticmethod
    def _format_fpr(fpr):
        """Return the bytes *fpr* as 40 upper-case hexadecimal digits.

        A falsy value (no fingerprint available) is returned unchanged.
        The number is zero-padded on the left so that leading zero bytes of
        the fingerprint are not lost.
        """
        if not fpr:
            return fpr
        return '%040X' % int.from_bytes(fpr, 'big')

    def display_encryption_status(self, jid):
        """Return the encryption status text for the conversation with *jid*.

        This is meant to be used in the ConversationTab's InfoWin. An empty
        string is returned when no OTR object exists for the tab.
        """
        tab = self.core.get_tab_by_name(jid, tabs.ConversationTab)
        if tab not in self.contacts:
            return ''
        state = self.otr_state(self.contacts[tab].state)
        return ' OTR: %s' % (state,)

    def otr_say(self, tab, line):
        """Send *line* as a chat message to the contact of *tab*."""
        msg = self.core.xmpp.make_message(tab.get_name())
        msg['type'] = 'chat'
        msg['body'] = line
        msg.send()

    def command_otr(self, args):
        """Handle ``/otr <start|end|fpr>`` for the current tab.

        ``start`` and ``end`` send the message produced by the OTR object
        (if any) to the contact; ``fpr`` displays both key fingerprints.
        With no argument the command help is shown instead.
        """
        args = args.split()
        if not args:
            return self.core.command_help("otr")
        tab = self.core.current_tab()
        if isinstance(tab, ConversationTab):
            command = args[0]
            if command == 'start':
                otr = self.get_otr(tab)
                self.otr_say(tab, otr.start().decode())
            elif command == 'end':
                otr = self.get_otr(tab)
                msg = otr.end()
                if msg is not None:
                    self.otr_say(tab, msg.decode())
            elif command == 'fpr':
                otr = self.get_otr(tab)
                our = self._format_fpr(otr.our_fpr)
                their = self._format_fpr(otr.their_fpr)
                tab.add_message('Your: %s Their: %s' % (our, their))
        self.core.refresh_window()

    def otr_completion(self, the_input):
        """Complete the sub-command name of ``/otr``."""
        return the_input.auto_completion(['start', 'end', 'fpr'], ' ')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi,
I need your help: I'm new to Python and I don't know how to integrate this into my script.
Please tell me where and how to integrate the plugin.
Thank you
`#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz
This file is part of SleekXMPP.
"""
import sys
import logging
import getpass
from optparse import OptionParser
import sleekxmpp
import ssl
# Python versions before 3.0 do not use UTF-8 encoding
# by default. To ensure that Unicode is handled properly
# throughout SleekXMPP, we will set the default encoding
# ourselves to UTF-8.
if sys.version_info < (3, 0):
    # sys.setdefaultencoding is called right after reloading sys.
    reload(sys)
    sys.setdefaultencoding('utf-8')
else:
    # On Python 3, alias raw_input to the built-in input.
    raw_input = input
class SendMsgBot(sleekxmpp.ClientXMPP):
if __name__ == '__main__':
# Setup the command line arguments.
optp = OptionParser()