Skip to content

Instantly share code, notes, and snippets.

@rbirkby
Last active August 29, 2015 14:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rbirkby/78d6c952aec44ff6e43e to your computer and use it in GitHub Desktop.
Save rbirkby/78d6c952aec44ff6e43e to your computer and use it in GitHub Desktop.
C# port of Netflix Rx video example
internal class VideoExample {
/**
* Demonstrate how Rx is used to compose Observables together such as
* how a web service would to generate a JSON response.
*
* The simulated methods for the metadata represent different services
* that are often backed by network calls.
*
* This will return a sequence of dictionaries such as this:
*
* [id:1000, title:video-1000-title, length:5428, bookmark:0,
* rating:[actual:4, average:3, predicted:0]]
*/
public IObservable<dynamic> GetVideoGridForDisplay(int userId)
{
return GetListOfLists(userId).SelectMany(list =>
{
return list.Videos()
.Take(10)
.SelectMany(video =>
{
var m = video.Metadata().Select(md => new { title = md["title"], length = md["duration"] });
var b = video.Bookmark(userId).Select(bookmark => bookmark);
var r = video.Rating(userId).Select(rating => new
{
actual = rating.ActualStarRating(),
average = rating.AverageStarRating(),
predicted = rating.PredictedStarRating()
});
return Observable.Zip(m, b, r, (metadata, bookmark, rating) =>
{
return new
{
id = video.VideoId,
metadata.title,
metadata.length,
bookmark,
rating
};
});
});
});
}
public IObservable<VideoList> GetListOfLists(int userId)
{
return Observable.Create<VideoList>(observer =>
{
// this will happen on a separate thread as it requires a network call
TaskPoolScheduler.Default.Schedule(() => {
// simulate network latency
Thread.Sleep(180);
foreach (var i in Enumerable.Range(0, 15)) {
observer.OnNext(new VideoList(i));
}
observer.OnCompleted();
});
return () => { };
});
}
}
class VideoList {
private int _listPosition;
public VideoList(int position) {
_listPosition = position;
}
public string ListName() {
return "ListName-" + _listPosition;
}
public int ListPosition() {
return _listPosition;
}
public IObservable<Video> Videos() {
return Observable.Create<Video>((Func<IObserver<Video>, Action>)(observer =>
{
// we already have the videos once a list is loaded
// so we won't launch another thread but return
// the sequence of videos via push
foreach (var i in Enumerable.Range(0, 50)) {
observer.OnNext(new Video((_listPosition * 1000) + i));
}
observer.OnCompleted();
return () => { };
}));
}
}
class Video {
private int _videoId;
public Video(int videoId) {
_videoId = videoId;
}
public int VideoId
{
get { return _videoId; }
}
// synchronous
public IObservable<Dictionary<string, string>> Metadata() {
// simulate fetching metadata from an in-memory cache
// so it will not asynchronously execute on a thread but
// immediately return an Observable with the data
return Observable.Create<Dictionary<string, string>>(observer =>
{
observer.OnNext(new Dictionary<string, string> {
{"title", "video-" + _videoId + "-title"},
{"duration", "5428"}
});
observer.OnCompleted();
return () => { };
});
}
// asynchronous
public IObservable<int> Bookmark(int userId) {
// simulate fetching the bookmark for this user
// that specifies the last played position if
// this video has been played before
return Observable.Create<int>(observer =>
{
// this will happen on a separate thread as it requires a network call
TaskPoolScheduler.Default.Schedule(() => {
// simulate network latency
Thread.Sleep(4);
if (new Random().Next(6) > 1) {
// most of the time they haven't watched a movie
// so the position is 0
observer.OnNext(new Random().Next(0));
} else {
observer.OnNext(new Random().Next(4000));
}
observer.OnCompleted();
});
return () => { };
});
}
// asynchronous
public IObservable<VideoRating> Rating(int userId)
{
// simulate fetching the VideoRating for this user
return Observable.Create<VideoRating>(observer =>
{
// this will happen on a separate thread as it requires a network call
TaskPoolScheduler.Default.Schedule(() =>
{
// simulate network latency
Thread.Sleep(10);
observer.OnNext(new VideoRating(_videoId, userId));
observer.OnCompleted();
});
return () => { };
});
}
}
class VideoRating {
private int _videoId;
private int _userId;
public VideoRating(int videoId, int userId) {
_videoId = videoId;
_userId = userId;
}
public int PredictedStarRating() {
return new Random().Next(5);
}
public int AverageStarRating() {
return new Random().Next(4);
}
public int ActualStarRating() {
return new Random().Next(5);
}
}
static void Main()
{
var v = new VideoExample();
Console.WriteLine("---- sequence of video dictionaries ----");
v.GetVideoGridForDisplay(1).Subscribe(
videoDictionary => {// onNext
// this will print the dictionary for each video
// and is a good representation of how progressive rendering could work
Console.WriteLine(videoDictionary);
},
exception => Console.WriteLine("Error: " + exception),
(Action)(() => { })
);
v = new VideoExample();
v.GetVideoGridForDisplay(1).ToList().Subscribe(
videoDictionaryList => {// onNext
// this will be called once with a list
// and demonstrates how a sequence can be combined
// for document style responses (most webservices)
Console.WriteLine("\n ---- single list of video dictionaries ----\n" + videoDictionaryList);
},
exception => {// onError
Console.WriteLine("Error: " + exception);
},
(Action)(() => { }));
Console.ReadKey();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment