Skip to content

Instantly share code, notes, and snippets.

@hapo31
Created December 15, 2015 08:51
Show Gist options
  • Save hapo31/f436be6fa2a634c4455c to your computer and use it in GitHub Desktop.
Save hapo31/f436be6fa2a634c4455c to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
import re
import datetime
import tweepy
# from tweepy.streaming import StreamListener
# from tweepy import OAuthHandler
# from tweepy import Stream
import config
class MyStreamListener(tweepy.StreamListener):
    """Stream listener that renames the authenticated account on request.

    Every incoming status whose text matches ``config.updatename_regex`` is
    treated as a rename request: the text is cleaned up, validated (length,
    per-user rate limit, NG-word patterns) and, when accepted, applied with
    ``update_profile``.  A reply is posted to the requesting tweet describing
    the outcome.
    """

    def __init__(self, auth):
        """Set up the API client, the trigger regex and the bookkeeping state.

        Args:
            auth: authentication handler handed to ``tweepy.API``; the
                resulting client is used for every API call made here.
        """
        super(MyStreamListener, self).__init__(api=tweepy.API(auth))
        self._repatter = re.compile(config.updatename_regex)
        # Maps a requester's screen name to their UsesCount rate-limit record.
        self._user_history = {}
        # Most recent reply posted by this listener; None until the first one.
        self._rec_tweet = None

    def on_status(self, status):
        """Handle one status from the stream.

        Args:
            status: the received status; ``text``, ``id`` and
                ``author.screen_name`` are read from it.

        Returns:
            ``False`` when the status is the reply this listener posted last,
            or when the profile update raised ``TweepError``; ``True``
            otherwise.
            NOTE(review): in tweepy 3.x a ``False`` return from ``on_status``
            appears to disconnect the stream -- confirm that is intended.
        """
        if not self._repatter.match(status.text):
            return True

        # Do not react to the reply we posted ourselves.
        if self._rec_tweet is not None and self._rec_tweet.id == status.id:
            return False

        # ID of the requesting tweet (the reply is attached to it).
        tweet_id = status.id
        # Screen name of the user who sent the request.
        screen_name = status.author.screen_name

        # Candidate name: the tweet text with characters that cannot be used
        # in a name removed.
        # NOTE(review): the whole tweet text is used, including whatever part
        # the trigger regex matched -- confirm the regex accounts for this.
        new_name = status.text
        for unwanted in ['\n', '\r', '\"', '\'']:
            new_name = new_name.replace(unwanted, '')

        if len(new_name) > 20:
            # Too long.
            new_name = ""
            reply_text = "@%s 長すぎィ!" % (screen_name)
        elif (screen_name in self._user_history
              and not self._user_history[screen_name].Use()):
            # Known user who exceeded the usage limit.
            reply_text = "@%s 使いすぎじゃボケ" % (screen_name)
            new_name = ""
        else:
            # Normal path: register first-time users in the usage history.
            if screen_name not in self._user_history:
                self._user_history[screen_name] = UsesCount()
            # Reply used when the rename goes through.
            reply_text = "@%s 我が名は%sである!" % (screen_name, new_name)
            # NG-word check: each entry is a regex matched at the start of
            # the candidate name.
            for ng_pattern in config.NGWordList:
                if re.match(ng_pattern, new_name):
                    reply_text = "@%s 我が名は%sではない!" % (screen_name, new_name)
                    new_name = ""
                    break

        # An empty name means the request was rejected above; only a
        # non-empty name triggers the profile update.
        if new_name:
            try:
                last = self._user_history[screen_name].last_time
                tstr = last.strftime("%y/%m/%d %H:%M:%S")
                print("@%s: %s => %s (%s)" % (screen_name, self.api.me().name, new_name, tstr))
                self.api.update_profile(name=new_name)
            except tweepy.error.TweepError as e:
                print(e.reason)
                # The rename failed, so the success reply must not be sent.
                return False

        self._rec_tweet = self.api.update_status(
            status=reply_text, in_reply_to_status_id=tweet_id)
        return True

    def on_error(self, status):
        """Print the error status reported by the stream."""
        print(status)
class UsesCount(object):
    """Usage record for one user, enforcing a limit per time window.

    The window starts at the first recorded use.  While the elapsed time
    since that first use is at most ``WINDOW_SECONDS``, more than
    ``MAX_USES`` uses are refused.  The first use after the window has
    elapsed starts a new window with the count back at 1.
    """

    # Length of the rate-limit window, in seconds (20 minutes).
    WINDOW_SECONDS = 20 * 60
    # Highest use count still accepted inside one window.
    MAX_USES = 5

    def __init__(self):
        """Start a window at the current JST time; creation counts as use 1."""
        self._first_time = datetime.datetime.now(tz=JST())
        self._last_time = datetime.datetime.now(tz=JST())
        self._use_count = 1

    def Use(self, now=None):
        """Record one use and report whether it is allowed.

        Args:
            now: time of the use.  Defaults to the current JST time; it must
                be comparable with the stored window start (both aware or
                both naive).

        Returns:
            ``False`` when this use exceeds ``MAX_USES`` inside the current
            window, ``True`` otherwise.
        """
        if now is None:
            now = datetime.datetime.now(tz=JST())
        self._use_count += 1
        self._last_time = now
        # total_seconds() is required here: timedelta.seconds ignores the
        # days component, so a gap of "1 day 5 minutes" would otherwise look
        # like 5 minutes and the window would never expire.
        elapsed = (now - self._first_time).total_seconds()
        if elapsed <= self.WINDOW_SECONDS:
            if self._use_count > self.MAX_USES:
                return False
        else:
            # Window expired: this use opens a new one.
            self._first_time = now
            self._use_count = 1
        return True

    @property
    def first_time(self):
        """Start time of the current window."""
        return self._first_time

    @property
    def last_time(self):
        """Time of the most recently recorded use."""
        return self._last_time

    @property
    def use_count(self):
        """Number of uses recorded in the current window."""
        return self._use_count

    def Reset(self):
        """Re-initialise the record as if it had just been created."""
        self.__init__()
class JST(datetime.tzinfo):
    """Fixed-offset ``tzinfo`` named 'JST': nine hours ahead of UTC, no DST."""

    # Values handed out by the tzinfo hooks; none of them depends on the
    # datetime being asked about.
    _UTC_OFFSET = datetime.timedelta(hours=9)
    _NO_DST = datetime.timedelta()
    _NAME = 'JST'

    def utcoffset(self, dt):
        """Return the constant +09:00 offset; *dt* is ignored."""
        return self._UTC_OFFSET

    def dst(self, dt):
        """Return a zero adjustment; *dt* is ignored."""
        return self._NO_DST

    def tzname(self, dt):
        """Return the zone label; *dt* is ignored."""
        return self._NAME
if __name__ == '__main__':
    # Build the OAuth handler from the credentials stored in config.
    oauth = tweepy.OAuthHandler(
        consumer_key=config.consumer_key,
        consumer_secret=config.consumer_secret,
    )
    oauth.set_access_token(config.access_token, config.access_secret)

    # The same handler is given to both the listener and the stream.
    listener = MyStreamListener(auth=oauth)
    tweepy.Stream(auth=oauth, listener=listener).userstream()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment