Skip to content

Instantly share code, notes, and snippets.

@tubaman
Created October 7, 2016 22:34
Show Gist options
  • Save tubaman/e459e54a4b825d542822b8803193f6af to your computer and use it in GitHub Desktop.
Save tubaman/e459e54a4b825d542822b8803193f6af to your computer and use it in GitHub Desktop.
Parser for AT&T Wireless bill(CSV format)
#!/usr/bin/env python
import logging
import sys
import csv
import pint
ureg = pint.UnitRegistry()
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
def parse_minutes(header, data_reader):
logger.debug("parse_minutes")
assert header == ['Item', 'Day', 'Date', 'Time', 'Number Called', 'Call To', 'Min', 'Rate Code', 'Rate Pd', 'Feature', 'Airtime Charge', "LD/Add'l Charge", 'Total Charge']
minutes = 0
for row in data_reader:
if row[0] == "Total Call Detail":
break
minutes += int(row[6])
return minutes
def parse_messages(header, data_reader):
logger.debug("parse_messages")
assert header == ['Item', 'Day', 'Date', 'Time', 'To/From', 'Type', 'Msg/KB', 'Rate Code', 'Rate Pd', 'Feature', 'In/Out', 'Total Charge']
messages = 0
# txt messages
for row in data_reader:
if row[0] == "Subtotal for Text Messages:":
logger.debug("end of txt messages")
break
messages += int(row[6])
# multimedia messages
for row in data_reader:
if row[0] == "Subtotal for Picture/Video Messages:":
logger.debug("end of multimedia messages")
break
messages += int(row[6])
return messages
def parse_data_amount(data_amount):
assert data_amount.endswith('KB')
data_amount = data_amount.replace('KB', 'kibibyte')
amount = ureg.Quantity(data_amount)
#amount, unit = data_amount.split(' ')
#assert unit == "KB"
#amount = int(amount.replace(',', ''))
return amount
def parse_data(first_row, data_reader):
logger.debug("parse_data")
data = parse_data_amount(first_row[6])
for row in data_reader:
if row[0] == "Subtotal for Data Plans:":
break
data += parse_data_amount(row[6])
return data
def debug_reader(data_reader):
for row in data_reader:
logger.debug("row: %r", row)
yield row
def parse_user(data_reader):
user = {}
row = data_reader.next()
assert row[0] == "Mobile Number:"
user['mobile_number'] = row[1]
row = data_reader.next() # skip row
row = data_reader.next()
assert row[0] == "User Name::"
user['user_name'] = row[1]
state = 'looking_for_minutes'
logger.debug("state: %r", state)
for row in data_reader:
if state == 'looking_for_minutes' and row[0] == 'Item':
header = row
user['minutes'] = parse_minutes(header, data_reader)
state = 'looking_for_messages'
logger.debug("state: %r", state)
elif state == 'looking_for_messages' and row[0] == 'Item':
header = row
user['messages'] = parse_messages(header, data_reader)
state = 'looking_for_data'
logger.debug("state: %r", state)
elif state == 'looking_for_data' and row[0] == '1':
first_row = row
user['data'] = parse_data(first_row, data_reader)
break
return user
def parse_users(data_reader):
for row in data_reader:
if row == ['AT&T']:
user = parse_user(data_reader)
yield user
def clean_rows(data_reader):
for row in data_reader:
row = [c.strip() for c in row if c.strip()]
if row == []:
continue
yield row
def main(argv=None):
if argv is None:
argv = sys.argv
logging.basicConfig()
try:
csv_path = sys.argv[1]
except IndexError:
data_reader = csv.reader(sys.stdin)
else:
data_reader = csv.reader(open(csv_path, 'rb'))
data_reader = clean_rows(data_reader)
data_reader = debug_reader(data_reader)
users = list(parse_users(data_reader))
logger.debug("users: %r", users)
#for user in users:
# print "name: %s(%s)" % (user['user_name'], user['mobile_number'])
# print "minutes: %s" % user['minutes']
# print "messages: %s" % user['messages']
# print "data: %s" % user['data'].to("GiB")
# print
print "TOTAL"
print "minutes: %s" % sum([u['minutes'] for u in users])
print "messages: %s" % sum([u['messages'] for u in users])
print "data: %s" % sum([u['data'] for u in users]).to("MiB")
if __name__ == '__main__':
sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment