Last active
April 22, 2020 00:38
-
-
Save Kikimora/1ad8e1f95b57eed3d5fddb8206a945e1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using MyExchange.Common.Data; | |
using Qoden.Util; | |
using Qoden.Util.Serilog; | |
using Qoden.Validation; | |
using Serilog; | |
namespace Qoden.Exchange.Bots | |
{ | |
/// <summary>
/// Read-only view over the opened orders tracked for an account.
/// </summary>
internal interface IOpenOrdersStore
{
    /// <summary>
    /// Tells whether <see cref="OpenedOrders"/> can be trusted.
    /// Becomes true once the OpenOrders subscription is opened and its first batch of orders
    /// has been received completely.
    /// </summary>
    bool IsReady { get; }

    /// <summary>
    /// Current state of the opened orders. Always check <see cref="IsReady"/> before relying on it:
    /// while the store is not ready the returned data may be missing or outdated.
    /// </summary>
    IEnumerable<OrderData> OpenedOrders { get; }

    /// <summary>
    /// Identifier of the opened orders subscription; use it to recognise events that belong to it.
    /// </summary>
    string SubscriptionId { get; }
}
/// <summary>
/// Keeps the set of currently opened orders, keyed by order id, and owns the lifetime of the
/// OpenOrders stream subscription requested from an <see cref="IExchangeController"/>.
/// </summary>
internal class OpenOrdersStore : IOpenOrdersStore
{
    private const string UnexpectedStatusMessage =
        "Order event skipped. Got same order as the one we known with unexpected status {@EventOrder} {@KnownOrder}";

    private readonly ILogger _logger;
    private readonly IExchangeController _controller;
    private readonly Dictionary<string, OrderData> _openedOrders = new Dictionary<string, OrderData>();
    private CancellationTokenSource _openOrdersCts;
    private string _openOrdersRequestId;

    /// <summary>
    /// Creates an empty, not yet subscribed store.
    /// </summary>
    /// <param name="controller">Controller used to open the OpenOrders stream.</param>
    /// <param name="logger">Logger used for subscription and order event diagnostics.</param>
    public OpenOrdersStore(IExchangeController controller, ILogger logger)
    {
        _controller = controller;
        _logger = logger;
        // Allocated eagerly so that Unload() is safe to call before the first Load().
        _openOrdersCts = new CancellationTokenSource();
    }

    /// <summary>
    /// True after <see cref="OnLoadingFinished"/> and until the next <see cref="Unload"/>.
    /// </summary>
    public bool IsReady { get; private set; }

    /// <summary>
    /// Identifier of the most recently requested OpenOrders subscription, or null before the first <see cref="Load"/>.
    /// </summary>
    public string SubscriptionId => _openOrdersRequestId;

    /// <summary>
    /// Same value as <see cref="SubscriptionId"/>.
    /// </summary>
    public string OpenOrdersRequestId => _openOrdersRequestId;

    /// <summary>
    /// Snapshot of the cached orders. A fresh array is returned on every call, so the result
    /// is not affected by later changes of the store.
    /// </summary>
    public IEnumerable<OrderData> OpenedOrders
    {
        get
        {
            var orders = new OrderData[_openedOrders.Count];
            _openedOrders.Values.CopyTo(orders, 0);
            return orders;
        }
    }

    /// <summary>
    /// Starts a new OpenOrders subscription under a freshly generated request id.
    /// Any previous subscription token is cancelled and its source disposed first.
    /// </summary>
    public void Load()
    {
        // Retire the previous source before replacing it: otherwise it is never disposed and,
        // when Load() is called without a preceding Unload(), its subscription is never cancelled.
        var previousCts = _openOrdersCts;
        CancelSubscription();
        previousCts.Dispose();

        _openOrdersRequestId = Guid.NewGuid().ToString();
        _openOrdersCts = new CancellationTokenSource();
        _logger.Information("Subscribe to OpenOrders {RequestId}", _openOrdersRequestId);
        _controller.SubscribeToOpenOrdersStreamAsync(_openOrdersRequestId, _openOrdersCts.Token).ForgetTask(_logger);
    }

    /// <summary>
    /// Cancels the current subscription, marks the store as not ready and drops all cached orders.
    /// </summary>
    public void Unload()
    {
        CancelSubscription();
        IsReady = false;
        _openedOrders.Clear();
    }

    /// <summary>
    /// Marks the initial batch of orders as completely received.
    /// </summary>
    public void OnLoadingFinished()
    {
        // The store must not be ready yet; the state assertion guards against a second completion.
        Assert.State(IsReady, nameof(IsReady)).IsFalse();
        IsReady = true;
    }

    /// <summary>
    /// Applies an order event to the cache.
    /// Pending, Sending and Working orders are stored unless the cached order already has a status
    /// that should not precede the new one; Rejected, Cancelled, Completed and Expired orders are
    /// removed. Events that do not fit are logged as warnings and skipped.
    /// </summary>
    /// <param name="order">Order carried by the event.</param>
    /// <exception cref="ArgumentOutOfRangeException">The order has a status this store does not know.</exception>
    public void OnOrder(OrderData order)
    {
        // Rely on the lookup result itself rather than on the content of the out value.
        var orderExists = _openedOrders.TryGetValue(order.Id, out var knownOrder);

        bool expectedTransition;
        switch (order.Status)
        {
            case OrderStatus.Pending:
                // Pending is the first status, so the order must be new to us.
                expectedTransition = !orderExists;
                break;
            case OrderStatus.Sending:
                expectedTransition = !orderExists || knownOrder.Status == OrderStatus.Pending;
                break;
            case OrderStatus.Working:
                expectedTransition = !orderExists ||
                                     knownOrder.Status == OrderStatus.Pending ||
                                     knownOrder.Status == OrderStatus.Sending;
                break;
            case OrderStatus.Rejected:
            case OrderStatus.Cancelled:
            case OrderStatus.Completed:
            case OrderStatus.Expired:
                if (orderExists)
                {
                    _openedOrders.Remove(order.Id);
                }
                else
                {
                    _logger.Warning(
                        "Order event skipped. Got order update with final status and have no orders with same id in cache {@Order}",
                        order);
                }
                return;
            default:
                throw new ArgumentOutOfRangeException(nameof(order), order.Status, "Unknown order status");
        }

        if (expectedTransition)
        {
            _openedOrders[order.Id] = order;
        }
        else
        {
            _logger.Warning(UnexpectedStatusMessage, order, knownOrder);
        }
    }

    /// <summary>
    /// Best-effort cancellation of the current subscription token: failures are logged, never rethrown.
    /// </summary>
    private void CancelSubscription()
    {
        try
        {
            _openOrdersCts.Cancel();
        }
        catch (Exception e)
        {
            _logger.Warning(e, "Failed to cancel OpenOrders subscription {RequestId}", _openOrdersRequestId);
        }
    }
}
/// <summary> | |
/// Works together with <see cref="IAccountRepository"/> and <see cref="IBotAccount"/>. | |
/// Instances of this class shared between multiple proxies and work as a shared data store for them. | |
/// <see cref="BotAccount"/> maintains reference counted subscriptions as requested by proxies and dynamically subscribe/unsubscribe | |
/// from <see cref="IExchangeController"/>. | |
/// It then handle relevant <see cref="InputExchangeEvent"/> to maintain local account data in sync with remote exchange. | |
/// </summary> | |
internal class BotAccount : RefCounted
{
    private readonly ILogger _logger;
    private readonly IExchangeController _controller;
    private readonly OpenOrdersStore _openOrders;
    // Number of outstanding SubscribeToOpenOrders() calls; the store is loaded while it is above zero.
    private int _openOrdersSubscriptionsCounter;
    private bool _disposed;

    private bool Disposed
    {
        get => Volatile.Read(ref _disposed);
        set => Volatile.Write(ref _disposed, value);
    }

    public IEnumerable<OrderData> OpenedOrders => _openOrders.OpenedOrders;
    public string OpenOrdersRequestId => _openOrders.OpenOrdersRequestId;
    public bool OpenedOrdersLoaded => _openOrders.IsReady;
    public string ExchangeId { get; }
    public string AccountId { get; }
    public bool LoggedIn => _controller.LoggedIn;

    /// <summary>
    /// Creates the account together with its exchange controller and open orders store.
    /// </summary>
    /// <param name="exchangeId">Exchange identifier; passed to the factory and attached to log events.</param>
    /// <param name="accountId">Account identifier; passed to the factory and attached to log events.</param>
    /// <param name="factory">Factory that creates the controller for this exchange/account pair.</param>
    public BotAccount(string exchangeId, string accountId, IExchangeControllerFactory factory)
    {
        _logger = Log.ForContext<BotAccount>().WithProps(new Dictionary<string, string>
        {
            ["AccountId"] = accountId,
            ["ExchangeId"] = exchangeId,
        });
        _controller = factory.Create(exchangeId, accountId);
        _openOrders = new OpenOrdersStore(_controller, _logger);
        ExchangeId = exchangeId;
        AccountId = accountId;
    }

    /// <summary>
    /// Open orders store of this account.
    /// Use this instead of <see cref="OpenedOrders"/>, <see cref="OpenedOrdersLoaded"/> and <see cref="OpenOrdersRequestId"/>.
    /// </summary>
    public IOpenOrdersStore OpenOrders => _openOrders;

    /// <summary>
    /// Handles exchange events to manage the open orders subscription and to keep the open orders store up to date.
    /// </summary>
    /// <param name="evt">Event to process.</param>
    /// <exception cref="ObjectDisposedException">The account has been disposed.</exception>
    public void Dispatch(InputExchangeEvent evt)
    {
        CheckDisposed();

        var connectFailed = evt.Type == InputExchangeEventType.AccountConnect && evt.Error != null;
        if (connectFailed || evt.Type == InputExchangeEventType.OrderStreamClosed)
        {
            // Nobody is subscribed, so there is nothing to reload.
            if (_openOrdersSubscriptionsCounter == 0) return;
            if (_openOrders.SubscriptionId != null)
            {
                if (evt.Error != null)
                {
                    var message = "Reset open orders state because an error occured and connection has been closed {Event}";
                    _logger.Error(evt.Error, message, evt.Type);
                }

                // Looks like something is wrong with the exchange account or the open orders subscription:
                // reload open orders.
                _openOrders.Unload();
                _openOrders.Load();
            }
        }

        if (evt.Type == InputExchangeEventType.Order)
        {
            // Order events should go into the open orders store.
            if (evt.HasPayload)
            {
                // Record new orders as well as order changes.
                _openOrders.OnOrder(evt.OrderPayload);
            }

            // Without the null check an event carrying no request id would match a store that was
            // never subscribed and mark it as ready.
            var subscriptionId = _openOrders.SubscriptionId;
            if (subscriptionId != null && evt.RequestId == subscriptionId && evt.IsLastItem)
            {
                _openOrders.OnLoadingFinished();
            }
        }
    }

    /// <summary>
    /// Starts a login through the controller without waiting for it to complete.
    /// <see cref="InputExchangeEventType.AccountConnect"/> event will be published as a result.
    /// </summary>
    /// <param name="requestId">Request identifier passed to the controller.</param>
    /// <param name="token">Cancellation token passed to the controller.</param>
    public void Login(string requestId, CancellationToken token = default) =>
        _controller.LoginAsync(requestId, token).ForgetTask(_logger);

    /// <summary>
    /// Registers one more open orders subscriber; the first subscriber triggers loading of the store.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The account has been disposed.</exception>
    public void SubscribeToOpenOrders()
    {
        CheckDisposed();
        if (++_openOrdersSubscriptionsCounter != 1) return;
        _openOrders.Load();
    }

    /// <summary>
    /// Removes one open orders subscriber; when the last one is gone the store is unloaded.
    /// Does nothing if there are no subscribers.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The account has been disposed.</exception>
    public void UnSubscribeFromOpenOrders()
    {
        CheckDisposed();
        if (_openOrdersSubscriptionsCounter == 0 || --_openOrdersSubscriptionsCounter != 0) return;
        _openOrders.Unload();
    }

    /// <summary>
    /// Forwards an order request to the controller.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The account has been disposed.</exception>
    public Task SendOrderAsync(string requestId, OrderRequest order)
    {
        CheckDisposed();
        return _controller.SendOrderAsync(requestId, order);
    }

    /// <summary>
    /// Forwards a cancel request for a single order to the controller.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The account has been disposed.</exception>
    public Task CancelOrderAsync(string requestId, string orderId)
    {
        CheckDisposed();
        return _controller.CancelOrderAsync(requestId, orderId);
    }

    /// <summary>
    /// Forwards a cancel-all request to the controller, passing <paramref name="instrument"/> through as is.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The account has been disposed.</exception>
    public Task CancelAllOrdersAsync(string requestId, string instrument = null)
    {
        CheckDisposed();
        return _controller.CancelAllOrdersAsync(requestId, instrument);
    }

    /// <summary>
    /// Unloads the open orders store and disposes the controller. Repeated calls are no-ops.
    /// </summary>
    protected override void Dispose(bool disposing)
    {
        // The Disposed check keeps a second call from disposing the controller again.
        if (!disposing || Disposed) return;
        Disposed = true;
        _openOrders.Unload();
        _controller.Dispose();
    }

    private void CheckDisposed()
    {
        if (Disposed)
            throw new ObjectDisposedException(nameof(BotAccount));
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment