@shirou
Last active August 17, 2022 03:59
AWS Timestream
import boto3

DBName = 'shiroutest'
client = boto3.client('timestream-query')

SQL = f"SELECT * FROM {DBName}.USDJPY WHERE time BETWEEN '2022-08-15' AND '2022-08-16'"

# Timestream query results are paginated, so use the boto3 paginator
# to walk every page of the result set.
paginator = client.get_paginator('query')
pageIterator = paginator.paginate(
    QueryString=SQL,
)

results = []
count = 0
for page in pageIterator:
    count += 1
    if 'Rows' not in page or len(page['Rows']) == 0:
        continue
    results.extend(page['Rows'])

print(count, len(results))
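Each returned Row is a nested structure of Datum objects. As a minimal sketch (assuming every column is a scalar; array and time-series columns would need extra handling), the rows collected above can be flattened to plain Python strings:

def flatten(rows):
    # A Row looks like {'Data': [{'ScalarValue': '...'}, ...]};
    # NULL columns carry {'NullValue': True} instead, hence .get().
    return [[datum.get('ScalarValue') for datum in row['Data']] for row in rows]

for values in flatten(results)[:3]:
    print(values)

Column names, if needed, are available in each page's ColumnInfo list.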
Query timing, 1-minute OHLC data, run from a local PC:
- 1 day → 1 page, 1440 rows, 1.855 sec
- 30 days → 13 pages, 42923 rows, 9.472 sec
- 180 days → 69 pages, 259200 rows, 51.649 sec
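For reference, numbers in this form can be collected with a small wrapper around the pagination loop. This is a sketch, not part of the original gist, and it assumes the timestream-query client and SQL string created above:

import time

def timed_query(sql):
    # Time a full paginated query and report pages / rows / elapsed seconds.
    start = time.perf_counter()
    pages, rows = 0, []
    for page in client.get_paginator('query').paginate(QueryString=sql):
        pages += 1
        rows.extend(page.get('Rows', []))
    print(pages, len(rows), f"{time.perf_counter() - start:.3f} sec")

timed_query(SQL)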
import boto3
import csv
import datetime
import time

client = boto3.client('timestream-write')

DBName = 'shiroutest'
timefmt = "%Y-%m-%d %H:%M:%S.%f"

dimensions = [
    {'Name': 'market', 'Value': 'fake'},
]

def read(row):
    # Build a multi-measure record from one CSV row.
    # https://docs.aws.amazon.com/ja_jp/timestream/latest/developerguide/writes.html
    d = datetime.datetime.strptime(row[0], timefmt)
    record = {
        'Dimensions': dimensions,
        'MeasureName': 'ohlc',
        'MeasureValueType': 'MULTI',
        # Epoch milliseconds: seconds * 1000 plus the microsecond part
        # truncated to milliseconds.
        'Time': str(int(time.mktime(d.timetuple())) * 1000 + d.microsecond // 1000),
        'TimeUnit': 'MILLISECONDS',
        'MeasureValues': [
            {'Name': 'open', 'Value': row[1], 'Type': 'DOUBLE'},
            {'Name': 'high', 'Value': row[2], 'Type': 'DOUBLE'},
            {'Name': 'low', 'Value': row[3], 'Type': 'DOUBLE'},
            {'Name': 'close', 'Value': row[4], 'Type': 'DOUBLE'},
        ],
    }
    return record

def chunk(lst, size):
    # Split a list into fixed-size chunks (WriteRecords accepts at most
    # 100 records per request).
    return [lst[i:i + size] for i in range(0, len(lst), size)]

def write(currency):
    fname = f"testdata/{currency}.csv"
    records = []
    with open(fname) as csvfile:
        reader = csv.reader(csvfile)
        for row in reader:
            records.append(read(row))
    for c in chunk(records, 100):
        client.write_records(
            DatabaseName=DBName,
            TableName=currency,
            Records=c,
            CommonAttributes={},
        )
        time.sleep(1 / 1000)

write("USDJPY")