Skip to content

Instantly share code, notes, and snippets.

@krhoyt
Created November 15, 2022 23:38
Show Gist options
  • Save krhoyt/46a8fac60cb25c44eaad9447e4c4685a to your computer and use it in GitHub Desktop.
Save krhoyt/46a8fac60cb25c44eaad9447e4c4685a to your computer and use it in GitHub Desktop.
AWS Transcribe with Python
AWS_ACCESS_KEY=_YOUR_ACCESS_KEY_
AWS_SECRET_KEY=_YOUR_SECRET_KEY_
AWS_REGION=_S3_REGION_
LOCAL_AUDIO=hello-world.m4a
S3_BUCKET=_S3_BUCKET_
S3_OBJECT=_PATH_TO_AUDIO_ON_S3_
SLEEP_TIME=5
TRANSCRIPTION_JOB=hello-world
import boto3
import io
import json
import os
import sys
import time
from dotenv import load_dotenv
from pathlib import Path
# Environment variables
# Settings (credentials, region, bucket, object key, job name, poll interval)
# are read from "transcribe.env", resolved against the working directory
dotenv_path = Path( "transcribe.env" )
# Reuse the path defined above so the file name lives in one place only
load_dotenv( dotenv_path = dotenv_path )
# Storage client
# Credentials and region are taken from environment variables and handed
# to boto3 as keyword arguments
s3_settings = {
    "aws_access_key_id": os.getenv( "AWS_ACCESS_KEY" ),
    "aws_secret_access_key": os.getenv( "AWS_SECRET_KEY" ),
    "region_name": os.getenv( "AWS_REGION" ),
}
storage = boto3.client( "s3", **s3_settings )
# Check audio file exists
audio_bucket = os.getenv( "S3_BUCKET" )
audio_key = os.getenv( "S3_OBJECT" )

try:
    response = storage.get_object_attributes(
        Bucket = audio_bucket,
        Key = audio_key,
        ObjectAttributes = ["ETag"]
    )
except storage.exceptions.ClientError as error:
    # Only a missing object means the audio still has to be uploaded.
    # Any other service error (access denied, wrong bucket, ...) is re-raised
    # instead of being hidden behind a blind upload.
    # NOTE(review): S3 may answer 403 instead of "not found" when the caller
    # lacks list permission on the bucket - confirm the IAM policy in use.
    if error.response["Error"]["Code"] not in ( "NoSuchKey", "404" ):
        raise

    # Upload audio file if needed
    storage.upload_file(
        Filename = os.getenv( "LOCAL_AUDIO" ),
        Bucket = audio_bucket,
        Key = audio_key
    )
else:
    # Object is already in the bucket; show what S3 reported
    print( response )
# Transcribe client
# Built from the same environment variables as the credentials and region
# used elsewhere in this script
transcribe_settings = {
    "aws_access_key_id": os.getenv( "AWS_ACCESS_KEY" ),
    "aws_secret_access_key": os.getenv( "AWS_SECRET_KEY" ),
    "region_name": os.getenv( "AWS_REGION" ),
}
transcribe = boto3.client( "transcribe", **transcribe_settings )
# Track job status
# A job that is QUEUED or IN_PROGRESS has not finished yet; any other
# status ends the polling below
pending_statuses = ( "QUEUED", "IN_PROGRESS" )
job_name = os.getenv( "TRANSCRIPTION_JOB" )
job_bucket = os.getenv( "S3_BUCKET" )
job_media = os.getenv( "S3_OBJECT" )
# Seconds between status checks; falls back to 5 when SLEEP_TIME is unset
sleep_time = int( os.getenv( "SLEEP_TIME", "5" ) )

# Check job exists
try:
    response = transcribe.get_transcription_job(
        TranscriptionJobName = job_name
    )
# Create job if needed
# Only "job not found" style errors start a new job; throttling, credential
# problems or an interrupt are no longer mistaken for a missing job
except (
    transcribe.exceptions.BadRequestException,
    transcribe.exceptions.NotFoundException
):
    response = transcribe.start_transcription_job(
        TranscriptionJobName = job_name,
        LanguageCode = "en-US",
        OutputBucketName = job_bucket,
        OutputKey = job_media + ".json",
        Media = {
            "MediaFileUri": f"s3://{job_bucket}/{job_media}"
        }
    )

print( response )
status = response["TranscriptionJob"]["TranscriptionJobStatus"]

# Job status
# The status was fetched a moment ago, so wait before asking again
while status in pending_statuses:
    time.sleep( sleep_time )
    response = transcribe.get_transcription_job(
        TranscriptionJobName = job_name
    )
    print( response )
    status = response["TranscriptionJob"]["TranscriptionJobStatus"]

# Stop here with a clear message rather than failing later while trying to
# download a transcript that was never written
if status != "COMPLETED":
    failure_reason = response["TranscriptionJob"].get( "FailureReason", "no reason given" )
    raise RuntimeError(
        f"Transcription job {job_name!r} ended with status {status}: {failure_reason}"
    )
# Job results
# Local file for inspection
transcript_key = os.getenv( "S3_OBJECT" ) + ".json"
# Keep only the last path segment of the key so the file is written to the
# current directory whether the key contains no "/", one, or several
file_name = "./" + Path( transcript_key ).name
storage.download_file(
    Filename = file_name,
    Bucket = os.getenv( "S3_BUCKET" ),
    Key = transcript_key
)
# Read as UTF-8 explicitly (same decoding as the in-memory alternative below)
# instead of relying on the platform default; "with" closes the file
with open( file_name, "r", encoding = "utf-8" ) as local_file:
    response = json.load( local_file )
print( response["results"]["transcripts"][0]["transcript"] )
# NOTE: the triple-quoted string below is an unused string literal, so the
# code inside it never runs. It keeps an alternative that downloads the
# transcript into an in-memory buffer instead of a local file.
"""
# Alternative: Job results
# Straight to memory
data = io.BytesIO()
storage.download_fileobj(
Fileobj = data,
Bucket = os.getenv( "S3_BUCKET" ),
Key = os.getenv( "S3_OBJECT" ) + ".json"
)
response = json.loads( data.getvalue().decode( "utf-8" ) )
print( response )
print( response["results"]["transcripts"][0]["transcript"] )
"""
# Optional cleanup
# Runs only when "-c" is passed on the command line
cleanup_requested = "-c" in sys.argv
if cleanup_requested:
    # Remove the transcription job
    transcribe.delete_transcription_job(
        TranscriptionJobName = os.getenv( "TRANSCRIPTION_JOB" )
    )

    # Remove the audio object from the bucket
    response = storage.delete_object(
        Bucket = os.getenv( "S3_BUCKET" ),
        Key = os.getenv( "S3_OBJECT" )
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment