Skip to content

Instantly share code, notes, and snippets.

@jpeckham
Created August 22, 2018 04:21
Show Gist options
  • Save jpeckham/46b5e94090d7d874427df7dc60aeb03c to your computer and use it in GitHub Desktop.
Save jpeckham/46b5e94090d7d874427df7dc60aeb03c to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Concurrent;
using System.Text;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
using Newtonsoft.Json;
using Tweetinvi;
using Tweetinvi.Models;
using Tweetinvi.Streams;
/// <summary>
/// Console application that follows a fixed set of Twitter accounts through a
/// Tweetinvi filtered stream and forwards every matching tweet, serialized as JSON,
/// to a Kafka topic. A throughput line is printed to the console periodically.
/// </summary>
public class Program
{
    // Kafka topic every tweet is produced to.
    private const string Topic = "my-topic";

    // How often the "tweets written" line is printed, in milliseconds.
    private const double ReportIntervalMs = 5000;

    // Upper bound on how long shutdown waits for queued Kafka messages to be delivered.
    private static readonly TimeSpan FlushTimeout = TimeSpan.FromSeconds(10);

    /// <summary>
    /// Wires the tweet stream to the Kafka producer, runs until a key is pressed,
    /// then shuts down in order: stream, timer, producer flush, producer dispose.
    /// </summary>
    public static void Main()
    {
        //twitter app keys (left blank here; fill in before running)
        string consumerKey = "";
        string consumerSecret = "";

        //twitter user credentials
        var twitterUserCreds = new TwitterCredentials(consumerKey, consumerSecret);
        twitterUserCreds.AccessToken = "";
        twitterUserCreds.AccessTokenSecret = "";

        //tweetinvi setup (setup a tweet stream)
        var stream = Stream.CreateFilteredStream(twitterUserCreds);
        //var stream = Stream.CreateSampleStream(twitterUserCreds);
        stream.AddFollow(14173315);//nbc
        stream.AddFollow(25073877);//trump
        stream.AddFollow(759251);//cnn
        stream.AddFollow(97739866);//cbs
        stream.AddFollow(1367531);//fox
        stream.AddFollow(2467791);//wash post

        // This option allows the application to get notified
        // if the stream is about to be disconnected
        stream.StallWarnings = true;

        var config = new Dictionary<string, object>
        {
            { "bootstrap.servers", "localhost:9092" }
        };

        // Tweets handed to the producer since the last report. Written from the
        // stream handler and read/reset from the timer callback, so every access
        // goes through Interlocked.
        int count = 0;

        using (var reportTimer = new System.Timers.Timer(ReportIntervalMs))
        using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
        {
            reportTimer.Elapsed += (source, e) =>
            {
                // Read and reset in one atomic step so increments that race with
                // the report are carried into the next interval instead of lost.
                int written = Interlocked.Exchange(ref count, 0);
                Console.Clear();
                Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss")} - {written} tweets written");
            };
            // Have the timer fire repeated events (true is the default)
            reportTimer.AutoReset = true;
            reportTimer.Start();

            stream.MatchingTweetReceived += (sender, args) =>
            //stream.TweetReceived += (sender, args) =>
            {
                Console.WriteLine($"{args.Tweet.CreatedBy.ScreenName} - {args.Tweet.Text}");
                string tweetEventJson = JsonConvert.SerializeObject(args.Tweet);
                Interlocked.Increment(ref count);//add 1

                // Fire-and-forget keeps the stream handler fast, but the delivery
                // report is still observed so failures are not silently dropped.
                producer.ProduceAsync(Topic, null, tweetEventJson).ContinueWith(delivery =>
                {
                    if (delivery.IsFaulted)
                    {
                        Console.Error.WriteLine($"Kafka produce failed: {delivery.Exception.GetBaseException().Message}");
                    }
                    else if (delivery.Result.Error.HasError)
                    {
                        Console.Error.WriteLine($"Kafka delivery failed: {delivery.Result.Error.Reason}");
                    }
                });
            };

            // Start in the background: the synchronous start call is expected to
            // block until the stream stops, which would keep the exit prompt below
            // from ever being shown while tweets are flowing.
            stream.StartStreamMatchingAnyConditionAsync();
            //stream.StartStream();

            Console.WriteLine("Press any key to exit");
            Console.ReadKey();

            // Stop the event sources first so no handler touches the producer
            // after it is disposed, then give queued messages a chance to be sent.
            stream.StopStream();
            reportTimer.Stop();
            producer.Flush(FlushTimeout);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment