Skip to content

Instantly share code, notes, and snippets.

package RxTest.RxTest;
import rx.functions.Func2;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import rx.Observable;
public class LookbackTransformer<T, R> implements Observable.Transformer<T, R> {
using System;
using System.Collections.Concurrent;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
namespace Server {
static class OverflowQueue_ {
/// <summary>
@Dorus
Dorus / Additional Rx Extension Methods
Last active October 15, 2016 21:27
RxExtensions.cs
public static class Extended {
public static IObservable<TSource> Amb<TSource>(this IObservable<IObservable<TSource>> source) {
return Observable.Create<TSource>(o => {
int first = -1;
return source.TakeWhile(_ => first == -1)
.Select((el, c) => el
.DoFirst(1, _ => Interlocked.CompareExchange(ref first, c, -1))
.TakeWhile(_ => first == c))
.Merge().Subscribe(o);
@Dorus
Dorus / GroupByDistinct.cs
Last active August 29, 2015 14:25
A GroupBy variant that closes the current group and opens a new one whenever the key changes. Useful on sorted collections.
public static IObservable<IObservable<TElement>> GroupByDistinct<TElement, TKey>(this IObservable<TElement> source, Func<TElement, TKey> keySelector)
{
return Observable.Create<IObservable<TElement>>(o =>
{
SerialDisposable dis = new SerialDisposable();
Subject<TElement> sub = new Subject<TElement>();
dis.Disposable = sub;
o.OnNext(sub.AsObservable());
bool first = true;
TKey prev = default(TKey);