Skip to content

Instantly share code, notes, and snippets.

@en129
Created January 23, 2015 08:45
Show Gist options
  • Save en129/74e371c648654b1755a9 to your computer and use it in GitHub Desktop.
Save en129/74e371c648654b1755a9 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
# -*- coding: utf-8 -*-
# http://www.nari64.com/?p=200
import sys
import tweepy
from tweepy import Stream, TweepError
import logging
import urllib
# Twitter application credentials. They are blank in this file and must be
# filled in before the script can authenticate.
CONSUMER_KEY = ''
CONSUMER_SECRET = ''
ACCESS_TOKEN = ''
ACCESS_TOKEN_SECRET = ''

# Module-level OAuth handler built from the credentials above; main() passes
# it to the stream constructor.
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)

# Startup marker written to stdout when the module is loaded. The parentheses
# only group the single string, so the output is the same under Python 2.
print("start")
class CustomStreamListener(tweepy.StreamListener):
    """Stream listener that echoes received tweets to stdout.

    Retweets are skipped; every other status is printed as
    ``@<screen_name> <text>`` encoded as Shift-JIS. Errors and timeouts are
    reported on stderr and the stream is asked to keep running.
    """

    def on_status(self, status):
        """Print one received status unless it looks like a retweet.

        Args:
            status: object exposing ``text`` and ``author.screen_name``
                (presumably a tweepy ``Status`` -- confirm against the
                tweepy version in use).

        Any exception raised while formatting or printing is reported on
        stderr and swallowed so that one bad tweet cannot stop the listener.
        """
        try:
            # Match the retweet marker "RT @" rather than the bare letters
            # "RT", which also occur inside ordinary words (e.g. "START") and
            # would wrongly discard those tweets.
            # NOTE(review): assumes retweet text always contains "RT @" --
            # confirm against the payloads actually received.
            if "RT @" in status.text:
                return
            # Strip the bot's own mention from the text before echoing it.
            fixed_status = status.text.replace("@bot_en129", "")
            fixed_screen_name = status.author.screen_name.replace("@", "from:")
            # errors='ignore' drops characters Shift-JIS cannot represent
            # instead of raising. The parentheses only group one expression,
            # so this prints the same line under Python 2.
            print("@%s %s" % (
                fixed_screen_name.encode('shift-jis'),
                fixed_status.encode('shift-jis', errors='ignore')))
        except Exception as e:
            # Deliberately broad: best-effort display, report and carry on.
            sys.stderr.write('Encountered Exception: %s\n' % (e,))

    def on_error(self, status_code):
        """Report an error status code on stderr and keep the stream alive."""
        sys.stderr.write(
            'Encountered error with status code: %s\n' % (status_code,))
        return True  # Don't kill the stream

    def on_timeout(self):
        """Report a timeout on stderr and keep the stream alive."""
        sys.stderr.write('Timeout...\n')
        return True  # Don't kill the stream
class UserStream(Stream):
def user_stream(self, follow=None, track=None, async=False, locations=None):
self.parameters = {"delimited": "length", }
self.headers['Content-type'] = "application/x-www-form-urlencoded"
if self.running:
raise TweepError('Stream object already connected!')
self.scheme = "https"
self.host = 'userstream.twitter.com'
self.url = '/2/user.json'
if follow:
self.parameters['follow'] = ','.join(map(str, follow))
if track:
self.parameters['track'] = ','.join(map(str, track))
if locations and len(locations) > 0:
assert len(locations) % 4 == 0
self.parameters['locations'] = ','.join(['%.2f' % l for l in locations])
self.body = urllib.urlencode(self.parameters)
logging.debug("[ User Stream URL ]: %s://%s%s" % (self.scheme, self.host, self.url))
logging.debug("[ Request Body ] :" + self.body)
self._start(async)
def main():
    """Build the stream and start filtering on the bot's track terms.

    Creates a ``UserStream`` from the module-level ``auth`` handler and a
    fresh ``CustomStreamListener``, clears the stream's timeout, and calls
    ``filter`` with the two terms to track.
    """
    track_terms = ['@bot_denkou', '#MFTdenkou']
    twitter_stream = UserStream(auth, CustomStreamListener())
    twitter_stream.timeout = None
    # NOTE(review): this calls filter(), not UserStream.user_stream() --
    # confirm that is the intended endpoint.
    twitter_stream.filter(track=track_terms)


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment