Instantly share code, notes, and snippets.

Embed
What would you like to do?
Huawei Dongle SMS Forward
import configparser
import sqlite3
from os.path import join
from gammu import smsd
class ForwardTargetBase:
def send(self, message):
raise NotImplementedError()
class SMSTarget(ForwardTargetBase):
def __init__(self, phone, device):
self.phone = phone
self.device = device
def send(self, message):
sms_coding, sms_content = message[4], message[8]
sms_to_send = {
'Number': '{}'.format(self.phone),
'SMSC': {'Location': 1},
'Coding': sms_coding,
'Text': sms_content,
}
self.device.InjectSMS([sms_to_send])
class DongleDevice:
def __init__(self, device_config="/etc/gammu-smsdrc"):
self.device = smsd.SMSD(device_config)
self.forward_target = []
config = configparser.ConfigParser(allow_no_value=True)
config.read(device_config)
database_path = join(config['smsd'].get('dbdir', "/home/pi"),
config['smsd'].get('database', "smsd.db"))
self.database = sqlite3.connect(database_path)
def add_forward_target(self, target):
self.forward_target.append(target)
def forward(self):
sms_list = self._sms_list()
for sms in sms_list:
try:
self._do_forward(sms)
self._sms_set_read(sms[9])
except Exception as e:
print("Error: {}".format(e))
def _do_forward(self, message):
for target in self.forward_target:
target.send(message)
def _sms_list(self):
sql = "SELECT * FROM inbox WHERE Processed='false'"
c = self.database.cursor()
result = c.execute(sql).fetchall()
return result
def _sms_set_read(self, sms_id):
sql = "UPDATE inbox SET Processed = 'true' WHERE ID = ?"
c = self.database.cursor()
c.execute(sql, (sms_id,))
self.database.commit()
if "__main__" == __name__:
device = DongleDevice()
device.add_forward_target(SMSTarget("XXXXXXXXXXX", device.device))
device.forward()
# c.execute("UPDATE inbox SET Processed = 'false' ")
# conn.commit()
<?php
$db = new SQLite3('/home/pi/smsd.db') or die('Unable to open database');
$results = $db->query('SELECT * FROM inbox');
echo "<table>";
echo "<th><tr><td>ID</td><td>接收时间</td><td>发送方</td><td>内容</td><td>已处理</td></tr></th>";
while ($row = $results->fetchArray()) {
echo "<tr>";
echo " <td>".$row["ID"]."</td>";
echo " <td>".$row["ReceivingDateTime"]."</td>";
echo " <td>".$row["SenderNumber"]."</td>";
echo " <td>".$row["TextDecoded"]."</td>";
echo " <td>".$row["Processed"]."</td>";
echo "</tr>";
}
echo "</table>";
import requests
import json
import datetime
import logging
from enum import Enum
from collections import OrderedDict
from xmltodict import parse, unparse
logging.basicConfig(
filename='sms_record.log',
level=logging.INFO,
format='[%(asctime)s] %(levelname)s: %(message)s'
)
class ForwardTargetBase:
def send(self, message):
raise NotImplementedError()
class SMSTarget(ForwardTargetBase):
def __init__(self, phone, device):
self.phone = phone
self.device = device
def send(self, message):
response = self.device.sms_send(message['Content'], self.phone)
return response
class DongleDevice:
box_type = Enum("box_type", (
"LOCAL_INBOX", "LOCAL_SENT", "LOCAL_DRAFT", "LOCAL_TRASH",
"SIM_INBOX", "SIM_SENT", "SIM_DRAFT",
"MIX_INBOX", "MIX_SENT", "MIX_DRAFT"))
def __init__(self, ip="192.168.1.1"):
self.base_url = "http://{}/api".format(ip)
self.forward_target = []
def api_request(self, api, param, header=None):
if param is None:
response = requests.get(self.base_url + api)
else:
param = unparse({"request": param}, full_document=False)
response = requests.post(self.base_url + api, data=param.encode('utf-8'),
headers={'Content-type': 'text/plain; charset=utf-8'})
if response.status_code == 200:
data = json.loads(json.dumps(parse(response.content.decode())))
if "response" in data:
data = data['response']
else:
print("Error: \n\t API: {}\n\t Data: {}\n\t Header: {}".format(api, param, header))
data = None
return data
def sms_send(self, message, phone):
api_url = "/sms/send-sms"
param = OrderedDict([
('Index', -1),
('Phones', OrderedDict([('Phone', phone), ])),
('Sca', None),
('Content', message),
('Length', len(message)),
('Reserved', 0),
('Date', "{0:%Y-%m-%d %H:%M:%S}".format(datetime.datetime.now()))
])
response = self.api_request(api_url, param)
return response
def add_forward_target(self, target):
self.forward_target.append(target)
def forward(self):
# sms_count = self._sms_count()
sms_list = self._sms_list(self.__class__.box_type.LOCAL_INBOX)
if sms_list:
logging.info("{} new messages".format(len(sms_list)))
for s in sms_list:
logging.info("==> {}".format(s))
if s['Smstat'] == '0':
self._do_forward(s)
self._sms_set_read(s['Index'])
self._sms_delete(s['Index'])
else:
logging.info("Nothing new")
self._do_clean()
def _do_forward(self, message):
for target in self.forward_target:
target.send(message)
def _do_clean(self):
# sms_list = self._sms_list(self.__class__.box_type.LOCAL_INBOX)
# for s in sms_list:
# self._sms_delete(s['Index'])
sms_list = self._sms_list(self.__class__.box_type.LOCAL_SENT)
for s in sms_list:
self._sms_delete(s['Index'])
def _sms_set_read(self, sms_id):
api_url = "/sms/set-read"
param = OrderedDict([('Index', sms_id)])
response = self.api_request(api_url, param)
return response
def _sms_delete(self, sms_id):
api_url = "/sms/delete-sms"
param = OrderedDict([('Index', sms_id)])
response = self.api_request(api_url, param)
return response
def _sms_list(self, box_id=1):
api_url = "/sms/sms-list"
param = OrderedDict([
('PageIndex', 1),
('ReadCount', 20),
('BoxType', box_id.value),
('SortType', 0),
('Ascending', 0),
('UnreadPreferred', 1)
])
data = self.api_request(api_url, param)
sms_list = data["Messages"]
if sms_list is None:
sms_list = []
else:
sms_list = data["Messages"]["Message"]
if not isinstance(sms_list, list):
sms_list = [sms_list]
return sms_list
def _sms_count(self):
api_url = "/sms/sms-count"
data = self.api_request(api_url, None)
data = data or {
'SimInbox': 0, 'SimOutbox': 0, 'SimUnread': 0, 'SimDraft': 0, 'SimMax': 50,
'LocalInbox': 0, 'LocalOutbox': 0, 'LocalUnread': 0, 'LocalDraft': 0, 'LocalDeleted': 0,
'LocalMax': 500
}
return data
if "__main__" == __name__:
device = DongleDevice()
device.add_forward_target(SMSTarget("XXXXXXXXXXX", device))
device.forward()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment