Skip to content

Instantly share code, notes, and snippets.

@gusdelact
Last active August 1, 2023 03:57
Show Gist options
  • Save gusdelact/642a297f863b405710baf71a03078c68 to your computer and use it in GitHub Desktop.
Save gusdelact/642a297f863b405710baf71a03078c68 to your computer and use it in GitHub Desktop.
import streamlit as st
import boto3
from datetime import datetime
import time
import requests
###manejo de estado
if 'transcribiendo' not in st.session_state:
st.session_state['transcribiendo'] = False
### Layout inicial
st.title('AILet : NLP & Image/Video Recognition')
idioma=st.sidebar.selectbox('Idioma', ('en-US', 'es-US'))
traducir=st.sidebar.selectbox('Traduccion', ('en-US', 'es-US','ja-JA'))
archivo_salida_audio=st.sidebar.text_input('Nombre archivo salida','')
tab_texto, tab_imagen, tab_video = st.tabs(["Texto", "Imagen", "Video"])
###
def debug_message(message) :
st.sidebar.write(message)
###
def text2speech(texto, archivo_audio,idioma) :
client = boto3.client('polly')
response= client.synthesize_speech(Text=texto,VoiceId="Mia",LanguageCode=idioma ,OutputFormat="mp3" )
debug_message(response)
with open(archivo_audio,"wb") as voz:
voz.write(response['AudioStream'].read())
debug_message(f'Archivo guardado :{archivo_audio}')
###
def hablar(archivo_audio):
st.audio(archivo_audio,format="audio/mp3")
###
def translate(texto,de_lenguaje, a_lenguaje) :
client = boto3.client('translate')
response=client.translate_text(
Text=texto,
SourceLanguageCode=de_lenguaje,
TargetLanguageCode=a_lenguaje
)
debug_message(response['TranslatedText'])
###
def sentiment_analysis(texto,de_lenguaje) :
client = boto3.client('comprehend')
response=client.detect_sentiment(Text=texto,LanguageCode=de_lenguaje)
sentimiento = response['Sentiment']
debug_message(sentimiento)
###
def speech2text(archivo_salida_audio,idioma) :
nombre_bucket = 'aiasaservicetranscribe740327929864'
client_s3 = boto3.client('s3')
debug_message(archivo_salida_audio)
client_s3.upload_file(archivo_salida_audio,nombre_bucket,archivo_salida_audio)
client= boto3.client('transcribe')
dt = datetime.now()
ts = datetime.timestamp(dt)
nombre_job = f"job_{ts}"
ruta_audio= f"s3://{nombre_bucket}/{archivo_salida_audio}"
debug_message(ruta_audio)
formato_audio='mp3'
response=client.start_transcription_job(
TranscriptionJobName = nombre_job,
LanguageCode= idioma,
MediaFormat=formato_audio,
Media={
'MediaFileUri': ruta_audio
}
)
st.session_state['transcribiendo'] = True
print(response)
print(response['TranscriptionJob']['TranscriptionJobName'])
print(st.session_state['transcribiendo'])
if st.session_state['transcribiendo'] :
with st.spinner('Esperar transcripcion...'):
time.sleep(45) #esperar 45 segundos
resultado_transcripcion=client.get_transcription_job(TranscriptionJobName=nombre_job)
debug_message(resultado_transcripcion)
estado = resultado_transcripcion['TranscriptionJob']['TranscriptionJobStatus']
url = resultado_transcripcion['TranscriptionJob']['Transcript']['TranscriptFileUri']
if estado == 'COMPLETED' :
respuesta =requests.get(url)
transcripcion = respuesta.json()
resultado_transcripcion = transcripcion['results']['transcripts'][0]['transcript']
st.text(resultado_transcripcion)
else:
debug_message("No se ha completado la transcripcion")
st.session_state['transcribiendo'] = False
#######################################################
#Funciones deteccion de imagenes
def detect_labels(bytes_image) :
client = boto3.client('rekognition')
response=client.detect_labels(Image={'Bytes':bytes_image})
etiquetas = response['Labels']
for etiqueta in etiquetas :
nombre = etiqueta['Name']
confianza = etiqueta['Confidence']
debug_message(f'{nombre} con confianza {confianza}')
def detect_faces(bytes_image) :
client = boto3.client('rekognition')
detecciones = [
'AGE_RANGE',
'BEARD',
'EMOTIONS',
'EYE_DIRECTION','EYEGLASSES','EYES_OPEN','GENDER','MOUTH_OPEN',
'MUSTACHE','FACE_OCCLUDED','SMILE','SUNGLASSES'
]
for deteccion in detecciones:
print("*********************************")
print(deteccion)
response=client.detect_faces(Image={'Bytes':bytes_image},Attributes=[deteccion])
debug_message(response)
def censure(bytes_image) :
client = boto3.client('rekognition')
response=client.detect_moderation_labels(Image = {'Bytes':bytes_image} )
debug_message(response['ModerationLabels'])
#######################################################
############### UI NLP ##############################
with tab_texto:
st.header('Natural Language Processing with AWS')
texto=st.text_area('Text to analyze','')
#debug_message(texto)
if texto != '' :
nlp_labels = ['Text2Speech','Translate','Sentiment Analysis','Speech2Text']
nlp_columns = st.columns(len(nlp_labels))
nlp_seleccionado = []
for i in range(len(nlp_labels)) :
activado=nlp_columns[i].checkbox(nlp_labels[i])
nlp_seleccionado.append( activado )
debug_message(nlp_seleccionado[i] )
#revisar que acciones fueron activadas
for i in range(len(nlp_seleccionado)) :
debug_message(i)
if nlp_seleccionado[i] :
match i :
case 0:
archivo_audio = archivo_salida_audio
text2speech(texto,archivo_audio,idioma)
hablar(archivo_audio)
break
case 1:
translate(texto,idioma,traducir)
break
case 2:
idioma_sentimiento = 'en'
if idioma == 'en-US' :
idioma_sentimiento = 'en'
elif idioma == 'es-US' :
idioma_sentimiento = 'es'
sentiment_analysis(texto,idioma_sentimiento)
break
case 3:
speech2text(archivo_salida_audio,idioma)
case _:
pass
if idioma == 'en-US' or idioma == 'es-US' :
st.sidebar.text(f'idioma seleccionado {idioma}')
else :
st.sidebar.write('Selecciona un idioma')
############### UI Imagenes ##############################
with tab_imagen:
st.header('Image Recognition with AWS')
image_file = st.file_uploader("Subir una imagen...")
if image_file is not None:
# To read file as bytes:
bytes_image = image_file.getvalue()
st.image(bytes_image)
img_columnas=st.columns(4)
with img_columnas[0] :
btn_detectar=st.button("Detectar etiquetas...")
if btn_detectar:
detect_labels(bytes_image)
with img_columnas[1]:
btn_caras=st.button("Detectar caras...")
if btn_caras :
detect_faces(bytes_image)
with img_columnas[2]:
btn_censura=st.button("Censurar ...")
if btn_censura :
censure(bytes_image)
with tab_video:
st.header('Video Recognition with AWS')
video_file = st.file_uploader("Subir un video...")
st.video(video_file)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment