Created
November 28, 2020 18:04
-
-
Save peacing/ca5272369bc631037a35aa7491525a8d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import time | |
import json, datetime | |
import pandas as pd | |
import argparse | |
import os | |
# AWS service handles; credentials/region resolve through the standard boto3
# chain (env vars, shared config, instance profile).
s3 = boto3.resource('s3')
s3_client = boto3.client('s3')
transcribe = boto3.client('transcribe', region_name='us-east-1')
# Used to derive the S3 bucket name below; raises KeyError at import time if unset.
aws_account_id = os.environ['AWS_ACCOUNT_ID']
# Date partition (YYYY-MM-DD) embedded in the input/output S3 key prefixes.
today = datetime.datetime.now().strftime('%Y-%m-%d')
def parse_transcribe_ouput(transcribe_json_data):
    """Convert an Amazon Transcribe result JSON into columnar transcript data.

    Parameters
    ----------
    transcribe_json_data : dict
        Parsed Transcribe output JSON (must contain ``results``).

    Returns
    -------
    dict | None
        ``{"time": [...], "speaker_tag": [...], "comment": [...]}`` with one
        entry per speaker segment, or ``None`` (after printing a warning)
        when the job was run without speaker identification.
    """
    rawjsondata = transcribe_json_data
    data_for_athena = {"time": [], "speaker_tag": [], "comment": []}
    if "speaker_labels" not in rawjsondata["results"]:
        # Invalid file: transcription ran without speaker identification.
        print("Need to have speaker identification, Please check the file")
        return
    all_items = rawjsondata["results"]["items"]
    # Hoist the pronunciation filter out of the per-word loop; the original
    # rebuilt this list for every word, making parsing quadratic.
    pronunciations = [item for item in all_items if item["type"] == "pronunciation"]
    # Each segment is a contiguous stretch of speech by one speaker.
    for segment in rawjsondata["results"]["speaker_labels"]["segments"]:
        if not segment["items"]:
            continue
        data_for_athena["time"].append(time_conversion(segment["start_time"]))
        data_for_athena["speaker_tag"].append(segment["speaker_label"])
        data_for_athena["comment"].append("")
        for word in segment["items"]:
            # Match the segment word to the transcript item by its timestamps.
            word_result = [
                item
                for item in pronunciations
                if item["start_time"] == word["start_time"]
                and item["end_time"] == word["end_time"]
            ]
            # Pick the highest-confidence alternative. Confidence is a decimal
            # *string* in Transcribe output, so compare numerically: a plain
            # string sort would rank "0.99" above "1.0".
            result = sorted(
                word_result[-1]["alternatives"],
                key=lambda x: float(x["confidence"]),
            )[-1]
            data_for_athena["comment"][-1] += " " + result["content"]
            # Append a punctuation token that immediately follows this word.
            try:
                word_result_index = all_items.index(word_result[0])
                next_item = all_items[word_result_index + 1]
                if next_item["type"] == "punctuation":
                    data_for_athena["comment"][-1] += next_item["alternatives"][0][
                        "content"
                    ]
            except IndexError:
                # Last word of the transcript has no following item.
                pass
    return data_for_athena
def time_conversion(timeX):
    """Render a seconds offset (e.g. Transcribe's "12.34") as H:MM:SS text."""
    duration = datetime.timedelta(seconds=float(timeX))
    # Drop the fractional-second component before stringifying.
    duration -= datetime.timedelta(microseconds=duration.microseconds)
    return str(duration)
def run(input_file, file_format, num_speakers):
    """Upload an audio file to S3, transcribe it with speaker labels, and
    write the parsed transcript to ``<basename>Output.csv`` locally.

    Parameters
    ----------
    input_file : str
        Path to the local audio file.
    file_format : str
        Media format accepted by Transcribe (e.g. 'mp3', 'wav').
    num_speakers : int | str
        Maximum number of speakers to label.
    """
    # Upload the input under a date-partitioned key in the account bucket.
    filename_no_ext = input_file.split('/')[-1].split('.')[0]
    print(today, filename_no_ext)
    key = f'input/dt{today}/{filename_no_ext}'
    bucket = f'psdl-{aws_account_id}-transcribe'
    s3.meta.client.upload_file(Filename=input_file, Bucket=bucket, Key=key)

    # Start the transcription job with speaker identification enabled.
    job_name = f'Transcribe-{filename_no_ext}-Job'
    job_uri = "https://" + bucket + ".s3.amazonaws.com/" + key
    transcribe.start_transcription_job(
        TranscriptionJobName=job_name,
        Media={'MediaFileUri': job_uri},
        MediaFormat=file_format,
        LanguageCode='en-US',
        OutputBucketName=bucket,
        OutputKey=f'output/dt{today}/',
        Settings={
            'ShowSpeakerLabels': True,
            'MaxSpeakerLabels': int(num_speakers),
            'ChannelIdentification': False
        }
    )

    # Poll until the job reaches a terminal state, at most 15 checks
    # (30 s apart, so roughly 7.5 minutes).
    output_key = f'output/dt{today}/{job_name}.json'
    time.sleep(30)
    job_state = None
    for attempt in range(1, 16):
        status = transcribe.get_transcription_job(TranscriptionJobName=job_name)
        print(f'Check No. {attempt} if transcribe job complete....')
        print(status)
        job_state = status['TranscriptionJob']['TranscriptionJobStatus']
        if job_state in ('COMPLETED', 'FAILED'):
            break
        print('Job Still running....')
        time.sleep(30)

    if job_state in ('COMPLETED', 'FAILED'):
        # Delete the job record; the result JSON persists in S3 regardless.
        transcribe.delete_transcription_job(TranscriptionJobName=job_name)
    if job_state != 'COMPLETED':
        # BUGFIX: the original fell through on FAILED (and on poll timeout)
        # and tried to download/parse output the job never produced.
        print('Job Failed issue with Audio file please check audio file and upload proper audio file again')
        return

    # Fetch the result JSON, parse it, and export a CSV next to the script.
    print(f'bucket: {bucket}, key: {output_key}')
    body = s3_client.get_object(Bucket=bucket, Key=output_key)['Body']
    transcribe_json_data = json.loads(body.read().decode())
    parsed_output = parse_transcribe_ouput(transcribe_json_data)
    df = pd.DataFrame(parsed_output)
    df.to_csv(f'{filename_no_ext}Output.csv', index=False)
if __name__ == '__main__':
    # CLI entry point: transcribe a local audio file and emit a CSV transcript.
    aparser = argparse.ArgumentParser()
    aparser.add_argument("-i", "--input-file", required=True,
                         help="path to the local audio file")
    # BUGFIX: argparse ignores `default=` on required arguments, so the
    # original's advertised defaults were dead; make these optional so the
    # defaults actually apply. Existing invocations that pass the flags
    # continue to work unchanged.
    aparser.add_argument("-f", "--file-format", default='mp3',
                         help="media format of the input file (default: mp3)")
    aparser.add_argument("-n", "--num-speakers", type=int, default=2,
                         help="maximum number of speakers to label (default: 2)")
    args = aparser.parse_args()
    run(args.input_file, args.file_format, args.num_speakers)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment