Skip to content

Instantly share code, notes, and snippets.

@jpeckham
Last active February 11, 2018 18:04
Show Gist options
  • Save jpeckham/c276f81ffc923eded5db72a5fb11eb8d to your computer and use it in GitHub Desktop.
Save jpeckham/c276f81ffc923eded5db72a5fb11eb8d to your computer and use it in GitHub Desktop.
using Tweetinvi to stream a topic
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using Amazon;
using Amazon.KinesisFirehose;
using Amazon.KinesisFirehose.Model;
using Amazon.Runtime.CredentialManagement;
using Amazon.SQS;
using Amazon.SQS.Model;
using Newtonsoft.Json;
using Tweetinvi.Models;
using Stream = Tweetinvi.Stream;
namespace TwitterTopicStream
{
/// <summary>
/// Console entry point that opens a filtered Twitter stream with Tweetinvi and
/// forwards every matching tweet, serialized as JSON, to a Kinesis Firehose
/// delivery stream. An SQS sender is kept as an alternative sink.
/// </summary>
class Program
{
    /// <summary>Keyword tracked on the Twitter filtered stream.</summary>
    private const string TrackedKeyword = "FISA";

    /// <summary>Name of the Firehose delivery stream that receives the tweets.</summary>
    private const string DeliveryStreamName = "jdptwitter";

    /// <summary>
    /// Registers an AWS credential profile, creates the SQS and Firehose clients,
    /// subscribes to tweets matching <see cref="TrackedKeyword"/> and starts the stream.
    /// </summary>
    /// <param name="cmdArgs">Command-line arguments; not used.</param>
    public static void Main(string[] cmdArgs)
    {
        // IAM setup.
        // NOTE: the keys are blank placeholders in this sample; supply real values
        // from configuration or the environment rather than committing them.
        var options = new CredentialProfileOptions
        {
            AccessKey = "",
            SecretKey = ""
        };
        var profile = new CredentialProfile("custom_runtime", options)
        {
            Region = RegionEndpoint.USEast1
        };
        var credentialsFile = new NetSDKCredentialsFile();
        credentialsFile.RegisterProfile(profile);
        // NOTE(review): the clients below are created without explicit credentials;
        // confirm that they actually resolve the "custom_runtime" profile.

        // SQS / Firehose endpoint configuration.
        var amazonSqsConfig = new AmazonSQSConfig
        {
            ServiceURL = "https://sqs.us-east-1.amazonaws.com"
        };
        var kinesisClientConfig = new AmazonKinesisFirehoseConfig
        {
            ServiceURL = "https://firehose.us-east-1.amazonaws.com"
        };

        // Both clients are IDisposable; dispose them once streaming has finished.
        using (var sqsClient = new AmazonSQSClient(amazonSqsConfig))
        using (var kinesisClient = new AmazonKinesisFirehoseClient(kinesisClientConfig))
        {
            var getQueueUrlRequest = new GetQueueUrlRequest
            {
                QueueName = "twitter",
                QueueOwnerAWSAccountId = ""
            };
            // GetAwaiter().GetResult() surfaces the SDK exception itself instead of
            // an AggregateException wrapper, which is what .Result would throw.
            var queueUrlResponse = sqsClient.GetQueueUrlAsync(getQueueUrlRequest).GetAwaiter().GetResult();

            // Twitter app keys and user credentials (blank placeholders, see note above).
            string consumerKey = "";
            string consumerSecret = "";
            var twitterUserCreds = new TwitterCredentials(consumerKey, consumerSecret)
            {
                AccessToken = "",
                AccessTokenSecret = ""
            };

            // Tweetinvi setup: filtered stream on a single tracked keyword.
            var stream = Stream.CreateFilteredStream(twitterUserCreds);
            stream.AddTrack(TrackedKeyword);
            stream.MatchingTweetReceived += (sender, args) =>
            {
                // Project the tweet onto the small DTO that is shipped downstream.
                var tweetEvent = new TweetEvent
                {
                    ScreenName = args.Tweet.CreatedBy.ScreenName,
                    Text = args.Tweet.Text,
                    CreatedAt = args.Tweet.CreatedAt
                };
                string tweetEventJson = JsonConvert.SerializeObject(tweetEvent);

                // Alternative sink:
                //SendSqs(queueUrlResponse, tweetEventJson, sqsClient);
                SendKinesis(tweetEventJson, kinesisClient);

                // Echo to the console, followed by a blank separator line.
                Console.WriteLine(tweetEventJson);
                Console.WriteLine();
            };

            // Expected to block while the stream is running, so the clients above
            // stay alive for as long as the handler can fire.
            stream.StartStreamMatchingAllConditions();
        }
    }

    /// <summary>
    /// Sends one tweet JSON document to the Firehose delivery stream as a
    /// single-record batch and reports any record the service rejected.
    /// </summary>
    /// <param name="tweetEventJson">JSON text to send; encoded as UTF-8.</param>
    /// <param name="client">Firehose client used to perform the call.</param>
    private static void SendKinesis(string tweetEventJson, AmazonKinesisFirehoseClient client)
    {
        byte[] payload = Encoding.UTF8.GetBytes(tweetEventJson);
        using (var data = new MemoryStream(payload))
        {
            var record = new Record { Data = data };
            var response = client
                .PutRecordBatchAsync(DeliveryStreamName, new List<Record> { record })
                .GetAwaiter()
                .GetResult();

            // A batch call can succeed as a whole while individual records fail;
            // without this check those records would be dropped silently.
            if (response.FailedPutCount > 0)
            {
                Console.Error.WriteLine(
                    "Firehose rejected {0} record(s) for delivery stream '{1}'.",
                    response.FailedPutCount,
                    DeliveryStreamName);

                if (response.RequestResponses != null)
                {
                    foreach (var entry in response.RequestResponses)
                    {
                        if (!string.IsNullOrEmpty(entry.ErrorCode))
                        {
                            Console.Error.WriteLine("  {0}: {1}", entry.ErrorCode, entry.ErrorMessage);
                        }
                    }
                }
            }
        }
    }

    /// <summary>
    /// Sends one tweet JSON document as the body of an SQS message.
    /// </summary>
    /// <param name="getQeueuUrlResponse">Queue lookup result that supplies the target queue URL.</param>
    /// <param name="tweetEventJson">JSON text used as the message body.</param>
    /// <param name="sqsClient">SQS client used to perform the call.</param>
    private static void SendSqs(GetQueueUrlResponse getQeueuUrlResponse, string tweetEventJson, AmazonSQSClient sqsClient)
    {
        var sqsSendMessageRequest = new SendMessageRequest
        {
            QueueUrl = getQeueuUrlResponse.QueueUrl,
            MessageBody = tweetEventJson
        };
        // Block until the send completes so failures surface to the caller.
        sqsClient.SendMessageAsync(sqsSendMessageRequest).GetAwaiter().GetResult();
    }
}
/// <summary>
/// Minimal projection of a tweet. Program.Main fills it from a received tweet
/// and serializes it to JSON with JsonConvert.SerializeObject.
/// </summary>
[Serializable]
public class TweetEvent
{
/// <summary>Screen name of the account that created the tweet.</summary>
public string ScreenName { get; set; }
/// <summary>Text of the tweet.</summary>
public string Text { get; set; }
/// <summary>Creation timestamp copied from the tweet.</summary>
public DateTime CreatedAt { get; set; }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment