Skip to content

Instantly share code, notes, and snippets.

@gmyrianthous
Created November 21, 2021 16:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gmyrianthous/523ca6e0ac8275d04de58090be35a4e7 to your computer and use it in GitHub Desktop.
Save gmyrianthous/523ca6e0ac8275d04de58090be35a4e7 to your computer and use it in GitHub Desktop.
Real-time Speech-To-Text Tutorial - Part 3
import json
import base64
import asyncio
import websockets
SAMPLE_RATE=16000
FRAMES_PER_BUFFER = 3200
API_KEY = '<your AssemblyAI Key goes here>'
ASSEMBLYAI_ENDPOINT = f'wss://api.assemblyai.com/v2/realtime/ws?sample_rate={SAMPLE_RATE}'
async def speech_to_text():
"""
Asynchronous function used to perfrom real-time speech-to-text using AssemblyAI API
"""
async with websockets.connect(
ASSEMBLYAI_ENDPOINT,
ping_interval=5,
ping_timeout=20,
extra_headers=(('Authorization', API_KEY), ),
) as ws_connection:
await asyncio.sleep(0.5)
await ws_connection.recv()
print('Websocket connection initialised')
async def send_data():
"""
Asynchronous function used for sending data
"""
while True:
try:
data = audio_stream.read(FRAMES_PER_BUFFER)
data = base64.b64encode(data).decode('utf-8')
await ws_connection.send(json.dumps({'audio_data': str(data)}))
except Exception as e:
print(f'Something went wrong. Error code was {e.code}')
break
await asyncio.sleep(0.5)
return True
async def receive_data():
"""
Asynchronous function used for receiving data
"""
while True:
try:
received_msg = await ws_connection.recv()
print(json.loads(received_msg)['text'])
except Exception as e:
print(f'Something went wrong. Error code was {e.code}')
break
data_sent, data_received = await asyncio.gather(send_data(), receive_data())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment