# Portfolio Monitor and Analysis Demo Code
# Note: this file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
# Learn more about bidirectional Unicode characters.
import os | |
import json | |
import random | |
import time | |
import math | |
from timeplus import Query, Stream, Environment | |
# Connection settings for the Timeplus workspace used by this demo.
# Both values can be overridden through environment variables so that a real
# API key never has to be written into the source file; when the variables
# are absent the original placeholder values are used unchanged.
api_key = os.environ.get("TIMEPLUS_API_KEY", "key")
api_address = os.environ.get(
    "TIMEPLUS_ADDRESS", "https://dev.timeplus.cloud/workspace"
)

# Shared client environment passed to every Stream/Query built below.
env = Environment().address(api_address).apikey(api_key)
def create_account_stream():
    """Create the ``account`` stream.

    The stream is declared with columns ``id`` (string) and ``funding``
    (float), in ``versioned_kv`` mode with ``id`` as the primary key.

    This is a best-effort operation: any exception raised while building or
    creating the stream is reported on stdout and not propagated.
    """
    try:
        (
            Stream(env=env)
            .name("account")
            .column("id", "string")
            .column("funding", "float")
            .mode("versioned_kv")
            .primary_key("id")
            .create()
        )
        print("account created")
    except Exception as e:
        # The original message interpolated an undefined ``stream_name``,
        # which turned every failure into a NameError inside the handler.
        print(f"failed to create account {e}")
def create_position_stream():
    """Create the ``position`` stream.

    The stream is declared with columns ``accountId`` (string), ``code``
    (string) and ``quantity`` (float), in ``versioned_kv`` mode with ``code``
    as the primary key.

    This is a best-effort operation: any exception raised while building or
    creating the stream is reported on stdout and not propagated.
    """
    try:
        (
            Stream(env=env)
            .name("position")
            .column("accountId", "string")
            .column("code", "string")
            .column("quantity", "float")
            .mode("versioned_kv")
            # NOTE(review): the key is ``code`` alone, not (accountId, code);
            # presumably fine for this single-account demo -- confirm before
            # running several accounts against the same stream.
            .primary_key("code")
            .create()
        )
        print("position created")
    except Exception as e:
        # The original message interpolated an undefined ``stream_name``,
        # which turned every failure into a NameError inside the handler.
        print(f"failed to create position {e}")
def create_transaction_stream():
    """Create the ``transaction`` stream that records every executed trade.

    Columns, in declaration order: ``accountId`` (string), ``code`` (string),
    ``direction`` (int), ``quantity`` (int) and ``price`` (float).  No mode or
    primary key is set.  Failures are printed to stdout and swallowed.
    """
    schema = (
        ("accountId", "string"),
        ("code", "string"),
        ("direction", "int"),
        ("quantity", "int"),
        ("price", "float"),
    )
    try:
        builder = Stream(env=env).name("transaction")
        # Re-bind on every step so the chain works whether the fluent API
        # returns the same object or a new one.
        for column_name, column_type in schema:
            builder = builder.column(column_name, column_type)
        builder.create()
        print("transaction created")
    except Exception as err:
        print(f"failed to create transaction {err}")
def delete_stream(name):
    """Delete the stream called *name*.

    Args:
        name: Name of the stream to delete.

    This is a best-effort operation: a failure (for example because the
    stream does not exist yet) is reported on stdout and not propagated.
    """
    try:
        Stream(env=env).name(name).delete()
        print(f"stream {name} deleted")
    except Exception as e:
        # Report the stream that was actually requested; the original message
        # always said "transaction" regardless of ``name``.
        print(f"failed to delete stream {name} {e}")
def new_or_update_account(name, funding):
    """Write an ``{id, funding}`` record for *name* into the ``account`` stream.

    Args:
        name: Account identifier, stored in the ``id`` column.
        funding: Funding amount, stored in the ``funding`` column.

    Any exception raised while ingesting is printed to stdout and swallowed.
    """
    try:
        account_stream = Stream(env=env).name("account")
        body = json.dumps({"id": name, "funding": funding})
        account_stream.ingest(payload=body, format="streaming")
        print(f"account {name} with {funding} added")
    except Exception as err:
        print(f"failed to add/updated account {name} {err}")
class TradingPolicy:
    """Random buy/sell simulator for one account over a fixed portfolio.

    The policy keeps the account funding and the per-security positions in
    memory, and mirrors every change into the ``account``, ``position`` and
    ``transaction`` streams.  Prices are read from the ``stock_ticks_us_r``
    stream.

    SQL text is built by interpolating the account id and security codes
    directly, so callers must only pass trusted identifiers.
    """

    def __init__(self, portfolio, account, funding, env):
        """Register the account and reset every portfolio position to 0.

        Args:
            portfolio: Sequence of security codes the policy may trade.
            account: Account identifier used in all emitted records.
            funding: Initial funding of the account.
            env: Client environment used for this policy's queries and
                ingestion.
        """
        self._portfolio = portfolio
        self._account = account
        self._interval = 3  # seconds slept after each trade attempt in run()
        self._env = env
        self._funding = funding
        self._position = {}
        new_or_update_account(account, funding)
        for security in self._portfolio:
            self.update_position(security, 0)

    def _query_scalar(self, sql):
        """Run *sql* and return the first column of the last row, or 0 if no rows."""
        query = Query(env=self._env).sql(query=sql).create()
        result = 0
        for event in query.result():
            if event.event == "message":
                for row in json.loads(event.data):
                    result = row[0]
        return result

    def _ingest(self, stream_name, payload):
        """Send *payload* as one JSON record to *stream_name* using this policy's env."""
        stream = Stream(env=self._env).name(stream_name)
        stream.ingest(payload=json.dumps(payload), format="streaming")

    def query_funding(self):
        """Return the funding stored in the ``account`` stream (0 if no row)."""
        # NOTE(review): unlike get_stock_price this query does not wrap the
        # stream in table(); confirm it terminates for this stream mode.
        return self._query_scalar(
            f"select funding from account where id = '{self._account}'"
        )

    def get_funding(self):
        """Return the locally tracked funding."""
        return self._funding

    def get_stock_price(self, stock_code):
        """Return ``latest_price`` of *stock_code* at its newest ``_tp_time``.

        Returns 0 when the query yields no rows.
        """
        price_stream = "stock_ticks_us_r"
        sql = (
            f"SELECT latest_price FROM table({price_stream}) "
            f"WHERE (code = '{stock_code}') AND (_tp_time = ("
            f"SELECT max(_tp_time) FROM table({price_stream}) "
            f"WHERE code = '{stock_code}'))"
        )
        return self._query_scalar(sql)

    def execute(self, security, direction, price, quantity):
        """Record one trade in the ``transaction`` stream.

        Args:
            security: Security code traded.
            direction: 1 for a buy, -1 for a sell (the values used by run()).
            price: Price per unit.
            quantity: Number of units traded.

        Failures are printed to stdout and swallowed.
        """
        # Built before the try block so the handler can always print it.
        payload = {
            "accountId": self._account,
            "code": security,
            "direction": direction,
            "quantity": quantity,
            "price": price,
        }
        try:
            self._ingest("transaction", payload)
            print(f"execute {payload}")
        except Exception as e:
            print(f"failed to execute {payload} {e}")

    def update_position(self, security, quantity):
        """Set the position of *security* to *quantity* locally and in the stream.

        The in-memory position is updated first and is kept even when the
        ingestion fails; the failure is printed to stdout and swallowed.
        """
        self._position[security] = quantity
        # Built before the try block so the handler can always print it.
        payload = {
            "accountId": self._account,
            "code": security,
            "quantity": quantity,
        }
        try:
            self._ingest("position", payload)
            print(f"new position {payload}")
        except Exception as e:
            print(f"failed to update position {payload} {e}")

    def query_position(self, security):
        """Return the quantity stored in the ``position`` stream (0 if no row)."""
        # NOTE(review): unlike get_stock_price this query does not wrap the
        # stream in table(); confirm it terminates for this stream mode.
        return self._query_scalar(
            "select quantity from position "
            f"where accountId = '{self._account}' and code = '{security}'"
        )

    def get_position(self, security):
        """Return the locally tracked position of *security*, 0 if unknown."""
        return self._position.get(security, 0)

    def update_account(self, funding):
        """Set the funding locally and publish it to the ``account`` stream."""
        self._funding = funding
        new_or_update_account(self._account, funding)

    def _trade_once(self):
        """Attempt a single random buy or sell of one random portfolio security."""
        direction = random.choice([1, -1])
        current_funding = self.get_funding()
        security_code = random.choice(self._portfolio)
        latest_price = self.get_stock_price(security_code)

        # get_stock_price returns 0 when there is no tick for the security;
        # trading on that would divide by zero (buy) or sell for nothing.
        if not latest_price or latest_price <= 0:
            print(f"skip {security_code}: no usable price ({latest_price})")
            return

        if direction == 1:
            quantity = math.floor(
                (current_funding / latest_price) * random.uniform(0, 1)
            )
            print(f"buy current_funding:{current_funding} {security_code}:{latest_price}-{quantity}")
            if quantity > 0:
                self.update_account(current_funding - latest_price * quantity)
                self.execute(security_code, direction, latest_price, quantity)
                self.update_position(
                    security_code, self.get_position(security_code) + quantity
                )
        else:
            current_position = self.get_position(security_code)
            print(f"sell current_funding:{current_funding} {security_code}:{latest_price} - {current_position}")
            if current_position > 0:
                quantity = math.floor(current_position * random.uniform(0, 1))
                # Same guard as the buy branch: do not emit a zero-quantity
                # transaction that changes neither funding nor position.
                if quantity > 0:
                    self.update_account(current_funding + latest_price * quantity)
                    self.execute(security_code, direction, latest_price, quantity)
                    self.update_position(
                        security_code, current_position - quantity
                    )

    def run(self, iterations=None):
        """Trade repeatedly, sleeping ``self._interval`` seconds after each attempt.

        Args:
            iterations: Number of trade attempts to make.  ``None`` (the
                default) loops forever, which is the original behaviour.
        """
        completed = 0
        while iterations is None or completed < iterations:
            self._trade_once()
            completed += 1
            time.sleep(self._interval)
# Drop the previous transaction stream, then (re)create the three demo
# streams; each helper reports its own failure and carries on.
delete_stream('transaction')
for _create_stream in (
    create_transaction_stream,
    create_account_stream,
    create_position_stream,
):
    _create_stream()

# The market price stream read by TradingPolicy.get_stock_price is expected
# to be running already; it is not created here.
tp = TradingPolicy(
    ['105.MSFT', '105.AMZN', '106.KO', '106.WMT'],
    '001',
    1000000,
    env,
)
tp.run()
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.