Skip to content

Instantly share code, notes, and snippets.

@marron-akanishi
Last active February 21, 2018 15:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save marron-akanishi/479f8d4e24270a61cd03745f709835c0 to your computer and use it in GitHub Desktop.
Save marron-akanishi/479f8d4e24270a61cd03745f709835c0 to your computer and use it in GitHub Desktop.
# Face detection with dlib
import numpy as np
import cv2
from dlib import simple_object_detector
# Detectors built once at import time from model files in the current working
# directory, then shared by every face2d_detect() call.
# NOTE(review): presumably importing this module fails if either .svm file is
# missing — confirm against the dlib behaviour.
# Detector applied to the whole image to find face rectangles.
face_detector = simple_object_detector("./detector_face.svm")
# Detector applied to each cropped face to confirm that eyes are present.
eye_detector = simple_object_detector("./detector_eye.svm")
def face2d_detect(raw_file):
    """Detect faces (confirmed by an eye detection) in an encoded image.

    Args:
        raw_file: The encoded image file content as a bytes-like object.

    Returns:
        A tuple ``(is_get, [facex, facey, facew, faceh])``. ``is_get`` is True
        when at least one face containing eyes was found. The four lists are
        parallel: entry *i* of each holds the left, top, width and height of
        the *i*-th accepted face rectangle.
    """
    # Position of every accepted face.
    facex = []
    facey = []
    facew = []
    faceh = []
    no_result = (False, [facex, facey, facew, faceh])

    # An empty buffer cannot be decoded; treat it as "no face".
    if not raw_file:
        return no_result
    # Decode the image (flag 1 = load as a colour image).
    image = cv2.imdecode(np.asarray(bytearray(raw_file), dtype=np.uint8), 1)
    # imdecode returns None when the data is not a readable image.
    if image is None:
        return no_result

    # Detect faces in the image. A detector failure means "no faces"; the
    # fallback has to be an empty sequence so the loop below is simply skipped.
    try:
        faces = face_detector(image)
    except Exception:
        faces = []

    # Crop each face and look for eyes inside it.
    for area in faces:
        # Defensive clamp: a negative index would make the slice wrap around
        # to the other side of the image instead of starting at the border.
        top = max(area.top(), 0)
        left = max(area.left(), 0)
        face = image[top:area.bottom(), left:area.right()]
        if face.size == 0:
            continue
        # Only keep the face when eyes are found in the cropped region.
        if len(eye_detector(face)) > 0:
            facex.append(area.left())
            facey.append(area.top())
            facew.append(area.right() - area.left())
            faceh.append(area.bottom() - area.top())

    return len(facex) > 0, [facex, facey, facew, faceh]
# Collect images flowing through the timeline, filtered with dlib + the TPTS model
import datetime
import json
import os
import sqlite3
import time
import urllib
import urllib.request

import tweepy as tp

from dlib_detector import face2d_detect
from image_hash import phash_calc
def get_oauth():
    """Read the keys from ./setting.json and build the OAuth handler.

    The settings file must contain the keys ``CK``, ``CS``, ``Admin_Key`` and
    ``Admin_Secret``.

    Returns:
        A ``tweepy.OAuthHandler`` with the access token already set.
    """
    # Use a context manager so the settings file is closed right after reading.
    with open("./setting.json") as setting_file:
        setting = json.load(setting_file)
    auth = tp.OAuthHandler(setting['CK'], setting['CS'])
    auth.set_access_token(setting['Admin_Key'], setting['Admin_Secret'])
    return auth
class StreamListener(tp.StreamListener):
    """Stream listener that saves timeline photos in which a face is detected.

    Accepted images are written to a per-day folder (``./<ISO date>/``) and
    described by one row each in the SQLite database ``list.db`` inside that
    folder.
    """

    # Two hashes that differ in fewer bits than this are treated as the same image.
    HASH_DISTANCE_LIMIT = 4

    def __init__(self, api):
        """Store the API client and own screen name, then open today's storage.

        Args:
            api: The tweepy API object used to look up the authenticated user.
        """
        super().__init__(api)
        self.api = api
        self.me = self.api.me().screen_name
        # Date of the folder currently being written to.
        self.old_date = datetime.date.today()
        self.mkdir()

    def on_error(self, status_code):
        """Handle a connection error; returning True keeps the stream going."""
        return True

    def on_timeout(self):
        """Handle a connection timeout; returning True keeps the stream going."""
        return True

    def on_status(self, status):
        """Process one Status received from the UserStream.

        Every photo attached to the tweet (or to the retweeted / quoted tweet)
        is downloaded, de-duplicated, run through face detection and saved
        when a face is found.
        """
        self._roll_over_if_new_day()
        # For a retweet, work on the original tweet.
        if hasattr(status, "retweeted_status"):
            status = status.retweeted_status
        # For a quote tweet, work on the quoted tweet.
        if hasattr(status, "quoted_status"):
            status = status.quoted_status
        media = self._find_media(status)
        if media is None:
            return
        # Skip own tweets (guards against own retweets).
        if status.user.screen_name == self.me:
            return
        for image in media:
            # Stop at the first attachment that is not a photo.
            if image['type'] != 'photo':
                break
            self._handle_photo(status, image['media_url'])

    def _roll_over_if_new_day(self):
        """Switch to a fresh folder and database when the date has changed."""
        now = datetime.date.today()
        if now != self.old_date:
            self.old_date = now
            self.dbfile.commit()
            self.dbfile.close()
            self.mkdir()

    @staticmethod
    def _find_media(status):
        """Return the media list attached to ``status``, or None if there is none."""
        # extended_entities carries multi-image tweets; entities a single image.
        if hasattr(status, "extended_entities"):
            container = status.extended_entities
        elif hasattr(status, "entities"):
            container = status.entities
        else:
            return None
        return container['media'] if 'media' in container else None

    def _is_known_hash(self, image_hash):
        """Return True when ``image_hash`` is close to an already saved hash."""
        # For dhash, change the base 2 below to 16 (binary vs. hex string).
        new_bits = int(image_hash, 2)
        return any(
            bin(int(known, 2) ^ new_bits).count('1') < self.HASH_DISTANCE_LIMIT
            for known in self.file_hash
        )

    def _handle_photo(self, status, media_url):
        """Download one photo and save it if it is new and contains a face."""
        ext = os.path.splitext(media_url)[1]
        filename = str(self.fileno).zfill(5)
        skip_message = "Skip : {}-{}".format(status.user.screen_name, filename)

        # Duplicate check by URL.
        if media_url in self.file_url:
            print(skip_message)
            return
        # Download the original-size image; the response is closed on exit.
        try:
            with urllib.request.urlopen(media_url + ":orig") as response:
                temp_file = response.read()
        except Exception:
            print("Download Error")
            return
        # Duplicate check by image hash.
        image_hash = phash_calc(temp_file)
        if self._is_known_hash(image_hash):
            print(skip_message)
            return
        # Face detection.
        is_get, areas = face2d_detect(temp_file)
        if not is_get:
            print(skip_message)
            return

        # Save the image file.
        with open(self.base_path + filename + ext, "wb") as out:
            out.write(temp_file)
        self.file_url.append(media_url)
        self.file_hash.append(image_hash)
        # Keep the hashtags if the tweet has any.
        tags = []
        if hasattr(status, "entities") and "hashtags" in status.entities:
            tags = [hashtag['text'] for hashtag in status.entities['hashtags']]
        # Record the image in the database.
        url = "https://twitter.com/" + status.user.screen_name + "/status/" + status.id_str
        SQL = "insert into list values (?,?,?,?,?,?,?,?,?,?,?,?)"
        value = (filename, media_url, status.user.screen_name, url,
                 str(status.favorite_count), str(status.retweet_count),
                 str(tags).replace("'", ""), str(datetime.datetime.now()),
                 str(areas[0]), str(areas[1]), str(areas[2]), str(areas[3]))
        self.dbfile.execute(SQL, value)
        self.dbfile.commit()
        print("Save : {}-{}".format(status.user.screen_name, filename))
        self.fileno += 1

    def mkdir(self):
        """Create the folder for ``self.old_date`` and initialise the state.

        Sets ``base_path``, ``dbfile``, ``fileno``, ``file_url`` and
        ``file_hash``. When the database already holds rows, numbering
        continues after them and their URLs are reloaded so that images saved
        before a restart are not downloaded again. Hashes are not stored in
        the database, so ``file_hash`` always starts empty.
        """
        self.base_path = "./" + self.old_date.isoformat() + "/"
        os.makedirs(self.base_path, exist_ok=True)
        dbpath = self.base_path + "list.db"
        if os.path.exists(dbpath):
            print("DB file exist")
        self.dbfile = sqlite3.connect(dbpath)
        # "if not exists" also covers a database file left without its table.
        self.dbfile.execute(
            "create table if not exists list (filename, image, username, url, fav,"
            " retweet, tags, time, facex, facey, facew, faceh)"
        )
        rows = self.dbfile.execute("select image from list").fetchall()
        self.fileno = len(rows)
        self.file_url = [row[0] for row in rows]
        self.file_hash = []
def main():
    """Authenticate, start the UserStream and reconnect after failures.

    Runs until interrupted with Ctrl-C. Any other error raised by the stream
    is reported and the connection is retried after 60 seconds.
    """
    auth = get_oauth()
    stream = tp.Stream(auth, StreamListener(tp.API(auth)), secure=True)
    print('Start Streaming!')
    while True:
        try:
            stream.userstream()
        except KeyboardInterrupt:
            # Same effect as exit(), without relying on the site module.
            raise SystemExit from None
        except Exception as err:
            print('UserStream Error')
            # Show what went wrong instead of silently swallowing it.
            print(repr(err))
            time.sleep(60)


# Start collecting only when this file is run as a script.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment