Skip to content

Instantly share code, notes, and snippets.

@robcarver17
Created November 2, 2022 16:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save robcarver17/94e66e91ea49a3d986c1378c58425ba0 to your computer and use it in GitHub Desktop.
Save robcarver17/94e66e91ea49a3d986c1378c58425ba0 to your computer and use it in GitHub Desktop.
### Need the following:
### 60:40 rebalanced cash
### slow and medium speed futures
import matplotlib
# Select the Tk backend before pyplot is imported further down the file.
matplotlib.use("TkAgg")
# Default font size for figures; plot_heatmap() temporarily switches to 10.
matplotlib.rcParams.update({'font.size': 22})
# Copied into each system's config as the 'demean' element, which
# RawDataWithSpotAdjustment.get_daily_prices() reads to decide whether to
# subtract an assumed drift from daily prices.
DEMEAN = False
## FIRST CHUNK OF CODE USES PYSYSTEMTRADE, BUT IF YOU HAVE SOME DATA ALREADY YOU CAN SKIP AHEAD
from syscore.objects import missing_data
from syscore.genutils import progressBar
from systems.provided.futures_chapter15.basesystem import *
from systems.accounts.curves.account_curve import quant_ratio_lower_curve, quant_ratio_upper_curve
import pandas as pd
class RawDataWithSpotAdjustment(RawData):
    """Raw-data stage whose daily prices can have an assumed drift removed.

    When the owning system's config has a truthy ``demean`` element, a
    constant annual percentage return per instrument is converted to a daily
    price drift, accumulated, and subtracted from the daily price series.
    """

    def get_daily_prices(self, instrument_code) -> pd.Series:
        """Return daily prices, optionally adjusted for an assumed drift.

        :param instrument_code: instrument whose prices are required
        :returns: the unmodified daily prices when ``demean`` is missing from
            the config or is ``False``; otherwise the prices minus the
            cumulative assumed drift
        :raises KeyError: if demeaning is requested for an instrument with no
            assumed average return configured below
        """
        dailyprice = self.data_stage.daily_prices(instrument_code)

        # Read the flag from the system that owns this stage. Using a
        # module-level ``system`` variable would make every system consult
        # the same (possibly unrelated) config.
        demean = self.parent.config.get_element_or_missing_data('demean')
        if demean is missing_data or demean is False:
            return dailyprice

        underlying = self.daily_denominator_price(instrument_code)

        # Assumed annual average return per instrument, expressed as a
        # fraction of the denominator price.
        ann_perc_average_return_dict = dict(SP500_micro=0.02,
                                            US10=0.012)

        # 256 periods per year is the annualisation convention used here.
        daily_perc_average_return = ann_perc_average_return_dict[instrument_code] / 256
        drift_price_terms = daily_perc_average_return * underlying
        cum_drift_price = drift_price_terms.cumsum()
        price_adjusted_for_drift = dailyprice - cum_drift_price

        return price_adjusted_for_drift
def futures_system(
    data=arg_not_supplied,
    config=arg_not_supplied,
    trading_rules=arg_not_supplied,
    log_level="on",
):
    """Assemble a futures System that uses RawDataWithSpotAdjustment.

    :param data: simulation data; a ``csvFuturesSimData()`` is created when
        not supplied
    :param config: system configuration; the chapter 15 futures YAML config
        is loaded when not supplied
    :param trading_rules: passed straight through to ``Rules``
    :param log_level: passed to ``System.set_logging_level``
    :returns: the constructed System
    """
    sim_data = csvFuturesSimData() if data is arg_not_supplied else data

    if config is arg_not_supplied:
        system_config = Config("systems.provided.futures_chapter15.futuresconfig.yaml")
    else:
        system_config = config

    rules = Rules(trading_rules)

    # Same stage list and ordering as the stock system, except that the
    # raw-data stage is the drift-adjusting subclass defined in this file.
    stages = [
        Account(),
        Portfolios(),
        PositionSizing(),
        RawDataWithSpotAdjustment(),
        ForecastCombine(),
        ForecastScaleCap(),
        rules,
    ]

    built_system = System(stages, sim_data, system_config)
    built_system.set_logging_level(log_level)

    return built_system
# --- Long-only 60:40 benchmark -------------------------------------------
# This first system is only used for its raw-data stage (daily percentage
# returns of the two instruments).
system = futures_system()
system.config.instrument_weights = dict(SP500_micro = .5, US10 = .5)
setattr(system.config, "demean", DEMEAN)
sp500_returns = system.rawdata.get_daily_percentage_returns('SP500_micro')
us10_returns = system.rawdata.get_daily_percentage_returns('US10')
## both start sept 1982
## Effectively daily rebalancing
# 60 parts S&P 500 returns plus 40 parts US 10-year returns, summed day by day.
# NOTE(review): the 60/40 multipliers presumably also convert fractional
# returns into percent units, matching the .percent account curves below -- confirm.
long_only = sp500_returns*60 + us10_returns * 40
# --- Faster trend-following system (ewmac8_32 + ewmac16_64) ---------------
fast_system = futures_system()
setattr(fast_system.config, "demean", DEMEAN)
fast_system.config.percentage_vol_target=9.0
fast_system.config.instrument_weights = dict(SP500_micro = .5, US10 = .5)
fast_system.config.forecast_weights = dict(ewmac8_32 = .5, ewmac16_64 = .5)
fast_system.config.use_instrument_div_mult_estimates = True
fast_acc = fast_system.accounts.portfolio().percent
# --- Slower trend-following system (ewmac32_128 + ewmac64_256) ------------
# Identical configuration to the fast system apart from the forecast weights.
slow_system = futures_system()
setattr(slow_system.config, "demean", DEMEAN)
slow_system.config.percentage_vol_target=9.0
slow_system.config.instrument_weights = dict(SP500_micro = .5, US10 = .5)
slow_system.config.forecast_weights = dict(ewmac32_128 = .5, ewmac64_256 = .5)
slow_system.config.use_instrument_div_mult_estimates = True
slow_acc = slow_system.accounts.portfolio().percent
# The result of this call is not stored; it only displays when run interactively.
quant_ratio_upper_curve(long_only)
import pandas as pd
# One column per return stream. NOTE: the name `all` shadows the Python
# builtin, and later code in this file refers to this variable by that name.
all = pd.concat([long_only, fast_acc.as_ts, slow_acc.as_ts], axis=1)
all.columns = ['60:40', 'Fast TF', 'Slow TF']
# Correlation matrix of the three streams; the result is not stored.
all.corr().round(2)
## Remainder of the code is generic and doesn't require pysystemtrade
from random import uniform
def generate_set_of_subsamples(returns: pd.DataFrame,
                               monte_count: int = 10) -> list:
    """Draw ``monte_count`` independent bootstrap subsamples of ``returns``.

    :param returns: return streams, one column per asset
    :param monte_count: number of subsamples to generate
    :returns: list with one resampled DataFrame per Monte Carlo run
    """
    progress = progressBar(monte_count)
    subsamples = []

    for _run in range(monte_count):
        one_subsample = generate_subsample(returns)
        subsamples.append(one_subsample)
        # Advance the progress bar once per completed run.
        progress.iterate()

    return subsamples
def generate_subsample(returns: pd.DataFrame) -> pd.DataFrame:
    """Bootstrap one subsample of ``returns`` by drawing rows with replacement.

    Rows containing any NaN are dropped first. The result has as many rows as
    the cleaned input and reuses its index, so only the row contents are
    shuffled/duplicated, not the timestamps. Whole rows are drawn, so values
    on the same row stay together across columns.

    :param returns: return streams, one column per asset
    :returns: DataFrame of resampled rows with the same columns and index as
        ``returns.dropna()``; an empty frame if no complete rows exist
    """
    returns = returns.dropna()
    row_count = len(returns)

    # One uniform draw per output row, in the same order as a row-by-row
    # loop would make them, so a seeded `random` gives the same sample.
    sampled_positions = [int(uniform(0, row_count)) for __ in range(row_count)]

    # A single positional take replaces building one Series per row and
    # concatenating them; it also preserves column dtypes.
    subsample_df = returns.iloc[sampled_positions]
    subsample_df.index = returns.index

    return subsample_df
def get_cagr_distr_across_weights(list_of_subsampled_returns: list,
                                  all_possible_weights: list):
    """Compute the CAGR distribution for every candidate weight set.

    :param list_of_subsampled_returns: bootstrap subsamples of the returns
    :param all_possible_weights: list of ``{asset: weight}`` dicts
    :returns: list, in the same order as ``all_possible_weights``, where each
        entry is the list of CAGRs obtained across the subsamples
    """
    progress = progressBar(len(all_possible_weights))
    distributions = []

    for candidate_weights in all_possible_weights:
        cagr_distribution = measure_cagr_distribution_for_weights(
            list_of_subsampled_returns=list_of_subsampled_returns,
            weights=candidate_weights,
        )
        distributions.append(cagr_distribution)
        # Advance the progress bar once per weight set evaluated.
        progress.iterate()

    return distributions
import numpy as np
def generate_list_of_possible_weights(columns: list, step_count: int = 50):
    """Enumerate every weight combination for three assets on a regular grid.

    The first two weights each run from 0 to 1 in ``step_count`` equal steps
    (0.02 with the default of 50); the third weight is whatever remains so
    that the three sum to one. Combinations whose first two weights exceed
    one in total are left out.

    :param columns: exactly three asset names, used as the dict keys
    :param step_count: number of grid steps between 0 and 1
    :returns: list of ``{asset: weight}`` dicts, ordered by the first weight
        and then by the second
    :raises ValueError: if ``columns`` does not hold three names or
        ``step_count`` is less than 1
    """
    # Raise explicitly: an `assert` would be stripped under `python -O`.
    if len(columns) != 3:
        raise ValueError(
            "Expected exactly 3 columns, got %d" % len(columns)
        )
    if step_count < 1:
        raise ValueError("step_count must be at least 1")

    all_possible_weights = []
    # Walk the grid in whole steps and divide once at the end. Doing the
    # arithmetic on integers keeps the "sum <= 1" test exact and stops the
    # remainder from coming out as a tiny negative number or -0.0.
    for steps_1 in range(step_count + 1):
        for steps_2 in range(step_count + 1 - steps_1):
            steps_3 = step_count - steps_1 - steps_2
            weights = {columns[0]: steps_1 / step_count,
                       columns[1]: steps_2 / step_count,
                       columns[2]: steps_3 / step_count}
            all_possible_weights.append(weights)

    return all_possible_weights
def cagr(acc_curve: pd.Series, periods_per_year: int = 256):
    """Return the compound annual growth rate, in percent, of a return series.

    Each percentage return is turned into a growth factor ``1 + r/100``.
    Factors below zero are floored at zero, so a loss of more than 100% in a
    single period wipes the account out (CAGR of -100) rather than flipping
    the sign of the compounded product.

    :param acc_curve: per-period returns in percent
    :param periods_per_year: number of periods compounded into one year
    :returns: annualised compound growth rate in percent, or NaN when
        ``acc_curve`` is empty
    """
    period_count = len(acc_curve)
    if period_count == 0:
        # No periods means no defined growth rate (and avoids dividing by zero).
        return np.nan

    growth_factors = (1 + (acc_curve / 100)).clip(lower=0)
    # After flooring at zero the product cannot be negative, so no sign
    # check is needed before taking the root.
    final = growth_factors.product()

    per_period_root = final ** (1 / period_count)
    ann_root = per_period_root ** periods_per_year

    return 100 * (ann_root - 1)
def measure_cagr_distribution_for_weights(list_of_subsampled_returns: list,
                                          weights: dict):
    """Compute the CAGR of a weighted portfolio on every subsample.

    :param list_of_subsampled_returns: bootstrap subsamples, one DataFrame each
    :param weights: ``{asset: weight}`` applied to the matching columns
    :returns: list of CAGRs, one per subsample, in input order
    """
    # Broadcast the constant weights down the index of the first subsample
    # so they can be multiplied element-wise against each subsample frame.
    reference_index = list_of_subsampled_returns[0].index
    weights_df = pd.DataFrame(weights, index=reference_index)

    cagr_per_subsample = []
    for subsample_df in list_of_subsampled_returns:
        weighted_df = subsample_df * weights_df
        portfolio_return = weighted_df.sum(axis=1)
        cagr_per_subsample.append(cagr(portfolio_return))

    return cagr_per_subsample
# Run the Monte Carlo: 100 bootstrap subsamples of the three return streams.
list_of_subsampled_returns = generate_set_of_subsamples(all,
monte_count=100)
# Every candidate weight set across the three columns of `all`.
all_possible_weights = generate_list_of_possible_weights(list(all.columns))
# For each weight set, the list of CAGRs across all subsamples.
cagr_distr_by_weight = get_cagr_distr_across_weights(list_of_subsampled_returns, all_possible_weights)
def get_optimal_weights(cagr_distr_by_weight: list,
                        all_possible_weights: list,
                        quantile_point: float):
    """Return the weight set with the highest CAGR at a given quantile.

    :param cagr_distr_by_weight: CAGR distribution for each weight set
    :param all_possible_weights: weight sets, in the same order
    :param quantile_point: quantile of each distribution to compare (0 to 1)
    :returns: the ``{asset: weight}`` dict whose quantile CAGR is largest;
        the first such entry when several tie
    """
    cagr_at_quantile = get_cagr_at_quantile_points(cagr_distr_by_weight, quantile_point)
    best_cagr = max(cagr_at_quantile)
    best_position = cagr_at_quantile.index(best_cagr)

    return all_possible_weights[best_position]
def get_heatmap_df(cagr_distr_by_weight: list,
                   all_possible_weights: list,
                   quantile_point: float):
    """Arrange the quantile CAGR of every weight set on a 2-D grid.

    Columns are the weight of the first asset and rows are the weight of the
    third asset (taking the key order of the first weight dict). Cells for
    weight combinations that were never evaluated stay NaN.

    :param cagr_distr_by_weight: CAGR distribution for each weight set
    :param all_possible_weights: weight sets, in the same order
    :param quantile_point: quantile of each distribution to show (0 to 1)
    :returns: square DataFrame indexed by the sorted distinct weights of the
        first asset on both axes
    """
    list_of_cagr = get_cagr_at_quantile_points(cagr_distr_by_weight, quantile_point)

    # Grid axis values are the distinct first-asset weights.
    weight_points = sorted(
        {next(iter(weight.values())) for weight in all_possible_weights}
    )
    the_grid = pd.DataFrame(np.nan, index=weight_points, columns=weight_points)

    asset_names = list(all_possible_weights[0].keys())
    asset_1 = asset_names[0]
    asset_2 = asset_names[2]

    for idx, weight in enumerate(all_possible_weights):
        # Write with a single .loc[row, column] call. Chained indexing
        # (grid[col][row] = value) assigns into a temporary object and does
        # not update the grid when pandas Copy-on-Write is active.
        the_grid.loc[weight[asset_2], weight[asset_1]] = list_of_cagr[idx]

    return the_grid
def get_cagr_at_quantile_points(cagr_distr_by_weight: list,
                                quantile_point: float):
    """Reduce each CAGR distribution to its value at one quantile.

    :param cagr_distr_by_weight: list of CAGR distributions (one per weight set)
    :param quantile_point: quantile to evaluate, between 0 and 1
    :returns: list with one quantile value per distribution, in input order
    """
    quantile_values = []
    for one_distribution in cagr_distr_by_weight:
        quantile_values.append(np.quantile(one_distribution, quantile_point))

    return quantile_values
import matplotlib.pyplot as plt
def plot_heatmap(df, all_possible_weights):
    """Show ``df`` as a colour-mapped grid with a colour bar.

    The x axis is labelled with the first asset name and the y axis with the
    third asset name, taken from the key order of the first weight dict.
    Tick labels are the column and index values of ``df``.

    :param df: grid of values to plot (e.g. the output of get_heatmap_df)
    :param all_possible_weights: weight sets; only the keys of the first
        entry are used, for the axis labels
    """
    asset_names = list(all_possible_weights[0].keys())
    asset_1 = asset_names[0]
    asset_2 = asset_names[2]

    # Use a smaller font for this figure only. rc_context puts the previous
    # settings back on exit, even if plotting raises, instead of assuming
    # what the previous font size was.
    with matplotlib.rc_context({'font.size': 10}):
        plt.pcolor(df)
        # Offset ticks by 0.5 so each label sits at the centre of its cell.
        plt.yticks(np.arange(0.5, len(df.index), 1), df.index)
        plt.xticks(np.arange(0.5, len(df.columns), 1), df.columns)
        plt.xlabel(asset_1)
        plt.ylabel(asset_2)
        plt.colorbar()
        plt.show()
# Heatmap of the median (0.5 quantile) CAGR for every weight combination.
the_grid = get_heatmap_df(cagr_distr_by_weight=cagr_distr_by_weight,
all_possible_weights=all_possible_weights,
quantile_point=.5)
plot_heatmap(the_grid, all_possible_weights)
from copy import copy
def make_whitespace_plot(cagr_distr_by_weight: list,
                         all_possible_weights: list):
    """Build the median-CAGR grid with the best-performing region blanked out.

    The weight set with the highest median CAGR is located, and its CAGR at
    the 30% quantile is used as a threshold. Every cell of the median grid
    whose value is above that threshold is set to NaN, leaving "whitespace"
    when plotted.

    :param cagr_distr_by_weight: CAGR distribution for each weight set
    :param all_possible_weights: weight sets, in the same order
    :returns: DataFrame laid out like get_heatmap_df's output, with cells
        above the threshold replaced by NaN
    """
    cagr_30 = get_heatmap_df(cagr_distr_by_weight=cagr_distr_by_weight,
                             all_possible_weights=all_possible_weights,
                             quantile_point=.3)
    cagr_50 = get_heatmap_df(cagr_distr_by_weight=cagr_distr_by_weight,
                             all_possible_weights=all_possible_weights,
                             quantile_point=.5)

    optimal_weights = get_optimal_weights(cagr_distr_by_weight, all_possible_weights, .5)

    # Take the grid axes from the weight dict keys (first asset = columns,
    # third asset = rows), the same way get_heatmap_df lays the grid out,
    # rather than hard-coding particular asset names.
    asset_names = list(all_possible_weights[0].keys())
    column_asset = asset_names[0]
    row_asset = asset_names[2]

    max_30 = cagr_30.loc[optimal_weights[row_asset], optimal_weights[column_asset]]

    # mask() returns a new frame with NaN wherever the condition holds.
    whitespace = cagr_50.mask(cagr_50 > max_30)

    return whitespace
# Plot the median-CAGR heatmap with cells above the make_whitespace_plot threshold blanked out.
plot_heatmap(make_whitespace_plot(cagr_distr_by_weight, all_possible_weights), all_possible_weights)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment