Created
June 16, 2017 07:59
-
-
Save sorz/b6a5f433a30b7585f7774b3260692cb8 to your computer and use it in GitHub Desktop.
Decode SMS from CDMA PDU
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime | |
import binascii | |
import logging | |
import io | |
def _parse_fields(raw): | |
"""Return a iterators of (id, value).""" | |
raw = io.BytesIO(raw) | |
while True: | |
header = raw.read(2) | |
if len(header) < 2: | |
return | |
f_id, f_len = header | |
f_val = raw.read(f_len) | |
yield f_id, f_val | |
class BitsBuffer: | |
def __init__(self, data): | |
self._data = io.BytesIO(data) | |
self._byte = 0 | |
self._left_bits = 0 | |
def read(self, bits): | |
val = 0 | |
while bits > 0: | |
n = min(self._left_bits, bits) | |
if n == 0: | |
byte = self._data.read(1) | |
if not byte: | |
return | |
self._byte = byte[0] | |
self._left_bits = 8 | |
else: | |
val <<= n | |
new = self._byte >> (self._left_bits - n) | |
new &= 2 ** n - 1 | |
val |= new | |
self._left_bits -= n | |
bits -= n | |
return val | |
class PDUDecoder: | |
def __init__(self, pdu): | |
self.pdu = binascii.unhexlify(pdu) | |
self.caller = None | |
self.ref = None | |
self.total = 1 | |
self.no = 1 | |
self.sent_at = None | |
self.text = '' | |
self._decode() | |
def _decode(self): | |
for f_id, val in _parse_fields(self.pdu[1:]): | |
if f_id == 0x02: # Caller ID | |
self._decode_caller_id(val) | |
elif f_id == 0x08: # Bearer data | |
self._decode_bearer(val) | |
def _decode_caller_id(self, data): | |
bits = BitsBuffer(data) | |
mode = bits.read(2) | |
if mode != 0: | |
logging.warning('unknown number encoding mode') | |
return | |
num_len = bits.read(8) | |
num = '' | |
for i in range(num_len): | |
num += 'D1234567890*#ABC'[bits.read(4)] | |
self.caller = num | |
def _decode_bearer(self, data): | |
user_header = False | |
for f_id, val in _parse_fields(data): | |
if f_id == 0x00: # Message ID | |
user_header = (val[2] & 0x08) > 0 | |
elif f_id == 0x01: # User data | |
self._decode_user_data(val, user_header) | |
elif f_id == 0x03: # Datetime | |
self._decode_datetime(val) | |
def _decode_datetime(self, val): | |
bits = BitsBuffer(val) | |
def next_num(): | |
return bits.read(4) * 10 + bits.read(4) | |
self.sent_at = datetime( | |
year=next_num() + 2000, | |
month=next_num(), | |
day=next_num(), | |
hour=next_num(), | |
minute=next_num(), | |
second=next_num()) | |
def _decode_user_data(self, raw, has_header): | |
bits = BitsBuffer(raw) | |
encoding = bits.read(5) | |
bits.read(8) # length | |
if has_header: | |
self._decode_user_data_header(bits) | |
self.text = self._decode_str(bits, encoding) | |
def _decode_user_data_header(self, bits): | |
length = bits.read(8) | |
if length != 5: | |
logging.warning('unknown user header (len != 5), ignored') | |
bits.read(length * 8) | |
return | |
if bits.read(8) != 0x00: | |
logging.warning('unknown user header (id != 0), ignored') | |
bits.read((length - 1) * 8) | |
return | |
length = bits.read(8) | |
self.ref = bits.read(8) | |
self.total = bits.read(8) | |
self.no = bits.read(8) | |
def _decode_str(self, bits, encoding): | |
def read_all(num_bits): | |
s = bytearray() | |
while True: | |
byte = bits.read(num_bits) | |
if byte is None: | |
return s | |
s.append(byte) | |
if encoding == 2: # 7-bit ASCII | |
return read_all(7).decode('ascii', 'replace') | |
elif encoding == 4: # Unicode | |
return read_all(8).decode('utf-16be', 'replace') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment