twosigma/flint/example/Flint Example.ipynb as Python script
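The script below walks through the Flint time-series API on S&P 500 daily data: loading a CSV into a time-series DataFrame, computing daily returns, doing temporal left joins with a tolerance, applying windowed summarizers (a built-in EWMA and a NumPy UDF), and finally fitting a Spark ML linear regression on the engineered features.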
from pyspark import SQLContext
from pyspark.sql import SparkSession
from ts.flint import FlintContext

# The notebook relies on the `spark` and `sqlContext` globals provided by a
# PySpark kernel; when run as a plain script they have to be created explicitly.
spark = SparkSession.builder.getOrCreate()
sqlContext = SQLContext(spark.sparkContext)
flintContext = FlintContext(sqlContext)
# Load S&P 500 daily prices; Flint expects the time index column to be named 'time'.
sp500 = spark.read.option('header', True).option('inferSchema', True).csv('sp500.csv').withColumnRenamed('Date', 'time')
sp500 = flintContext.read.dataframe(sp500)
# Daily return in basis points: 10000 * (Close - Open) / Open.
sp500_return = sp500.withColumn('return', 10000 * (sp500['Close'] - sp500['Open']) / sp500['Open']).select('time', 'return')
sp500_return.show()
from ts.flint import windows
# Shift returns one day into the future so each row can be joined with the previous day's return.
sp500_previous_day_return = sp500_return.shiftTime(windows.future_absolute_time('1day')).toDF('time', 'previous_day_return')
# An exact-time left join leaves nulls around weekends and holidays...
sp500_joined_return = sp500_return.leftJoin(sp500_previous_day_return)
sp500_joined_return.show()
# ...so join again with a 3-day tolerance and drop rows that still have no match.
sp500_joined_return = sp500_return.leftJoin(sp500_previous_day_return, tolerance='3days').dropna()
sp500_joined_return.show()
from ts.flint import summarizers
# Exponentially weighted moving average of the previous-day return over a trailing 7-day window.
sp500_decayed_return = sp500_joined_return.summarizeWindows(
    window=windows.past_absolute_time('7day'),
    summarizer=summarizers.ewma('previous_day_return', alpha=0.5)
)
sp500_decayed_return.show()
from ts.flint import udf
import numpy as np

# A custom windowed summarizer as a NumPy UDF: each value in the 7-day window is
# weighted by 0.5 raised to its age in rows, so the most recent previous-day
# return gets weight 1 and older values decay geometrically.
@udf('double', arg_type='numpy')
def decayed(columns):
    v = columns[0]
    decay = np.power(0.5, np.arange(len(v)))[::-1]
    return (v * decay).sum()

sp500_decayed_return = sp500_joined_return.summarizeWindows(
    window=windows.past_absolute_time('7day'),
    summarizer={'previous_day_return_decayed_sum': decayed(sp500_joined_return[['previous_day_return']])}
)
sp500_decayed_return.show()
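# Sanity check of the decay weights (a standalone illustration, not part of the
# original notebook): for a 3-row window the weights, oldest to newest, are
# [0.25, 0.5, 1.0].
print(np.power(0.5, np.arange(3))[::-1])  # -> [0.25 0.5  1.  ]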
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Assemble the engineered columns into a feature vector and regress the daily
# return on them with an elastic-net linear model.
assembler = VectorAssembler(
    inputCols=["previous_day_return", "previous_day_return_decayed_sum"],
    outputCol="features")
output = assembler.transform(sp500_decayed_return).select('return', 'features').toDF('label', 'features')
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
model = lr.fit(output)
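# A minimal follow-up sketch (not in the original notebook): inspect the fit and
# score the training data using the standard Spark ML linear regression API.
print('coefficients:', model.coefficients)
print('intercept:', model.intercept)
print('r2:', model.summary.r2)
model.transform(output).show(5)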