Skip to content

Instantly share code, notes, and snippets.

@bboyle1234
Created March 11, 2015 15:39
Show Gist options
  • Save bboyle1234/11b7f0f47e76e070d969 to your computer and use it in GitHub Desktop.
Save bboyle1234/11b7f0f47e76e070d969 to your computer and use it in GitHub Desktop.
TickDataContext
using ApexSolid;
using ApexSolid.Data;
using ApexInvesting.Platform;
using NinjaTrader.Data;
using NinjaTrader.Indicator;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using NinjaTrader.Cbi;
using ApexInvesting.Platform.GlobalModels.Market;
namespace ApexTools.DataProvider {
/// <summary>
/// Create this object to get a source of tick data for asynchronous processing
/// </summary>
public class TickDataContext : IDataContext {
/// <summary>
/// Fires when a dataseries has been obtained, just before processing begins.
/// Note that at the time this event fires, the first ticks have not yet been added
/// to the processing queue.
/// </summary>
/// <remarks>
/// Invoked from the tick pump thread (see RunTickPump), after GetBars returns and before PumpTicks starts.
/// </remarks>
public event Action<IDataContext> Ready;
/// <summary>
/// Fires when an error puts this object into an unusable state.
/// </summary>
/// <remarks>
/// Invoked by OnError after LastError and State have been set, and before Dispose is called.
/// </remarks>
public event Action<IDataContext> Error;
/// <summary>
/// The instrument this object supplies tick data for
/// </summary>
readonly Instrument Instrument;
/// <summary>
/// The thread that pumps ticks out of the NinjaTrader bars series into the TickDataQueue.
/// Created and started by the constructor; aborted by Dispose.
/// </summary>
readonly Thread TickPumpThread;
/// <summary>
/// The SolidTickDataQueue used to provide asynchronous access to ninjatrader tick data
/// </summary>
readonly SolidTickDataQueue TickQueue = new SolidTickDataQueue();
/// <summary>
/// A reference to the ninjatrader bar series. The tick pump reads ticks from this.
/// Remains null until GetBars has completed on the tick pump thread.
/// </summary>
Bars bars;
/// <summary>
/// Stores the tick processing progress. This variable holds the index of the NinjaTrader 1-tick
/// bar that was last read. The initial value of -1 means that no bar has been read yet.
/// </summary>
int indexOfLastTickRead = -1;
/// <summary>
/// The current state of this data context. Set to Loading by the constructor
/// and to Error by OnError.
/// </summary>
public DataContextStates State { get; private set; }
/// <summary>
/// The local time elapsed since ticks were last pushed into the tick queue.
/// </summary>
public TimeSpan TimeSinceLastUpdate {
    get {
        var lastUpdate = LastUpdateAtLocal;
        return DateTime.Now - lastUpdate;
    }
}
/// <summary>
/// The local clock time at which the tick pump last pushed ticks into the tick queue.
/// Holds the default DateTime value until the first batch has been pushed.
/// </summary>
public DateTime LastUpdateAtLocal { get; private set; }
/// <summary>
/// The timestamp of the most recent tick that was pushed into the tick queue.
/// Holds the default DateTime value until the first batch has been pushed.
/// </summary>
public DateTime LastTickAtLocal { get; private set; }
/// <summary>
/// The fraction (0.0 to 1.0) of the currently loaded bars that the tick pump has already read.
/// Returns 0 when this object is in an error state, when the bars have not been loaded yet,
/// when fewer than two bars exist, or when no bar has been read so far.
/// </summary>
public double ProcessingCompletionRatio {
    get {
        if (null != LastError)
            return 0;
        // take one snapshot of the shared state so the guard and the division see the same values
        var source = bars;
        if (null == source)
            return 0;
        var lastIndex = source.Count - 1;
        var lastRead = indexOfLastTickRead;
        if (lastIndex <= 0 || lastRead < 0)
            return 0;
        // floating point division of "last index read" by "last index available".
        // (the previous expression "indexOfLastTickRead / bars.Count - 1" performed an integer
        // division and then subtracted one, so it could only ever produce -1 or 0)
        return (double)lastRead / lastIndex;
    }
}
/// <summary>
/// The number of bars in the loaded series that the tick pump has not read yet.
/// Returns 0 when this object is in an error state or when no bars are available.
/// </summary>
public int NumUnprocessedTicks {
    get {
        var nothingToCount = null != LastError || null == bars || bars.Count == 0;
        if (nothingToCount) {
            return 0;
        }
        var lastAvailableIndex = bars.Count - 1;
        return lastAvailableIndex - indexOfLastTickRead;
    }
}
/// <summary>
/// The exception that put this object into its error state, as recorded by OnError.
/// Null while no error has been recorded.
/// </summary>
public Exception LastError { get; private set; }
/// <summary>
/// True once Dispose has been called. The tick pump loop stops when it sees this flag.
/// </summary>
public bool IsDisposed { get; private set; }
/// <summary>
/// Creates a tick data context for the given instrument, puts it into the Loading state
/// and immediately starts the background thread that loads the bars and pumps the ticks.
/// </summary>
/// <param name="instrument">The instrument to supply tick data for</param>
public TickDataContext(Instrument instrument) {
    Instrument = instrument;
    State = DataContextStates.Loading;
    // a background thread does not keep the process alive on shutdown
    TickPumpThread = new Thread(RunTickPump) { IsBackground = true };
    TickPumpThread.Start();
}
/// <summary>
/// Entry point of the tick pump thread.
/// Loads the ninjatrader bars, raises the Ready event and then pumps ticks
/// into the tick queue until this object is disposed.
/// </summary>
void RunTickPump() {
    try {
        GetBars();
        Ready.TryInvoke(this);
        PumpTicks();
    }
    catch (ThreadAbortException) {
        // the thread is simply being aborted because of disposal,
        // so this is not reported through OnError
    }
    catch (Exception failure) {
        OnError(failure);
    }
}
/// <summary>
/// Obtains the 1-tick bars for this object's instrument from the TickDataProvider and then
/// subscribes to the provider's BarsReload notification.
/// Any exception thrown while obtaining the bars propagates to the caller (RunTickPump),
/// in which case the subscription is not made.
/// </summary>
void GetBars() {
// actually get the bars ... this can take a while
bars = TickDataProvider.GetBars(Instrument);
// if an exception has not been thrown, we were successful getting the bars
// ... execution can continue
// since we have the bars, we need to create a hook so that
// this object goes into error state if the bars are invalidated
// by a reload event
// NOTE(review): a reload raised after the line above returns but before the subscription
// below is made would not be seen by this object - confirm whether that window matters
TickDataProvider.BarsReload += TickDataProvider_BarsReload;
}
/// <summary>
/// Polls the ninjatrader bars until this object is disposed. On every pass, all bars that have not
/// been read yet are converted to SolidTick objects and pushed into the tick queue as one batch,
/// after which the thread sleeps for 50 milliseconds.
/// </summary>
void PumpTicks() {
    while (!IsDisposed) {
        // the range of ninjatrader tick bars that still need to be read
        var firstUnread = indexOfLastTickRead + 1;
        var lastAvailable = bars.Count - 1;
        if (firstUnread <= lastAvailable) {
            var batch = new List<SolidTick>(lastAvailable - firstUnread + 1);
            // the cursor lives outside of the reading loop because we need to know
            // where reading stopped once the loop has ended
            var cursor = firstUnread;
            while (cursor <= lastAvailable) {
                // NinjaTrader sometimes increases bars.Count before the newest bar is actually
                // readable, and throws if we try to read the bar while it is in that state.
                // When that happens we stop short of bars.Count - 1 and pick the bar up
                // on a later pass.
                try {
                    batch.Add(new SolidTick { Price = bars.GetClose(cursor), Volume = bars.GetVolume(cursor), TimestampLocal = bars.GetTime(cursor) });
                } catch {
                    // the bar is not yet ready for us to read it
                    break;
                }
                cursor++;
            }
            // the very first read may have failed, leaving nothing to publish
            if (batch.Count > 0) {
                // hand the ticks over for asynchronous processing
                TickQueue.PushTicks(batch);
                // the cursor points at the first bar that was NOT read, which is not
                // necessarily bars.Count because of the early exit above
                indexOfLastTickRead = cursor - 1;
                LastUpdateAtLocal = DateTime.Now;
                LastTickAtLocal = batch[batch.Count - 1].TimestampLocal;
            }
        }
        // rest and give more ticks time to arrive
        Thread.Sleep(50);
    }
}
/// <summary>
/// Handles the TickDataProvider's bars reload notification.
/// When the notification is for this object's instrument, the bars being processed have been
/// invalidated, so this object is put into its error state. Notifications for other
/// instruments are ignored.
/// </summary>
void TickDataProvider_BarsReload(string instrumentId) {
    if (instrumentId != Instrument.Id) {
        return;
    }
    OnError(new BarsWereReloadedException());
}
/// <summary>
/// Called when some sort of error causes this object to become unusable.
/// Records the error, switches to the Error state, raises the Error event and
/// finally disposes this object.
/// </summary>
/// <param name="x">The exception describing what went wrong</param>
void OnError(Exception x) {
    // set some status properties
    LastError = x;
    State = DataContextStates.Error;
    // hopefully whoever listens to this event won't take forever to finish
    Error.TryInvoke(this);
    // finish up
    Dispose();
}
/// <summary>
/// Stops the tick pump and unsubscribes from the TickDataProvider's bars reload notification.
/// Calls after the first one do nothing.
/// </summary>
public void Dispose() {
    if (IsDisposed)
        return;
    IsDisposed = true;
    TickDataProvider.BarsReload -= TickDataProvider_BarsReload;
    // Dispose can be reached on the tick pump thread itself (OnError called from RunTickPump,
    // or an event handler that disposes this object). Thread.Abort on the current thread raises
    // a ThreadAbortException right here, in the middle of the caller's code. In that case the
    // abort is not needed anyway: the pump loop checks IsDisposed and the thread ends by itself.
    if (Thread.CurrentThread != TickPumpThread)
        TickPumpThread.Abort();
}
#region class TickDataProvider
/// <summary>
/// A central provider for 1-tick dataseries, opening just one dataseries per instrument to keep NinjaTrader
/// running efficiently and providing instant access to any dataseries that has already been loaded once.
/// </summary>
/// <remarks>
/// Designed to be accessed only by the TickDataContext objects.
/// Public methods are not to be accessed by any other code
/// </remarks>
static class TickDataProvider {
/// <summary>
/// Notifies all TickDataContext objects that a bars reload event has occurred.
/// The TickDataContext objects will need to dispose themselves and register their "Error" state.
/// The argument is the id of the instrument whose bars were reloaded.
/// </summary>
public static event Action<string> BarsReload;
/// <summary>
/// The start time of all dataseries to be loaded: 120 days before the date on which this class was initialized
/// </summary>
static readonly DateTime from = DateTime.Now.Date.AddDays(-120);
/// <summary>
/// The end time of all dataseries to be loaded: 10 days after the date on which this class was initialized
/// </summary>
static readonly DateTime to = DateTime.Now.Date.AddDays(10);
/// <summary>
/// The period of all dataseries to be loaded (1 tick, last price)
/// </summary>
static readonly Period period = new Period(PeriodType.Tick, 1, MarketDataType.Last);
/// <summary>
/// The session of all dataseries to be loaded
/// </summary>
static readonly Session session = Session.String2Session("Default 24/7");
/// <summary>
/// Synchronization locks for accessing dataseries entries, keyed by instrument id
/// </summary>
static readonly Dictionary<string, object> entryLocks = new Dictionary<string, object>();
/// <summary>
/// Synchronization locks for loading dataseries entries, keyed by instrument id
/// </summary>
static readonly Dictionary<string, object> entryLoadingLocks = new Dictionary<string, object>();
/// <summary>
/// All the dataseries entries, keyed by instrument id
/// </summary>
static readonly Dictionary<string, TickDataEntry> entries = new Dictionary<string, TickDataEntry>();
/// <summary>
/// Stores the bar reloads that have occurred recently, keyed by instrument id.
/// Only the presence of a key is checked; the stored value is always true.
/// </summary>
static readonly Dictionary<string, bool> recentBarReloads = new Dictionary<string, bool>();
/// <summary>
/// Gets an entry synchronization lock object for the given instrument id.
/// Creates the lock object if it is not already stored in the entryLocks dictionary.
/// </summary>
/// <param name="instrumentId">The id of the instrument whose entry is going to be accessed</param>
/// <returns>The lock object for the instrument; the same object is returned on every call</returns>
static object GetEntryLock(string instrumentId) {
    // This method is reached from several threads (each TickDataContext pump thread calling GetBars,
    // and the Bars.BarsReload handler). Dictionary is not safe for concurrent writers, and two
    // racing callers could otherwise each create a different lock object for the same instrument,
    // so the lookup-or-create step is serialized.
    lock (entryLocks) {
        // IDictionary extension method that creates the object
        // if it doesn't already exist in the dictionary
        return entryLocks.GetWithConstructor(instrumentId, (key) => new object());
    }
}
/// <summary>
/// Gets an entry loading synchronization lock object for the given instrument id.
/// Creates the lock object if it is not already stored in the entryLoadingLocks dictionary.
/// </summary>
/// <param name="instrumentId">The id of the instrument whose dataseries is going to be loaded</param>
/// <returns>The lock object for the instrument; the same object is returned on every call</returns>
static object GetEntryLoadingLock(string instrumentId) {
    // GetBars calls this from the pump thread of every TickDataContext. Dictionary is not safe for
    // concurrent writers, and two racing callers could otherwise each create a different lock
    // object for the same instrument (defeating the "load only once" guarantee), so the
    // lookup-or-create step is serialized.
    lock (entryLoadingLocks) {
        // IDictionary extension method that creates the object
        // if it doesn't already exist in the dictionary
        return entryLoadingLocks.GetWithConstructor(instrumentId, (key) => new object());
    }
}
// The three helpers below are used from different threads: the flag is set by the Bars.BarsReload
// handler, and cleared / checked by whichever thread is loading bars in GetBars. Dictionary is not
// safe for concurrent access while it is being modified, so every access takes the same lock.
/// <summary>
/// Sets a flag that indicates whether a bar reload has recently occurred for the given instrument
/// </summary>
static void SetBarReloadFlag(string instrumentId) {
    lock (recentBarReloads) {
        recentBarReloads[instrumentId] = true;
    }
}
/// <summary>
/// Clears the flag that indicates whether a bar reload has recently occurred for the given instrument.
/// Does nothing if the flag is not set.
/// </summary>
static void ClearBarReloadFlag(string instrumentId) {
    lock (recentBarReloads) {
        recentBarReloads.Remove(instrumentId);
    }
}
/// <summary>
/// Checks the flag that indicates whether a bar reload has recently occurred for the given instrument
/// </summary>
/// <returns>True if the flag has been set and not cleared since</returns>
static bool IsBarReloadFlagSet(string instrumentId) {
    lock (recentBarReloads) {
        return recentBarReloads.ContainsKey(instrumentId);
    }
}
/// <summary>
/// Static constructor. Hooks this provider up to NinjaTrader's Bars.BarsReload event
/// so that Bars_BarsReload runs whenever that event is raised.
/// </summary>
static TickDataProvider() {
// subscribe to BarsReload event so that we can
// invalidate all the dataseries that are no longer attached to live market data
Bars.BarsReload += Bars_BarsReload;
}
/// <summary>
/// Responds to an event indicating that the dataseries has been detached from live market data.
/// Records the reload, disposes the stored dataseries and removes its entry (if there is one),
/// then notifies TickDataContext objects that their data is invalidated.
/// </summary>
static void Bars_BarsReload(object sender, BarsReloadEventArgs e) {
    var instrumentId = e.Instrument.Id;
    // start by recording the fact that this instrument has been reloaded.
    // we need to do this because if bars are currently being loaded for this instrument,
    // the loading thread has to discard those bars.
    SetBarReloadFlag(instrumentId);
    // wait for access to the entry object - the lock is only ever held briefly
    lock (GetEntryLock(instrumentId)) {
        TickDataEntry entry;
        if (entries.TryGetValue(instrumentId, out entry)) {
            // remove the entry from our store
            entries.Remove(instrumentId);
            // dispose the dataseries so ninjatrader will no longer maintain it.
            // GetBars treats a null result from NinjaTrader as possible, so guard here as well:
            // an exception at this point would prevent the notification below from being sent
            if (null != entry.Bars)
                entry.Bars.Dispose();
        }
    }
    // notify all TickDataContext objects that their dataseries has just been invalidated
    BarsReload.TryInvoke(instrumentId);
}
/// <summary>
/// Gets a 1-tick dataseries for the requested instrument.
/// Returns an existing dataseries if it exists. Otherwise loads a new dataseries.
/// If a new dataseries must be loaded, the calling thread will be blocked for some time (perhaps up to a minute)
/// while NinjaTrader loads the data.
/// </summary>
/// <remarks>
/// Notice how this function takes care to hold the entry lock only briefly.
/// This is so that we never block NinjaTrader when it raises the Bars.BarsReload event, whose
/// handler also waits on the entry lock.
/// The desire to not block NinjaTrader from raising events while we are loading bars
/// is the reason for the use of the separate entry loading locks.
/// This method is ONLY for use by the TickDataContext object ... if you're using it
/// in your code, you're probably breaking something.
/// </remarks>
/// <param name="instrument">The instrument to get the 1-tick dataseries for</param>
/// <returns>The stored or freshly loaded dataseries; never null</returns>
/// <exception cref="BarsWereReloadedDuringLoadException">
/// NinjaTrader raised the Bars.BarsReload event for the instrument while the dataseries was loading
/// </exception>
/// <exception cref="InvalidOperationException">
/// NinjaTrader returned no dataseries for the instrument
/// </exception>
public static Bars GetBars(Instrument instrument) {
    TickDataEntry entry;
    // wait for access to the entry object - the lock is only ever held briefly
    lock (GetEntryLock(instrument.Id)) {
        // if the entry exists, return the existing bar series.
        if (entries.TryGetValue(instrument.Id, out entry)) {
            return entry.Bars;
        }
    }
    // entry does not exist - we have to load it.
    // make sure the loading is only done once by preventing simultaneous access
    lock (GetEntryLoadingLock(instrument.Id)) {
        // if we had to wait on another thread before entering, it would have been
        // because the other thread was loading a dataseries. Let's check once more
        // to see if the entry has already been created by the thread we were waiting on
        lock (GetEntryLock(instrument.Id)) {
            // if the entry exists, return the existing bar series.
            if (entries.TryGetValue(instrument.Id, out entry)) {
                return entry.Bars;
            }
        }
        ClearBarReloadFlag(instrument.Id);
        // this will take a while - it's going to block the calling thread
        var bars = Bars.GetBars(instrument, period, from, to, session, true, true);
        // check if NinjaTrader has raised the Bars.BarsReload event while the bars were loading
        // in the line of code above.
        if (IsBarReloadFlagSet(instrument.Id)) {
            // get rid of these bars as we can't use them.
            // It is not known what Bars.GetBars does in this situation - perhaps it returns
            // null, perhaps it throws an exception ... testing could provide more information
            if (null != bars)
                bars.Dispose();
            // throw an exception that will result in the calling TickDataContext object having an Error state
            throw new BarsWereReloadedDuringLoadException();
        }
        // never store a missing dataseries: a null entry would be handed out to every later caller
        // for this instrument and would only fail much later, when the caller tries to read from it
        if (null == bars)
            throw new InvalidOperationException("NinjaTrader returned no bars for the requested instrument.");
        // create an entry object
        entry = new TickDataEntry {
            InstrumentId = instrument.Id,
            Bars = bars,
        };
        // now add the entry to the entry store and return the bars
        lock (GetEntryLock(instrument.Id)) {
            entries[instrument.Id] = entry;
            return entry.Bars;
        }
    }
}
}
#endregion
#region class TickDataEntry
/// <summary>
/// Contains all related data for a given instance of 1-tick dataseries stored in TickDataProvider
/// </summary>
class TickDataEntry {
// the id of the instrument the dataseries belongs to (the same value that is used as the key in the entry store)
public string InstrumentId;
// the loaded NinjaTrader 1-tick dataseries
public Bars Bars;
}
#endregion
#region class BarsWereReloadedDuringLoadException
/// <summary>
/// Exception indicates that NinjaTrader raised the Bars.BarsReload event while the
/// TickDataContext was loading bars.
/// </summary>
class BarsWereReloadedDuringLoadException : Exception {
    // the message names the event that is actually involved (Bars.BarsReload);
    // it previously referred to a "BarLoad" event, which does not exist
    public BarsWereReloadedDuringLoadException() : base("NinjaTrader raised a BarsReload event while these bars were being loaded.") { }
}
#endregion
#region class BarsWereReloadedException
/// <summary>
/// Exception indicates that NinjaTrader raised the Bars.BarsReload event while the
/// TickDataContext was operating normally, i.e. after its bars had been loaded.
/// </summary>
class BarsWereReloadedException : Exception {
public BarsWereReloadedException() : base("Bars were reloaded.") { }
}
#endregion
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment