Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@curious-codr
Created January 3, 2020 10:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save curious-codr/f2ef04438b3f332f4420c490ffe01d03 to your computer and use it in GitHub Desktop.
Save curious-codr/f2ef04438b3f332f4420c490ffe01d03 to your computer and use it in GitHub Desktop.
import json

# Application settings kept as a JSON document.
# JSON only permits double-quoted strings; the previous single-quoted values
# made json.loads() raise JSONDecodeError as soon as this module was imported.
string = """{
"IMAGE_SOURCE_DIR": "documents_runtime",
"TEMPLATE_FOLDER": "documents_metadata",
"NORTHWELL_ENDPOINT": "https://us-central1-springmldemoproject.cloudfunctions.net/hello_gcs",
"PDF_TO_IMAGE_ENDPOINT": "https://us-central1-springmldemoproject.cloudfunctions.net/pdf_to_image",
"TEMPLATE_PROCESSOR_MAP": {
    "northwell_template": "NorthwellProcessor()",
    "realestate_template": "RealEstateProcessor()",
    "cbre_template": "CBREProcessor()",
    "gg_bearings_template": "GGBearingsProcessor()",
    "MouDocuments": "CBREMOUProcessor()",
    "RegusDocuments": "CBREESUITERegusProcessor()",
    "WeworkDocuments": "CBREESUITEWeworkProcessor()",
    "test": "CBREIncomeLeaseProcessor()"
},
"DEFAULT_PROCESSOR": "DocumentProcessor",
"IMAGE_DESTINATION_DIR": "invoice_documents_processed",
"DOCUMENT_KEY": "document_template"
}"""

# Parsed settings dictionary; the constants class and the admin endpoint read
# their values from this object.
temp = json.loads(string)
class AppConstants:
    """Named application settings read from the parsed ``temp`` dictionary.

    Every attribute is looked up in ``temp`` and falls back to the literal
    default given here when the key is absent.
    """

    DOCUMENT_KEY = temp.get("DOCUMENT_KEY", "document_template")
    IMAGE_SOURCE_DIR = temp.get("IMAGE_SOURCE_DIR", "documents_runtime")
    IMAGE_DESTINATION_DIR = temp.get("IMAGE_DESTINATION_DIR", "invoice_documents_processed")
    TEMPLATE_FOLDER = temp.get("TEMPLATE_FOLDER", "documents_metadata")
    NORTHWELL_ENDPOINT = temp.get(
        "NORTHWELL_ENDPOINT",
        "https://us-central1-springmldemoproject.cloudfunctions.net/hello_gcs",
    )
    PDF_TO_IMAGE_ENDPOINT = temp.get(
        "PDF_TO_IMAGE_ENDPOINT",
        "https://us-central1-springmldemoproject.cloudfunctions.net/pdf_to_image",
    )
    # Maps a template name to the (string) processor expression used for it.
    # The lookup previously read ``temp.get() {...}``, which is a syntax error;
    # it now follows the same key/default pattern as the other settings.
    TEMPLATE_PROCESSOR_MAP = temp.get(
        "TEMPLATE_PROCESSOR_MAP",
        {
            'northwell_template': 'NorthwellProcessor()',
            'realestate_template': 'RealEstateProcessor()',
            'cbre_template': 'CBREProcessor()',
            'gg_bearings_template': 'GGBearingsProcessor()',
            'MouDocuments': 'CBREMOUProcessor()',
            'RegusDocuments': 'CBREESUITERegusProcessor()',
            'WeworkDocuments': 'CBREESUITEWeworkProcessor()',
            'test': 'CBREIncomeLeaseProcessor()',
        },
    )
    DEFAULT_PROCESSOR = temp.get("DEFAULT_PROCESSOR", "DocumentProcessor")
import json
from flask import Blueprint, request, \
url_for, jsonify
from flask_restplus import Resource, Api, fields, reqparse, inputs
from werkzeug.datastructures import FileStorage
from src.doc_types.DocType import DocType
from templatestore import storage
from google.api_core.exceptions import NotFound
from datetime import datetime
from src.utils.Constants import Constants
from src.utils.other_utils import convert_image_format
from io import BytesIO
class MyApi(Api):
    """``Api`` subclass that overrides how the Swagger specs URL is built."""

    @property
    def specs_url(self):
        """Monkey patch for HTTPS.

        Returns the external URL of the ``specs`` endpoint, using plain
        ``http`` only when the base URL points at 127.0.0.1 or localhost and
        ``https`` everywhere else.
        """
        if '127.0.0.1' in self.base_url or 'localhost' in self.base_url:
            url_scheme = 'http'
        else:
            url_scheme = 'https'
        specs_endpoint = self.endpoint('specs')
        return url_for(specs_endpoint, _external=True, _scheme=url_scheme)
# Blueprint carrying every route declared in this module.
crud = Blueprint('crud', __name__)

# MyApi is used instead of the plain Api so that specs_url picks the scheme
# itself (http for local hosts, https otherwise).
api = MyApi(crud, title='Document Extraction')

# --- Request/response models shown in the API documentation ----------------
# A single keyword entry: the keyword, its type and a list of aliases.
entity = api.model(
    'entity',
    {
        'keyword': fields.String(),
        'keyword_type': fields.String(),
        'aliases': fields.List(fields.String),
    },
)

# The "document_template" section: a list of keyword entries.
invoice_keyword_aliases = api.model(
    "document_template",
    {'keyword_mapping': fields.List(fields.Nested(entity))},
)

# Full template payload expected by the template POST endpoint.
template_model = api.model(
    'template',
    {"document_template": fields.Nested(invoice_keyword_aliases)},
)

# --- Request parsers -------------------------------------------------------
# Multipart upload: image file, username header and a boolean form flag.
# Note that 'username' has no default here.
file_upload = reqparse.RequestParser()
file_upload.add_argument('image', type=FileStorage, location='files')
file_upload.add_argument('username', location='headers')
file_upload.add_argument('show_table', type=inputs.boolean, location='form')

# Header-only parser: username defaults to 'test' when the header is absent.
header_parser = reqparse.RequestParser()
header_parser.add_argument('username', location='headers', default='test')
@api.route('/templates')
class Templates(Resource):
    """Lists the templates stored under the requesting user's folder."""

    @api.expect(header_parser)
    def get(self):
        """Return a JSON array of template names for the current user.

        The storage prefix comes from ``get_username`` and each blob name is
        split on '/'; its third path component, cut at '.json', is reported
        as the template name. Blobs whose third component is empty are
        skipped.
        """
        user_folder = get_username(header_parser)
        file_names = (
            str(blob.name).split('/')[2]
            for blob in storage.list_blobs_with_prefix(user_folder + '/', '/')
        )
        return jsonify(
            [file_name.split('.json')[0] for file_name in file_names if file_name]
        )
@api.route('/template/<string:template_name>')
class Template(Resource):
    """Read, store or delete a single template of the requesting user.

    Templates are stored as ``<prefix>/<template_name>.json`` where the
    prefix is produced by ``get_username`` from the ``username`` header.
    """

    @api.expect(header_parser)
    def get(self, template_name):
        """Return the stored template parsed from JSON.

        Responds with 404 when storage reports the file as missing, matching
        the behaviour of the generic-template import endpoint.
        """
        prefix = get_username(header_parser)
        template_path = prefix + '/' + template_name + '.json'
        try:
            template_json = storage.read_file(template_path)
        except NotFound:
            return {'error': 'Template not found'}, 404, {'ContentType': 'application/json'}
        return json.loads(template_json)

    @api.expect(template_model)
    def post(self, template_name):
        """Store the JSON request body as the named template.

        The body is tagged with a ``template_name`` entry (the storage path
        without the ``.json`` extension) before being uploaded. Responds with
        400 when the body is not a JSON object.
        """
        prefix = get_username(header_parser)
        # Build the extension-less key first. The old code derived it with
        # split('.')[0] on the full path, which cut the name at the FIRST dot
        # and therefore truncated usernames/template names containing dots.
        template_key = prefix + '/' + template_name
        json_data = request.json
        if not isinstance(json_data, dict):
            # Without this guard a missing/non-object body crashed on the
            # item assignment below.
            return (
                {'error': 'Request body must be a JSON object'},
                400,
                {'ContentType': 'application/json'},
            )
        json_data['template_name'] = template_key
        storage.upload_file(
            template_key + '.json', json.dumps(json_data), 'application/json')
        return {'success': True}, 200, {'ContentType': 'application/json'}

    @api.expect(header_parser)
    def delete(self, template_name):
        """Delete the named template from storage."""
        prefix = get_username(header_parser)
        template_path = prefix + '/' + template_name + '.json'
        storage.delete_file(template_path)
        return {'success': True}, 200, {'ContentType': 'application/json'}
@api.route('/template/test_json/<string:template_name>')
class TestTemplate2(Resource):
    """Runs a template against an image sent as base64 inside a JSON body."""

    def post(self, template_name):
        """
        Upload image with base64 format and save file stream

        Expects a JSON object with ``image`` (base64 text, optionally prefixed
        by a ``data:...,`` header), ``filename`` and ``username``. The decoded
        image is uploaded to storage under ``<username>/<filename>`` and then
        processed with the given template; the processing result is returned
        as JSON. Validation problems are reported as a JSON ``error`` object,
        in the same style as the existing error responses of this endpoint.
        """
        import base64

        # Compare the bare mimetype so that a parameterised header such as
        # "application/json; charset=utf-8" is accepted as JSON too.
        if not request.mimetype == 'application/json':
            return jsonify({'error': 'Content-type must be application/json!', 'status_code':401})
        print(request.content_type)
        data = request.json
        if not isinstance(data, dict):
            return jsonify({'error': 'No valid request body, json missing!'})

        # Report absent fields explicitly instead of failing with KeyError.
        missing = [name for name in ('image', 'filename', 'username') if name not in data]
        if missing:
            return jsonify({'error': 'Missing required field(s): ' + ', '.join(missing)})

        img_data = data['image']
        filename = data['filename']
        user_name = data['username']
        prefix = user_name.lower()
        filename = filename.replace(' ', '-').replace('#','').lower()
        try:
            # Everything up to the first comma (a data-URL header) is dropped;
            # when there is no comma, find() returns -1 and the whole string
            # is decoded.
            image = base64.b64decode(img_data[img_data.find(',')+1:])
        except ValueError:
            # binascii.Error (malformed base64) is a ValueError subclass.
            return jsonify({'error': 'Image is not valid base64 data!'})

        # The decoded bytes are uploaded directly; wrapping them in BytesIO
        # only to call getvalue() produced the very same bytes.
        img_public_url = storage.upload_from_string(
            prefix + '/' + filename, image)
        print(img_public_url)
        doc = DocType()
        extractTables = False
        print(extractTables)
        result = doc.process(template_name, filename, prefix=prefix, extractTables=extractTables)
        print("RESULT from crud.py", result)
        return jsonify(result)
@api.route('/template/admin/test')
class AdminPage(Resource):
    """Returns the parsed application settings as JSON."""

    def get(self):
        """Return the ``temp`` settings dictionary of the AppConstants module."""
        print(request.content_type)
        # Import the module-level dictionary by name. The previous code did
        # ``from src.utils.AppConstants import AppConstants`` and then read
        # ``src.utils.AppConstants.temp``; a from-import never binds ``src``,
        # so that attribute access raised NameError on every request.
        from src.utils.AppConstants import temp as temp_json
        print(temp_json)
        print("Admin page loading ...")
        return jsonify(temp_json)
@api.route('/template/test/<string:template_name>')
class TestTemplate(Resource):
    """Runs a template against an image uploaded as multipart form data."""

    @api.expect(file_upload)
    def post(self, template_name):
        """Upload the image, process it and return the extraction result.

        The response is ``{"result": ..., "outputImage": ...}`` where
        ``outputImage`` is the base64 text of the ``boundaries_<image>`` file
        read back from storage, or ``None`` when that file cannot be read.
        Responds with 400 when no image file was sent.
        """
        import base64

        print(request.content_type)
        prefix = get_username(file_upload)
        args = file_upload.parse_args()
        image = args['image']
        if image is None or not image.filename:
            # The parser yields None when the 'image' part is absent; without
            # this guard the filename access below raised AttributeError.
            return {'error': 'No image file provided'}, 400, {'ContentType': 'application/json'}

        # Normalise the name: spaces -> '-', '#' removed, lower-cased, and
        # every dot except the one before the extension removed.
        img_name = image.filename.replace(' ', '-').replace('#','').lower()
        stem, _, ext = img_name.rpartition('.')
        stem = stem.replace('.', '')
        img_name = f'{stem}.{ext}'
        print("IMAGENAME:",img_name," image.filename:",image.filename)

        image_bytes = image.stream.read()
        try:
            img_public_url = storage.upload_from_string(
                prefix + '/' + img_name, image_bytes)
            print("PUBLIC:",img_public_url)
        except Exception as e:
            # Best effort, as before: the failure is logged and processing
            # is still attempted.
            print("Upload error")
            print(e)

        doc = DocType()
        extractTables = args['show_table']
        print(extractTables)
        result = doc.process(template_name, img_name, prefix=prefix, extractTables=extractTables)

        # Extension substituted for ".pdf" in the boundary file name below.
        ext = ".png" if '.png' in img_name else ".jpg"
        outputImage = None
        try:
            boundingBoxBlob = storage.read_file_from_bucket(f'{prefix}/boundaries_{str(img_name).replace(".pdf",ext)}',Constants.IMAGE_SOURCE_DIR)
            outputImage = base64.encodebytes(boundingBoxBlob).decode('ascii')
        except Exception as e:
            # The boundary image is optional in the response.
            print("Couldn't find boundary file")
            print(e)
        print("RESULT from crud.py", result)
        return {"result":result,"outputImage": outputImage}
# if '.png' in img_name:
# ext = ".png"
# result = doc.process(template_name, img_name, prefix=prefix, extractTables=extractTables)
# boundingBoxBlob = storage.read_file_from_bucket(f'{prefix}/boundaries_{str(img_name).replace(".pdf",ext)}',Constants.IMAGE_SOURCE_DIR)
# import base64
# print("RESULT from crud.py", result)
# return {"result":result,"outputImage":base64.encodebytes(boundingBoxBlob).decode('ascii')}
@api.route('/template/import/<string:template_name>')
class Template(Resource):
    """Copies a generic template into the requesting user's folder."""

    @api.expect(header_parser)
    def get(self, template_name):
        """Import ``generic/templates/<template_name>.json`` for the user.

        The copy is stored under the user's prefix with a UTC timestamp
        (``YYYYmmddHHMMSS``) appended to the name. Responds with 404 when the
        generic template is not found in storage.
        """
        user_folder = get_username(header_parser)
        source_path = f'generic/templates/{template_name}.json'
        # Timestamp is taken before the read, exactly as the copy is named.
        requested_at = datetime.utcnow()
        try:
            payload = storage.read_file(source_path)
        except NotFound:
            return {'error': 'Template not found'}, 404, {'ContentType': 'application/json'}
        stamp = requested_at.strftime("%Y%m%d%H%M%S")
        target_path = f'{user_folder}/{template_name}-{stamp}.json'
        storage.upload_file(target_path, payload, 'application/json')
        return {'success': True}, 200, {'ContentType': 'application/json'}
@api.route('/templates/generic')
class Templates(Resource):
    """Lists the shared templates stored under ``generic/templates``."""

    def get(self):
        """Return a JSON array with the names of all generic templates.

        Each blob name is split on '/'; its third path component, cut at
        '.json', is reported. Blobs whose third component is empty are
        skipped.
        """
        generic_folder = "/".join(('generic', 'templates'))
        names = []
        for blob in storage.list_blobs_with_prefix(generic_folder + '/', '/'):
            file_name = str(blob.name).split('/')[2]
            if not file_name:
                continue
            names.append(file_name.split('.json')[0])
        return jsonify(names)
@api.route('/templates/types')
class TemplateTypes(Resource):
    """Lists the template types available to the requesting user."""

    @api.expect(header_parser)
    def get(self):
        """Return a sorted JSON array of distinct template type names.

        Type names are the parent path component of every blob found under
        ``<username>/<Constants.SAMPLE_DOCS>``, ignoring names that end in
        '/' or contain 'default'. When listing fails for any reason a fixed
        default list is returned instead.
        """
        args = header_parser.parse_args()
        username = args.username.lower()
        prefix = username + "/" + Constants.SAMPLE_DOCS
        try:
            blobs = storage.list_blobs_with_prefix_from_bucket(prefix,None, Constants.TEMPLATE_FOLDER)
            templates = {
                blob.name.split("/")[-2]
                for blob in blobs
                if not blob.name.endswith("/") and 'default' not in blob.name
            }
        except Exception as e:
            # Was a bare ``except:``, which also swallowed SystemExit and
            # KeyboardInterrupt and hid the cause. The fallback is kept, but
            # the error is now printed like the other handlers in this file.
            print("Could not list template types, using defaults")
            print(e)
            templates = ['UB-04-form','birth-certificate','drivers-license','invoice','passport','w2-form']
        # sorted() makes the response order deterministic; list(set(...))
        # returned the names in arbitrary order.
        return jsonify(sorted(templates))
def add_template_hints(template_name, advanced_fields):
    """Load a template and attach the requested hints to its keywords.

    ``advanced_fields['add_hints']`` is a list of dicts, each with a
    ``keyword`` and a ``hint`` entry. Every keyword-mapping entry of the
    template whose ``keyword`` equals a hint's ``keyword`` receives that
    hint under its ``hint`` key; when several hints match, the last one in
    the list wins. The original note suggests hint values such as nearest,
    top, right or below -- NOTE(review): confirm the accepted values.

    Returns the parsed template with the hints applied; nothing is written
    back to storage here.
    """
    requested_hints = advanced_fields['add_hints']
    template = json.loads(storage.read_file(template_name))
    for entry in template[Constants.DOCUMENT_KEY]['keyword_mapping']:
        for requested in requested_hints:
            if entry['keyword'] != requested['keyword']:
                continue
            entry['hint'] = requested['hint']
    return template
def get_username(headers, default_username='test'):
    """Return the storage prefix ``<username>/templates`` for this request.

    Args:
        headers: A request parser whose ``parse_args()`` result exposes a
            ``username`` attribute.
        default_username: Name used when the parser yields no username. The
            upload parser declares ``username`` without a default, so a
            request without that header produced ``None`` and crashed on
            ``.lower()``; the fallback mirrors the ``'test'`` default of the
            header-only parser.

    Returns:
        The lower-cased username followed by ``/templates``.
    """
    args = headers.parse_args()
    username = args.username or default_username
    return username.lower() + "/" + "templates"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment