Created
June 23, 2020 09:49
-
-
Save ftence/2c588349358367bf0f287387774c97ea to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 -u
#
# A very simple integration of Facebook Prophet for WarpScript CALL.
#
# The first line written to stdout announces the maximum number of instances
# of this 'callable' that may be spawned.
# The absolute maximum is set in the configuration file via
# 'warpscript.call.maxcapacity'.
MAX_INSTANCES = 16
print(MAX_INSTANCES)
import pickle | |
from urllib.parse import quote, unquote | |
import sys | |
import os | |
from base64 import b64decode, b64encode | |
from fbprophet import Prophet | |
from datetime import datetime as dt | |
import pandas as pd | |
import traceback | |
# Adapted from https://stackoverflow.com/q/11130156
class suppress_stdout_stderr(object):
    """Context manager that silences file descriptors 1 and 2.

    The redirection is done on the descriptors themselves (via ``os.dup2``)
    rather than on ``sys.stdout``/``sys.stderr``, so anything writing
    directly to fd 1 or fd 2 is silenced as well.

    An instance is single-use: every descriptor it opened in ``__init__`` is
    closed again in ``__exit__``.
    """

    def __init__(self):
        # One /dev/null descriptor for each of the two streams to silence.
        self.null_fds = [os.open(os.devnull, os.O_RDWR) for _ in range(2)]
        # Duplicates of the real stdout/stderr, kept so they can be restored.
        self.save_fds = (os.dup(1), os.dup(2))

    def __enter__(self):
        # Point fd 1 and fd 2 at /dev/null.
        os.dup2(self.null_fds[0], 1)
        os.dup2(self.null_fds[1], 2)

    def __exit__(self, *_):
        # Put the real stdout/stderr back on fd 1 and fd 2.
        os.dup2(self.save_fds[0], 1)
        os.dup2(self.save_fds[1], 2)
        # Close everything opened in __init__, including the saved
        # duplicates: leaving those open leaks two descriptors per use,
        # which adds up quickly when an instance is created per request.
        for fd in (*self.null_fds, *self.save_fds):
            os.close(fd)
# Reference point for converting datetimes back to epoch microseconds.
EPOCH = dt(1970, 1, 1)


def ticks_to_datetimes(ticks):
    """Convert epoch timestamps in microseconds to naive UTC datetimes."""
    return [dt.utcfromtimestamp(tick / 1e6) for tick in ticks]


def datetimes_to_ticks(stamps):
    """Convert naive UTC datetimes to epoch timestamps in microseconds.

    Returns a list of floats, one per element of ``stamps``.
    """
    # The loop variable must not be called ``dt``: that would shadow the
    # datetime class and make the epoch unreachable.
    return [(stamp - EPOCH).total_seconds() * 1e6 for stamp in stamps]


def run_forecast(args):
    """Fit Prophet on the input series and forecast at the requested ticks.

    ``args`` is the decoded request. It must contain 'ticks' (microsecond
    timestamps) and 'values'; 'ticks_forecast' is optional. When it is
    absent the input ticks are reused, giving an in-sample prediction.

    Returns a dict with 'ticks_forecast', 'values_forecast',
    'values_forecast_lower' and 'values_forecast_upper'.
    Raises KeyError if 'ticks' or 'values' is missing.
    """
    datetimes = ticks_to_datetimes(args['ticks'])
    values = args['values']

    try:
        future_ticks = args['ticks_forecast']
    except KeyError:
        # No future dates to predict, use same datetimes for in-sample prediction
        future_dt = datetimes
    else:
        future_dt = ticks_to_datetimes(future_ticks)

    # Format the data according to prophet format
    df = pd.DataFrame(data={'ds': datetimes, 'y': values})
    future = pd.DataFrame(data={'ds': future_dt})

    # Instantiate, fit, then forecast for the specified dates
    model = Prophet()
    model.fit(df)
    forecast = model.predict(future)

    return {
        'ticks_forecast': datetimes_to_ticks(forecast['ds'].tolist()),
        'values_forecast': forecast['yhat'].tolist(),
        'values_forecast_lower': forecast['yhat_lower'].tolist(),
        'values_forecast_upper': forecast['yhat_upper'].tolist(),
    }


def main():
    """Serve requests: one encoded request per stdin line, one reply per line.

    A request is a URL-encoded, base64-encoded pickle of a dict. The reply
    uses the same encoding. On failure the reply starts with a space and is
    followed by the URL-encoded traceback. The loop ends when stdin reaches
    end of file.
    """
    while True:
        try:
            raw = sys.stdin.readline()
            if not raw:
                # readline() returns '' only at EOF: the caller closed the
                # pipe, so stop instead of reporting errors forever.
                break

            # Suppress outputs from Prophet so they cannot corrupt the
            # line-based protocol on stdout.
            with suppress_stdout_stderr():
                # SECURITY: pickle.loads can execute arbitrary code. This is
                # only acceptable because the input comes from the calling
                # process; never feed this script untrusted data.
                args = pickle.loads(b64decode(unquote(raw.strip())))
                output = run_forecast(args)

            # Output result (after stdout has been restored)
            print(quote(b64encode(pickle.dumps(output))), flush=True)
        except Exception:
            # If returning a content starting with ' ' (not URL encoded), then
            # the rest of the line is interpreted as a URL encoded UTF-8 of an
            # error message and will propagate the error to the calling WarpScript
            print(' ' + quote(traceback.format_exc()), flush=True)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment