Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save wabiloo/391793bc22ac17ceeeab14e4ed0a9d83 to your computer and use it in GitHub Desktop.
Save wabiloo/391793bc22ac17ceeeab14e4ed0a9d83 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"nbformat": 4,
"nbformat_minor": 0,
"metadata": {
"colab": {
"name": "Experiments - Flask + ngrok/localtunnel.ipynb",
"private_outputs": true,
"provenance": []
},
"kernelspec": {
"name": "python3",
"display_name": "Python 3"
},
"language_info": {
"name": "python"
}
},
"cells": [
{
"cell_type": "markdown",
"source": [
"# Experiments - Local Flask server + public URL & tunnel\n"
],
"metadata": {
"id": "8mJOfKG8pmoi"
}
},
{
"cell_type": "markdown",
"source": [
"## flask_ngrok"
],
"metadata": {
"id": "OFyeXdeUIf2k"
}
},
{
"cell_type": "code",
"source": [
"%%capture\n",
"!pip install flask\n",
"!pip install flask_ngrok\n",
"!pip install flask_cors\n",
"!pip install jq"
],
"metadata": {
"id": "HQjWVRywOZNy"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"!ngrok config add-authtoken 1iuqc1as456AoHWs4lSDULYXSey_5TK3zqkVMpFg6NYrsjWei"
],
"metadata": {
"id": "O-ATbjQVQ7Pb"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"## imports\n",
"import flask\n",
"from flask import Flask\n",
"from flask_cors import CORS\n",
"from flask_ngrok import run_with_ngrok\n",
"import threading\n",
"import trace \n",
"import sys\n",
"import time\n",
"\n",
"## create Flask app\n",
"app = Flask(__name__)\n",
"run_with_ngrok(app)\n",
"CORS(app)\n",
"\n",
"## a global variable your application can update which the server can provide\n",
"## This can easily be replaced by an alternative request to an actual DB to avoid a global variable\n",
"YOURDATA = []\n",
"\n",
"@app.route('/data', methods=['GET'])\n",
"def get_signals():\n",
" # this is returned to the client - note, it will take data_obj, and convert it to a Javascript object\n",
" data_obj = {'data': YOURDATA }\n",
" print(data_obj)\n",
" return response\n",
"\n",
"class server_thread_wrapper(threading.Thread): \n",
" def __init__(self, *args, **keywords): \n",
" threading.Thread.__init__(self, *args, **keywords) \n",
" self.killed = False\n",
"\n",
" def start(self): \n",
" self.__run_backup = self.run \n",
" self.run = self.__run \n",
" threading.Thread.start(self) \n",
" \n",
" def __run(self): \n",
" sys.settrace(self.globaltrace) \n",
" self.__run_backup() \n",
" self.run = self.__run_backup \n",
" \n",
" def globaltrace(self, frame, event, arg): \n",
" if event == 'call': \n",
" return self.localtrace \n",
" else: \n",
" return None\n",
" \n",
" def localtrace(self, frame, event, arg): \n",
" if self.killed: \n",
" if event == 'line': \n",
" raise SystemExit() \n",
" return self.localtrace \n",
" \n",
" def kill(self): \n",
" self.killed = True\n",
"\n",
"def start_server():\n",
" server = server_thread_wrapper(target=app.run)\n",
" server.start()\n",
" return server\n",
"\n",
"def kill_server():\n",
" server.kill()\n",
" server.join()\n",
" print(\"Server killed\")\n",
"\n",
"## start the server\n",
"server = start_server()\n",
"## temp wait\n",
"time.sleep(5)\n",
"## show the url to copy to access the server\n",
"print(\"\\n\\nCopy this address to the Observable notebook!\")\n",
"! curl -s http://localhost:4040/api/tunnels \n",
"# ! curl -s http://localhost:4040/api/tunnels | jq -r '.tunnels[0].public_url'\n",
"## kill the server\n",
"# kill_server()"
],
"metadata": {
"id": "rL3uhhHGM5_3"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"! curl -s http://localhost:4040/api/tunnels "
],
"metadata": {
"id": "4webC5vzOw3Q"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"kill_server()"
],
"metadata": {
"id": "oWzQ2a9jPS9M"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"## pyngrok"
],
"metadata": {
"id": "_2lFqdfSptfN"
}
},
{
"cell_type": "code",
"source": [
"!pip install pyngrok\n",
"!pip install flask"
],
"metadata": {
"id": "HXn68gJCp2ot"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"import os\n",
"import threading\n",
"\n",
"from flask import Flask\n",
"from pyngrok import ngrok, conf\n",
"\n",
"os.environ[\"FLASK_ENV\"] = \"development\"\n",
"\n",
"app = Flask(__name__)\n",
"port = 5000\n",
"\n",
"conf.get_default().auth_token = \"1iuqc1as456AoHWs4lSDULYXSey_5TK3zqkVMpFg6NYrsjWei\"\n",
"\n",
"# Open a ngrok tunnel to the HTTP server\n",
"public_url = ngrok.connect(port).public_url\n",
"print(\" * ngrok tunnel \\\"{}\\\" -> \\\"http://127.0.0.1:{}\\\"\".format(public_url, port))\n",
"\n",
"# Update any base URLs to use the public ngrok URL\n",
"app.config[\"BASE_URL\"] = public_url\n",
"\n",
"# ... Update inbound traffic via APIs to use the public-facing ngrok URL\n",
"\n",
"\n",
"# Define Flask routes\n",
"@app.route(\"/\", methods=['GET', 'POST'])\n",
"def index():\n",
" print(\"Inbound!\")\n",
" return \"Hello from Colab!\"\n"
],
"metadata": {
"id": "tI4e-y-xq9Ov"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"# Start the Flask server here (blocking)\n",
"app.run(use_reloader=False)"
],
"metadata": {
"id": "ZdYUxBfn9cID"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"# Start the Flask server in a new thread\n",
"t = threading.Thread(target=app.run, kwargs={\"use_reloader\": False})\n",
"t.start()"
],
"metadata": {
"id": "xCvS8JbB9ZxT"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"t.kill()\n",
"t.join()"
],
"metadata": {
"id": "giXfJy43rsXK"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"## Localtunnel"
],
"metadata": {
"id": "NU4cCUAnxBN9"
}
},
{
"cell_type": "code",
"source": [
"# Install\n",
"!npm install -g localtunnel"
],
"metadata": {
"id": "se_iUJqTxC8t"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"get_ipython().system_raw('lt --port 5000 --subdomain bmll > url.txt 2>&1 &')"
],
"metadata": {
"id": "4a2DSVfwzZuw"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"!cat url.txt"
],
"metadata": {
"id": "uQM1AAa0xKME"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"res = get_ipython().getoutput('lt --port 5000 --subdomain bmll &')\n",
"res"
],
"metadata": {
"id": "wtivWD8R12tq"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"!nohup lt --port 5000 --subdomain bmll4 > localtunnel.txt 2>&1 &\n",
"!sleep 1\n",
"output = !cat localtunnel.txt\n",
"\n",
"import re\n",
"regex = r\"(?i)\\b((?:https?://|www\\d{0,3}[.]|[a-z0-9.\\-]+[.][a-z]{2,4}/)(?:[^\\s()<>]+|\\(([^\\s()<>]+|(\\([^\\s()<>]+\\)))*\\))+(?:\\(([^\\s()<>]+|(\\([^\\s()<>]+\\)))*\\)|[^\\s`!()\\[\\]{};:'\\\".,<>?«»“”‘’]))\"\n",
"urls = re.findall(regex, output[0]) \n",
"public_url = urls[0][0]\n",
"print(public_url)"
],
"metadata": {
"id": "_uN1or9DzNNV"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"## doens't work\n",
"%%bash --bg --out url_lines\n",
"lt --port 5000 --subdomain bmll2"
],
"metadata": {
"id": "xULvLoQr0EfS"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"### Flask"
],
"metadata": {
"id": "D7aNo4Sj1zLK"
}
},
{
"cell_type": "code",
"source": [
"import os\n",
"import json\n",
"\n",
"from flask import Flask, request, jsonify\n",
"\n",
"os.environ[\"FLASK_ENV\"] = \"development\"\n",
"\n",
"app = Flask(__name__)\n",
"port = 5000\n",
"\n",
"# Update any base URLs to use the public ngrok URL\n",
"app.config[\"BASE_URL\"] = public_url\n",
"\n",
"# ... Update inbound traffic via APIs to use the public-facing ngrok URL\n",
"\n",
"\n",
"# Define Flask routes\n",
"@app.route(\"/\", methods=['GET'])\n",
"def index():\n",
" print(\"Inbound!\")\n",
" return \"Hello from Colab!\"\n",
"\n",
"@app.route(\"/\", methods=['POST'])\n",
"def index_post():\n",
" print(json.dumps(request.json))\n",
" return \"Received!\"\n",
"\n",
"@app.route(\"/heartbeat\")\n",
"def heartbeat():\n",
" return jsonify({\"status\": \"healthy\"})\n"
],
"metadata": {
"id": "xa4Qgcouxjpt"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"\n",
"app.run(use_reloader=False)"
],
"metadata": {
"id": "MTGFH5yex_mI"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"### Audio Notification test"
],
"metadata": {
"id": "7UP9jUd2YQjr"
}
},
{
"cell_type": "code",
"source": [
"from google.colab import output\n",
"output.eval_js('new Audio(\"https://upload.wikimedia.org/wikipedia/commons/0/05/Beep-09.ogg\").play()')"
],
"metadata": {
"id": "Ux975A39YOPt"
},
"execution_count": null,
"outputs": []
}
]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment