Skip to content

Instantly share code, notes, and snippets.

@c0bra
Created January 29, 2024 15:22
Show Gist options
  • Save c0bra/bacadd4ed1de2d18d58307458f403d66 to your computer and use it in GitHub Desktop.
Save c0bra/bacadd4ed1de2d18d58307458f403d66 to your computer and use it in GitHub Desktop.
Attempt: Twilio->Deepgram live streaming transcription in Node.js
import fs from 'fs'
import Stream from 'node:stream'
import WaveFile from 'wavefile';
import { WebSocketServer } from 'ws';
import { createClient, LiveTranscriptionEvents } from '@deepgram/sdk';
// Deepgram API client shared by every connection.
// The key is read from DEEPGRAM_API_KEY; the previously used (misspelled)
// DEEEPGRAM_API_KEY is still honoured as a fallback so existing setups keep working.
const deepgram = createClient(
  process.env.DEEPGRAM_API_KEY ?? process.env.DEEEPGRAM_API_KEY,
);

// Log promise rejections that nobody handled instead of letting them go unnoticed.
process.on('unhandledRejection', (reason, p) => {
  console.error(reason, 'Unhandled Rejection at Promise', p);
});

// Stream that forwards audio to the current Deepgram connection.
// It is assigned by deepgram_connect() and written to by websocket_server().
let transformStream;

websocket_server();
/**
 * Starts the WebSocket server that accepts a Twilio media stream on `/twilio`.
 *
 * For every accepted connection the inbound and outbound mu-law tracks are
 * buffered, padded with silence where timestamps show a gap, decoded, mixed
 * into two-channel 16-bit audio and written to the module-level
 * `transformStream` returned by `deepgram_connect()`. When the Twilio socket
 * closes, everything that was forwarded is written to `all.wav`.
 *
 * @returns {WebSocketServer} The server instance listening on port 6000.
 */
function websocket_server() {
  const PORT = 6000;
  const SAMPLE_RATE = 8000;
  // twilio sends audio data as 160 byte messages containing 20ms of audio each
  const BYTES_PER_MESSAGE = 160;
  const MS_PER_MESSAGE = 20;
  // there are 8 bytes per ms of data for our format (8 bit, 8000 Hz)
  const BYTES_PER_MS = 8;
  // 0xff is silence for mulaw audio
  const MULAW_SILENCE = 0xff;
  // we buffer 20 twilio messages corresponding to 0.4 seconds of audio to improve throughput performance
  const BUFFERED_MESSAGES = 20;
  const BUFFER_SIZE = BUFFERED_MESSAGES * BYTES_PER_MESSAGE;

  // Appends `chunk` to `buffered`, first inserting silence for any audio that
  // the timestamps show was dropped since `previous_timestamp`.
  const append_audio = (buffered, chunk, previous_timestamp, timestamp) => {
    const missing_ms = timestamp - (previous_timestamp + MS_PER_MESSAGE);
    if (missing_ms > 0) {
      const silence = Buffer.alloc(missing_ms * BYTES_PER_MS, MULAW_SILENCE);
      return Buffer.concat([buffered, silence, chunk]);
    }
    return Buffer.concat([buffered, chunk]);
  };

  // Decodes a buffer of 8-bit mu-law bytes into samples.
  const decode_mulaw = (bytes) => {
    const wav = new WaveFile.WaveFile();
    wav.fromScratch(1, SAMPLE_RATE, '8m', bytes);
    wav.fromMuLaw();
    return wav.getSamples(true, Float32Array);
  };

  // Concatenates a list of Float32Array chunks into a single Float32Array.
  const join_samples = (chunks) => {
    const total = chunks.reduce((count, part) => count + part.length, 0);
    const joined = new Float32Array(total);
    let offset = 0;
    for (const part of chunks) {
      joined.set(part, offset);
      offset += part.length;
    }
    return joined;
  };

  const wss = new WebSocketServer({
    port: PORT,
  });

  wss.on('connection', async function connection(ws, req) {
    if (req.url !== '/twilio') {
      console.log("Invalid URL, must be /twilio");
      // send() only accepts strings or binary data, so the payload is serialized first
      ws.send(JSON.stringify({ error: "Invalid URL, must be /twilio" }));
      ws.close();
      return;
    }

    // NOTE(review): the handlers below are only attached once Deepgram is connected,
    // so Twilio messages arriving before that are presumably not processed — confirm
    // that losing the first few messages is acceptable.
    transformStream = await deepgram_connect();
    console.log("New connection from ", req.socket.remoteAddress);

    let inbuffer = Buffer.alloc(0);
    let outbuffer = Buffer.alloc(0);
    let inbound_chunks_started = false;
    let latest_inbound_timestamp = 0;
    let latest_outbound_timestamp = 0;
    // Decoded samples of everything forwarded so far, kept for the all.wav dump.
    // This grows for the whole lifetime of the call.
    const recorded_inbound = [];
    const recorded_outbound = [];

    ws.on('close', () => {
      console.log("Twilio websocket connection closed");
      try {
        const allFile = new WaveFile.WaveFile();
        allFile.fromScratch(2, SAMPLE_RATE, '16', [
          join_samples(recorded_inbound),
          join_samples(recorded_outbound),
        ]);
        fs.writeFileSync('all.wav', allFile.toBuffer(), 'binary');
      } catch (error) {
        // A failed debug dump must not take the whole server down.
        console.error("Failed to write all.wav", error);
      }
    });

    ws.on('message', (msg) => {
      let data;
      try {
        data = JSON.parse(Buffer.from(msg).toString());
      } catch (error) {
        console.error("Ignoring message that is not valid JSON", error);
        return;
      }
      if (data === null || typeof data !== 'object') {
        return;
      }
      if (data.event === 'connected') {
        return;
      }

      // 'start' and 'stop' events carry no audio; only 'media' changes the buffers.
      if (data.event === 'media') {
        const media = data.media;
        const chunk = Buffer.from(media.payload, "base64");
        const timestamp = parseInt(media.timestamp, 10);

        if (media.track === 'inbound') {
          if (inbound_chunks_started) {
            // fills in silence if there have been dropped packets
            inbuffer = append_audio(inbuffer, chunk, latest_inbound_timestamp, timestamp);
          } else {
            inbound_chunks_started = true;
            // this basically sets the starting point for outbound timestamps
            latest_outbound_timestamp = timestamp - MS_PER_MESSAGE;
            inbuffer = Buffer.concat([inbuffer, chunk]);
          }
          latest_inbound_timestamp = timestamp;
        } else if (media.track === 'outbound') {
          outbuffer = append_audio(outbuffer, chunk, latest_outbound_timestamp, timestamp);
          latest_outbound_timestamp = timestamp;
        }
      }

      // Forward audio only while both tracks have a full buffer available.
      while (inbuffer.length >= BUFFER_SIZE && outbuffer.length >= BUFFER_SIZE) {
        const inbound_samples = decode_mulaw(inbuffer.subarray(0, BUFFER_SIZE));
        const outbound_samples = decode_mulaw(outbuffer.subarray(0, BUFFER_SIZE));

        const mixed = new WaveFile.WaveFile();
        mixed.fromScratch(2, SAMPLE_RATE, '16', [inbound_samples, outbound_samples]);
        // NOTE(review): toBuffer() presumably serializes a complete WAV file (header
        // included) for every chunk — confirm Deepgram accepts repeated headers on a
        // live stream.
        transformStream.write(mixed.toBuffer());

        recorded_inbound.push(inbound_samples);
        recorded_outbound.push(outbound_samples);

        // drop the bytes that were just forwarded
        inbuffer = inbuffer.subarray(BUFFER_SIZE);
        outbuffer = outbuffer.subarray(BUFFER_SIZE);
      }
    });

    ws.on('error', console.error);
  });

  console.log("Websocket server listening on port 6000\n");
  return wss;
}
/**
 * Opens a Deepgram live transcription connection and wraps it in a stream.
 *
 * Once the connection is open, the module-level `transformStream` is replaced
 * by a new Transform stream whose writes are sent to Deepgram. If a write finds
 * the socket closing/closed, a single reconnect is started and the chunk that
 * triggered it is forwarded to the new stream once it is open.
 *
 * @returns {Promise<Stream.Transform>} Resolves with the stream once the
 *   connection is open; rejects if the connection reports an error or closes
 *   before it ever opened.
 */
async function deepgram_connect() {
  // WebSocket readyState values as reported by connection.getReadyState()
  const OPEN = 1;
  const CLOSING = 2;

  console.log("Connecting to Deepgram...");
  const connection = deepgram.listen.live({
    sample_rate: 8000,
    channels: 2,
    multichannel: true,
  });

  const keepAlive = setInterval(() => {
    // only ping a socket that is actually open
    if (connection.getReadyState() !== OPEN) return;
    console.log("deepgram: keepalive");
    connection.keepAlive();
  }, 10 * 1000);

  return new Promise((resolve, reject) => {
    let opened = false;
    let reconnecting = false;

    // Settles the promise when the connection dies before it ever opened.
    const fail_if_pending = (message, cause) => {
      if (opened) return;
      clearInterval(keepAlive);
      reject(new Error(message, { cause }));
    };

    // Listeners are attached right away (not inside the Open handler) so that a
    // failure during the handshake is observed instead of being emitted with
    // no listener attached.
    connection.on(LiveTranscriptionEvents.Error, (error) => {
      console.error(error);
      fail_if_pending("Deepgram connection failed before opening", error);
    });
    connection.on(LiveTranscriptionEvents.Warning, (warning) => {
      console.error(warning);
    });
    connection.on(LiveTranscriptionEvents.Close, (evt) => {
      console.log(`Deepgram connection closed: [${evt?.code}] ${evt?.reason}`);
      clearInterval(keepAlive);
      fail_if_pending("Deepgram connection closed before opening", evt);
    });
    connection.on(LiveTranscriptionEvents.Transcript, (data) => {
      console.log('Transcript:', data.channel.alternatives);
    });

    // Transform callback: forwards each chunk to Deepgram and always
    // acknowledges it so the stream never stalls.
    const chunkHandler = (chunk, encoding, cb) => {
      if (chunk) {
        const state = connection.getReadyState();
        if (state === OPEN) {
          connection.send(chunk);
        } else if (state >= CLOSING) {
          console.log("socket: data couldn't be sent to deepgram");
          // Start one reconnect only; chunks arriving while it is in flight are dropped.
          if (!reconnecting) {
            reconnecting = true;
            console.log("socket: retrying connection to deepgram");
            // removeAllListeners() also drops the Close handler, so stop the timer here.
            clearInterval(keepAlive);
            connection.removeAllListeners();
            deepgram_connect()
              .then((stream) => {
                transformStream = stream;
                stream.write(chunk);
              })
              .catch((error) => {
                // allow the next chunk to try again
                reconnecting = false;
                console.error(error);
              });
          }
        } else {
          console.log("socket: data couldn't be sent to deepgram");
        }
      }
      if (cb) cb(null);
    };

    connection.on(LiveTranscriptionEvents.Open, () => {
      opened = true;
      transformStream = new Stream.Transform({
        transform: chunkHandler,
      });
      transformStream.on('end', () => {
        console.log("transform-stream: end");
      });
      resolve(transformStream);
    });
  });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment