Last active
February 11, 2018 18:04
-
-
Save jpeckham/c276f81ffc923eded5db72a5fb11eb8d to your computer and use it in GitHub Desktop.
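Using Tweetinvi to stream tweets that match a tracked keyword and forward them as JSON to an AWS Kinesis Firehose delivery stream (an SQS path is included but disabled).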
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.IO; | |
using System.Text; | |
using Amazon; | |
using Amazon.KinesisFirehose; | |
using Amazon.KinesisFirehose.Model; | |
using Amazon.Runtime.CredentialManagement; | |
using Amazon.SQS; | |
using Amazon.SQS.Model; | |
using Newtonsoft.Json; | |
using Tweetinvi.Models; | |
using Stream = Tweetinvi.Stream; | |
namespace TwitterTopicStream | |
{ | |
/// <summary>
/// Console entry point. Tracks a keyword on Twitter's filtered stream via Tweetinvi and
/// forwards every matching tweet, serialized as JSON, to a Kinesis Firehose delivery stream.
/// An SQS delivery path (<c>SendSqs</c>) is kept but is disabled in the tweet handler.
/// </summary>
class Program
{
    // External resource names used by this sample, gathered in one place.
    private const string AwsProfileName = "custom_runtime";
    private const string SqsQueueName = "twitter";
    private const string FirehoseDeliveryStreamName = "jdptwitter";
    private const string TrackedKeyword = "FISA";

    /// <summary>
    /// Registers an AWS credential profile, creates the SQS and Firehose clients, then starts
    /// a filtered Twitter stream whose matching tweets are pushed to Firehose and echoed to
    /// the console.
    /// </summary>
    /// <param name="cmdArgs">Command-line arguments (not used).</param>
    public static void Main(string[] cmdArgs)
    {
        //iam setup
        // The key fields are blank placeholders: supply real values locally (ideally from
        // configuration or the environment) and never commit secrets to source control.
        var profileOptions = new CredentialProfileOptions
        {
            AccessKey = "",
            SecretKey = ""
        };
        var profile = new CredentialProfile(AwsProfileName, profileOptions)
        {
            Region = RegionEndpoint.USEast1
        };
        new NetSDKCredentialsFile().RegisterProfile(profile);

        //sqs setup
        var amazonSqsConfig = new AmazonSQSConfig
        {
            ServiceURL = "https://sqs.us-east-1.amazonaws.com"
        };

        //kinesis setup
        var kinesisClientConfig = new AmazonKinesisFirehoseConfig
        {
            ServiceURL = "https://firehose.us-east-1.amazonaws.com"
        };

        // Both service clients are IDisposable. The using block covers the blocking
        // StartStreamMatchingAllConditions() call below, so the clients stay alive for the
        // whole lifetime of the stream and are released once it ends.
        using (var sqsClient = new AmazonSQSClient(amazonSqsConfig))
        using (var kinesisClient = new AmazonKinesisFirehoseClient(kinesisClientConfig))
        {
            var getQueueUrlRequest = new GetQueueUrlRequest
            {
                QueueName = SqsQueueName,
                QueueOwnerAWSAccountId = ""
            };

            // Only the (currently disabled) SQS path consumes this response.
            // GetAwaiter().GetResult() rethrows the original service exception instead of
            // wrapping it in an AggregateException the way .Result does.
            GetQueueUrlResponse queueUrlResponse =
                sqsClient.GetQueueUrlAsync(getQueueUrlRequest).GetAwaiter().GetResult();

            //twitter app keys (placeholders, see the note on the AWS keys above)
            string consumerKey = "";
            string consumerSecret = "";

            //twitter user credentials
            var twitterUserCreds = new TwitterCredentials(consumerKey, consumerSecret)
            {
                AccessToken = "",
                AccessTokenSecret = ""
            };

            //tweetinvi setup (setup a tweet stream)
            var stream = Stream.CreateFilteredStream(twitterUserCreds);
            stream.AddTrack(TrackedKeyword);
            stream.MatchingTweetReceived += (sender, args) =>
            {
                try
                {
                    //make json of a tweet
                    var tweetEvent = new TweetEvent
                    {
                        ScreenName = args.Tweet.CreatedBy.ScreenName,
                        Text = args.Tweet.Text,
                        CreatedAt = args.Tweet.CreatedAt
                    };
                    string tweetEventJson = JsonConvert.SerializeObject(tweetEvent);

                    // Alternative delivery path; swap with SendKinesis to publish to SQS.
                    //SendSqs(queueUrlResponse, tweetEventJson, sqsClient);
                    SendKinesis(tweetEventJson, kinesisClient);

                    //console
                    Console.WriteLine(tweetEventJson);
                    Console.WriteLine();
                }
                catch (Exception ex)
                {
                    // An exception escaping an event handler propagates to whoever raised
                    // the event. Report it and keep going so one bad tweet or one failed
                    // delivery does not end the whole run.
                    Console.Error.WriteLine("Failed to forward tweet: {0}", ex);
                }
            };

            stream.StartStreamMatchingAllConditions();
        }
    }

    /// <summary>
    /// Sends one JSON document to the Firehose delivery stream as a single-record batch and
    /// reports any per-record failure returned by the service.
    /// </summary>
    /// <param name="tweetEventJson">JSON text to deliver; encoded as UTF-8.</param>
    /// <param name="client">Firehose client used to issue the request.</param>
    private static void SendKinesis(string tweetEventJson, AmazonKinesisFirehoseClient client)
    {
        // NOTE(review): records are sent without a trailing delimiter. Firehose concatenates
        // buffered records at the destination, so consumers that expect one JSON document
        // per line may need a "\n" appended here - confirm with the downstream reader.
        byte[] payload = Encoding.UTF8.GetBytes(tweetEventJson);

        using (var data = new MemoryStream(payload))
        {
            var record = new Record { Data = data };

            PutRecordBatchResponse response = client
                .PutRecordBatchAsync(FirehoseDeliveryStreamName, new List<Record> { record })
                .GetAwaiter()
                .GetResult();

            // PutRecordBatch can succeed as a call while individual records are rejected;
            // those rejections are only visible through FailedPutCount / RequestResponses.
            if (response.FailedPutCount > 0 && response.RequestResponses != null)
            {
                foreach (PutRecordBatchResponseEntry entry in response.RequestResponses)
                {
                    if (!string.IsNullOrEmpty(entry.ErrorCode))
                    {
                        Console.Error.WriteLine(
                            "Firehose rejected record: {0} - {1}",
                            entry.ErrorCode,
                            entry.ErrorMessage);
                    }
                }
            }
        }
    }

    /// <summary>
    /// Publishes one JSON document as the body of an SQS message.
    /// </summary>
    /// <param name="getQeueuUrlResponse">Queue lookup result whose <c>QueueUrl</c> is the target queue.</param>
    /// <param name="tweetEventJson">JSON text used as the message body.</param>
    /// <param name="sqsClient">SQS client used to issue the request.</param>
    private static void SendSqs(GetQueueUrlResponse getQeueuUrlResponse, string tweetEventJson, AmazonSQSClient sqsClient)
    {
        var sendMessageRequest = new SendMessageRequest
        {
            QueueUrl = getQeueuUrlResponse.QueueUrl,
            MessageBody = tweetEventJson
        };

        // Block until the send completes; failures surface as the original exception.
        sqsClient.SendMessageAsync(sendMessageRequest).GetAwaiter().GetResult();
    }
}
/// <summary>
/// Flat projection of a tweet: the fields that get serialized to JSON and forwarded downstream.
/// </summary>
[Serializable]
public class TweetEvent
{
    /// <summary>Screen name of the account that created the tweet.</summary>
    public string ScreenName { get; set; }

    /// <summary>Text of the tweet.</summary>
    public string Text { get; set; }

    /// <summary>Creation time of the tweet, as supplied by the caller.</summary>
    public DateTime CreatedAt { get; set; }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment