Last active
March 18, 2020 22:47
-
-
Save iamevn/f95c1403171ed9e4416e68a41f9b60ed to your computer and use it in GitHub Desktop.
find boops from https://twitter.com/alecrobbins/status/1233445773682991104
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import webbrowser | |
from camel import Camel | |
from dataclasses import dataclass | |
from typing import List | |
from tweet import Tweet, gen_tweet | |
from camel_registry import reg | |
from find_boops import find_boops | |
@dataclass
class State:
    '''Progress through the boop thread that is saved and reloaded between runs.'''
    # most recent boop recorded so far
    last_boop: Tweet
    # every boop recorded so far, in the order they were appended
    boop_thread: List[Tweet]
# Initial state: the thread consists only of the original boop tweet.
# gen_tweet() performs an API lookup when given an id, so fetch the tweet once
# and reuse the object instead of issuing the identical request twice.
_original_boop = gen_tweet(1233445773682991104)
global_state = State(_original_boop, [_original_boop])
def boops_since_last(last=None):
    '''Fetch boops newer than `last`, record them, and return the new ones.

    `last` defaults to global_state.last_boop. Newly found boops that are not
    already recorded are appended to global_state.boop_thread, and
    global_state.last_boop is moved to the final entry of the thread.

    Returns the slice of the thread that follows `last`. If `last` is not
    part of the recorded thread (possible only when the caller passes it
    explicitly), every recorded boop dated after `last` is returned instead
    of raising ValueError.
    '''
    thread = global_state.boop_thread
    if last is None:
        last = global_state.last_boop

    # Tweets hash and compare by id, so a set gives O(1) duplicate checks
    # instead of rescanning the whole thread for every candidate.
    seen = set(thread)
    for boop in find_boops(thread, after=last.date):
        if boop not in seen:
            seen.add(boop)
            thread.append(boop)
    global_state.last_boop = thread[-1]

    try:
        start = thread.index(last) + 1
    except ValueError:
        # `last` is unknown to the thread; fall back to its timestamp.
        return [boop for boop in thread if boop.date > last.date]
    return thread[start:]
def save_to_file(filename='boops.yaml'):
    '''Serialize (last_boop, boop_thread) with Camel and write it to `filename`.

    The file is written as UTF-8 explicitly: tweet text routinely contains
    non-ASCII characters, and the platform default encoding cannot always
    represent them.
    '''
    s = Camel([reg]).dump((global_state.last_boop, global_state.boop_thread))
    with open(filename, mode='w', encoding='utf-8') as file:
        file.write(s)


def load_from_file(filename='boops.yaml'):
    '''Restore global_state from `filename`, if it exists.

    When the file is missing a message is printed and global_state is left
    untouched. The file is read as UTF-8 to match save_to_file().
    '''
    try:
        with open(filename, mode='r', encoding='utf-8') as file:
            s = file.read()
    except FileNotFoundError:
        print(f'{filename} not found!')
        return
    global_state.last_boop, global_state.boop_thread = Camel([reg]).load(s)
def boop2str(boop):
    '''Format a boop for console output as "[YYYY-MM-DD] <text>".'''
    day = boop.date.strftime('%Y-%m-%d')
    return f'[{day}] {boop.text}'
def main():
    '''Report boops posted since the last run and offer to open one.

    Loads saved state, looks for new boops, prints either the new boops or
    the last known one, and then asks whether to open a tweet in the web
    browser: the first new boop if there are any, otherwise the last boop.
    '''
    load_from_file()
    new_boops = boops_since_last()

    if new_boops:
        print(f'{len(new_boops)} new boops!\n')
        for boop in new_boops:
            print(boop2str(boop))
        to_open = new_boops[0]
        # NOTE(review): the source's indentation was lost, so it is unclear
        # whether saving was meant to happen on every run; here it happens
        # only when the thread grew — confirm.
        save_to_file()
    else:
        to_open = global_state.last_boop
        print('No new boops :c\nLast boop:\n')
        print(boop2str(to_open))

    # Decide on the first letter of the reply only. Matching 'o'/'y' anywhere
    # in the text would treat an answer such as "no" as consent.
    answer = input('\n[o]pen in webbrowser? ').strip().lower()
    if answer.startswith(('o', 'y')):
        print(f'Opening {to_open.url}')
        webbrowser.open(to_open.url)
# Run the checker only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from camel import CamelRegistry | |
from tweet import Tweet | |
# Registry holding the dumper/loader functions used for
# serialization/deserialization (the Tweet handlers are registered below).
reg = CamelRegistry()
@reg.dumper(Tweet, 'tweet', version=1)
def _dump_tweet(tweet):
    '''Turn a Tweet into the plain mapping stored under the 'tweet' tag (v1).'''
    return {
        'id': tweet.id,
        'author': tweet.author,
        'text': tweet.text,
        'date': tweet.date,
        'media_urls': tweet.media_urls,
        'in_reply_to': tweet.in_reply_to,
    }
@reg.loader('tweet', version=1)
def _load_tweet(data, version):
    '''Rebuild a Tweet from the mapping stored under the 'tweet' tag (v1).'''
    # Looked up in this order, mirroring the fields written by the dumper.
    field_names = ('id', 'author', 'text', 'date', 'media_urls', 'in_reply_to')
    return Tweet(**{name: data[name] for name in field_names})
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from functools import lru_cache | |
from tweet import Tweet, gen_tweet | |
import tweepy | |
from tweepy_api import get_api | |
def find_boops(boop_thread, after=None):
    '''Search for boop tweets and return them sorted by date.

    A tweet counts as a boop when following its reply chain upwards reaches
    either a tweet already in `boop_thread` or the original boop tweet.

    boop_thread: tweets already known to belong to the thread.
    after: optional datetime. When given, the search is limited with a
        `since:` day filter, and ancestors dated at or before `after` are
        not added to the result.

    Returns a list of Tweet objects (search hits plus their qualifying
    ancestors) ordered by `date`, oldest first.
    '''
    original_boop = 1233445773682991104
    boop_man = 'alecrobbins'
    api = get_api()

    # Tweets hash/compare by id, so a set makes the "already known" test O(1).
    known = set(boop_thread)

    @lru_cache(maxsize=None)
    def fetch(tweet_id):
        # Each id lookup is an API request; remember results so ancestors
        # shared by several replies are only requested once per call.
        return gen_tweet(tweet_id)

    verdicts = {}  # tweet id -> bool, filled in for every tweet walked

    def is_boop(tweet: Tweet) -> bool:
        # Walk the reply chain iteratively so a long thread cannot exhaust
        # the interpreter's recursion limit.
        walked = []
        current = tweet
        while True:
            if current.id in verdicts:
                verdict = verdicts[current.id]
                break
            walked.append(current.id)
            if current in known:
                verdict = True
                break
            if not current.in_reply_to:
                verdict = current.id == original_boop
                break
            current = fetch(current.in_reply_to)
        for tweet_id in walked:
            verdicts[tweet_id] = verdict
        return verdict

    query = f'to:{boop_man} from:{boop_man} filter:images'
    if after is not None:
        query += f' since:{after.strftime("%Y-%m-%d")}'

    cursor = tweepy.Cursor(api.search,
                           q=query,
                           result_type='recent',
                           timeout=999999,
                           tweet_mode='extended')

    boops = {tweet for tweet in map(gen_tweet, cursor.items()) if is_boop(tweet)}

    # Add each hit's ancestors too, stopping at one that is already collected
    # or that is not newer than `after`.
    all_boops = set()
    for boop in boops:
        all_boops.add(boop)
        parent_id = boop.in_reply_to
        while parent_id:
            parent = fetch(parent_id)
            if parent in all_boops:
                break
            if after is not None and not parent.date > after:
                break
            all_boops.add(parent)
            parent_id = parent.in_reply_to

    return sorted(all_boops, key=lambda tweet: tweet.date)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import tweepy | |
def get_api():
    '''Create a tweepy API client authenticated with the credentials below.'''
    # NOTE(review): these values look like placeholders pointing at the
    # developer portal — presumably real credentials must be filled in.
    consumer = ('https:/developer.twitter.com/', 'https:/developer.twitter.com/')
    access = ('https:/developer.twitter.com/', 'https:/developer.twitter.com/')

    handler = tweepy.OAuthHandler(*consumer)
    handler.set_access_token(*access)
    return tweepy.API(handler)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import List, Optional | |
from dataclasses import dataclass | |
from datetime import datetime | |
import tweepy | |
from tweepy_api import get_api | |
@dataclass
class Tweet:
    '''Record of one tweet: id, author, text, date, media urls, parent id.

    Identity is the tweet id alone: equality and hashing ignore every other
    field, so tweets can be kept in sets and used as cache keys.
    '''
    id: int
    author: str
    text: str
    date: datetime
    media_urls: List[str]
    in_reply_to: Optional[int]  # id of the tweet this one replies to, if any

    def __eq__(self, other):
        '''Equal only to another Tweet carrying the same id.'''
        if not isinstance(other, Tweet):
            return False
        return self.id == other.id

    def __hash__(self):
        '''Hash by id, consistent with __eq__.'''
        return hash(self.id)

    @property
    def url(self) -> str:
        '''Web address of this tweet, built from author and id.'''
        handle, status_id = self.author, self.id
        return f'https://twitter.com/{handle}/status/{status_id}'
def gen_tweet(status) -> Tweet:
    '''Build a Tweet from a tweepy Status, or from a tweet id.

    status: a tweepy.models.Status, or anything else (e.g. an int id), in
        which case the status is first fetched through the API in extended
        tweet mode.

    Returns a Tweet populated from the status' id, author screen name, full
    text, creation time, media urls and the id it replies to.
    '''
    if not isinstance(status, tweepy.models.Status):
        api = get_api()
        status = api.get_status(status, tweet_mode='extended')

    # `entities['media']` only carries the first attachment of a tweet; the
    # complete list lives in `extended_entities`, so prefer it when present.
    extended = getattr(status, 'extended_entities', None) or {}
    media = extended.get('media') or status.entities.get('media', [])

    return Tweet(
        status.id,
        status.author.screen_name,
        status.full_text,
        status.created_at,
        [entity['media_url'] for entity in media],
        status.in_reply_to_status_id)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment