Last active
February 21, 2018 15:45
-
-
Save marron-akanishi/479f8d4e24270a61cd03745f709835c0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Face detection with dlib
import numpy as np | |
import cv2 | |
from dlib import simple_object_detector | |
# Trained detectors, loaded once at import time from the working directory.
face_detector = simple_object_detector("./detector_face.svm")
eye_detector = simple_object_detector("./detector_eye.svm")


def face2d_detect(raw_file):
    """Find faces in an encoded image, keeping only those with a detected eye.

    Args:
        raw_file: Encoded image data; anything ``bytearray()`` accepts.

    Returns:
        A tuple ``(is_get, [facex, facey, facew, faceh])``. ``is_get`` is
        True when at least one face containing an eye was found. The four
        parallel lists hold the left, top, width and height of every
        accepted face rectangle.
    """
    facex, facey, facew, faceh = [], [], [], []

    # Decode the raw bytes (flag 1 asks OpenCV for a colour image).
    image = cv2.imdecode(np.asarray(bytearray(raw_file), dtype=np.uint8), 1)
    if image is None:
        # Data that cannot be decoded as an image contains no faces.
        return False, [facex, facey, facew, faceh]

    try:
        faces = face_detector(image)
    except Exception:
        # Best effort: a detector failure is treated as "no faces found".
        # An empty list (not 0) keeps the iteration below valid.
        faces = []

    for area in faces:
        # Clamp at 0 for the crop only: a negative coordinate used as a
        # slice bound would otherwise count from the end of the array.
        top, bottom = max(area.top(), 0), max(area.bottom(), 0)
        left, right = max(area.left(), 0), max(area.right(), 0)
        face = image[top:bottom, left:right]
        if face.size == 0:
            continue

        # Accept the face only when the eye detector fires inside the crop.
        eyes = eye_detector(face)
        if len(eyes) > 0:
            # The stored rectangle is the detector's own, unclamped.
            facex.append(area.left())
            facey.append(area.top())
            facew.append(area.right() - area.left())
            faceh.append(area.bottom() - area.top())

    return bool(facex), [facex, facey, facew, faceh]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Collect images flowing through the timeline (TL), filtered with dlib + the TPTS model
import datetime
import json
import os
import sqlite3
import time
import urllib
import urllib.request

import tweepy as tp

from dlib_detector import face2d_detect
from image_hash import phash_calc
def get_oauth():
    """Read the API keys from ./setting.json and build the OAuth handler.

    The settings file must provide the keys 'CK', 'CS', 'Admin_Key' and
    'Admin_Secret'.

    Returns:
        The ``tp.OAuthHandler`` with the access token already set.
    """
    # Use a context manager so the settings file handle is always closed.
    with open("./setting.json") as setting_file:
        setting = json.load(setting_file)
    auth = tp.OAuthHandler(setting['CK'], setting['CS'])
    auth.set_access_token(setting['Admin_Key'], setting['Admin_Secret'])
    return auth
class StreamListener(tp.StreamListener):
    """Stream listener that stores photos containing a detected face.

    Each accepted photo is written to a per-day folder (``./YYYY-MM-DD/``)
    and recorded in that folder's ``list.db`` SQLite database.
    """

    # Base used to parse the hash strings. The original note says to switch
    # this from 2 to 16 when using dhash (binary vs. hexadecimal strings).
    HASH_BASE = 2
    # Hashes differing in fewer than this many bits count as the same image.
    HASH_DISTANCE_LIMIT = 4

    def __init__(self, api):
        """Remember the API handle and own screen name, then open today's storage."""
        super().__init__(api)
        self.api = api
        self.me = self.api.me().screen_name
        # Date of the folder currently being written to.
        self.old_date = datetime.date.today()
        self.mkdir()

    def on_error(self, status_code):
        """Return True on a connection error so the stream is not stopped."""
        return True

    def on_timeout(self):
        """Return True on a connection timeout so the stream is not stopped."""
        return True

    def on_status(self, status):
        """Process one Status received from the user stream.

        Retweets and quote tweets are resolved to the tweet they refer to.
        Every photo attached to the resulting tweet is then de-duplicated,
        run through face detection and saved when a face is found.
        """
        self._rotate_storage_if_new_day()

        # Retweet: look at the original tweet instead.
        if hasattr(status, "retweeted_status"):
            status = status.retweeted_status
        # Quote tweet: look at the quoted tweet instead.
        if hasattr(status, "quoted_status"):
            status = status.quoted_status

        media = self._media_of(status)
        if not media:
            return
        # Skip our own tweets (guards against our own retweets).
        if status.user.screen_name == self.me:
            return

        for image in media:
            # Skip non-photo entries, but keep checking the rest.
            if image['type'] != 'photo':
                continue
            self._process_photo(status, image['media_url'])

    def mkdir(self):
        """Create today's folder and database, and reset the per-day state.

        When the database already exists (for example after a restart on
        the same day), the file counter continues from the stored row count
        and the known-URL list is reloaded from the stored rows.
        """
        self.base_path = "./" + self.old_date.isoformat() + "/"
        os.makedirs(self.base_path, exist_ok=True)

        dbpath = self.base_path + "list.db"
        if os.path.exists(dbpath):
            print("DB file exist")
        self.dbfile = sqlite3.connect(dbpath)
        # "if not exists" also covers a DB file left behind without its table.
        self.dbfile.execute(
            "create table if not exists list (filename, image, username, url, "
            "fav, retweet, tags, time, facex, facey, facew, faceh)")

        cur = self.dbfile.cursor()
        cur.execute("select count(filename) from list")
        self.fileno = cur.fetchone()[0]
        # Reload saved URLs so URL de-duplication survives a restart.
        cur.execute("select image from list")
        self.file_url = [row[0] for row in cur.fetchall()]
        cur.close()

        # Hashes are not stored in the DB, so this list starts empty.
        self.file_hash = []

    def _rotate_storage_if_new_day(self):
        """Switch to a fresh folder and database when the date has changed."""
        now = datetime.date.today()
        if now != self.old_date:
            self.old_date = now
            self.dbfile.commit()
            self.dbfile.close()
            self.mkdir()

    @staticmethod
    def _media_of(status):
        """Return the status's media list, preferring extended_entities."""
        # extended_entities carries multi-image tweets; entities the single image.
        for attr in ("extended_entities", "entities"):
            media = (getattr(status, attr, None) or {}).get('media')
            if media:
                return media
        return []

    def _is_known_hash(self, image_hash):
        """Tell whether image_hash is within the distance limit of a saved hash."""
        target = int(image_hash, self.HASH_BASE)
        return any(
            bin(int(known, self.HASH_BASE) ^ target).count('1')
            < self.HASH_DISTANCE_LIMIT
            for known in self.file_hash
        )

    def _process_photo(self, status, media_url):
        """Download one photo and save it if it is new and contains a face."""
        screen_name = status.user.screen_name
        ext = os.path.splitext(media_url)[1]
        filename = str(self.fileno).zfill(5)

        # Duplicate check by URL.
        if media_url in self.file_url:
            print("Skip : {}-{}".format(screen_name, filename))
            return

        # Download the ":orig" variant of the media URL.
        try:
            with urllib.request.urlopen(media_url + ":orig") as response:
                temp_file = response.read()
        except Exception:
            # Best effort: skip this image on any download failure. Unlike a
            # bare except, this still lets Ctrl-C through.
            print("Download Error")
            return

        # Duplicate check by perceptual hash.
        image_hash = phash_calc(temp_file)
        if self._is_known_hash(image_hash):
            print("Skip : {}-{}".format(screen_name, filename))
            return

        # Face detection decides whether the image is kept.
        is_get, areas = face2d_detect(temp_file)
        if not is_get:
            print("Skip : {}-{}".format(screen_name, filename))
            return

        with open(self.base_path + filename + ext, "wb") as out:
            out.write(temp_file)
        self.file_url.append(media_url)
        self.file_hash.append(image_hash)

        # Keep the hashtags, if any.
        entities = getattr(status, "entities", None) or {}
        tags = [hashtag['text'] for hashtag in entities.get('hashtags', [])]

        # Record the image in the database.
        url = "https://twitter.com/" + screen_name + "/status/" + status.id_str
        sql = "insert into list values (?,?,?,?,?,?,?,?,?,?,?,?)"
        value = (
            filename, media_url, screen_name, url,
            str(status.favorite_count), str(status.retweet_count),
            str(tags).replace("'", ""), str(datetime.datetime.now()),
            str(areas[0]), str(areas[1]), str(areas[2]), str(areas[3]),
        )
        self.dbfile.execute(sql, value)
        self.dbfile.commit()
        print("Save : {}-{}".format(screen_name, filename))
        self.fileno += 1
def main():
    """Authenticate, then keep the user stream running until interrupted.

    Any stream error is reported and followed by a 60-second pause before
    reconnecting. Ctrl-C ends the process.
    """
    auth = get_oauth()
    stream = tp.Stream(auth, StreamListener(tp.API(auth)), secure=True)
    print('Start Streaming!')
    while True:
        try:
            stream.userstream()
        except KeyboardInterrupt:
            # Same effect as exit(), without relying on the site module.
            raise SystemExit from None
        except Exception:
            # Exception (not a bare except) lets SystemExit and friends
            # propagate instead of being retried.
            print('UserStream Error')
            time.sleep(60)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment