Speech API + Tornado + Python3 + WSS

How to set up this PoC. In this example we'll use user credentials rather than a service account.

  1. Create a GCP Project. For example, let's call it tivo-test.

  2. In the Cloud Console, open VPC Network -> Firewall Rules and add a rule allowing inbound tcp:443 (HTTPS) to the VM.

  3. Enable the Speech API. (Follow the steps in the slides, or see the gcloud commands after these steps.)

  4. Create a VM in the GCP project running Ubuntu.

  5. SSH into the VM and run the following commands:

   # Install python3 & pip
   $ sudo apt-get update && sudo apt-get install python3-pip python3 -y
   
   # Copy this gist's files (main.py, index.html, gen_cert.sh,
   # requirements.txt) onto the VM, then install the python dependencies
   $ sudo pip3 install -r requirements.txt

   # Generate a self-signed TLS certificate. Fill out the prompts with some bogus dummy data
   $ ./gen_cert.sh

   # Run the webserver
   $ sudo python3 main.py
  6. Visit your VM's external IP address. For example https://35.193.65.170/
  7. Disregard the self-signed certificate warning and proceed anyway.
  8. Accept microphone recording.
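
As an alternative to steps 2 and 3 above, both can be done with the gcloud CLI. A minimal sketch, assuming gcloud is installed and authenticated (the firewall rule name allow-https is arbitrary):

   # Allow inbound HTTPS traffic to the VM (step 2)
   $ gcloud compute firewall-rules create allow-https --allow=tcp:443

   # Enable the Speech API (step 3)
   $ gcloud services enable speech.googleapis.com

   # Expose your user credentials as Application Default Credentials,
   # which the backend's SpeechClient() picks up automatically
   $ gcloud auth application-default login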
gen_cert.sh

# This generates a self-signed certificate for your web service
mkdir tls
openssl req -nodes -new -x509 -keyout tls/server.key -out tls/server.cert
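Note: with no -days flag, openssl typically issues a certificate that expires after 30 days; appending e.g. -days 365 to the openssl command above gives the PoC cert a longer lifetime.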
index.html

<!DOCTYPE html>
<html>
<head>
  <title>Speech PoC</title>
</head>
<body>
  <h2>Transcribed text</h2>
  <div id="socket-server">wss://35.193.65.170/translate</div>
  <div id="status"></div>
  <div id="log"></div>
  <script>
  (function(window) {
    var app = {
      socket: null,
      bufferSize: 4096,
      $status: window.document.getElementById("status"),
      $log: window.document.getElementById("log"),

      main: function() {
        if (!this.init()) {
          alert("Your browser does not support websockets / audio context.");
          return;
        }
        this.socket = new WebSocket("wss://35.193.65.170/translate");
        this.socket.addEventListener("open", this.onOpen.bind(this));
        this.socket.addEventListener("message", this.onMessage.bind(this));
        this.socket.addEventListener("close", this.onClose.bind(this));
      },

      init: function() {
        try {
          // Shims
          window.AudioContext = window.AudioContext || window.webkitAudioContext;
          navigator.getUserMedia = navigator.getUserMedia ||
              navigator.webkitGetUserMedia ||
              navigator.mozGetUserMedia ||
              navigator.msGetUserMedia;
        } catch (e) {
          console.error(e);
          return false;
        }
        return !!(navigator.getUserMedia && window.AudioContext);
      },

      onOpen: function(event) {
        this.startRecording(this.onAudio.bind(this)).then(() => {
          this.$status.textContent = "Recording";
        }).catch((error) => {
          alert("Failed to start recording audio :'(");
          console.log(error);
        });
      },

      onMessage: function(event) {
        this.$log.innerHTML += [
          "<p class=\"message\">",
          "  <code>",
          "    " + event.data,
          "  </code>",
          "</p>"
        ].join("\n");
      },

      onClose: function(event) {
        this.$status.textContent = "Transcoding & recording stopped";
      },

      /**
       * Starts recording audio from the computer's microphone,
       * and has the data hit a javascript function.
       */
      startRecording: function(callback) {
        return navigator.mediaDevices.getUserMedia({"audio": true, "video": false}).then((stream) => {
          var context = new window.AudioContext();

          // Send metadata on the audio stream to the backend.
          this.socket.send(JSON.stringify({
            sampleRateHz: context.sampleRate,
            language: "en-US",
            format: "LINEAR16"
          }));

          // Capture mic audio data into a stream.
          var audioIn = context.createMediaStreamSource(stream);

          // Only record mono audio, and call a js function after a buffer of
          // size this.bufferSize is full.
          var recorder = context.createScriptProcessor(this.bufferSize, 1, 1);

          // Specify the processing function.
          recorder.onaudioprocess = callback;

          // Connect audioIn's data to the recorder.
          audioIn.connect(recorder);

          // Connect the recorder's output to the previous destination.
          recorder.connect(context.destination);
        });
      },

      /**
       * Converts audio stored as float32 to Linear 16 bit PCM
       * data. Aka FLOAT32->INT16.
       */
      float32ToLinear16: function(float32Arr) {
        var int16 = new Int16Array(float32Arr.length);
        for (var i = 0; i < float32Arr.length; ++i) {
          // Force the number into [-1, 1].
          var s = Math.max(-1, Math.min(1, float32Arr[i]));

          // Convert 32 bit float -> 16 bit int.
          // 0x7fff = max 16 bit num. 0x8000 = min 16 bit num.
          int16[i] = s < 0 ? s * 0x8000 : s * 0x7FFF;
        }
        return int16;
      },

      onAudio: function(event) {
        var float32Audio = event.inputBuffer.getChannelData(0) || new Float32Array(this.bufferSize);
        var linear16 = this.float32ToLinear16(float32Audio);

        // Sending an ArrayBuffer transmits a binary frame, which the
        // backend treats as raw audio.
        this.socket.send(linear16.buffer);
      }
    };
    app.main();
  })(window);
  </script>
</body>
</html>
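
For exercising the /translate endpoint without a browser, here is a minimal client sketch. It assumes Python 3.7+ and the third-party websockets package (pip3 install websockets); the server address is the example IP from above, and the one second of silence is a stand-in for real LINEAR16 audio. It mirrors index.html: a JSON metadata frame first, then binary audio frames.

import asyncio
import json
import ssl

import websockets

async def main():
    # Accept the self-signed certificate generated by gen_cert.sh.
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE

    async with websockets.connect("wss://35.193.65.170/translate", ssl=ctx) as ws:
        # First frame: audio stream metadata, as index.html sends it.
        await ws.send(json.dumps({
            "sampleRateHz": 16000,
            "language": "en-US",
            "format": "LINEAR16",
        }))

        # Subsequent frames: raw 16-bit PCM. Here, one second of silence.
        await ws.send(b"\x00\x00" * 16000)

        # The server only replies once the Speech API returns interim
        # results, so silence may never produce a reply; time out rather
        # than blocking forever.
        try:
            reply = await asyncio.wait_for(ws.recv(), timeout=10)
            print(json.loads(reply))
        except asyncio.TimeoutError:
            print("no interim results yet")

asyncio.run(main())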
main.py

import json
import queue
import threading

from tornado import websocket, web, ioloop, httpserver
from google.cloud import speech
from google.cloud import logging

logger = logging.Client().logger("speech-poc")


class IndexHandler(web.RequestHandler):
    def get(self):
        self.render("index.html")


class AudioWebSocket(websocket.WebSocketHandler):
    def check_origin(self, origin):
        return True

    def open(self):
        self.transcoder = None
        print("WebSocket opened")

    def on_message(self, message):
        if self.transcoder is None:
            # The first message is a JSON config describing the audio stream.
            config = json.loads(message)
            print(config)
            self.transcoder = Transcoder(
                sample_rate=config["sampleRateHz"],
                language_code=config["language"])
            # Start the transcoding of audio to text
            self.transcoder.start()
        else:
            self.transcoder.write(message)
            interim_results = self.transcoder.interim_results()
            if len(interim_results) != 0:
                print(interim_results)
                logger.log_struct({
                    "request": "/translate",
                    "interim_results": interim_results})
                self.write_message(json.dumps(interim_results))

    def on_close(self):
        if self.transcoder is not None:
            self.transcoder.stop()
        self.transcoder = None


class AudioStream(object):
    """An iterable object which holds audio data that is pending processing."""

    def __init__(self):
        self.buff = queue.Queue()
        self.closed = False

    def __iter__(self):
        return self

    def __next__(self):
        return self.next()

    def write(self, data):
        self.buff.put(data)

    def close(self):
        self.closed = True
        # Unblock any consumer waiting in next().
        self.buff.put(None)

    def next(self):
        while not self.closed:
            chunk = self.buff.get()
            if chunk is not None:
                return chunk
        raise StopIteration


class Transcoder(object):
    """Coordinates the transcription of a raw audio stream."""

    def __init__(self, sample_rate, language_code,
                 encoding=speech.enums.RecognitionConfig.AudioEncoding.LINEAR16):
        self.sample_rate = sample_rate
        self.language_code = language_code
        self.encoding = encoding
        self.closed = True
        self.audio = AudioStream()
        self.result_queue = queue.Queue()

    def write(self, data):
        """Write a chunk of audio to be transcribed."""
        self.audio.write(data)

    def start(self):
        """Start transcoding audio."""
        self.closed = False
        thread = threading.Thread(target=self._process)
        thread.start()

    def stop(self):
        """Stop transcoding audio."""
        self.closed = True
        self.audio.close()

    def _process(self):
        """Handles the setup of the transcription request, and retrieving audio chunks from the queue."""
        client = speech.SpeechClient()
        config = speech.types.RecognitionConfig(
            encoding=self.encoding,
            sample_rate_hertz=self.sample_rate,
            language_code=self.language_code)
        streaming_config = speech.types.StreamingRecognitionConfig(
            config=config,
            interim_results=True)

        # Give it a config and a generator which produces audio chunks. In
        # return you get an iterator of results.
        responses = client.streaming_recognize(streaming_config, self.generator())

        # This will block until there are no more interim results left.
        for response in responses:
            self.result_queue.put(self._response_to_dict(response))

    def _response_to_dict(self, response):
        """Converts a response from the streaming api to a python dict."""
        if response is None:
            return []
        output = []
        for result in response.results:
            r = {}
            r["stability"] = result.stability
            r["is_final"] = result.is_final
            r["alternatives"] = []
            for alt in result.alternatives:
                r["alternatives"].append({
                    "transcript": alt.transcript,
                    "confidence": alt.confidence,
                })
            output.append(r)
        return output

    def interim_results(self, max_results=10):
        """Grabs interim results from the queue."""
        results = []
        while len(results) < max_results and not self.result_queue.empty():
            try:
                result = self.result_queue.get(block=False)
            except queue.Empty:
                return results
            results.append(result)
        return results

    def generator(self):
        """Generator that yields audio chunks."""
        for chunk in self.audio:
            yield speech.types.StreamingRecognizeRequest(audio_content=chunk)


app = web.Application([
    (r"/", IndexHandler),
    (r"/index.html", IndexHandler),
    (r"/translate", AudioWebSocket),
])

if __name__ == "__main__":
    ssl_options = {
        "certfile": "tls/server.cert",
        "keyfile": "tls/server.key",
    }
    server = httpserver.HTTPServer(
        app, xheaders=True, ssl_options=ssl_options)
    # Binding port 443 requires root, hence `sudo python3 main.py`.
    server.listen(443)
    ioloop.IOLoop.current().start()
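
A note on the design: Speech API responses are consumed on a background thread (_process) and parked in result_queue, while Tornado's IOLoop stays single-threaded. Interim results are therefore only flushed to the browser from on_message when the next audio chunk arrives, rather than pushed the moment they are ready; since the client sends a 4096-sample buffer many times a second, that polling delay is negligible for a PoC.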
requirements.txt

google-cloud-speech==0.27.1
google-cloud-logging==1.1.0
tornado==4.5.1
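
The version pins matter: the code uses the speech.types and speech.enums modules, which were removed from google-cloud-speech in the 2.x rewrite, and Tornado 5+ replaced its IOLoop internals with asyncio. The PoC is written against the versions above.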