Skip to content

Instantly share code, notes, and snippets.

@informationsea
Last active June 5, 2017 15:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save informationsea/6789c63678f063246ee7d5697897e030 to your computer and use it in GitHub Desktop.
Save informationsea/6789c63678f063246ee7d5697897e030 to your computer and use it in GitHub Desktop.
Check Credit Card log
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import argparse
import csv
import sqlite3
import sys
import datetime
import pytz
import collections
import re
def _main():
parser = argparse.ArgumentParser(description="Search Credit Info")
parser.add_argument('gnucash')
parser.add_argument('credit', type=argparse.FileType('r', encoding='shift-jis'))
options = parser.parse_args()
conn = sqlite3.connect(options.gnucash)
credit = load_credit_csv(options.credit)
conn.execute('DROP TABLE IF EXISTS found_payment;')
check_lock(conn)
accounts = list_credit_accounts(conn)
for i, one in enumerate(accounts):
print(i+1, one['name'])
print('Select >', end=' ')
selected = input()
if not selected.isdigit():
print('Canceled by user', file=sys.stderr)
sys.exit(1)
selected_account = accounts[int(selected)-1]['guid']
credit_group = collections.defaultdict(list)
for one in credit:
credit_group[(one[0], one[1], one[3])].append(one[2])
now = datetime.datetime.now(pytz.timezone('Asia/Tokyo'))\
.astimezone(pytz.utc).strftime('%Y%m%d%H%M%S')
cur = conn.cursor()
cur.execute('DROP VIEW IF EXISTS payments')
cur.execute('CREATE VIEW payments AS SELECT splits.guid, transactions.post_date, transactions.description, splits.memo, splits.account_guid, splits.reconcile_state, splits.reconcile_date, splits.value_num FROM transactions, splits WHERE transactions.guid = splits.tx_guid')
bad_list = []
for k, v in credit_group.items():
#print('>', k[0], k[1], ', '.join(v))
cur.execute('SELECT guid, description, post_date, -value_num, reconcile_state FROM payments WHERE account_guid = ? AND post_date >= ? AND post_date <= ? AND value_num = ?', (selected_account, k[0], k[1], -int(k[2])))
found_guid = []
for one in cur:
#print(one[0], one[1], one[2], one[3], one[4])
found_guid.append(one)
if len(found_guid) == len(v):
#print 'OK!'
for one in found_guid:
conn.execute('UPDATE splits SET reconcile_state = "y", reconcile_date = ? WHERE guid = ?', [now, one[0]])
else:
bad_list.append((k, v, len(found_guid)))
bad_list.sort(key = lambda x: x[0][0])
for one in bad_list:
t = datetime.datetime.strptime(one[0][0], '%Y%m%d%H%M%S').replace(tzinfo = pytz.utc).astimezone(pytz.timezone('Asia/Tokyo')).date()
print(t, one[0][2], ','.join(one[1]), len(one[1]), '/', one[2], t.strftime("%Y/%m/%d %H:%M:%S"), one[0][0])
conn.commit()
def check_lock(conn):
lockcount = conn.execute('SELECT count(*) FROM gnclock;').fetchone()[0]
if lockcount > 0:
print("Please close GnuCash before continuing.", file=sys.stderr)
sys.exit(1)
def list_credit_accounts(conn):
cur = conn.cursor()
cur.execute('SELECT accounts.guid, accounts.name, count(*) as "splitcount" FROM accounts INNER JOIN splits ON accounts.guid = splits.account_guid WHERE account_type = "CREDIT" GROUP BY accounts.guid, accounts.name HAVING splitcount > 0 ORDER BY splitcount DESC')
accounts = []
for one in cur:
accounts.append({'guid': one[0], 'name': one[1]})
return accounts
def load_credit_csv(credit):
reader = csv.reader(credit)
mode = None
values = []
# load header
for rawrow in reader:
row = [x for x in rawrow]
if len(row) <= 1:
continue
if row[1].startswith('5334-') or row[1].startswith('6900-') or row[1].startswith('4980-'):
mode = 'smbc'
break
if not row[0] and not row[1]:
continue
if row[0] == 'ご利用者' and row[2] == 'ご利用日' and row[4] == 'ご利用金額(¥)':
mode = 'jcb'
break
if re.match(r'\d\d\d\d/\d\d?/\d\d?', row[0]):
mode = 'smbc2'
values.append((row[0], row[1], row[6]))
break
if not mode:
print('Cannot suggest credit CSV type', file=sys.stderr)
sys.exit(1)
print('CSV:', mode)
for rawrow in reader:
row = [x for x in rawrow]
if mode == 'smbc':
if len(row) > 3 and row[3] and re.match(r'\d\d\d\d/\d\d/\d\d', row[0]):
values.append((row[0], row[1], row[2]))
elif mode == 'jcb':
values.append((row[2], row[3], row[4]))
elif mode == 'smbc2':
values.append((row[0], row[1], row[6]))
values_cleaned = [create_date(x[0]) + [x[1].strip(),
x[2].strip().replace(',', '')] for x in values]
return values_cleaned
def create_date(d):
#print d
elements = [int(x) for x in d.strip().split('/')]
dt = datetime.datetime(elements[0], elements[1], elements[2], 0, 0, 0, tzinfo=pytz.timezone('Asia/Tokyo'))
dt2 = datetime.datetime(elements[0], elements[1], elements[2], 23, 59, 59, tzinfo=pytz.timezone('Asia/Tokyo'))
#print dt.astimezone(pytz.utc).strftime('%Y%m%d%H%M%S')
return [x.strftime('%Y%m%d%H%M%S') for x in (dt, dt2)]
if __name__ == '__main__':
_main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment