Skip to content

Instantly share code, notes, and snippets.

@goraj
Last active February 19, 2022 19:59
Show Gist options
  • Save goraj/9e4c3ea2d4908005ec33a6c52e213ee6 to your computer and use it in GitHub Desktop.
Save goraj/9e4c3ea2d4908005ec33a6c52e213ee6 to your computer and use it in GitHub Desktop.
NautilusTrader Parquet example: loading 1-minute equity bars into a DataCatalog
from nautilus_trader.backtest.data.wranglers import BarDataWrangler
from nautilus_trader.persistence.external.readers import ParquetReader
from nautilus_trader.persistence.catalog import DataCatalog
from functools import partial
from nautilus_trader.core.datetime import dt_to_unix_nanos
from nautilus_trader.model.c_enums.bar_aggregation import BarAggregation
from nautilus_trader.model.data.bar import Bar, BarType, BarSpecification
from nautilus_trader.model.enums import AggregationSource, PriceType
from nautilus_trader.persistence.external.core import process_files, write_objects
from nautilus_trader.model.identifiers import InstrumentId
from nautilus_trader.model.identifiers import Symbol
from nautilus_trader.model.identifiers import Venue
from nautilus_trader.model.instruments.equity import Equity
from nautilus_trader.model.objects import Money
from nautilus_trader.model.objects import Price
from nautilus_trader.model.objects import Quantity
from nautilus_trader.model.currencies import USD
from nautilus_trader.persistence.external.core import write_objects
from pathlib import Path
import os, shutil
def make_equity_instrument(name, exchange):
    """Build a USD-denominated ``Equity`` for a ticker on a given venue.

    Args:
        name: Ticker string; wrapped in a ``Symbol`` and also used as the
            instrument's native symbol.
        exchange: Venue code string; wrapped in a ``Venue``.

    Returns:
        An ``Equity`` whose id combines the symbol and venue, with a price
        precision of 2, a price increment of 0.01, a multiplier and lot
        size of 1, no ISIN, and both timestamps set to 0.
    """
    ticker_symbol = Symbol(name)
    instrument_id = InstrumentId(symbol=ticker_symbol, venue=Venue(exchange))

    equity = Equity(
        instrument_id=instrument_id,
        native_symbol=ticker_symbol,
        currency=USD,
        price_precision=2,
        price_increment=Price.from_str("0.01"),
        multiplier=Quantity.from_int(1),
        lot_size=Quantity.from_int(1),
        isin=None,
        ts_event=0,
        ts_init=0,
    )
    return equity
def parser(data, instrument):
    """Yield bars wrangled from ``data`` for ``instrument``.

    Args:
        data: Object exposing ``set_index``; it is re-indexed on its
            ``"datetime"`` column before being handed to the wrangler
            (presumably a pandas DataFrame read from Parquet -- confirm
            against the reader).
        instrument: Instrument whose ``id`` identifies the bar type.

    Yields:
        Each item returned by ``BarDataWrangler.process`` for a 1-minute,
        LAST-price bar type marked as externally aggregated.
    """
    one_minute_last = BarSpecification(1, BarAggregation.MINUTE, PriceType.LAST)
    # EXTERNAL: the bars arrive already aggregated in the input data.
    external_bar_type = BarType(
        instrument.id,
        one_minute_last,
        AggregationSource.EXTERNAL,
    )
    bar_wrangler = BarDataWrangler(external_bar_type, instrument)
    indexed_frame = data.set_index("datetime")
    yield from bar_wrangler.process(indexed_frame)
# Catalog directory lives under the current working directory.
CATALOG_PATH = os.getcwd() + "/catalog"

# Clear if it already exists, then create fresh
if os.path.exists(CATALOG_PATH):
    shutil.rmtree(CATALOG_PATH)
os.mkdir(CATALOG_PATH)

# Directory holding one ``<ticker>.parquet`` file per instrument.
DATA_DIR = Path("/mnt/e/equity/universe/")
catalog = DataCatalog(CATALOG_PATH)

exchange = "NYSE"
tickers = ["SPY", "XBI"]

for ticker in tickers:
    instrument = make_equity_instrument(ticker, exchange)
    # Bind the instrument into the parser so the reader can call it with
    # the loaded data alone.
    process_files(
        glob_path=DATA_DIR / f"{instrument.native_symbol}.parquet",
        reader=ParquetReader(parser=partial(parser, instrument=instrument)),
        catalog=catalog,
    )
    # Write the instrument on every iteration: doing this once after the
    # loop would store only the last ticker's instrument definition.
    write_objects(catalog, [instrument])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment