@peacing
Created November 28, 2020 18:04
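
# Uploads a local audio file to S3, runs an Amazon Transcribe job with speaker
# identification, and writes the parsed transcript to a CSV of
# (time, speaker_tag, comment) rows.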
import boto3
import time
import json, datetime
import pandas as pd
import argparse
import os

s3 = boto3.resource('s3')
s3_client = boto3.client('s3')
transcribe = boto3.client('transcribe', region_name='us-east-1')

aws_account_id = os.environ['AWS_ACCOUNT_ID']
today = datetime.datetime.now().strftime('%Y-%m-%d')


def parse_transcribe_output(transcribe_json_data):
    rawjsondata = transcribe_json_data
    data_for_athena = {"time": [], "speaker_tag": [], "comment": []}
    # Speaker identification must be enabled; otherwise there is nothing to parse
    if "speaker_labels" in rawjsondata["results"].keys():
        # Each speaker segment becomes one CSV row: start time, speaker label, spoken text
        for segment in rawjsondata["results"]["speaker_labels"]["segments"]:
            if len(segment["items"]) > 0:
                data_for_athena["time"].append(time_conversion(segment["start_time"]))
                data_for_athena["speaker_tag"].append(segment["speaker_label"])
                data_for_athena["comment"].append("")
                # Loop through each word in the segment and pull its best transcription
                for word in segment["items"]:
                    pronunciations = list(
                        filter(
                            lambda x: x["type"] == "pronunciation",
                            rawjsondata["results"]["items"],
                        )
                    )
                    word_result = list(
                        filter(
                            lambda x: x["start_time"] == word["start_time"]
                            and x["end_time"] == word["end_time"],
                            pronunciations,
                        )
                    )
                    result = sorted(
                        word_result[-1]["alternatives"], key=lambda x: float(x["confidence"])
                    )[-1]
                    # Append the highest-confidence alternative for this word
                    data_for_athena["comment"][-1] += " " + result["content"]
                    # If the next item in the full transcript is punctuation, append it too
                    try:
                        word_result_index = rawjsondata["results"]["items"].index(
                            word_result[0]
                        )
                        next_item = rawjsondata["results"]["items"][word_result_index + 1]
                        if next_item["type"] == "punctuation":
                            data_for_athena["comment"][-1] += next_item["alternatives"][0][
                                "content"
                            ]
                    except IndexError:
                        pass
    else:
        # No speaker labels in the transcript; nothing to parse
        print("Need to have speaker identification enabled. Please check the file.")
        return
    return data_for_athena
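
# For reference, a minimal hand-built sketch of the Transcribe JSON shape this
# parser expects (field values are illustrative, not real Transcribe output):
#
#   sample = {
#       "results": {
#           "items": [
#               {"type": "pronunciation", "start_time": "0.0", "end_time": "0.5",
#                "alternatives": [{"confidence": "0.99", "content": "Hello"}]},
#               {"type": "punctuation",
#                "alternatives": [{"confidence": "0.0", "content": "."}]},
#           ],
#           "speaker_labels": {
#               "segments": [
#                   {"start_time": "0.0", "end_time": "0.5", "speaker_label": "spk_0",
#                    "items": [{"start_time": "0.0", "end_time": "0.5",
#                               "speaker_label": "spk_0"}]},
#               ]
#           },
#       }
#   }
#
#   parse_transcribe_output(sample)
#   # -> {"time": ["0:00:00"], "speaker_tag": ["spk_0"], "comment": [" Hello."]}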


def time_conversion(timeX):
    # Convert a "seconds" string into H:MM:SS, dropping the sub-second part
    times = datetime.timedelta(seconds=float(timeX))
    times = times - datetime.timedelta(microseconds=times.microseconds)
    return str(times)
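
# e.g. time_conversion("5.34") -> "0:00:05"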


def run(input_file, file_format, num_speakers):
    # Upload the input file to S3
    filename_no_ext = input_file.split('/')[-1].split('.')[0]
    print(today, filename_no_ext)
    key = f'input/dt{today}/{filename_no_ext}'
    bucket = f'psdl-{aws_account_id}-transcribe'
    s3.meta.client.upload_file(Filename=input_file, Bucket=bucket, Key=key)

    # Start the transcription job with speaker identification enabled
    job_name = f'Transcribe-{filename_no_ext}-Job'
    job_uri = "https://" + bucket + ".s3.amazonaws.com/" + key
    output_key = f'output/dt{today}/'
    transcribe.start_transcription_job(
        TranscriptionJobName=job_name,
        Media={'MediaFileUri': job_uri},
        MediaFormat=file_format,
        LanguageCode='en-US',
        OutputBucketName=bucket,
        OutputKey=output_key,
        Settings={
            'ShowSpeakerLabels': True,
            'MaxSpeakerLabels': int(num_speakers),
            'ChannelIdentification': False
        }
    )
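
    # Because OutputKey ends with "/", Transcribe writes the transcript JSON to
    # <OutputKey><TranscriptionJobName>.json in the output bucket, so the full
    # object key can be built up front.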
    output_key = f'output/dt{today}/{job_name}.json'

    # Poll for job completion: up to 15 checks, 30 seconds apart
    time.sleep(30)
    i = 1
    while True:
        status = transcribe.get_transcription_job(TranscriptionJobName=job_name)
        print(f'Check No. {i}: has the transcribe job completed?')
        print(status)
        if status['TranscriptionJob']['TranscriptionJobStatus'] in ['COMPLETED', 'FAILED']:
            break
        print('Job still running...')
        time.sleep(30)
        i += 1
        if i > 15:
            break
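
    # Note: for a completed job, the get_transcription_job response also carries
    # TranscriptionJob.Transcript.TranscriptFileUri pointing at the same output
    # object; this script instead reads the object from the known bucket and key.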

    if status['TranscriptionJob']['TranscriptionJobStatus'] == 'COMPLETED':
        # The transcript JSON is already in S3, so the completed job record can be deleted
        response = transcribe.delete_transcription_job(TranscriptionJobName=job_name)
    elif status['TranscriptionJob']['TranscriptionJobStatus'] == 'FAILED':
        print('Job failed: there is an issue with the audio file. Please check it and upload a valid audio file.')
        response = transcribe.delete_transcription_job(TranscriptionJobName=job_name)
        return

    # Fetch the transcript JSON from S3, parse it, and write the CSV
    print(f'bucket: {bucket}, key: {output_key}')
    text = s3_client.get_object(Bucket=bucket, Key=output_key)['Body']
    s3objectdata = text.read().decode()
    transcribe_json_data = json.loads(s3objectdata)
    parsed_output = parse_transcribe_output(transcribe_json_data)
    df = pd.DataFrame(parsed_output)
    df.to_csv(f'{filename_no_ext}Output.csv', index=False)
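
    # The resulting CSV has one row per speaker segment, for example (values illustrative):
    #
    #   time,speaker_tag,comment
    #   0:00:00,spk_0, Hello everyone.
    #   0:00:12,spk_1, Thanks for joining.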


if __name__ == '__main__':
    aparser = argparse.ArgumentParser()
    aparser.add_argument("-i", "--input-file", required=True)
    aparser.add_argument("-f", "--file-format", default='mp3')
    aparser.add_argument("-n", "--num-speakers", default=2)
    args = aparser.parse_args()

    input_file = args.input_file
    file_format = args.file_format
    num_speakers = args.num_speakers
    run(input_file, file_format, num_speakers)
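
# Example invocation (hypothetical file and script names; assumes AWS credentials
# and the AWS_ACCOUNT_ID environment variable are set, and that the
# psdl-<account-id>-transcribe bucket already exists):
#
#   export AWS_ACCOUNT_ID=123456789012
#   python transcribe_audio.py -i ./interview.mp3 -f mp3 -n 2
#
# This uploads interview.mp3, runs the Transcribe job, and writes
# interviewOutput.csv with time, speaker_tag, and comment columns.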