Skip to content

Instantly share code, notes, and snippets.

@dynamicguy
Created December 13, 2020 11:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dynamicguy/2248c08b81f1477c46af8a47a87ed866 to your computer and use it in GitHub Desktop.
Save dynamicguy/2248c08b81f1477c46af8a47a87ed866 to your computer and use it in GitHub Desktop.
# encoding: utf-8
# !/usr/bin/env python
from __future__ import absolute_import, unicode_literals
# from tesserocr import PyTessBaseAPI
from PIL import Image
import hashlib
import os
import datetime as dt
from django.conf import settings
import cv2 as cv
import numpy as np
from celery import shared_task
from model_utils.models import now
import imutils
from nid.users.models import User
from .common import auto_crop, get_grayscale, remove_noise, deskew_img
from .models import Output, Document, Credit
import pytesseract
# Tesseract command-line options handed to pytesseract.image_to_string by both
# OCR tasks in this module: OCR engine mode 1, page segmentation mode 6, and
# the "|" character blacklisted from the recognised character set.
config = "--oem 1 --psm 6 -c tessedit_char_blacklist=|"
# Result keys in the order the fields are read from the OCR text: the first
# key is taken from the LAST usable line, the second from the line above it,
# and so on (this mirrors the original output[-1] .. output[-6] indexing).
_NID_FIELD_KEYS = (
    "id_no",
    "dob",
    "bn_mother",
    "bn_father",
    "en_name",
    "bn_name",
)


def _parse_nid_fields(text):
    """Map the trailing lines of OCR *text* onto the NID field names.

    The text is split into lines; lines of two characters or fewer (after
    stripping whitespace) are ignored, which is the same noise filter the
    earlier tesserocr-based version of this code applied. For each remaining
    line, counted from the bottom, the part after the last ":" (or the whole
    line when it has no ":") is stored under the matching key.

    Fewer than six usable lines simply yields fewer keys instead of raising.
    """
    lines = [line.strip() for line in text.splitlines()]
    lines = [line for line in lines if len(line) > 2]
    return {
        key: line.split(":")[-1].strip()
        for key, line in zip(_NID_FIELD_KEYS, reversed(lines))
    }


def _reject_for_insufficient_funds(document, user):
    """Record an "insufficient funds" Output row and return its message."""
    output = Output.objects.create(
        document=document,
        pub_date=now(),
        user=user,
        result="Insufficient funds. Please recharge your account.",
    )
    return output.result


def _ocr_and_record(document, user, image_path, lang):
    """OCR the image at *image_path*, store the text and charge *user*.

    Returns the dict produced by ``_parse_nid_fields``. Any failure (unreadable
    image, Tesseract error, database error) is reported on stdout and the
    fields gathered so far -- possibly none -- are returned; the user is only
    charged after the Output row has been written successfully.
    """
    result = {}
    try:
        # Close the source file as soon as the converted copy exists.
        with Image.open(image_path) as source:
            img = source.convert("LA")
        text = pytesseract.image_to_string(img, lang=lang, config=config)
        result.update(_parse_nid_fields(text))
        Output.objects.create(
            document=document, pub_date=now(), user=user, result=text
        )
        charge(user)
    except Exception as err:
        # The tasks never propagated OCR failures to the caller (the old
        # ``finally: return`` discarded them); keep that contract but make
        # the failure visible instead of dropping it silently.
        print("OCR error: {0}".format(err))
    return result


@shared_task
def extract_text(document_id, username, lang="ben"):
    """Celery task: OCR the file attached to a Document.

    Args:
        document_id: primary key of the Document to process.
        username: username of the User who is charged for the extraction.
        lang: Tesseract language code passed to pytesseract.

    Returns:
        The "insufficient funds" message (a string) when ``can_extract``
        rejects the user, otherwise a dict with any of the keys ``id_no``,
        ``dob``, ``bn_mother``, ``bn_father``, ``en_name``, ``bn_name``.
    """
    print("received task for: %s" % document_id)
    document = Document.objects.get(id=document_id)
    user = User.objects.get(username=username)
    if not can_extract(user):
        return _reject_for_insufficient_funds(document, user)

    print("processing task for: %s" % document.doc.path)
    return _ocr_and_record(document, user, document.doc.path, lang)


@shared_task
def extract_pre_processed_text(
    document_id, pre_processed_file_path, username, lang="ben"
):
    """Celery task: OCR a pre-processed image stored under MEDIA_ROOT.

    Args:
        document_id: primary key of the Document the image belongs to.
        pre_processed_file_path: path of the image relative to
            ``settings.MEDIA_ROOT``.
        username: username of the User who is charged for the extraction.
        lang: Tesseract language code passed to pytesseract.

    Returns:
        Same as ``extract_text``: the "insufficient funds" message, or a dict
        of the parsed NID fields.
    """
    document = Document.objects.get(id=document_id)
    user = User.objects.get(username=username)
    if not can_extract(user):
        return _reject_for_insufficient_funds(document, user)

    full_path = os.path.join(settings.MEDIA_ROOT, pre_processed_file_path)
    print("processing task for: %s" % full_path)
    return _ocr_and_record(document, user, full_path, lang)
def _as_gray(img):
    """Return *img* as a single-channel image, converting from BGR if needed."""
    if img.ndim == 2:
        # Already single-channel; cv.cvtColor(..., COLOR_BGR2GRAY) would
        # reject it.
        return img
    return cv.cvtColor(img, cv.COLOR_BGR2GRAY)


def pre_process(doc, bw, threshold, blur, enhance, reset, autocrop, deskew):
    """Apply the requested clean-up steps to an image and save the result.

    Every option is a string flag; a step runs only when its flag equals
    ``"true"``. Steps run in this fixed order: deskew, autocrop, enhance,
    bw, threshold, blur. The image is first resized to a height of 500.

    Args:
        doc: file object exposing ``path`` (location on disk) and ``name``
            (storage-relative name).
        bw, threshold, blur, enhance, autocrop, deskew: step flags.
        reset: when ``"true"`` nothing is processed and ``doc.name`` is
            returned unchanged.

    Returns:
        The storage-relative name of the processed file
        (``<name>_processed_<iso timestamp><ext>``); the file itself is
        written next to ``doc.path`` using the same suffix.

    Raises:
        IOError: if the source image cannot be read or the processed image
            cannot be written. Previously a failed write was swallowed and a
            path to a non-existent file was returned.
    """
    if reset == "true":
        return doc.name

    image_path = doc.path
    img = cv.imread(image_path)
    if img is None:
        # cv.imread reports failure by returning None instead of raising.
        raise IOError("could not read image: %s" % image_path)
    img = imutils.resize(img, height=500)

    if deskew == "true":
        img = deskew_img(get_grayscale(img))

    if autocrop == "true":
        gray = get_grayscale(img)
        kernel = np.ones((5, 5), np.uint8)
        # Erode then dilate before edge detection.
        erosion = cv.erode(gray, kernel, iterations=1)
        dilation = cv.dilate(erosion, kernel, iterations=1)
        edges = cv.Canny(dilation, 100, 300, apertureSize=3)
        img = auto_crop(edges, img)

    if enhance == "true":
        # NOTE(review): with a 1x1 kernel dilate/erode look like no-ops --
        # confirm the intended kernel size.
        kernel = np.ones((1, 1), np.uint8)
        img = cv.dilate(img, kernel, iterations=1)
        img = cv.erode(img, kernel, iterations=1)

    if bw == "true":
        # Drop colour, then go back to three channels.
        img = cv.cvtColor(_as_gray(img), cv.COLOR_GRAY2BGR)

    if threshold == "true":
        # NOTE(review): img may already be single-channel here if deskew_img
        # returns a grayscale image -- hence _as_gray; confirm against
        # .common.deskew_img.
        img = apply_threshold(_as_gray(img), 3)

    if blur == "true":
        img = remove_noise(img)

    # One timestamp shared by the on-disk path and the returned name.
    stamp = dt.datetime.now().isoformat()
    local_root, local_ext = os.path.splitext(image_path)
    local_filename = local_root + "_processed_" + stamp + local_ext
    name_root, name_ext = os.path.splitext(doc.name)
    file_path = name_root + "_processed_" + stamp + name_ext

    # cv.imwrite returns False when the file could not be written.
    if not cv.imwrite(local_filename, img):
        raise IOError("could not write processed image: %s" % local_filename)
    return file_path
def apply_threshold(img, argument):
    """Blur *img* and binarise it with the method selected by *argument*.

    Methods:
        1, 2, 3: Gaussian blur with a 9x9 / 7x7 / 5x5 kernel, then Otsu.
        4, 5:    median blur with aperture 5 / 3, then Otsu.
        6:       Gaussian blur 5x5, then adaptive Gaussian threshold.
        7:       median blur 3, then adaptive Gaussian threshold.

    Only the selected pipeline is executed; the previous dict literal ran all
    seven on every call and threw six results away.

    Args:
        img: image to threshold, passed straight to the OpenCV blur call.
        argument: method number, 1-7.

    Returns:
        The thresholded image, or the string ``"Invalid method"`` when
        *argument* is not a known method number (kept for compatibility).
    """

    def otsu(blurred):
        return cv.threshold(
            blurred, 0, 255, cv.THRESH_BINARY + cv.THRESH_OTSU
        )[1]

    def adaptive(blurred):
        return cv.adaptiveThreshold(
            blurred,
            255,
            cv.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv.THRESH_BINARY,
            31,
            2,
        )

    # Values are zero-argument callables so nothing runs until selected.
    switcher = {
        1: lambda: otsu(cv.GaussianBlur(img, (9, 9), 0)),
        2: lambda: otsu(cv.GaussianBlur(img, (7, 7), 0)),
        3: lambda: otsu(cv.GaussianBlur(img, (5, 5), 0)),
        4: lambda: otsu(cv.medianBlur(img, 5)),
        5: lambda: otsu(cv.medianBlur(img, 3)),
        6: lambda: adaptive(cv.GaussianBlur(img, (5, 5), 0)),
        7: lambda: adaptive(cv.medianBlur(img, 3)),
    }
    method = switcher.get(argument)
    if method is None:
        return "Invalid method"
    return method()
def compute_md5_hash(my_string):
    """Return the hexadecimal MD5 digest of *my_string*.

    Text is hashed as its UTF-8 encoding. Bytes-like values (``bytes``,
    ``bytearray``, ``memoryview``) are hashed as-is, so callers that already
    hold raw data no longer need to decode it first.
    """
    if isinstance(my_string, (bytes, bytearray, memoryview)):
        data = my_string
    else:
        data = my_string.encode("utf-8")
    return hashlib.md5(data).hexdigest()
def charge(user, amount=10):
    """Deduct *amount* from the balance of *user*'s Credit row and save it.

    Args:
        user: the User whose Credit row is charged.
        amount: how much to deduct; defaults to 10, the previously
            hard-coded price of one extraction.

    Raises:
        Credit.DoesNotExist: if *user* has no Credit row (raised by ``get``).
    """
    credit = Credit.objects.get(user=user)
    # NOTE(review): read-modify-write; two tasks charging the same user at
    # the same time could lose an update -- confirm whether that can happen.
    credit.balance = credit.balance - amount
    credit.save()
def update_document(document):
    """Point the stored Document at the file referenced by *document*.

    Loads the database row with the same id as *document*, copies the file
    name of ``document.doc`` onto it and saves the row.

    Two defects of the previous version are fixed:
      * ``objects.get`` was given the id positionally; lookups must be passed
        as keyword arguments (``id=...``).
      * it assigned to ``doc.path``, which Django exposes as a read-only
        property derived from the file's name; the name is what is stored.
    """
    record = Document.objects.get(id=document.id)
    record.doc.name = document.doc.name
    record.save()
def can_extract(user, cost=10):
    """Return True when *user*'s Credit balance covers one extraction.

    Args:
        user: the User whose Credit row is checked.
        cost: price of one extraction; defaults to 10, matching the amount
            ``charge`` deducts by default.

    The check used to be ``balance > 9``. For whole-number balances
    ``balance >= 10`` is identical; for fractional balances it no longer lets
    a balance between 9 and 10 through (which would then go negative).

    Raises:
        Credit.DoesNotExist: if *user* has no Credit row (raised by ``get``).
    """
    credit = Credit.objects.get(user=user)
    return credit.balance >= cost
def crop_image(doc, x=0, y=0, w=0, h=0, cw=0, ch=0):
    """Crop the image behind *doc* and save the result as a new file.

    Args:
        doc: file object exposing ``path`` (location on disk) and ``name``
            (storage-relative name).
        x, y, w, h, cw, ch: crop rectangle and reference size, forwarded
            unchanged to ``imcrop``.

    Returns:
        The storage-relative name of the cropped file
        (``<name>_processed_<iso timestamp><ext>``); the file itself is
        written next to ``doc.path`` using the same suffix.

    Raises:
        IOError: if the source image cannot be read or the cropped image
            cannot be written.
    """
    image_path = doc.path
    img = cv.imread(image_path)
    if img is None:
        # cv.imread reports failure by returning None instead of raising.
        raise IOError("could not read image: %s" % image_path)
    cropped_img = imcrop(img, x, y, w, h, cw, ch)

    # One timestamp shared by the on-disk path and the returned name.
    stamp = dt.datetime.now().isoformat()
    local_root, local_ext = os.path.splitext(image_path)
    local_filename = local_root + "_processed_" + stamp + local_ext
    name_root, name_ext = os.path.splitext(doc.name)
    file_path = name_root + "_processed_" + stamp + name_ext

    # cv.imwrite returns False when the file could not be written.
    if not cv.imwrite(local_filename, cropped_img):
        raise IOError("could not write cropped image: %s" % local_filename)
    return file_path
def imcrop(img, x, y, w, h, cw, ch):
    """Crop a rectangle given in reference coordinates out of *img*.

    ``x, y, w, h`` describe the rectangle (top-left corner, width, height) in
    a coordinate system that is ``cw`` wide and ``ch`` high; they are scaled
    to the pixel size of *img* before cropping. All six values may be ints or
    numeric strings.

    If the scaled rectangle reaches outside the image, the image is first
    padded via ``pad_img_to_fit_bbox`` so the result always has the requested
    size.

    Args:
        img: image array indexed as ``img[row, column, ...]``.
        x, y: top-left corner of the rectangle.
        w, h: width and height of the rectangle.
        cw, ch: width and height of the reference coordinate system.

    Returns:
        The cropped region of *img* (or of its padded copy).

    Raises:
        ValueError: if ``cw`` or ``ch`` is not positive (previously a
            ZeroDivisionError for 0).
    """
    cw = int(cw)
    ch = int(ch)
    if cw <= 0 or ch <= 0:
        raise ValueError(
            "cw and ch must be positive, got cw=%r, ch=%r" % (cw, ch)
        )

    img_h, img_w = img.shape[0], img.shape[1]
    wp = img_w / float(cw)
    hp = img_h / float(ch)

    # Work with corner coordinates: that is what the bounds check and
    # pad_img_to_fit_bbox(img, x1, x2, y1, y2) both need.
    x1 = int(int(x) * wp)
    y1 = int(int(y) * hp)
    x2 = x1 + int(int(w) * wp)
    y2 = y1 + int(int(h) * hp)

    if x1 < 0 or y1 < 0 or x2 > img_w or y2 > img_h:
        img, x1, x2, y1, y2 = pad_img_to_fit_bbox(img, x1, x2, y1, y2)
    return img[y1:y2, x1:x2]
def pad_img_to_fit_bbox(img, x1, x2, y1, y2):
    """Pad *img* so that the box (x1, y1)-(x2, y2) lies fully inside it.

    Note the argument order: both x coordinates come first, then both y
    coordinates. The image is extended on whichever sides the box overhangs
    using ``cv.BORDER_REPLICATE``.

    Returns:
        ``(padded_img, x1, x2, y1, y2)`` where the coordinates have been
        shifted by the amount of padding added on the left and top, so they
        address the same box in the padded image.
    """
    rows, cols = img.shape[0], img.shape[1]

    # How far the box sticks out on each side (0 when it does not).
    top = -min(0, y1)
    bottom = max(y2 - rows, 0)
    left = -min(0, x1)
    right = max(x2 - cols, 0)

    padded = cv.copyMakeBorder(
        img, top, bottom, left, right, cv.BORDER_REPLICATE
    )
    return padded, x1 + left, x2 + left, y1 + top, y2 + top
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment