Last active
August 29, 2015 14:05
-
-
Save aebm/c3017656da200f4f5ea6 to your computer and use it in GitHub Desktop.
Borra DMs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import requests | |
from requests_oauthlib import OAuth1 | |
# NOTE(review): name these in UPPER_CASE (the convention for module-level constants) so they are not mistaken for local variables.
# Credentials passed to OAuth1(...) for every Twitter API request below.
# Replace each placeholder with your own value before running the script.
consumer_key = '<YOUR CONSUMER KEY HERE>'
consumer_secret = '<YOUR CONSUMER SECRET HERE>'
access_token_key = '<YOUR ACCESS TOKEN HERE>'
access_token_secret = '<YOUR ACCESS TOKEN SECRET HERE>'
# TODO: check what happens if authentication fails.
def _fetch_message_ids(api_url, auth, payload):
    """Return the ``id`` of every message in one listing response.

    Args:
        api_url: Endpoint to GET.
        auth: Auth object handed to ``requests.get``.
        payload: Query-string parameters for the request.

    Returns:
        A list with the ``id`` field of each item in the JSON response.

    Raises:
        SystemExit: If the response reports zero remaining rate-limit calls.
    """
    r = requests.get(api_url, stream=False, auth=auth, params=payload)
    # NOTE(review): this aborts before using the messages that came back with
    # the response; checking after they are processed may be preferable.
    # .get() is used so a response without rate-limit headers does not raise
    # KeyError.
    if r.headers.get('x-rate-limit-remaining') == "0":
        print("We reached rate limit for ", api_url)
        print("Try again at", r.headers.get("x-rate-limit-reset"))
        # Same effect as quit(), without depending on the site module.
        raise SystemExit
    return [dm['id'] for dm in json.loads(r.content)]


def get_messages_ids():
    """Collect the ids of received and sent direct messages.

    Issues one request to the received-messages endpoint and one to the
    sent-messages endpoint, each asking for up to 200 items.

    Returns:
        A list of message ids: received first, then sent.

    Raises:
        SystemExit: If either response reports zero remaining rate-limit
            calls.
    """
    auth = OAuth1(consumer_key, consumer_secret, access_token_key, access_token_secret)
    message_ids = _fetch_message_ids(
        'https://api.twitter.com/1.1/direct_messages.json',
        auth,
        {'count': '200', 'cursor': '-1', 'skip_status': '1'},
    )
    message_ids.extend(_fetch_message_ids(
        'https://api.twitter.com/1.1/direct_messages/sent.json',
        auth,
        {'count': '200'},
    ))
    return message_ids
def nuke_messages(DMs):
    """Delete every direct message whose id is in *DMs*.

    Args:
        DMs: Iterable of direct-message ids to destroy.

    A failed deletion is reported on stdout and the loop continues with the
    next id.
    """
    api_url = 'https://api.twitter.com/1.1/direct_messages/destroy.json'
    # The credentials do not change between requests, so build auth once
    # instead of once per message.
    auth = OAuth1(consumer_key, consumer_secret, access_token_key, access_token_secret)
    for message_id in DMs:
        r = requests.post(api_url, stream=False, auth=auth, params={'id': message_id})
        if not r.ok:
            # Surface failures instead of silently discarding the response.
            print('Failed to delete', message_id, '- HTTP status', r.status_code)
# TODO: move this loop into a main() guarded by `if __name__ == '__main__'` in case this module gets imported.
# Keep fetching and deleting batches until a fetch comes back empty.
while True:
    DMs = get_messages_ids()
    # An empty list is falsy, so no separate length check is needed.
    if not DMs:
        print('There appears that there are no more DMs', DMs)
        break
    print('Deleting:', DMs)
    nuke_messages(DMs)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import requests | |
import sys | |
from requests_oauthlib import OAuth1 | |
# OAuth1 credentials; replace each placeholder with your own value.
CONSUMER_KEY = '<YOUR CONSUMER KEY HERE>'
CONSUMER_SECRET = '<YOUR CONSUMER SECRET HERE>'
ACCESS_TOKEN_KEY = '<YOUR ACCESS TOKEN HERE>'
ACCESS_TOKEN_SECRET = '<YOUR ACCESS TOKEN SECRET HERE>'

# Twitter endpoints, all under the same API root.
_API_ROOT = 'https://api.twitter.com/1.1'
DESTROY_URL = _API_ROOT + '/direct_messages/destroy.json'
DMS_URL = _API_ROOT + '/direct_messages.json'
SENT_DMS_URL = _API_ROOT + '/direct_messages/sent.json'
def _get_ids(api_url, auth, payload):
    """Yield the ``id`` of each message returned by one listing request.

    Args:
        api_url: Endpoint to GET.
        auth: Auth object handed to ``requests.get``.
        payload: Query-string parameters for the request.

    Yields:
        The ``id`` field of each item in the JSON response.

    Raises:
        SystemExit: After all ids have been yielded, if the response reports
            zero remaining rate-limit calls.
    """
    r = requests.get(api_url, stream=False, auth=auth, params=payload)
    for msg in json.loads(r.content):
        yield msg['id']
    # The rate-limit check runs only after every id from this response has
    # been yielded, so the messages already received are still processed.
    # .get() is used so a response without rate-limit headers does not raise
    # KeyError.
    if r.headers.get('x-rate-limit-remaining') == "0":
        print('We reached rate limit for {url}'.format(url=api_url))
        print('Try again at {reset}'.format(reset=r.headers.get("x-rate-limit-reset")))
        sys.exit()
def get_dms_ids(auth, batch_size=200):
    """Yield the ids of received direct messages, then of sent ones.

    Args:
        auth: Auth object forwarded to ``_get_ids``.
        batch_size: Value sent as the ``count`` parameter of each request.

    Yields:
        Message ids from DMS_URL followed by those from SENT_DMS_URL.
    """
    count = str(batch_size)
    listings = (
        (DMS_URL, {'count': count, 'cursor': '-1', 'skip_status': '1'}),
        (SENT_DMS_URL, {'count': count}),
    )
    # Explicit nested loops instead of ``yield from`` (Python 3.3+ only).
    for url, params in listings:
        for message_id in _get_ids(api_url=url, auth=auth, payload=params):
            yield message_id
# Using a closure because auth doesn't change between calls
def get_nuke_message_func(auth):
    """Build a function that deletes one direct message using *auth*.

    Args:
        auth: Auth object captured by the returned function and passed to
            every ``requests.post`` call.

    Returns:
        A callable ``f(id_)`` that POSTs the id to DESTROY_URL. It returns
        ``True`` when the response status indicates success and ``False``
        otherwise; failures are also reported on stdout.
    """
    def f(id_):
        r = requests.post(DESTROY_URL, stream=False, auth=auth, params={'id': id_})
        if not r.ok:
            # Report the failure instead of silently discarding the response.
            print('Failed to delete {id_}: HTTP {status}'.format(
                id_=id_, status=r.status_code))
        return r.ok
    return f
def main():
    """Delete every direct message returned by ``get_dms_ids``.

    Builds the OAuth1 auth object from the module-level credential constants,
    then deletes each message id one at a time, printing each id first.
    """
    # TODO: check that authentication succeeded before deleting anything.
    # The credential constants are defined in UPPER_CASE; the lowercase
    # names used previously were undefined and raised NameError.
    auth = OAuth1(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN_KEY, ACCESS_TOKEN_SECRET)
    nuke_message = get_nuke_message_func(auth)
    # we can also use itertools imap (python 2) / map (python 3)
    for id_ in get_dms_ids(auth):
        print('Deleting: {id_}'.format(id_=id_))
        nuke_message(id_)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Awesome!