Skip to content

Instantly share code, notes, and snippets.

@dogwood008
Last active March 5, 2021 13:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dogwood008/d26627648b5a670a46f979557a6d1e9d to your computer and use it in GitHub Desktop.
Save dogwood008/d26627648b5a670a46f979557a6d1e9d to your computer and use it in GitHub Desktop.
kabu STATION API を backtraderから使う https://how-to-make-stock-trading-system.dogwood008.com/
Display the source blob
Display the rendered blob
Raw
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
#!/usr/bin/env python
# coding: utf-8
# ## Utilities
# In[1]:
def is_in_jupyter() -> bool:
'''
Determine wheather is the environment Jupyter Notebook
https://blog.amedama.jp/entry/detect-jupyter-env
'''
if 'get_ipython' not in globals():
# Python shell
return False
env_name = get_ipython().__class__.__name__
if env_name == 'TerminalInteractiveShell':
# IPython shell
return False
# Jupyter Notebook
return True
print(is_in_jupyter())
# In[3]:
# https://recruit-tech.co.jp/blog/2018/10/16/jupyter_notebook_tips/
if is_in_jupyter():
def set_stylesheet():
from IPython.display import display, HTML
css = get_ipython().getoutput('wget https://raw.githubusercontent.com/lapis-zero09/jupyter_notebook_tips/master/css/jupyter_notebook/monokai.css -q -O -')
css = "\n".join(css)
display(HTML('<style type="text/css">%s</style>'%css))
set_stylesheet()
# ## Main
# In[ ]:
#!/usr/bin/env python
# -*- coding: utf-8; py-indent-offset:4 -*-
###############################################################################
#
# Copyright (C) 2015-2020 Daniel Rodriguez
# Copyright (C) 2021 dogwood008 (modified)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import collections
from datetime import datetime, timedelta
import time as _time
import json
import threading
# import oandapy
# import requests # oandapy depdendency
import backtrader as bt
from backtrader.metabase import MetaParams
from backtrader.utils.py3 import queue, with_metaclass
from backtrader.utils import AutoDict
import kabusapi
# In[ ]:
# Extend the exceptions to support extra cases
# '''
# class OandaRequestError(oandapy.OandaError):
# def __init__(self):
# er = dict(code=599, message='Request Error', description='')
# super(self.__class__, self).__init__(er)
#
#
# class OandaStreamError(oandapy.OandaError):
# def __init__(self, content=''):
# er = dict(code=598, message='Failed Streaming', description=content)
# super(self.__class__, self).__init__(er)
#
#
# class OandaTimeFrameError(oandapy.OandaError):
# def __init__(self, content):
# er = dict(code=597, message='Not supported TimeFrame', description='')
# super(self.__class__, self).__init__(er)
#
#
# class OandaNetworkError(oandapy.OandaError):
# def __init__(self):
# er = dict(code=596, message='Network Error', description='')
# super(self.__class__, self).__init__(er)
# '''
# class API(oandapy.API):
# def request(self, endpoint, method='GET', params=None):
# # Overriden to make something sensible out of a
# # request.RequestException rather than simply issuing a print(str(e))
# url = '%s/%s' % (self.api_url, endpoint)
#
# method = method.lower()
# params = params or {}
#
# func = getattr(self.client, method)
#
# request_args = {}
# if method == 'get':
# request_args['params'] = params
# else:
# request_args['data'] = params
#
# # Added the try block
# try:
# response = func(url, **request_args)
# except requests.RequestException as e:
# return OandaRequestError().error_response
#
# content = response.content.decode('utf-8')
# content = json.loads(content)
#
# # error message
# if response.status_code >= 400:
# # changed from raise to return
# return oandapy.OandaError(content).error_response
#
# return content
# In[ ]:
#FIXME
# class Streamer(oandapy.Streamer):
# def __init__(self, q, headers=None, *args, **kwargs):
# # Override to provide headers, which is in the standard API interface
# super(Streamer, self).__init__(*args, **kwargs)
#
# if headers:
# self.client.headers.update(headers)
#
# self.q = q
#
# def run(self, endpoint, params=None):
# # Override to better manage exceptions.
# # Kept as much as possible close to the original
# self.connected = True
#
# params = params or {}
#
# ignore_heartbeat = None
# if 'ignore_heartbeat' in params:
# ignore_heartbeat = params['ignore_heartbeat']
#
# request_args = {}
# request_args['params'] = params
#
# url = '%s/%s' % (self.api_url, endpoint)
#
# while self.connected:
# # Added exception control here
# try:
# response = self.client.get(url, **request_args)
# except requests.RequestException as e:
# self.q.put(OandaRequestError().error_response)
# break
#
# if response.status_code != 200:
# self.on_error(response.content)
# break # added break here
#
# # Changed chunk_size 90 -> None
# try:
# for line in response.iter_lines(chunk_size=None):
# if not self.connected:
# break
#
# if line:
# data = json.loads(line.decode('utf-8'))
# if not (ignore_heartbeat and 'heartbeat' in data):
# self.on_success(data)
#
# except: # socket.error has been seen
# self.q.put(OandaStreamError().error_response)
# break
#
# def on_success(self, data):
# if 'tick' in data:
# self.q.put(data['tick'])
# elif 'transaction' in data:
# self.q.put(data['transaction'])
#
# def on_error(self, data):
# self.disconnect()
# self.q.put(OandaStreamError(data).error_response)
# In[ ]:
from enum import Enum
class KabusAPIEnv(Enum):
DEV = 'dev'
PROD = 'prod'
# In[ ]:
class MetaSingleton(MetaParams):
'''Metaclass to make a metaclassed class a singleton'''
def __init__(cls, name, bases, dct):
super(MetaSingleton, cls).__init__(name, bases, dct)
cls._singleton = None
def __call__(cls, *args, **kwargs):
if cls._singleton is None:
cls._singleton = (
super(MetaSingleton, cls).__call__(*args, **kwargs))
return cls._singleton
class KabuSAPIStore(with_metaclass(MetaSingleton, object)):
'''Singleton class wrapping to control the connections to Kabu STATION API.
Params:
- ``token`` (default:``None``): API access token
- ``account`` (default: ``None``): account id
- ``practice`` (default: ``False``): use the test environment
- ``account_tmout`` (default: ``10.0``): refresh period for account
value/cash refresh
'''
BrokerCls = None # broker class will autoregister
DataCls = None # data class will auto register
params = (
('url', 'localhost'),
('env', KabuSAPIEnv.DEV),
('port', None),
('password', None),
)
# _DTEPOCH = datetime(1970, 1, 1)
# _ENVPRACTICE = 'practice'
# _ENVLIVE = 'live'
@classmethod
def getdata(cls, *args, **kwargs):
'''Returns ``DataCls`` with args, kwargs'''
return cls.DataCls(*args, **kwargs)
@classmethod
def getbroker(cls, *args, **kwargs):
'''Returns broker with *args, **kwargs from registered ``BrokerCls``'''
return cls.BrokerCls(*args, **kwargs)
def __init__(self):
def _getport() -> int:
if self.p.port:
return port
return 18081 if self.p.env == KabuSAPIEnv.DEV else 18080
def _init_kabusapi_client(self) -> kabusapiapi.Context:
url = self.p.url
port = self.p.get('port', _getport())
password = self.p.password
token = kabusapi.Context(url, port, password).token
self.kapi = kabusapi.Context(url, port, token=token)
super(KabuSAPIStore, self).__init__()
self.notifs = collections.deque() # store notifications for cerebro
self._env = None # reference to cerebro for general notifications
self.broker = None # broker instance
self.datas = list() # datas that have registered over start
self._orders = collections.OrderedDict() # map order.ref to oid
self._ordersrev = collections.OrderedDict() # map oid to order.ref
self._transpend = collections.defaultdict(collections.deque)
_init_kabusapi_client()
self._cash = 0.0
self._value = 0.0
self._evt_acct = threading.Event()
def start(self, data=None, broker=None):
# Datas require some processing to kickstart data reception
if data is None and broker is None:
self.cash = None
return
if data is not None:
self._env = data._env
# For datas simulate a queue with None to kickstart co
self.datas.append(data)
if self.broker is not None:
self.broker.data_started(data)
elif broker is not None:
self.broker = broker
self.streaming_events()
self.broker_threads()
def stop(self):
# signal end of thread
if self.broker is not None:
self.q_ordercreate.put(None)
self.q_orderclose.put(None)
self.q_account.put(None)
def put_notification(self, msg, *args, **kwargs):
self.notifs.append((msg, args, kwargs))
def get_notifications(self):
'''Return the pending "store" notifications'''
self.notifs.append(None) # put a mark / threads could still append
return [x for x in iter(self.notifs.popleft, None)]
# Oanda supported granularities
# _GRANULARITIES = {
# (bt.TimeFrame.Seconds, 5): 'S5',
# (bt.TimeFrame.Seconds, 10): 'S10',
# (bt.TimeFrame.Seconds, 15): 'S15',
# (bt.TimeFrame.Seconds, 30): 'S30',
# (bt.TimeFrame.Minutes, 1): 'M1',
# (bt.TimeFrame.Minutes, 2): 'M3',
# (bt.TimeFrame.Minutes, 3): 'M3',
# (bt.TimeFrame.Minutes, 4): 'M4',
# (bt.TimeFrame.Minutes, 5): 'M5',
# (bt.TimeFrame.Minutes, 10): 'M5',
# (bt.TimeFrame.Minutes, 15): 'M5',
# (bt.TimeFrame.Minutes, 30): 'M5',
# (bt.TimeFrame.Minutes, 60): 'H1',
# (bt.TimeFrame.Minutes, 120): 'H2',
# (bt.TimeFrame.Minutes, 180): 'H3',
# (bt.TimeFrame.Minutes, 240): 'H4',
# (bt.TimeFrame.Minutes, 360): 'H6',
# (bt.TimeFrame.Minutes, 480): 'H8',
# (bt.TimeFrame.Days, 1): 'D',
# (bt.TimeFrame.Weeks, 1): 'W',
# (bt.TimeFrame.Months, 1): 'M',
# }
def get_positions(self):
try:
positions = self.oapi.get_positions(self.p.account)
except (oandapy.OandaError, OandaRequestError,):
return None
poslist = positions.get('positions', [])
return poslist
#def get_granularity(self, timeframe, compression):
# return self._GRANULARITIES.get((timeframe, compression), None)
def get_instrument(self, dataname):
try:
insts = self.oapi.get_instruments(self.p.account,
instruments=dataname)
except (oandapy.OandaError, OandaRequestError,):
return None
i = insts.get('instruments', [{}])
return i[0] or None
def streaming_events(self, tmout=None):
q = queue.Queue()
kwargs = {'q': q, 'tmout': tmout}
t = threading.Thread(target=self._t_streaming_listener, kwargs=kwargs)
t.daemon = True
t.start()
t = threading.Thread(target=self._t_streaming_events, kwargs=kwargs) # FIXME: _t_streaming_events
t.daemon = True
t.start()
return q
def _t_streaming_listener(self, q, tmout=None):
while True:
trans = q.get()
self._transaction(trans)
# FIXME: Streamer
def _t_streaming_events(self, q, tmout=None):
if tmout is not None:
_time.sleep(tmout)
# FIXME: oandapy.Streamer
# streamer = Streamer(q,
# environment=self._oenv,
# access_token=self.p.token,
# headers={'X-Accept-Datetime-Format': 'UNIX'})
#
# streamer.events(ignore_heartbeat=False)
def candles(self, dataname, dtbegin, dtend, timeframe, compression,
candleFormat, includeFirst):
kwargs = locals().copy()
kwargs.pop('self')
kwargs['q'] = q = queue.Queue()
t = threading.Thread(target=self._t_candles, kwargs=kwargs) # FIXME: _t_candles
t.daemon = True
t.start()
return q
def _t_candles(self, dataname, dtbegin, dtend, timeframe, compression,
candleFormat, includeFirst, q):
# FIXME: granularity = self.get_granularity(timeframe, compression)
if granularity is None:
e = OandaTimeFrameError()
q.put(e.error_response)
return
dtkwargs = {}
if dtbegin is not None:
dtkwargs['start'] = int((dtbegin - self._DTEPOCH).total_seconds()) # FIXME: _DTEPOCH
if dtend is not None:
dtkwargs['end'] = int((dtend - self._DTEPOCH).total_seconds()) # FIXME: _DTEPOCH
try:
# FIXME: granularity
# response = self.oapi.get_history(instrument=dataname,
# granularity=granularity,
# candleFormat=candleFormat,
# **dtkwargs)
except oandapy.OandaError as e:
q.put(e.error_response)
q.put(None)
return
# FIXME
for candle in response.get('candles', []):
q.put(candle)
q.put({}) # end of transmission
def streaming_prices(self, dataname, tmout=None):
q = queue.Queue()
kwargs = {'q': q, 'dataname': dataname, 'tmout': tmout}
t = threading.Thread(target=self._t_streaming_prices, kwargs=kwargs) # FIXME: _t_streaming_prices
t.daemon = True
t.start()
return q
# FIXME: Streamer
def _t_streaming_prices(self, dataname, q, tmout):
if tmout is not None:
_time.sleep(tmout)
# FIXME: Streamer
# FIXME streamer = Streamer(q, environment=self._oenv,
# FIXME access_token=self.p.token,
# FIXME headers={'X-Accept-Datetime-Format': 'UNIX'})
# FIXME streamer.rates(self.p.account, instruments=dataname)
def get_cash(self):
return self._cash
def get_value(self):
return self._value
_ORDEREXECS = {
bt.Order.Market: 'market',
bt.Order.Limit: 'limit',
bt.Order.Stop: 'stop',
bt.Order.StopLimit: 'stop',
}
def broker_threads(self):
self.q_account = queue.Queue()
self.q_account.put(True) # force an immediate update
t = threading.Thread(target=self._t_account)
t.daemon = True
t.start()
self.q_ordercreate = queue.Queue()
t = threading.Thread(target=self._t_order_create)
t.daemon = True
t.start()
self.q_orderclose = queue.Queue()
t = threading.Thread(target=self._t_order_cancel)
t.daemon = True
t.start()
# Wait once for the values to be set
self._evt_acct.wait(self.p.account_tmout)
def _t_account(self):
while True:
try:
msg = self.q_account.get(timeout=self.p.account_tmout)
if msg is None:
break # end of thread
except queue.Empty: # tmout -> time to refresh
pass
try:
accinfo = self.oapi.get_account(self.p.account)
except Exception as e:
self.put_notification(e)
continue
try:
self._cash = accinfo['marginAvail']
self._value = accinfo['balance']
except KeyError:
pass
self._evt_acct.set()
def order_create(self, order, stopside=None, takeside=None, **kwargs):
okwargs = dict()
okwargs['instrument'] = order.data._dataname
okwargs['units'] = abs(order.created.size)
okwargs['side'] = 'buy' if order.isbuy() else 'sell'
okwargs['type'] = self._ORDEREXECS[order.exectype]
if order.exectype != bt.Order.Market:
okwargs['price'] = order.created.price
if order.valid is None:
# 1 year and datetime.max fail ... 1 month works
valid = datetime.utcnow() + timedelta(days=30)
else:
valid = order.data.num2date(order.valid)
# To timestamp with seconds precision
okwargs['expiry'] = int((valid - self._DTEPOCH).total_seconds()) # FIXME: _DTEPOCH
if order.exectype == bt.Order.StopLimit:
okwargs['lowerBound'] = order.created.pricelimit
okwargs['upperBound'] = order.created.pricelimit
if order.exectype == bt.Order.StopTrail:
okwargs['trailingStop'] = order.trailamount
if stopside is not None:
okwargs['stopLoss'] = stopside.price
if takeside is not None:
okwargs['takeProfit'] = takeside.price
okwargs.update(**kwargs) # anything from the user
self.q_ordercreate.put((order.ref, okwargs,))
return order
_OIDSINGLE = ['orderOpened', 'tradeOpened', 'tradeReduced']
_OIDMULTIPLE = ['tradesClosed']
def _t_order_create(self):
while True:
msg = self.q_ordercreate.get()
if msg is None:
break
oref, okwargs = msg
try:
o = self.oapi.create_order(self.p.account, **okwargs)
except Exception as e:
self.put_notification(e)
self.broker._reject(oref)
return
# Ids are delivered in different fields and all must be fetched to
# match them (as executions) to the order generated here
oids = list()
for oidfield in self._OIDSINGLE:
if oidfield in o and 'id' in o[oidfield]:
oids.append(o[oidfield]['id'])
for oidfield in self._OIDMULTIPLE:
if oidfield in o:
for suboidfield in o[oidfield]:
oids.append(suboidfield['id'])
if not oids:
self.broker._reject(oref)
return
self._orders[oref] = oids[0]
self.broker._submit(oref)
if okwargs['type'] == 'market':
self.broker._accept(oref) # taken immediately
for oid in oids:
self._ordersrev[oid] = oref # maps ids to backtrader order
# An transaction may have happened and was stored
tpending = self._transpend[oid]
tpending.append(None) # eom marker
while True:
trans = tpending.popleft()
if trans is None:
break
self._process_transaction(oid, trans)
def order_cancel(self, order):
self.q_orderclose.put(order.ref)
return order
def _t_order_cancel(self):
while True:
oref = self.q_orderclose.get()
if oref is None:
break
oid = self._orders.get(oref, None)
if oid is None:
continue # the order is no longer there
try:
o = self.oapi.close_order(self.p.account, oid)
except Exception as e:
continue # not cancelled - FIXME: notify
self.broker._cancel(oref)
_X_ORDER_CREATE = ('STOP_ORDER_CREATE',
'LIMIT_ORDER_CREATE', 'MARKET_IF_TOUCHED_ORDER_CREATE',)
def _transaction(self, trans):
# Invoked from Streaming Events. May actually receive an event for an
# oid which has not yet been returned after creating an order. Hence
# store if not yet seen, else forward to processer
ttype = trans['type']
if ttype == 'MARKET_ORDER_CREATE':
try:
oid = trans['tradeReduced']['id']
except KeyError:
try:
oid = trans['tradeOpened']['id']
except KeyError:
return # cannot do anything else
elif ttype in self._X_ORDER_CREATE:
oid = trans['id']
elif ttype == 'ORDER_FILLED':
oid = trans['orderId']
elif ttype == 'ORDER_CANCEL':
oid = trans['orderId']
elif ttype == 'TRADE_CLOSE':
oid = trans['id']
pid = trans['tradeId']
if pid in self._orders and False: # Know nothing about trade
return # can do nothing
# Skip above - at the moment do nothing
# Received directly from an event in the WebGUI for example which
# closes an existing position related to order with id -> pid
# COULD BE DONE: Generate a fake counter order to gracefully
# close the existing position
msg = ('Received TRADE_CLOSE for unknown order, possibly generated'
' over a different client or GUI')
self.put_notification(msg, trans)
return
else: # Go aways gracefully
try:
oid = trans['id']
except KeyError:
oid = 'None'
msg = 'Received {} with oid {}. Unknown situation'
msg = msg.format(ttype, oid)
self.put_notification(msg, trans)
return
try:
oref = self._ordersrev[oid]
self._process_transaction(oid, trans)
except KeyError: # not yet seen, keep as pending
self._transpend[oid].append(trans)
_X_ORDER_FILLED = ('MARKET_ORDER_CREATE',
'ORDER_FILLED', 'TAKE_PROFIT_FILLED',
'STOP_LOSS_FILLED', 'TRAILING_STOP_FILLED',)
def _process_transaction(self, oid, trans):
try:
oref = self._ordersrev.pop(oid)
except KeyError:
return
ttype = trans['type']
if ttype in self._X_ORDER_FILLED:
size = trans['units']
if trans['side'] == 'sell':
size = -size
price = trans['price']
self.broker._fill(oref, size, price, ttype=ttype)
elif ttype in self._X_ORDER_CREATE:
self.broker._accept(oref)
self._ordersrev[oid] = oref
elif ttype in 'ORDER_CANCEL':
reason = trans['reason']
if reason == 'ORDER_FILLED':
pass # individual execs have done the job
elif reason == 'TIME_IN_FORCE_EXPIRED':
self.broker._expire(oref)
elif reason == 'CLIENT_REQUEST':
self.broker._cancel(oref)
else: # default action ... if nothing else
self.broker._reject(oref)
# In[ ]:
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment