Skip to content

Instantly share code, notes, and snippets.

@sorz
Created June 16, 2017 07:59
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save sorz/b6a5f433a30b7585f7774b3260692cb8 to your computer and use it in GitHub Desktop.
Save sorz/b6a5f433a30b7585f7774b3260692cb8 to your computer and use it in GitHub Desktop.
Decode SMS from CDMA PDU
from datetime import datetime
import binascii
import logging
import io
def _parse_fields(raw):
"""Return a iterators of (id, value)."""
raw = io.BytesIO(raw)
while True:
header = raw.read(2)
if len(header) < 2:
return
f_id, f_len = header
f_val = raw.read(f_len)
yield f_id, f_val
class BitsBuffer:
def __init__(self, data):
self._data = io.BytesIO(data)
self._byte = 0
self._left_bits = 0
def read(self, bits):
val = 0
while bits > 0:
n = min(self._left_bits, bits)
if n == 0:
byte = self._data.read(1)
if not byte:
return
self._byte = byte[0]
self._left_bits = 8
else:
val <<= n
new = self._byte >> (self._left_bits - n)
new &= 2 ** n - 1
val |= new
self._left_bits -= n
bits -= n
return val
class PDUDecoder:
def __init__(self, pdu):
self.pdu = binascii.unhexlify(pdu)
self.caller = None
self.ref = None
self.total = 1
self.no = 1
self.sent_at = None
self.text = ''
self._decode()
def _decode(self):
for f_id, val in _parse_fields(self.pdu[1:]):
if f_id == 0x02: # Caller ID
self._decode_caller_id(val)
elif f_id == 0x08: # Bearer data
self._decode_bearer(val)
def _decode_caller_id(self, data):
bits = BitsBuffer(data)
mode = bits.read(2)
if mode != 0:
logging.warning('unknown number encoding mode')
return
num_len = bits.read(8)
num = ''
for i in range(num_len):
num += 'D1234567890*#ABC'[bits.read(4)]
self.caller = num
def _decode_bearer(self, data):
user_header = False
for f_id, val in _parse_fields(data):
if f_id == 0x00: # Message ID
user_header = (val[2] & 0x08) > 0
elif f_id == 0x01: # User data
self._decode_user_data(val, user_header)
elif f_id == 0x03: # Datetime
self._decode_datetime(val)
def _decode_datetime(self, val):
bits = BitsBuffer(val)
def next_num():
return bits.read(4) * 10 + bits.read(4)
self.sent_at = datetime(
year=next_num() + 2000,
month=next_num(),
day=next_num(),
hour=next_num(),
minute=next_num(),
second=next_num())
def _decode_user_data(self, raw, has_header):
bits = BitsBuffer(raw)
encoding = bits.read(5)
bits.read(8) # length
if has_header:
self._decode_user_data_header(bits)
self.text = self._decode_str(bits, encoding)
def _decode_user_data_header(self, bits):
length = bits.read(8)
if length != 5:
logging.warning('unknown user header (len != 5), ignored')
bits.read(length * 8)
return
if bits.read(8) != 0x00:
logging.warning('unknown user header (id != 0), ignored')
bits.read((length - 1) * 8)
return
length = bits.read(8)
self.ref = bits.read(8)
self.total = bits.read(8)
self.no = bits.read(8)
def _decode_str(self, bits, encoding):
def read_all(num_bits):
s = bytearray()
while True:
byte = bits.read(num_bits)
if byte is None:
return s
s.append(byte)
if encoding == 2: # 7-bit ASCII
return read_all(7).decode('ascii', 'replace')
elif encoding == 4: # Unicode
return read_all(8).decode('utf-16be', 'replace')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment