Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
# 画像ハッシュ生成
import numpy as np
import scipy.fftpack
import cv2
# phash(perceptual hash)による実装例
def phash_calc(raw_file, hash_size=32):
    """Compute a perceptual hash (pHash) of an encoded image.

    The image is decoded, shrunk to ``hash_size`` x ``hash_size`` pixels,
    converted to grayscale and run through a DCT along each row. The
    coefficients in rows 0-7 and columns 1-8 (column 0, the per-row DC
    term, is skipped) are each compared against the mean of that window,
    producing one bit per coefficient.

    Args:
        raw_file: Encoded image data (bytes-like, or any iterable of
            byte values accepted by ``bytearray``).
        hash_size: Edge length, in pixels, of the square the image is
            resized to before the DCT. The 8 x 8 window is only complete
            when this is at least 9.

    Returns:
        A string of '0'/'1' characters in row-major order over the
        window (64 characters for the default ``hash_size``).

    Raises:
        ValueError: If ``raw_file`` cannot be decoded as an image.
    """
    encoded = np.asarray(bytearray(raw_file), dtype=np.uint8)
    image = cv2.imdecode(encoded, cv2.IMREAD_COLOR)
    if image is None:
        # imdecode signals failure by returning None instead of raising.
        raise ValueError("raw_file could not be decoded as an image")

    small = cv2.resize(image, (hash_size, hash_size))
    # imdecode produces BGR channel order, so the BGR conversion is the
    # one that applies the correct luminance weights.
    gray = cv2.cvtColor(small, cv2.COLOR_BGR2GRAY)

    dct = scipy.fftpack.dct(gray)
    low_freq = dct[:8, 1:9]
    bits = low_freq > low_freq.mean()
    return ''.join('1' if bit else '0' for bit in bits.ravel())
# Example implementation using dhash (difference hash)
def dhash_calc(raw_file, hash_size=7):
    """Compute a difference hash (dHash) of an encoded image.

    The image is decoded, resized to ``hash_size`` pixels wide by
    ``hash_size + 1`` pixels high and converted to grayscale. Every pixel
    in the first ``hash_size`` rows is compared with the pixel directly
    below it; a bit is set when the upper pixel is brighter. Bits are
    ordered column by column (top to bottom within a column) and packed
    eight to a byte, least-significant bit first.

    Args:
        raw_file: Encoded image data (bytes-like, or any iterable of
            byte values accepted by ``bytearray``).
        hash_size: Number of columns, and of compared rows, in the
            reduced image.

    Returns:
        A lowercase hex string with two characters per packed byte. When
        ``hash_size ** 2`` is not a multiple of 8 the remaining bits are
        emitted as a final, zero-padded byte (14 characters for the
        default ``hash_size``).

    Raises:
        ValueError: If ``raw_file`` cannot be decoded as an image.
    """
    encoded = np.asarray(bytearray(raw_file), dtype=np.uint8)
    image = cv2.imdecode(encoded, cv2.IMREAD_COLOR)
    if image is None:
        # imdecode signals failure by returning None instead of raising.
        raise ValueError("raw_file could not be decoded as an image")

    # cv2.resize takes (width, height); the resulting array therefore has
    # hash_size + 1 rows and hash_size columns.
    small = cv2.resize(image, (hash_size, hash_size + 1))
    # imdecode produces BGR channel order, so the BGR conversion is the
    # one that applies the correct luminance weights.
    gray = cv2.cvtColor(small, cv2.COLOR_BGR2GRAY)

    # Compare each pixel with its lower neighbour in one vectorised step.
    upper_brighter = gray[:-1, :] > gray[1:, :]
    # Transpose so that flattening walks column by column.
    bits = upper_brighter.T.ravel()

    hex_bytes = []
    for start in range(0, len(bits), 8):
        # A short final chunk is kept (zero-padded) rather than discarded,
        # so no comparison bit is lost.
        chunk = bits[start:start + 8]
        byte = sum(1 << offset for offset, bit in enumerate(chunk) if bit)
        hex_bytes.append('{:02x}'.format(byte))
    return ''.join(hex_bytes)
# TLに流れてる画像回収 with lbpcascade_animeface 重複検出版
import datetime
import json
import os
import time
import urllib
import urllib.request

import tweepy as tp

from image_hash import phash_calc
from lbpcascade import face2d_detect
def get_oauth():
    """Read the API keys from ./setting.json and build an OAuth handler.

    The settings file must be a JSON object containing the keys ``CK``,
    ``CS``, ``Admin_Key`` and ``Admin_Secret``.

    Returns:
        A ``tweepy.OAuthHandler`` with the access token already set.

    Raises:
        OSError: If the settings file cannot be opened.
        ValueError: If the settings file is not valid JSON.
        KeyError: If one of the required keys is missing.
    """
    # Use a context manager so the file handle is closed deterministically.
    with open("./setting.json") as settings_file:
        setting = json.load(settings_file)
    auth = tp.OAuthHandler(setting['CK'], setting['CS'])
    auth.set_access_token(setting['Admin_Key'], setting['Admin_Secret'])
    return auth
class StreamListener(tp.StreamListener):
    """Stream listener that saves photos attached to incoming tweets.

    Each photo is downloaded, checked against the URLs and perceptual
    hashes of the photos already saved today, passed to
    ``face2d_detect``, and written to a folder named after the current
    date when the detector reports a hit.
    """

    # Two hashes that differ in fewer than this many bits are treated as
    # the same picture.
    HASH_DISTANCE_LIMIT = 4

    def __init__(self, api):
        """Store the API client and prepare today's output folder.

        Args:
            api: Authenticated ``tweepy.API`` instance; also used to look
                up the screen name of the authenticating account.
        """
        super().__init__(api)
        self.api = api
        self.me = self.api.me().screen_name
        # Date the current output folder belongs to.
        self.old_date = datetime.date.today()
        self.mkdir()

    def on_error(self, status_code):
        """Keep the stream alive after a connection error."""
        return True

    def on_timeout(self):
        """Keep the stream alive after a connection timeout."""
        return True

    def on_status(self, status):
        """Process one status from the stream and save new photos.

        Args:
            status: The tweepy status object delivered by the stream.
        """
        # Switch to a new folder (and empty caches) when the date changes.
        today = datetime.date.today()
        if today != self.old_date:
            self.old_date = today
            self.mkdir()

        # For a retweet look at the original tweet; for a quote tweet
        # look at the quoted tweet.
        if hasattr(status, "retweeted_status"):
            status = status.retweeted_status
        if hasattr(status, "quoted_status"):
            status = status.quoted_status

        media_items = self._media_of(status)
        if not media_items:
            return

        # Skip our own tweets (e.g. when someone retweets them).
        screen_name = status.user.screen_name
        if screen_name == self.me:
            return

        for image in media_items:
            # Ignore non-photo media but keep scanning: a later item may
            # still be a photo.
            if image['type'] != 'photo':
                continue

            media_url = image['media_url']
            ext = os.path.splitext(media_url)[1]
            filename = str(self.fileno).zfill(5)
            skip_message = "Skip : {}-{}".format(screen_name, filename)

            # Duplicate check by URL.
            if media_url in self.file_url:
                print(skip_message)
                continue

            # Download the original-size picture. This is best-effort, so
            # any ordinary failure just skips the picture, while
            # KeyboardInterrupt/SystemExit are allowed to propagate.
            try:
                with urllib.request.urlopen(media_url + ":orig") as response:
                    data = response.read()
            except Exception:
                print("Download Error")
                continue

            # Duplicate check by perceptual hash.
            image_hash = phash_calc(data)
            if self._is_known_hash(image_hash):
                print(skip_message)
                continue

            # Only keep pictures the face detector accepts.
            is_get, _areas = face2d_detect(data)
            if not is_get:
                print(skip_message)
                continue

            with open(self.base_path + filename + ext, "wb") as out:
                out.write(data)
            self.file_url.append(media_url)
            self.file_hash.append(image_hash)
            print("Save : {}-{}".format(screen_name, filename))
            self.fileno += 1

    @staticmethod
    def _media_of(status):
        """Return the media list attached to ``status`` (possibly empty).

        ``extended_entities`` (used for multi-photo tweets) takes
        precedence; ``entities`` is consulted only when the former
        attribute is absent.
        """
        for attr in ("extended_entities", "entities"):
            if hasattr(status, attr):
                container = getattr(status, attr)
                return container['media'] if 'media' in container else []
        return []

    def _is_known_hash(self, image_hash):
        """Return True if ``image_hash`` is near a hash saved today."""
        # Hashes are binary strings (phash). When switching to dhash,
        # which returns hex, change the base from 2 to 16.
        candidate = int(image_hash, 2)
        return any(
            bin(int(known, 2) ^ candidate).count('1') < self.HASH_DISTANCE_LIMIT
            for known in self.file_hash
        )

    def _next_fileno(self):
        """Return the first file number not yet used in the output folder."""
        used = [
            int(stem)
            for stem, _ext in map(os.path.splitext, os.listdir(self.base_path))
            if stem.isdecimal()
        ]
        return max(used) + 1 if used else 0

    def mkdir(self):
        """Create the dated output folder and reset the per-day state."""
        self.base_path = "./" + self.old_date.isoformat() + "/"
        os.makedirs(self.base_path, exist_ok=True)
        # Continue numbering after any files already present so that a
        # restart on the same day does not overwrite saved pictures.
        self.fileno = self._next_fileno()
        self.file_hash = []
        self.file_url = []
def main():
    """Authenticate, open the user stream and keep it running.

    The stream is restarted 60 seconds after it stops, whether it ended
    with an error or returned normally. Pressing Ctrl-C at any point
    terminates the process.

    Raises:
        SystemExit: When interrupted with Ctrl-C.
    """
    auth = get_oauth()
    stream = tp.Stream(auth, StreamListener(tp.API(auth)), secure=True)
    print('Start Streaming!')
    try:
        while True:
            try:
                stream.userstream()
            except Exception as err:
                # Report the cause instead of discarding it; the stream is
                # re-opened after the pause below.
                print('UserStream Error: {!r}'.format(err))
            # Pause before reconnecting so a stream that keeps ending
            # immediately cannot turn into a tight reconnect loop.
            time.sleep(60)
    except KeyboardInterrupt:
        # Covers Ctrl-C both while streaming and while sleeping.
        raise SystemExit from None


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment