Last active
June 5, 2017 15:31
-
-
Save informationsea/6789c63678f063246ee7d5697897e030 to your computer and use it in GitHub Desktop.
Check Credit Card log
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import argparse | |
import csv | |
import sqlite3 | |
import sys | |
import datetime | |
import pytz | |
import collections | |
import re | |
def _select_account(accounts):
    """Print a numbered menu of *accounts* and return the chosen account guid.

    Exits with status 1 when the answer is not a decimal number or does not
    refer to one of the listed entries.
    """
    for number, account in enumerate(accounts, start=1):
        print(number, account['name'])
    print('Select >', end=' ')
    selected = input()
    # isdecimal() only accepts characters int() can parse; isdigit() would
    # also let through characters such as superscripts that int() rejects.
    if not selected.isdecimal():
        print('Canceled by user', file=sys.stderr)
        sys.exit(1)
    index = int(selected)
    # Without the range check "0" would index accounts[-1] and silently pick
    # the last account, and a too-large number would raise IndexError.
    if not 1 <= index <= len(accounts):
        print('Invalid account number: ' + selected, file=sys.stderr)
        sys.exit(1)
    return accounts[index - 1]['guid']


def _main():
    """Mark splits that match a credit-card CSV as reconciled.

    Command-line arguments: the path of the SQLite database (``gnucash``) and
    the statement CSV (``credit``, opened as Shift-JIS).

    CSV rows are grouped by (range start, range end, amount).  For every
    group the selected account is searched for splits whose ``post_date``
    lies inside the range and whose ``value_num`` equals the negated amount.
    When the number of splits found equals the number of CSV rows in the
    group, those splits get ``reconcile_state = 'y'`` and the current UTC
    timestamp as ``reconcile_date``; otherwise the group is printed as a
    mismatch.  The changes are committed at the end.
    """
    parser = argparse.ArgumentParser(description="Search Credit Info")
    parser.add_argument('gnucash')
    parser.add_argument('credit', type=argparse.FileType('r', encoding='shift-jis'))
    options = parser.parse_args()

    conn = sqlite3.connect(options.gnucash)
    try:
        credit = load_credit_csv(options.credit)

        # Check the lock table before the first statement that writes to the
        # database, so a locked book is left untouched.
        check_lock(conn)
        conn.execute('DROP TABLE IF EXISTS found_payment;')

        selected_account = _select_account(list_credit_accounts(conn))

        # key: (range start, range end, amount) -> list of the rows' text field
        credit_group = collections.defaultdict(list)
        for one in credit:
            credit_group[(one[0], one[1], one[3])].append(one[2])

        now = datetime.datetime.now(pytz.timezone('Asia/Tokyo'))\
            .astimezone(pytz.utc).strftime('%Y%m%d%H%M%S')

        cur = conn.cursor()
        cur.execute('DROP VIEW IF EXISTS payments')
        cur.execute('CREATE VIEW payments AS SELECT splits.guid, transactions.post_date, transactions.description, splits.memo, splits.account_guid, splits.reconcile_state, splits.reconcile_date, splits.value_num FROM transactions, splits WHERE transactions.guid = splits.tx_guid')

        bad_list = []
        for key, descriptions in credit_group.items():
            cur.execute('SELECT guid, description, post_date, -value_num, reconcile_state FROM payments WHERE account_guid = ? AND post_date >= ? AND post_date <= ? AND value_num = ?', (selected_account, key[0], key[1], -int(key[2])))
            found = cur.fetchall()
            if len(found) == len(descriptions):
                for row in found:
                    # Single quotes make 'y' a real SQL string literal; the
                    # double-quoted form only works through SQLite's legacy
                    # identifier-to-string fallback.
                    conn.execute("UPDATE splits SET reconcile_state = 'y', reconcile_date = ? WHERE guid = ?", [now, row[0]])
            else:
                bad_list.append((key, descriptions, len(found)))

        # Report mismatching groups ordered by range start.
        bad_list.sort(key=lambda item: item[0][0])
        tokyo = pytz.timezone('Asia/Tokyo')
        for key, descriptions, found_count in bad_list:
            # NOTE(review): the range start is interpreted as UTC here and
            # shifted to Asia/Tokyo before taking the date -- confirm this
            # matches how the range strings are produced.
            t = datetime.datetime.strptime(key[0], '%Y%m%d%H%M%S').replace(tzinfo=pytz.utc).astimezone(tokyo).date()
            print(t, key[2], ','.join(descriptions), len(descriptions), '/', found_count, t.strftime("%Y/%m/%d %H:%M:%S"), key[0])

        conn.commit()
    finally:
        # Release the database handle on every exit path, including sys.exit().
        conn.close()
def check_lock(conn):
    """Exit with status 1 when the ``gnclock`` table contains any row.

    *conn* is an open SQLite connection.  Returns ``None`` when the table is
    empty.
    """
    row = conn.execute('SELECT count(*) FROM gnclock;').fetchone()
    if row[0] <= 0:
        return
    print("Please close GnuCash before continuing.", file=sys.stderr)
    sys.exit(1)
def list_credit_accounts(conn):
    """Return the accounts of type CREDIT that have at least one split.

    *conn* is an open SQLite connection.  The result is a list of
    ``{'guid': ..., 'name': ...}`` dicts ordered by descending split count.
    """
    cur = conn.cursor()
    # The account type is bound as a parameter: the previous "CREDIT" in
    # double quotes is an identifier in standard SQL and was only treated as
    # a string through SQLite's legacy fallback.
    cur.execute(
        'SELECT accounts.guid, accounts.name, count(*) AS splitcount '
        'FROM accounts INNER JOIN splits ON accounts.guid = splits.account_guid '
        'WHERE account_type = ? '
        'GROUP BY accounts.guid, accounts.name '
        'HAVING splitcount > 0 '
        'ORDER BY splitcount DESC',
        ('CREDIT',))
    return [{'guid': row[0], 'name': row[1]} for row in cur]
def load_credit_csv(credit):
    """Read a credit-card statement CSV and return its normalized rows.

    *credit* is an iterable of text lines (for example an open file).  The
    layout is detected from the first recognizable row:

    * ``smbc``  -- a row whose second cell starts with ``5334-``, ``6900-``
      or ``4980-``; data rows use cells 0, 1 and 2.
    * ``jcb``   -- a header row with the expected titles in cells 0, 2 and 4;
      data rows use cells 2, 3 and 4.
    * ``smbc2`` -- no header, the first row starts with a ``YYYY/M/D`` date;
      data rows (including that first row) use cells 0, 1 and 6.

    Returns a list of ``[range_start, range_end, text, amount]`` lists, where
    the first two items come from ``create_date`` applied to the date cell,
    *text* is the second selected cell stripped, and *amount* is the third
    selected cell stripped with thousands commas removed.

    Blank rows, rows too short for the detected layout and rows whose date
    cell is empty are skipped.  A non-empty date cell that is not a valid
    ``Y/M/D`` date still raises from ``create_date``.  Exits with status 1
    when no layout can be detected.
    """
    reader = csv.reader(credit)
    mode = None
    values = []

    # Scan forward to the first row that identifies the layout.
    for row in reader:
        if len(row) <= 1:
            continue
        if row[1].startswith(('5334-', '6900-', '4980-')):
            mode = 'smbc'
            break
        if not row[0] and not row[1]:
            continue
        # Length guards keep short rows from raising IndexError during detection.
        if len(row) > 4 and row[0] == 'ご利用者' and row[2] == 'ご利用日' and row[4] == 'ご利用金額(¥)':
            mode = 'jcb'
            break
        if len(row) > 6 and re.match(r'\d\d\d\d/\d\d?/\d\d?', row[0]):
            mode = 'smbc2'
            # This layout has no header: the detecting row is already data.
            values.append((row[0], row[1], row[6]))
            break

    if not mode:
        print('Cannot suggest credit CSV type', file=sys.stderr)
        sys.exit(1)

    print('CSV:', mode)

    # The reader continues right after the row that fixed the layout.
    for row in reader:
        if mode == 'smbc':
            if len(row) > 3 and row[3] and re.match(r'\d\d\d\d/\d\d/\d\d', row[0]):
                values.append((row[0], row[1], row[2]))
        elif mode == 'jcb':
            # Skip blank/short rows and rows without a date instead of crashing.
            if len(row) > 4 and row[2].strip():
                values.append((row[2], row[3], row[4]))
        elif mode == 'smbc2':
            if len(row) > 6 and row[0].strip():
                values.append((row[0], row[1], row[6]))

    return [
        create_date(date) + [text.strip(), amount.strip().replace(',', '')]
        for date, text, amount in values
    ]
def create_date(d):
    """Return the first and last second of day *d* as timestamp strings.

    *d* is a ``'YYYY/MM/DD'`` string; surrounding whitespace and missing
    zero padding are accepted.  The result is
    ``['YYYYMMDD000000', 'YYYYMMDD235959']``.

    Raises ``ValueError`` when a component is not an integer or the date does
    not exist, and ``IndexError`` when fewer than three components are given.

    No time-zone conversion is applied.  Earlier code attached an Asia/Tokyo
    zone through ``tzinfo=``, but the zone never influenced the formatted
    output (and ``tzinfo=`` with a pytz zone attaches its LMT offset rather
    than JST), so the values are built from the plain calendar date.
    """
    elements = [int(x) for x in d.strip().split('/')]
    # datetime.date validates the calendar date (month/day ranges).
    day = datetime.date(elements[0], elements[1], elements[2])
    prefix = day.strftime('%Y%m%d')
    return [prefix + '000000', prefix + '235959']
# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
    _main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment