Skip to content

Instantly share code, notes, and snippets.

@cryptochassis
Last active May 14, 2023 08:19
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save cryptochassis/198b4d34837ec88676febd146140cdd3 to your computer and use it in GitHub Desktop.
Save cryptochassis/198b4d34837ec88676febd146140cdd3 to your computer and use it in GitHub Desktop.
Optimize a Trading Strategy: Genetic Search
import argparse
import copy
import csv
import json
import os
import random
import statistics
import subprocess
from datetime import datetime, timedelta
from multiprocessing.pool import ThreadPool
# Length, in days, of each backtest window: END_DATE = START_DATE + this.
TIME_DELTA_DAYS = 1
# Size of the thread pool used to run backtest subprocesses concurrently.
NUM_WORKER_PROCESSES = 3
# Number of backtest windows evaluated per individual; window i starts
# i days after the START_DATE environment variable.
TOTAL_NUMBER_OF_SUBSAMPLES = 7
class GeneticSearch:
    """Genetic search over the parameters of an external backtest executable.

    Each individual is a dict with one entry per key of ``parameterRange``
    and, once evaluated, a ``"fitness"`` entry. Fitness is obtained by running
    ``executable`` once per subsample window with the parameters passed via
    environment variables, then averaging the final account balance read from
    the ``account_balance__coinbase__btc-usd__<date>.csv`` files that are
    expected to appear in the current working directory.
    """

    # Parameters that must stay integral when sampled, crossed or mutated.
    _INTEGER_PARAMETERS = frozenset({"orderRefreshInterval"})

    def __init__(self, executable):
        """Store the backtest executable path and the search settings.

        Args:
            executable: Path of the program launched for every backtest.
        """
        self.executable = executable
        self.population = []
        self.parents = []
        self.children = []
        self.numIndividuals = 20
        self.numGenerations = 20
        self.numParentsToSelect = 5
        self.numChildrenToProduce = 5
        # One individual is removed per child added, so the population size
        # is the same at the end of every generation.
        self.numIndividualToDie = self.numChildrenToProduce
        # Fittest individual found; assigned at the end of evolve().
        self.best = None
        self.parameterRange = {
            "spreadMinimum": {
                "min": 0.0001,
                "max": 0.005,
            },
            "spreadMaximum": {
                "min": 0.005,
                "max": 0.03,
            },
            "orderQuantity": {
                "min": 0.05,
                "max": 0.5,
            },
            "orderRefreshInterval": {
                "min": 5,
                "max": 30,
            },
        }
        self.mutationProbability = 0.02

    def _sampleGene(self, name, low, high):
        """Draw a value for parameter ``name`` from the closed range [low, high]."""
        if name in self._INTEGER_PARAMETERS:
            return random.randint(low, high)
        return random.uniform(low, high)

    def _sortByFitness(self):
        """Sort the population in place, fittest (highest fitness) first."""
        self.population.sort(key=lambda x: x["fitness"], reverse=True)

    def initialize(self):
        """Append ``numIndividuals`` random, unevaluated individuals to the population."""
        print("**Begin initialize**")
        for _ in range(self.numIndividuals):
            individual = {
                name: self._sampleGene(name, bounds["min"], bounds["max"])
                for name, bounds in self.parameterRange.items()
            }
            self.population.append(individual)
        print(f"Initial population is {json.dumps(self.population, indent=2)}")
        print("**Finish initialize**")

    def selectParents(self):
        """Take the first ``numParentsToSelect`` individuals as parents.

        This picks the fittest individuals only when the population is
        already sorted by fitness, which evolve() guarantees.
        """
        self.parents = self.population[0 : self.numParentsToSelect]

    def crossover(self):
        """Append ``numChildrenToProduce`` children bred from random parent pairs.

        Every gene of a child is drawn between the two parents' values for
        that gene. Children carry no ``"fitness"`` until evaluate() runs.

        Raises:
            ValueError: If fewer than two parents are selected.
        """
        for _ in range(self.numChildrenToProduce):
            first, second = random.sample(self.parents, 2)
            child = {}
            for name in self.parameterRange:
                low, high = sorted((first[name], second[name]))
                child[name] = self._sampleGene(name, low, high)
            self.children.append(child)

    def mutate(self):
        """Perturb each child gene with probability ``mutationProbability``.

        The gene is normalized to [0, 1] over its allowed range, shifted by
        Gaussian noise with standard deviation 1/6, mapped back, and clamped
        to the allowed range. Integer parameters are rounded afterwards.
        """
        for individual in self.children:
            for name, bounds in self.parameterRange.items():
                if random.uniform(0, 1) > self.mutationProbability:
                    continue
                span = bounds["max"] - bounds["min"]
                normalized = (individual[name] - bounds["min"]) / span
                mutated = (
                    normalized + random.normalvariate(0, 1 / 6)
                ) * span + bounds["min"]
                mutated = max(bounds["min"], min(mutated, bounds["max"]))
                if name in self._INTEGER_PARAMETERS:
                    mutated = int(round(mutated))
                individual[name] = mutated

    def eliminate(self):
        """Drop the ``numIndividualToDie`` least fit individuals.

        Leaves the population sorted fittest-first and clears the parent and
        child lists ready for the next generation.
        """
        self._sortByFitness()
        self.population = self.population[
            : len(self.population) - self.numIndividualToDie
        ]
        self.parents.clear()
        self.children.clear()

    def _runBacktest(self, env):
        """Run the backtest executable once with ``env``, discarding its output."""
        subprocess.run(
            [self.executable],
            env=env,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )

    @staticmethod
    def _readFinalBalance(path):
        """Return the account value implied by the last data row of a balance CSV.

        The first line is skipped as a header. Columns 3 and 4 of the last
        row are averaged into a mid price (presumably best bid and ask --
        confirm against the executable's output format); the result is
        ``column1 * midPrice + column2``.

        Raises:
            ValueError: If the file has no data rows. Without this check an
                empty file would silently reuse a previous subsample's row.
        """
        lastRow = None
        # newline="" is what the csv module documents for files given to it.
        with open(path, newline="") as f:
            reader = csv.reader(f)
            next(reader, None)
            for row in reader:
                if row:
                    lastRow = row
        if lastRow is None:
            raise ValueError(f"No balance rows found in {path}")
        midPrice = (float(lastRow[3]) + float(lastRow[4])) / 2
        return float(lastRow[1]) * midPrice + float(lastRow[2])

    def _evaluateIndividual(self, individual):
        """Backtest one individual over every subsample and return its mean balance."""
        # os.environ.copy() returns a plain dict, so the assignments below
        # cannot leak into this process's real environment.
        baseEnv = os.environ.copy()
        baseEnv["SPREAD_PROPORTION_MINIMUM"] = str(individual["spreadMinimum"])
        baseEnv["SPREAD_PROPORTION_MAXIMUM"] = str(individual["spreadMaximum"])
        baseEnv["ORDER_QUANTITY_PROPORTION"] = str(individual["orderQuantity"])
        baseEnv["ORDER_REFRESH_INTERVAL_SECONDS"] = str(
            individual["orderRefreshInterval"]
        )
        firstStartDate = datetime.fromisoformat(os.environ["START_DATE"])
        windows = []
        for i in range(TOTAL_NUMBER_OF_SUBSAMPLES):
            startDate = firstStartDate + timedelta(days=i)
            endDate = startDate + timedelta(days=TIME_DELTA_DAYS)
            windows.append((startDate, endDate))
        envs = [
            {
                **baseEnv,
                "START_DATE": startDate.isoformat()[:10],
                "END_DATE": endDate.isoformat()[:10],
            }
            for startDate, endDate in windows
        ]
        # map() blocks until every backtest has finished and re-raises any
        # exception from a worker (e.g. the executable could not be started).
        with ThreadPool(NUM_WORKER_PROCESSES) as threadPool:
            threadPool.map(self._runBacktest, envs)
        balances = [
            self._readFinalBalance(
                f"account_balance__coinbase__btc-usd__{(endDate - timedelta(days=1)).isoformat()[:10]}.csv"
            )
            for _, endDate in windows
        ]
        return statistics.mean(balances)

    def evaluate(self):
        """Merge the children into the population and score unevaluated individuals.

        Individuals that already have a ``"fitness"`` entry are not re-run.

        Raises:
            KeyError: If an individual needs scoring and the ``START_DATE``
                environment variable is not set.
            OSError: If the executable cannot be launched or a balance file
                cannot be opened.
            ValueError: If a balance file contains no data rows.
        """
        self.population.extend(self.children)
        for individual in self.population:
            if "fitness" not in individual:
                individual["fitness"] = self._evaluateIndividual(individual)

    def evolve(self):
        """Run the whole search and report the fittest individual.

        The initial population is scored and ranked before the first
        generation so that generation 0 also breeds from the fittest
        individuals. This costs no extra backtests, because those individuals
        would otherwise be scored during generation 0 anyway.
        """
        self.initialize()
        self.evaluate()
        self._sortByFitness()
        for i in range(self.numGenerations):
            print(f"**Begin generation {i}**")
            self.selectParents()
            self.crossover()
            self.mutate()
            self.evaluate()
            self.eliminate()
            print(f"**Finish generation {i}**")
        self.best = self.population[0]
        print(f"Fittest individual is {json.dumps(self.population[0])}")
        print(f"Fittest score is {self.population[0]['fitness']}")
# Command-line entry point: read the path of the backtest executable from
# --executable and run the full genetic search with it.
argumentParser = argparse.ArgumentParser()
argumentParser.add_argument(
    "--executable",
    type=str,
    required=True,
    help="The fullpath to the executable.",
)
args = argumentParser.parse_args()
executable = args.executable
geneticSearch = GeneticSearch(executable)
geneticSearch.evolve()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment