Created
November 24, 2023 05:29
-
-
Save sarchak/440f0ca027d24ab74150016aa98f8d8f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64
import json
import wave

from flask import Flask, render_template
from flask_sock import Sock
from pydub import AudioSegment

from great_things import audio

# Flask application; flask-sock adds WebSocket routes on top of it.
app = Flask(__name__)
# Options handed to the WebSocket server by flask-sock: send a ping every
# 25 seconds so idle connections are kept alive.
app.config["SOCK_SERVER_OPTIONS"] = {"ping_interval": 25}
sock = Sock(app)
# Path to the MP3 clip that is played back to the caller.
file_path = "output.mp3"

# Decode the MP3 (pydub shells out to ffmpeg for this).
# NOTE: this rebinding shadows the `audio` name imported from `great_things`.
audio = AudioSegment.from_mp3(file_path)

# Twilio Media Streams expects mono, 8 kHz, 8-bit mu-law audio.  Resample to
# 16-bit linear PCM here and let ffmpeg's `pcm_mulaw` codec do the companding
# on export.  Calling set_sample_width(1) instead would only truncate the
# samples to 8-bit *linear* PCM, which is not mu-law and plays back as noise.
mu_law_audio = audio.set_channels(1).set_frame_rate(8000).set_sample_width(2)

# Keep a playable mu-law WAV copy of what is sent, for debugging.
mu_law_audio.export("bingo.wav", format="wav", codec="pcm_mulaw")

# Export the headerless mu-law byte stream; with no output path, export()
# returns a temporary file positioned at the start of the encoded data.
with mu_law_audio.export(format="mulaw", codec="pcm_mulaw") as encoded_file:
    mu_law_bytes = encoded_file.read()

# Base64-encode the payload so it can be embedded in a JSON "media" message.
base64_encoded_audio = base64.b64encode(mu_law_bytes)
base64_encoded_audio_str = base64_encoded_audio.decode("utf-8")
def log(msg, *args):
    """Write ``msg`` and any extra values to stdout, separated by spaces."""
    fields = (msg, *args)
    print(*fields)
@app.route("/twiml", methods=["GET", "POST"])
def index():
    """Serve the rendered ``streams.xml`` template for GET and POST requests."""
    twiml_document = render_template("streams.xml")
    return twiml_document
@sock.route("/")
def echo(ws):
    """Handle one Twilio Media Streams WebSocket connection.

    Reads JSON messages from ``ws`` in a loop and dispatches on their
    ``event`` field:

    * ``connected`` / ``mark``: logged.
    * ``start``: remembers the ``streamSid`` needed to address replies.
    * ``media``: once the inbound chunk counter reaches 100, sends the
      pre-encoded audio clip followed by a ``mark`` message (one time only);
      every other media frame is logged.
    * ``stop`` (and the legacy ``closed`` name): closes the socket and
      returns.

    Args:
        ws: The WebSocket connection object supplied by flask-sock; it must
            provide ``receive()``, ``send()`` and ``close()``.
    """
    log("Connection Accepted!")
    played_tone = False
    stream_sid = ""

    while True:
        message = ws.receive()
        if message is None:
            log("No message receieved!")
            continue

        data = json.loads(message)
        # .get() so a frame without an "event" field is ignored, not fatal.
        event = data.get("event")

        if event == "connected":
            log(message)

        elif event == "start":
            stream_sid = data["streamSid"]
            log(message)

        elif event == "media":
            if int(data["media"]["chunk"]) >= 100 and not played_tone:
                send_to_twilio = {
                    "event": "media",
                    "streamSid": stream_sid,
                    "media": {"payload": base64_encoded_audio_str},
                }
                # The mark is echoed back by Twilio once the audio queued
                # before it has finished playing.
                mark_message = {
                    "event": "mark",
                    "streamSid": stream_sid,
                    "mark": {"name": "This is my mark message!"},
                }
                ws.send(json.dumps(send_to_twilio))
                log(f"Sending the following message to Twilio: {mark_message}")
                ws.send(json.dumps(mark_message))
                played_tone = True
                continue
            log(message)

        elif event == "mark":
            log("We got a mark message back from Twilio!")
            log(message)

        elif event in ("stop", "closed"):
            # Twilio announces the end of a stream with "stop"; "closed" is
            # still accepted so existing senders of that name keep working.
            log(message)
            ws.close()
            break
if __name__ == "__main__":
    # Run the Flask server with its default settings when the file is
    # executed directly as a script.
    app.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment