Skip to content

Instantly share code, notes, and snippets.

View icexelloss's full-sized avatar

Li Jin icexelloss

View GitHub Profile
@icexelloss
icexelloss / gist:88195de046962e1d043c99d96e1b8b43
Created September 6, 2023 20:34
arrow_parquet_memory_repro
Status exp_read_s3_scan_raw(const std::string& fs,
const std::string& bucket,
const std::vector<std::string>& filenames,
const int dup_factor) {
// Read a S3 table directly through scan node
std::cout << "exp_read_s3_scan_raw()" << std::endl;
std::vector<std::string> dup_filenames = filenames;
for (int i = 0; i < dup_factor; ++i) {
dup_filenames.insert(dup_filenames.end(), filenames.begin(), filenames.end());
Code:
Status exp_9() {
// For local filesystem reading, file:/// may work here as well. You may
// need to #include <arrow/dataset/file_base.h>
const std::string uri_string = "ts3://";
// const std::string uri_string = "gs://";
// Filesystem and path
ARROW_ASSIGN_OR_RAISE(const std::shared_ptr<arrow::fs::FileSystem> filesystem,
arrow::fs::FileSystemFromUri(uri_string, nullptr));
-- Building using CMake version: 3.22.2
-- ---------------------
-- Boost_FOUND:
-- Boost_INCLUDE_DIRS:
-- Boost_LIBRARY_DIRS:
-- Boost_LIBRARIES:
-- ---------------------
-- Arrow version: 7.0.0 (full: '7.0.0-SNAPSHOT')
-- Arrow SO version: 700 (full: 700.0.0)
CMake Debug Log at cmake_modules/FindClangTools.cmake:50 (find_program):
import pyspark.sql.functions as F
# Returns of a particular stock.
# 1.01 means the stock goes up 1%; 0.95 means the stock goes down 5%
df = ...
# time, return
# 20180101, 1.01
# 20180102, 0.95
# 20180103, 1.05
# ...
@icexelloss
icexelloss / flint_boxcox.py
Created October 23, 2018 18:44
flint_boxcox
import pandas as pd
from scipy import stats
# `udf` was used below without ever being imported; the other Flint examples
# take it from ts.flint (and the 'double' + pd.Series usage matches Flint's
# pandas-style udf, not pyspark's row-wise udf).
from ts.flint import udf


@udf('double')
def boxcox(v):
    """Box-Cox transform the values of ``v``.

    Presumably ``v`` is the pandas Series that ts.flint passes to a default
    (pandas arg_type) udf -- TODO confirm against how this udf is applied.

    ``scipy.stats.boxcox`` returns a ``(transformed, lmbda)`` pair; only the
    transformed values are kept. scipy requires the input to be positive and
    1-dimensional, so non-positive values raise ``ValueError``.

    Returns:
        pd.Series of the transformed values. It is built from a bare ndarray,
        so it carries a fresh RangeIndex rather than ``v``'s index.
    """
    return pd.Series(stats.boxcox(v)[0])
df = ...  # placeholder: substitute the real input DataFrame (was a U+2026 "…", a SyntaxError)
# time, v
@icexelloss
icexelloss / flintcase6.py
Created October 18, 2018 19:18
FlintCase6
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
# Pack the two lagged-return columns into one vector column named "features".
assembler = VectorAssembler(
    outputCol="features",
    inputCols=["previous_day_return", "previous_day_decayed_return"],
)

# Training frame: the target column becomes "label", the vector stays "features".
output = (
    assembler.transform(sp500_decayed_return)
    .select('return', 'features')
    .toDF('label', 'features')
)

# Elastic-net regularized linear regression (regParam=0.3, L1/L2 mix 0.8).
lr = LinearRegression(regParam=0.3, elasticNetParam=0.8, maxIter=10)
@icexelloss
icexelloss / flintcase5.py
Created October 18, 2018 19:17
FlintCase5
# numpy was used below (np.power / np.arange) but never imported.
import numpy as np
from ts.flint import udf


@udf('double', arg_type='numpy')
def decayed(columns):
    """Exponentially decayed sum of the first input column.

    Presumably, with ``arg_type='numpy'``, Flint passes the udf's input
    columns as numpy arrays; only ``columns[0]`` is used here.

    The last element gets weight 1, the one before it 0.5, then 0.25, and so
    on. This assumes rows arrive oldest-first within the window -- TODO
    confirm. An empty window yields 0.0.
    """
    v = columns[0]
    # 0.5 ** [0, 1, ..., n-1], reversed -> [..., 0.25, 0.5, 1.0]
    decay = np.power(0.5, np.arange(len(v)))[::-1]
    return (v * decay).sum()
sp500_decayed_return = sp500_joined_return.summarizeWindows(
window = windows.past_absolute_time('7day'),
@icexelloss
icexelloss / flintcase4.py
Created October 18, 2018 19:17
FlintCase4
from ts.flint import summarizers
# Summarize `previous_day_return` with Flint's `ewma` summarizer (alpha=0.5)
# over a 7-day window built by windows.past_absolute_time.
sp500_decayed_return = sp500_joined_return.summarizeWindows(
    summarizer=summarizers.ewma('previous_day_return', alpha=0.5),
    window=windows.past_absolute_time('7day'),
)
@icexelloss
icexelloss / flintcase3.py
Created October 18, 2018 19:17
FlintCase3
# Temporal left join of each return with the previous-day return, matching
# within 3 days (presumably so Mondays still pick up Friday's value), then
# drop rows containing nulls, e.g. those with no match inside the tolerance.
sp500_joined_return = (
    sp500_return
    .leftJoin(sp500_previous_day_return, tolerance='3days')
    .dropna()
)
@icexelloss
icexelloss / flintcase2.py
Created October 18, 2018 19:16
FlintCase2
# Move each return one day forward in time and rename it, so a join on time
# lines it up with the following day's row as `previous_day_return`.
sp500_previous_day_return = sp500_return.shiftTime(windows.future_absolute_time('1day')).toDF('time', 'previous_day_return')
# Join against the frame built on the line above. The name
# `sp500_return_previous_day` used here before was never defined (NameError).
# NOTE(review): without a `tolerance` only exact timestamp matches presumably
# join, so Mondays get no Friday value -- consider tolerance='3days'.
sp500_joined_return = sp500_return.leftJoin(sp500_previous_day_return)