Skip to content

Instantly share code, notes, and snippets.

@fengtality
Created April 17, 2024 23:38
Show Gist options
  • Save fengtality/99be1f83fccad99fb98a4b95c17bf876 to your computer and use it in GitHub Desktop.
Save fengtality/99be1f83fccad99fb98a4b95c17bf876 to your computer and use it in GitHub Desktop.
Botcamp Challenge Solution: Customizing a Controller
import time
from decimal import Decimal
import pandas_ta as ta # noqa: F401
from hummingbot.core.data_type.common import TradeType
from hummingbot.smart_components.executors.position_executor.data_types import PositionConfig, TrailingStop
from hummingbot.smart_components.executors.position_executor.position_executor import PositionExecutor
from hummingbot.smart_components.strategy_frameworks.data_types import OrderLevel
from hummingbot.smart_components.strategy_frameworks.market_making.market_making_controller_base import (
MarketMakingControllerBase,
MarketMakingControllerConfigBase,
)
class DManV2Config(MarketMakingControllerConfigBase):
strategy_name: str = "dman_v2_std"
macd_fast: int = 12
macd_slow: int = 26
macd_signal: int = 9
natr_length: int = 14
class DManV2(MarketMakingControllerBase):
"""
Directional Market Making Strategy making use of NATR indicator to make spreads dynamic and shift the mid price.
"""
def __init__(self, config: DManV2Config):
super().__init__(config)
self.config = config
def refresh_order_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool:
"""
Checks if the order needs to be refreshed.
You can reimplement this method to add more conditions.
"""
if executor.position_config.timestamp + order_level.order_refresh_time > time.time():
return False
return True
def early_stop_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool:
"""
If an executor has an active position, should we close it based on a condition.
"""
return False
def cooldown_condition(self, executor: PositionExecutor, order_level: OrderLevel) -> bool:
"""
After finishing an order, the executor will be in cooldown for a certain amount of time.
This prevents the executor from creating a new order immediately after finishing one and execute a lot
of orders in a short period of time from the same side.
"""
if executor.close_timestamp and executor.close_timestamp + order_level.cooldown_time > time.time():
return True
return False
def get_processed_data(self):
"""
Gets the price and spread multiplier from the last candlestick.
"""
candles_df = self.candles[0].candles_df
natr = ta.natr(candles_df["high"], candles_df["low"], candles_df["close"], length=self.config.natr_length) / 100
macd_output = ta.macd(candles_df["close"], fast=self.config.macd_fast, slow=self.config.macd_slow, signal=self.config.macd_signal)
macd = macd_output[f"MACD_{self.config.macd_fast}_{self.config.macd_slow}_{self.config.macd_signal}"]
macdh = macd_output[f"MACDh_{self.config.macd_fast}_{self.config.macd_slow}_{self.config.macd_signal}"]
macd_signal = - (macd - macd.mean()) / macd.std()
macdh_signal = macdh.apply(lambda x: 1 if x > 0 else -1)
max_price_shift = natr / 2
price_multiplier = (0.5 * macd_signal + 0.5 * macdh_signal) * max_price_shift
# candles_df["spread_multiplier"] = natr
candles_length = self.config.natr_length
std = candles_df["close"].rolling().std(candles_length)
mean = candles_df["close"].rolling().mean(candles_length)
std_normalized = std / mean
candles_df["spread_multiplier"] = std_normalized
candles_df["price_multiplier"] = price_multiplier
return candles_df
def get_position_config(self, order_level: OrderLevel) -> PositionConfig:
"""
Creates a PositionConfig object from an OrderLevel object.
Here you can use technical indicators to determine the parameters of the position config.
"""
close_price = self.get_close_price(self.config.exchange, self.config.trading_pair)
amount = order_level.order_amount_usd / close_price
price_multiplier, spread_multiplier = self.get_price_and_spread_multiplier()
price_adjusted = close_price * (1 + price_multiplier)
side_multiplier = -1 if order_level.side == TradeType.BUY else 1
order_price = price_adjusted * (1 + order_level.spread_factor * spread_multiplier * side_multiplier)
if order_level.triple_barrier_conf.trailing_stop_trailing_delta and order_level.triple_barrier_conf.trailing_stop_trailing_delta:
trailing_stop = TrailingStop(
activation_price_delta=order_level.triple_barrier_conf.trailing_stop_activation_price_delta,
trailing_delta=order_level.triple_barrier_conf.trailing_stop_trailing_delta,
)
else:
trailing_stop = None
position_config = PositionConfig(
timestamp=time.time(),
trading_pair=self.config.trading_pair,
exchange=self.config.exchange,
side=order_level.side,
amount=amount,
take_profit=order_level.triple_barrier_conf.take_profit,
stop_loss=order_level.triple_barrier_conf.stop_loss,
time_limit=order_level.triple_barrier_conf.time_limit,
entry_price=Decimal(order_price),
open_order_type=order_level.triple_barrier_conf.open_order_type,
take_profit_order_type=order_level.triple_barrier_conf.take_profit_order_type,
trailing_stop=trailing_stop,
leverage=self.config.leverage
)
return position_config
from decimal import Decimal
from typing import Dict
from hummingbot.connector.connector_base import ConnectorBase, TradeType
from hummingbot.core.data_type.common import OrderType, PositionAction, PositionSide
from hummingbot.data_feed.candles_feed.candles_factory import CandlesConfig
from hummingbot.smart_components.controllers.dman_v2_std import DManV2, DManV2Config
from hummingbot.smart_components.strategy_frameworks.data_types import (
ExecutorHandlerStatus,
OrderLevel,
TripleBarrierConf,
)
from hummingbot.smart_components.strategy_frameworks.market_making.market_making_executor_handler import (
MarketMakingExecutorHandler,
)
from hummingbot.strategy.script_strategy_base import ScriptStrategyBase
class MarketMakingDmanV2(ScriptStrategyBase):
trading_pair = "HBAR-USDT"
triple_barrier_conf = TripleBarrierConf(
stop_loss=Decimal("0.03"), take_profit=Decimal("0.02"),
time_limit=60 * 60 * 24,
trailing_stop_activation_price_delta=Decimal("0.002"),
trailing_stop_trailing_delta=Decimal("0.0005")
)
config_v2 = DManV2Config(
exchange="binance_perpetual",
trading_pair=trading_pair,
order_levels=[
OrderLevel(level=0, side=TradeType.BUY, order_amount_usd=Decimal("15"),
spread_factor=Decimal(0.5), order_refresh_time=60 * 5,
cooldown_time=15, triple_barrier_conf=triple_barrier_conf),
OrderLevel(level=1, side=TradeType.BUY, order_amount_usd=Decimal("30"),
spread_factor=Decimal(1.0), order_refresh_time=60 * 5,
cooldown_time=15, triple_barrier_conf=triple_barrier_conf),
OrderLevel(level=2, side=TradeType.BUY, order_amount_usd=Decimal("50"),
spread_factor=Decimal(2.0), order_refresh_time=60 * 5,
cooldown_time=15, triple_barrier_conf=triple_barrier_conf),
OrderLevel(level=0, side=TradeType.SELL, order_amount_usd=Decimal("15"),
spread_factor=Decimal(0.5), order_refresh_time=60 * 5,
cooldown_time=15, triple_barrier_conf=triple_barrier_conf),
OrderLevel(level=1, side=TradeType.SELL, order_amount_usd=Decimal("30"),
spread_factor=Decimal(1.0), order_refresh_time=60 * 5,
cooldown_time=15, triple_barrier_conf=triple_barrier_conf),
OrderLevel(level=2, side=TradeType.SELL, order_amount_usd=Decimal("50"),
spread_factor=Decimal(2.0), order_refresh_time=60 * 5,
cooldown_time=15, triple_barrier_conf=triple_barrier_conf),
],
candles_config=[
CandlesConfig(connector="binance_perpetual", trading_pair=trading_pair, interval="3m", max_records=1000),
],
leverage=10,
natr_length=21, macd_fast=12, macd_slow=26, macd_signal=9
)
dman_v2 = DManV2(config=config_v2)
empty_markets = {}
markets = dman_v2.update_strategy_markets_dict(empty_markets)
def __init__(self, connectors: Dict[str, ConnectorBase]):
super().__init__(connectors)
self.dman_v2_executor = MarketMakingExecutorHandler(strategy=self, controller=self.dman_v2)
def on_stop(self):
self.close_open_positions()
def on_tick(self):
"""
This shows you how you can start meta controllers. You can run more than one at the same time and based on the
market conditions, you can orchestrate from this script when to stop or start them.
"""
if self.dman_v2_executor.status == ExecutorHandlerStatus.NOT_STARTED:
self.dman_v2_executor.start()
def format_status(self) -> str:
if not self.ready_to_trade:
return "Market connectors are not ready."
lines = []
lines.extend(["DMAN V2", self.dman_v2_executor.to_format_status()])
lines.extend(["\n-----------------------------------------\n"])
# Add controller name and candles feed showing spread multiplier
candles_df = self.dman_v2.get_processed_data()
lines.extend(["Controller: " + self.config_v2.strategy_name])
lines.extend(["Candles Data Frame:"] + [line for line in candles_df.tail().to_string(index=False).split("\n")])
return "\n".join(lines)
def close_open_positions(self):
# we are going to close all the open positions when the bot stops
for connector_name, connector in self.connectors.items():
for trading_pair, position in connector.account_positions.items():
if trading_pair in self.markets[connector_name]:
if position.position_side == PositionSide.LONG:
self.sell(connector_name=connector_name,
trading_pair=position.trading_pair,
amount=abs(position.amount),
order_type=OrderType.MARKET,
price=connector.get_mid_price(position.trading_pair),
position_action=PositionAction.CLOSE)
elif position.position_side == PositionSide.SHORT:
self.buy(connector_name=connector_name,
trading_pair=position.trading_pair,
amount=abs(position.amount),
order_type=OrderType.MARKET,
price=connector.get_mid_price(position.trading_pair),
position_action=PositionAction.CLOSE)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment