Skip to content

Instantly share code, notes, and snippets.

@mattbarrett
mattbarrett / gist:10736594
Created April 15, 2014 14:21
Asynchrony as a service
/// <summary>
/// Requests execution of <paramref name="notional"/> in the parent's dealt currency and
/// routes the outcome to <c>OnExecutedResult</c> / <c>OnExecutionError</c>.
/// </summary>
/// <param name="notional">The amount passed to <c>ExecuteRequest</c>.</param>
private void ExecuteAsync(long notional)
{
    // Build the request stream first; nothing is subscribed until Subscribe is called below.
    var execution = _executablePrice.ExecuteRequest(notional, _parent.DealtCurrency);

    // Notifications are delivered via the Dispatcher scheduler, while the subscription
    // itself is performed on the ThreadPool scheduler.
    // NOTE(review): presumably Dispatcher is the UI thread - confirm against the concurrency service.
    var scheduled = execution
        .ObserveOn(_concurrencyService.Dispatcher)
        .SubscribeOn(_concurrencyService.ThreadPool);

    // The returned subscription handle is intentionally not kept, as in the original code.
    scheduled.Subscribe(OnExecutedResult, OnExecutionError);
}
/// <summary>
/// Contract for a client that executes trade requests.
/// </summary>
internal interface IExecutionServiceClient
{
/// <summary>
/// Executes the given trade request and exposes the outcome as an observable sequence.
/// </summary>
/// <param name="tradeRequest">The request describing the trade to execute.</param>
/// <returns>
/// An observable sequence of <see cref="TradeDto"/>.
/// NOTE(review): presumably yields a single trade and then completes - confirm against the implementation.
/// </returns>
IObservable<TradeDto> ExecuteRequest(TradeRequestDto tradeRequest);
}
/// <summary>
/// Contract for a repository that provides price streams per currency pair.
/// </summary>
interface IPriceRepository
{
/// <summary>
/// Gets the observable sequence of prices for the given currency pair.
/// </summary>
/// <param name="currencyPair">The currency pair whose prices are requested.</param>
/// <returns>
/// An observable sequence of <see cref="IPrice"/>.
/// NOTE(review): whether the stream is shared between subscribers or ever completes is not visible here - confirm.
/// </returns>
IObservable<IPrice> GetPriceStream(ICurrencyPair currencyPair);
}
public static IObservable<IStale<T>> DetectStale<T>(this IObservable<T> source, TimeSpan stalenessPeriod, IScheduler scheduler)
{
return Observable.Create<IStale<T>>(observer =>
{
var timerSubscription = new SerialDisposable();
var observerLock = new object();
Action scheduleStale = () =>
{
timerSubscription.Disposable = Observable
/// <summary>
/// Subscribes to the currency-pair stream of the reference data repository and passes every
/// item of each received batch to <c>HandleCurrencyPairUpdate</c>; stream errors are logged.
/// </summary>
private void LoadSpotTiles()
{
    var currencyPairStream = _referenceDataRepository.GetCurrencyPairsStream();

    // Notifications are delivered via the Dispatcher scheduler, while the subscription
    // itself is performed on the TaskPool scheduler.
    // NOTE(review): presumably Dispatcher is the UI thread - confirm against the concurrency service.
    var scheduled = currencyPairStream
        .ObserveOn(_concurrencyService.Dispatcher)
        .SubscribeOn(_concurrencyService.TaskPool);

    // The returned subscription handle is intentionally not kept, as in the original code.
    scheduled.Subscribe(
        pairs => pairs.ForEach(HandleCurrencyPairUpdate),
        ex => _log.Error("Failed to get currencies", ex));
}
public static IObservable<T> ObserveLatestOn<T>(this IObservable<T> source, IScheduler scheduler)
{
return Observable.Create<T>(observer =>
{
var gate = new object();
bool active = false;
var cancelable = new MultipleAssignmentDisposable();
var disposable = source.Materialize().Subscribe(thisNotification =>
{
bool wasNotAlreadyActive;
public static IObservable<IStale<T>> DetectStale<T>(this IObservable<T> source, TimeSpan stalenessPeriod, IScheduler scheduler)
{
return Observable.Create<IStale<T>>(observer =>
{
var timerSubscription = new SerialDisposable();
var observerLock = new object();
Action scheduleStale = () =>
{
timerSubscription.Disposable = Observable
public IObservable<IStale<ITrade>> ExecuteRequest(IExecutablePrice executablePrice, long notional, string dealtCurrency)
{
var price = executablePrice.Parent;
var request = new TradeRequestDto
{
Direction = executablePrice.Direction == Direction.BUY ? DirectionDto.Buy : DirectionDto.Sell,
Notional = notional,
SpotRate = executablePrice.Rate,
Symbol = price.CurrencyPair.Symbol,
// For every SystemStatus element equal to Status.Available, LoadClients() is invoked and
// its results are flattened into clientsQuery.
// NOTE(review): presumably both sources are IObservable sequences - confirm against clientService.
var clientsQuery = clientService.SystemStatus
    .Where(s => s == Status.Available)
    .SelectMany(systemStatus => clientService.LoadClients(), (systemStatus, clients) => clients);
@mattbarrett
mattbarrett / withLatestFrom
Created May 5, 2015 13:08
withLatestFrom operator
void Main()
{
var slow = Observable.Interval(TimeSpan.FromSeconds(1));
var fast = Observable.Interval(TimeSpan.FromMilliseconds(250));
Observable.CombineLatest(
slow,
fast,
Tuple.Create)
.DistinctUntilChanged(tp => tp.Item1)
public static IObservable<TResult> WithLatestFrom<TFirst, TSecond, TResult>(this IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
{
if (first == null)
throw new ArgumentNullException("first");
if (second == null)
throw new ArgumentNullException("second");
if (resultSelector == null)
throw new ArgumentNullException("resultSelector");
return first.Publish(f =>