Skip to content

Instantly share code, notes, and snippets.

@fcmendoza
Created December 16, 2019 16:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fcmendoza/26e9d7373fe55fe58b689a7e69677115 to your computer and use it in GitHub Desktop.
Save fcmendoza/26e9d7373fe55fe58b689a7e69677115 to your computer and use it in GitHub Desktop.
# See comments for examples
@fcmendoza
Copy link
Author

zentiment-transcribe

import boto3
import json
import logging
import time
import math
from decimal import *

# Module-level logger; DEBUG so all diagnostic output reaches CloudWatch Logs.
logger = logging.getLogger("handler_logger")
logger.setLevel(logging.DEBUG)
# AWS clients are created once per container (outside the handler) so they are
# reused across warm Lambda invocations.
s3 = boto3.resource('s3')
transcribe = boto3.client('transcribe')

def lambda_handler(event, context):
    """Lambda entry point: run the transcription workflow, then return 200."""
    do_work(event, context)
    response = {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!'),
    }
    return response

def do_work(event, context):
    """Start an Amazon Transcribe job for the S3 object that triggered this event.

    Expects an S3 put-event payload; reads the object key (and bucket name)
    from the first record. If a transcription job whose name contains the key
    already exists, no new job is started.

    Returns:
        dict with "statusCode" and "body" keys (via _get_response).
    """
    logger.info("Lambda zentiment-transcribe started.")

    record = event['Records'][0]['s3']
    filename = record['object']['key']
    bucket_name = record['bucket']['name']

    logger.info("Processing file %s", filename)

    # The S3 key doubles as the Transcribe job name (job names must be unique).
    job_name = filename
    job_uri = "https://zentiment-recordings.s3.us-east-2.amazonaws.com/" + filename

    # Derive the media format from the file extension. rsplit handles
    # 4-character extensions such as "flac"; the original filename[-3:]
    # slice would have produced an invalid format ("lac") for those.
    media_format = filename.rsplit('.', 1)[-1].lower()
    logger.info("file format")
    logger.info(media_format)

    jobs = transcribe.list_transcription_jobs(JobNameContains=job_name)

    if jobs['TranscriptionJobSummaries']:
        logger.info("Transcription job %s already exists.", job_name)
    else:
        logger.info("Creating transcription job %s", job_name)

        transcribe.start_transcription_job(
            TranscriptionJobName=job_name,
            Media={'MediaFileUri': job_uri},
            MediaFormat=media_format,
            LanguageCode='en-US',
            # Analyze each call channel separately (e.g. agent vs. customer).
            Settings={'ChannelIdentification': True},
            OutputBucketName="zentiment-recordings"
        )

        logger.info("Transcription job %s created.", job_name)

    return _get_response(200, "Connect successful.")
        
def _get_response(status_code, body):
    if not isinstance(body, str):
        body = json.dumps(body)
    return {"statusCode": status_code, "body": body}

@fcmendoza
Copy link
Author

zentiment-comprehend

import boto3
import json
import logging
import time
import math
from decimal import *

# Module-level logger; DEBUG so all diagnostic output reaches CloudWatch Logs.
logger = logging.getLogger("handler_logger")
logger.setLevel(logging.DEBUG)
# AWS clients/resources are created once per container (outside the handler)
# so they are reused across warm Lambda invocations.
dynamodb = boto3.resource("dynamodb")
s3 = boto3.resource('s3')
comprehend = boto3.client(service_name='comprehend', region_name='us-east-2')
# Destination table for per-channel sentiment / key-phrase / entity results.
table = dynamodb.Table("zentiment-results")


def lambda_handler(event, context):
    """Lambda entry point: run the Comprehend analysis, then return 200."""
    do_work(event, context)
    body = json.dumps('Hello from Lambda!')
    return {'statusCode': 200, 'body': body}

def do_work(event, context):
    """Analyze a Transcribe result file with Comprehend, one channel at a time.

    Triggered by an S3 put-event for a transcript JSON file. Loads the file,
    extracts the transcript for channels 0 and 1, and runs each through
    sentiment / key-phrase / entity analysis.

    Returns:
        dict with "statusCode" and "body" keys (via _get_response).
    """
    logger.info("Lambda zentiment-comprehend started.")

    record = event['Records'][0]['s3']
    filename = record['object']['key']
    bucket_name = record['bucket']['name']

    logger.info("Processing transcript " + filename)

    # Read the transcript from the bucket that actually raised the event;
    # the event's bucket name was previously extracted but then ignored in
    # favor of a hard-coded bucket.
    obj = s3.Object(bucket_name, filename)
    data = json.load(obj.get()['Body'])

    # ChannelIdentification was enabled during transcription, so the result
    # carries two channels (0 and 1); analyze both with the same code path.
    for channel in (0, 1):
        transcript = get_channel_transcript(data, channel)
        sent_to_comprehend(filename, transcript, channel)

    return _get_response(200, "Connect successful.")
        

def get_channel_transcript(json_data, number):
    """Reassemble the transcript text for one channel of a Transcribe result.

    Args:
        json_data: parsed Transcribe output JSON (channel-identified format).
        number: channel index into results.channel_labels.channels.

    Returns:
        The channel's transcript, capped at 5000 characters — the Comprehend
        per-request text limit. NOTE(review): the API limit is 5000 *bytes*,
        so non-ASCII transcripts could still exceed it after this cap; confirm.
    """
    parts = []
    items = json_data["results"]["channel_labels"]["channels"][number]["items"]

    for item in items:
        for alt in item["alternatives"]:
            # Words get a leading space; punctuation attaches directly to the
            # previous word. The original used a substring test
            # (`item["type"] in "pronunciation"`), which only worked by
            # accident; equality is the intended check.
            if item["type"] == "pronunciation":
                parts.append(" " + alt["content"])
            else:
                parts.append(alt["content"])

    # join() instead of repeated string concatenation (quadratic in CPython-
    # independent terms) — same resulting text.
    text = "".join(parts)

    if len(text) > 5000:  # Max length of request text allowed is 5000 bytes
        logger.info("Transcript size of " + str(len(text)) + " is larger than the allowed 5000 limit by the Comprehend API. Analysis will be performed with the first 5000 characters.")
        text = text[0:5000]
    return text

def sent_to_comprehend(filename, text, channel_number):
    """Run Comprehend analysis on one channel's transcript and persist it.

    Calls detect_sentiment, detect_key_phrases and detect_entities on `text`
    and writes a single DynamoDB item keyed "<filename>_<channel_number>".

    Args:
        filename: S3 key of the transcript; used to build the partition key.
        text: transcript text (already capped to Comprehend's size limit).
        channel_number: channel index, appended to the conversation id.
    """
    logger.debug("Calling detect_sentiment for channel " + str(channel_number))
    sentiment_result = comprehend.detect_sentiment(Text=text, LanguageCode='en')
    logger.debug("End of detect_sentiment for channel " + str(channel_number))

    logger.debug(sentiment_result["Sentiment"])

    logger.debug('Calling detect_key_phrases')
    key_phrases_result = comprehend.detect_key_phrases(Text=text, LanguageCode='en')
    logger.debug('End of detect_key_phrases')

    logger.debug('Calling detect_entities')
    entities_result = comprehend.detect_entities(Text=text, LanguageCode='en')
    logger.debug('End of detect_entities')

    phrases = key_phrases_result["KeyPhrases"]
    entities = entities_result["Entities"]

    # DynamoDB does not accept Python floats; scores must be Decimals.
    # Convert via str(): Decimal(float) carries the full binary-float
    # expansion (~50 significant digits), which boto3's DynamoDB serializer
    # rejects with an Inexact/Rounded error.
    for phrase in phrases:
        phrase["Score"] = Decimal(str(phrase["Score"]))

    for entity in entities:
        entity["Score"] = Decimal(str(entity["Score"]))

    def _pct(key):
        # Sentiment scores arrive as 0..1 floats; store them as 0..100 Decimals.
        return Decimal(str(sentiment_result["SentimentScore"][key])) * 100

    logger.debug("Saving analysis to dynamodb.")

    table.put_item(Item={
        "conversation_id": filename + "_" + str(channel_number),
        "account_number": "12333",  # NOTE(review): hard-coded placeholder — confirm.
        "sentiment": sentiment_result["Sentiment"],
        "transcript": text,
        "sentiment_score": {
            "mixed": _pct("Mixed"),
            "negative": _pct("Negative"),
            "neutral": _pct("Neutral"),
            "positive": _pct("Positive"),
        },
        "key_phrases": phrases,
        "entities": entities,
    })

def _get_response(status_code, body):
    if not isinstance(body, str):
        body = json.dumps(body)
    return {"statusCode": status_code, "body": body}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment