Created June 6, 2019 05:21
Twitter Bot that responds to mentions with human faces in the media. Takes the face, runs emotion detection, and edits it into a corresponding photo of a flower.
import os
import time
import pickle
import urllib3 as urllib
import numpy as np
import cv2 as cv
from keras.models import load_model
from keras.applications import VGG16
import tweepy
# GPU does not have enough memory for prediction, lol
os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
TMP_IMG = './data/tmp.png'
# crop face from image
face_cascade = cv.CascadeClassifier('haarcascade_frontalface_default.xml')
def getFace(img):
gray = cv.cvtColor(img, cv.COLOR_BGR2GRAY)
faces = face_cascade.detectMultiScale(gray, 1.3, 5)
if len(faces) == 0:
return None, None
x, y, w, h = faces[0]
return img[y:y+h, x:x+w], cv.resize(gray[y:y+h, x:x+w], (RESIZE_DIM, RESIZE_DIM))
# download image from internet
http = urllib.PoolManager()
def download(url):
res = http.request('GET', url)
if res.status != 200:
return None
image = np.asarray(bytearray(, dtype='uint8')
return cv.imdecode(image, cv.IMREAD_COLOR)
# classify a new image
0: 'neutral',
1: 'happy',
2: 'sadness',
3: 'surprise',
4: 'anger',
5: 'disgust',
6: 'fear'
vggModel = VGG16(weights='imagenet', include_top=False)
topModel = load_model('./data/model/model.h5')
def getEmotion(img):
cv.imwrite(TMP_IMG, img)
img = cv.imread(TMP_IMG)
img = img.reshape(1, img.shape[0], img.shape[1], img.shape[2]) / 255.0
vggPred = vggModel.predict(img)
vggPred = vggPred.reshape(1, vggPred.shape[1] * vggPred.shape[2] * vggPred.shape[3])
topPred = topModel.predict(vggPred)
emotionId = topPred[0].argmax()
return EMOTION_DICT[emotionId], emotionId
# load the face image into the correct flower image
def createEmotionImg(emotion, img):
if emotion == 'anger' or emotion == 'contempt' or emotion == 'disgust':
picName, (x, y), r = ('burning.jpg', (305, 623), 80)
elif emotion == 'fear' or emotion == 'sadness':
picName, (x, y), r = ('rainy.png', (674, 234), 70)
picName, (x, y), r = ('sunflower.jpg', (487, 248), 70)
x -= int(r)
y -= int(r)
cv.imwrite(TMP_IMG, img)
img = cv.imread(TMP_IMG)
img = cv.resize(img, (r * 2, r * 2))
bg = cv.imread('./flower-imgs/' + picName, -1)
if len(bg.shape) > 2 and bg.shape[2] == 4:
bg = cv.cvtColor(bg, cv.COLOR_BGRA2BGR)
# write face to bg
for y_ in range(img.shape[0]):
for x_ in range(img.shape[1]):
if (x_ - r)**2 + (y_ - r)**2 <= r**2:
bg[y+y_][x+x_] = img[y_][x_]
# cv.imshow('image', bg)
# cv.waitKey(0)
# cv.destroyAllWindows()
# exit()
cv.imwrite(TMP_IMG, bg)
return TMP_IMG
### MAIN
# set up tweepy
auth = tweepy.OAuthHandler('XXX', 'XXX')
auth.set_access_token('XXX', 'XXX') = True
api = tweepy.API(auth_handler=auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)
# main loop, goes through new mentions and replies to them
lastId = pickle.load(open('lastid.pickle', 'rb'))
while True:
tweets = api.mentions_timeline(since_id=lastId)
for tweet in tweets:
if not 'media' in tweet.entities:
image = download(tweet.entities['media'][0]['media_url_https'])
if image is None: # could not download media
api.update_status(status='@{} could not fetch your image 😔'.format(,
print('[ERR] <@{}, ({})> No face found'.format(, tweet.id_str))
img, grey = getFace(image)
if img is None or grey is None: # could not find face
api.update_status(status='@{} could not find your face 😔'.format(,
print('[ERR] <@{}, ({})> No face found'.format(, tweet.id_str))
emotion, emotionId = getEmotion(grey)
if emotion == 'neutral': # neutral expression
api.update_status(status='@{} where ur feels at? 😐'.format(,
print('[OK] <@{}, ({})> Neutral expression'.format(, tweet.id_str))
filePath = createEmotionImg(emotion, img)
res = api.media_upload(filePath)
api.update_status(status='@{} you seem to be feeling {}'.format(, emotion),, media_ids=[res.media_id])
print('[OK] <@{}, ({})> {} expression'.format(, tweet.id_str, emotion))
if len(tweets) > 0:
# store the most recent tweet we replied to
lastId = tweets[0].id_str # the first tweet is the most recent
pickle.dump(lastId, open('lastid.pickle', 'wb'))
# Train model
import os
import numpy as np
import pandas as pd
import glob
import cv2
from sklearn.model_selection import train_test_split
from keras.layers import Dropout, Dense
from keras.layers.normalization import BatchNormalization
from keras.models import Sequential
from keras.applications import VGG16
######### create data frames
def getDataFrame(emotion, emotionId):
imgs = glob.glob('./data/_' + emotion + '/*')
df = pd.DataFrame()
df['folderName'] = [str(i.split('\\')[0]) + '/' for i in imgs]
df['imageName'] = [str(i.split('\\')[1]) for i in imgs]
df['emotion'] = [emotion] * len(imgs)
df['labels'] = [emotionId] * len(imgs)
return df
# 0=neutral, 1=happy, 2=sadness, 3=surprise, 4=anger, 5=disgust, 6=fear
frames = pd.concat([
getDataFrame('neutral', 0),
getDataFrame('happy', 1),
getDataFrame('sadness', 2),
getDataFrame('surprise', 3),
getDataFrame('anger', 4),
getDataFrame('disgust', 5),
getDataFrame('fear', 6)
frames.reset_index(inplace=True, drop=True)
frames = frames.sample(frac=1.0) # shuffle data frame
frames.reset_index(inplace=True, drop=True)
# convert to grayscale
for i in range(len(frames)):
path1 = frames['folderName'][i]
path2 = frames['imageName'][i]
img = cv2.imread(os.path.join(path1, path2))
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
cv2.imwrite(os.path.join(path1, path2), gray)
# crop face
face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')
for i, d in frames.iterrows():
imgPath = os.path.join(d['folderName'], d['imageName'])
img = cv2.imread(imgPath)
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
(x, y, w, h) = face_cascade.detectMultiScale(gray, 1.3, 5)[0] # assume only one face per image
cv2.imwrite(imgPath, cv2.resize(img[y : y + h, x : x + w], (350, 350))) # cropping, resizing and saving image
print('Could not detect face for file: ' + imgPath)
######### split data frames into train/test/cv
trainFrames, testFrames = train_test_split(frames, stratify=frames['labels'], test_size=0.2)
trainFrames, cvFrames = train_test_split(trainFrames, stratify=trainFrames['labels'], test_size=0.15)
######### bottleneck features
batchPointers = {
'train': 0,
'test': 0,
'cv': 0
# bottleneck features for train data
trainLabels = pd.get_dummies(trainFrames['labels']).to_numpy()
model = VGG16(weights='imagenet', include_top=False)
SAVEDIR_TRAIN = './data/bottleneck-features/train'
SAVEDIR_TRAIN_LABELS = './data/bottleneck-features/train-labels'
for i in range(int(len(trainFrames) / BATCH_SIZE)):
# loadCombinedTrainBatch
batchImages = []
batchLabels = []
for j in range(BATCH_SIZE):
path1 = trainFrames.iloc[batchPointers['train'] + j]['folderName']
path2 = trainFrames.iloc[batchPointers['train'] + j]['imageName']
readImage = cv2.imread(os.path.join(path1, path2)) / 255.0 # normalize image
batchLabels.append(trainLabels[batchPointers['train'] + j]) # append corresponding labels
batchPointers['train'] += BATCH_SIZE
x, y = np.array(batchImages), np.array(batchLabels), 'bottleneck-labels-{}'.format(i + 1)), y), 'bottleneck-{}'.format(i + 1)), model.predict(x))
# bottleneck features for cv data
cvLabels = pd.get_dummies(cvFrames['labels']).to_numpy()
model = VGG16(weights='imagenet', include_top=False)
SAVEDIR_CV = './data/bottleneck-features/cv'
SAVEDIR_CV_LABELS = './data/bottleneck-features/cv-labels'
for i in range(int(len(cvFrames) / BATCH_SIZE)):
batchImages = []
batchLabels = []
for j in range(BATCH_SIZE):
path1 = cvFrames.iloc[batchPointers['cv'] + j]['folderName']
path2 = cvFrames.iloc[batchPointers['cv'] + j]['imageName']
readImage = cv2.imread(os.path.join(path1, path2)) / 255.0 # normalize image
batchLabels.append(cvLabels[batchPointers['cv'] + j]) #appending corresponding labels
batchPointers['cv'] += BATCH_SIZE
x, y = np.array(batchImages), np.array(batchLabels), "bottleneck-labels-{}".format(i + 1)), y), "bottleneck-{}".format(i + 1)), model.predict(x))
# bottleneck features for test data
testLabels = pd.get_dummies(testFrames['labels']).to_numpy()
model = VGG16(weights='imagenet', include_top=False)
SAVEDIR_TEST = './data/bottleneck-features/test'
SAVEDIR_TEST_LABELS = './data/bottleneck-features/test-labels'
for i in range(int(len(testFrames) / BATCH_SIZE)):
batchImages = []
batchLabels = []
for j in range(BATCH_SIZE):
path1 = testFrames.iloc[batchPointers['test'] + j]['folderName']
path2 = testFrames.iloc[batchPointers['test'] + j]['imageName']
readImage = cv2.imread(os.path.join(path1, path2)) / 255.0 # normalize image
batchLabels.append(testLabels[batchPointers['test'] + j]) #appending corresponding labels
batchPointers['test'] += BATCH_SIZE
x, y = np.array(batchImages), np.array(batchLabels), "bottleneck-labels-{}".format(i + 1)), y), "bottleneck-{}".format(i + 1)), model.predict(x))
######### modeling and training
def model(inputShape):
model = Sequential()
model.add(Dense(512, activation='relu', input_dim=inputShape))
model.add(Dense(256, activation='relu'))
model.add(Dense(128, activation='relu'))
model.add(Dense(64, activation='relu'))
model.add(Dense(output_dim=7, activation='softmax'))
return model
SAVEDIR_MODEL = './data/model'
INPUT_SHAPE = 10 * 10 * 512 # shape of bottleneck feature of each image after passing through VGG-16
model = model(INPUT_SHAPE)
# model.load_weights(os.path.join(SAVEDIR_MODEL, 'model.h5')) # only if we want to keep updating previously saved model
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
step = 0
bottleneckFiles = {
'train': int(len(trainFrames) / BATCH_SIZE),
'cv': int(len(cvFrames) / BATCH_SIZE)
epochNum, trainLoss, trainAcc, cvLoss, cvAcc = [], [], [], [], []
for epoch in range(EPOCHS):
avgEpochTrainLoss, avgEpochTrainAcc, avgEpochCVLoss, avgEpochCVAcc = 0.0, 0.0, 0.0, 0.0
epochNum.append(epoch + 1)
for i in range(bottleneckFiles['train']):
step += 1
# load batch of train bottleneck features for training MLP
xTrainLoad = np.load(os.path.join(SAVEDIR_TRAIN, 'bottleneck-{}.npy'.format(i + 1)))
xTrain = xTrainLoad.reshape(xTrainLoad.shape[0], xTrainLoad.shape[1] * xTrainLoad.shape[2] * xTrainLoad.shape[3])
yTrain = np.load(os.path.join(SAVEDIR_TRAIN_LABELS, 'bottleneck-labels-{}.npy'.format(i + 1)))
# load batch of cv bottleneck features for cross-validation
xCVLoad = np.load(os.path.join(SAVEDIR_CV, 'bottleneck-{}.npy'.format((i % bottleneckFiles['cv']) + 1)))
xCV = xCVLoad.reshape(xCVLoad.shape[0], xCVLoad.shape[1] * xCVLoad.shape[2] * xCVLoad.shape[3])
yCV = np.load(os.path.join(SAVEDIR_CV_LABELS, 'bottleneck-labels-{}.npy'.format((i % bottleneckFiles['cv']) + 1)))
trainLoss_, trainAcc_ = model.train_on_batch(xTrain, yTrain) # train the model on batch
cvLoss_, cvAcc_ = model.test_on_batch(xCV, yCV) # cross validate the model on CV Human batch
print('Epoch: {}, Step: {}, Tr_Loss: {}, Tr_Acc: {}, CV_Loss: {}, CV_Acc: {}'.format(epoch + 1, step, np.round(float(trainLoss_), 2), np.round(float(trainAcc_), 2), np.round(float(cvLoss_), 2), np.round(float(cvAcc_), 2)))
avgEpochTrainLoss += trainLoss_ / bottleneckFiles['train']
avgEpochTrainAcc += trainAcc_ / bottleneckFiles['train']
avgEpochCVLoss += cvLoss_ / bottleneckFiles['train']
avgEpochCVAcc += cvAcc_ / bottleneckFiles['train']
print('Avg_Train_Loss: {}, Avg_Train_Acc: {}, Avg_CV_Loss: {}, Avg_CV_Acc: {}'.format(np.round(float(avgEpochTrainLoss), 2), np.round(float(avgEpochTrainAcc), 2), np.round(float(avgEpochCVLoss), 2), np.round(float(avgEpochCVAcc), 2)))
cvAcc.append(avgEpochCVAcc), 'model.h5')) # save model on each epoch
model.save_weights(os.path.join(SAVEDIR_MODEL, 'model_weights.h5')) # save weights on each epoch
print('Model and weights saved at epoch {}'.format(epoch + 1))
# save stats to log
log_frame = pd.DataFrame(columns = ['Epoch', 'Train_Loss', 'Train_Accuracy', 'CV_Loss', 'CV_Accuracy'])
log_frame['Epoch'] = epochNum
log_frame['Train_Loss'] = trainLoss
log_frame['Train_Accuracy'] = trainAcc
log_frame['CV_Loss'] = cvLoss
log_frame['CV_Accuracy'] = cvAcc
log_frame.to_csv('./data/log.csv', index=False)
