Instantly share code, notes, and snippets.

View WithLatestFrom
public static IObservable<TResult> WithLatestFrom<TFirst, TSecond, TResult>(this IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
{
if (first == null)
throw new ArgumentNullException("first");
if (second == null)
throw new ArgumentNullException("second");
if (resultSelector == null)
throw new ArgumentNullException("resultSelector");
return first.Publish(f =>
View withLatestFrom
void Main()
{
var slow = Observable.Interval(TimeSpan.FromSeconds(1));
var fast = Observable.Interval(TimeSpan.FromMilliseconds(250));
Observable.CombineLatest(
slow,
fast,
Tuple.Create)
.DistinctUntilChanged(tp => tp.Item1)
View async-gate.cs
var clientsQuery = from systemStatus in clientService.SystemStatus.Where(s => s == Status.Available)
from clients in clientService.LoadClients()
select clients;
View ExecuteRequest.cs
public IObservable<IStale<ITrade>> ExecuteRequest(IExecutablePrice executablePrice, long notional, string dealtCurrency)
{
var price = executablePrice.Parent;
var request = new TradeRequestDto
{
Direction = executablePrice.Direction == Direction.BUY ? DirectionDto.Buy : DirectionDto.Sell,
Notional = notional,
SpotRate = executablePrice.Rate,
Symbol = price.CurrencyPair.Symbol,
View DetectStale.cs
public static IObservable<IStale<T>> DetectStale<T>(this IObservable<T> source, TimeSpan stalenessPeriod, IScheduler scheduler)
{
return Observable.Create<IStale<T>>(observer =>
{
var timerSubscription = new SerialDisposable();
var observerLock = new object();
Action scheduleStale = () =>
{
timerSubscription.Disposable = Observable
View Conflate.cs
public static IObservable<T> Conflate<T>(this IObservable<T> source, TimeSpan minimumUpdatePeriod, IScheduler scheduler)
{
return Observable.Create<T>(observer =>
{
// indicate when the last update was published
var lastUpdateTime = DateTimeOffset.MinValue;
// indicate if an update is currently scheduled
var updateScheduled = new MultipleAssignmentDisposable();
// indicate if completion has been requested (we can't complete immediatly if an update is in flight)
var completionRequested = false;
View ObserveLatestOn
public static IObservable<T> ObserveLatestOn<T>(this IObservable<T> source, IScheduler scheduler)
{
return Observable.Create<T>(observer =>
{
var gate = new object();
bool active = false;
var cancelable = new MultipleAssignmentDisposable();
var disposable = source.Materialize().Subscribe(thisNotification =>
{
bool wasNotAlreadyActive;
View gist:74b13723cbcceed5d7ff
private void LoadSpotTiles()
{
_referenceDataRepository.GetCurrencyPairsStream()
.ObserveOn(_concurrencyService.Dispatcher)
.SubscribeOn(_concurrencyService.TaskPool)
.Subscribe(
currencyPairs => currencyPairs.ForEach(HandleCurrencyPairUpdate),
error => _log.Error("Failed to get currencies", error));
}
View gist:10738289
public static IObservable<IStale<T>> DetectStale<T>(this IObservable<T> source, TimeSpan stalenessPeriod, IScheduler scheduler)
{
return Observable.Create<IStale<T>>(observer =>
{
var timerSubscription = new SerialDisposable();
var observerLock = new object();
Action scheduleStale = () =>
{
timerSubscription.Disposable = Observable
View gist:10737828
internal interface IExecutionServiceClient
{
IObservable<TradeDto> ExecuteRequest(TradeRequestDto tradeRequest);
}
interface IPriceRepository
{
IObservable<IPrice> GetPriceStream(ICurrencyPair currencyPair);
}