Skip to content

Instantly share code, notes, and snippets.

@kujyp
Created May 23, 2019 21:00
Show Gist options
  • Save kujyp/28231241bf9ea698026c3c2c79008fc2 to your computer and use it in GitHub Desktop.
Save kujyp/28231241bf9ea698026c3c2c79008fc2 to your computer and use it in GitHub Desktop.
p2p 투자 실수익금 계산 스크립트
# coding=utf-8
from __future__ import print_function
import datetime
import json
import os
from copy import deepcopy
import gspread
from oauth2client.service_account import ServiceAccountCredentials
def num2column(column_num):
return chr(ord('A') + column_num)
def get_label(col, row):
return "{}{}".format(num2column(col), row)
def array_lstrip(arr, target=''):
ret = []
idx = 0
while True:
if arr[idx] != target:
break
idx += 1
for curr_idx in range(idx, len(arr)):
ret.append(arr[curr_idx])
return ret
def encode(target):
return target.encode('utf-8')
def encode_arr(arr):
ret = []
for each in arr:
ret.append(each.encode('utf-8'))
return ret
def sort_by(data, label, labal_target):
copied_data = deepcopy(data)
target_label_idx = -1
for idx, each in enumerate(label):
if each == labal_target:
target_label_idx = idx
break
if target_label_idx == -1:
raise Exception("invalid target")
length = len(copied_data) - 1
for i in range(length):
for j in range(length - i):
if copied_data[j][target_label_idx] > copied_data[j + 1][target_label_idx]:
copied_data[j], copied_data[j + 1] = copied_data[j + 1], copied_data[j]
return copied_data
def main():
scope = ['https://spreadsheets.google.com/feeds',
'https://www.googleapis.com/auth/drive']
credentials = ServiceAccountCredentials.from_json_keyfile_name('gsheet-investment-fd7827dd5f67.json', scope)
gc = gspread.authorize(credentials)
wks = gc.open_by_key('1WuIknjqU5Rnb9owBAyTksTI8L-WoeKkn30oRc3djOJ4')
worksheet = wks.worksheet("in/out")
avalue = worksheet.get_all_values()
if not os.path.exists("data"):
os.mkdir("data")
with open(os.path.join(
"data",
"data-{}.json".format(str(datetime.datetime.now().date()))), "w"
) as f:
json.dump(avalue, f)
if __name__ == '__main__':
main()
# coding=utf-8
from __future__ import print_function
import collections
import datetime
import json
import locale
import os
from copy import deepcopy
BLANK_ROW_COUNT = 3
BLANK_COL_COUNT = 1
class StructuredData(object):
def __init__(self, data, label):
super(StructuredData, self).__init__()
self.data = data
self.label = label
def num2column(column_num):
return chr(ord('A') + column_num)
def get_label(col, row):
return "{}{}".format(num2column(col), row)
def array_lstrip(arr, target=''):
ret = []
idx = 0
while True:
if arr[idx] != target:
break
idx += 1
for curr_idx in range(idx, len(arr)):
ret.append(arr[curr_idx])
return ret
def encode(target):
return target.encode('utf-8')
def encode_arr(arr):
ret = []
for each in arr:
ret.append(each.encode('utf-8'))
return ret
def encode_arr2d(arr2d):
ret = []
for each_row in arr2d:
ret.append(encode_arr(each_row))
return ret
def sort_by(structured_data, labal_target):
data = deepcopy(structured_data.data)
label = structured_data.label
target_label_idx = get_label_idx(label=label, label_target=labal_target)
length = len(data) - 1
for i in range(length):
for j in range(length - i):
if data[j][target_label_idx] > data[j + 1][target_label_idx]:
data[j], data[j + 1] = data[j + 1], data[j]
return StructuredData(data, label)
def remove_empty_row(data):
ret = []
for row_idx in range(len(data)):
empty = True
for col_idx in range(len(data[row_idx])):
if data[row_idx][col_idx]:
empty = False
break
if not empty:
ret.append(deepcopy(data[row_idx]))
return ret
def get_label_idx(label, label_target):
ret = -1
for idx, each in enumerate(label):
if each == label_target:
ret = idx
break
if ret == -1:
raise Exception("invalid target")
return ret
def filter_remove_not_equals(structured_data, label_target, filter_target):
data = structured_data.data
label = structured_data.label
ret = []
target_label_idx = get_label_idx(label, label_target)
length = len(data)
for i in range(length):
if data[i][target_label_idx] == filter_target:
ret.append(deepcopy(data[i]))
return StructuredData(ret, label)
def filter_remove_equals(structured_data, label_target, filter_target):
data = structured_data.data
label = structured_data.label
ret = []
target_label_idx = get_label_idx(label, label_target)
length = len(data)
for i in range(length):
if data[i][target_label_idx] != filter_target:
ret.append(deepcopy(data[i]))
return StructuredData(ret, label)
def filter_remove_less_than(structured_data, label_target, filter_target):
data = structured_data.data
label = structured_data.label
ret = []
target_label_idx = get_label_idx(label, label_target)
length = len(data)
for i in range(length):
if data[i][target_label_idx] >= filter_target:
ret.append(deepcopy(data[i]))
return StructuredData(ret, label)
def filter_remove_greater_than(structured_data, label_target, filter_target):
data = structured_data.data
label = structured_data.label
ret = []
target_label_idx = get_label_idx(label, label_target)
length = len(data)
for i in range(length):
if data[i][target_label_idx] <= filter_target:
ret.append(deepcopy(data[i]))
return StructuredData(ret, label)
def print_data(data):
for idx, each_row in enumerate(data.data):
for label_idx, label_name in enumerate(data.label):
print("[{}: {}] ".format(label_name, each_row[label_idx]), end='')
print()
def get_structed_data_from_raw(raw_data):
label = []
data = []
for col_num, val in enumerate(raw_data[BLANK_ROW_COUNT][BLANK_COL_COUNT:]):
label.append(encode(val))
for row_num in range(BLANK_ROW_COUNT + 1, len(raw_data)):
data.append(deepcopy(raw_data[row_num][BLANK_COL_COUNT:]))
return StructuredData(data, label)
def date_minus(date1, date2):
parsed_date1 = datetime.datetime.strptime(date1, "%Y-%m-%d").date()
parsed_date2 = datetime.datetime.strptime(date2, "%Y-%m-%d").date()
return (parsed_date1 - parsed_date2).days
def get_today():
return (datetime.datetime.today()).strftime("%Y-%m-%d")
def convert_to_int(structured_data, target_label):
target_label_idx = get_label_idx(structured_data.label, target_label)
ret = deepcopy(structured_data.data)
for row_idx in range(len(ret)):
ret[row_idx][target_label_idx] = int(ret[row_idx][target_label_idx])
return StructuredData(ret, structured_data.label)
def get_total_spent(structured_data, target_date):
structured_data = filter_remove_greater_than(structured_data, "날짜",
target_date)
structured_data = convert_to_int(structured_data, "총가격")
structured_data = filter_remove_less_than(structured_data, "총가격", 0)
# print(len(structured_data.data))
# print()
price_idx = get_label_idx(label=structured_data.label, label_target="총가격")
balance = 0
for each_row in structured_data.data:
# print(each_row[5])
# print(each_row[price_idx])
balance += int(each_row[price_idx])
# print()
return balance
def get_total_earn(structured_data, target_date):
structured_data = filter_remove_greater_than(structured_data, "날짜", target_date)
structured_data = convert_to_int(structured_data, "총가격")
structured_data = filter_remove_greater_than(structured_data, "총가격", 0)
structured_data = filter_remove_equals(structured_data, "총가격", "")
price_idx = get_label_idx(label=structured_data.label, label_target="총가격")
balance = 0
for each_row in structured_data.data:
# print(each_row[price_idx])
balance += int(each_row[price_idx])
# print()
return -balance
def get_average_balance(structured_data):
diff_by_date = collections.OrderedDict()
data = structured_data.data
date_idx = get_label_idx(label=structured_data.label, label_target="날짜")
price_idx = get_label_idx(label=structured_data.label, label_target="총가격")
accumulated_amount = 0
for each_row in data:
curr_date = each_row[date_idx]
curr_amount = float(each_row[price_idx])
accumulated_amount += curr_amount
diff_by_date[curr_date] = accumulated_amount
diff_by_date[get_today()] = accumulated_amount
prev_date = None
accumulated_balance = 0.0
accumulated_days = 0
for key, val in diff_by_date.items():
if prev_date is None:
prev_date = key
continue
date_diff = date_minus(key, prev_date)
accumulated_balance += val * date_diff
accumulated_days += date_diff
# print("[{}: {}] ".format("accumulated_days", accumulated_days))
# print("[{}: {}] ".format("accumulated_balance", accumulated_balance))
prev_date = key
average_balance = accumulated_balance / accumulated_days
return average_balance
def get_annual_interest_rate(current_profit, average_balance, days):
return current_profit / average_balance / days * 365
def get_start_date(structured_data):
assert len(structured_data.data) > 0
structured_data = sort_by(structured_data, "날짜")
return structured_data.data[0][get_label_idx(structured_data.label, "날짜")]
def get_evaluation_sum(structured_data, target_date_as_string):
structured_data = filter_remove_greater_than(structured_data, "날짜", target_date_as_string)
structured_data = filter_remove_equals(structured_data, "현재평가금", "")
structured_data = sort_by(structured_data, "날짜")
structured_data = sort_by(structured_data, "종목")
current_evaluation = {}
for each in structured_data.data:
# print(each[get_label_idx(structured_data.label, "날짜")])
# print(each[get_label_idx(structured_data.label, "종목")])
# print(each[get_label_idx(structured_data.label, "현재평가금")])
# print()
p2pname = each[get_label_idx(structured_data.label, "종목")]
current_evaluation[p2pname] = each[
get_label_idx(structured_data.label, "현재평가금")]
evaluation_sum = 0
for _, val in current_evaluation.items():
evaluation_sum += int(val)
return evaluation_sum
def get_profit(structured_data, target_date_as_string):
evaluation_sum = get_evaluation_sum(structured_data, target_date_as_string)
total_outgo = get_total_spent(structured_data, target_date_as_string)
total_income = get_total_earn(structured_data, target_date_as_string)
# print("evaluation_sum", evaluation_sum)
# print("get_total_spent", total_outgo)
# print("total_income", total_income)
return total_income + evaluation_sum - total_outgo
def print_information(structure_data, target_date, investment_type, investment_name=None):
structure_data = filter_remove_not_equals(structure_data, "종류", investment_type)
if investment_name != None:
structure_data = filter_remove_not_equals(structure_data, "종목", investment_name)
structure_data = filter_remove_greater_than(structure_data, "날짜",
target_date)
# print("데이터개수", len(structure_data.data))
average_balance = get_average_balance(structure_data)
start_date_as_string = get_start_date(structure_data)
days = date_minus(target_date, start_date_as_string)
current_profit = get_profit(structure_data, target_date)
locale.setlocale(locale.LC_ALL, "ko_KR")
print("평균잔액: [{}]".format(locale.currency(average_balance, grouping=True)))
print("수익: [{}]".format(locale.currency(current_profit, grouping=True)))
print("기간: [{} ~ {}]".format(start_date_as_string, target_date))
print("날짜수: [{}]".format(days))
print("수익률: [{:0.2f}%]".format(
get_annual_interest_rate(current_profit, average_balance, days) * 100))
def main():
with open(os.path.join(
"data",
"data-2019-05-24.json"), "r"
) as f:
raw_data = json.load(f)
target_date_as_string = get_today()
# target_date_as_string = "2019-03-31"
strcted_data = get_structed_data_from_raw(raw_data)
strcted_data.data = encode_arr2d(strcted_data.data)
strcted_data.data = remove_empty_row(strcted_data.data)
strcted_data = sort_by(strcted_data, "날짜")
for p2pname in [
"어니스트펀드",
"테라펀딩",
"피플펀드",
"투게더",
]:
print(p2pname)
print_information(strcted_data, target_date_as_string, "p2p", p2pname)
print()
print("Total")
print_information(strcted_data, target_date_as_string, "p2p")
print()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment