Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wanghaisheng/d985ebc944fdb73e2bf7897e96913f47 to your computer and use it in GitHub Desktop.
Save wanghaisheng/d985ebc944fdb73e2bf7897e96913f47 to your computer and use it in GitHub Desktop.
Takes a video with hardcoded subtitles (hardsubs) as input and generates translated soft subtitles (softsubs) in the target language.
# Requirements:
# - ImageMagick binary
# - Windows.Media.Ocr.Cli binary
# - VideoSubFinder binary
#
# Official GCloud Translate Setup:
# First 500k characters / mo is free: https://cloud.google.com/translate/pricing
# Install Python Module: pip install google-cloud-translate
# Setup Google Cloud account and billing information: https://cloud.google.com/
# Make a new project and enable "Cloud Translation API": https://console.cloud.google.com/apis/dashboard
# Install GCloud CLI: https://cloud.google.com/sdk/docs/install
# Setup authentication: https://cloud.google.com/docs/authentication/provide-credentials-adc#on-prem
import argparse
import glob
import os
import shutil
import subprocess
import sys
import time
from google.cloud import translate
def gcloud_API_translate_text(textList, project_id, target_language_code):
    """Translate a list of strings with the Google Cloud Translation API.

    The strings are sent as plain text in consecutive batches of at most
    ``MAX_STRINGS`` entries, and the results of every batch are concatenated
    in request order.

    Args:
        textList: Strings to translate.
        project_id: Google Cloud project ID used to build the request parent.
        target_language_code: Language code to translate into, e.g. "en-US".

    Returns:
        A list of the entries taken from each response's ``translations``
        field. An empty ``textList`` yields an empty list without creating
        a client or contacting the service.

    Raises:
        RuntimeError: If a response contains no translations for a non-empty
            batch; without this guard the loop could never make progress.
    """
    # Nothing to translate: avoid creating a client (and needing credentials).
    if not textList:
        return []

    MAX_STRINGS = 1024  # GCloud API has hard limit of 1024 lines per request
    client = translate.TranslationServiceClient()
    location = "global"
    parent = f"projects/{project_id}/locations/{location}"

    numStringsToTranslate = len(textList)
    numStringsTranslated = 0
    translations = []
    while numStringsTranslated < numStringsToTranslate:
        endIndex = min(numStringsTranslated + MAX_STRINGS, numStringsToTranslate)
        contents = textList[numStringsTranslated:endIndex]
        response = client.translate_text(
            request={
                "parent": parent,
                "contents": contents,
                "mime_type": "text/plain",
                "target_language_code": target_language_code,
            }
        )
        numReturned = len(response.translations)
        if numReturned == 0:
            # Progress is measured by the response size, so an empty response
            # would otherwise repeat the same request forever.
            raise RuntimeError(
                "Translation API returned no translations for a batch of "
                + str(len(contents)) + " strings"
            )
        numStringsTranslated += numReturned
        translations.extend(response.translations)
    return translations
# Calling os.system with an empty command enables ANSI escape characters in
# the terminal, which the progress output below relies on.
os.system("")
LINE_CLEAR = '\x1b[2K'  # ANSI sequence that erases the current terminal line

# Command-line interface
parser = argparse.ArgumentParser(description="Takes as input a video with hardsubs, and will generate translated softsubs in the target language.")
parser.add_argument('video_path', help='Path to input video')
parser.add_argument('-pid', '--project_id', help='Google Cloud Project ID', required=True)
parser.add_argument('-t', '--tmp_dir', help='Path to tmp dir', default='tmp')
# Parse the edge offsets as floats so that values given on the command line
# have the same type as the defaults and non-numeric input is rejected early.
# NOTE(review): presumably fractions of the frame height in [0.0, 1.0] - confirm.
parser.add_argument('-te', '--top_edge_offset', help='How much of top of video to ignore', type=float, default=0.75)
parser.add_argument('-be', '--bottom_edge_offset', help='How much of bottom of video to ignore', type=float, default=0.0)
parser.add_argument('-l', '--language', help='Language to translate to', default='en-US', choices=['en-US', 'zh-CN', 'zh-TW', 'ja', 'ko'])
parser.add_argument('--test_run', action='store_true', help='Runs a few times and displays debug info')
parser.add_argument('--skip_cleanup', action='store_true', help='Does not delete temporary files')
parser.add_argument('--skip_extract', action='store_true', help='Skips extracting subtitle images step (uses cached temporary files)')
args = parser.parse_args()

# Values derived from the arguments and shared by the steps below
video_path = args.video_path
video_filename = os.path.splitext(video_path)[0]  # input path without its extension
tmp_dir = args.tmp_dir
txt_images_folder = os.path.join(tmp_dir, "TXTImages")
txt_results_folder = os.path.join(tmp_dir, "TXTResults")
target_language_code = args.language
project_id = args.project_id
test_run = args.test_run
# A test run always keeps the temporary files
skip_cleanup = args.skip_cleanup or test_run
skip_extract = args.skip_extract
# Generate raw images of the subtitles
if not skip_extract:
    print("Extracting subtitle images with VideoSubFinder (takes quite a long time) ...")
    # Use the -te/-be command-line options instead of hard-coded crop values.
    # NOTE(review): the mapping below assumes VideoSubFinder measures both
    # percentages from the bottom of the frame; it reproduces the previously
    # hard-coded 0.25 / 0.0 for the default offsets 0.75 / 0.0 - confirm.
    # float() keeps this working when the options arrive as strings, and
    # round() hides floating-point noise such as 0.30000000000000004.
    top_percent_end = round(1.0 - float(args.top_edge_offset), 6)
    bottom_percent_end = round(float(args.bottom_edge_offset), 6)
    startTime = time.time()
    extract_result = subprocess.run([
        "VideoSubFinderWXW.exe",
        "--clear_dirs",
        "--run_search",
        "--create_cleared_text_images",
        "--input_video", video_path,
        "--output_dir", tmp_dir,
        "--num_threads", str(4),
        "--num_ocr_threads", str(4),
        "--top_video_image_percent_end", str(top_percent_end),
        "--bottom_video_image_percent_end", str(bottom_percent_end)
    ], capture_output=True)
    endTime = time.time()
    # The tool's output is captured, so surface a failing exit status here
    # rather than continuing silently.
    if extract_result.returncode != 0:
        print("WARNING: VideoSubFinder exited with code "+str(extract_result.returncode))
    print("Completed! Took "+str(endTime - startTime)+"s")
# Collect the paths of all subtitle images found in the TXTImages folder.
# Without that folder there is nothing to process, so show usage and exit.
if not os.path.isdir(txt_images_folder):
    print("ERROR: Invalid paths provided!\n")
    parser.print_help()
    sys.exit(2)
imagePaths = []
for pattern in ('*.jpg', '*.jpeg', '*.png'):
    imagePaths += glob.glob(os.path.join(txt_images_folder, pattern))
numImages = len(imagePaths)
# Shrink the images in place because the OCR step has a maximum image size
totalTimeElapsedS = 0
estimateText = "Estimated time remaining: Unknown"
for imageNumber, imagePath in enumerate(imagePaths, start=1):
    # A test run only handles the first 11 images
    if test_run and imageNumber > 11:
        break
    print(end=LINE_CLEAR)
    print(f"Preprocessing image {imageNumber}/{numImages}. {estimateText}. Filename: {os.path.basename(imagePath)}", end='\r')
    startTime = time.time()
    # Resize only; leaving some padding and not doing "-trim" seems to be important
    subprocess.run(["magick", "convert", imagePath, "-resize", "x200>", imagePath], capture_output=True)
    endTime = time.time()
    # Estimate the remaining time from the average duration per image so far
    totalTimeElapsedS += endTime - startTime
    imagesLeft = numImages - imageNumber
    estimateText = f"Estimated time remaining: {totalTimeElapsedS / float(imageNumber) * imagesLeft}s"
# Run Windows OCR on every image and collect the non-empty results
totalTimeElapsedS = 0
totalCharacterCount = 0
estimateText = "Estimated time remaining: Unknown"
baseNameList = []
textList = []
for imageNumber, imagePath in enumerate(imagePaths, start=1):
    # A test run only handles the first 11 images
    if test_run and imageNumber > 11:
        break
    print(end=LINE_CLEAR)
    print(f"Running OCR on image {imageNumber}/{numImages}. {estimateText}. Filename: {os.path.basename(imagePath)}", end='\r')
    startTime = time.time()
    result = subprocess.run(["Windows.Media.Ocr.Cli.exe", imagePath], capture_output=True)
    # Join the recognized lines with spaces; this leads to more naturally
    # flowing translations most of the time.
    ocrText = " ".join(result.stdout.decode("utf-8").splitlines()).strip()
    # GCloud API throws error if we give it blanks, so skip empty results
    if ocrText:
        baseNameList.append(os.path.basename(os.path.splitext(imagePath)[0]))
        textList.append(ocrText)
        totalCharacterCount += len(ocrText)
    endTime = time.time()
    # Estimate the remaining time from the average duration per image so far
    totalTimeElapsedS += endTime - startTime
    imagesLeft = numImages - imageNumber
    estimateText = f"Estimated time remaining: {totalTimeElapsedS / float(imageNumber) * imagesLeft}s"
# Also dump the recognized text, one numbered entry per line, for easier debugging
with open(os.path.join(tmp_dir, "original_text.txt"), 'w', encoding="utf-8") as f:
    f.writelines(f"{index}: {text}\n" for index, text in enumerate(textList))
# Move past the progress line, which was printed with a trailing '\r'
print('')
# Translate via Google Cloud Translation API and output to disk
print("Translating text...")
translations = gcloud_API_translate_text(textList, project_id, target_language_code)
print("# of characters translated: " + str(totalCharacterCount))
print("# of lines translated: " + str(len(translations)))
# Make sure the results folder exists before writing into it; otherwise a
# missing folder would raise FileNotFoundError after the translation request
# has already been made.
os.makedirs(txt_results_folder, exist_ok=True)
# Write one text file per image, named after the image it came from
for basename, original_text, translation in zip(baseNameList, textList, translations):
    out_filepath = os.path.join(txt_results_folder, basename) + '.txt'
    translated_text = translation.translated_text
    if test_run:
        print("Orignal text: " + repr(original_text))
        print("Translated text: " + repr(translated_text))
    with open(out_filepath, 'w', encoding="utf-8") as f:
        f.write(translated_text)
# Additionally write out a numbered list of all translations for easier debugging
with open(os.path.join(tmp_dir, "translated_text.txt"), 'w', encoding="utf-8") as f:
    for index, translation in enumerate(translations):
        f.write(str(index)+": "+translation.translated_text+"\n")
# Build the subtitle file; it gets the input video's path with an .srt extension
srt_path = f"{video_filename}.srt"
print(f"Generating softsubs with VideoSubFinder... Output file: {srt_path}")
startTime = time.time()
subprocess.run(
    ["VideoSubFinderWXW.exe", "--create_sub_from_txt_results", srt_path, "--output_dir", tmp_dir],
    capture_output=True,
)
endTime = time.time()
print(f"Completed! Took {endTime - startTime}s")
# Remove the temporary directory unless cleanup was skipped (errors are ignored)
if not skip_cleanup:
    shutil.rmtree(tmp_dir, ignore_errors=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment