Skip to content

Instantly share code, notes, and snippets.

@jinyu121 jinyu121/gammuapi.py
Last active Nov 29, 2018

Embed
What would you like to do?
Huawei Dongle SMS Forward
import configparser
import sqlite3
from os.path import join
from gammu import smsd
class ForwardTargetBase:
    """Interface for a destination that received messages are forwarded to."""

    def send(self, message):
        """Deliver ``message`` to this target.

        The base implementation always raises ``NotImplementedError``;
        subclasses are expected to override it.
        """
        raise NotImplementedError()
class SMSTarget(ForwardTargetBase):
    """Forward target that re-sends a message as an SMS to a fixed number."""

    def __init__(self, phone, device):
        """Remember the destination ``phone`` and the ``device`` used to send.

        ``device`` must expose ``InjectSMS``; it is called from ``send``.
        """
        self.phone, self.device = phone, device

    def send(self, message):
        """Queue ``message`` for delivery to ``self.phone``.

        ``message`` is read positionally: element 4 is used as the outgoing
        ``Coding`` and element 8 as the outgoing ``Text``.
        NOTE(review): presumably ``message`` is a row of the smsd ``inbox``
        table and these positions are its coding / decoded-text columns —
        confirm against the actual schema.
        """
        coding = message[4]
        text = message[8]
        outgoing = dict([
            ('Number', '{}'.format(self.phone)),
            ('SMSC', {'Location': 1}),
            ('Coding', coding),
            ('Text', text),
        ])
        # InjectSMS receives a list of message parts; a single part is sent.
        self.device.InjectSMS([outgoing])
class DongleDevice:
    """Forwards unprocessed rows of the smsd ``inbox`` table to registered targets.

    The SQLite database location is taken from the ``[smsd]`` section of the
    same configuration file that is handed to ``smsd.SMSD``.
    """

    def __init__(self, device_config="/etc/gammu-smsdrc"):
        """Create the SMSD handle and open the SQLite database.

        ``device_config`` is passed to ``smsd.SMSD`` and also parsed with
        ``configparser``; ``dbdir`` defaults to ``/home/pi`` and ``database``
        to ``smsd.db`` when the ``[smsd]`` section does not define them.
        """
        self.device = smsd.SMSD(device_config)
        self.forward_target = []

        parser = configparser.ConfigParser(allow_no_value=True)
        parser.read(device_config)
        db_dir = parser['smsd'].get('dbdir', "/home/pi")
        db_file = parser['smsd'].get('database', "smsd.db")
        self.database = sqlite3.connect(join(db_dir, db_file))

    def add_forward_target(self, target):
        """Register ``target``; its ``send`` is called for every forwarded row."""
        self.forward_target.append(target)

    def forward(self):
        """Send every unprocessed inbox row to all targets, then mark it processed.

        A failure while handling one row is printed and does not stop the
        remaining rows; the failed row is left unprocessed.
        """
        for row in self._sms_list():
            try:
                self._do_forward(row)
                # Element 9 of the row is used as the inbox ID.
                self._sms_set_read(row[9])
            except Exception as err:
                print("Error: {}".format(err))

    def _do_forward(self, message):
        """Hand ``message`` to each registered target in registration order."""
        for receiver in self.forward_target:
            receiver.send(message)

    def _sms_list(self):
        """Return all inbox rows whose ``Processed`` column is ``'false'``."""
        query = "SELECT * FROM inbox WHERE Processed='false'"
        return self.database.cursor().execute(query).fetchall()

    def _sms_set_read(self, sms_id):
        """Set ``Processed`` to ``'true'`` for the row with ID ``sms_id`` and commit."""
        statement = "UPDATE inbox SET Processed = 'true' WHERE ID = ?"
        self.database.execute(statement, (sms_id,))
        self.database.commit()
if __name__ == "__main__":
    # Script entry point: forward every unprocessed inbox row as an SMS to a
    # single placeholder number, sending through the same SMSD handle.
    device = DongleDevice()
    device.add_forward_target(
        SMSTarget("XXXXXXXXXXX", device.device)
    )
    device.forward()
    # Manual reset kept for reference: clearing the Processed flag makes the
    # next run forward every inbox row again.
    # c.execute("UPDATE inbox SET Processed = 'false' ")
    # conn.commit()
<?php
// Render every row of the smsd inbox table as an HTML table.

// The SQLite3 constructor throws on failure instead of returning a falsy
// value, so an `or die(...)` after `new` can never run; catch the exception.
try {
    $db = new SQLite3('/home/pi/smsd.db');
} catch (Exception $e) {
    die('Unable to open database');
}

// query() returns false on failure; stop before calling fetchArray() on it.
$results = $db->query('SELECT * FROM inbox');
if ($results === false) {
    die('Unable to read inbox');
}

// Sender and message text come from whoever sent the SMS, so every cell is
// HTML-escaped before being written into the page.
$esc = function ($value) {
    return htmlspecialchars((string) $value, ENT_QUOTES, 'UTF-8');
};

echo "<table>";
// Header cells are <th> elements inside a row (<th> cannot wrap a <tr>).
echo "<tr><th>ID</th><th>接收时间</th><th>发送方</th><th>内容</th><th>已处理</th></tr>";
while ($row = $results->fetchArray(SQLITE3_ASSOC)) {
    echo "<tr>";
    echo " <td>" . $esc($row["ID"]) . "</td>";
    echo " <td>" . $esc($row["ReceivingDateTime"]) . "</td>";
    echo " <td>" . $esc($row["SenderNumber"]) . "</td>";
    echo " <td>" . $esc($row["TextDecoded"]) . "</td>";
    echo " <td>" . $esc($row["Processed"]) . "</td>";
    echo "</tr>";
}
echo "</table>";
import requests
import json
import datetime
import logging
from enum import Enum
from collections import OrderedDict
from xmltodict import parse, unparse
# Root logger setup: INFO and above are appended to sms_record.log, each line
# prefixed with the timestamp and the level name.
logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s] %(levelname)s: %(message)s',
    filename='sms_record.log',
)
class ForwardTargetBase:
    """Interface for a destination that received messages are forwarded to."""

    def send(self, message):
        """Deliver ``message`` to this target.

        The base implementation always raises ``NotImplementedError``;
        subclasses are expected to override it.
        """
        raise NotImplementedError()
class SMSTarget(ForwardTargetBase):
    """Forward target that re-sends a message as an SMS to a fixed number."""

    def __init__(self, phone, device):
        """Remember the destination ``phone`` and the ``device`` used to send.

        ``device`` must expose ``sms_send(content, phone)``.
        """
        self.phone, self.device = phone, device

    def send(self, message):
        """Send ``message['Content']`` to ``self.phone``.

        Returns whatever ``device.sms_send`` returns.
        """
        return self.device.sms_send(message['Content'], self.phone)
class DongleDevice:
    """Client for the dongle's HTTP/XML API that forwards inbox messages.

    Requests are sent to ``http://<ip>/api<endpoint>``; request bodies are
    XML built with ``xmltodict.unparse`` and responses are parsed with
    ``xmltodict.parse``.
    """

    # Message boxes accepted by ``_sms_list``. The functional Enum API numbers
    # members from 1 in declaration order, so LOCAL_INBOX is 1, LOCAL_SENT 2, ...
    box_type = Enum("box_type", (
        "LOCAL_INBOX", "LOCAL_SENT", "LOCAL_DRAFT", "LOCAL_TRASH",
        "SIM_INBOX", "SIM_SENT", "SIM_DRAFT",
        "MIX_INBOX", "MIX_SENT", "MIX_DRAFT"))

    def __init__(self, ip="192.168.1.1", timeout=10):
        """Configure the client.

        Args:
            ip: Address of the dongle's web interface.
            timeout: Seconds to wait for each HTTP request before ``requests``
                raises a timeout error; ``None`` waits indefinitely.
        """
        self.base_url = "http://{}/api".format(ip)
        self.forward_target = []
        self.timeout = timeout

    def api_request(self, api, param, header=None):
        """Call one API endpoint and return the parsed response.

        Args:
            api: Endpoint path appended to the base URL (e.g. ``/sms/sms-list``).
            param: ``None`` for a GET request; otherwise a mapping that is
                wrapped in a ``<request>`` element and POSTed as XML.
            header: Optional mapping of extra HTTP headers to send.

        Returns:
            The content of the ``response`` element when the reply has one,
            the whole parsed document otherwise, or ``None`` when the HTTP
            status is not 200.

        Raises:
            requests.exceptions.RequestException: on connection failure or
                when the request exceeds ``self.timeout``.
        """
        url = self.base_url + api
        # Previously ``header`` was accepted but never sent; include it now.
        request_headers = dict(header) if header else {}
        if param is None:
            response = requests.get(url, headers=request_headers, timeout=self.timeout)
        else:
            param = unparse({"request": param}, full_document=False)
            request_headers.setdefault('Content-type', 'text/plain; charset=utf-8')
            response = requests.post(url, data=param.encode('utf-8'),
                                     headers=request_headers, timeout=self.timeout)

        if response.status_code != 200:
            # Logged (not printed) so failures land in the same place as the
            # rest of this class's output.
            logging.error("Error: \n\t API: {}\n\t Data: {}\n\t Header: {}".format(api, param, header))
            return None

        # The JSON round trip turns the parser's mapping types into plain dicts.
        data = json.loads(json.dumps(parse(response.content.decode())))
        if "response" in data:
            data = data['response']
        return data

    def sms_send(self, message, phone):
        """Send ``message`` to ``phone`` and return the API response."""
        api_url = "/sms/send-sms"
        param = OrderedDict([
            ('Index', -1),
            ('Phones', OrderedDict([('Phone', phone)])),
            ('Sca', None),
            ('Content', message),
            ('Length', len(message)),
            ('Reserved', 0),
            ('Date', "{0:%Y-%m-%d %H:%M:%S}".format(datetime.datetime.now())),
        ])
        return self.api_request(api_url, param)

    def add_forward_target(self, target):
        """Register ``target``; its ``send`` is called for every forwarded message."""
        self.forward_target.append(target)

    def forward(self):
        """Forward listed inbox messages, delete them, then empty the sent box.

        Only messages whose ``Smstat`` is ``'0'`` are forwarded and marked
        read (NOTE(review): presumably '0' means unread — confirm against the
        device API). Every listed inbox message is deleted afterwards,
        whether or not it was forwarded.
        """
        sms_list = self._sms_list(self.box_type.LOCAL_INBOX)
        if sms_list:
            logging.info("%s new messages", len(sms_list))
            for sms in sms_list:
                logging.info("==> %s", sms)
                if sms['Smstat'] == '0':
                    self._do_forward(sms)
                    self._sms_set_read(sms['Index'])
                self._sms_delete(sms['Index'])
        else:
            logging.info("Nothing new")
        self._do_clean()

    def _do_forward(self, message):
        """Hand ``message`` to each registered target in registration order."""
        for target in self.forward_target:
            target.send(message)

    def _do_clean(self):
        """Delete every message currently listed in the local sent box."""
        for sms in self._sms_list(self.box_type.LOCAL_SENT):
            self._sms_delete(sms['Index'])

    def _sms_set_read(self, sms_id):
        """Mark the message with index ``sms_id`` as read; return the API response."""
        return self.api_request("/sms/set-read", OrderedDict([('Index', sms_id)]))

    def _sms_delete(self, sms_id):
        """Delete the message with index ``sms_id``; return the API response."""
        return self.api_request("/sms/delete-sms", OrderedDict([('Index', sms_id)]))

    def _sms_list(self, box_id=1):
        """Return up to 20 messages from one box as a list of dicts.

        Args:
            box_id: A ``box_type`` member or its raw integer value. The
                integer default used to crash on ``.value``; both are accepted.

        Returns:
            A list (possibly empty). An empty list is also returned when the
            request failed or the reply carries no ``Messages`` element,
            instead of raising ``TypeError``/``KeyError``.
        """
        api_url = "/sms/sms-list"
        param = OrderedDict([
            ('PageIndex', 1),
            ('ReadCount', 20),
            ('BoxType', getattr(box_id, "value", box_id)),
            ('SortType', 0),
            ('Ascending', 0),
            ('UnreadPreferred', 1),
        ])
        data = self.api_request(api_url, param)
        messages = data.get("Messages") if isinstance(data, dict) else None
        if not isinstance(messages, dict):
            return []
        sms_list = messages.get("Message")
        if sms_list is None:
            return []
        # A single message is parsed as one dict rather than a one-item list.
        if not isinstance(sms_list, list):
            sms_list = [sms_list]
        return sms_list

    def _sms_count(self):
        """Return the ``/sms/sms-count`` reply, or zeroed defaults if it is empty."""
        data = self.api_request("/sms/sms-count", None)
        return data or {
            'SimInbox': 0, 'SimOutbox': 0, 'SimUnread': 0, 'SimDraft': 0, 'SimMax': 50,
            'LocalInbox': 0, 'LocalOutbox': 0, 'LocalUnread': 0, 'LocalDraft': 0, 'LocalDeleted': 0,
            'LocalMax': 500
        }
if __name__ == "__main__":
    # Script entry point: forward inbox messages as SMS to a single
    # placeholder number, sending through the dongle itself.
    device = DongleDevice()
    device.add_forward_target(
        SMSTarget("XXXXXXXXXXX", device)
    )
    device.forward()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.