@alyssadev
Created October 21, 2020 21:27
#!/usr/bin/env python3
# pip3 install google_cloud_storage google_cloud_speech srt
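#
# Example invocation (illustrative file and bucket names; assumes
# GOOGLE_APPLICATION_CREDENTIALS points at a service account with access to
# Speech-to-Text and the Storage bucket, and that the bucket already exists):
#   ./transcribe.py recording.flac -w -p --bucket my-bucket > recording.srt
# Progress is written to stderr, so stdout can be redirected to a file.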
from google.cloud import speech, storage
from sys import stderr
from time import sleep
from argparse import ArgumentParser
parser = ArgumentParser()
parser.add_argument("filename")
parser.add_argument("-w", "--word_time", action="store_true", help="enable_word_time_offsets")
parser.add_argument("-p", "--punctuation", action="store_true")
parser.add_argument("-l", "--lang", default="en_US")
parser.add_argument("-r", "--sample_rate", type=int, default=44100)
parser.add_argument("--bucket", default="adslivetranscribe")
parser.add_argument("--keep_in_storage", action="store_true")
args = parser.parse_args()
# SRT output needs per-word timestamps, so the srt/datetime helpers are only
# imported when --word_time is requested.
if args.word_time:
    import srt, datetime

    # from https://medium.com/searce/generate-srt-file-subtitles-using-google-clouds-speech-to-text-api-402b2f1da3bd
    def subtitle_generation(response, bin_size=3):
        """We define a bin of time period to display the words in sync with audio.
        Here, bin_size = 3 means each bin is 3 secs long.
        All the words that fall within the same 3-sec interval of a result are grouped together."""
        transcriptions = []
        index = 0
        for result in response.results:
            try:
                if result.alternatives[0].words[0].start_time.seconds:
                    # bin start -> for first word of result
                    start_sec = result.alternatives[0].words[0].start_time.seconds
                    start_microsec = result.alternatives[0].words[0].start_time.microseconds
                else:
                    # bin start -> for first word of response
                    start_sec = 0
                    start_microsec = 0
                end_sec = start_sec + bin_size  # bin end sec
                # for last word of result
                last_word_end_sec = result.alternatives[0].words[-1].end_time.seconds
                last_word_end_microsec = result.alternatives[0].words[-1].end_time.microseconds
                # bin transcript
                transcript = result.alternatives[0].words[0].word
                index += 1  # subtitle index
                for i in range(len(result.alternatives[0].words) - 1):
                    try:
                        word = result.alternatives[0].words[i + 1].word
                        word_start_sec = result.alternatives[0].words[i + 1].start_time.seconds
                        word_start_microsec = result.alternatives[0].words[i + 1].start_time.microseconds
                        word_end_sec = result.alternatives[0].words[i + 1].end_time.seconds
                        word_end_microsec = result.alternatives[0].words[i + 1].end_time.microseconds
                        if word_end_sec < end_sec:
                            transcript = transcript + " " + word
                        else:
                            previous_word_end_sec = result.alternatives[0].words[i].end_time.seconds
                            previous_word_end_microsec = result.alternatives[0].words[i].end_time.microseconds
                            # append bin transcript
                            transcriptions.append(srt.Subtitle(index, datetime.timedelta(0, start_sec, start_microsec), datetime.timedelta(0, previous_word_end_sec, previous_word_end_microsec), transcript))
                            # reset bin parameters
                            start_sec = word_start_sec
                            start_microsec = word_start_microsec
                            end_sec = start_sec + bin_size
                            transcript = result.alternatives[0].words[i + 1].word
                            index += 1
                    except IndexError:
                        pass
                # append the transcript of the final bin in this result
                transcriptions.append(srt.Subtitle(index, datetime.timedelta(0, start_sec, start_microsec), datetime.timedelta(0, last_word_end_sec, last_word_end_microsec), transcript))
                index += 1
            except IndexError:
                pass
        # turn the transcription list into SRT-formatted subtitles
        subtitles = srt.compose(transcriptions)
        return subtitles
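# Upload the local file to Cloud Storage and transcribe it from the resulting
# gs:// URI; long-running recognition requires a Storage URI for audio longer
# than about a minute, which is why the file goes through a bucket at all.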
client = speech.SpeechClient()
config = speech.RecognitionConfig(
    encoding=speech.RecognitionConfig.AudioEncoding.ENCODING_UNSPECIFIED,
    sample_rate_hertz=args.sample_rate,
    language_code=args.lang,
    enable_automatic_punctuation=args.punctuation,
    enable_word_time_offsets=args.word_time,
)
storage_client = storage.Client()
bucket = storage_client.bucket(args.bucket)
blob = bucket.blob(args.filename)
print("uploading {}...".format(blob.name), file=stderr)
blob.upload_from_filename(args.filename)
print("done uploading, processing", file=stderr)
audio = speech.RecognitionAudio(uri="gs://{}/{}".format(args.bucket, blob.name))
operation = client.long_running_recognize(config=config, audio=audio)
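# Poll the long-running operation until it completes, printing a growing
# "Waiting..." indicator to stderr.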
x = 0
while not operation.done():
    print("Waiting" + ("." * x) + "\r", end="", file=stderr)
    x += 1
    sleep(2)
print("", file=stderr)
response = operation.result()
if not args.word_time:
    print("".join(r.alternatives[0].transcript for r in response.results))
else:
    print(subtitle_generation(response))
if not args.keep_in_storage:
    blob.delete()