Skip to content

Instantly share code, notes, and snippets.

@gangtao

gangtao/stock.py Secret

Created February 28, 2023 00:34
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gangtao/5e6426edd1a3d26b5bea580538fae11a to your computer and use it in GitHub Desktop.
Save gangtao/5e6426edd1a3d26b5bea580538fae11a to your computer and use it in GitHub Desktop.
Portfolio Monitor and Analysis Demo Code
import os
import json
import random
import time
import math
from timeplus import Query, Stream, Environment
# Connection settings for the Timeplus workspace.
# Credentials should not live in source control, so both values can be
# supplied through environment variables. When a variable is unset, the
# original placeholder literal is used, so existing behaviour is unchanged.
api_key = os.environ.get('TIMEPLUS_API_KEY', 'key')
api_address = os.environ.get('TIMEPLUS_HOST', 'https://dev.timeplus.cloud/workspace')

# Shared environment handle used by every Stream/Query helper in this script.
env = Environment().address(api_address).apikey(api_key)
def create_account_stream():
    """Create the ``account`` stream on the module-level environment.

    The stream is declared with a string ``id`` column and a float
    ``funding`` column, in ``versioned_kv`` mode with ``id`` as primary key.

    Any failure is reported on stdout and swallowed (best effort); nothing
    is returned.
    """
    try:
        (
            Stream(env=env)
            .name("account")
            .column("id", "string")
            .column("funding", "float")
            .mode("versioned_kv")
            .primary_key("id")
            .create()
        )
        print("account created")
    except Exception as e:
        # The previous message interpolated an undefined ``stream_name``,
        # which turned every failure into a NameError raised from here.
        print(f"failed to create account {e}")
def create_position_stream():
    """Create the ``position`` stream on the module-level environment.

    The stream is declared with string ``accountId`` and ``code`` columns
    and a float ``quantity`` column, in ``versioned_kv`` mode with ``code``
    as primary key.

    Any failure is reported on stdout and swallowed (best effort); nothing
    is returned.
    """
    # NOTE(review): the primary key is ``code`` alone, so rows for the same
    # security from different accounts presumably share one key -- confirm
    # whether ``accountId`` should be part of the key.
    try:
        (
            Stream(env=env)
            .name("position")
            .column("accountId", "string")
            .column("code", "string")
            .column("quantity", "float")
            .mode("versioned_kv")
            .primary_key("code")
            .create()
        )
        print("position created")
    except Exception as e:
        # The previous message interpolated an undefined ``stream_name``,
        # which turned every failure into a NameError raised from here.
        print(f"failed to create position {e}")
def create_transaction_stream():
    """Create the ``transaction`` stream on the module-level environment.

    Columns, in declaration order: ``accountId`` (string), ``code`` (string),
    ``direction`` (int), ``quantity`` (int) and ``price`` (float).

    Failures are printed and swallowed; nothing is returned.
    """
    schema = (
        ("accountId", "string"),
        ("code", "string"),
        ("direction", "int"),
        ("quantity", "int"),
        ("price", "float"),
    )
    try:
        builder = Stream(env=env).name("transaction")
        # Apply the columns one by one, following the fluent chain.
        for column_name, column_type in schema:
            builder = builder.column(column_name, column_type)
        builder.create()
        print("transaction created")
    except Exception as err:
        print(f"failed to create transaction {err}")
def delete_stream(name):
    """Delete the stream called ``name`` from the module-level environment.

    Args:
        name: Name of the stream to delete.

    Any failure is reported on stdout and swallowed (best effort); nothing
    is returned.
    """
    try:
        Stream(env=env).name(name).delete()
        print(f"stream {name} deleted")
    except Exception as e:
        # Report the stream actually requested; the old text always said
        # "transaction" no matter which stream was being deleted.
        print(f"failed to delete stream {name} {e}")
def new_or_update_account(name, funding):
    """Ingest one ``{"id", "funding"}`` record into the ``account`` stream.

    Args:
        name: Value written to the record's ``id`` field.
        funding: Value written to the record's ``funding`` field.

    Failures are printed and swallowed; nothing is returned.
    """
    try:
        account_stream = Stream(env=env).name("account")
        record = {"id": name, "funding": funding}
        account_stream.ingest(format="streaming", payload=json.dumps(record))
        print(f"account {name} with {funding} added")
    except Exception as err:
        print(f"failed to add/updated account {name} {err}")
class TradingPolicy:
    """Random buy/sell simulator for one account over a fixed portfolio.

    Funding and per-security positions are tracked in memory, and every
    change is also ingested into the ``account``, ``position`` and
    ``transaction`` streams.
    """

    def __init__(self, portfolio, account, funding, env):
        """Register the account and zero out every position.

        Args:
            portfolio: Sequence of security codes to trade.
            account: Account identifier written to every record.
            funding: Initial funding of the account.
            env: Environment handle used for all queries and ingests.
        """
        self._portfolio = portfolio
        self._account = account
        self._interval = 3  # seconds slept between two trading rounds
        self._env = env
        self._funding = funding
        self._position = {}
        new_or_update_account(account, funding)
        for security in self._portfolio:
            self.update_position(security, 0)

    def _query_last_value(self, sql):
        """Run ``sql`` and return the first column of the last row seen.

        Only events whose ``event`` attribute is ``'message'`` are decoded.
        Returns 0 when the query yields no rows.
        """
        query = Query(env=self._env).sql(query=sql).create()
        result = 0
        for event in query.result():
            if event.event == 'message':
                for row in json.loads(event.data):
                    result = row[0]
        return result

    def _ingest(self, stream_name, payload):
        """Ingest ``payload`` as JSON into ``stream_name``.

        Returns True on success. On failure the error is printed and False
        is returned, so one failed write does not stop the trading loop.
        """
        try:
            # Use the injected environment rather than the module global so
            # the instance honours the ``env`` it was constructed with.
            stream = Stream(env=self._env).name(stream_name)
            stream.ingest(payload=json.dumps(payload), format="streaming")
        except Exception as e:
            print(f"failed to ingest {payload} into {stream_name}: {e}")
            return False
        return True

    def query_funding(self):
        """Return the funding stored in the ``account`` stream (0 if none)."""
        querySQL = f"select funding from account where id = '{self._account}'"
        return self._query_last_value(querySQL)

    def get_funding(self):
        """Return the in-memory funding."""
        return self._funding

    def get_stock_price(self, stock_code):
        """Return the latest price of ``stock_code`` from the tick stream.

        Selects ``latest_price`` from the row with the greatest ``_tp_time``
        for that code. Returns 0 when the query yields no rows.
        """
        price_stream = 'stock_ticks_us_r'
        querySQL = f"""SELECT
latest_price
FROM
table({price_stream})
WHERE
(code = '{stock_code}') AND (_tp_time = (
SELECT
max(_tp_time)
FROM
table({price_stream})
WHERE
code = '{stock_code}'
))"""
        return self._query_last_value(querySQL)

    def execute(self, security, direction, price, quantity):
        """Record one trade in the ``transaction`` stream.

        Args:
            security: Security code traded.
            direction: 1 for a buy, -1 for a sell (the values ``run`` uses).
            price: Price per unit.
            quantity: Number of units traded.
        """
        # Build the payload before any call that can fail, so the failure
        # message can always include it.
        payload = {"accountId":self._account,"code":security, "direction":direction, "quantity":quantity, "price": price}
        if self._ingest("transaction", payload):
            print(f"execute {payload}")

    def update_position(self, security, quantity):
        """Set the position of ``security`` in memory and in the stream."""
        self._position[security] = quantity
        payload = {"accountId":self._account,"code":security, "quantity":quantity}
        if self._ingest("position", payload):
            print(f"new position {payload}")

    def query_position(self, security):
        """Return the quantity stored in the ``position`` stream (0 if none)."""
        querySQL = f"select quantity from position where accountId = '{self._account}' and code = '{security}'"
        return self._query_last_value(querySQL)

    def get_position(self, security):
        """Return the in-memory position of ``security`` (0 if unknown)."""
        return self._position.get(security, 0)

    def update_account(self, funding):
        """Set the funding in memory and in the ``account`` stream."""
        self._funding = funding
        new_or_update_account(self._account,funding)

    def _trade_once(self):
        """Run one random buy or sell round for a random security."""
        direction = random.choice([1, -1])
        current_funding = self.get_funding()
        security_code = random.choice(self._portfolio)
        latest_price = self.get_stock_price(security_code)

        # get_stock_price returns 0 when no tick is available. Trading on
        # that would divide by zero on a buy and sell for nothing on a sell.
        if not latest_price or latest_price <= 0:
            print(f"skip {security_code}: no valid price ({latest_price})")
            return

        if direction == 1:
            quantity = math.floor((current_funding / latest_price) * random.uniform(0, 1))
            print(f"buy current_funding:{current_funding} {security_code}:{latest_price}-{quantity}")
            if quantity > 0:
                total = latest_price * quantity
                self.update_account(current_funding - total)
                self.execute(security_code, direction, latest_price, quantity)
                current_position = self.get_position(security_code)
                self.update_position(security_code, current_position + quantity)
        else:
            current_position = self.get_position(security_code)
            print(f"sell current_funding:{current_funding} {security_code}:{latest_price} - {current_position}")
            if current_position > 0:
                quantity = math.floor(current_position * random.uniform(0, 1))
                # Mirror the buy side: do not record zero-quantity trades.
                if quantity > 0:
                    total = latest_price * quantity
                    self.update_account(current_funding + total)
                    self.execute(security_code, direction, latest_price, quantity)
                    self.update_position(security_code, current_position - quantity)

    def run(self):
        """Trade forever, sleeping ``self._interval`` seconds between rounds."""
        while True:
            self._trade_once()
            time.sleep(self._interval)
# Drop any previous transaction stream, then create the three streams the
# simulator writes to (transaction first, then account, then position).
delete_stream('transaction')
for _create_stream in (
    create_transaction_stream,
    create_account_stream,
    create_position_stream,
):
    _create_stream()

# The market-data stream is a real-time stream that is already running,
# so it is not created here.
tp = TradingPolicy(
    portfolio=['105.MSFT','105.AMZN', '106.KO','106.WMT'],
    account='001',
    funding=1000000,
    env=env,
)
tp.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment