Created
November 27, 2019 19:00
-
-
Save suhrmann/2014de61a6b7feae28b5afbfb43ca7eb to your computer and use it in GitHub Desktop.
twosigma/flint/example/Flint Example.ipynb as Python script
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# --- Notebook cell: Spark / Flint setup ---
# Defect fixed: stripped the trailing " | |" web-scrape artifacts that made
# every line a syntax error.
from pyspark import SparkContext, SQLContext
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, TimestampType, DoubleType, StringType

import ts.flint
from ts.flint import FlintContext

# NOTE(review): `sqlContext` is not defined in this file — presumably it is
# the global provided by the Spark notebook/shell environment; confirm.
flintContext = FlintContext(sqlContext)
# --- Notebook cell: load S&P 500 daily bars and compute intraday return ---
# Defect fixed: stripped the trailing " | |" web-scrape artifacts.
# NOTE(review): `spark` is the notebook-provided SparkSession — confirm.
# Flint requires the time column to be named 'time', hence the rename.
sp500 = (spark.read
         .option('header', True)
         .option('inferSchema', True)
         .csv('sp500.csv')
         .withColumnRenamed('Date', 'time'))
sp500 = flintContext.read.dataframe(sp500)

# Intraday return in basis points: 10000 * (Close - Open) / Open.
sp500_return = sp500.withColumn(
    'return',
    10000 * (sp500['Close'] - sp500['Open']) / sp500['Open']
).select('time', 'return')
sp500_return.show()
# --- Notebook cell: join each day's return with the previous day's ---
# Defect fixed: stripped the trailing " | |" web-scrape artifacts.
from ts.flint import windows

# Shift rows forward by one day so that, after an as-of join on 'time',
# each row carries the *previous* trading day's return.
sp500_previous_day_return = (
    sp500_return
    .shiftTime(windows.future_absolute_time('1day'))
    .toDF('time', 'previous_day_return')
)

# Exact-timestamp left join: days with no match one calendar day earlier
# (e.g. Mondays after a weekend) get null previous_day_return.
sp500_joined_return = sp500_return.leftJoin(sp500_previous_day_return)
sp500_joined_return.show()

# Re-join with a 3-day lookback tolerance to bridge weekends/holidays,
# then drop the rows that still found no match.
sp500_joined_return = sp500_return.leftJoin(
    sp500_previous_day_return, tolerance='3days'
).dropna()
sp500_joined_return.show()
# --- Notebook cell: exponentially weighted moving average summarizer ---
# Defect fixed: stripped the trailing " | |" web-scrape artifacts.
from ts.flint import summarizers

# EWMA (alpha=0.5) of previous_day_return over a trailing 7-day window.
sp500_decayed_return = sp500_joined_return.summarizeWindows(
    window=windows.past_absolute_time('7day'),
    summarizer=summarizers.ewma('previous_day_return', alpha=0.5)
)
sp500_decayed_return.show()
# Defect fixed: stripped the trailing " | |" web-scrape artifacts.
# NOTE(review): this cell is a byte-for-byte duplicate of the previous
# EWMA cell — likely a duplicated notebook cell. Kept as-is (it is
# idempotent: it recomputes and reassigns the same result); consider
# deleting it.
from ts.flint import summarizers

sp500_decayed_return = sp500_joined_return.summarizeWindows(
    window=windows.past_absolute_time('7day'),
    summarizer=summarizers.ewma('previous_day_return', alpha=0.5)
)
sp500_decayed_return.show()
# --- Notebook cell: custom windowed UDF ---
# Defect fixed: stripped the trailing " | |" web-scrape artifacts.
from ts.flint import udf

import numpy as np


# arg_type='numpy' makes Flint pass each window's columns as NumPy arrays;
# 'double' is the declared return type of the UDF.
@udf('double', arg_type='numpy')
def decayed(columns):
    """Return the exponentially decayed sum of the first windowed column.

    The most recent observation gets weight 1, the one before it 0.5,
    then 0.25, ... (weights 0.5**k, newest first).
    """
    v = columns[0]
    # Weights 0.5**(len-1), ..., 0.5, 1 — reversed so the last (most
    # recent) element of the window carries weight 1.
    decay = np.power(0.5, np.arange(len(v)))[::-1]
    return (v * decay).sum()
# --- Notebook cell: apply the custom UDF over the trailing window ---
# Defect fixed: stripped the trailing " | |" web-scrape artifacts.
# The dict maps the output column name to the UDF bound to its input column.
sp500_decayed_return = sp500_joined_return.summarizeWindows(
    window=windows.past_absolute_time('7day'),
    summarizer={
        'previous_day_return_decayed_sum':
            decayed(sp500_joined_return[['previous_day_return']])
    }
)
sp500_decayed_return.show()
# --- Notebook cell: fit a Spark ML linear regression ---
# Defect fixed: stripped the trailing " | |" / " |" web-scrape artifacts.
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Pack the two predictor columns into the single 'features' vector column
# that Spark ML estimators expect.
assembler = VectorAssembler(
    inputCols=["previous_day_return", "previous_day_return_decayed_sum"],
    outputCol="features")

# Spark ML regressors expect (label, features); the day's own return is
# the prediction target.
output = (assembler.transform(sp500_decayed_return)
          .select('return', 'features')
          .toDF('label', 'features'))

# Elastic-net regularized linear regression (L1/L2 mix via elasticNetParam).
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
model = lr.fit(output)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment