Skip to content

Instantly share code, notes, and snippets.

@umitanuki
Created December 11, 2018 21:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save umitanuki/0a80fb15dbee2c7b96056077779efd33 to your computer and use it in GitHub Desktop.
Save umitanuki/0a80fb15dbee2c7b96056077779efd33 to your computer and use it in GitHub Desktop.
from numpy import (
average,
)
from pipeline_live.engine import LivePipelineEngine
from pipeline_live.data.sources.iex import list_symbols
from pipeline_live.data.iex.pricing import USEquityPricing
from zipline.pipeline.factors import CustomFactor
from zipline.pipeline import Pipeline
from zipline.utils.numpy_utils import rolling_window
from zipline.pipeline.factors.basic import exponential_weights
class MACDHist(CustomFactor):
    """MACD histogram: the latest MACD value minus its signal line.

    The MACD line is the difference between a fast and a slow exponentially
    weighted moving average (EWMA) of the close price; the signal line is an
    EWMA of the MACD line itself.  This factor outputs
    ``macd[-1] - signal`` per asset.

    Parameters
    ----------
    fast_period : int, optional
        Window length of the fast EWMA.  Default is 12.
    slow_period : int, optional
        Window length of the slow EWMA.  Must be greater than
        ``fast_period``.  Default is 26.
    signal_period : int, optional
        Window length of the EWMA applied to the MACD line.  Default is 9.

    Raises
    ------
    ValueError
        If any period is smaller than 1, or if ``slow_period`` is not
        strictly greater than ``fast_period``.
    """

    inputs = (USEquityPricing.close,)
    # Only names are listed (no defaults): the defaults live in ``__new__``
    # because ``window_length`` has to be derived from the periods there.
    params = ('fast_period', 'slow_period', 'signal_period')

    def __new__(cls,
                fast_period=12,
                slow_period=26,
                signal_period=9,
                *args,
                **kwargs):
        # Reject non-positive periods up front; otherwise they only fail
        # later, inside the window/weight helpers, with an unrelated error.
        for label, value in (('fast_period', fast_period),
                             ('slow_period', slow_period),
                             ('signal_period', signal_period)):
            if value < 1:
                raise ValueError(
                    "'{name}' must be at least 1, but got {name}={value}"
                    .format(name=label, value=value)
                )

        if slow_period <= fast_period:
            raise ValueError(
                "'slow_period' must be greater than 'fast_period', but got\n"
                "slow_period={slow}, fast_period={fast}".format(
                    slow=slow_period,
                    fast=fast_period,
                )
            )

        return super(MACDHist, cls).__new__(
            cls,
            *args,
            fast_period=fast_period,
            slow_period=slow_period,
            signal_period=signal_period,
            # Enough rows for `signal_period` slow-EWMA values, each of which
            # consumes `slow_period` rows.
            window_length=slow_period + signal_period - 1,
            **kwargs
        )

    def _ewma(self, data, length):
        """Return the exponentially weighted average of ``data`` along axis 1.

        The decay rate is ``1 - 2 / (1 + length)``, i.e. a smoothing factor
        of ``2 / (length + 1)``.  Axis 1 of ``data`` must have ``length``
        entries so that it lines up with the generated weights.
        """
        decay_rate = 1.0 - (2.0 / (1.0 + length))
        weights = exponential_weights(length, decay_rate)
        return average(data, axis=1, weights=weights)

    def compute(self, today, assets, out, close, fast_period, slow_period,
                signal_period):
        """Write the MACD histogram for each asset into ``out``.

        NOTE(review): the shape comments below assume ``close`` is
        (window_length, n_assets) and that ``rolling_window(a, k)`` yields
        (len(a) - k + 1, k, n_assets) -- confirm against zipline.
        """
        # One slow EWMA per rolling window -> `signal_period` rows.
        slow_ewma = self._ewma(
            rolling_window(close, slow_period),
            slow_period
        )
        # The fast window is shorter and so yields more rows; keep only the
        # trailing `signal_period` of them so both series cover the same days.
        fast_ewma = self._ewma(
            rolling_window(close, fast_period)[-signal_period:],
            fast_period
        )
        macd = fast_ewma - slow_ewma
        # Transpose so the time dimension sits on axis 1, which is the axis
        # `_ewma` averages over.
        signal = self._ewma(macd.T, signal_period)
        # Histogram = most recent MACD value minus the signal line.
        out[:] = macd[-1, :] - signal
# Instantiate the factor with its default periods (12 / 26 / 9).
macdhist = MACDHist()

# Single output column holding the factor value; the screen expression
# restricts the pipeline to assets where that value is positive.
pipe = Pipeline(
    columns={'macdsignal': macdhist},
    screen=(macdhist > 0),
)

# Run the pipeline once through the live engine and show the resulting frame.
eng = LivePipelineEngine(list_symbols)
df = eng.run_pipeline(pipe)
print(df)
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
pipeline-live = "*"
ipython = "*"
ta-lib = "*"
[dev-packages]
pytest = "*"
pytest-cov = "*"
[requires]
python_version = "3.6"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment