Skip to content

Instantly share code, notes, and snippets.

@ftence
Created June 23, 2020 09:49
Show Gist options
  • Save ftence/2c588349358367bf0f287387774c97ea to your computer and use it in GitHub Desktop.
Save ftence/2c588349358367bf0f287387774c97ea to your computer and use it in GitHub Desktop.
#!/usr/bin/python3 -u
#
# A very simple integration of Facebook Prophet for WarpScript CALL.
#
# Announce on stdout how many instances of this 'callable' may be spawned (here, 16).
# The absolute maximum is set in the configuration file via 'warpscript.call.maxcapacity'.
# NOTE(review): this is printed before the imports below, presumably so the capacity
# is reported without waiting for the Prophet/pandas imports to complete -- confirm.
print(16)
import pickle
from urllib.parse import quote, unquote
import sys
import os
from base64 import b64decode, b64encode
from fbprophet import Prophet
from datetime import datetime as dt
import pandas as pd
import traceback
# from https://stackoverflow.com/q/11130156
class suppress_stdout_stderr(object):
    """Context manager silencing file descriptors 1 (stdout) and 2 (stderr).

    The redirection is done at the OS level with ``os.dup2``, so it also
    silences output written by native code, not only Python-level prints.

    Four descriptors are acquired in ``__init__``: two opened on
    ``os.devnull`` (``null_fds``) and two duplicates of the current
    stdout/stderr (``save_fds``). All four are closed in ``__exit__``, so an
    instance is single-use: create a new one for each ``with`` statement.
    """

    def __init__(self):
        # One null descriptor per stream to be silenced.
        self.null_fds = [os.open(os.devnull, os.O_RDWR) for _ in range(2)]
        # Keep duplicates of the real stdout/stderr so they can be restored.
        self.save_fds = (os.dup(1), os.dup(2))

    def __enter__(self):
        # Point stdout and stderr at the null device.
        os.dup2(self.null_fds[0], 1)
        os.dup2(self.null_fds[1], 2)

    def __exit__(self, *_):
        try:
            # Put the real stdout and stderr back in place.
            os.dup2(self.save_fds[0], 1)
            os.dup2(self.save_fds[1], 2)
        finally:
            # Close every descriptor acquired in __init__. The saved
            # duplicates must be closed too, otherwise two descriptors leak
            # each time the context manager is used.
            for fd in (*self.null_fds, *self.save_fds):
                os.close(fd)
# Reference instant used to convert forecast datetimes back to microsecond ticks
_EPOCH = dt(1970, 1, 1)

# Loop, reading stdin, doing our stuff and outputting to stdout.
# Each request is one line: a URL-encoded, base64-encoded pickled dict with keys
#   'ticks'          : timestamps in microseconds since the Unix epoch
#   'values'         : the values observed at those ticks
#   'ticks_forecast' : (optional) timestamps at which to forecast
# Each answer is one line, either the encoded result dict or, on failure,
# a line starting with ' ' followed by the URL-encoded error message.
while True:
    try:
        # Read input
        line = sys.stdin.readline()
        if not line:
            # EOF: the caller closed our stdin, so stop instead of looping
            # forever on decoding errors of an empty request.
            break

        # NOTE(security): pickle.loads can execute arbitrary code. This is only
        # acceptable because the input is expected to come from the trusted
        # process that spawned this callable; never expose stdin to untrusted data.
        args = pickle.loads(b64decode(unquote(line.strip())))

        # Get the timestamps and convert them to datetimes
        timestamps = args['ticks']
        datetimes = [dt.utcfromtimestamp(tick / 1e6) for tick in timestamps]

        # Get the values
        values = args['values']

        # Get the datetimes to know when to forecast
        try:
            future_ts = args['ticks_forecast']
        except KeyError:
            # No future dates to predict, use same datetimes for in-sample prediction
            future_dt = datetimes
        else:
            future_dt = [dt.utcfromtimestamp(tick / 1e6) for tick in future_ts]

        # Format the data according to prophet format
        df = pd.DataFrame(data={'ds': datetimes, 'y': values})
        future = pd.DataFrame(data={'ds': future_dt})

        # Suppress outputs from Prophet: anything it writes to stdout would
        # otherwise be read by the caller as (part of) our answer.
        with suppress_stdout_stderr():
            # Instantiate and fit
            m = Prophet()
            m.fit(df)

            # Do the forecasting for the specified dates
            forecast = m.predict(future)

        # Build output. The loop variable must not be named 'dt': that would
        # shadow the datetime class and make the epoch construction fail.
        output = {
            'ticks_forecast': [(ts - _EPOCH).total_seconds() * 1e6 for ts in forecast['ds'].tolist()],
            'values_forecast': forecast['yhat'].tolist(),
            'values_forecast_lower': forecast['yhat_lower'].tolist(),
            'values_forecast_upper': forecast['yhat_upper'].tolist(),
        }

        # Output result (outside of the suppression context, so it reaches the real stdout)
        print(quote(b64encode(pickle.dumps(output))))
    except Exception:
        # If returning a content starting with ' ' (not URL encoded), then
        # the rest of the line is interpreted as a URL encoded UTF-8 of an error message
        # and will propagate the error to the calling WarpScript
        print(' ' + quote(traceback.format_exc()))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment