Created
February 19, 2018 15:36
-
-
Save marron-akanishi/f0f3358e0a1ba825392c4c4f8cef15e0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# 画像ハッシュ生成 | |
import numpy as np | |
import scipy.fftpack | |
import cv2 | |
# Example implementation based on pHash (perceptual hash)
def phash_calc(raw_file, hash_size=32):
    """Compute a perceptual hash of an encoded image.

    The image is decoded, shrunk to ``hash_size`` x ``hash_size`` pixels,
    converted to grayscale and passed through a DCT.  A low-frequency
    window of the DCT output is then thresholded against its own mean.

    Args:
        raw_file: Encoded image data (anything ``bytearray()`` accepts,
            typically the ``bytes`` of a downloaded file).
        hash_size: Edge length, in pixels, of the shrunken image.

    Returns:
        A string of '0'/'1' characters, one per coefficient of the
        ``[:8, 1:9]`` DCT window in row-major order (64 characters when
        ``hash_size`` is at least 9).

    Raises:
        ValueError: If ``raw_file`` cannot be decoded as an image.
    """
    image = cv2.imdecode(np.asarray(bytearray(raw_file), dtype=np.uint8), 1)
    if image is None:
        # imdecode signals failure by returning None instead of raising.
        raise ValueError("raw_file could not be decoded as an image")
    check_image = cv2.resize(image, (hash_size, hash_size))
    # OpenCV decodes colour images in BGR channel order, so the matching
    # grayscale conversion is BGR2GRAY (RGB2GRAY swaps the R/B weights).
    check_image = cv2.cvtColor(check_image, cv2.COLOR_BGR2GRAY)
    # With default arguments the DCT is applied along the last axis only.
    dct = scipy.fftpack.dct(check_image)
    # Columns 1..8 are used, i.e. the first coefficient of each row is skipped.
    dctlowfreq = dct[:8, 1:9]
    avg = dctlowfreq.mean()
    above_average = dctlowfreq > avg
    return "".join("1" if bit else "0" for bit in above_average.flatten())
# Example implementation based on dHash (difference hash)
def dhash_calc(raw_file, hash_size=7):
    """Compute a difference hash of an encoded image.

    The image is decoded, shrunk, converted to grayscale, and every pixel
    is compared with its neighbour along the first array axis.  The
    resulting booleans are packed eight at a time into bytes (first bit
    of each group is the least significant) and rendered as hex.

    Args:
        raw_file: Encoded image data (anything ``bytearray()`` accepts,
            typically the ``bytes`` of a downloaded file).
        hash_size: Number of comparisons along each axis.

    Returns:
        A lowercase hexadecimal string, two characters per complete group
        of eight comparison bits.

    Raises:
        ValueError: If ``raw_file`` cannot be decoded as an image.

    NOTE(review): bits that do not fill a whole byte are dropped.  With
    the default ``hash_size=7`` there are 49 comparisons, so only the
    first 48 reach the result (12 hex characters).  This is kept as-is so
    the output format stays unchanged.
    """
    image = cv2.imdecode(np.asarray(bytearray(raw_file), dtype=np.uint8), 1)
    if image is None:
        # imdecode signals failure by returning None instead of raising.
        raise ValueError("raw_file could not be decoded as an image")
    # cv2.resize takes (width, height); the resulting array therefore has
    # shape (hash_size + 1, hash_size).
    check_image = cv2.resize(image, (hash_size, hash_size + 1))
    # OpenCV decodes colour images in BGR channel order, so the matching
    # grayscale conversion is BGR2GRAY (RGB2GRAY swaps the R/B weights).
    check_image = cv2.cvtColor(check_image, cv2.COLOR_BGR2GRAY)

    # brighter[i, j] is True when pixel [i, j] is greater than pixel [i + 1, j].
    brighter = check_image[:-1, :] > check_image[1:, :]
    # Transposing first makes the second array index the outer loop, which
    # is the order in which the bits are packed.
    bits = brighter.T.ravel().tolist()

    hex_bytes = []
    # Step through complete groups of eight bits only.
    for start in range(0, len(bits) - 7, 8):
        byte_value = sum(
            1 << offset
            for offset, bit in enumerate(bits[start:start + 8])
            if bit
        )
        hex_bytes.append(format(byte_value, "02x"))
    return "".join(hex_bytes)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# TLに流れてる画像回収 with lbpcascade_animeface 重複検出版 | |
import datetime
import json
import os
import sys
import time
import urllib
import urllib.request

import tweepy as tp

from image_hash import phash_calc
from lbpcascade import face2d_detect
def get_oauth():
    """Read the API keys from ``./setting.json`` and build an OAuth handler.

    The settings file must be a JSON object with the keys ``CK``, ``CS``,
    ``Admin_Key`` and ``Admin_Secret``.

    Returns:
        A ``tweepy.OAuthHandler`` with the access token already set.
    """
    # Use a context manager so the settings file is closed deterministically.
    with open("./setting.json") as setting_file:
        setting = json.load(setting_file)
    auth = tp.OAuthHandler(setting['CK'], setting['CS'])
    auth.set_access_token(setting['Admin_Key'], setting['Admin_Secret'])
    return auth
class StreamListener(tp.StreamListener):
    """Stream listener that saves photos from incoming statuses.

    A photo is written to a per-day folder when it is not a duplicate
    (by URL or by perceptual hash) of a photo already saved that day and
    ``face2d_detect`` reports a hit for it.
    """

    # Two hashes that differ in fewer than this many bits are treated as
    # the same picture.
    HASH_DISTANCE_THRESHOLD = 4

    def __init__(self, api):
        """Store the API handle, look up our own screen name, prepare storage."""
        super().__init__(api)
        self.api = api
        self.me = self.api.me().screen_name
        # Date of the folder currently being written to.
        self.old_date = datetime.date.today()
        self.mkdir()

    def on_error(self, status_code):
        """Handle a connection error; returns True unconditionally."""
        return True

    def on_timeout(self):
        """Handle a connection timeout; returns True unconditionally."""
        return True

    def on_status(self, status):
        """Process one Status received from the stream.

        Retweets and quote tweets are replaced by the status they refer
        to.  Each photo attached to the resulting status is downloaded at
        ``:orig`` size, checked for duplicates, run through face detection
        and saved when a face is found.
        """
        # Roll over to a new folder (and fresh duplicate lists) when the
        # date changes.
        today = datetime.date.today()
        if today != self.old_date:
            self.old_date = today
            self.mkdir()

        # Work on the original status for retweets and quote tweets.
        if hasattr(status, "retweeted_status"):
            status = status.retweeted_status
        if hasattr(status, "quoted_status"):
            status = status.quoted_status

        media_items = self._media_of(status)
        if media_items is None:
            return
        # Skip our own tweets (guards against retweets of ourselves).
        if status.user.screen_name == self.me:
            return

        for image in media_items:
            # Stop at the first attachment that is not a photo.
            if image['type'] != 'photo':
                break
            self._handle_photo(status, image['media_url'])

    @staticmethod
    def _media_of(status):
        """Return the status' media list, or None when it has none."""
        # extended_entities takes precedence over entities when present.
        if hasattr(status, "extended_entities"):
            container = status.extended_entities
        elif hasattr(status, "entities"):
            container = status.entities
        else:
            return None
        return container['media'] if 'media' in container else None

    def _is_known_hash(self, image_hash):
        """Return True if image_hash is close to a hash saved today."""
        # For dhash, change the base from 2 to 16 here and below
        # (binary string vs. hexadecimal string).
        candidate = int(image_hash, 2)
        return any(
            bin(int(known, 2) ^ candidate).count('1')
            < self.HASH_DISTANCE_THRESHOLD
            for known in self.file_hash
        )

    def _handle_photo(self, status, media_url):
        """Download one photo, de-duplicate it, and save it if a face is found."""
        ext = os.path.splitext(media_url)[1]
        filename = str(self.fileno).zfill(5)
        screen_name = status.user.screen_name

        # Duplicate check by URL.
        if media_url in self.file_url:
            print("Skip : {}-{}".format(screen_name, filename))
            return

        # Download.  Catch Exception rather than everything so that
        # Ctrl-C (KeyboardInterrupt) is not reported as a download error.
        try:
            with urllib.request.urlopen(media_url + ":orig") as response:
                temp_file = response.read()
        except Exception:
            print("Download Error")
            return

        # Duplicate check by perceptual hash.
        image_hash = phash_calc(temp_file)
        if self._is_known_hash(image_hash):
            print("Skip : {}-{}".format(screen_name, filename))
            return

        # Face detection; only the first element of the result is used.
        is_get, _areas = face2d_detect(temp_file)
        if not is_get:
            print("Skip : {}-{}".format(screen_name, filename))
            return

        # Save the file and remember it for later duplicate checks.
        with open(self.base_path + filename + ext, "wb") as out:
            out.write(temp_file)
        self.file_url.append(media_url)
        self.file_hash.append(image_hash)
        print("Save : {}-{}".format(screen_name, filename))
        self.fileno += 1

    def mkdir(self):
        """Create today's output folder and reset the per-day state.

        NOTE(review): the file counter restarts at 0 even when the folder
        already exists, so restarting on the same day reuses file names.
        """
        self.base_path = "./" + self.old_date.isoformat() + "/"
        os.makedirs(self.base_path, exist_ok=True)
        self.fileno = 0
        self.file_hash = []
        self.file_url = []
def main():
    """Authenticate, start the user stream, and reconnect after errors.

    Runs until interrupted with Ctrl-C.  Any other exception raised by the
    stream is reported and followed by a 60 second pause before the next
    connection attempt.
    """
    auth = get_oauth()
    stream = tp.Stream(auth, StreamListener(tp.API(auth)), secure=True)
    print('Start Streaming!')
    while True:
        try:
            stream.userstream()
        except KeyboardInterrupt:
            # sys.exit() is used instead of the interactive-only exit()
            # helper, which is not guaranteed to be defined.
            sys.exit()
        except Exception:
            # Exception (not a bare except) lets SystemExit propagate.
            print('UserStream Error')
            time.sleep(60)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment