Skip to content

Instantly share code, notes, and snippets.

@jhw
Last active January 25, 2024 13:57
Show Gist options
  • Save jhw/be211f8ed4adbe4d6636ce8faafc9163 to your computer and use it in GitHub Desktop.
Save jhw/be211f8ed4adbe4d6636ce8faafc9163 to your computer and use it in GitHub Desktop.
Step functions Monte Carlo option simulator
env
*.pyc
__pycache__
tmp

analytic option pricing

(env) jhw@Justins-Air be211f8ed4adbe4d6636ce8faafc9163 % python black_scholes_analytic.py 
4.759422392871535 0.8085993729000975

monte carlo option pricing (increasing the number of simulations)

(env) jhw@Justins-Air be211f8ed4adbe4d6636ce8faafc9163 % python black_scholes_monte_carlo.py 10
5.46352976838472 1.0156451864260814
(env) jhw@Justins-Air be211f8ed4adbe4d6636ce8faafc9163 % python black_scholes_monte_carlo.py 10 
4.682102006920414 2.231094820389125
(env) jhw@Justins-Air be211f8ed4adbe4d6636ce8faafc9163 % python black_scholes_monte_carlo.py 100
4.285415639443292 0.858663860824674
(env) jhw@Justins-Air be211f8ed4adbe4d6636ce8faafc9163 % python black_scholes_monte_carlo.py 100
4.8960762635969 0.9106197289574887
(env) jhw@Justins-Air be211f8ed4adbe4d6636ce8faafc9163 % python black_scholes_monte_carlo.py 1000 
4.773801186917847 0.9430312893771983
(env) jhw@Justins-Air be211f8ed4adbe4d6636ce8faafc9163 % python black_scholes_monte_carlo.py 1000
4.907947718875114 0.9387008260102214
(env) jhw@Justins-Air be211f8ed4adbe4d6636ce8faafc9163 % python black_scholes_monte_carlo.py 10000
4.805789198911004 0.7669655701118467
(env) jhw@Justins-Air be211f8ed4adbe4d6636ce8faafc9163 % python black_scholes_monte_carlo.py 10000 
4.71522413299339 0.7892732144136169
(env) jhw@Justins-Air be211f8ed4adbe4d6636ce8faafc9163 % python black_scholes_monte_carlo.py 100000
4.779131147725099 0.8096773125803727
(env) jhw@Justins-Air be211f8ed4adbe4d6636ce8faafc9163 % python black_scholes_monte_carlo.py 100000
4.765376482926105 0.797924189302356
(env) jhw@Justins-Air be211f8ed4adbe4d6636ce8faafc9163 % python black_scholes_monte_carlo.py 1000000
4.766227205144107 0.8086772938808223
(env) jhw@Justins-Air be211f8ed4adbe4d6636ce8faafc9163 % python black_scholes_monte_carlo.py 1000000
4.7603314714354585 0.8073282430865037

option pricing with step functions

(env) jhw@Justins-Air be211f8ed4adbe4d6636ce8faafc9163 % python ping_machine.py 500                                   
{'executionArn': 'arn:aws:states:eu-west-1:119552584133:execution:MonteCarloStepFunction-GMfMdN7guIcU:581efb89-e9be-4a3a-a5d1-4372cbb3bda3', 'startDate': datetime.datetime(2024, 1, 25, 13, 52, 54, 558000, tzinfo=tzlocal()), 'ResponseMetadata': {'RequestId': 'ea81e2c4-c4d6-414b-9443-a6cdf9677a89', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'ea81e2c4-c4d6-414b-9443-a6cdf9677a89', 'date': 'Thu, 25 Jan 2024 13:52:54 GMT', 'content-type': 'application/x-amz-json-1.0', 'content-length': '168', 'connection': 'keep-alive'}, 'RetryAttempts': 0}}
(env) jhw@Justins-Air be211f8ed4adbe4d6636ce8faafc9163 % python list_bucket.py
results-2024-01-25-13-53-07-521656.json
(env) jhw@Justins-Air be211f8ed4adbe4d6636ce8faafc9163 % python dump_object.py results-2024-01-25-13-53-07-521656.json
{
  "call": {
    "mean": 4.7492818179801235,
    "var": 0.025404159331333914,
    "n": 500
  },
  "put": {
    "mean": 0.8050890208253552,
    "var": 0.0033100728732492497,
    "n": 500
  }
}
AppName=monte-carlo-step-functions-demo
import math
def norm_cdf(x):
    """Return the standard normal cumulative distribution function at *x*.

    Uses the complementary error function, ``0.5 * erfc(-x / sqrt(2))``.
    This is mathematically identical to ``(1 + erf(x / sqrt(2))) / 2`` but
    does not cancel to exactly 0.0 for large negative *x*.
    """
    return 0.5 * math.erfc(-x / math.sqrt(2))
def black_scholes(spot, strike, time, sigma, rate, option_type):
    """Price a European option with the Black-Scholes closed-form formula.

    Args:
        spot: current price of the underlying.
        strike: strike price of the option.
        time: time to expiration (same time unit as ``rate`` and ``sigma``).
        sigma: volatility of the underlying.
        rate: continuously compounded interest rate.
        option_type: ``'call'`` or ``'put'``.

    Returns:
        The option price.

    Raises:
        ValueError: if ``option_type`` is neither ``'call'`` nor ``'put'``.
    """
    # Validate before doing any math so a bad option type is always reported
    # as such, whatever the numeric inputs are.
    if option_type not in ('call', 'put'):
        raise ValueError("Invalid option type. Use 'call' or 'put'.")
    discounted_strike = strike * math.exp(-rate * time)
    total_vol = sigma * math.sqrt(time)
    if total_vol == 0:
        # At expiry (time == 0) or with zero volatility d1/d2 would divide by
        # zero; the formula's limit is the discounted intrinsic value.
        if option_type == 'call':
            return max(spot - discounted_strike, 0.0)
        return max(discounted_strike - spot, 0.0)
    d1 = (math.log(spot / strike) + (rate + 0.5 * sigma ** 2) * time) / total_vol
    d2 = d1 - total_vol
    if option_type == 'call':
        return spot * norm_cdf(d1) - discounted_strike * norm_cdf(d2)
    return discounted_strike * norm_cdf(-d2) - spot * norm_cdf(-d1)
if __name__ == "__main__":
    # Example contract: spot 42, strike 40, six months to expiry,
    # 20% volatility, 10% interest rate.
    contract = {
        "spot": 42,
        "strike": 40,
        "time": 0.5,
        "sigma": 0.2,
        "rate": 0.1,
    }
    prices = [black_scholes(option_type=kind, **contract)
              for kind in ('call', 'put')]
    # Prints "<call price> <put price>" on one line.
    print(*prices)
import math, random
def monte_carlo(spot, strike, time, sigma, rate, option_type, simulations):
    """Estimate a European option price by Monte Carlo simulation.

    Each draw computes a terminal stock price as
    ``spot * exp((rate - sigma**2 / 2) * time + sigma * sqrt(time) * Z)``
    with ``Z`` taken from ``random.gauss(0, 1)``. The discounted average
    payoff over all draws is returned.

    Args:
        spot: current price of the underlying.
        strike: strike price of the option.
        time: time to expiration.
        sigma: volatility of the underlying.
        rate: continuously compounded interest rate.
        option_type: ``'call'`` or ``'put'``.
        simulations: number of random draws; must be at least 1.

    Returns:
        The estimated option price.

    Raises:
        ValueError: if ``option_type`` is invalid or ``simulations`` < 1.
    """
    # Validate once up front: with zero draws the loop never runs, so a bad
    # option type would otherwise go unnoticed and the final division fails.
    if option_type not in ('call', 'put'):
        raise ValueError("Invalid option type. Use 'call' or 'put'.")
    if simulations < 1:
        raise ValueError("simulations must be at least 1")
    is_call = option_type == 'call'
    # Loop-invariant parts of the exponent, hoisted out of the loop.
    drift = (rate - 0.5 * sigma**2) * time
    diffusion = sigma * math.sqrt(time)
    total = 0
    for _ in range(simulations):
        stock_price_at_expiration = spot * math.exp(drift + diffusion * random.gauss(0, 1))
        if is_call:
            total += max(stock_price_at_expiration - strike, 0)
        else:
            total += max(strike - stock_price_at_expiration, 0)
    return math.exp(-rate * time) * total / simulations
if __name__ == "__main__":
    # Usage: python black_scholes_monte_carlo.py <n simulations>
    try:
        import sys, re
        if len(sys.argv) < 2:
            raise RuntimeError("please enter n(simulations)")
        simulations = sys.argv[1]
        # Anchor both ends: without "$" an argument such as "10abc" passed
        # the check and then crashed in int() with an unhandled ValueError.
        if not re.search("^\\d+$", simulations):
            raise RuntimeError("n(simulations) is invalid")
        simulations = int(simulations)
        # The estimator divides by the number of simulations.
        if simulations < 1:
            raise RuntimeError("n(simulations) must be at least 1")
        spot_price = 42
        strike_price = 40
        time_to_expiration = 0.5  # 6 months
        volatility = 0.2  # 20%
        interest_rate = 0.1  # 10%
        call_price_mc = monte_carlo(spot_price, strike_price, time_to_expiration, volatility, interest_rate, 'call', simulations)
        put_price_mc = monte_carlo(spot_price, strike_price, time_to_expiration, volatility, interest_rate, 'put', simulations)
        print(call_price_mc, put_price_mc)
    except RuntimeError as error:
        print("Error:%s" % str(error))
import boto3
def fetch_outputs(cf, stackname):
    """Collect the CloudFormation outputs of stacks named like *stackname*.

    Args:
        cf: a CloudFormation client exposing ``describe_stacks``.
        stackname: prefix; every stack whose name starts with it is included.

    Returns:
        dict mapping ``OutputKey`` to ``OutputValue``. If several matching
        stacks define the same key, the last one seen wins.
    """
    outputs = {}
    kwargs = {}
    while True:
        resp = cf.describe_stacks(**kwargs)
        for stack in resp["Stacks"]:
            if not stack["StackName"].startswith(stackname):
                continue
            # Not every stack description carries an "Outputs" key.
            for output in stack.get("Outputs", []):
                outputs[output["OutputKey"]] = output["OutputValue"]
        # describe_stacks is paginated; keep following NextToken so stacks
        # beyond the first page are not silently ignored.
        token = resp.get("NextToken")
        if not token:
            break
        kwargs["NextToken"] = token
    return outputs
if __name__ == "__main__":
    # Empty the stack's S3 bucket.
    # app.props holds KEY=VALUE lines; split on the first "=" only so values
    # may themselves contain "=".
    with open("app.props") as propsfile:
        props = dict(row.split("=", 1)
                     for row in propsfile.read().split("\n")
                     if row != '')
    cf = boto3.client("cloudformation")
    outputs = fetch_outputs(cf, props["AppName"])
    bucketname = outputs["BucketName"]
    s3 = boto3.client("s3")
    # A single list call returns at most 1000 keys; paginate so the bucket
    # is emptied completely.
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucketname):
        for obj in page.get("Contents", []):
            s3.delete_object(Bucket=bucketname,
                             Key=obj["Key"])
#!/usr/bin/env bash
# Delete the CloudFormation stack named by AppName in app.props.
# Fail fast: if app.props is missing or AppName is unset, stop instead of
# calling the AWS CLI with an empty stack name.
set -euo pipefail
. app.props
aws cloudformation delete-stack --stack-name "$AppName"
#!/usr/bin/env bash
# Deploy stack.yaml as the CloudFormation stack named by AppName in app.props.
# Fail fast: if app.props is missing or AppName is unset, stop instead of
# calling the AWS CLI with an empty stack name.
set -euo pipefail
. app.props
aws cloudformation deploy --stack-name "$AppName" --template-file stack.yaml --capabilities CAPABILITY_NAMED_IAM
import boto3
def fetch_outputs(cf, stackname):
    """Return ``{OutputKey: OutputValue}`` for stacks prefixed by *stackname*.

    Args:
        cf: a CloudFormation client exposing ``describe_stacks``.
        stackname: prefix matched against each stack's ``StackName``.

    Returns:
        dict of outputs merged across all matching stacks (later stacks
        overwrite earlier ones on key clashes).
    """
    outputs = {}
    request = {}
    while True:
        resp = cf.describe_stacks(**request)
        for stack in resp["Stacks"]:
            if not stack["StackName"].startswith(stackname):
                continue
            # Stack descriptions without an "Outputs" key contribute nothing.
            for output in stack.get("Outputs", []):
                outputs[output["OutputKey"]] = output["OutputValue"]
        # Follow NextToken: a single call only returns one page of stacks.
        token = resp.get("NextToken")
        if not token:
            break
        request["NextToken"] = token
    return outputs
if __name__ == "__main__":
    # Print the key of every object in the stack's S3 bucket.
    # app.props holds KEY=VALUE lines; split on the first "=" only so values
    # may themselves contain "=".
    with open("app.props") as propsfile:
        props = dict(row.split("=", 1)
                     for row in propsfile.read().split("\n")
                     if row != '')
    cf = boto3.client("cloudformation")
    outputs = fetch_outputs(cf, props["AppName"])
    s3 = boto3.client("s3")
    # A single list call returns at most 1000 keys; paginate to list them all.
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=outputs["BucketName"]):
        for obj in page.get("Contents", []):
            print(obj["Key"])
#!/usr/bin/env bash
# Show the events of the CloudFormation stack named by AppName in app.props.
# Fail fast: if app.props is missing or AppName is unset, stop instead of
# calling the AWS CLI with an empty stack name.
set -euo pipefail
. app.props
aws cloudformation describe-stack-events --stack-name "$AppName" --query "StackEvents[].{\"1.Timestamp\":Timestamp,\"2.Id\":LogicalResourceId,\"3.Type\":ResourceType,\"4.Status\":ResourceStatus,\"5.Reason\":ResourceStatusReason}"
import boto3
def fetch_outputs(cf, stackname):
    """Gather stack outputs for every stack whose name starts with *stackname*.

    Args:
        cf: a CloudFormation client exposing ``describe_stacks``.
        stackname: stack-name prefix to match.

    Returns:
        dict mapping each ``OutputKey`` to its ``OutputValue``; on duplicate
        keys the value from the stack listed last is kept.
    """
    outputs = {}
    params = {}
    while True:
        resp = cf.describe_stacks(**params)
        for stack in resp["Stacks"]:
            if not stack["StackName"].startswith(stackname):
                continue
            # "Outputs" is optional in a stack description.
            for output in stack.get("Outputs", []):
                outputs[output["OutputKey"]] = output["OutputValue"]
        # The listing is paginated; request further pages until none remain.
        token = resp.get("NextToken")
        if not token:
            break
        params["NextToken"] = token
    return outputs
if __name__ == "__main__":
    # Print the outputs of the stack(s) named by AppName in app.props.
    # app.props holds KEY=VALUE lines; split on the first "=" only so values
    # may themselves contain "=".
    with open("app.props") as propsfile:
        props = dict(row.split("=", 1)
                     for row in propsfile.read().split("\n")
                     if row != '')
    cf = boto3.client("cloudformation")
    print(fetch_outputs(cf, props["AppName"]))
#!/usr/bin/env bash
# Show the outputs of the CloudFormation stack named by AppName in app.props.
# Fail fast: if app.props is missing or AppName is unset, stop instead of
# calling the AWS CLI with an empty stack name.
set -euo pipefail
. app.props
aws cloudformation describe-stacks --stack-name "$AppName" --query 'Stacks[0].Outputs' --output table
#!/usr/bin/env bash
# Show the resources of the CloudFormation stack named by AppName in app.props.
# Fail fast: if app.props is missing or AppName is unset, stop instead of
# calling the AWS CLI with an empty stack name.
set -euo pipefail
. app.props
aws cloudformation describe-stack-resources --stack-name "$AppName" --query "StackResources[].{\"1.Timestamp\":Timestamp,\"2.LogicalId\":LogicalResourceId,\"3.PhysicalId\":PhysicalResourceId,\"4.Type\":ResourceType,\"5.Status\":ResourceStatus}"
#!/usr/bin/env bash
# List every CloudFormation stack visible to the current credentials,
# projecting each one to its name and status.
aws cloudformation describe-stacks --query "Stacks[].{\"1.Name\":StackName,\"2.Status\":StackStatus}"
import boto3, json, re, sys
def fetch_outputs(cf, stackname):
    """Look up CloudFormation outputs for stacks whose name starts with *stackname*.

    Args:
        cf: a CloudFormation client exposing ``describe_stacks``.
        stackname: prefix compared against each ``StackName``.

    Returns:
        dict of ``OutputKey`` -> ``OutputValue`` across all matching stacks;
        a key defined by more than one stack keeps the last value seen.
    """
    outputs = {}
    kwargs = {}
    while True:
        resp = cf.describe_stacks(**kwargs)
        for stack in resp["Stacks"]:
            if not stack["StackName"].startswith(stackname):
                continue
            # A stack description may have no "Outputs" key at all.
            for output in stack.get("Outputs", []):
                outputs[output["OutputKey"]] = output["OutputValue"]
        # Keep paging: only the first page is returned by a single call.
        token = resp.get("NextToken")
        if not token:
            break
        kwargs["NextToken"] = token
    return outputs
if __name__ == "__main__":
    # Usage: python ping_machine.py <n simulation tasks>
    # Starts one execution of the Monte Carlo state machine with n map tasks.
    try:
        if len(sys.argv) < 2:
            raise RuntimeError("please enter n(simulations)")
        nsims = sys.argv[1]
        if not re.search("^\\d+$", nsims):
            raise RuntimeError("n(sims) is invalid")
        nsims = int(nsims)
        # With zero tasks the reduce step would average an empty list and
        # fail with a division by zero, so reject it before starting.
        if nsims < 1:
            raise RuntimeError("n(sims) must be at least 1")
        # app.props holds KEY=VALUE lines; split on the first "=" only.
        with open("app.props") as propsfile:
            props = dict(row.split("=", 1)
                         for row in propsfile.read().split("\n")
                         if row != '')
        cf = boto3.client("cloudformation")
        outputs = fetch_outputs(cf, props["AppName"])
        machinearn = outputs["MonteCarloStepFunctionArn"]
        sf = boto3.client('stepfunctions')
        # One (empty) input object per map task.
        body = {"simulationTasks": [{} for _ in range(nsims)]}
        print(sf.start_execution(stateMachineArn=machinearn,
                                 input=json.dumps(body)))
    except RuntimeError as error:
        print("Error: %s" % str(error))
awscli
boto3
botocore
numpy
scipy
# from pareto2.scripts.debug import hungarorise
from botocore.exceptions import ClientError
import boto3, re, os, sys, time
def hungarorise(text):
    """Join the "-"/"_"-separated tokens of *text*, capitalising each one.

    ``str.capitalize`` upper-cases the first character of a token and
    lower-cases the rest.
    """
    tokens = re.split("\\-|\\_", text)
    return "".join(map(str.capitalize, tokens))
"""
- need to iterate through log groups, as a lambda may have been deployed before its corresponding log group has been created
- then need to check against live functions, because there are likely to be many old log groups with the same names but different salts
"""
def filter_log_group(logs, L, stackname, lambdaname):
    """Find the single log group of one live Lambda function in a stack.

    Args:
        logs: CloudWatch Logs client (``describe_log_groups``).
        L: Lambda client (``list_functions``).
        stackname: prefix shared by the stack's function names.
        lambdaname: function name in "-"/"_" separated form; it is converted
            with ``hungarorise`` and matched as a substring of the group name.

    Returns:
        The name of the one matching log group.

    Raises:
        RuntimeError: if no log group, or more than one, matches.
    """
    def log_group_name(funcname):
        return "/aws/lambda/%s" % funcname

    def fetch_functions(lam, stackname):
        # Log group names of the functions that currently exist; a set
        # because it is only used for membership tests.
        names, kwargs = set(), {}
        while True:
            resp = lam.list_functions(**kwargs)
            names.update(log_group_name(fn["FunctionName"])
                         for fn in resp["Functions"]
                         if fn["FunctionName"].startswith(stackname))
            # Stop on a missing *or* empty marker, so a falsy marker can
            # never restart the listing from the first page.
            marker = resp.get("NextMarker")
            if not marker:
                break
            kwargs["Marker"] = marker
        return names

    def fetch_groups(logs, stackname):
        prefix = log_group_name(stackname)
        # Let the service filter by prefix instead of downloading every log
        # group in the account; the local check below is kept as a guard.
        kwargs = {"logGroupNamePrefix": prefix}
        groupnames = []
        while True:
            resp = logs.describe_log_groups(**kwargs)
            groupnames += [group["logGroupName"]
                           for group in resp["logGroups"]
                           if group["logGroupName"].startswith(prefix)]
            token = resp.get("nextToken")
            if not token:
                break
            kwargs["nextToken"] = token
        return groupnames

    def filter_group(groupnames, lambdaname):
        filtered = [groupname
                    for groupname in groupnames
                    if lambdaname in groupname]
        if not filtered:
            raise RuntimeError("no log groups found")
        if len(filtered) > 1:
            raise RuntimeError("multiple log groups found")
        return filtered[0]

    funcnames = fetch_functions(L, stackname)
    # Keep only groups belonging to functions that still exist; groups left
    # behind by deleted functions are dropped here.
    groupnames = [groupname
                  for groupname in fetch_groups(logs, stackname)
                  if groupname in funcnames]
    return filter_group(groupnames, hungarorise(lambdaname))
def fetch_log_events(logs, kwargs):
    """Fetch every event matching *kwargs*, sorted by timestamp.

    Args:
        logs: CloudWatch Logs client (``filter_log_events``).
        kwargs: keyword arguments for ``filter_log_events``. The dict is not
            modified.

    Returns:
        list of event dicts from all result pages, ordered by ``timestamp``.
    """
    # Work on a copy so the pagination token never leaks into the caller's
    # dict (a reused dict would otherwise resume from a stale token).
    request = dict(kwargs)
    events = []
    while True:
        resp = logs.filter_log_events(**request)
        events += resp["events"]
        token = resp.get("nextToken")
        if not token:
            break
        request["nextToken"] = token
    return sorted(events,
                  key=lambda x: x["timestamp"])
if __name__ == "__main__":
    # Usage: <script> <lambda name> <window in seconds> <filter pattern>
    # Prints recent log messages of one Lambda function of the app's stack.
    try:
        # Disabled alternative kept from the original: take the app name from
        # os.environ["APP_NAME"] instead of app.props.
        # app.props holds KEY=VALUE lines; split on the first "=" only.
        with open("app.props") as propsfile:
            props = dict(row.split("=", 1)
                         for row in propsfile.read().split("\n")
                         if row != '')
        appname = props["AppName"]
        if len(sys.argv) < 4:
            raise RuntimeError("Please enter lambda, window, query")
        lambdaname, window, query = sys.argv[1:4]
        if not re.search("^\\d+$", window):
            raise RuntimeError("window is invalid")
        window = int(window)
        logs, L = boto3.client("logs"), boto3.client("lambda")
        groupname = filter_log_group(logs, L,
                                     stackname=appname,
                                     lambdaname=lambdaname)
        # startTime is in milliseconds: now minus the window (seconds).
        starttime = int(1000 * (time.time() - window))
        kwargs = {"logGroupName": groupname,
                  "startTime": starttime,
                  "interleaved": True}
        # "*" and "" both mean "no filter".
        if query not in ["*", ""]:
            kwargs["filterPattern"] = query
        for event in fetch_log_events(logs, kwargs):
            msg = re.sub("\\r|\\n", "", event["message"])
            # Skip the START/END/REPORT lines and the "Found credentials"
            # message; only the remaining output is shown.
            if (msg.startswith(("START", "REPORT", "END")) or
                    "Found credentials" in msg):
                continue
            print(msg)
    except RuntimeError as error:
        print("Error: %s" % str(error))
    except ClientError as error:
        print("Error: %s" % str(error))
#!/usr/bin/env bash
# Select the AWS CLI profile and default output format used by the scripts.
# NOTE(review): presumably meant to be sourced (". <this file>") — variables
# exported by an executed script do not reach the calling shell; confirm.
export AWS_PROFILE=woldeploy
export AWS_DEFAULT_OUTPUT=table
AWSTemplateFormatVersion: '2010-09-09'
Resources:
# S3 bucket for simulation results: its name is passed to the Lambda
# functions as the BUCKET environment variable and exported as the
# BucketName stack output.
Bucket:
Type: AWS::S3::Bucket
# Execution role shared by the map and reduce Lambda functions.
# Least privilege: the functions only write their own logs, and the reduce
# function puts result objects into the stack's bucket.
MonteCarloLambdaExecutionRole:
  Type: 'AWS::IAM::Role'
  Properties:
    AssumeRolePolicyDocument:
      Version: '2012-10-17'
      Statement:
        - Effect: Allow
          Principal:
            Service:
              - lambda.amazonaws.com
          Action: 'sts:AssumeRole'
    Policies:
      - PolicyName: LambdaExecutionPolicy
        PolicyDocument:
          Version: '2012-10-17'
          Statement:
            # Write-only access to CloudWatch Logs (was 'logs:*').
            - Effect: Allow
              Action:
                - 'logs:CreateLogGroup'
                - 'logs:CreateLogStream'
                - 'logs:PutLogEvents'
              Resource: '*'
            # Only put_object on this stack's bucket (was 's3:*' on '*').
            - Effect: Allow
              Action:
                - 's3:PutObject'
              Resource: !Sub '${Bucket.Arn}/*'
# Execution role for the state machine.
# Least privilege: the machine's only tasks invoke the map and reduce
# functions, so it needs nothing beyond lambda:InvokeFunction on those two
# (was 'logs:*', 's3:*' and 'lambda:*' on every resource).
MonteCarloStatesExecutionRole:
  Type: 'AWS::IAM::Role'
  Properties:
    AssumeRolePolicyDocument:
      Version: '2012-10-17'
      Statement:
        - Effect: Allow
          Principal:
            Service:
              - states.amazonaws.com
          Action: 'sts:AssumeRole'
    Policies:
      - PolicyName: LambdaExecutionPolicy
        PolicyDocument:
          Version: '2012-10-17'
          Statement:
            - Effect: Allow
              Action:
                - 'lambda:InvokeFunction'
              Resource:
                - !GetAtt MonteCarloMapFunction.Arn
                - !GetAtt MonteCarloReduceFunction.Arn
MonteCarloMapFunction:
Type: 'AWS::Lambda::Function'
Properties:
Code:
ZipFile: |
import json, math, random
def monte_carlo(option_type, spot=42, strike=40, time=0.5, sigma=0.2, rate=0.1, simulations=1000):
    """Estimate a European option price from ``simulations`` random draws.

    Each draw produces a terminal stock price from one ``random.gauss(0, 1)``
    sample; the payoffs are summed, averaged and discounted by
    ``exp(-rate * time)``. An invalid ``option_type`` raises ``ValueError``
    on the first draw.
    """
    def payoff(terminal_price):
        # Payoff of a single simulated path for the requested option type.
        if option_type == 'call':
            return max(terminal_price - strike, 0)
        if option_type == 'put':
            return max(strike - terminal_price, 0)
        raise ValueError("Invalid option type. Use 'call' or 'put'.")

    running_payoff = 0
    for _ in range(simulations):
        shock = random.gauss(0, 1)
        terminal_price = spot * math.exp((rate - 0.5 * sigma**2) * time + sigma * math.sqrt(time) * shock)
        running_payoff += payoff(terminal_price)
    return math.exp(-rate * time) * running_payoff / simulations
def handler(event, context):
    """Lambda entry point: price one call and one put by Monte Carlo.

    Any of ``spot``, ``strike``, ``time``, ``sigma``, ``rate`` and
    ``simulations`` found in *event* override the defaults of
    ``monte_carlo``; an empty event keeps every default.

    Returns:
        dict with ``statusCode`` 200 and a JSON ``body`` holding the
        ``call`` and ``put`` price estimates.
    """
    overridable = ("spot", "strike", "time", "sigma", "rate", "simulations")
    params = {}
    # Only known keys are forwarded, so unrelated fields in the task input
    # cannot break the call.
    if isinstance(event, dict):
        params = {key: event[key] for key in overridable if key in event}
    results = {"call": monte_carlo(option_type="call", **params),
               "put": monte_carlo(option_type="put", **params)}
    return {
        'statusCode': 200,
        'body': json.dumps(results, indent=2)
    }
Environment:
Variables:
BUCKET:
Ref: Bucket
Handler: 'index.handler'
Role: !GetAtt MonteCarloLambdaExecutionRole.Arn
Runtime: python3.10
Timeout: 300
MonteCarloReduceFunction:
Type: 'AWS::Lambda::Function'
Properties:
Code:
ZipFile: |
import boto3, os, json
from datetime import datetime
def mean(X):
    """Return the arithmetic mean of the values in *X* (sum over count)."""
    count = len(X)
    return sum(X) / count
def variance(X):
    """Return the mean squared deviation of *X* from its mean (divides by len(X))."""
    centre = mean(X)
    squared_deviations = []
    for x in X:
        squared_deviations.append((x - centre)**2)
    return mean(squared_deviations)
def handler(event, context):
    """Lambda entry point: aggregate the map results and store them in S3.

    *event* is a list of map-task results whose ``body`` is a JSON object
    with ``call`` and ``put`` prices. For each side the mean, variance and
    count are computed, written to the ``BUCKET`` bucket under a timestamped
    ``results-*.json`` key, and returned in the response body.
    """
    payloads = [json.loads(item['body']) for item in event]
    summary = {}
    for side in ("call", "put"):
        prices = [payload[side] for payload in payloads]
        summary[side] = {"mean": mean(prices),
                         "var": variance(prices),
                         "n": len(prices)}
    serialised = json.dumps(summary, indent=2)
    # Microsecond-resolution UTC timestamp used in the object key.
    stamp = datetime.utcnow().strftime("%Y-%m-%d-%H-%M-%S-%f")
    boto3.client("s3").put_object(Bucket=os.environ["BUCKET"],
                                  Key="results-%s.json" % stamp,
                                  Body=serialised,
                                  ContentType="application/json")
    return {
        'statusCode': 200,
        'body': serialised
    }
Environment:
Variables:
BUCKET:
Ref: Bucket
Handler: 'index.handler'
Role: !GetAtt MonteCarloLambdaExecutionRole.Arn
Runtime: python3.10
Timeout: 300
# State machine: the FanOut Map state runs one RunSimulation task (the map
# Lambda) per element of $.simulationTasks, at most 100 at a time, and stores
# the collected task results under $.mapResults; ReduceResults then passes
# only that array to the reduce Lambda.
MonteCarloStepFunction:
Type: AWS::StepFunctions::StateMachine
Properties:
RoleArn: !GetAtt MonteCarloStatesExecutionRole.Arn
# !Sub substitutes the two ${...Arn} references inside the JSON definition.
DefinitionString:
!Sub |
{
"Comment": "Monte Carlo Simulation State Machine",
"StartAt": "FanOut",
"States": {
"FanOut": {
"Type": "Map",
"ItemsPath": "$.simulationTasks",
"MaxConcurrency": 100,
"Iterator": {
"StartAt": "RunSimulation",
"States": {
"RunSimulation": {
"Type": "Task",
"Resource": "${MonteCarloMapFunction.Arn}",
"End": true
}
}
},
"ResultPath": "$.mapResults",
"Next": "ReduceResults"
},
"ReduceResults": {
"Type": "Task",
"Resource": "${MonteCarloReduceFunction.Arn}",
"InputPath": "$.mapResults",
"End": true
}
}
}
# Stack outputs read by the helper scripts via fetch_outputs: the results
# bucket ("BucketName") and the state machine ("MonteCarloStepFunctionArn").
Outputs:
BucketName:
Value: !Ref Bucket
MonteCarloStepFunctionArn:
Value: !Ref MonteCarloStepFunction

short

  • pass arguments to map function

done

  • avoid hardcoding number of map processes
  • separate machine role from lambda role
  • script to dump s3 contents to workspace
  • test machine
  • script to clean bucket
  • script to ping stack
  • script to inspect logs
  • script to list s3
  • script to get outputs
  • test deployment
  • define bucket
  • push bucket name as env variable to reduce template
  • stack.yaml
  • shell scripts
  • step functions permissions
from botocore.exceptions import ClientError
import boto3, sys
def fetch_outputs(cf, stackname):
    """Return the outputs of all CloudFormation stacks prefixed by *stackname*.

    Args:
        cf: a CloudFormation client exposing ``describe_stacks``.
        stackname: prefix a stack's name must start with to be included.

    Returns:
        dict mapping ``OutputKey`` to ``OutputValue``; when keys repeat
        across stacks, the stack listed last wins.
    """
    outputs = {}
    kwargs = {}
    while True:
        resp = cf.describe_stacks(**kwargs)
        for stack in resp["Stacks"]:
            if not stack["StackName"].startswith(stackname):
                continue
            # Stack descriptions lacking "Outputs" are skipped implicitly.
            for output in stack.get("Outputs", []):
                outputs[output["OutputKey"]] = output["OutputValue"]
        # Results are paginated; continue until no NextToken is returned.
        token = resp.get("NextToken")
        if not token:
            break
        kwargs["NextToken"] = token
    return outputs
def s3_key_exists(s3, bucketname, key):
    """Return True if *key* exists in *bucketname*, False if it does not.

    Args:
        s3: S3 client (``head_object``).
        bucketname: name of the bucket to look in.
        key: object key to check.

    Raises:
        ClientError: for any failure other than "not found" (for example
            access denied), so such errors are not reported as a missing key.
    """
    try:
        s3.head_object(Bucket=bucketname, Key=key)
    except ClientError as error:
        # The original tested the truthiness of the error code, which is
        # always true, so every failure looked like a missing key.
        if error.response['Error']['Code'] in ("404", "NoSuchKey", "NotFound"):
            return False
        raise
    return True
if __name__ == "__main__":
    # Usage: python dump_object.py <s3 key>
    # Prints the UTF-8 content of one object from the stack's bucket.
    try:
        if len(sys.argv) < 2:
            raise RuntimeError("please enter s3 key")
        s3key = sys.argv[1]
        # app.props holds KEY=VALUE lines; split on the first "=" only.
        with open("app.props") as propsfile:
            props = dict(row.split("=", 1)
                         for row in propsfile.read().split("\n")
                         if row != '')
        cf = boto3.client("cloudformation")
        outputs = fetch_outputs(cf, props["AppName"])
        bucketname = outputs["BucketName"]
        s3 = boto3.client("s3")
        if not s3_key_exists(s3, bucketname, s3key):
            raise RuntimeError("%s does not exist" % s3key)
        print(s3.get_object(Bucket=bucketname,
                            Key=s3key)["Body"].read().decode("utf-8"))
    except RuntimeError as error:
        print("Error: %s" % str(error))
    except ClientError as error:
        # AWS-side failures are reported like the other errors instead of
        # ending the script with a traceback.
        print("Error: %s" % str(error))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment