Skip to content

Instantly share code, notes, and snippets.

@vrymel
Created July 8, 2020 08:28
Show Gist options
  • Save vrymel/d14d7f4dacb747e7758ab74904c10844 to your computer and use it in GitHub Desktop.
Save vrymel/d14d7f4dacb747e7758ab74904c10844 to your computer and use it in GitHub Desktop.
import os
import boto3
from botocore.exceptions import ClientError
import requests
import arrow
import csv
from time import sleep
from decimal import *
# --- Configuration -----------------------------------------------------------
# Quote API location comes from the environment; a missing variable raises
# KeyError when this module is imported.
BASE_URL = os.environ['PSE_BASE_URL']
QUOTE_API_PATH = os.environ['PSE_QUOTE_API_PATH']
API_ENDPOINT = BASE_URL + QUOTE_API_PATH

# arrow format tokens used to parse each record's "tradingDate" field.
TRADING_DATE_FORMAT = "YYYY-MM-DD HH:mm:ss"

# S3 layout: daily CSVs are stored under <PRICE_DAILY_PREFIX>/ and the company
# list is a single object at the bucket root.
BUCKET_NAME = "psedata"
PRICE_DAILY_PREFIX = "price_daily"
COMPANY_LIST_NAME = "company_list.csv"

# Local scratch paths.
TMP_DIRECTORY = "/tmp"
TMP_COMPANY_LIST = "{}/{}".format(TMP_DIRECTORY, COMPANY_LIST_NAME)

# Number of CSV rows buffered before the working file is re-uploaded to S3.
CHUNK_SIZE = 15

# Module-level AWS handles shared by every handler invocation in this process.
s3_client = boto3.client("s3")
dynamodb = boto3.resource(
    "dynamodb",
    region_name=os.environ['DYNAMODB_REGION'],
    endpoint_url=os.environ['DYNAMODB_ENDPOINT'],
)
dynamodb_table = dynamodb.Table(os.environ['DYNAMODB_TABLE'])
def lambda_handler(event, context):
    """Collect one trading day's prices into an S3 CSV and DynamoDB.

    The day is derived from ``event['time']``. Rows are appended to a local
    working CSV named ``<date>.csv``, which is re-uploaded to
    ``<PRICE_DAILY_PREFIX>/<date>.csv`` every ``CHUNK_SIZE`` rows and once more
    at the end. A later run for the same day first downloads that object and
    resumes after the last stock code it contains.

    Args:
        event: Invocation payload; only the ``"time"`` key is read.
        context: Unused.
    """
    target_date = get_event_datetime(event['time'])
    target_file_name = "{}.csv".format(target_date.isoformat())
    s3_key_name = "{}/{}".format(PRICE_DAILY_PREFIX, target_file_name)
    working_file_path = "{}/{}".format(TMP_DIRECTORY, target_file_name)

    # Seed the local file from S3 (if a previous partial run uploaded one).
    prepare_local_working_file(working_file_path, s3_key_name)
    last_completed_stock_code = get_last_completed_stock_code(working_file_path)

    company_list = get_company_list()
    if last_completed_stock_code:
        print("Fast-forward after {}".format(last_completed_stock_code))
        company_list = filter_completed_stock_codes(last_completed_stock_code, company_list)

    # Write the header only into an empty file. Checking "no previous stock
    # code" instead would also be true for a file that already holds just the
    # header row, and would append a second header.
    if os.path.getsize(working_file_path) == 0:
        header = "STOCK_CODE,OPEN,LOW,HIGH,CLOSE,AVG_PRICE,VOLUME,VALUE\n"
        write_price_actions(working_file_path, [header])

    chunk = []
    for price_action in request_price_actions(target_date, company_list):
        # Only write when there is movement
        if price_action.get("open"):
            dynamodb_write(price_action)
            row = _format_price_action_row(price_action)
            print(row)
            chunk.append(row)
            if len(chunk) == CHUNK_SIZE:
                print("Chunk full... Uploading to S3.")
                _flush_chunk(working_file_path, s3_key_name, chunk)
                chunk = []
        # Wait 0.25 sec in between requests (each iteration issues one request)
        sleep(0.25)

    # Write and upload remaining chunk that did not meet the CHUNK_SIZE maximum
    if chunk:
        _flush_chunk(working_file_path, s3_key_name, chunk)
    print("Job done!")


def _format_price_action_row(price_action):
    """Render a price action as one CSV line, in the header's column order."""
    return "{},{},{},{},{},{},{},{}\n".format(
        price_action.get("stock_code"),
        price_action.get("open"),
        price_action.get("low"),
        price_action.get("high"),
        price_action.get("close"),
        price_action.get("avg_price"),
        price_action.get("volume"),
        price_action.get("value"),
    )


def _flush_chunk(working_file_path, s3_key_name, chunk):
    """Append ``chunk`` to the working file, then re-upload the whole file to S3."""
    write_price_actions(working_file_path, chunk)
    upload_working_file(working_file_path, s3_key_name)
def request_price_actions(target_date, company_list, timeout=30):
    """Yield one price-action dict per company for ``target_date``.

    For each company a POST with its ``security_id`` is sent to
    ``API_ENDPOINT``. If the response contains a record for ``target_date``,
    its prices are copied into the dict; otherwise every price field is None.
    A company whose request returns an error status (``Response.ok`` is
    false) is reported with a print and yields nothing.

    Args:
        target_date: Date object to look up (its ``isoformat()`` is stored as
            ``trading_date``).
        company_list: Iterable of dicts providing ``"stock_code"`` and
            ``"security_id"``.
        timeout: Seconds to wait for each API response, passed to
            ``requests.post``. ``None`` waits indefinitely.

    Yields:
        dict with keys stock_code, trading_date, open, low, high, close,
        avg_price, volume, value.

    Raises:
        requests.exceptions.Timeout: if the API does not answer within
            ``timeout`` seconds.
    """
    trading_date = target_date.isoformat()
    # (price_action key, API record key) pairs, in output order.
    field_map = (
        ("open", "sqOpen"),
        ("low", "sqLow"),
        ("high", "sqHigh"),
        ("close", "sqClose"),
        ("avg_price", "avgPrice"),
        ("volume", "totalVolume"),
        ("value", "totalValue"),
    )
    for company in company_list:
        response = requests.post(
            API_ENDPOINT,
            data={"security": company.get("security_id")},
            headers={"referer": BASE_URL},
            timeout=timeout,
        )
        if not response:
            print("Request failed for {} (HTTP {}), skipping".format(
                company.get("stock_code"), response.status_code))
            continue

        payload = response.json()
        # A payload without "records" (e.g. an error body) means "no data".
        records = payload.get("records") or []
        today = get_target_date(target_date, records)

        price_action = {
            "stock_code": company.get("stock_code"),
            "trading_date": trading_date,
        }
        for key, api_key in field_map:
            price_action[key] = today.get(api_key) if today else None
        yield price_action
def get_event_datetime(event_date_str):
    """Parse ``event_date_str`` with arrow and return its calendar date.

    Despite the name, the result is the date part only (``Arrow.date()``),
    not a full datetime.
    """
    parsed = arrow.get(event_date_str)
    return parsed.date()
def get_company_list():
    """Download the company list CSV from S3 and parse it.

    The object ``COMPANY_LIST_NAME`` in ``BUCKET_NAME`` is saved to
    ``TMP_COMPANY_LIST`` and read back. The first row is treated as a header
    and skipped; blank rows are ignored.

    Returns:
        list of dicts with string values for ``stock_code``, ``company_name``,
        ``company_id`` and ``security_id`` (CSV columns 0-3).
    """
    with open(TMP_COMPANY_LIST, "wb") as file:
        s3_client.download_fileobj(BUCKET_NAME, COMPANY_LIST_NAME, file)

    # The csv module requires newline="" so quoted fields that contain line
    # breaks are parsed correctly.
    with open(TMP_COMPANY_LIST, "r", newline="", encoding="utf-8") as file:
        reader = csv.reader(file, delimiter=",", quotechar='"')
        next(reader, None)  # skip the header row; tolerate an empty file
        return [
            {
                "stock_code": row[0],
                "company_name": row[1],
                "company_id": row[2],
                "security_id": row[3],
            }
            for row in reader
            if row  # csv.reader yields [] for blank lines
        ]
def filter_completed_stock_codes(after_stock_code, company_list):
    """Return the companies that come after ``after_stock_code``.

    Everything up to and including the first company whose ``stock_code``
    equals ``after_stock_code`` is dropped, and the rest keep their order.
    When no company matches, an empty list is returned.
    """
    remaining = iter(company_list)
    for company in remaining:
        if company.get("stock_code") == after_stock_code:
            # The iterator now points just past the match.
            return list(remaining)
    return []
def get_target_date(target_datetime, records):
    """Return the record whose ``tradingDate`` falls on ``target_datetime``.

    Args:
        target_datetime: Date compared with the date part of each record's
            ``"tradingDate"`` (parsed with ``TRADING_DATE_FORMAT``).
        records: Iterable of API record dicts, or None.

    Returns:
        The first matching record, or False when none matches. False is also
        returned when ``records`` is None or empty.
    """
    # The PSE API data is sorted from latest to oldest. So if target_datetime
    # is the current date, the loop returns on the first record.
    for record in records or ():
        trading_date = arrow.get(record.get("tradingDate"), TRADING_DATE_FORMAT)
        if target_datetime == trading_date.date():
            return record
    return False
def write_price_actions(working_file_path, price_actions):
    """Append pre-formatted lines to the working file.

    Items are written verbatim, so each must carry its own trailing newline.
    The file is created if it does not exist yet.
    """
    with open(working_file_path, "a") as handle:
        handle.writelines(price_actions)
def upload_working_file(source_file_path, key_name):
    """Upload a local file to ``BUCKET_NAME`` under ``key_name`` (STANDARD_IA class).

    The handler calls this repeatedly with the same key as the working file
    grows.
    """
    extra_args = {'StorageClass': 'STANDARD_IA'}
    s3_client.upload_file(
        Filename=source_file_path,
        Bucket=BUCKET_NAME,
        Key=key_name,
        ExtraArgs=extra_args,
    )
def prepare_local_working_file(local_working_file_path, key_name):
    """Make the local working file mirror the S3 object ``key_name``.

    If the object exists in ``BUCKET_NAME``, it is downloaded over the local
    file. Otherwise the local file is created empty, or truncated if a stale
    copy is present.

    Raises:
        botocore.exceptions.ClientError: for S3 errors other than the
            "object not found" responses (HTTP 404, or the 403 S3 returns for
            a missing key when the caller lacks ``s3:ListBucket``).
    """
    try:
        s3_client.head_object(Bucket=BUCKET_NAME, Key=key_name)
    except ClientError as error:
        status = error.response.get("ResponseMetadata", {}).get("HTTPStatusCode")
        # Treating throttling / 5xx as "missing" would truncate the file and
        # throw away the progress a previous partial run already uploaded.
        if status not in (403, 404):
            raise
        print("Object does not exist")
        # Create or truncate file if it exists locally for some reason
        with open(local_working_file_path, "w"):
            pass
        return

    print("Object exist, downloading")
    with open(local_working_file_path, "wb") as file:
        s3_client.download_fileobj(BUCKET_NAME, key_name, file)
def get_last_completed_stock_code(local_working_file_path):
    """Return the stock code on the last line of the working CSV.

    The code is the text before the first comma, with surrounding whitespace
    stripped. False is returned when the file has at most one line, i.e. it
    is empty or holds only the header row.
    """
    with open(local_working_file_path) as handle:
        rows = handle.readlines()
    if len(rows) < 2:
        return False
    return rows[-1].split(",")[0].strip()
def dynamodb_write(price_action):
    """Store one price action in ``dynamodb_table`` with ``put_item``.

    The fields open, low, high, close and volume are converted with
    ``dynamodb_decimal``. avg_price and value are not written.
    """
    numeric_fields = ("open", "low", "high", "close", "volume")
    item = {
        "stock_code": price_action.get("stock_code"),
        "trading_date": price_action.get("trading_date"),
    }
    for field in numeric_fields:
        item[field] = dynamodb_decimal(price_action.get(field))
    dynamodb_table.put_item(Item=item)
# See https://github.com/boto/boto3/issues/665 why this needs to be done
def dynamodb_decimal(value):
    """Convert a numeric value to ``Decimal`` for storage in DynamoDB.

    Converting through ``str`` keeps the value's short decimal form
    (0.1 -> Decimal("0.1")) rather than the float's exact binary expansion.
    None is returned unchanged instead of raising ``decimal.InvalidOperation``
    on ``Decimal("None")``.
    """
    if value is None:
        return None
    return Decimal(str(value))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment