@rgajrawala
Created June 6, 2019 05:21
Twitter bot that responds to mentions containing a human face in the attached media: it crops the face, runs emotion detection, and composites the face into a corresponding photo of a flower.
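# Required local assets (paths taken from the code below, not documented in the gist):
#   haarcascade_frontalface_default.xml  (OpenCV face cascade, in the working dir)
#   ./data/model/model.h5                (trained top model produced by Train.py)
#   ./flower-imgs/{burning.jpg, rainy.png, sunflower.jpg}
#   ./data/                              (directory must exist for the tmp.png scratch file)
#   lastid.pickle                        (seeded by the first-run guard before the main loop)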
import os
import time
import pickle
import urllib3 as urllib
import numpy as np
import cv2 as cv
from keras.models import load_model
from keras.applications import VGG16
import tweepy
# GPU does not have enough memory for prediction, lol
os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
TMP_IMG = './data/tmp.png'
# crop face from image
face_cascade = cv.CascadeClassifier('haarcascade_frontalface_default.xml')
RESIZE_DIM = 350
def getFace(img):
    gray = cv.cvtColor(img, cv.COLOR_BGR2GRAY)
    faces = face_cascade.detectMultiScale(gray, 1.3, 5)
    if len(faces) == 0:
        return None, None
    x, y, w, h = faces[0]  # assume the first detected face is the subject
    return img[y:y+h, x:x+w], cv.resize(gray[y:y+h, x:x+w], (RESIZE_DIM, RESIZE_DIM))
# download image from internet
http = urllib.PoolManager()
def download(url):
    res = http.request('GET', url)
    if res.status != 200:
        return None
    image = np.asarray(bytearray(res.data), dtype='uint8')
    return cv.imdecode(image, cv.IMREAD_COLOR)
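# Quick local smoke test (a sketch; the URL is a placeholder, not from the gist):
# img = download('https://example.com/face.jpg')
# face, gray = getFace(img) if img is not None else (None, None)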
# classify a new image
EMOTION_DICT = {
    0: 'neutral',
    1: 'happy',
    2: 'sadness',
    3: 'surprise',
    4: 'anger',
    5: 'disgust',
    6: 'fear',
}
vggModel = VGG16(weights='imagenet', include_top=False)
topModel = load_model('./data/model/model.h5')
def getEmotion(img):
    # round-trip through disk turns the single-channel grayscale crop into a
    # 3-channel BGR image, which is the input layout VGG16 expects
    cv.imwrite(TMP_IMG, img)
    img = cv.imread(TMP_IMG)
    img = img.reshape(1, img.shape[0], img.shape[1], img.shape[2]) / 255.0
    vggPred = vggModel.predict(img)
    vggPred = vggPred.reshape(1, vggPred.shape[1] * vggPred.shape[2] * vggPred.shape[3])
    topPred = topModel.predict(vggPred)
    emotionId = topPred[0].argmax()
    return EMOTION_DICT[emotionId], emotionId
# composite the face into the matching flower image
def createEmotionImg(emotion, img):
    # pick a background and the circle (center, radius) where the face goes;
    # 'contempt' is never produced by EMOTION_DICT but is kept for safety
    if emotion in ('anger', 'contempt', 'disgust'):
        picName, (x, y), r = ('burning.jpg', (305, 623), 80)
    elif emotion in ('fear', 'sadness'):
        picName, (x, y), r = ('rainy.png', (674, 234), 70)
    else:
        picName, (x, y), r = ('sunflower.jpg', (487, 248), 70)
    x -= r  # shift from circle center to top-left corner
    y -= r
    # round-trip through disk for a 3-channel BGR image, then fit it to the circle
    cv.imwrite(TMP_IMG, img)
    img = cv.imread(TMP_IMG)
    img = cv.resize(img, (r * 2, r * 2))
    bg = cv.imread('./flower-imgs/' + picName, -1)
    if len(bg.shape) > 2 and bg.shape[2] == 4:
        bg = cv.cvtColor(bg, cv.COLOR_BGRA2BGR)
    # copy a circular region of the face onto the background
    for y_ in range(img.shape[0]):
        for x_ in range(img.shape[1]):
            if (x_ - r)**2 + (y_ - r)**2 <= r**2:
                bg[y + y_][x + x_] = img[y_][x_]
    cv.imwrite(TMP_IMG, bg)
    return TMP_IMG
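# A vectorized alternative to the per-pixel loop above (a sketch, not wired in;
# pasteCircle is a hypothetical helper): index both images with a boolean mask.
def pasteCircle(bg, face, x, y, r):
    yy, xx = np.ogrid[:2 * r, :2 * r]
    mask = (xx - r) ** 2 + (yy - r) ** 2 <= r ** 2  # boolean circle of radius r
    region = bg[y:y + 2 * r, x:x + 2 * r]  # view into bg; writes modify bg in place
    region[mask] = face[mask]
    return bg
# e.g. pasteCircle(bg, img, x, y, r) after the x/y offsets above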
### MAIN
# set up tweepy
auth = tweepy.OAuthHandler('XXX', 'XXX')
auth.set_access_token('XXX', 'XXX')
auth.secure = True
api = tweepy.API(auth_handler=auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)
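# NOTE: this assumes tweepy 3.x (current when this gist was written);
# tweepy 4.x removed the wait_on_rate_limit_notify argument.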
# main loop, goes through new mentions and replies to them
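# first-run guard (not in the original gist): seed lastid.pickle so the
# pickle.load below does not fail before the bot has replied to anything
if not os.path.exists('lastid.pickle'):
    pickle.dump('1', open('lastid.pickle', 'wb'))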
lastId = pickle.load(open('lastid.pickle', 'rb'))
while True:
    tweets = api.mentions_timeline(since_id=lastId)
    for tweet in tweets:
        if 'media' not in tweet.entities:
            continue
        image = download(tweet.entities['media'][0]['media_url_https'])
        if image is None:  # could not download media
            api.update_status(status='@{} could not fetch your image 😔'.format(tweet.author.screen_name), in_reply_to_status_id=tweet.id)
            print('[ERR] <@{}, ({})> Could not download media'.format(tweet.author.screen_name, tweet.id_str))
            continue
        img, grey = getFace(image)
        if img is None or grey is None:  # could not find face
            api.update_status(status='@{} could not find your face 😔'.format(tweet.author.screen_name), in_reply_to_status_id=tweet.id)
            print('[ERR] <@{}, ({})> No face found'.format(tweet.author.screen_name, tweet.id_str))
            continue
        emotion, emotionId = getEmotion(grey)
        if emotion == 'neutral':  # neutral expression
            api.update_status(status='@{} where ur feels at? 😐'.format(tweet.author.screen_name), in_reply_to_status_id=tweet.id)
            print('[OK] <@{}, ({})> Neutral expression'.format(tweet.author.screen_name, tweet.id_str))
            continue
        filePath = createEmotionImg(emotion, img)
        res = api.media_upload(filePath)
        api.update_status(status='@{} you seem to be feeling {}'.format(tweet.author.screen_name, emotion), in_reply_to_status_id=tweet.id, media_ids=[res.media_id])
        print('[OK] <@{}, ({})> {} expression'.format(tweet.author.screen_name, tweet.id_str, emotion))
    if len(tweets) > 0:
        # store the most recent tweet we replied to (tweets[0] is the newest)
        lastId = tweets[0].id_str
        pickle.dump(lastId, open('lastid.pickle', 'wb'))
    time.sleep(10)
# Train.py: Train model
import os
import numpy as np
import pandas as pd
import glob
import cv2
from sklearn.model_selection import train_test_split
from keras.layers import Dropout, Dense
from keras.layers.normalization import BatchNormalization
from keras.models import Sequential
from keras.applications import VGG16
######### create data frames
def getDataFrame(emotion, emotionId):
    imgs = glob.glob('./data/_' + emotion + '/*')
    df = pd.DataFrame()
    # NOTE: splitting on '\\' assumes Windows path separators in glob results
    df['folderName'] = [str(i.split('\\')[0]) + '/' for i in imgs]
    df['imageName'] = [str(i.split('\\')[1]) for i in imgs]
    df['emotion'] = [emotion] * len(imgs)
    df['labels'] = [emotionId] * len(imgs)
    return df
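# Cross-platform variant of the split above (a sketch): os.path.split handles
# both '/' and '\\' on Windows, and '/' everywhere else.
# df['folderName'] = [os.path.split(p)[0] + '/' for p in imgs]
# df['imageName'] = [os.path.split(p)[1] for p in imgs]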
# 0=neutral, 1=happy, 2=sadness, 3=surprise, 4=anger, 5=disgust, 6=fear
frames = pd.concat([
getDataFrame('neutral', 0),
getDataFrame('happy', 1),
getDataFrame('sadness', 2),
getDataFrame('surprise', 3),
getDataFrame('anger', 4),
getDataFrame('disgust', 5),
getDataFrame('fear', 6)
])
frames.reset_index(inplace=True, drop=True)
frames = frames.sample(frac=1.0) # shuffle data frame
frames.reset_index(inplace=True, drop=True)
#########
######### ONLY RUN IMAGE CONVERSION ONCE!
# convert to grayscale
for i in range(len(frames)):
    path1 = frames['folderName'][i]
    path2 = frames['imageName'][i]
    img = cv2.imread(os.path.join(path1, path2))
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    cv2.imwrite(os.path.join(path1, path2), gray)
# crop face
face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')
for i, d in frames.iterrows():
    imgPath = os.path.join(d['folderName'], d['imageName'])
    img = cv2.imread(imgPath)
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    try:
        (x, y, w, h) = face_cascade.detectMultiScale(gray, 1.3, 5)[0]  # assume only one face per image
        cv2.imwrite(imgPath, cv2.resize(img[y:y + h, x:x + w], (350, 350)))  # crop, resize, and save image
    except IndexError:  # detectMultiScale found no faces
        print('Could not detect face for file: ' + imgPath)
#########
######### split data frames into train/test/cv
trainFrames, testFrames = train_test_split(frames, stratify=frames['labels'], test_size=0.2)
trainFrames, cvFrames = train_test_split(trainFrames, stratify=trainFrames['labels'], test_size=0.15)
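# Optional sanity check (not in the original): confirm the stratified splits
# kept the class proportions.
# print(trainFrames['labels'].value_counts(normalize=True))
# print(cvFrames['labels'].value_counts(normalize=True))
# print(testFrames['labels'].value_counts(normalize=True))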
#########
######### bottleneck features
batchPointers = {
'train': 0,
'test': 0,
'cv': 0
}
# bottleneck features for train data
trainLabels = pd.get_dummies(trainFrames['labels']).to_numpy()
model = VGG16(weights='imagenet', include_top=False)
SAVEDIR_TRAIN = './data/bottleneck-features/train'
SAVEDIR_TRAIN_LABELS = './data/bottleneck-features/train-labels'
BATCH_SIZE = 10
for i in range(int(len(trainFrames) / BATCH_SIZE)):
    # load a combined batch of images and labels
    batchImages = []
    batchLabels = []
    for j in range(BATCH_SIZE):
        path1 = trainFrames.iloc[batchPointers['train'] + j]['folderName']
        path2 = trainFrames.iloc[batchPointers['train'] + j]['imageName']
        readImage = cv2.imread(os.path.join(path1, path2)) / 255.0  # normalize image
        batchImages.append(readImage)
        batchLabels.append(trainLabels[batchPointers['train'] + j])  # append corresponding labels
    batchPointers['train'] += BATCH_SIZE
    x, y = np.array(batchImages), np.array(batchLabels)
    np.save(os.path.join(SAVEDIR_TRAIN_LABELS, 'bottleneck-labels-{}'.format(i + 1)), y)
    np.save(os.path.join(SAVEDIR_TRAIN, 'bottleneck-{}'.format(i + 1)), model.predict(x))
# bottleneck features for cv data
cvLabels = pd.get_dummies(cvFrames['labels']).to_numpy()
model = VGG16(weights='imagenet', include_top=False)
SAVEDIR_CV = './data/bottleneck-features/cv'
SAVEDIR_CV_LABELS = './data/bottleneck-features/cv-labels'
for i in range(int(len(cvFrames) / BATCH_SIZE)):
    batchImages = []
    batchLabels = []
    for j in range(BATCH_SIZE):
        path1 = cvFrames.iloc[batchPointers['cv'] + j]['folderName']
        path2 = cvFrames.iloc[batchPointers['cv'] + j]['imageName']
        readImage = cv2.imread(os.path.join(path1, path2)) / 255.0  # normalize image
        batchImages.append(readImage)
        batchLabels.append(cvLabels[batchPointers['cv'] + j])  # append corresponding labels
    batchPointers['cv'] += BATCH_SIZE
    x, y = np.array(batchImages), np.array(batchLabels)
    np.save(os.path.join(SAVEDIR_CV_LABELS, 'bottleneck-labels-{}'.format(i + 1)), y)
    np.save(os.path.join(SAVEDIR_CV, 'bottleneck-{}'.format(i + 1)), model.predict(x))
# bottleneck features for test data
testLabels = pd.get_dummies(testFrames['labels']).to_numpy()
model = VGG16(weights='imagenet', include_top=False)
SAVEDIR_TEST = './data/bottleneck-features/test'
SAVEDIR_TEST_LABELS = './data/bottleneck-features/test-labels'
for i in range(int(len(testFrames) / BATCH_SIZE)):
    batchImages = []
    batchLabels = []
    for j in range(BATCH_SIZE):
        path1 = testFrames.iloc[batchPointers['test'] + j]['folderName']
        path2 = testFrames.iloc[batchPointers['test'] + j]['imageName']
        readImage = cv2.imread(os.path.join(path1, path2)) / 255.0  # normalize image
        batchImages.append(readImage)
        batchLabels.append(testLabels[batchPointers['test'] + j])  # append corresponding labels
    batchPointers['test'] += BATCH_SIZE
    x, y = np.array(batchImages), np.array(batchLabels)
    np.save(os.path.join(SAVEDIR_TEST_LABELS, 'bottleneck-labels-{}'.format(i + 1)), y)
    np.save(os.path.join(SAVEDIR_TEST, 'bottleneck-{}'.format(i + 1)), model.predict(x))
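# The three loops above are near-identical; a refactor sketch (saveBottleneck
# is a hypothetical helper, not used above) that extracts the shared logic:
def saveBottleneck(framesDf, labels, key, saveDir, saveLabelsDir):
    for i in range(len(framesDf) // BATCH_SIZE):
        batchImages, batchLabels = [], []
        for j in range(BATCH_SIZE):
            row = framesDf.iloc[batchPointers[key] + j]
            img = cv2.imread(os.path.join(row['folderName'], row['imageName'])) / 255.0
            batchImages.append(img)
            batchLabels.append(labels[batchPointers[key] + j])
        batchPointers[key] += BATCH_SIZE
        x, y = np.array(batchImages), np.array(batchLabels)
        np.save(os.path.join(saveLabelsDir, 'bottleneck-labels-{}'.format(i + 1)), y)
        np.save(os.path.join(saveDir, 'bottleneck-{}'.format(i + 1)), model.predict(x))
# e.g. saveBottleneck(trainFrames, trainLabels, 'train', SAVEDIR_TRAIN, SAVEDIR_TRAIN_LABELS)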
#########
######### modeling and training
def model(inputShape):
    model = Sequential()
    model.add(Dense(512, activation='relu', input_dim=inputShape))
    model.add(Dropout(0.1))
    model.add(Dense(256, activation='relu'))
    model.add(Dense(128, activation='relu'))
    model.add(BatchNormalization())
    model.add(Dense(64, activation='relu'))
    model.add(Dense(7, activation='softmax'))  # output_dim is a deprecated Keras 1 keyword; units is positional
    return model
SAVEDIR_MODEL = './data/model'
INPUT_SHAPE = 10 * 10 * 512 # shape of bottleneck feature of each image after passing through VGG-16
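# (a 350x350 input shrinks through VGG16's five 2x2 max-pools: 350 -> 175 -> 87 -> 43 -> 21 -> 10, with 512 channels)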
model = model(INPUT_SHAPE)
# model.load_weights(os.path.join(SAVEDIR_MODEL, 'model.h5')) # only if we want to keep updating previously saved model
model.summary()
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
EPOCHS = 12
step = 0
bottleneckFiles = {
'train': int(len(trainFrames) / BATCH_SIZE),
'cv': int(len(cvFrames) / BATCH_SIZE)
}
epochNum, trainLoss, trainAcc, cvLoss, cvAcc = [], [], [], [], []
for epoch in range(EPOCHS):
    avgEpochTrainLoss, avgEpochTrainAcc, avgEpochCVLoss, avgEpochCVAcc = 0.0, 0.0, 0.0, 0.0
    epochNum.append(epoch + 1)
    for i in range(bottleneckFiles['train']):
        step += 1
        # load batch of train bottleneck features for training the MLP
        xTrainLoad = np.load(os.path.join(SAVEDIR_TRAIN, 'bottleneck-{}.npy'.format(i + 1)))
        xTrain = xTrainLoad.reshape(xTrainLoad.shape[0], xTrainLoad.shape[1] * xTrainLoad.shape[2] * xTrainLoad.shape[3])
        yTrain = np.load(os.path.join(SAVEDIR_TRAIN_LABELS, 'bottleneck-labels-{}.npy'.format(i + 1)))
        # load batch of cv bottleneck features for cross-validation (cycled, since there are fewer cv files)
        xCVLoad = np.load(os.path.join(SAVEDIR_CV, 'bottleneck-{}.npy'.format((i % bottleneckFiles['cv']) + 1)))
        xCV = xCVLoad.reshape(xCVLoad.shape[0], xCVLoad.shape[1] * xCVLoad.shape[2] * xCVLoad.shape[3])
        yCV = np.load(os.path.join(SAVEDIR_CV_LABELS, 'bottleneck-labels-{}.npy'.format((i % bottleneckFiles['cv']) + 1)))
        trainLoss_, trainAcc_ = model.train_on_batch(xTrain, yTrain)  # train the model on the batch
        cvLoss_, cvAcc_ = model.test_on_batch(xCV, yCV)  # cross-validate the model on the cv batch
        print('Epoch: {}, Step: {}, Tr_Loss: {}, Tr_Acc: {}, CV_Loss: {}, CV_Acc: {}'.format(epoch + 1, step, np.round(float(trainLoss_), 2), np.round(float(trainAcc_), 2), np.round(float(cvLoss_), 2), np.round(float(cvAcc_), 2)))
        avgEpochTrainLoss += trainLoss_ / bottleneckFiles['train']
        avgEpochTrainAcc += trainAcc_ / bottleneckFiles['train']
        avgEpochCVLoss += cvLoss_ / bottleneckFiles['train']
        avgEpochCVAcc += cvAcc_ / bottleneckFiles['train']
    print('Avg_Train_Loss: {}, Avg_Train_Acc: {}, Avg_CV_Loss: {}, Avg_CV_Acc: {}'.format(np.round(float(avgEpochTrainLoss), 2), np.round(float(avgEpochTrainAcc), 2), np.round(float(avgEpochCVLoss), 2), np.round(float(avgEpochCVAcc), 2)))
    trainLoss.append(avgEpochTrainLoss)
    trainAcc.append(avgEpochTrainAcc)
    cvLoss.append(avgEpochCVLoss)
    cvAcc.append(avgEpochCVAcc)
    model.save(os.path.join(SAVEDIR_MODEL, 'model.h5'))  # save model on each epoch
    model.save_weights(os.path.join(SAVEDIR_MODEL, 'model_weights.h5'))  # save weights on each epoch
    print('Model and weights saved at epoch {}'.format(epoch + 1))
# save stats to log
log_frame = pd.DataFrame(columns=['Epoch', 'Train_Loss', 'Train_Accuracy', 'CV_Loss', 'CV_Accuracy'])
log_frame['Epoch'] = epochNum
log_frame['Train_Loss'] = trainLoss
log_frame['Train_Accuracy'] = trainAcc
log_frame['CV_Loss'] = cvLoss
log_frame['CV_Accuracy'] = cvAcc
log_frame.to_csv('./data/log.csv', index=False)