Skip to content

Instantly share code, notes, and snippets.

@Nikasa1889
Last active April 19, 2018 08:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Nikasa1889/8f5a0c2a81bf982d28f1d31793866898 to your computer and use it in GitHub Desktop.
Save Nikasa1889/8f5a0c2a81bf982d28f1d31793866898 to your computer and use it in GitHub Desktop.
from datetime import datetime as dt
from multiprocessing import Pool
from tqdm import tqdm
from features import *
from sklearn.metrics import mean_squared_error
import sys
def cv_model(args, n_folds=5, step=200):
    """Walk-forward cross-validate a model on a single time series.

    Takes a single tuple argument so it can be mapped directly over a worker
    pool (``Pool.imap`` / ``imap_unordered`` pass exactly one object per job).

    Args:
        args: tuple ``(ts, model, p_test, aheads, lags, diffs, ext_series)``.
            ``ts`` is sliced positionally with ``.iloc`` (presumably a pandas
            Series -- TODO confirm); ``p_test`` is the fraction of ``ts`` held
            out for testing. The remaining fields are passed through unchanged
            to ``train_model`` / ``predict_model``.
        n_folds: maximum number of evaluation folds (default 5).
        step: number of held-out observations moved into the history between
            consecutive folds (default 200).

    Returns:
        ``(r2_ans, mse_ans)``: lists holding one R^2 score and one mean squared
        error per completed fold. Each is computed between the predictions and
        the observations immediately following the current history. Fewer than
        ``n_folds`` entries are returned if the held-out data runs out.
    """
    import numpy as np
    import pandas as pd
    from sklearn.metrics import r2_score

    ts, model, p_test, aheads, lags, diffs, ext_series = args
    r2_ans, mse_ans = [], []
    n_trn = int((1 - p_test) * len(ts))
    # Positional slicing: unlike ``tail(-n)``, ``iloc[n:]`` is also correct
    # when n == 0 (``tail(-0)`` returns an empty series).
    ts_trn = ts.iloc[:n_trn]
    ts_tst = ts.iloc[n_trn:]
    # The model is fitted once on the initial training split; later folds
    # only extend the history handed to the predictor.
    _, models = train_model((None, ts_trn, model, aheads, lags, diffs, ext_series))
    for fold in range(n_folds):
        if fold > 0:
            # Series.append was removed in pandas 2.0; pd.concat replaces it.
            ts_trn = pd.concat([ts_trn, ts_tst.iloc[:step]])
            ts_tst = ts_tst.iloc[step:]
        if ts_tst.empty:
            break  # no held-out observations left to score against
        _, preds = predict_model((None, ts_trn, models, lags, diffs, ext_series))
        actual = ts_tst.iloc[:len(preds)]
        # Trim predictions that run past the remaining test data; otherwise
        # the metrics raise on mismatched lengths.
        preds = np.asarray(preds)[:len(actual)]
        mse_ans.append(mean_squared_error(actual, preds))
        r2_ans.append(r2_score(actual, preds))
    return r2_ans, mse_ans
def exec_parallel(job_args, func, n_workers):
    """Apply ``func`` to every element of ``job_args``, optionally in parallel.

    Args:
        job_args: iterable of single arguments; each one is passed to ``func``
            as its only positional argument. Must support ``len()`` when
            ``n_workers > 1`` (used for the progress display).
        func: one-argument callable. When ``n_workers > 1`` it must be
            picklable (e.g. a module-level function) so ``multiprocessing``
            can send it to the worker processes.
        n_workers: number of worker processes; values ``<= 1`` run serially
            in the current process with a ``tqdm`` progress bar.

    Returns:
        List of ``func(arg)`` results, in the same order as ``job_args`` in
        both modes. The elapsed wall-clock time is printed to stdout.
    """
    start_t = dt.now()
    if n_workers > 1:
        n_jobs = len(job_args)
        res = []
        with Pool(n_workers) as wk_pool:
            # imap (not imap_unordered) keeps results aligned with job_args,
            # matching the serial branch below.
            for done, r in enumerate(wk_pool.imap(func, job_args, 1), 1):
                sys.stderr.write('\rDone {}/{} - {:.1f}%'.format(done, n_jobs, done / n_jobs * 100))
                res.append(r)
        sys.stderr.write('\n')  # terminate the carriage-return progress line
    else:
        res = [func(job_arg) for job_arg in tqdm(job_args)]
    print(dt.now() - start_t)
    return res
# Demonstrate how to use exec_parallel(.) to run cv_model(.) for each time series.
# Cross-validation settings.
AHEADS = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
LAGS = [0, 1, 2, 6, 11, 23, 35]
DIFFS = [0, 1]  # Ndiff must < Nlags
EXT_SERIES = ['TOD', 'DOW', 'MOH']
P_TEST = 0.1
N_SAMPLED_SERIES = 100

if __name__ == "__main__":
    # The guard matters: with the "spawn" start method (Windows, and macOS on
    # Python 3.8+) every Pool worker re-imports the main module, and unguarded
    # top-level code would re-run the whole experiment inside each worker.
    # NOTE(review): partial, linear_model, rand, np, read_trn_df and
    # cal_ts_feats are not imported explicitly in this file; presumably they
    # come from `from features import *` -- confirm.
    model = partial(linear_model.LinearRegression, fit_intercept=False, copy_X=False)
    trn_df = read_trn_df()
    ext_series = cal_ts_feats(trn_df, EXT_SERIES)
    rand.seed(52)
    # Never request more series than the frame has: sampling without
    # replacement fails when k exceeds the population size.
    n_sample = min(N_SAMPLED_SERIES, trn_df.shape[1])
    sampled_tss = [trn_df[trn_df.columns[i_ts]] for i_ts in rand.sample(range(trn_df.shape[1]), k=n_sample)]
    job_args = [(ts, model, P_TEST, AHEADS, LAGS, DIFFS, ext_series) for ts in sampled_tss]
    res = exec_parallel(job_args, cv_model, n_workers=10)
    # Flatten the per-fold scores of all series. The overall mean is the same
    # as averaging a rectangular 2-D array, but this also works when series
    # produced different numbers of folds.
    r2_res = np.array([r2 for r in res for r2 in r[0]])
    mse_res = np.array([mse for r in res for mse in r[1]])
    r2_avg, rmse_avg = np.mean(r2_res), np.sqrt(np.mean(mse_res))
    print("r2={} rmse={}".format(r2_avg, rmse_avg))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment