Skip to content

Instantly share code, notes, and snippets.

@peace2048
Created January 28, 2016 10:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save peace2048/0f397b3069f9fecd32ee to your computer and use it in GitHub Desktop.
Save peace2048/0f397b3069f9fecd32ee to your computer and use it in GitHub Desktop.
// Demo entry point: exercises each observable in turn. Every section prints
// the values it receives until the user presses Enter, then moves on.
void Main()
{
    // ConnectionMultiplexer is IDisposable; scope it so the connection is
    // released when the demo ends instead of lingering in the host process.
    using (var connection = ConnectionMultiplexer.Connect("192.168.5.40"))
    {
        // 1. Messages published on channel "pa".
        var sx1 = connection.ObservableSubscription("pa");
        using (sx1.Subscribe(value => Console.WriteLine(value)))
        {
            Console.ReadLine();
        }

        // 2. Items popped from list "qa", re-polling every 5 seconds while empty.
        var sx2 = connection.ObservableList("qa", TimeSpan.FromSeconds(5));
        using (sx2.Subscribe(value => Console.WriteLine(value)))
        {
            Console.ReadLine();
        }

        // 3. Items popped from list "qa", woken by channel "pa" (default timeout).
        var sx3 = connection.ObservableList("qa", "pa");
        using (sx3.Subscribe(value => Console.WriteLine(value)))
        {
            Console.ReadLine();
        }

        // 4. Same as (3) with an explicit 5 second fallback timeout.
        var sx4 = connection.ObservableList("qa", "pa", TimeSpan.FromSeconds(5));
        using (sx4.Subscribe(value => Console.WriteLine(value)))
        {
            Console.ReadLine();
        }
    }
}
/// <summary>
/// Extension methods that expose a Redis pub/sub channel or a Redis list as an
/// <see cref="IObservable{T}"/> of <c>RedisValue</c>.
/// </summary>
static class RedisObservableExtensions
{
    /// <summary>Fallback wait used by the overload that takes no explicit timeout.</summary>
    private static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(5);

    /// <summary>
    /// Creates an observable that forwards every value received on <paramref name="topic"/>.
    /// </summary>
    /// <param name="connection">Connection used to obtain the subscriber.</param>
    /// <param name="topic">Channel to subscribe to.</param>
    /// <returns>An observable whose subscription lasts until it is disposed.</returns>
    public static IObservable<RedisValue> ObservableSubscription(this ConnectionMultiplexer connection, RedisChannel topic)
    {
        return Observable.Create<RedisValue>(observer =>
        {
            var subscriber = connection.GetSubscriber();
            Action<RedisChannel, RedisValue> handler = (channel, value) => observer.OnNext(value);
            subscriber.Subscribe(topic, handler);
            return () =>
            {
                // Pass the handler so only this subscription's callback is removed;
                // the channel-only form would also drop other subscribers of the topic.
                subscriber.Unsubscribe(topic, handler);
            };
        });
    }

    /// <summary>
    /// Same as <see cref="ObservableList(ConnectionMultiplexer, RedisKey, RedisChannel, TimeSpan)"/>
    /// with a timeout of five seconds.
    /// </summary>
    public static IObservable<RedisValue> ObservableList(this ConnectionMultiplexer connection, RedisKey key, RedisChannel topic)
    {
        return ObservableList(connection, key, topic, DefaultTimeout);
    }

    /// <summary>
    /// Creates an observable that left-pops values from the list at <paramref name="key"/>.
    /// While the list is empty the worker sleeps until a message arrives on
    /// <paramref name="topic"/>, <paramref name="timeout"/> elapses, or the subscription is disposed.
    /// </summary>
    /// <param name="connection">Connection used for both the subscriber and the database.</param>
    /// <param name="key">List to pop from.</param>
    /// <param name="topic">Channel whose messages wake the worker.</param>
    /// <param name="timeout">Maximum time to sleep before polling the list again.</param>
    /// <returns>An observable of popped values; errors from the worker are reported via OnError.</returns>
    public static IObservable<RedisValue> ObservableList(this ConnectionMultiplexer connection, RedisKey key, RedisChannel topic, TimeSpan timeout)
    {
        return Observable.Create<RedisValue>(observer =>
        {
            var cts = new CancellationTokenSource();
            // Capture the token now: cts.Token cannot be read once cts is disposed.
            var ctoken = cts.Token;
            var subscriber = connection.GetSubscriber();
            var ev = new AutoResetEvent(false);
            Action<RedisChannel, RedisValue> handler = (channel, value) =>
            {
                try
                {
                    ev.Set();
                }
                catch (ObjectDisposedException)
                {
                    // A notification that was already in flight arrived after teardown.
                }
            };
            subscriber.Subscribe(topic, handler);

            var waitHandles = new WaitHandle[] { ev, ctoken.WaitHandle };
            var worker = StartPopLoop(connection, key, observer, ctoken,
                () => WaitHandle.WaitAny(waitHandles, timeout));

            return () =>
            {
                // Remove only this subscription's handler, not every handler on the channel.
                subscriber.Unsubscribe(topic, handler);
                StopPopLoop(worker, cts, ev);
            };
        });
    }

    /// <summary>
    /// Creates an observable that left-pops values from the list at <paramref name="key"/>,
    /// sleeping for <paramref name="wait"/> whenever the list is empty.
    /// </summary>
    /// <param name="connection">Connection used to obtain the database.</param>
    /// <param name="key">List to pop from.</param>
    /// <param name="wait">Time to sleep between polls of an empty list.</param>
    /// <returns>An observable of popped values; errors from the worker are reported via OnError.</returns>
    public static IObservable<RedisValue> ObservableList(this ConnectionMultiplexer connection, RedisKey key, TimeSpan wait)
    {
        return Observable.Create<RedisValue>(observer =>
        {
            var cts = new CancellationTokenSource();
            var ctoken = cts.Token;

            // Waiting on the token's handle ends the sleep as soon as the
            // subscription is cancelled, without throwing.
            var worker = StartPopLoop(connection, key, observer, ctoken,
                () => ctoken.WaitHandle.WaitOne(wait));

            return () =>
            {
                StopPopLoop(worker, cts, null);
            };
        });
    }

    // Starts the worker that pops from the list until the token is cancelled,
    // calling waitForMore whenever the list is empty.
    private static Task StartPopLoop(ConnectionMultiplexer connection, RedisKey key, IObserver<RedisValue> observer, CancellationToken token, Action waitForMore)
    {
        return Task.Factory.StartNew(() =>
        {
            try
            {
                var db = connection.GetDatabase();
                // Check the token on every iteration so a list that is never
                // empty cannot keep the worker alive after cancellation.
                while (!token.IsCancellationRequested)
                {
                    var item = db.ListLeftPop(key);
                    if (item.HasValue)
                    {
                        observer.OnNext(item);
                    }
                    else
                    {
                        waitForMore();
                    }
                }
            }
            catch (Exception e)
            {
                // Failures caused by shutting down are not reported to the observer.
                if (!token.IsCancellationRequested)
                {
                    observer.OnError(e);
                }
            }
        }, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    }

    // Cancels the worker and releases its resources once it has stopped.
    private static void StopPopLoop(Task worker, CancellationTokenSource cts, IDisposable extra)
    {
        cts.Cancel();

        if (Task.CurrentId == worker.Id)
        {
            // Disposal was triggered from inside the worker (from within OnNext or
            // OnError). Waiting here would make the task wait on itself forever,
            // so release the resources after the worker has unwound instead.
            worker.ContinueWith(_ =>
            {
                cts.Dispose();
                if (extra != null)
                {
                    extra.Dispose();
                }
            });
            return;
        }

        try
        {
            worker.Wait();
        }
        catch (AggregateException)
        {
            // Best effort: the loop routes its own failures to OnError, so
            // anything surfacing here is ignored during teardown.
        }

        cts.Dispose();
        if (extra != null)
        {
            extra.Dispose();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment