Skip to content

Instantly share code, notes, and snippets.

@zorbaproject
Created August 4, 2022 16:07
Show Gist options
  • Save zorbaproject/3b045a7074afb83485a8cbbe4caefc13 to your computer and use it in GitHub Desktop.
Save zorbaproject/3b045a7074afb83485a8cbbe4caefc13 to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
"""OCR with PDF/TIFF as source files on GCS"""
#Source: https://towardsdatascience.com/how-to-extract-the-text-from-pdfs-using-python-and-the-google-cloud-vision-api-7a0a798adc13
#https://github.com/szeamer/google-cloud-vision-script/blob/main/script.py
#Requirements:
#pip3 install google-cloud-vision
#pip3 install google-cloud-storage
#Enable Google Cloud Vision API
#https://console.cloud.google.com/apis/api/vision.googleapis.com/overview?
#
import sys
import os
import os.path
import json
import re
from google.cloud import vision
from google.cloud import storage
# Service-account key file; the relative path is resolved against the
# current working directory.
accountfile = "service_account.json"
# Read the key inside a context manager so the handle is closed even if
# reading fails part-way.
with open(accountfile, "r", encoding="utf-8") as key_file:
    # Line breaks are stripped before parsing, as this script has always done.
    accountdata = json.loads(
        key_file.read().replace("\n", "").replace("\r", ""))
# All uploads and OCR results go through one bucket named after the project.
bucket_name = accountdata["project_id"] + "-pdf2ocr"
def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Upload a local file to a bucket, creating the bucket if it is missing.

    Args:
        bucket_name: Name of the destination bucket.
        source_file_name: Path of the local file to upload.
        destination_blob_name: Object name the file is stored under.
    """
    storage_client = storage.Client.from_service_account_json(accountfile)
    # lookup_bucket returns None for a missing bucket, so only that case
    # triggers creation. Any other failure (credentials, permissions,
    # network) propagates instead of being mistaken for "bucket not found".
    bucket = storage_client.lookup_bucket(bucket_name)
    if bucket is None:
        # create_bucket already returns the new bucket; no second lookup.
        bucket = storage_client.create_bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    print("Upload to bucket "+bucket_name)
    blob.upload_from_filename(source_file_name)
def delete_blob(bucket_name, blob_name):
    """Remove a single object from a bucket.

    Args:
        bucket_name: Name of the bucket that holds the object.
        blob_name: Name of the object to delete.
    """
    # A fresh client is built from the module-level service-account file.
    client = storage.Client.from_service_account_json(accountfile)
    target = client.get_bucket(bucket_name).blob(blob_name)
    print("Delete from bucket")
    target.delete()
def async_detect_document(gcs_source_uri, gcs_destination_uri, lang="",
                          mime_type='application/pdf', batch_size=100,
                          timeout=420):
    """Run asynchronous document text detection on a file stored in GCS.

    Blocks until the operation finishes or the timeout expires. The JSON
    results are written by the service under ``gcs_destination_uri``.

    Args:
        gcs_source_uri: ``gs://`` URI of the input file.
        gcs_destination_uri: ``gs://`` URI prefix for the JSON output files.
        lang: Optional language hint. Either a single string or an iterable
            of strings; an empty value sends no hint.
        mime_type: Input type; supported values are 'application/pdf' and
            'image/tiff'.
        batch_size: How many pages are grouped into each JSON output file.
        timeout: Seconds to wait for the operation to finish.
    """
    print("Start detecting text with OCR")
    client = vision.ImageAnnotatorClient.from_service_account_json(accountfile)

    feature = vision.Feature(
        type_=vision.Feature.Type.DOCUMENT_TEXT_DETECTION)

    gcs_source = vision.GcsSource(uri=gcs_source_uri)
    input_config = vision.InputConfig(
        gcs_source=gcs_source, mime_type=mime_type)

    # https://cloud.google.com/python/docs/reference/vision/latest/google.cloud.vision_v1.types.ImageContext
    # https://cloud.google.com/vision/docs/ocr#specify_the_language_optional
    # https://cloud.google.com/vision/docs/languages
    # language_hints is a repeated field, so a single hint must be wrapped in
    # a list rather than passed as a bare string.
    if not lang:
        language_hints = []
    elif isinstance(lang, str):
        language_hints = [lang]
    else:
        language_hints = list(lang)
    if language_hints:
        image_context = vision.ImageContext(language_hints=language_hints)
    else:
        image_context = vision.ImageContext()

    gcs_destination = vision.GcsDestination(uri=gcs_destination_uri)
    output_config = vision.OutputConfig(
        gcs_destination=gcs_destination, batch_size=batch_size)

    async_request = vision.AsyncAnnotateFileRequest(
        features=[feature], input_config=input_config,
        image_context=image_context, output_config=output_config)

    operation = client.async_batch_annotate_files(
        requests=[async_request])

    print('Waiting for the operation to finish.')
    operation.result(timeout=timeout)
def _first_page_number(blob_name):
    """Return the first page covered by an ``output-<a>-to-<b>.json`` name."""
    found = re.search(r'output-(\d+)-to-\d+\.json$', blob_name)
    # Names that do not follow the pattern sort after all numbered shards.
    return int(found.group(1)) if found else float("inf")


def _print_page_stats(page_response, annotation):
    """Print page number, size, block and paragraph counts (best effort)."""
    try:
        page = annotation['pages'][0]
        print('Page: '+str(page_response['context']['pageNumber'])+" "+str(page['width'])+"x"+str(page['height']))
        print('Number of blocks found: '+str(len(page['blocks'])))
        total_paragraphs = sum(len(block['paragraphs']) for block in page['blocks'])
        print('Number of paragraphs in page: '+str(total_paragraphs))
    except (KeyError, IndexError, TypeError) as e:
        # Statistics are informational only; a missing field must not stop
        # the transcription.
        print(str(e))


def write_to_text(gcs_destination_uri):
    """Collect the OCR output stored under a GCS prefix into a local text file.

    Every object whose name starts with the prefix is downloaded and parsed
    as JSON. Each one is saved locally under its object name, and the full
    text of each annotated page is written to ``<prefix>.txt``. The output
    objects are deleted from the bucket afterwards.

    Args:
        gcs_destination_uri: ``gs://<bucket>/<prefix>`` URI that was given to
            the OCR request as its output destination.

    Raises:
        ValueError: If the URI is not of the form ``gs://<bucket>/<prefix>``.
    """
    match = re.match(r'gs://([^/]+)/(.+)', gcs_destination_uri)
    if match is None:
        raise ValueError(
            "Expected a gs://<bucket>/<prefix> URI, got: "
            + repr(gcs_destination_uri))
    bucket_name = match.group(1)
    prefix = match.group(2)

    storage_client = storage.Client.from_service_account_json(accountfile)
    bucket = storage_client.get_bucket(bucket_name)

    # List objects with the given prefix. Listing order is by name, which
    # would put "output-1001-..." before "output-101-...", so order the
    # shards by their first page number to keep the text in page order.
    blob_list = sorted(
        bucket.list_blobs(prefix=prefix),
        key=lambda blob: _first_page_number(blob.name))
    print('Output files:')
    for blob in blob_list:
        print(blob.name)

    # Opening with "w" truncates any transcript left over from a prior run.
    with open(prefix+".txt", "w", encoding="utf-8") as text_out:
        for output in blob_list:
            response = json.loads(output.download_as_string())

            # Keep a local copy of the raw JSON response.
            # https://stackoverflow.com/a/59179254
            with open(output.name, "w") as json_out:
                json_out.write(json.dumps(response))

            # Each response holds more than the text:
            # annotation/pages/blocks/paragraphs/words/symbols, including
            # confidence scores and bounding boxes. Blocks with too few words
            # or too close to the page margin could be filtered out here.
            for page_response in response['responses']:
                annotation = page_response.get('fullTextAnnotation')
                if annotation is None:
                    # Skip the page; otherwise the previous page's annotation
                    # would be reused (or be undefined on the first page).
                    print("No annotation for this page.")
                    continue
                _print_page_stats(page_response, annotation)
                text_out.write(annotation['text'])

    for blob in blob_list:
        delete_blob(bucket_name, blob.name)
def pdf2ocr(file_name, lang=""):
    """Upload a local PDF, run OCR on it and collect the text locally.

    The file is uploaded under its lower-cased base name without a trailing
    ``.pdf``. OCR output is requested under ``<name>-OCR`` in the same bucket
    and then gathered by ``write_to_text``. The uploaded source object is
    removed afterwards, even when OCR or collection fails.

    Args:
        file_name: Path of the local file to process.
        lang: Optional language hint forwarded to the OCR request.

    Returns:
        None. If ``file_name`` is not an existing file, a message is printed
        to stderr and nothing else happens.
    """
    if not os.path.isfile(file_name):
        print("File not found: "+str(file_name), file=sys.stderr)
        return

    destination_blob_name = os.path.basename(file_name).lower()
    # Strip only a trailing ".pdf"; occurrences elsewhere in the name stay,
    # and a file literally named ".pdf" keeps its name.
    if destination_blob_name.endswith(".pdf") and len(destination_blob_name) > len(".pdf"):
        destination_blob_name = destination_blob_name[:-len(".pdf")]

    source_uri = "gs://"+bucket_name+"/"+destination_blob_name
    ocr_uri = source_uri+"-OCR"

    upload_blob(bucket_name, file_name, destination_blob_name)
    try:
        async_detect_document(source_uri, ocr_uri, lang)
        write_to_text(ocr_uri)
    finally:
        # Never leave the uploaded source behind in the bucket.
        delete_blob(bucket_name, destination_blob_name)
def main(argv=None):
    """Command-line entry point: ``pdf2GoogleOCR.py <file> [language-hint]``.

    Args:
        argv: Argument vector including the program name; defaults to
            ``sys.argv``.
    """
    if argv is None:
        argv = sys.argv
    if len(argv) < 2:
        print("Usage: python3 pdf2GoogleOCR.py mydocument.pdf en|it|de|fr...")
        sys.exit()
    # Optional second argument is the language hint, e.g. "en" or
    # "en-t-i0-handwrit".
    lang = ""
    if len(argv) > 2:
        lang = argv[2]
    print(lang)
    pdf2ocr(argv[1], lang)


# Guarded so that importing this module does not start an OCR job.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment