Skip to content

Instantly share code, notes, and snippets.

@kaxil
Last active April 26, 2020 08:01
Show Gist options
  • Save kaxil/6511003d75b556fac1d8126f4b987239 to your computer and use it in GitHub Desktop.
Save kaxil/6511003d75b556fac1d8126f4b987239 to your computer and use it in GitHub Desktop.
Twitter Streaming API to PubSub
def publish(client, pubsub_topic, data_lines):
    """Publish a batch of records to the given PubSub topic as one message.

    The records are wrapped in a ``{"messages": [{"data": record}, ...]}``
    envelope, serialised to JSON, URL-safe base64 encoded, and the resulting
    bytes are sent as the payload of a single ``client.publish`` call.

    Args:
        client: Publisher client exposing ``publish(topic=..., data=...)``.
        pubsub_topic: Topic path passed straight through to the client.
        data_lines: Iterable of JSON-serialisable records (in this script,
            the tweet dicts built by ``TweetStreamListener.on_status``).

    Returns:
        Whatever ``client.publish`` returns. For google-cloud-pubsub's
        ``PublisherClient`` this is presumably a future (TODO confirm) that
        callers can wait on to surface publish failures instead of losing them.
    """
    # NOTE(review): the envelope + base64 layering mirrors the legacy REST
    # request body; downstream consumers presumably decode exactly this
    # format, so it is intentionally kept unchanged.
    body = {'messages': [{'data': line} for line in data_lines]}
    payload = base64.urlsafe_b64encode(json.dumps(body).encode('utf8'))
    return client.publish(topic=pubsub_topic, data=payload)
class TweetStreamListener(StreamListener):
    """
    A listener that handles tweets received from the stream.

    Each status is reduced to a flat dict of selected fields, buffered, and
    published to a PubSub topic once ``batch_size`` tweets are buffered.
    After ``total_tweets`` statuses have been processed, any remaining
    buffered tweets are published and ``on_status`` returns False to end
    the stream.
    """
    # Evaluated once, when the class is defined, and shared by all instances.
    client = pubsub.PublisherClient()
    pubsub_topic = client.topic_path(GCP_PROJECT_NAME, PUBSUB_TOPIC_NAME)
    count = 0
    batch_size = 1
    total_tweets = TOTAL_TWEETS

    def __init__(self, *args, **kwargs):
        super(TweetStreamListener, self).__init__(*args, **kwargs)
        # Per-instance buffer. A class-level ``tweets = []`` is one list
        # shared by every instance: the first append mutated it before the
        # first flush rebound ``self.tweets``, so stale tweets leaked into
        # later listeners and were published again.
        self.tweets = []

    def write_to_pubsub(self, tweets):
        """Publish a batch of tweet dicts to the configured topic."""
        publish(self.client, self.pubsub_topic, tweets)

    def _flush(self):
        """Publish, echo, and clear any buffered tweets."""
        if self.tweets:
            self.write_to_pubsub(self.tweets)
            print(self.tweets)
            self.tweets = []

    def on_status(self, status):
        """Buffer one status and publish batches as they fill.

        Returns:
            False once ``total_tweets`` statuses have been handled (to end
            the stream), otherwise True.
        """
        # Converting the time to isoformat for serialisation
        created_at = status.created_at.isoformat()
        tw = dict(text=status.text,
                  bio=status.user.description,
                  created_at=created_at,
                  tweet_id=status.id_str,
                  location=status.user.location,
                  user_name=status.user.name,
                  user_screen_name=status.user.screen_name,
                  source=status.source)
        self.tweets.append(tw)
        if len(self.tweets) >= self.batch_size:
            self._flush()
        self.count += 1
        if self.count >= self.total_tweets:
            # Publish a partially filled batch so it is not dropped on stop
            # (only matters when batch_size > 1).
            self._flush()
            return False
        if (self.count % 5) == 0:
            print("count is: {} at {}".format(self.count, datetime.datetime.now()))
        return True

    def on_error(self, status_code):
        """Print the HTTP status code of a stream error.

        Returns None rather than False; presumably this leaves reconnection
        to tweepy. Confirm against the installed tweepy version.
        """
        print(status_code)
if __name__ == '__main__':
    # Parenthesised print is valid on both Python 2 and Python 3; the bare
    # ``print '....'`` statement is a SyntaxError on Python 3.
    print('....')
    auth = OAuthHandler(TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET)
    auth.set_access_token(TWITTER_ACCESS_TOKEN, TWITTER_ACCESS_TOKEN_SECRET)
    stream_listener = TweetStreamListener()
    stream = Stream(auth, stream_listener)
    # Stream tweets matching any of these tracked phrases to the listener.
    stream.filter(
        track=['Royal Wedding', '#RoyalWedding', 'Prince Harry', 'Meghan Markle']
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment