Last active
May 16, 2019 15:17
-
-
Save meghs91/944036c4a88ea3a251770b87bbf0b616 to your computer and use it in GitHub Desktop.
IBrokerage and IDataQueueHandle implemented
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Threading; | |
using NodaTime; | |
using QuantConnect.Data; | |
using QuantConnect.Data.Market; | |
using QuantConnect.Logging; | |
using QuantConnect.Packets; | |
using RestSharp; | |
using KTick = KiteConnect.Tick; | |
using Tick = QuantConnect.Data.Market.Tick; | |
namespace QuantConnect.Brokerages.MyBrokerage | |
{ | |
public partial class MyBrokerage : Brokerage, IDataQueueHandler | |
{ | |
/// <summary> | |
/// The list of ticks received | |
/// </summary> | |
private readonly List<Tick> _ticks = new List<Tick>(); | |
private readonly ConcurrentDictionary<string, Symbol> _subscribedSymbols = new ConcurrentDictionary<string, Symbol>(); | |
#region IDataQueueHandler implementation | |
/// <summary> | |
/// Get the next ticks from the live trading data queue | |
/// </summary> | |
/// <returns>IEnumerable list of ticks since the last update.</returns> | |
public IEnumerable<BaseData> GetNextTicks() | |
{ | |
lock (_ticks) | |
{ | |
var copy = _ticks.ToArray(); | |
_ticks.Clear(); | |
return copy; | |
} | |
} | |
/// <summary> | |
/// Adds the specified symbols to the subscription | |
/// </summary> | |
/// <param name="job">Job we're subscribing for:</param> | |
/// <param name="symbols">The symbols to be added keyed by SecurityType</param> | |
public void Subscribe(LiveNodePacket job, IEnumerable<Symbol> symbols) | |
{ | |
var symbolsToSubscribe = symbols.Where(x => !_subscribedSymbols.ContainsKey(x.Value)); | |
foreach (var symbol in symbolsToSubscribe.Where(CanSubscribe)) | |
{ | |
//FOLLOWING CODE JUST TO SUBSCRIBLE ONLY TCS SPECIFIC SECURITIES FOR TESTING PURPOSE | |
Log.Trace($"MyBrokerage.Subscribe(): {symbol}"); | |
Symbol sym = Symbol.Create("TCS", SecurityType.Equity, Market.USA); | |
_subscribedSymbols.TryAdd(sym.Value, symbol); | |
ticker.Subscribe(Tokens: new UInt32[] { 2953217 }); // SUBSCRIBE TO BROKER DATA FEED | |
} | |
} | |
/// <summary> | |
/// Removes the specified symbols from the subscription | |
/// </summary> | |
/// <param name="job">Job we're processing.</param> | |
/// <param name="symbols">The symbols to be removed keyed by SecurityType</param> | |
public void Unsubscribe(LiveNodePacket job, IEnumerable<Symbol> symbols) | |
{ | |
var symbolsToUnsubscribe = symbols.Where(x => _subscribedSymbols.ContainsKey(x.Value)); | |
foreach (var symbol in symbolsToUnsubscribe.Where(CanSubscribe)) | |
{ | |
Log.Trace($"MyBrokerage.Unsubscribe(): {symbol}"); | |
Symbol removed; | |
_subscribedSymbols.TryRemove(symbol.Value, out removed); | |
} | |
ticker.UnSubscribe(symbolsToUnsubscribe.Select((Symbol arg) => UInt32.Parse(arg.Value)).ToArray()); | |
} | |
/// <summary> | |
/// Returns true if this brokerage supports the specified symbol | |
/// </summary> | |
private static bool CanSubscribe(Symbol symbol) | |
{ | |
// ignore unsupported security types | |
if (symbol.ID.SecurityType != SecurityType.Equity) | |
return false; | |
return symbol.Value.ToLower().IndexOf("universe", StringComparison.Ordinal) == -1; | |
} | |
/// <summary> | |
/// Event handler for streaming trade ticks | |
/// </summary> | |
/// <param name="trade">The data object containing the received tick</param> | |
private void OnTradeReceived(KTick trade) | |
{ | |
Symbol symbol = Symbol.Create("TCS", SecurityType.Equity, Market.NSE); | |
var time = DateTime.Now; | |
// live ticks timestamps must be in exchange time zone | |
DateTimeZone exchangeTimeZone; | |
if (!_symbolExchangeTimeZones.TryGetValue(key: symbol, value: out exchangeTimeZone)) | |
{ | |
exchangeTimeZone = _marketHours.GetExchangeHours(Market.USA, symbol, SecurityType.Equity).TimeZone; | |
_symbolExchangeTimeZones.Add(symbol, exchangeTimeZone); | |
} | |
time = time.ConvertFromUtc(exchangeTimeZone); | |
var tick = new Tick(System.DateTime.Now, symbol, trade.LastPrice, trade.LastPrice, trade.LastPrice) | |
{ | |
TickType = TickType.Trade, | |
Quantity = trade.LastQuantity | |
}; | |
//WE RECEIVED DATA HERE | |
Log.Trace($"Tick : { tick.LastPrice}"); | |
lock (_ticks) | |
{ | |
_ticks.Add(tick); | |
} | |
} | |
#endregion | |
#region IBrokerage implementation | |
public MyBrokerage(IOrderProvider orderProvider, ISecurityProvider securityProvider, string appID, string appSecret, string accessToken, string userID) : base("MyBrokerage") | |
{ | |
MyAPIKey = appID; | |
MySecret = appSecret; | |
MyAccessToken = accessToken; | |
MyUserId = userID; | |
kite = new Kite(MyAPIKey, Debug: true); | |
kite.SetAccessToken(MyAccessToken); | |
_orderProvider = orderProvider; | |
_securityProvider = securityProvider; | |
_marketHours = MarketHoursDatabase.FromDataFolder(); | |
} | |
/// <summary> | |
/// Connects the client to the broker's remote servers | |
/// </summary> | |
public override void Connect() | |
{ | |
ticker.OnTick += (TickData) => OnTick(TickData); | |
} | |
//Other stub method implemented | |
#endregion | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
implemented IBrokerage and IDataQueueHandler In the algorithm, ondata method does not called,
public override void OnData(Slice slice)
private void OnTradeReceived(Tick trade)