Created
October 7, 2016 22:34
-
-
Save tubaman/e459e54a4b825d542822b8803193f6af to your computer and use it in GitHub Desktop.
Parser for AT&T Wireless bill (CSV format)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
import csv
import logging
import sys

import pint

# Module-level logger. Its threshold is INFO, so the debug messages emitted
# by this module are dropped unless the level is lowered.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Unit registry used to build the data-usage quantities.
ureg = pint.UnitRegistry()
def parse_minutes(header, data_reader):
    """Add up the minutes of every call listed in a call-detail section.

    ``header`` is the section's column-header row and must equal the
    expected call-detail layout exactly.  Rows are then pulled from
    ``data_reader`` until a row whose first cell is "Total Call Detail" is
    read (that row is consumed) or the reader runs out.  The integer in the
    seventh cell of each row before that is added to the returned total.
    """
    logger.debug("parse_minutes")
    expected_header = [
        'Item', 'Day', 'Date', 'Time', 'Number Called', 'Call To', 'Min',
        'Rate Code', 'Rate Pd', 'Feature', 'Airtime Charge',
        "LD/Add'l Charge", 'Total Charge',
    ]
    assert header == expected_header

    total = 0
    for record in data_reader:
        if record[0] != "Total Call Detail":
            total += int(record[6])
        else:
            break
    return total
def parse_messages(header, data_reader):
    """Count the messages listed in a messaging section.

    ``header`` is the section's column-header row and must equal the
    expected messaging layout exactly.  Two consecutive runs of rows are
    read from ``data_reader``: text messages, terminated by the
    "Subtotal for Text Messages:" row, then picture/video messages,
    terminated by the "Subtotal for Picture/Video Messages:" row.  The
    integer in the seventh cell of every detail row is added to the
    returned count.
    """
    logger.debug("parse_messages")
    expected_header = [
        'Item', 'Day', 'Date', 'Time', 'To/From', 'Type', 'Msg/KB',
        'Rate Code', 'Rate Pd', 'Feature', 'In/Out', 'Total Charge',
    ]
    assert header == expected_header

    def count_until(end_label, end_note):
        # Sum the seventh cell of each row up to (and consuming) the row
        # whose first cell equals end_label, or until the reader runs out.
        count = 0
        for record in data_reader:
            if record[0] == end_label:
                logger.debug(end_note)
                break
            count += int(record[6])
        return count

    text_count = count_until(
        "Subtotal for Text Messages:", "end of txt messages")
    media_count = count_until(
        "Subtotal for Picture/Video Messages:", "end of multimedia messages")
    return text_count + media_count
def parse_data_amount(data_amount):
    """Convert a usage string ending in "KB" into a pint quantity.

    Every "KB" in the string is rewritten to "kibibyte" before the text is
    handed to the unit registry for parsing.
    """
    assert data_amount.endswith('KB')
    return ureg.Quantity(data_amount.replace('KB', 'kibibyte'))
def parse_data(first_row, data_reader):
    """Total the data usage listed in a data section.

    ``first_row`` is the section's first detail row, already taken from the
    reader by the caller.  Further rows are read from ``data_reader`` until
    the "Subtotal for Data Plans:" row is read (it is consumed) or the
    reader runs out.  The seventh cell of every detail row is parsed with
    parse_data_amount and accumulated into the returned quantity.
    """
    logger.debug("parse_data")
    total = parse_data_amount(first_row[6])
    for record in data_reader:
        if record[0] != "Subtotal for Data Plans:":
            total += parse_data_amount(record[6])
        else:
            break
    return total
def debug_reader(data_reader):
    """Pass rows through unchanged, logging each one at debug level.

    This is a generator: every row taken from ``data_reader`` is logged as
    "row: <repr>" and then yielded as-is.
    """
    for record in data_reader:
        logger.debug("row: %r", record)
        yield record
def parse_user(data_reader):
    """Parse one subscriber's section of the bill into a dict.

    The first three rows taken from ``data_reader`` are read as the section
    heading: a "Mobile Number:" row, one row that is skipped, and a
    "User Name::" row.  After that the rows are scanned for, in order, the
    call-detail table, the messaging table and the data-usage rows.

    Args:
        data_reader: Iterator of rows (lists of strings).  It is advanced
            in place; parsing stops right after the data section.

    Returns:
        A dict with 'mobile_number' and 'user_name', plus 'minutes',
        'messages' and 'data' for each of those sections that was found
        before the reader ran out.

    Raises:
        AssertionError: If a heading row or a table header does not have
            the expected content.
        StopIteration: If the reader runs out while the three heading rows
            are being read.
    """
    user = {}

    # The builtin next() is used rather than the iterator's .next() method
    # because the method only exists on Python 2 iterators; the builtin
    # works on Python 2.6+ and Python 3 alike.
    row = next(data_reader)
    assert row[0] == "Mobile Number:"
    user['mobile_number'] = row[1]

    next(data_reader)  # skip row

    row = next(data_reader)
    # NOTE(review): the label is matched with a double colon; presumably
    # that is how the bill spells it -- confirm against a real export.
    assert row[0] == "User Name::"
    user['user_name'] = row[1]

    # Small state machine: the sections are only recognised in the order
    # minutes -> messages -> data.
    state = 'looking_for_minutes'
    logger.debug("state: %r", state)
    for row in data_reader:
        if state == 'looking_for_minutes' and row[0] == 'Item':
            # 'Item' is the first cell of a table header row.
            user['minutes'] = parse_minutes(row, data_reader)
            state = 'looking_for_messages'
            logger.debug("state: %r", state)
        elif state == 'looking_for_messages' and row[0] == 'Item':
            user['messages'] = parse_messages(row, data_reader)
            state = 'looking_for_data'
            logger.debug("state: %r", state)
        elif state == 'looking_for_data' and row[0] == '1':
            # The data section has no header check here; its first detail
            # row is recognised by a first cell of '1'.
            user['data'] = parse_data(row, data_reader)
            break
    return user
def parse_users(data_reader):
    """Yield one parsed user dict per subscriber section.

    A row equal to ['AT&T'] marks the start of a section; the rows that
    follow it are handed to parse_user, which advances the same reader.
    All other rows are skipped.
    """
    for record in data_reader:
        if record != ['AT&T']:
            continue
        yield parse_user(data_reader)
def clean_rows(data_reader):
    """Yield rows with whitespace-stripped cells, dropping empty ones.

    Cells that are empty after stripping are removed from their row, and
    rows left with no cells at all are not yielded.
    """
    for raw_row in data_reader:
        cells = [cell for cell in (c.strip() for c in raw_row) if cell]
        if cells:
            yield cells
def main(argv=None):
    """Print total minutes, messages and data usage for a bill CSV.

    Args:
        argv: Command-line argument list; ``argv[1]``, when present, is
            the path of the CSV file.  Defaults to ``sys.argv``.  When no
            path is given the CSV is read from standard input.

    Returns:
        None, so ``sys.exit(main())`` exits with status 0.
    """
    if argv is None:
        argv = sys.argv
    logging.basicConfig()

    # Read the path from the argv parameter (not sys.argv directly) so that
    # callers can supply their own argument list.
    try:
        csv_path = argv[1]
    except IndexError:
        csv_file = None
        source = sys.stdin
    else:
        # The csv module wants a binary file on Python 2 and a text file
        # opened with newline='' on Python 3.
        if sys.version_info[0] >= 3:
            csv_file = open(csv_path, newline='')
        else:
            csv_file = open(csv_path, 'rb')
        source = csv_file

    try:
        data_reader = debug_reader(clean_rows(csv.reader(source)))
        # The readers are lazy, so everything must be consumed before the
        # file is closed.
        users = list(parse_users(data_reader))
    finally:
        # Only close what was opened here; stdin is left alone.
        if csv_file is not None:
            csv_file.close()
    logger.debug("users: %r", users)

    # Single-argument print(...) behaves the same as a statement on
    # Python 2 and as a function on Python 3.
    print("TOTAL")
    print("minutes: %s" % sum(u['minutes'] for u in users))
    print("messages: %s" % sum(u['messages'] for u in users))
    # Start the sum from a zero quantity so that an input without any user
    # still yields a quantity that can be converted.
    no_data = ureg.Quantity(0, 'kibibyte')
    print("data: %s" % sum((u['data'] for u in users), no_data).to("MiB"))
# Run the parser only when executed as a script; main()'s return value is
# passed to sys.exit().
if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment