Skip to content

Instantly share code, notes, and snippets.

@Kikimora
Last active April 22, 2020 00:38
Show Gist options
  • Save Kikimora/1ad8e1f95b57eed3d5fddb8206a945e1 to your computer and use it in GitHub Desktop.
Save Kikimora/1ad8e1f95b57eed3d5fddb8206a945e1 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MyExchange.Common.Data;
using Qoden.Util;
using Qoden.Util.Serilog;
using Qoden.Validation;
using Serilog;
namespace Qoden.Exchange.Bots
{
interface IOpenOrdersStore
{
/// <summary>
/// Get current opened orders state. Use this prop only in conjunction with <see cref="IsReady"/>.
/// If opened orders not ready you can retrieve null or outdated data.
/// </summary>
IEnumerable<OrderData> OpenedOrders { get; }
/// <summary>
/// Indicates about <see cref="OpenedOrders"/> readiness.
/// True when subscription to OpenOrders is opened and first batch of orders is completely received.
/// </summary>
bool IsReady { get; }
/// <summary>
/// Use this field to determine events about opened orders
/// </summary>
string SubscriptionId { get; }
}
internal class OpenOrdersStore : IOpenOrdersStore
{
private readonly ILogger _logger;
private readonly IExchangeController _controller;
private readonly Dictionary<string, OrderData> _openedOrders = new Dictionary<string, OrderData>();
private CancellationTokenSource _openOrdersCts;
private string _openOrdersRequestId;
public OpenOrdersStore(IExchangeController controller, ILogger logger)
{
_controller = controller;
_logger = logger;
_openOrdersCts = new CancellationTokenSource();
}
public bool IsReady { get; private set; }
public void Load()
{
_openOrdersRequestId = Guid.NewGuid().ToString();
_openOrdersCts = new CancellationTokenSource();
_logger.Information("Subscribe to OpenOrders {RequestId}", _openOrdersRequestId);
_controller.SubscribeToOpenOrdersStreamAsync(_openOrdersRequestId, _openOrdersCts.Token).ForgetTask(_logger);
}
public void Unload()
{
try
{
_openOrdersCts.Cancel();
}
catch (Exception)
{
//ignore
}
IsReady = false;
_openedOrders.Clear();
}
public string SubscriptionId => _openOrdersRequestId;
public IEnumerable<OrderData> OpenedOrders
{
get
{
var orders = new OrderData[_openedOrders.Count];
_openedOrders.Values.CopyTo(orders, 0);
return orders;
}
}
public string OpenOrdersRequestId => _openOrdersRequestId;
public void OnLoadingFinished()
{
Assert.State(IsReady, nameof(IsReady)).IsFalse();
IsReady = true;
}
public void OnOrder(OrderData order)
{
_openedOrders.TryGetValue(order.Id, out var knownOrder);
var orderExists = !string.IsNullOrEmpty(knownOrder.Id);
switch (order.Status)
{
case OrderStatus.Pending:
if (orderExists)
_logger.Warning(
"Order event skipped. Got same order as the one we known with unexpected status {@EventOrder} {@KnownOrder}",
order, knownOrder);
else
_openedOrders[order.Id] = order;
break;
case OrderStatus.Sending:
if (orderExists && knownOrder.Status != OrderStatus.Pending)
_logger.Warning(
"Order event skipped. Got same order as the one we known with unexpected status {@EventOrder} {@KnownOrder}",
order, knownOrder);
else
_openedOrders[order.Id] = order;
break;
case OrderStatus.Working:
if (orderExists && knownOrder.Status != OrderStatus.Pending &&
knownOrder.Status != OrderStatus.Sending)
_logger.Warning(
"Order event skipped. Got same order as the one we known with unexpected status {@EventOrder} {@KnownOrder}",
order, knownOrder);
else
_openedOrders[order.Id] = order;
break;
case OrderStatus.Rejected:
case OrderStatus.Cancelled:
case OrderStatus.Completed:
case OrderStatus.Expired:
if (!orderExists)
_logger.Warning(
"Order event skipped. Got order update with final status and have no orders with same id in cache {@Order}",
order);
else
_openedOrders.Remove(order.Id);
break;
default:
throw new ArgumentOutOfRangeException();
}
}
}
/// <summary>
/// Works together with <see cref="IAccountRepository"/> and <see cref="IBotAccount"/>.
/// Instances of this class shared between multiple proxies and work as a shared data store for them.
/// <see cref="BotAccount"/> maintains reference counted subscriptions as requested by proxies and dynamically subscribe/unsubscribe
/// from <see cref="IExchangeController"/>.
/// It then handle relevant <see cref="InputExchangeEvent"/> to maintain local account data in sync with remote exchange.
/// </summary>
internal class BotAccount : RefCounted
{
private readonly ILogger _logger;
private readonly IExchangeController _controller;
private readonly OpenOrdersStore _openOrders;
private int _openOrdersSubscriptionsCounter;
private bool _disposed;
private bool Disposed
{
get => Volatile.Read(ref _disposed);
set => Volatile.Write(ref _disposed, value);
}
public IEnumerable<OrderData> OpenedOrders => _openOrders.OpenedOrders;
public string OpenOrdersRequestId => _openOrders.OpenOrdersRequestId;
public bool OpenedOrdersLoaded => _openOrders.IsReady;
public string ExchangeId { get; }
public string AccountId { get; }
public bool LoggedIn => _controller.LoggedIn;
public BotAccount(string exchangeId, string accountId, IExchangeControllerFactory factory)
{
_logger = Log.ForContext<BotAccount>().WithProps(new Dictionary<string, string>
{
["AccountId"] = accountId,
["ExchangeId"] = exchangeId,
});
_controller = factory.Create(exchangeId, accountId);
_openOrders = new OpenOrdersStore(_controller, _logger);
ExchangeId = exchangeId;
AccountId = accountId;
}
//use this instead of OpenedOrders, OpenedOrdersReady, OpenOrdersSubscriptionId
public IOpenOrdersStore OpenOrders => _openOrders;
/// <summary>
/// Handle events to manage subscriptions and collecting and keep up to date OpenOrders.
/// </summary>
public void Dispatch(InputExchangeEvent evt)
{
CheckDisposed();
if ((evt.Type == InputExchangeEventType.AccountConnect && evt.Error != null) ||
evt.Type == InputExchangeEventType.OrderStreamClosed)
{
if (_openOrdersSubscriptionsCounter == 0) return;
if (_openOrders.SubscriptionId != null)
{
if (evt.Error != null)
{
var message = "Reset open orders state because an error occured and connection has been closed {Event}";
_logger.Error(evt.Error, message, evt.Type);
}
//Looks like something wrong with exchange account or open orders subscription
//reload open orders
_openOrders.Unload();
_openOrders.Load();
}
}
if (evt.Type == InputExchangeEventType.Order)
{
//Order events should go into open orders store.
if (evt.HasPayload)
{
//record new orders as well as order changes
_openOrders.OnOrder(evt.OrderPayload);
}
if (evt.RequestId == _openOrders.SubscriptionId && evt.IsLastItem)
{
_openOrders.OnLoadingFinished();
}
}
}
/// <summary>
/// Login if not logged in yet. <see cref="InputExchangeEventType.AccountConnect"/> event will be published as a result
/// </summary>
/// <param name="requestId"></param>
/// <param name="token"></param>
public void Login(string requestId, CancellationToken token = default) =>
_controller.LoginAsync(requestId, token).ForgetTask(_logger);
public void SubscribeToOpenOrders()
{
CheckDisposed();
if (++_openOrdersSubscriptionsCounter != 1) return;
_openOrders.Load();
}
public void UnSubscribeFromOpenOrders()
{
CheckDisposed();
if (_openOrdersSubscriptionsCounter == 0 || --_openOrdersSubscriptionsCounter != 0) return;
_openOrders.Unload();
}
public Task SendOrderAsync(string requestId, OrderRequest order)
{
CheckDisposed();
return _controller.SendOrderAsync(requestId, order);
}
public Task CancelOrderAsync(string requestId, string orderId)
{
CheckDisposed();
return _controller.CancelOrderAsync(requestId, orderId);
}
public Task CancelAllOrdersAsync(string requestId, string instrument = null)
{
CheckDisposed();
return _controller.CancelAllOrdersAsync(requestId, instrument);
}
protected override void Dispose(bool disposing)
{
if (!disposing) return;
_openOrders.Unload();
_controller.Dispose();
Disposed = true;
}
private void CheckDisposed()
{
if (Disposed)
throw new ObjectDisposedException(nameof(BotAccount));
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment