Skip to content

Instantly share code, notes, and snippets.

@kpmiller
Last active April 16, 2019 12:36
Show Gist options
  • Save kpmiller/d7c9bd7a2855f5dd4af283492802ad6c to your computer and use it in GitHub Desktop.
Save kpmiller/d7c9bd7a2855f5dd4af283492802ad6c to your computer and use it in GitHub Desktop.
python 2.7 code that reads csv files saved from thinkorswim and applies a scan criteria to it
#!/usr/bin/python
import csv, json, sys, re, os
import glob
try:
loglevel = int(os.environ["LOGLEVEL"])
except:
loglevel = 1
def Log(s, level=1):
if level <= loglevel:
print s
def CheckFile(filename = None, volumefilter = 100):
optcsv = []
f = open(filename, "r")
for line in csv.reader(f):
optcsv.append(line)
f.close()
symbol = ""
optd = {}
key = ""
opts = {}
underlying = 0.0
state=0
for line in optcsv:
#state 0: looking for first line for quote symbol
#state 10: looking for "UNDERLYING"
#state 11: eating underlying header
#state 12: taking underlying
#state 1: looking for a date line with a () days to exp
#state 2: store header, go to 3
#state 3: eating lines until empty line, then end date, reset key and opts
Log(str(line), 900)
Log(str(state), 900)
if state == 0:
m = re.match(".*option quote for (\w*) on ", line[0])
if m != None:
symbol = m.group(1)
Log("\nScanning " + symbol, level = 1)
state = 10
elif state == 10:
if len(line)>0 and line[0].rfind ("UNDERLYING") != -1:
state = 11
elif state == 11:
state = 12
elif state == 12:
underlying = float(line[0])
Log("Quote " + str(underlying), level = 1)
state = 1
elif state == 1:
if len(line)>0:
m = re.match("\d* ... .. \((\d*)\)", line[0])
if m != None:
key = line[0]
state = 2
elif state == 2:
if len(line) == 0:
state = 1
else:
header = line
state = 3
elif state == 3:
if len(line) == 0:
optd[key] = opts
opts = {}
key = ""
state = 1
else:
if line[2] != "<empty>":
puts = {}
calls = {}
strike = ""
exp = ""
isCalls = True
#Make a dictionary based on the header instead of
#by csv position. Things that come before "Strike" will get
#added to calls, after get added to puts
for i in range(0,len(header)):
if len(header[i]) == 0:
pass
elif header[i] == "Exp":
exp = line[i]
elif header[i] == "Strike":
strike = line[i]
isCalls = False
elif isCalls:
calls[header[i]] = line[i]
else:
puts[header[i]] = line[i]
# ['', '', 'Mark', 'Volume', 'Open.Int', 'Prob.ITM', 'Delta', 'BID', 'BX', 'ASK', 'AX', 'Exp', 'Strike', 'BID', 'BX', 'ASK', 'AX', 'Mark', 'Volume', 'Open.Int', 'Prob.ITM', 'Delta', '', '']
# 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
try:
st = float(strike)
cm = float(calls["Mark"])
cp = float(calls["Prob.ITM"].replace("%",""))
co = int(calls["Open.Int"].replace(",", ""))
cv = int(calls["Volume"].replace(",", ""))
if (co > cv): cv = co
pm = float(puts["Mark"])
pp = float(puts["Prob.ITM"].replace("%",""))
po = int(puts["Open.Int"].replace(",", ""))
pv = int(puts["Volume"].replace(",", ""))
if (po > pv): pv = po
opts[st] = {
"strike" : st,
"callmark" : cm,
"probcall" : cp,
"callvol" : cv,
"putmark" : pm,
"probput" : pp,
"putvol" : pv
}
except:
print "conversion error"
optseries = optd.keys()
optseries.sort()
#look for call spreads of probability 65% or greater
for key in optseries:
m = re.match("\d* ... .. \((\d*)\)", key)
daysout = int(m.group(1))
if daysout > 75:
Log(key, 9)
Log("Option date too far away (%d days)\n" % daysout, 9)
continue
Log (key, 1)
strikes = optd[key]
strikeskeys = strikes.keys()
strikeskeys.sort()
#look for call spreads at $1
for strikekey in strikeskeys:
info = strikes[strikekey]
prob = info["probcall"]
vol = info["callvol"]
mark = info["callmark"]
Log(info, 9)
if strikekey < underlying:
Log("%.02f strike is ITM" % strikekey, 3)
elif vol < volumefilter:
Log("%.02f fails volume test" % strikekey, 3)
elif (prob > 35.0) or (prob < 5.0):
Log("%.02f fails probability test" % strikekey, 3)
else:
#use integers to avoid float weirdness
Log("Looking for matches for strike %f" % strikekey, 3)
st = int(strikekey*100.0)
for sk2 in strikeskeys:
st2 = int(sk2*100.0)
Log("Looking for $1 spread for %d, checking %d"%(st, st2), 9)
if st2 == st + 100:
info2 = strikes[sk2]
targetprice = (float(prob) / 100.0)
price = mark - info2["callmark"]
if price < targetprice:
Log("%s rejected: %0.2f/%0.2f call (%0.2f/%0.2f), ITM probability %.02f%%, profit %.02f is too low" % (symbol, strikekey, sk2, mark, info2["callmark"], prob, price), 2)
elif info2["callvol"] < volumefilter:
Log("%s rejected: %0.2f/%0.2f call (%0.2f/%0.2f), ITM probability %.02f%%, volume %d of further spread too low " % (symbol, strikekey, sk2, mark, info2["callmark"], prob, info2["callvol"]), 2)
else:
Log("%s candidate: %0.2f/%0.2f call (%0.2f/%0.2f), probability %.02f%%" % (symbol, strikekey, sk2, mark, info2["callmark"], prob), 1)
break
#look for put spreads at $1
for strikekey in strikeskeys:
info = strikes[strikekey]
prob = info["probput"]
vol = info["putvol"]
mark = info["putmark"]
Log(info, 9)
if strikekey > underlying:
Log("%.02f strike is ITM" % strikekey, 3)
elif vol < volumefilter:
Log("%.02f fails volume test" % strikekey, 3)
elif (prob > 35.0) or (prob < 5.0):
Log("%.02f fails probability test" % strikekey, 3)
else:
#use integers to avoid float weirdness
Log("Looking for matches for strike %f" % strikekey, 3)
st = int(strikekey*100.0)
for sk2 in strikeskeys:
st2 = int(sk2*100.0)
Log("Looking for $1 spread for %d, checking %d"%(st, st2), 9)
if st2 == st - 100:
info2 = strikes[sk2]
targetprice = (float(prob) / 100.0)
price = mark - info2["putmark"]
if price < targetprice:
Log("%s rejected: %0.2f/%0.2f put (%0.2f/%0.2f), ITM probability %.02f%%, profit %.02f is too low" % (symbol, strikekey, sk2, mark, info2["putmark"], prob, price), 2)
elif info2["putmark"] < volumefilter:
Log("%s rejected: %0.2f/%0.2f put (%0.2f/%0.2f), ITM probability %.02f%%, volume %d of further spread too low " % (symbol, strikekey, sk2, mark, info2["putmark"], prob, info2["putvol"]), 2)
else:
Log("%s candidate: %0.2f/%0.2f put (%0.2f/%0.2f), probability %.02f%%" % (symbol, strikekey, sk2, mark, info2["putmark"], prob), 1)
break
Log("", 1) #line break
if len(sys.argv) == 1:
scandir = os.path.abspath(os.path.expanduser("~/Desktop"))
else:
scandir = os.path.abspath(os.path.expanduser(sys.argv[1]))
files = glob.glob(scandir+"/*.csv")
for f in files:
CheckFile(f)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment