Created
May 14, 2014 15:18
-
-
Save orithena/eb136ed7e0826abbd4db to your computer and use it in GitHub Desktop.
KIF42 Tweetbot pre-release. Not configurable enough yet for a normal release. Check the code for hardcoded stuff before running.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import twython, time, pprint, traceback | |
import config | |
from twython import TwythonStreamer | |
""" | |
Config: create a file named config.py with these lines: | |
app_key="<your app key>" | |
app_secret="<your app secret>" | |
oauth_token="<your oauth token>" | |
oauth_token_secret="<your oauth token secret" | |
owner="<twitter name of owner>" | |
All keys, secrets and tokens are creatable on http://dev.twitter.com | |
Run: until I implement a better solution, run it in a screen session | |
""" | |
MAXPERHOUR=4 | |
OVERALLMAXPERHOUR=50 | |
lasttry=0 | |
class MyStreamer(TwythonStreamer): | |
friends = [] | |
listmembers = [] | |
rts = [] | |
twitter = None | |
def update_friends(self, friendlist): | |
self.friends = friendlist | |
print("updated friends", self.friends) | |
def update_list(self): | |
self.listmembers = [ user["id"] for user in self.twitter.get_list_members(slug="Orga", owner_screen_name="kif42")["users"] ] | |
print("updated list member", self.listmembers) | |
def cleanrts(self): | |
for r in self.rts: | |
u,i,m,t = r | |
if t < time.time() - 3600: | |
self.rts.remove(r) | |
def rt(self, user, userid, tweet): | |
self.cleanrts() | |
if self.twitter is not None: | |
if not tweet in [ m for u,i,m,t in self.rts ]: | |
if userid in self.friends or len([ u for u,i,m,t in self.rts if u == user]) < MAXPERHOUR: | |
if len(self.rts) < OVERALLMAXPERHOUR: | |
print("retweeting %s (%d): %s" % (user, userid, tweet)) | |
msg = ("RT @%s: %s" % (user, tweet[len(self.creds['screen_name'])+1:].lstrip(': ')))[0:138] | |
twitter.update_status(status=msg.encode('utf-8')) | |
self.rts.append( (user, userid, tweet, time.time(), ) ) | |
else: | |
print("Overall rate limit exceeded") | |
else: | |
print("Rate limit exceeded", user) | |
else: | |
print("Tweet has been posted already: ", user, tweet) | |
def tweet(self, tweet): | |
if self.twitter is not None: | |
twitter.update_status(status=tweet[0:140]) | |
def on_success(self, data): | |
if 'text' in data: | |
print ("%s: %s" % (data['user']['screen_name'], data['text'])).encode('utf-8') | |
###print data | |
#if data['text'].lower().startswith("@%s" % self.creds['screen_name'].lower()): | |
# self.rt(data['user']['screen_name'], data['user']['id'], data['text']) | |
elif 'direct_message' in data: | |
self.update_list() | |
print ("DM %s: %s" % (data['direct_message']['sender']['screen_name'], data['direct_message']['text'])) | |
if data['direct_message']['sender']['id'] in self.listmembers: | |
self.tweet(data['direct_message']['text']) | |
elif 'friends' in data: | |
self.update_friends(data['friends']) | |
else: | |
print("unknown notification received:") | |
print(data) | |
def on_error(self, status_code, data): | |
print status_code | |
# Want to stop trying to get data because of the error? | |
# Uncomment the next line! | |
# self.disconnect() | |
while True: | |
try: | |
twitter = twython.Twython(app_key=config.app_key, app_secret=config.app_secret, oauth_token=config.oauth_token, oauth_token_secret=config.oauth_token_secret) | |
creds = twitter.verify_credentials() | |
userid = creds['id_str'] | |
stream = MyStreamer(config.app_key, config.app_secret, config.oauth_token, config.oauth_token_secret) | |
stream.twitter = twitter | |
stream.creds = creds | |
stream.update_list() | |
stream.user() | |
except Exception, e: | |
print('==Exception==') | |
print(e) | |
print(traceback.format_exc()) | |
if int(time.time()) - lasttry < 120: | |
print('==Too many Exceptions in the last 2 minutes, exiting...') | |
break | |
else: | |
time.sleep(5) | |
lasttry = int(time.time()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment