-
-
Save aursu/bcba7dde060469a18b865c306096f666 to your computer and use it in GitHub Desktop.
#!/usr/bin/python3 | |
# script created for reading SMS messages from 3G modem connected to PC | |
# 1) it looks for modem | |
# 2) reads all SMS messages from modem | |
# 3) prints all found SMS messages to stdout only if total messages count | |
# greater than SMS_STORE_COUNT (default is 3) | |
# 4) save all but SMS_STORE_COUNT messages to txt files and | |
# delete them from modem | |
import sys | |
import dbus | |
from datetime import datetime | |
import json | |
import os | |
from dbus.mainloop.glib import DBusGMainLoop | |
from gi.repository import GLib | |
# Number of newest messages left on the modem. Defaults to 3 and can be
# overridden with the SMS_STORE_COUNT environment variable, whose value must
# parse as an integer.
SMS_STORE_COUNT = int(os.environ.get('SMS_STORE_COUNT', 3))
# https://developer.gnome.org/ModemManager/unstable/ModemManager-Flags-and-Enumerations.html#MMSmsState | |
class MMSmsState:
    """Numeric SMS state codes, named after ModemManager's MMSmsState values."""

    MM_SMS_STATE_UNKNOWN = 0
    MM_SMS_STATE_STORED = 1
    MM_SMS_STATE_RECEIVING = 2
    MM_SMS_STATE_RECEIVED = 3
    MM_SMS_STATE_SENDING = 4
    MM_SMS_STATE_SENT = 5
# singleton: main app loop | |
class MainLoop(object):
    """Singleton wrapper around one GLib main loop.

    Every construction returns the same instance; the underlying
    GLib.MainLoop is created the first time __init__ runs.
    """

    # the shared instance, created on first construction
    instance = None
    # GLib.MainLoop owned by the shared instance
    loop = None

    def __new__(cls, *args, **kwargs):
        # Extra arguments are accepted for signature compatibility but are
        # not forwarded: object.__new__() raises TypeError when it receives
        # arguments and __new__ has been overridden.
        if cls.instance is None:
            cls.instance = super(MainLoop, cls).__new__(cls)
        return cls.instance

    def __init__(self):
        # __init__ runs on every MainLoop() call, so create the loop once
        if self.loop is None:
            self.loop = GLib.MainLoop()

    def run(self):
        """Start the wrapped loop by calling its run()."""
        self.loop.run()

    def quit(self):
        """Stop the wrapped loop by calling its quit()."""
        self.loop.quit()
class DBus:
    """Base helper holding a system bus connection and one D-Bus proxy object."""

    system_bus = None
    dbus_proxy = None

    @staticmethod
    def type_cast(val):
        """Convert selected dbus wrapper values into plain Python values.

        None passes through; dbus.String and dbus.ObjectPath become str;
        dbus.Int32 and dbus.UInt32 become int; dbus.Array becomes a list
        whose items are converted recursively. Any other value is returned
        unchanged.
        """
        if val is None:
            return None
        if isinstance(val, (dbus.String, dbus.ObjectPath)):
            return str(val)
        if isinstance(val, (dbus.Int32, dbus.UInt32)):
            return int(val)
        if isinstance(val, dbus.Array):
            return list(map(DBus.type_cast, val))
        return val

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.system_bus = dbus.SystemBus()

    # https://dbus.freedesktop.org/doc/dbus-python/tutorial.html#proxy-objects
    def setup_proxy_object(self, bus_name, object_path):
        """Fetch the proxy for object_path on bus_name and remember it."""
        proxy = self.system_bus.get_object(bus_name, object_path)
        self.dbus_proxy = proxy

    def set_proxy_object(self, proxy_object):
        """Adopt a proxy from another DBus helper or from a raw dbus proxy.

        Values of any other type are ignored.
        """
        if isinstance(proxy_object, DBus):
            self.dbus_proxy = proxy_object.dbus_proxy
            return
        if isinstance(proxy_object, (dbus.Interface, dbus.proxies.ProxyObject)):
            self.dbus_proxy = proxy_object

    def get_proxy_object(self):
        """Return the current proxy object (None when not set)."""
        return self.dbus_proxy

    def get_dbus_interface(self, interface):
        """Return a dbus.Interface on the proxy, or None without a proxy."""
        if not self.dbus_proxy:
            return None
        return dbus.Interface(self.dbus_proxy, dbus_interface=interface)

    # https://dbus.freedesktop.org/doc/dbus-specification.html#standard-interfaces-objectmanager
    def get_objmanager_objects(self):
        """Return the result of ObjectManager.GetManagedObjects() ({} without a proxy)."""
        if not self.dbus_proxy:
            return {}
        manager = self.get_dbus_interface('org.freedesktop.DBus.ObjectManager')
        return manager.GetManagedObjects()
class DBusObject(DBus):
    """DBus helper bound to a single object path on a given bus name."""

    obj_path = None

    def __init__(self, bus_name, path, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # remember the path, then resolve the proxy object for it
        self.obj_path = path
        self.setup_proxy_object(bus_name, path)

    def get_object_path(self):
        """Return the object path this instance was created with."""
        return self.obj_path
class ModemManagerObject(DBusObject):
    """D-Bus object served under the org.freedesktop.ModemManager1 bus name."""

    def __init__(self, obj_path, *args, **kwargs):
        super().__init__(
            *args,
            bus_name='org.freedesktop.ModemManager1',
            path=obj_path,
            **kwargs
        )

    @staticmethod
    def object_path(obj, path = None):
        """Build "/org/freedesktop/ModemManager1/<obj>/<id>" from an ID or path.

        path may be a numeric ID (int or numeric string) or a string that
        contains the base path, in which case its last component is taken as
        the ID. Returns None, after reporting on stderr, when the ID cannot
        be converted to an integer.
        """
        prefix = "/org/freedesktop/ModemManager1/%s/" % obj
        if isinstance(path, str) and prefix in path:
            # full object path: only the trailing component is the ID
            ident = path.rsplit('/', 1)[-1]
        else:
            ident = path
        try:
            return "%s%d" % (prefix, int(ident))
        except (ValueError, TypeError):
            print("Bad object ID provided: %s" % ident, file=sys.stderr)
            return None
class DBusInterface(DBus):
    """DBus helper bound to one named interface with a cached property set."""

    name = None
    interface = None
    properties = None

    def __init__(self, dbus_interface, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = dbus_interface
        self.interface = self.get_dbus_interface(dbus_interface)
        # take the initial snapshot of the interface properties
        self.set_properties()

    # https://dbus.freedesktop.org/doc/dbus-specification.html#standard-interfaces-properties
    def get_properties(self):
        """Fetch all properties of the interface via Properties.GetAll.

        Returns None when there is no proxy object or when the call fails
        with a DBusException (the failure is reported on stderr).
        """
        if not self.dbus_proxy:
            return None
        try:
            return self.dbus_proxy.GetAll(self.name, dbus_interface='org.freedesktop.DBus.Properties')
        except dbus.exceptions.DBusException as err:
            print("Can not get %s interface properties: %s" % (self.interface, err), file=sys.stderr)
            return None

    def set_properties(self):
        """Replace the cached properties with a fresh get_properties() result."""
        self.properties = self.get_properties()

    def get_property(self, name):
        """Return the cached property converted by DBus.type_cast, or None."""
        cached = self.properties
        if not cached or name not in cached:
            return None
        return DBus.type_cast(cached[name])

    def setup_signal(self, name, handler):
        """Connect handler to the signal called name on this interface."""
        self.interface.connect_to_signal(name, handler)
class MMModem(DBusInterface, ModemManagerObject):
    """Modem object read through org.freedesktop.ModemManager1.Modem."""

    def __init__(self, modem = None):
        """Bind to a modem.

        modem may be an integer modem ID, or a string holding either that
        ID or the modem object path.
        """
        super().__init__(
            obj_path=ModemManagerObject.object_path('Modem', modem),
            dbus_interface='org.freedesktop.ModemManager1.Modem',
        )

    def Manufacturer(self):
        """The equipment manufacturer, as reported by the modem (e.g. 'huawei')."""
        return self.get_property('Manufacturer')

    def Model(self):
        """The equipment model, as reported by the modem (e.g. 'E1550')."""
        return self.get_property('Model')

    def EquipmentIdentifier(self):
        """The identity of the device.

        This will be the IMEI number for GSM devices and the hex-format
        ESN/MEID for CDMA devices.
        """
        return self.get_property('EquipmentIdentifier')

    def OwnNumbers(self):
        """Numbers (e.g. MSISDN in 3GPP) currently handled by this modem.

        Returned as a list of strings.
        """
        return self.get_property('OwnNumbers')
class MMModemSms(DBusInterface, ModemManagerObject):
    """Single SMS object read through org.freedesktop.ModemManager1.Sms."""

    def __init__(self, sms):
        """Bind to an SMS given its numeric ID or its object path."""
        path = ModemManagerObject.object_path('SMS', sms)
        super(MMModemSms, self).__init__(obj_path=path, dbus_interface='org.freedesktop.ModemManager1.Sms')

    def Number(self):
        """Number to which the message is addressed."""
        return self.get_property('Number')

    def Text(self):
        """Message text, in UTF-8."""
        return self.get_property('Text')

    def State(self):
        """Value of the 'State' property (compared against MMSmsState codes)."""
        return self.get_property('State')

    def Timestamp(self):
        """Return the 'Timestamp' property in a compact, strptime-ready form.

        Time when the first PDU of the SMS message arrived the SMSC. This
        field is only applicable if the PDU type is MM_SMS_PDU_TYPE_DELIVER
        or MM_SMS_PDU_TYPE_STATUS_REPORT.

        Two input layouts are recognised, by length:
          * 15 characters: right-padded with zeros to 17 characters so it
            matches %y%m%d%H%M%S%z;
          * 25 characters: separators removed so it matches
            %Y%m%dT%H%M%S%z.
        Anything else yields None.
        """
        stamp = self.get_property('Timestamp')
        if not isinstance(stamp, str):
            return None
        if len(stamp) == 15:
            return '{:0<17s}'.format(stamp)
        if len(stamp) == 25:
            # Split at the UTC offset: '-' and ':' are removed from the
            # date/time part only, so a negative offset keeps its sign.
            moment, offset = stamp[:19], stamp[19:]
            moment = moment.translate(str.maketrans({':': '', '-': ''}))
            return moment + offset.replace(':', '')
        return None

    def get_datetime(self):
        """Parse Timestamp() into a datetime; None when absent or unparsable."""
        stamp = self.Timestamp()
        if not stamp:
            return None
        for layout in ('%Y%m%dT%H%M%S%z', '%y%m%d%H%M%S%z'):
            try:
                return datetime.strptime(stamp, layout)
            except ValueError:
                continue
        return None

    def get_date(self):
        """Return the message time formatted with "%c %Z", or None."""
        dt = self.get_datetime()
        if dt:
            return dt.strftime("%c %Z")
        return None

    def save(self, fname = None):
        """Dump Number, Text, Timestamp and Date as JSON into a text file.

        fname defaults to "<Number>-<Timestamp>.txt"; the name is lowercased
        before opening. The file is written as UTF-8 explicitly because the
        JSON is emitted with ensure_ascii=False and must not depend on the
        locale's default encoding.
        """
        if fname is None:
            fname = "%s-%s.txt" % (self.Number(), self.Timestamp())
        sms = {f: self.get_property(f) for f in ('Number', 'Text', 'Timestamp')}
        sms['Date'] = self.get_date()
        with open(fname.lower(), 'w', encoding='utf-8') as fp:
            json.dump(sms, fp, ensure_ascii=False, indent="\t", sort_keys=True)
# signal handlers | |
def message_added(path, received):
    """Default handler for the 'Added' signal: report the new message on stdout."""
    report = "Message received: path=%s; received=%s" % (path, received)
    print(report)
    # NOTE: stopping the main loop here is disabled: MainLoop().quit()
class MMModemMessaging(DBusInterface):
    """Messaging interface (org.freedesktop.ModemManager1.Modem.Messaging) of a modem."""

    def __init__(self, modem):
        """Attach to the proxy of modem (an MMModem, or anything MMModem accepts)."""
        if not isinstance(modem, MMModem):
            modem = MMModem(modem)
        # reuse the modem's proxy object before the interface is set up
        self.set_proxy_object(modem)
        super(MMModemMessaging, self).__init__(dbus_interface='org.freedesktop.ModemManager1.Modem.Messaging')

    def Messages(self):
        """Return the current list of SMS object paths (empty list if unavailable).

        The properties are re-read on every call: the snapshot taken in
        __init__ goes stale as soon as a message arrives or is deleted, which
        made reads from a signal handler miss new messages.
        """
        self.set_properties()
        return self.get_property('Messages') or []

    def get_sms(self, sms = None, reverse=True):
        """Return SMS object(s) as MMModemSms.

        sms is an SMS object path or object ID. When omitted, all messages in
        the RECEIVED state are returned sorted by date (newest first while
        reverse is true). When given, the matching MMModemSms is returned, or
        None if the modem does not list that path.
        """
        if sms is None:
            received = [
                msg for msg in map(MMModemSms, self.Messages())
                if msg.State() == MMSmsState.MM_SMS_STATE_RECEIVED
            ]

            def by_date(msg):
                # messages without a usable timestamp sort as the oldest
                # instead of breaking the comparison with None
                stamp = msg.get_datetime()
                return (stamp is not None, stamp)

            return sorted(received, key=by_date, reverse=reverse)
        smspath = ModemManagerObject.object_path('SMS', sms)
        if smspath in self.Messages():
            return MMModemSms(smspath)
        return None

    def delete(self, sms):
        """Delete sms (MMModemSms, object path or ID) if the modem lists it."""
        if isinstance(sms, MMModemSms):
            smspath = sms.get_object_path()
        else:
            smspath = ModemManagerObject.object_path('SMS', sms)
        if smspath in self.Messages():
            self.interface.Delete(dbus.ObjectPath(smspath))

    def signal_added(self, handler = message_added):
        """Connect handler to the interface's 'Added' signal."""
        self.setup_signal('Added', handler)
class ModemManager(ModemManagerObject):
    """Entry point object at /org/freedesktop/ModemManager1 listing the modems."""

    # list of modem object paths (strings), filled in __init__
    modems = None

    def __init__(self):
        # https://www.freedesktop.org/software/ModemManager/api/latest/ref-dbus-bus-name.html
        # https://www.freedesktop.org/software/ModemManager/api/latest/ref-dbus-object-manager.html
        super(ModemManager, self).__init__(obj_path='/org/freedesktop/ModemManager1')
        self.modems = self.get_modems_list()

    def get_modems_list(self):
        """Return the list of modem object paths as plain strings."""
        # https://www.freedesktop.org/software/ModemManager/api/latest/ref-dbus-standard-interfaces-objectmanager.html
        return [
            str(p) for p in self.get_objmanager_objects()
            if isinstance(p, dbus.ObjectPath)
        ]

    def get_modem(self, modem):
        """Return the MMModem for modem (ID or path) if it is a known modem."""
        mpath = ModemManagerObject.object_path('Modem', modem)
        if mpath in self.modems:
            return MMModem(mpath)
        return None

    def get_first(self):
        """Return the MMModem for the first listed modem, or None if none exist."""
        if self.modems:
            return self.get_modem(self.modems[0])
        return None

    @staticmethod
    def _matches(actual, value):
        """Tell whether a property value matches the searched value.

        Strings and sequences use containment (substring / membership);
        every other type is compared for equality. A missing property, or a
        containment test the types do not support, counts as no match.
        """
        if actual is None:
            return False
        if isinstance(actual, (str, list, tuple)):
            try:
                return value in actual
            except TypeError:
                return False
        return actual == value

    def get_modem_by(self, name = 'OwnNumbers', value = None):
        """Filter the modems by a value matching one of their properties.

        If value is not provided, the first modem is returned. If exactly
        one modem matches, its MMModem object is returned; otherwise a list
        of the matching MMModem objects (possibly empty). An unsupported
        property name yields None.
        """
        if value is None:
            return self.get_first()
        if name not in ('Manufacturer', 'Model', 'EquipmentIdentifier',
                        'OwnNumbers', 'PrimaryPort', 'State'):
            return None
        modems = [
            modem for modem in map(MMModem, self.modems)
            if self._matches(modem.get_property(name), value)
        ]
        if len(modems) == 1:
            return modems[0]
        return modems
def main():
    """Print the received SMS of the first modem and archive the older ones.

    Messages come from get_sms() with its default ordering. Each one is
    printed; after the first SMS_STORE_COUNT messages, every further message
    is saved to a file and deleted from the modem. Nothing happens when no
    modem is found.
    """
    # NOTE: signal-driven mode is disabled:
    #   DBusGMainLoop(set_as_default=True) / ms.signal_added() / MainLoop().run()
    modem = ModemManager().get_first()
    if not modem:
        return
    messaging = MMModemMessaging(modem)
    for position, sms in enumerate(messaging.get_sms(), start=1):
        print("%s (%s): %s" % (sms.Number(), sms.get_date(), sms.Text()))
        if position > SMS_STORE_COUNT:
            sms.save()
            messaging.delete(sms)


main()
Hi,
I am trying to use this code for continuous SMS reading, but it doesn't work. get_sms() can only read messages on start. I uncommented the MainLoop code here and added my handler for signal_added. I receive the signal for an incoming SMS, but if I try to read the SMS from the handler using ms.get_sms(), I receive an empty result.
Any idea why this happens? Maybe MainLoop().run() should be replaced?
Regards,
Actually, the MainLoop functionality was never completed or even tested :) It worked on my Raspberry as part of a cron job. I lost my device during one of my trips, and that was the end of this development. Sorry that I cannot help.
Hi,
I am trying to use this code for continuous SMS reading, but it doesn't work. get_sms() can only read messages on start. I uncommented the MainLoop code here and added my handler for signal_added. I receive the signal for an incoming SMS, but if I try to read the SMS from the handler using ms.get_sms(), I receive an empty result.
Any idea why this happens? Maybe MainLoop().run() should be replaced?
Regards,
Actually, the MainLoop functionality was never completed or even tested :) It worked on my Raspberry as part of a cron job. I lost my device during one of my trips, and that was the end of this development. Sorry that I cannot help.
Thanks anyway!!!
Hi,
I am trying to use this code for continuous SMS reading, but it doesn't work. get_sms() can only read messages on start. I uncommented the MainLoop code here and added my handler for signal_added. I receive the signal for an incoming SMS, but if I try to read the SMS from the handler using ms.get_sms(), I receive an empty result.
Any idea why this happens? Maybe MainLoop().run() should be replaced?
Regards,