Skip to content

Instantly share code, notes, and snippets.

@Pked01
Created July 4, 2018 08:15
Show Gist options
  • Save Pked01/a5ec0abe4807aeaa9d463b5772dad9dd to your computer and use it in GitHub Desktop.
Save Pked01/a5ec0abe4807aeaa9d463b5772dad9dd to your computer and use it in GitHub Desktop.
personal/Face recognition/facial_recognition_API_cleaned.ipynb
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"metadata": {
"trusted": true,
"ExecuteTime": {
"start_time": "2018-07-02T08:52:28.439532Z",
"end_time": "2018-07-02T08:52:28.518903Z"
}
},
"cell_type": "code",
"source": "class label_encoder():\n \"\"\"\n label_dict: prepared dictionary can be used\n data_path:saving loading path of file\n \"\"\"\n def __init__(self,labels_dict=None,data_path='models/labels.pickle'):\n if data_path is not None:\n self.data_path=data_path\n if labels_dict is None:\n if os.path.exists(self.data_path) : \n self.labels=pickle.load(open(self.data_path,'rb'))\n else:\n try:\n os.mkdir(os.path.dirname(self.data_path))\n except:\n pass\n self.labels={}\n else:\n self.labels=labels_dict\n else : \n raise ValueError('Give a valid dumping path for labels.pickle')\n\n def fit(self,x):\n \"\"\"\n x:list or a single element which has to be encoded\n \"\"\"\n if ((isinstance(x, Iterable)) & (type(x)!=str)):\n iter1=list(set(x)-set(self.labels.keys()))\n for i in iter1:\n self.labels[i]=len(self.labels.keys())+1\n else:\n if x not in self.labels.keys():\n self.labels[x]=len(self.labels.keys())+1 \n def transform(self,key):\n \"\"\"\n key: set(list/tuple) of elements for which values has to be retrieved \n \"\"\"\n l=[]\n if ((isinstance(key, Iterable))&(type(key)!=str)):\n print(\"its an iterable\")\n for i in key:\n try:\n l.append(self.labels[i])\n except Exception as e:\n print(\"iterable error\",e)\n return l\n else:\n try:\n return self.labels[key]\n except Exception as e:\n print(\"error\",e)\n def save(self):\n try:\n pickle.dump(self.labels,open(self.data_path,'wb'),protocol=2)\n except:\n #os.mkdir(self.data_path)\n pickle.dump(self.labels,open(self.data_path,'wb'),protocol=2)\n #--------------------------------------------------------------------------------------------------------------\n\n\n\n\n",
"execution_count": 1,
"outputs": []
},
{
"metadata": {
"trusted": true,
"ExecuteTime": {
"start_time": "2018-07-02T09:01:48.419416Z",
"end_time": "2018-07-02T09:01:56.490164Z"
}
},
"cell_type": "code",
"source": "import os,sys,re,time,dlib,subprocess,pickle,cv2\nimport tmdbsimple as tmdb\nfrom pytvdbapi import api\nfrom sklearn.linear_model import SGDClassifier\nimport pandas as pd\nimport numpy as np\nimport pandas_ml as pdml\nfrom sklearn.preprocessing import LabelEncoder\nfrom sklearn.metrics import accuracy_score\nfrom collections import Iterable\nfrom sklearn.model_selection import StratifiedShuffleSplit\n\nimport IPython.display as Disp\n\nclass FacialRecognition(object):\n\n \n def __init__(self,data_path,face_detector_type='cnn'):\n \"\"\"\n face_detector_type= hog(cpu)/cnn(gpu)\n \"\"\"\n self.data_path = data_path\n self.face_detector_type = face_detector_type\n self.cwd = os.getcwd()\n self.sys_path = os.path.expanduser('~')\n self.model_files_path = os.path.join(self.sys_path,'dlib_model_files')\n self.dump_file_path = os.path.join(self.cwd,self.data_path,'models')\n os.makedirs(self.dump_file_path, exist_ok=True)\n try :\n self.labels = pickle.load(open(os.path.join(self.dump_file_path,'labels.pickle'),'rb'))\n except:\n self.labels = None\n try :\n self.base_model = pickle.load(open(os.path.join(self.dump_file_path,'sgd_model_resampled.pickle'),'rb'))\n except:\n self.base_model = None\n \n if ((os.path.exists(self.model_files_path)) and (len(os.listdir(self.model_files_path))>3)) :\n # load all models \n self.__load_models()\n \n\n else:\n os.makedirs(self.model_files_path,exist_ok=True)\n #cnn\n \n mmod_face_link = \" http://dlib.net/files/mmod_human_face_detector.dat.bz2\"\n print(\"downloading-->\"+mmod_face_link)\n subprocess.call((\"wget -P \"+self.model_files_path+mmod_face_link).split(\" \"))\n #wget.download(mmod_face_link,self.model_files_path,bar=wget.bar_thermometer)\n self.__uncompress_file(os.path.join(self.model_files_path,\"mmod_human_face_detector.dat.bz2\"))\n \n #68 face landmarks\n face_landmark_link = \" http://dlib.net/files/shape_predictor_68_face_landmarks.dat.bz2\"\n print(\"downloading-->\"+face_landmark_link)\n #wget.download(face_landmark_link,self.model_files_path,bar=wget.bar_thermometer)\n subprocess.call((\"wget -P \"+self.model_files_path+face_landmark_link).split(\" \"))\n self.__uncompress_file(os.path.join(self.model_files_path,\"shape_predictor_68_face_landmarks.dat.bz2\"))\n \n #resnet model\n resnet_link = \" http://dlib.net/files/dlib_face_recognition_resnet_model_v1.dat.bz2\"\n print(\"downloading-->\"+resnet_link)\n #wget.download(resnet_link,self.model_files_path,bar=wget.bar_thermometer)\n subprocess.call((\"wget -P \"+self.model_files_path+resnet_link).split(\" \"))\n self.__uncompress_file(os.path.join(self.model_files_path,\"dlib_face_recognition_resnet_model_v1.dat.bz2\"))\n # path for saving all intermediate models \n self.__load_models()\n \n\n\n# self.face_detector = dlib.get_frontal_face_detector()\n# self.shape_pred = dlib.shape_predictor('/dlib_model_files/shape_predictor_68_face_landmarks.dat')\n# self.facerec = dlib.face_recognition_model_v1('/dlib_model_files/dlib_face_recognition_resnet_model_v1.dat')\n def __load_models(self):\n print(\"loading models\")\n if self.face_detector_type=='cnn':\n self.face_detector = dlib.cnn_face_detection_model_v1(os.path.join(self.model_files_path,'mmod_human_face_detector.dat'))\n else:\n self.face_detector = dlib.get_frontal_face_detector()\n self.shape_pred = dlib.shape_predictor(os.path.join(self.model_files_path,'shape_predictor_68_face_landmarks.dat'))\n self.facerec = dlib.face_recognition_model_v1(os.path.join(self.model_files_path,'dlib_face_recognition_resnet_model_v1.dat'))\n\n def __uncompress_file(self,readpath,writepath=None):\n if writepath is None:\n writepath = os.path.join(os.path.dirname(readpath) , os.path.basename(readpath).replace('.bz2',''))\n zipfile = bz2.BZ2File(readpath) # open the file\n data = zipfile.read() # get the decompressed data\n newfilepath = readpath[:-4] # assuming the filepath ends with .bz2\n open(writepath, 'wb').write(data) # write a uncompressed file\n\n def detect_face(self,image,return_image=False,upsample_num_times=0):\n rects,scores,weights= self.face_detector.run(image,upsample_num_times=upsample_num_times)\n op = []\n for idx,r in enumerate(rects):\n im_c = image.copy()\n op.append([(r.left(),r.top()),(r.right(),r.bottom())])\n im_c = cv2.rectangle(im_c,op[idx][0],op[idx][1],(0,255,0))\n if return_image:\n return op,scores,weights,im_c\n return op,scores,weights\n\n\n\n\n def get_face_landmark(self,image,return_image = False):\n dets = self.shape_pred(image,self.face_detector(image)[0])\n tot_landmark = dets.parts()\n if return_image:\n im_c = image.copy()\n for i in tot_landmark:\n cv2.circle(im_c,(i.x,i.y),1,(0,255,0),1)\n return tot_landmark,im_c\n return tot_landmark\n\n\n #face_descriptor = facerec.compute_face_descriptor(im, shape_pred(im,f_detector(im)[0]))\n\n # It should also be noted that you can also call this function like this:\n # face_descriptor = facerec.compute_face_descriptor(img, shape, 100)\n # The version of the call without the 100 gets 99.13% accuracy on LFW\n # while the version with 100 gets 99.38%. However, the 100 makes the\n # call 100x slower to execute, so choose whatever version you like. To\n # explain a little, the 3rd argument tells the code how many times to\n # jitter/resample the image. When you set it to 100 it executes the\n # face descriptor extraction 100 times on slightly modified versions of\n # the face and returns the average result. You could also pick a more\n # middle value, such as 10, which is only 10x slower but still gets an\n # LFW accuracy of 99.3%.\n\n def get_face_embedding(self,image):\n \"\"\"\n return embedding of an face image \n \"\"\"\n if self.face_detector_type=='cnn':\n face_descriptor = [self.facerec.compute_face_descriptor(image, self.shape_pred(image,i.rect)) for i in self.face_detector(image,1)]\n else:\n face_descriptor = [self.facerec.compute_face_descriptor(image, self.shape_pred(image,i)) for i in self.face_detector(image)]\n \n return face_descriptor\n \n def get_cast_name_tmdb(self,series_name='two and half man'):\n \"\"\"\n tv series or title of movie\n returns list of cast\n \"\"\"\n tmdb.API_KEY = 'd8eb79cd5498fd8d375ac1589bfc78ee'\n search = tmdb.Search()\n response = search.tv(query=series_name)\n tv1=tmdb.TV(id=response['results'][0]['id'])\n return [i['name'] for i in tv1.credits()['cast']]\n\n\n def get_cast_name_tvdb(self,series_name='two and half man'):\n \"\"\"\n tv series name \n return cast list\n \"\"\"\n db = api.TVDB(\"05669A6CC3005169\", actors=True, banners=True)\n result = db.search(series_name, \"en\")\n show = result[0]\n show.update()\n return show.Actors\n ###---------------------- download utility-------------------------------------------------------------------------\n #Downloading entire Web Document (Raw Page Content)\n #\n def download_page(self,url):\n version = (3,0)\n cur_version = sys.version_info\n if cur_version >= version: #If the Current Version of Python is 3.0 or above\n import urllib.request #urllib library for Extracting web pages\n try:\n headers = {}\n headers['User-Agent'] = \"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36\"\n req = urllib.request.Request(url, headers = headers)\n resp = urllib.request.urlopen(req)\n respData = str(resp.read())\n return respData\n except Exception as e:\n print(str(e))\n else: #If the Current Version of Python is 2.x\n #import urllib2\n try:\n headers = {}\n headers['User-Agent'] = \"Mozilla/5.0 (X11; Linux i686) AppleWebKit/537.17 (KHTML, like Gecko) Chrome/24.0.1312.27 Safari/537.17\"\n req = urllib.Request(url, headers = headers)\n response = urllib.request.urlopen(req)\n page = response.read()\n return page\n except:\n return\"Page Not found\"\n\n\n #Finding 'Next Image' from the given raw page\n def _images_get_next_item(self,s):\n start_line = s.find('rg_di')\n if start_line == -1: #If no links are found then give an error!\n end_quote = 0\n link = \"no_links\"\n return link, end_quote\n else:\n start_line = s.find('\"class=\"rg_meta\"')\n start_content = s.find('\"ou\"',start_line+1)\n end_content = s.find(',\"ow\"',start_content+1)\n content_raw = str(s[start_content+6:end_content-1])\n return content_raw, end_content\n\n\n #Getting all links with the help of '_images_get_next_image'\n\n def _images_get_all_items(self,page):\n items = []\n while True:\n item, end_content = self._images_get_next_item(page)\n if item == \"no_links\":\n break\n else:\n items.append(item) #Append all the links in the list named 'Links'\n time.sleep(0.1) #Timer could be used to slow down the request for image downloads\n page = page[end_content:]\n return items\n\n\n def downloaded_images(self,cast_list,data_path=None,series_name=\"\"):\n \"\"\"\n data_path: save path for data\n \"\"\"\n if data_path is None:\n data_path = self.data_path\n if not os.path.exists(data_path):\n os.makedirs(data_path,exist_ok=True)\n\n for cast in cast_list:\n #print(cast)\n os.makedirs(os.path.join(data_path,cast),exist_ok=True)\n files_folder = os.listdir(os.path.join(data_path,cast))\n if len(files_folder)>0:\n op = input(\"Folder contains files --\"+cast+\"-- you want to continue(y/n)\")\n if op=='y':\n pass\n if op=='n':\n continue\n \n \n ########### Edit From Here ###########\n\n #This list is used to search keywords. You can edit this list to search for google images of your choice. You can simply add and remove elements of the list.\n cast=re.sub(r'[^\\x00-\\x7F]+',' ', cast)\n search_keyword = [cast+\" \"+series_name]\n\n #This list is used to further add suffix to your search term. Each element of the list will help you download 100 images. First element is blank which denotes that no suffix is added to the search keyword of the above list. You can edit the list by adding/deleting elements from it.So if the first element of the search_keyword is 'Australia' and the second element of keywords is 'high resolution', then it will search for 'Australia High Resolution'\n keywords = ['']\n\n ########### End of Editing ###########\n\n ############## Main Program ############\n t0 = time.time() #start the timer\n\n #Download Image Links\n i= 0\n while i<len(search_keyword):\n items = []\n iteration = \"Item no.: \" + str(i+1) + \" -->\" + \" Item name = \" + str(search_keyword[i])\n print (iteration)\n print (\"Evaluating...for\", cast)\n search_keywords = search_keyword[i]\n search = search_keywords.replace(' ','%20')\n\n# #make a search keyword directory\n# try:\n# os.makedirs(os.path.join(data_path,cast))\n# except Exception as e:\n# if e.errno != 17:\n# raise \n# # time.sleep might help here\n# pass\n\n j = 0\n while j<len(keywords):\n pure_keyword = keywords[j].replace(' ','%20')\n url = 'https://www.google.com/search?q=' + search + pure_keyword + '&espv=2&biw=1366&bih=667&site=webhp&source=lnms&tbm=isch&sa=X&ei=XosDVaCXD8TasATItgE&ved=0CAcQ_AUoAg'\n raw_html = (self.download_page(url))\n time.sleep(0.1)\n items = items + (self._images_get_all_items(raw_html))\n j = j + 1\n #print (\"Image Links = \"+str(items))\n print (\"Total Image Links = \"+str(len(items)))\n print (\"\\n\")\n\n\n #This allows you to write all the links into a test file. This text file will be created in the same directory as your code. You can comment out the below 3 lines to stop writing the output to the text file.\n info = open(data_path+'output.txt', 'a') #Open the text file called database.txt\n info.write(str(i) + ': ' + str(search_keyword[i-1]) + \": \" + str(items) + \"\\n\\n\\n\") #Write the title of the page\n info.close() #Close the file\n\n t1 = time.time() #stop the timer\n total_time = t1-t0 #Calculating the total time required to crawl, find and download all the links of 60,000 images\n print(\"Total time taken: \"+str(total_time)+\" Seconds\")\n print (\"Starting Download...\")\n\n ## To save imges to the same directory\n # IN this saving process we are just skipping the URL if there is any error\n\n k=0\n errorCount=0\n while(k<len(items)):\n from urllib import request\n #from urllib import URLError, HTTPError\n\n try:\n req = request.Request(items[k], headers={\"User-Agent\": \"Mozilla/5.0 (X11; Linux i686) AppleWebKit/537.17 (KHTML, like Gecko) Chrome/24.0.1312.27 Safari/537.17\"})\n response = request.urlopen(req,None,15)\n if os.path.exists(data_path):\n output_file = open(os.path.join(data_path,cast,str(cast)+'_'+str(k+1)+\".jpg\"),'wb')\n\n data = response.read()\n output_file.write(data)\n response.close();\n\n print(\"completed ====> \"+str(k+1))\n\n k=k+1;\n\n except IOError: #If there is any IOError\n\n errorCount+=1\n print(\"IOError on image \"+str(k+1))\n k=k+1;\n\n except request.HTTPError as e: #If there is any HTTPError\n\n errorCount+=1\n print(\"HTTPError\"+str(k))\n k=k+1;\n except request.URLError as e:\n\n errorCount+=1\n print(\"URLError \"+str(k))\n k=k+1;\n except Exception as e:\n print(e)\n\n\n\n i = i+1\n\n print(\"\\n\")\n print(\"Everything downloaded!\")\n print(\"\\n\"+str(errorCount)+\" ----> total Errors\")\n Disp.clear_output()\n\n #----End of the main program ----#\n\n #--------------------------------------------------------------------------------------------------------\n # In[ ]:\n def get_stratified_sample(self,X,y,verbose=True,test_size=.2):\n sss = StratifiedShuffleSplit(n_splits=10, test_size=test_size, random_state=0)\n sss.get_n_splits(X, y)\n print(sss) \n for train_index, test_index in sss.split(X, y):\n if verbose:\n print(\"TRAIN:\", train_index, \"TEST:\", test_index)\n X_train, X_test = X[train_index], X[test_index]\n y_train, y_test = y[train_index], y[test_index]\n return [X_train,X_test,y_train,y_test]\n\n #get_name_only=lambda name:''.join([i for i in name if not i.isdigit()]).replace(\"_\",\"\").lower()\n get_name_only=lambda self, name:re.sub('[^A-Za-z]+', '', name).lower()\n get_name_only.__name__='get_name_only'\n \n inv_map=lambda self,my_map: {v: k for k, v in my_map.items()}\n inv_map.__name__='inverse_mapping'\n def get_image_files(self,directory_path=None,return_only_paths=True):\n \"\"\"\n directory_path=path of parent directory\n \"\"\"\n print(\"inside get_image_files function\")\n if directory_path is None:\n directory_path = self.data_path\n paths={}\n image_files={}\n for root, dirs, files in os.walk(directory_path, topdown=False):\n for name in files:\n path=os.path.join(root, name)\n if name.endswith('.jpg'):\n #removing name,number_from_data\n file_name=name.replace(\".jpg\",\"\")#root.split('/').pop()+name.replace(\".jpg\",\"\") \n #if file_name in file_name_filt:\n print(path,file_name)\n try:\n if return_only_paths:\n paths[file_name]=path\n else:\n image_files=cv2.imread(path)\n except:\n print(\"encoding error\")\n\n Disp.clear_output()\n print(\"returning from get_image_files function\") \n if return_only_paths:\n return paths\n else:\n return image_files \n def get_label_charac_dict(self,directory_path=None):\n \"\"\"\n It loads root directory and get their characters name and assign labels to them\n\n \"\"\"\n print(\"inside get_label_charac_dict function\")\n if directory_path is None:\n directory_path = self.data_path\n try:\n charac_names=pickle.load(open(os.path.join(directory_path,'charac_names.pickle'),'rb'))\n except:\n charac_names={}\n\n im_files = self.get_image_files(directory_path)\n for file_name in im_files.keys():\n charac_names[file_name]=self.get_name_only(file_name)\n lbl_enc = label_encoder(data_path=os.path.join(directory_path,'models','labels.pickle'))\n lbl_enc.fit(charac_names.values())\n lbl_enc.save()\n labels=lbl_enc.labels\n pickle.dump(charac_names,open(os.path.join(directory_path,'models','charac_names.pickle'),'wb'),protocol=2)\n print(\"returning from get_label_charac_dict function\")\n\n return {'charac_names':charac_names,'labels':labels}\n def prepare_data(self,directory_path=None,l_threshold=20,r_threshold=None,load_data=False):\n \"\"\"\n returns a vector X(128 sized) and encoded label y\n directory_path=source path of images folder, with each image have parent folder as label of it\n l_threshold: minimum images for a profile in images list\n r_threshold: maximum images for a profile in images list\n Autoresizing is done for any image more than 300 in max dimension\n minm_num: minimum number of images in a class\n output:get_all_files from folders and get encoding out of it\n dump_file_path: dumps a tuple of X,y\n \"\"\"\n if load_data:\n try: \n [X,y] = pickle.load(open(os.path.join(self.dump_file_path,'[X,y]_encoded_file.pickle'),'rb'))\n except Exception as e:\n print(e)\n else:\n print(\"entering into prepare data\")\n if directory_path is None:\n directory_path = self.data_path\n jsn=self.get_label_charac_dict(directory_path)\n charac_names = jsn['charac_names']\n self.labels = jsn['labels']\n encoding_files = {}\n t=pd.DataFrame(list(charac_names.values()))[0].value_counts()\n t=t[t>=l_threshold]\n if r_threshold is not None:\n t=t[t<=r_threshold]\n t1=t.index\n file_name_filt=[]\n print('total unique matches with criteria',t1.shape)\n for k,v in charac_names.items():\n if v in t1:\n file_name_filt.append(k)\n del t1\n\n # for root, dirs, files in os.walk(directory_path, topdown=False):\n # for name in files:\n # path=os.path.join(root, name)\n # if name.endswith('.jpg'):\n # #removing name,number_from_data\n # file_name=root.split('/').pop()+name.replace(\".jpg\",\"\") \n # if file_name in file_name_filt:\n # print(path,file_name)\n im_files=self.get_image_files(directory_path)\n for file_name,path in im_files.items():\n try:\n image = cv2.imread(path)\n image_res = image.copy()\n max_dim = max(image.shape[0:2])\n if max_dim > 300.0:\n #image_res=scipy.misc.imresize(image,300.0/max(image.shape[0:2]))\n image_res = cv2.resize(image_res,(0,0), fx=300.0/max_dim, fy=300.0/max_dim)\n encoding_1 = self.get_face_embedding(image_res)\n if len(encoding_1)==1:\n encoding_files[file_name] = list(encoding_1[0])\n\n except Exception as e:\n print(\"encoding error\",e)\n Disp.clear_output()\n #charac_names[file_name]=charac_name\n # if dump_file_path is not None:\n # print('dumping output')\n # pickle.dump(encoding_files,\\\n # open(dump_file_path+'_encoded_file.pickle','wb'),protocol=2)\n # l=list(encoding_files.keys())\n # for k in l:\n # if len(encoding_files[k])!=1:\n # del encoding_files[k]\n # else:\n # encoding_files[k]=encoding_files[k][0]\n encoding_df=pd.DataFrame(encoding_files).T\n encoding_df['label_enc']=[self.labels[self.get_name_only(i)] for i in encoding_df.index]\n X=encoding_df.iloc[:,:128].values\n y=encoding_df['label_enc'].values\n if self.dump_file_path is None:\n self.dump_file_path = os.path.join(directory_path,\"models\")\n else:\n os.makedirs(self.dump_file_path,exist_ok=True)\n print('dumping output')\n pickle.dump([X,y],open(os.path.join(self.dump_file_path,'[X,y]_encoded_file.pickle'),'wb'),protocol=2)\n print(\"returning prepare data\")\n return [X,y]\n \n def process_data(self,X,y,minm_num=30):\n \"\"\"\n SMOTE and resampling\n \"\"\"\n print(\"inside preprocessing function\")\n df = pdml.ModelFrame(X,y)\n sampler=df.imbalance.over_sampling.SMOTE()\n sampled=df.fit_sample(sampler)\n total_classes=len(np.unique(y))\n if sampled.shape[0]/total_classes<minm_num:\n resampled_class=resample(sampled.iloc[:,1:].values,sampled.target.values,n_samples=2*minm_num*total_classes)\n sampled=pd.DataFrame(resampled_class[0])\n sampled['.target']=resampled_class[1]\n\n desampled=sampled.groupby('.target').apply(lambda x: pd.DataFrame(x).sample(n=minm_num))\n desampled.reset_index(drop=True,inplace=True)\n print(\"returning from preprocess data\")\n return [desampled[list(range(128))],desampled['.target']]\n\n def partial_train_model(self,X,y,minm_image_process=None,threshold_accuracy=.9,classes=range(20)):\n \"\"\"\n incremental training module(SGD)\n returns a new model after partial fit on give data\n X=128 sized vector \n y=labels of vectors\n minm_image_process='how many images of a specific label have to be trained, oversampling undersampling is done, \n classes:number of that is going to be used in this model have to defined in advance\n \"\"\"\n print(\"entering training module\")\n self.base_model=SGDClassifier(loss='log',n_jobs=7,\\\n shuffle=True,class_weight=None,warm_start=False\\\n ,n_iter = np.ceil(10**6 / 600),average=True)\n \n [X_train,X_test,y_train,y_test]=self.get_stratified_sample(X,y,verbose=False)\n if minm_image_process is not None:\n [X_processed,y_processed]=self.process_data(X_train,y_train,minm_num=minm_image_process)\n else:\n [X_processed,y_processed]=[X_train,y_train]\n \n accuracy=0\n idx=0\n while accuracy<threshold_accuracy:\n try:\n self.base_model.partial_fit(X_processed,y_processed)\n except Exception as e:\n print(e)\n self.base_model.partial_fit(X_processed,y_processed,classes=classes)\n y_pred=self.base_model.predict(X_test)\n accuracy = accuracy_score(y_test,y_pred)\n print(\"accuracy in iteration \",idx+1,' is =',accuracy)\n idx+=1\n if idx>10:\n break\n if self.dump_file_path is None:\n self.dump_file_path = os.path.join(self.data_path,'models')\n else:\n os.makedirs(self.dump_file_path,exist_ok=True)\n pickle.dump(self.base_model,open(os.path.join(self.dump_file_path,'sgd_model_resampled.pickle'),'wb'))\n print(\"returning from train module\") \n \n def get_pred_on_frame(self,frame,verbose=False):\n \"\"\"\n provide prediction on a frame \n model: classifier model\n data_path: loading relevent file from the source \n \"\"\"\n inv_labels=self.inv_map(self.labels)\n #frame=frame.mean(axis=2)\n face_locations = self.face_detector(frame)\n if len(face_locations)>0:\n face_encodings = self.get_face_embedding(frame)\n if verbose:\n print(\"number of faces detected\",len(face_locations))\n face_names = []\n for face_encoding in face_encodings:\n # See if the face is a match for the known face(s)\n try:\n #print(face_encoding)\n match = self.base_model.predict(np.array(face_encoding).reshape([1,128]))[0]\n predict_probab=self.base_model.predict_proba(np.array(face_encoding).reshape([1,128]))[0].max()\n #bin_prob=math.exp(predict_probab[match])/sum([math.exp(i)for i in predict_probab])\n #bin_prob=(predict_probab[match]-np.mean(predict_probab))/np.std(predict_probab)\n if verbose:\n print(match,inv_labels[match],predict_probab)\n face_names.append(inv_labels[match]+ '(p='+str(np.round(predict_probab,3))+')')\n #face_names.append(inv_labels[match]+ ' prediction probability='+str(1/(1+math.exp(-bin_prob))))\n except Exception as e:\n print(e)\n face_locations_final =[]\n if self.face_detector_type=='cnn':\n for face_location in face_locations:\n face_locations_final.append([face_location.rect.top(),face_location.rect.right(),face_location.rect.bottom(),face_location.rect.left()])\n else:\n for face_location in face_locations:\n face_locations_final.append([face_location.top(),face_location.right(),face_location.bottom(),face_location.left()])\n face_locations = face_locations_final\n\n\n\n # Label the results\n for (top, right, bottom, left), name in zip(face_locations, face_names):\n if not name:\n continue\n\n # Draw a box around the face\n cv2.rectangle(frame, (left, top), (right, bottom), (0, 0, 255), 2)\n\n # Draw a label with a name below the face\n cv2.rectangle(frame, (left, bottom - 25), (right, bottom), (0, 0, 255), cv2.FILLED)\n font = cv2.FONT_HERSHEY_DUPLEX\n cv2.putText(frame, name, (left + 6, bottom - 6), font, 0.5, (255, 255, 255), 1)\n return frame\n\n",
"execution_count": 15,
"outputs": []
},
{
"metadata": {
"trusted": true,
"ExecuteTime": {
"start_time": "2018-07-02T09:01:56.491884Z",
"end_time": "2018-07-02T09:01:57.596553Z"
}
},
"cell_type": "code",
"source": "face_rec = FacialRecognition(data_path='Friends',face_detector_type='hog')",
"execution_count": 16,
"outputs": [
{
"output_type": "stream",
"text": "loading models\n",
"name": "stdout"
}
]
},
{
"metadata": {
"trusted": true,
"ExecuteTime": {
"start_time": "2018-07-02T07:08:50.150176Z",
"end_time": "2018-07-02T07:08:50.257381Z"
}
},
"cell_type": "code",
"source": "cast_list = face_rec.get_cast_name_tvdb('Friends')",
"execution_count": 155,
"outputs": []
},
{
"metadata": {
"trusted": true,
"ExecuteTime": {
"start_time": "2018-07-02T04:50:42.456254Z",
"end_time": "2018-07-02T04:50:49.287424Z"
}
},
"cell_type": "code",
"source": "face_rec.downloaded_images(cast_list)",
"execution_count": 49,
"outputs": [
{
"output_type": "stream",
"text": "Folder contains files --Lisa Kudrow-- you want to continue(y/n)n\nFolder contains files --Matt LeBlanc-- you want to continue(y/n)n\nFolder contains files --Matthew Perry-- you want to continue(y/n)n\nFolder contains files --Courteney Cox-- you want to continue(y/n)n\nFolder contains files --David Schwimmer-- you want to continue(y/n)n\nFolder contains files --Jennifer Aniston-- you want to continue(y/n)n\n",
"name": "stdout"
}
]
},
{
"metadata": {
"trusted": true,
"ExecuteTime": {
"start_time": "2018-07-02T09:02:01.660452Z",
"end_time": "2018-07-02T09:02:40.184759Z"
}
},
"cell_type": "code",
"source": "[X,y]=face_rec.prepare_data()",
"execution_count": 17,
"outputs": [
{
"output_type": "stream",
"text": "dumping output\nreturning prepare data\n",
"name": "stdout"
}
]
},
{
"metadata": {
"trusted": true,
"ExecuteTime": {
"start_time": "2018-07-02T09:02:44.362943Z",
"end_time": "2018-07-02T09:02:44.504741Z"
},
"scrolled": true
},
"cell_type": "code",
"source": "face_rec.partial_train_model(X,y,minm_image_process=30,classes=range(1,7))",
"execution_count": 18,
"outputs": [
{
"output_type": "stream",
"text": "entering training module\nStratifiedShuffleSplit(n_splits=10, random_state=0, test_size=0.2,\n train_size=None)\ninside preprocessing function\nreturning from preprocess data\nclasses must be passed on the first call to partial_fit.\naccuracy in iteration 1 is = 0.9714285714285714\nreturning from train module\n",
"name": "stdout"
},
{
"output_type": "stream",
"text": "/home/prateek/.virtualenvs/cv3/lib/python3.5/site-packages/sklearn/linear_model/stochastic_gradient.py:117: DeprecationWarning: n_iter parameter is deprecated in 0.19 and will be removed in 0.21. Use max_iter and tol instead.\n DeprecationWarning)\n/home/prateek/.virtualenvs/cv3/lib/python3.5/site-packages/sklearn/linear_model/stochastic_gradient.py:117: DeprecationWarning: n_iter parameter is deprecated in 0.19 and will be removed in 0.21. Use max_iter and tol instead.\n DeprecationWarning)\n",
"name": "stderr"
}
]
},
{
"metadata": {
"trusted": true,
"ExecuteTime": {
"start_time": "2018-07-02T09:04:55.394258Z",
"end_time": "2018-07-02T09:04:58.737873Z"
}
},
"cell_type": "code",
"source": "cap = cv2.VideoCapture('test_vid.mp4')\nwhile cap.isOpened():\n Disp.clear_output(wait=True)\n ret,frame = cap.read()\n op = face_rec.get_pred_on_frame(frame.copy())\n cv2.imshow(\"preview\",op)\n k = cv2.waitKey(1)\n if k==27:\n break\ncv2.destroyAllWindows()\ncap.release()\n ",
"execution_count": 20,
"outputs": []
},
{
"metadata": {
"trusted": true
},
"cell_type": "code",
"source": "",
"execution_count": null,
"outputs": []
}
],
"metadata": {
"kernelspec": {
"name": "cv3",
"display_name": "cv3 (python3)",
"language": "python"
},
"language_info": {
"mimetype": "text/x-python",
"codemirror_mode": {
"version": 3,
"name": "ipython"
},
"pygments_lexer": "ipython3",
"nbconvert_exporter": "python",
"version": "3.5.2",
"file_extension": ".py",
"name": "python"
},
"gist": {
"id": "90ddd1a6c13755492478fdbfd0278f1b",
"data": {
"description": "personal/Face recognition/facial_recognition_API_cleaned.ipynb",
"public": true
}
},
"_draft": {
"nbviewer_url": "https://gist.github.com/90ddd1a6c13755492478fdbfd0278f1b"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment