Skip to content

Instantly share code, notes, and snippets.

@dkendrick
Created August 20, 2022 00:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dkendrick/bbe3256b8d09a15dc6f23c6f42d83954 to your computer and use it in GitHub Desktop.
Save dkendrick/bbe3256b8d09a15dc6f23c6f42d83954 to your computer and use it in GitHub Desktop.
Python script to assist an algo by TWAPing down a hedged Bybit position, where one leg is a futures contract and the other a perpetual. Requires pybit, the new Bybit Python library.
import logging
from asyncio import BaseEventLoop, gather, get_event_loop, sleep, Task
from functools import partial
from os import environ
from pybit import inverse_futures, inverse_perpetual
"""
Environment variables - please make sure these are set (e.g. `export BYBIT_API_KEY=abc1234`)
BYBIT_API_KEY=
BYBIT_API_SECRET=
BYBIT_API_URL=https://api.bybit.com
"""
# Run configuration. Every value tagged UPDATE_ME is a placeholder that has to
# be edited before the script is run.

# Symbol of the futures leg: used to pick the matching entry out of the
# positions list and as the symbol of the futures order.
FUTURES_PAIR = "BTCUSDU22" # UPDATE_ME
# Side sent with every futures order. Orders are placed reduce-only, so this
# is presumably the side opposite to the open futures position - confirm.
FUTURES_SIDE = "Sell" # UPDATE_ME
# Symbol of the perpetual leg, used as the symbol of the perp order.
PERP_PAIR = "BTCUSD" # UPDATE_ME
# Side sent with every perp order (same reduce-only caveat as FUTURES_SIDE).
PERP_SIDE = "Buy" # UPDATE_ME
# Quantity of each individual market order; the same value is used for both legs.
ORDER_SIZE = 99999999 # UPDATE_ME
# Orders keep being placed while the futures position size is greater than this.
TARGET_SIZE = 99999999 # UPDATE_ME
# Duration of the sleep that is awaited together with each pair of orders.
WAIT_SECONDS = 99999999 # UPDATE_ME
# Configure the root logger so the INFO messages emitted below are shown.
logging.basicConfig(level=logging.INFO)

# Both clients are built from the same endpoint and credentials, all read from
# the environment; a missing variable raises KeyError right here at start-up.
# The futures client is constructed first, then the perpetual client.
futures_http, perp_http = (
    product.HTTP(
        endpoint=environ["BYBIT_API_URL"],
        api_key=environ["BYBIT_API_KEY"],
        api_secret=environ["BYBIT_API_SECRET"],
    )
    for product in (inverse_futures, inverse_perpetual)
)
def bybit_result(response) -> dict:
    """Unwrap the ``result`` payload of a Bybit API response.

    Args:
        response: Decoded response mapping. ``ret_code`` and ``result`` are
            always read; ``ret_msg`` is only read when ``ret_code`` is
            non-zero.

    Returns:
        The value stored under ``result``, returned unchanged (whatever
        container type the endpoint produced).

    Raises:
        Exception: If ``ret_code`` is non-zero, or if ``result`` is ``None``
            or an empty container.
    """
    ret_code = response["ret_code"]
    if ret_code != 0:
        ret_msg = response["ret_msg"]
        raise Exception(f"bybit responded with {ret_code} {ret_msg}")

    raw_result = response["result"]
    # A null result has no len(); report it as an empty response instead of
    # letting an unrelated TypeError escape from the length check.
    if raw_result is None or len(raw_result) < 1:
        raise Exception("bybit gave us an empty response")
    return raw_result
def futures_position(positions):
    """Return the first entry of *positions* whose symbol is ``FUTURES_PAIR``.

    Args:
        positions: Iterable of entries, each exposing ``entry["data"]["symbol"]``.

    Returns:
        The first matching entry, unchanged.

    Raises:
        Exception: If no entry matches; *positions* is logged at INFO level
            before raising to help with diagnosis.
    """
    candidates = (
        entry for entry in positions if entry["data"]["symbol"] == FUTURES_PAIR
    )
    found = next(candidates, None)
    if found is None:
        logging.info(positions)
        raise Exception(f"Unable to get position for {FUTURES_PAIR}")
    return found
async def is_over_threshold(loop: BaseEventLoop):
    """Report whether the futures position is still larger than ``TARGET_SIZE``.

    Args:
        loop: Event loop whose default executor runs the position request.

    Returns:
        The result of comparing the position's ``size`` with ``TARGET_SIZE``
        using ``>``.

    Raises:
        Exception: Propagated from ``bybit_result`` / ``futures_position``
            when the response is an error, is empty, or has no entry for the
            configured futures pair.
    """
    # The request is a plain callable, so it is handed to the loop's default
    # executor and awaited rather than being called inline.
    raw = await loop.run_in_executor(None, futures_http.my_position)
    position = futures_position(bybit_result(raw))

    desired_size = TARGET_SIZE
    my_size = position["data"]["size"]
    logging.info(f"current pos {my_size}/{desired_size}")
    return my_size > desired_size
async def reduce_pos(loop: BaseEventLoop, http, pair: str, side: str):
    """Place one reduce-only market order of ``ORDER_SIZE`` and log its id.

    Args:
        loop: Event loop whose default executor runs the order request.
        http: Client object providing ``place_active_order``.
        pair: Symbol to place the order on.
        side: Order side to send.

    Raises:
        Exception: Propagated from ``bybit_result`` when the response is an
            error or is empty.
    """
    order_params = {
        "symbol": pair,
        "order_type": "Market",
        "side": side,
        "qty": ORDER_SIZE,
        "time_in_force": "GoodTillCancel",
        "reduce_only": True,
    }
    # run_in_executor only forwards positional arguments, so the keyword
    # arguments are bound up front with partial.
    submit = partial(http.place_active_order, **order_params)
    placed = bybit_result(await loop.run_in_executor(None, submit))

    order_id = placed["order_id"]
    logging.info(f"order: {side} {ORDER_SIZE} {pair} {order_id}")
async def main(loop: BaseEventLoop):
    """Reduce both legs in rounds until the futures position reaches the target.

    Each round places one order on the perpetual and one on the futures
    contract and, concurrently, sleeps for ``WAIT_SECONDS``; the next check of
    the futures position happens only after all three have finished.

    Args:
        loop: Event loop handed on to the helpers for their executor calls.

    Raises:
        Exception: The first error raised by the position check or by either
            order is propagated to the caller.
    """
    while await is_over_threshold(loop):
        # gather() schedules the awaitables on the running loop itself, so no
        # Task objects are instantiated by hand (asyncio discourages that).
        await gather(
            reduce_pos(loop, perp_http, PERP_PAIR, PERP_SIDE),
            reduce_pos(loop, futures_http, FUTURES_PAIR, FUTURES_SIDE),
            sleep(WAIT_SECONDS),
        )
if __name__ == "__main__":
    # Imported locally so this entry point does not depend on the
    # module-level import list being changed.
    from asyncio import new_event_loop, set_event_loop

    # Create the loop explicitly: calling get_event_loop() when no loop is
    # running is deprecated and no longer creates one on newer Pythons.
    loop = new_event_loop()
    set_event_loop(loop)
    try:
        loop.run_until_complete(main(loop))
    except KeyboardInterrupt:
        # A manual stop is not a failure; report it without a traceback.
        logging.info("interrupted by user, stopping")
    except Exception:
        # Top-level boundary: log the full traceback for any real error.
        # SystemExit is deliberately not caught here.
        logging.exception("something wicked happened, look at error for more info")
    finally:
        # Finalise any async generators before the loop is closed.
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment