Skip to content

Instantly share code, notes, and snippets.

@vepo
Created October 12, 2022 21:53
Show Gist options
  • Save vepo/6cadb8e460e901636052906ca9b2ffe4 to your computer and use it in GitHub Desktop.
Save vepo/6cadb8e460e901636052906ca9b2ffe4 to your computer and use it in GitHub Desktop.
from TwitterAPI import TwitterAPI, TwitterOAuth, HydrateType
import time
import pandas as pd
from pymongo import MongoClient
import pymongo
import os
EXPANSIONS = 'author_id,referenced_tweets.id,referenced_tweets.id.author_id,in_reply_to_user_id,attachments.media_keys,attachments.poll_ids,geo.place_id,entities.mentions.username'
TWEET_FIELDS='author_id,conversation_id,created_at,entities,geo,id,lang,public_metrics,source,text'
USER_FIELDS='created_at,description,entities,location,name,profile_image_url,public_metrics,url,username'
def main():
o = TwitterOAuth.read_file(os.path.join('.', 'credentials.txt'))
api = TwitterAPI(o.consumer_key, o.consumer_secret, auth_type='oAuth2', api_version='2')
r = api.request('tweets/search/stream/rules', {'add':[{'value': 'Bolsonaro'},
{'value': 'Lula'},
{'value': 'Ciro'},
{'value': 'Tebet'},
{'value': 'Olavo'},
{'value': '7 de setembro'},
{'value': 'independência'}
]})
if r.status_code != 201:
print(r)
exit(-1)
r = api.request('tweets/search/stream/rules', method_override='GET')
print(f'[{r.status_code}] RULES: {r.text}')
if r.status_code != 200:
print(r)
exit(-1)
CONNECTION_STRING = "mongodb://localhost:27017/tweets"
while True:
client = MongoClient(CONNECTION_STRING)
database = client['dados']
tweets_collections = database["tweets"]
r = api.request('tweets/search/stream',{ 'expansions': EXPANSIONS,
'tweet.fields': TWEET_FIELDS,
'user.fields': USER_FIELDS,},
hydrate_type=HydrateType.APPEND)
print(f'[{r.status_code} START...')
if r.status_code != 200:
print(f'[{r.status_code}] RULES: {r.text}')
exit(-1)
for item in r:
tweet = item['data']
#print(tweet)
try:
## Usando id do tweet para evitar tweets repetidos
tweet['_id'] = tweet['id']
del tweet['id']
tweets_collections.insert_one(tweet)
except Exception as ex:
print(ex)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment