Skip to content

Instantly share code, notes, and snippets.

@meghs91
Last active May 16, 2019 15:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save meghs91/944036c4a88ea3a251770b87bbf0b616 to your computer and use it in GitHub Desktop.
Save meghs91/944036c4a88ea3a251770b87bbf0b616 to your computer and use it in GitHub Desktop.
IBrokerage and IDataQueueHandle implemented
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using NodaTime;
using QuantConnect.Data;
using QuantConnect.Data.Market;
using QuantConnect.Logging;
using QuantConnect.Packets;
using RestSharp;
using KTick = KiteConnect.Tick;
using Tick = QuantConnect.Data.Market.Tick;
namespace QuantConnect.Brokerages.MyBrokerage
{
public partial class MyBrokerage : Brokerage, IDataQueueHandler
{
/// <summary>
/// The list of ticks received
/// </summary>
private readonly List<Tick> _ticks = new List<Tick>();
private readonly ConcurrentDictionary<string, Symbol> _subscribedSymbols = new ConcurrentDictionary<string, Symbol>();
#region IDataQueueHandler implementation
/// <summary>
/// Get the next ticks from the live trading data queue
/// </summary>
/// <returns>IEnumerable list of ticks since the last update.</returns>
public IEnumerable<BaseData> GetNextTicks()
{
lock (_ticks)
{
var copy = _ticks.ToArray();
_ticks.Clear();
return copy;
}
}
/// <summary>
/// Adds the specified symbols to the subscription
/// </summary>
/// <param name="job">Job we're subscribing for:</param>
/// <param name="symbols">The symbols to be added keyed by SecurityType</param>
public void Subscribe(LiveNodePacket job, IEnumerable<Symbol> symbols)
{
var symbolsToSubscribe = symbols.Where(x => !_subscribedSymbols.ContainsKey(x.Value));
foreach (var symbol in symbolsToSubscribe.Where(CanSubscribe))
{
//FOLLOWING CODE JUST TO SUBSCRIBLE ONLY TCS SPECIFIC SECURITIES FOR TESTING PURPOSE
Log.Trace($"MyBrokerage.Subscribe(): {symbol}");
Symbol sym = Symbol.Create("TCS", SecurityType.Equity, Market.USA);
_subscribedSymbols.TryAdd(sym.Value, symbol);
ticker.Subscribe(Tokens: new UInt32[] { 2953217 }); // SUBSCRIBE TO BROKER DATA FEED
}
}
/// <summary>
/// Removes the specified symbols from the subscription
/// </summary>
/// <param name="job">Job we're processing.</param>
/// <param name="symbols">The symbols to be removed keyed by SecurityType</param>
public void Unsubscribe(LiveNodePacket job, IEnumerable<Symbol> symbols)
{
var symbolsToUnsubscribe = symbols.Where(x => _subscribedSymbols.ContainsKey(x.Value));
foreach (var symbol in symbolsToUnsubscribe.Where(CanSubscribe))
{
Log.Trace($"MyBrokerage.Unsubscribe(): {symbol}");
Symbol removed;
_subscribedSymbols.TryRemove(symbol.Value, out removed);
}
ticker.UnSubscribe(symbolsToUnsubscribe.Select((Symbol arg) => UInt32.Parse(arg.Value)).ToArray());
}
/// <summary>
/// Returns true if this brokerage supports the specified symbol
/// </summary>
private static bool CanSubscribe(Symbol symbol)
{
// ignore unsupported security types
if (symbol.ID.SecurityType != SecurityType.Equity)
return false;
return symbol.Value.ToLower().IndexOf("universe", StringComparison.Ordinal) == -1;
}
/// <summary>
/// Event handler for streaming trade ticks
/// </summary>
/// <param name="trade">The data object containing the received tick</param>
private void OnTradeReceived(KTick trade)
{
Symbol symbol = Symbol.Create("TCS", SecurityType.Equity, Market.NSE);
var time = DateTime.Now;
// live ticks timestamps must be in exchange time zone
DateTimeZone exchangeTimeZone;
if (!_symbolExchangeTimeZones.TryGetValue(key: symbol, value: out exchangeTimeZone))
{
exchangeTimeZone = _marketHours.GetExchangeHours(Market.USA, symbol, SecurityType.Equity).TimeZone;
_symbolExchangeTimeZones.Add(symbol, exchangeTimeZone);
}
time = time.ConvertFromUtc(exchangeTimeZone);
var tick = new Tick(System.DateTime.Now, symbol, trade.LastPrice, trade.LastPrice, trade.LastPrice)
{
TickType = TickType.Trade,
Quantity = trade.LastQuantity
};
//WE RECEIVED DATA HERE
Log.Trace($"Tick : { tick.LastPrice}");
lock (_ticks)
{
_ticks.Add(tick);
}
}
#endregion
#region IBrokerage implementation
public MyBrokerage(IOrderProvider orderProvider, ISecurityProvider securityProvider, string appID, string appSecret, string accessToken, string userID) : base("MyBrokerage")
{
MyAPIKey = appID;
MySecret = appSecret;
MyAccessToken = accessToken;
MyUserId = userID;
kite = new Kite(MyAPIKey, Debug: true);
kite.SetAccessToken(MyAccessToken);
_orderProvider = orderProvider;
_securityProvider = securityProvider;
_marketHours = MarketHoursDatabase.FromDataFolder();
}
/// <summary>
/// Connects the client to the broker's remote servers
/// </summary>
public override void Connect()
{
ticker.OnTick += (TickData) => OnTick(TickData);
}
//Other stub method implemented
#endregion
}
}
@meghs91
Copy link
Author

meghs91 commented May 16, 2019

implemented IBrokerage and IDataQueueHandler In the algorithm, ondata method does not called,
public override void OnData(Slice slice)

where as contineously getting tick data in

private void OnTradeReceived(Tick trade)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment