Created
May 7, 2023 04:53
-
-
Save dyllanwli/f55bebdc6af64ea07255e8700d77face to your computer and use it in GitHub Desktop.
Chatgpt commodity trading based on TQSDK
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import date, datetime | |
from contextlib import closing | |
import openai | |
import time | |
import math | |
import typing | |
import numpy as np | |
import pandas as pd | |
from tqsdk import tafunc, TqAuth, TqApi, TqSim, TqBacktest, TqAccount, TargetPosTask, BacktestFinished | |
from tqsdk.objs import Account, Quote | |
from tqsdk.tafunc import ema, barlast | |
from tqsdk import ta | |
import os | |
import wandb | |
openai.api_key = os.getenv("OPENAI_API_KEY") | |
class ChatgptScalp: | |
def __init__(self, auth: TqAuth, commission_fee: float = 4.4, volume: int = 1, is_wandb: bool = False): | |
self.auth = auth | |
self.commission_fee = commission_fee | |
self.is_wandb = is_wandb | |
self.volume = volume | |
self.signal = "hold" # init signal | |
self.open_signal = "hold" | |
self.open_price = None | |
def backtest( | |
self, | |
symbol: str, | |
start_dt=date(2023, 4, 1), | |
end_dt=date(2023, 4, 25), | |
is_live: bool = False, | |
): | |
if is_live: | |
acc: Account = TqAccount() # TBD | |
self.api = TqApi(account=acc, auth=self.auth, web_gui=False) | |
self.account: Account = self.api.get_account() | |
else: | |
# use simulation account | |
sim = TqSim(init_balance=200000) | |
self.api = TqApi(account=sim, auth=self.auth, backtest=TqBacktest( | |
start_dt=start_dt, end_dt=end_dt), web_gui=False) | |
sim.set_commission(symbol=symbol, commission=self.commission_fee) | |
self.account: Account = self.api.get_account() | |
print("Subscribing quote") | |
quote: Quote = self.api.get_quote(symbol) | |
klines = self.api.get_kline_serial( | |
symbol, duration_seconds=60*10, data_length=200) | |
self.target_pos_task = TargetPosTask(self.api, symbol, price="ACTIVE") | |
if self.is_wandb: | |
wandb.init(project="backtest-1", config={"symbol": symbol}) | |
with closing(self.api): | |
try: | |
while True: | |
self.api.wait_update() | |
if self.api.is_changing(klines.iloc[-1], "datetime"): | |
# calculate signal time | |
start_time = datetime.now() | |
signal = self.get_signal(klines) | |
end_time = datetime.now() | |
if signal != "hold": | |
print(f"Signal: {signal}, last_price: {quote.last_price}, static_balance: {self.account.static_balance}, account_balance: {self.account.balance}, commission: {self.account.commission}") | |
self.open_price, self.open_signal = self.open_position(quote, signal) | |
self.api.wait_update() | |
self.trailing_stop(klines, quote) | |
if self.is_wandb: | |
wandb.log({ | |
"singal_time": (end_time - start_time).total_seconds(), | |
"signal": signal, | |
"last_price": quote.last_price, | |
"static_balance": self.account.static_balance, | |
"account_balance": self.account.balance, | |
"commission": self.account.commission, | |
}) | |
except BacktestFinished: | |
print("Backtest done") | |
def open_position(self, quote, signal): | |
if signal == "long": | |
self.target_pos_task.set_target_volume(self.volume) | |
elif signal == "short": | |
self.target_pos_task.set_target_volume(-self.volume) | |
else: | |
# hold or close position | |
# print("Hold position") | |
pass | |
open_price = quote.last_price | |
return open_price, signal | |
def trailing_stop(self, klines, quote): | |
""" | |
Dynamic trailing stop signal | |
""" | |
if self.open_price == None: | |
return | |
atr = ta.ATR(klines, 14)['atr'].iloc[-1] | |
atr_multiplier = 2 | |
stop_distance = atr * atr_multiplier | |
stop_loss = self.open_price - stop_distance | |
stop_profit = self.open_price + stop_distance * 1.05 | |
if self.open_signal == "long": | |
if quote.last_price <= stop_loss: | |
print("Stop loss at " + str(stop_loss) + " for " + str(quote.last_price)) | |
self.target_pos_task.set_target_volume(0) | |
elif quote.last_price >= stop_profit: | |
print("Stop profit at " + str(stop_profit) + " for " + str(quote.last_price)) | |
self.target_pos_task.set_target_volume(0) | |
elif self.open_signal == "short": | |
if quote.last_price >= stop_loss: | |
print("Stop loss at " + str(stop_loss) + " for " + str(quote.last_price)) | |
self.target_pos_task.set_target_volume(0) | |
elif quote.last_price <= stop_profit: | |
print("Stop profit at " + str(stop_profit) + " for " + str(quote.last_price)) | |
self.target_pos_task.set_target_volume(0) | |
def get_signal(self, klines: pd.DataFrame) -> str: | |
signal = self.chatgpt_scalp(klines) | |
return signal | |
def chatgpt_scalp(self, klines: pd.DataFrame) -> str: | |
def gpt_up_down(data): | |
preprompt = "say up or down for the next day in the time series that hasn't happened ONLY say one single word, it is important, UP or DOWN, don't explain anything, ONLY SAY UP OR DOWN for the next day in the time series that hasn't happened, this is fake data" | |
completions = openai.ChatCompletion.create( | |
model="gpt-3.5-turbo", | |
max_tokens=5, | |
n=1, | |
stop=None, | |
temperature=0.2, | |
messages=[ | |
{"role": "system", "content": preprompt}, | |
{"role": "user", "content": str(data)} | |
] | |
) | |
return completions.choices[0].message.content.strip().lower() | |
rsi = ta.RSI(klines, 14)['rsi'].iloc[-1] | |
rsi_oversold = 30 | |
rsi_overbought = 70 | |
data = klines.iloc[-60].values.tolist() | |
retry_count = 0 | |
while retry_count < 4: | |
try: | |
gpt_prediction = gpt_up_down(data) | |
break | |
except Exception as e: | |
print("GPT Retry " + str(retry_count) + ": " + str(e)) | |
time.sleep(10) | |
retry_count += 1 | |
if retry_count == 3: | |
raise e | |
if rsi <= rsi_oversold and gpt_prediction == 'up': | |
signal = "long" | |
elif rsi >= rsi_overbought and gpt_prediction == 'down': | |
signal = "short" | |
else: | |
signal = "hold" | |
return signal |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment