Skip to content

Instantly share code, notes, and snippets.

@Rurik
Created June 27, 2018 16:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Rurik/e55d0b4e11470e4a3c5b4218d2c291b2 to your computer and use it in GitHub Desktop.
Save Rurik/e55d0b4e11470e4a3c5b4218d2c291b2 to your computer and use it in GitHub Desktop.
Tracks a public Twitter List and posts updates to a given Slack channel
### Tracks a public Twitter List and posts updates to a given Slack channel
### Example: https://i.imgur.com/RMQB27N.png
import datetime
import time
import twitter
from slackclient import SlackClient
# --- Twitter settings -------------------------------------------------
# OAuth credentials handed to twitter.OAuth() in the __main__ block.
twitter_consumer_key = '<FILL OUT>'
twitter_consumer_secret = '<FILL OUT>'
twitter_access_token = '<FILL OUT>'
twitter_access_secret = '<FILL OUT>'
# Screen name sent as `owner_screen_name` when the list is queried.
list_owner = 'bbaskin'
# Slug of the list to follow; sent as `slug` when the list is queried.
twitter_list = '<FILL OUT>'

# --- Slack settings ---------------------------------------------------
# Value passed to SlackClient(); presumably the bot's API token despite
# the "id" in the name -- confirm against your Slack app settings.
slack_bot_id = '<FILL OUT>'
# Channel that every tweet is posted to.
slack_channel = '<FILL OUT>'
slack_client = SlackClient(slack_bot_id)

# --- Polling state (rebound by parse_tweets) --------------------------
# ID used as `since_id` on the next poll; None means no poll has
# returned tweets yet.
sinceID = None
# Running number shown in front of each posted tweet.
counter = 0
def read_tweets(list_slug, sinceID):
    """Fetch statuses from the Twitter list identified by *list_slug*.

    The list owner is taken from the module-level ``list_owner`` and the
    request goes through the module-level Twitter client ``t``.

    Args:
        list_slug: Slug of the list to query.
        sinceID: When truthy, forwarded as ``since_id`` with the request;
            when falsy (e.g. ``None``), the parameter is left out.

    Returns:
        Whatever ``t.lists.statuses`` returns for the query.
    """
    query = {'owner_screen_name': list_owner, 'slug': list_slug}
    if sinceID:
        query['since_id'] = sinceID
    return t.lists.statuses(**query)
def parse_tweets(statuses):
    """Post each tweet in *statuses* to Slack and advance the poll cursor.

    Every tweet is formatted as ``[counter] @user: text (Source: url )``,
    sent to ``slack_channel`` via ``slack_post_channel`` and echoed to
    stdout as ``[counter] url``.  The module-level ``counter`` is bumped
    once per tweet and the module-level ``sinceID`` is set to the highest
    tweet ID in the batch so the next poll can pass it as ``since_id``.

    Args:
        statuses: Sequence of tweet dicts, each providing ``'text'``,
            ``'id'`` and ``'user'['screen_name']``.  Expected newest-first
            (the order the Twitter timeline endpoints document); may be
            empty or ``None``.

    Returns:
        True if at least one tweet was posted, False otherwise.
    """
    global sinceID
    global counter

    # Nothing new this round: leave the cursor and counter untouched.
    if not statuses:
        return False

    # Walk the batch back-to-front so tweets reach Slack oldest-first.
    # Posting in the received (newest-first) order makes each batch read
    # backwards in the channel and numbers the newest tweet lowest.
    for tweet in reversed(statuses):
        text = tweet['text']
        user = tweet['user']['screen_name']
        url = 'https://twitter.com/{}/status/{}'.format(user, tweet['id'])
        post = '[{}] @{}: {} (Source: {} )'.format(counter, user, text, url)
        slack_post_channel(post, slack_channel)
        print('[{}] {}'.format(counter, url))
        counter += 1

    # Highest ID seen becomes the cursor.  For a newest-first batch this
    # equals statuses[0]['id'], but it does not depend on the ordering.
    sinceID = max(tweet['id'] for tweet in statuses)
    return True
def slack_post_channel(msg, room):
    """Send *msg* to the Slack channel *room*.

    Issues a ``chat.postMessage`` call through the module-level
    ``slack_client``.  The API response is not inspected and nothing is
    returned.

    Args:
        msg: Text of the message to post.
        room: Channel to post into.
    """
    payload = {'channel': room, 'text': msg}
    slack_client.api_call('chat.postMessage', **payload)
if __name__ == '__main__':
    # Script entry point: connect to both services, then poll the Twitter
    # list once a minute and relay anything new to Slack.
    import sys

    # Plain module-level assignment: read_tweets() looks `t` up as a
    # global.  (A `global` statement is a no-op at module scope.)
    t = twitter.Twitter(auth=twitter.OAuth(
        twitter_access_token,
        twitter_access_secret,
        twitter_consumer_key,
        twitter_consumer_secret,
    ))

    if not slack_client.rtm_connect(with_team_state=False):
        print('[*] Slack connection failed')
        # sys.exit() rather than the site-provided quit(), which is meant
        # for interactive sessions; exit non-zero so callers (cron,
        # systemd, shell scripts) can tell the start-up failed.
        sys.exit(1)

    # The looked-up value is discarded; the subscript raises KeyError if
    # the auth.test response carries no 'user_id', so this line acts as a
    # start-up check on the response.
    slack_client.api_call('auth.test')['user_id']

    while True:
        # parse_tweets() rebinds the global sinceID, so each poll passes
        # the ID recorded from the previous non-empty batch.
        statuses = read_tweets(twitter_list, sinceID)
        parse_tweets(statuses)
        time.sleep(60)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment