Skip to content

Instantly share code, notes, and snippets.

@philcleveland
Created December 2, 2013 00:43
Show Gist options
  • Save philcleveland/7743048 to your computer and use it in GitHub Desktop.
Save philcleveland/7743048 to your computer and use it in GitHub Desktop.
PlayingWithReactiveUIandTwitter
/// <summary>
/// Queries the Twitter context for a filter stream restricted to the given track terms and
/// passes every non-blank piece of stream content, converted with <c>ConvertContent</c>,
/// to <paramref name="callback"/>.
/// </summary>
/// <param name="callback">Invoked with each tweet converted from the stream content.</param>
/// <param name="cancel">
/// Checked every time the streaming callback runs; once cancellation has been requested
/// the stream is closed and no further tweets are delivered.
/// </param>
/// <param name="trackTerms">Terms to track; joined with commas to form the stream's Track filter.</param>
/// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
/// <exception cref="ApplicationException">
/// Thrown from inside the streaming callback when the stream reports a status other than Success.
/// </exception>
public void StreamTrackedContent(Action<Tweet> callback, CancellationToken cancel, params string[] trackTerms)
{
    // Fail at the call site instead of with a NullReferenceException inside the streaming callback.
    if (callback == null)
    {
        throw new ArgumentNullException("callback");
    }

    var trackTheseTermsCSV = string.Join(",", trackTerms);

    // Log the joined list. Passing the string[] itself to string.Format binds it to the
    // params object[] overload, so only the first term would be printed (and an empty
    // array would throw FormatException).
    Console.WriteLine(string.Format("Tracking filter stream for {0}", trackTheseTermsCSV));

    (from strm in _twitCtx.Streaming
     where strm.Type == StreamingType.Filter &&
           strm.Track == trackTheseTermsCSV
     select strm)
    .StreamingCallback(strm =>
    {
        // Cancellation is cooperative: it is only noticed the next time this callback runs.
        if (cancel.IsCancellationRequested)
        {
            strm.CloseStream();
            Console.WriteLine("CANCELLATION REQUESTED");
            return;
        }

        if (strm.Status != TwitterErrorStatus.Success)
        {
            strm.CloseStream();
            Console.WriteLine("ERROR ON TWITTER STREAM");
            throw new ApplicationException(strm.Error.ToString());
        }

        // Blank content is skipped rather than converted.
        if (!string.IsNullOrWhiteSpace(strm.Content))
        {
            var t = ConvertContent(strm.Content);
            Console.WriteLine(strm.Content + "\n");
            Console.WriteLine(string.Format("returning tweet from api: {0}", t));
            callback(t);
        }
    })
    // Enumerating the query is what actually executes it.
    .SingleOrDefault();
}
/// <summary>
/// View model that exposes a command for fetching tweets matching <see cref="SearchText"/>
/// and collects the results in <see cref="Tweets"/>, with a derived collection of tile
/// view models in <see cref="TweetViewModels"/>.
/// </summary>
public class StreamTrackedContentViewModel : ReactiveObject
{
    private string _searchText = "";

    /// <summary>
    /// Wires the command so that each execution asks <paramref name="twitterApi"/> for tweets
    /// matching the current <see cref="SearchText"/> and appends every result to <see cref="Tweets"/>.
    /// </summary>
    /// <param name="twitterApi">Twitter API wrapper used to obtain the tweet observable.</param>
    public StreamTrackedContentViewModel(Twitter twitterApi)
    {
        Tweets = new ReactiveList<Tweet>();

        // One tile view model per tweet, derived from the Tweets list.
        TweetViewModels = Tweets.CreateDerivedCollection(
            tweet => new TweetTileViewModel(
                tweet.CreatedAt,
                tweet.UserPic,
                tweet.UserName,
                tweet.ScreenName,
                tweet.Text));

        StreamTweetsCommand = new ReactiveCommand();

        // SearchText is read when the command executes, not when it is registered.
        var newTweets = StreamTweetsCommand.RegisterAsync(p => twitterApi.GetTweetsObservable(SearchText));
        newTweets.Subscribe(x =>
        {
            Console.WriteLine(string.Format("Subscribe called to add {0}", x));
            Tweets.Add(x);
        });
    }

    /// <summary>Command that triggers a tweet request for the current <see cref="SearchText"/>.</summary>
    public ReactiveCommand StreamTweetsCommand { get; set; }

    /// <summary>Tile view models derived from <see cref="Tweets"/>.</summary>
    public IReactiveDerivedList<TweetTileViewModel> TweetViewModels { get; protected set; }

    /// <summary>Tweets received so far from executions of <see cref="StreamTweetsCommand"/>.</summary>
    public ReactiveList<Tweet> Tweets { get; protected set; }

    /// <summary>
    /// Search terms passed to the Twitter API when the command executes.
    /// </summary>
    public string SearchText
    {
        get { return _searchText; }
        // Raise change notification so bindings and WhenAny observers see updates;
        // a plain field assignment leaves them with a stale value.
        set { this.RaiseAndSetIfChanged(ref _searchText, value); }
    }
}
/// <summary>
/// Extension methods that adapt the callback-based <c>Twitter</c> API to observables.
/// </summary>
public static class Extensions
{
    /// <summary>
    /// Starts a tracked-content stream for <paramref name="searchTerms"/> and returns an
    /// observable that yields the first tweet delivered by the stream and then completes.
    /// </summary>
    /// <param name="This">Twitter API wrapper whose stream is used.</param>
    /// <param name="searchTerms">Track terms forwarded to <c>StreamTrackedContent</c>.</param>
    /// <returns>
    /// An <see cref="AsyncSubject{T}"/>-backed observable. It signals an error if
    /// <c>StreamTrackedContent</c> throws into this method.
    /// </returns>
    public static IObservable<Tweet> GetTweetsObservable(this Twitter This, string searchTerms)
    {
        var ret = new AsyncSubject<Tweet>();

        // The subject completes after the first tweet, so the underlying stream must be told
        // to stop as well. With CancellationToken.None it would stay open and keep delivering
        // tweets that no subscriber can ever observe.
        // Intentionally not disposed here: the streaming callback still needs to read the
        // token after this method has returned.
        var stopStreaming = new CancellationTokenSource();

        try
        {
            This.StreamTrackedContent(tweet =>
            {
                ret.OnNext(tweet);
                ret.OnCompleted();

                // StreamTrackedContent checks the token on its next callback and closes the stream.
                stopStreaming.Cancel();
            }, stopStreaming.Token, searchTerms);
        }
        catch (Exception ex)
        {
            // Make sure a stream that was opened before the failure does not linger.
            stopStreaming.Cancel();
            ret.OnError(ex);
        }

        return ret;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment