Skip to content

Instantly share code, notes, and snippets.

@AndreasKueck
Last active January 28, 2023 12:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save AndreasKueck/7d69222274eef6574dfab76235ea5b6c to your computer and use it in GitHub Desktop.
Save AndreasKueck/7d69222274eef6574dfab76235ea5b6c to your computer and use it in GitHub Desktop.
parolrekono_per_twilio_k_vosk_realtempe
import audioop
import base64
import json
import os
from flask import Flask, request
from flask_sock import Sock, ConnectionClosed
from twilio.twiml.voice_response import VoiceResponse, Start
from twilio.rest import Client
import vosk
from vosk import Model, KaldiRecognizer, SetLogLevel
SetLogLevel(-1)
dosiero = open('rezulto.txt','w')
app = Flask(__name__)
sock = Sock(app)
twilio_client = Client()
# model = vosk.Model('model')
model = Model(lang="eo")
CL = '\x1b[0K'
BS = '\x08'
@app.route('/call', methods=['POST'])
def call():
"""Accept a phone call."""
response = VoiceResponse()
start = Start()
start.stream(url=f'wss://{request.host}/stream')
response.append(start)
response.say('Bonwole parolu nun', voice='Polly.Ewa')
response.pause(length=60)
print(f'Eniranta voko de {request.form["From"]}')
return str(response), 200, {'Content-Type': 'text/xml'}
@sock.route('/stream')
def stream(ws):
"""Receive and transcribe audio stream."""
rec = vosk.KaldiRecognizer(model, 16000)
while True:
message = ws.receive()
packet = json.loads(message)
if packet['event'] == 'start':
print('Elsendfluo startis')
elif packet['event'] == 'stop':
print('\nElsendfluo haltis')
elif packet['event'] == 'media':
audio = base64.b64decode(packet['media']['payload'])
audio = audioop.ulaw2lin(audio, 2)
audio = audioop.ratecv(audio, 2, 1, 8000, 16000, None)[0]
if rec.AcceptWaveform(audio):
r = json.loads(rec.Result())
print(CL + r['text'] + ' ', end='', flush=True)
dosiero.write(r['text'] + " SEKVA VOKO: ")
else:
r = json.loads(rec.PartialResult())
print(CL + r['partial'] + BS * len(r['partial']), end='', flush=True)
if __name__ == '__main__':
from pyngrok import ngrok
port = 5000
public_url = ngrok.connect(port, bind_tls=True).public_url
number = twilio_client.incoming_phone_numbers.list()[0]
number.update(voice_url=public_url + '/call')
print(f'Atendante vokojn al {number.phone_number}')
app.run(port=port)
import audioop
import base64
import json
import os
import requests
from flask import Flask, request
from flask_sock import Sock, ConnectionClosed
from twilio.twiml.voice_response import VoiceResponse, Start
from twilio.rest import Client
import vosk
from vosk import Model, KaldiRecognizer, SetLogLevel
SetLogLevel(-1)
app = Flask(__name__)
sock = Sock(app)
twilio_client = Client()
# model = vosk.Model('model')
model = Model(lang="eo")
CL = '\x1b[0K'
BS = '\x08'
@app.route('/call', methods=['POST'])
def call():
"""Accept a phone call."""
response = VoiceResponse()
start = Start()
start.stream(url=f'wss://{request.host}/stream')
response.append(start)
response.say('Bonwole parolu nun', voice='Polly.Ewa')
response.pause(length=60)
print(f'Eniranta voko de {request.form["From"]}')
return str(response), 200, {'Content-Type': 'text/xml'}
@sock.route('/stream')
def stream(ws):
"""Receive and transcribe audio stream."""
rec = vosk.KaldiRecognizer(model, 16000)
while True:
message = ws.receive()
packet = json.loads(message)
if packet['event'] == 'start':
print('Elsendfluo startis')
elif packet['event'] == 'stop':
print('\nElsendfluo haltis')
elif packet['event'] == 'media':
audio = base64.b64decode(packet['media']['payload'])
audio = audioop.ulaw2lin(audio, 2)
audio = audioop.ratecv(audio, 2, 1, 8000, 16000, None)[0]
if rec.AcceptWaveform(audio):
r = json.loads(rec.Result())
print(CL + r['text'] + ' ', end='', flush=True)
pload = {'teksto':r['text']}
rq = requests.post("https://script.google.com/macros/s/.../exec", pload)
else:
r = json.loads(rec.PartialResult())
print(CL + r['partial'] + BS * len(r['partial']), end='', flush=True)
if __name__ == '__main__':
from pyngrok import ngrok
ngrok.set_auth_token("mia_ngrok_shlosilo")
port = 5001
public_url = ngrok.connect(port, bind_tls=True).public_url
number = twilio_client.incoming_phone_numbers.list()[0]
number.update(voice_url=public_url + '/call')
print(f'Atendante vokojn al {number.phone_number}')
app.run(port=port)
@AndreasKueck
Copy link
Author

Jen klarigoj.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment