May 1, 2015
"cells": [
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"from zipfile import ZipFile\n",
"from datetime import datetime\n",
"from pytz import timezone\n",
"import numpy as np\n",
"from numpy.lib import recfunctions\n",
"import tables as tb"
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
"outputs": [],
"source": [
"fname = '../local_data/'"
"cell_type": "markdown",
"metadata": {},
"source": [
"fields of dailyquotes file taqquote\n",
"[9] text EXCHANGE N Nyse T/Q NASDAQ\n",
"[10:25] text symbol 6+10\n",
"[26:36] bid price 7+4\n",
"[37:43] bid size (units)\n",
"[44:54] ask price 7+4\n",
"[55:61] ask size\n",
"[62] text Condition of quote\n",
"[63:66] market maker\n",
"[67] bid exchange\n",
"[68] ask exchange\n",
"[69:84] int seqno\n",
"[85] int bbo indicator\n",
"[86] int NASDAQ BBO indicator\n",
"[87] text cancel/correction\n",
"[88] text C=CTA N=UTP\n",
"[90] text Retail interest indicator\n",
"cell_type": "markdown",
"metadata": {},
"source": [
"### Read this in"
"cell_type": "markdown",
"metadata": {
"collapsed": true
"source": [
"# Two characters are also used at the end of each line as a line indicator\n",
"widths = [9, 1, 16, 11, 7, 11, 7, 1, 4, 1, 1, 16, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]\n",
"w = np.array(widths)\n",
"cell_type": "markdown",
"metadata": {},
"source": [
"Note that we'll have 98 bytes total with the `\\r\\n` on the end."
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"cell_type": "markdown",
"metadata": {
"collapsed": false
"source": [
"# Note - we're using object here (as pandas will do this anyway), \n",
"# and we'll need to convert back to fixed width strings later\n",
"# We can get the widths from the widths list above\n",
"old_dtype = [('Time', np.datetime64),\n",
" ('Exchange', object), # |S1\n",
" ('Symbol', object), # |S16, etc.\n",
" ('Bid_Price', np.float64),\n",
" ('Bid_Size', np.int32),\n",
" ('Ask_Price', np.float64),\n",
" ('Ask_Size', np.int32),\n",
" ('Quote_Condition', object),\n",
" ('Market_Maker', np.int8), # This is blank - want to skip?\n",
" ('Bid_Exchange', object),\n",
" ('Ask_Exchange', object),\n",
" ('Sequence_Number', np.int64),\n",
" ('National_BBO_Ind', np.int8), # These aren't really numbers\n",
" ('NASDAQ_BBO_Ind', np.int8), # Maybe should be string?\n",
" ('Quote_Cancel_Correction', object),\n",
" ('Source_of_Quote', object),\n",
" ('Retail_Interest_Indicator_RPI', object),\n",
" ('Short_Sale_Restriction_Indicator', object),\n",
" ('LULD_BBO_Indicator_CQS', object),\n",
" ('LULD_BBO_Indicator_UTP', object),\n",
" ('FINRA_ADF_MPID_Indicator', object),\n",
" ('SIP_generated_Message_Identifier', object),\n",
" ('National_BBO_LULD_Indicator', object)\n",
" ] # Then there's two characters for newline\n",
"# This was for pandas' screwball approach to dtype\n",
"# names = [a for a,b in dtype]\n",
"# dtype = dict(dtype)\n",
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
"outputs": [],
"source": [
"# Layout of one fixed-width TAQ quote record, with every field read as raw\n",
"# bytes ('S<n>'). A leading '|' (as in '|S2') only states that byte order is\n",
"# irrelevant, which is always true for byte strings, so it is left off here.\n",
"initial_dtype = [\n",
"    # HHMMSSmmm, should be in Eastern Time (ET). Could instead be split into\n",
"    # hour/minute/second/msec sub-fields ('|S2', '|S2', '|S2', '|S3').\n",
"    ('Time', 'S9'),\n",
"    ('Exchange', 'S1'),\n",
"    ('Symbol', 'S16'),\n",
"    ('Bid_Price', 'S11'),  # 7.4 (fixed point)\n",
"    ('Bid_Size', 'S7'),\n",
"    ('Ask_Price', 'S11'),  # 7.4\n",
"    ('Ask_Size', 'S7'),\n",
"    ('Quote_Condition', 'S1'),\n",
"    # Ends up being discarded; it should always be blank\n",
"    ('Market_Maker', 'S4'),\n",
"    ('Bid_Exchange', 'S1'),\n",
"    ('Ask_Exchange', 'S1'),\n",
"    ('Sequence_Number', 'S16'),\n",
"    ('National_BBO_Ind', 'S1'),\n",
"    ('NASDAQ_BBO_Ind', 'S1'),\n",
"    ('Quote_Cancel_Correction', 'S1'),\n",
"    ('Source_of_Quote', 'S1'),\n",
"    ('Retail_Interest_Indicator_RPI', 'S1'),\n",
"    ('Short_Sale_Restriction_Indicator', 'S1'),\n",
"    ('LULD_BBO_Indicator_CQS', 'S1'),\n",
"    ('LULD_BBO_Indicator_UTP', 'S1'),\n",
"    ('FINRA_ADF_MPID_Indicator', 'S1'),\n",
"    ('SIP_generated_Message_Identifier', 'S1'),\n",
"    ('National_BBO_LULD_Indicator', 'S1'),\n",
"    # Two-byte line terminator closing each record\n",
"    ('newline', 'S2'),\n",
"]"
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"# Justin and Pandas (I think) use time64, as does PyTables.\n",
"# An alternative for now would be msec from the beginning of the day stored as\n",
"# an integer (at least int32: a day has 86,400,000 ms).\n",
"# (maybe compare performance to datetime64? But dates should compress very well...)\n",
"time_col = 'Time'\n",
"\n",
"# Fields that are parsed from text into numbers.\n",
"# Left out on purpose: Market_Maker (not currently used, should always be blank)\n",
"# and the two *_BBO_Ind fields (actually categorical, so they stay strings).\n",
"convert_dtype = [\n",
"    ('Bid_Price', np.float64),\n",
"    ('Bid_Size', np.int32),\n",
"    ('Ask_Price', np.float64),\n",
"    ('Ask_Size', np.int32),\n",
"    ('Sequence_Number', np.int64),\n",
"]\n",
"\n",
"# Fields that are stored exactly as read (fixed-width byte strings).\n",
"passthrough_strings = [\n",
"    'Exchange',\n",
"    'Symbol',\n",
"    'Quote_Condition',\n",
"    'Bid_Exchange',\n",
"    'Ask_Exchange',\n",
"    # The _Ind fields are actually categorical - leaving as strings\n",
"    'National_BBO_Ind',\n",
"    'NASDAQ_BBO_Ind',\n",
"    'Quote_Cancel_Correction',\n",
"    'Source_of_Quote',\n",
"    'Retail_Interest_Indicator_RPI',\n",
"    'Short_Sale_Restriction_Indicator',\n",
"    'LULD_BBO_Indicator_CQS',\n",
"    'LULD_BBO_Indicator_UTP',\n",
"    'FINRA_ADF_MPID_Indicator',\n",
"    'SIP_generated_Message_Identifier',\n",
"    'National_BBO_LULD_Indicator',\n",
"]"
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"# Lifted from blaze.pytables\n",
"def dtype_to_pytables(dtype):\n",
"    \"\"\"Build a PyTables column mapping from a structured NumPy dtype.\n",
"\n",
"    Every field of ``dtype`` contributes one entry to the returned dict, keyed\n",
"    by field name, with its column position set to the field's index in\n",
"    ``dtype.names``. ``datetime64`` fields become ``Time64Col``; all other\n",
"    fields are translated by ``tb.descr_from_dtype``.\n",
"\n",
"    Examples\n",
"    --------\n",
"    >>> from tables import Int32Col, StringCol, Time64Col\n",
"    >>> dt = np.dtype([('name', 'S7'), ('amount', 'i4'), ('time', 'M8[us]')])\n",
"    >>> dtype_to_pytables(dt) # doctest: +SKIP\n",
"    {'amount': Int32Col(shape=(), dflt=0, pos=1),\n",
"     'name': StringCol(itemsize=7, shape=(), dflt='', pos=0),\n",
"     'time': Time64Col(shape=(), dflt=0.0, pos=2)}\n",
"    \"\"\"\n",
"    columns = {}\n",
"    for position, field in enumerate(dtype.names):\n",
"        field_dtype, _ = dtype.fields[field]\n",
"        if issubclass(field_dtype.type, np.datetime64):\n",
"            description = tb.Description({field: tb.Time64Col(pos=position)})\n",
"        else:\n",
"            # descr_from_dtype hands back a tuple; the Description is its first item\n",
"            description = tb.descr_from_dtype(np.dtype([(field, field_dtype)]))[0]\n",
"        getattr(description, field)._v_pos = position\n",
"        columns.update(description._v_colobjects)\n",
"    return columns"
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false
"outputs": [],
"source": [
"# The \"easy\" dtypes are the \"not datetime\" dtypes: fields that are either\n",
"# converted to a number (convert_dtype) or kept as byte strings\n",
"# (passthrough_strings). Fields of initial_dtype named in neither are left out.\n",
"convert_dict = dict(convert_dtype)\n",
"easy_dtype = []\n",
"for name, dtype in initial_dtype:\n",
"    if name not in convert_dict and name not in passthrough_strings:\n",
"        continue\n",
"    # A converted field takes its numeric type; a passthrough keeps its 'S' width\n",
"    easy_dtype.append((name, convert_dict.get(name, dtype)))\n",
"\n",
"# PyTables will not accept np.datetime64 (that is hacked around elsewhere), but it\n",
"# is used here so that the blaze function above produces a time column.\n",
"# Time is also moved to the end (keeping the original order would be nicer), as\n",
"# it's more efficient for Dav given the technical debt he's already built up.\n",
"pytables_dtype = easy_dtype + [('Time', 'datetime64[ms]')]\n",
"pytables_desc = dtype_to_pytables(np.dtype(pytables_dtype))"
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
"outputs": [],
"source": [
"class TAQ2HDF5:\n",
"    '''Convert a zipped, fixed-width TAQ quote file into HDF5.\n",
"\n",
"    One HDF5 file is written per member of the zip archive, named after that\n",
"    member with '.h5' appended. The class relies on the module-level\n",
"    `initial_dtype`, `easy_dtype` and `pytables_desc` definitions.\n",
"    '''\n",
"\n",
"    def __init__(self, taq_fname):\n",
"        '''Remember the path of the zip archive to convert.'''\n",
"        self.taq_fname = taq_fname\n",
"\n",
"    def convert_taq(self):\n",
"        '''Convert every file found inside the zip archive.\n",
"\n",
"        The first line of each file is split on b':' into a date-like part and\n",
"        a line count; bytes 2-10 of the date-like part are read as MMDDYYYY and\n",
"        stored on self.month / self.day / self.year.\n",
"        '''\n",
"        # The below doesn't work for pandas (and neither does `unzip` from the command line). Probably want to use\n",
"        # something like `7z x -so 2> /dev/null` if we use pandas.\n",
"        with ZipFile(self.taq_fname) as zfile:\n",
"            for inside_f in zfile.filelist:\n",
"                # The original filename is available as inside_f.filename\n",
"                with zfile.open(inside_f) as infile:\n",
"                    first = infile.readline()\n",
"                    # You need to use bytes to split bytes\n",
"                    dateish, numlines = first.split(b':')\n",
"                    numlines = int(numlines)\n",
"\n",
"                    # Get dates to combine with times later\n",
"                    # This is a little over-trusting of the spec...\n",
"                    self.month = int(dateish[2:4])\n",
"                    self.day = int(dateish[4:6])\n",
"                    self.year = int(dateish[6:10])\n",
"\n",
"                    h5_table = self.setup_hdf5(inside_f.filename, numlines)\n",
"                    try:\n",
"                        self.raw_conversion(numlines, infile, h5_table)\n",
"                    finally:\n",
"                        # Close the HDF5 file even when the conversion fails\n",
"                        self.finalize_hdf5()\n",
"\n",
"    def setup_hdf5(self, h5_fname_root, numlines):\n",
"        '''Open `h5_fname_root + '.h5'` for writing and create the quote table.\n",
"\n",
"        The open file is kept on self.h5 so that finalize_hdf5 can close it.\n",
"        Returns the newly created '/daily_quotes' table, sized for `numlines`\n",
"        expected rows.\n",
"        '''\n",
"        # We're using aggressive compression and checksums, since this will likely stick around\n",
"        # Stopping one level short of max compression - don't be greedy.\n",
"        self.h5 = tb.open_file(h5_fname_root + '.h5', title=h5_fname_root, mode='w',\n",
"                               filters=tb.Filters(complevel=8, complib='blosc:lz4hc', fletcher32=True))\n",
"\n",
"        return self.h5.create_table('/', 'daily_quotes', description=pytables_desc, expectedrows=numlines)\n",
"\n",
"    def finalize_hdf5(self):\n",
"        '''Close the HDF5 file opened by setup_hdf5.'''\n",
"        self.h5.close()\n",
"\n",
"    def process_chunk(self, all_strings):\n",
"        '''Turn an array of raw records into the rows that get stored.\n",
"\n",
"        `all_strings` is a structured array with dtype `initial_dtype`. The\n",
"        result has the `easy_dtype` fields followed by a float 'Time' field\n",
"        holding POSIX timestamps.\n",
"        '''\n",
"        # Copy field by field, matching on name. Casting the whole array with\n",
"        # astype(easy_dtype) depends on by-name structured casting, which recent\n",
"        # NumPy releases no longer do (they pair fields by position), and\n",
"        # easy_dtype has fewer fields than initial_dtype.\n",
"        easy_converted = np.empty(len(all_strings), dtype=easy_dtype)\n",
"        for name in easy_converted.dtype.names:\n",
"            easy_converted[name] = all_strings[name].astype(easy_converted.dtype[name])\n",
"\n",
"        # These don't have the decimal point in the TAQ file\n",
"        for dollar_col in ['Bid_Price', 'Ask_Price']:\n",
"            easy_converted[dollar_col] /= 10000\n",
"\n",
"        # Currently, there doesn't seem to be any utility to converting to numpy.datetime64\n",
"        # PyTables wants float64's corresponding to the POSIX Standard (relative to 1970-01-01, UTC)\n",
"        # A pytz zone has to be attached with localize(): handing it to datetime()\n",
"        # as tzinfo= applies the zone's local-mean-time offset, which stored every\n",
"        # timestamp a few minutes off.\n",
"        eastern = timezone('US/Eastern')\n",
"        converted_time = [\n",
"            eastern.localize(\n",
"                datetime(self.year, self.month, self.day,\n",
"                         int(raw[:2]), int(raw[2:4]), int(raw[4:6]),\n",
"                         int(raw[6:9]) * 1000)  # msec needs to be microsec\n",
"            ).timestamp()\n",
"            for raw in all_strings['Time']\n",
"        ]\n",
"\n",
"        # More unnecessary copying\n",
"        records = recfunctions.append_fields(easy_converted, 'Time', converted_time, usemask=False)\n",
"\n",
"        return records\n",
"\n",
"    # at some point, we might optimize chunksize. For now, assume PyTables is smart\n",
"    def raw_conversion(self, numlines, infile, out, chunksize=None):\n",
"        '''Read raw bytes from TAQ, write to HDF5\n",
"\n",
"        `infile` must be positioned just after the header line. Records are read\n",
"        `chunksize` at a time (default: the first entry of out.chunkshape),\n",
"        converted with process_chunk and appended to `out`. `numlines` is part\n",
"        of the signature but is not used here.\n",
"        '''\n",
"        if chunksize is None:\n",
"            chunksize = out.chunkshape[0]\n",
"\n",
"        record_dtype = np.dtype(initial_dtype)\n",
"        # Bytes per record, line terminator included\n",
"        record_size = record_dtype.itemsize\n",
"\n",
"        while True:\n",
"            raw_bytes = infile.read(record_size * chunksize)\n",
"            # read() returns bytes, so comparing against '' never matches; count the\n",
"            # complete records instead. The last chunk is usually shorter than\n",
"            # chunksize, so the array is sized from what was actually read.\n",
"            n_records = len(raw_bytes) // record_size\n",
"            if n_records == 0:\n",
"                break\n",
"            # If we use asarray with this dtype, it crashes Python! (might not be true anymore)\n",
"            # ndarray gives 'S' arrays instead of chararrays (as recarray does)\n",
"            all_strings = np.ndarray(n_records, buffer=raw_bytes, dtype=record_dtype)\n",
"            # This approach doesn't work...\n",
"            # out[chunk_start:chunk_stop, 1:] = all_strings[:,1:-1]\n",
"\n",
"            out.append(self.process_chunk(all_strings))"
"cell_type": "code",
"execution_count": 16,
"metadata": {
"collapsed": false
"outputs": [
"data": {
"text/plain": [
"b'075300081PA 000000000000000000000007294000000027R PP000000000007625512 C \\r\\n075300085PA 000000000000000000000006076000000010R PP000000000007625612 C \\r\\n'"
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
"source": [
"cell_type": "markdown",
"metadata": {},
"source": [
"## Let's process our file"
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"test_run = TAQ2HDF5(fname)"
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
"outputs": [],
"source": [
"%time test_run.convert_taq()"
"cell_type": "code",
"execution_count": 248,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"h5 = tb.open_file('./taqquote20140206.h5')"
"cell_type": "code",
"execution_count": 250,
"metadata": {
"collapsed": false
"outputs": [
"data": {
"text/plain": [
"array([ (b'P', b'A ', 0.0, 0, 0.0, 0, b'R', b'P', b'P', 14, b' ', b'C', b' ', b' ', b' ', b' ', b' ', b' ', b' ', 1391676960.901),\n",
" (b'P', b'A ', 0.0, 0, 72.94, 27, b'R', b'P', b'P', 76255, b' ', b'C', b' ', b' ', b' ', b' ', b' ', b' ', b' ', 1391690940.081),\n",
" (b'P', b'A ', 0.0, 0, 60.76, 10, b'R', b'P', b'P', 76256, b' ', b'C', b' ', b' ', b' ', b' ', b' ', b' ', b' ', 1391690940.085),\n",
" ...,\n",
" (b'K', b'A ', 57.79, 1, 57.92, 1, b'R', b'K', b'K', 2631333, b' ', b'C', b' ', b' ', b' ', b' ', b' ', b' ', b' ', 1391698830.251),\n",
" (b'Z', b'A ', 57.82, 3, 57.84, 1, b'R', b'Z', b'Z', 2632737, b' ', b'C', b' ', b' ', b' ', b' ', b' ', b' ', b' ', 1391698831.256),\n",
" (b'N', b'A ', 57.82, 4, 57.84, 3, b'R', b'N', b'N', 2632938, b'A', b'C', b'B', b' ', b' ', b' ', b' ', b' ', b'A', 1391698831.333)], \n",
" dtype=[('Exchange', 'S1'), ('Symbol', 'S16'), ('Bid_Price', '<f8'), ('Bid_Size', '<i4'), ('Ask_Price', '<f8'), ('Ask_Size', '<i4'), ('Quote_Condition', 'S1'), ('Bid_Exchange', 'S1'), ('Ask_Exchange', 'S1'), ('Sequence_Number', '<i8'), ('Quote_Cancel_Correction', 'S1'), ('Source_of_Quote', 'S1'), ('Retail_Interest_Indicator_RPI', 'S1'), ('Short_Sale_Restriction_Indicator', 'S1'), ('LULD_BBO_Indicator_CQS', 'S1'), ('LULD_BBO_Indicator_UTP', 'S1'), ('FINRA_ADF_MPID_Indicator', 'S1'), ('SIP_generated_Message_Identifier', 'S1'), ('National_BBO_LULD_Indicator', 'S1'), ('Time', '<f8')])"
"execution_count": 250,
"metadata": {},
"output_type": "execute_result"
"source": [
"cell_type": "code",
"execution_count": 232,
"metadata": {
"collapsed": true
"outputs": [],
"source": [
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
"outputs": [],
"source": []
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.4.3"
"nbformat": 4,
"nbformat_minor": 0
