Skip to content

Instantly share code, notes, and snippets.

@maitrungduc1410
Last active March 27, 2019 09:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save maitrungduc1410/79f000781179e535c6a363129e0aacee to your computer and use it in GitHub Desktop.
Save maitrungduc1410/79f000781179e535c6a363129e0aacee to your computer and use it in GitHub Desktop.
Sg-decoding-ONLINE
# Values substituted into the ${...} placeholders of the db service in docker-compose.yml.
# NOTE(review): these look like real credentials committed in plain text --
# rotate them and keep this file out of version control.
MONGO_ROOT_USERNAME=root
MONGO_ROOT_PASSWORD=ce2pFD2jJNmqNJkG
# Passed to the db container as APP_MONGO_HOST and MONGO_PORT.
MONGO_HOST=localhost
MONGO_PORT=27017
# Application database and the user that the db init script creates for it.
APP_MONGO_DB=sg-decoding-online
APP_MONGO_USER=user
APP_MONGO_PASS=ZfQDwQYN4T6PQARY
# Creates the application database user with the dbOwner role on APP_MONGO_DB.
# All variables are supplied through the `environment:` section of the db
# service in docker-compose.yml.
#
# Every expansion is quoted so that values containing spaces or shell
# metacharacters (typical for generated passwords) reach mongo as one argument.
# NOTE(review): user name and password are interpolated into single-quoted
# JavaScript below; a value containing a single quote would break the --eval
# expression.
# NOTE(review): presumably the `mongo` shell is available in the image in use;
# confirm for the image tag you deploy.
echo 'Creating application user and db'
mongo "${APP_MONGO_DB}" \
    --host localhost \
    --port "${MONGO_PORT}" \
    -u "${MONGO_ROOT_USER}" \
    -p "${MONGO_ROOT_PASS}" \
    --authenticationDatabase admin \
    --eval "db.createUser({user: '${APP_MONGO_USER}', pwd: '${APP_MONGO_PASS}', roles:[{role:'dbOwner', db: '${APP_MONGO_DB}'}]});"
from flask import Flask, request
from flask_bcrypt import Bcrypt
import jwt
import time
from datetime import datetime, timedelta
import pymongo
from pymongo import MongoClient
import json
# Flask application object; the route functions below register themselves on it.
app = Flask(__name__)
# Password hashing helper bound to the Flask app.
bcrypt = Bcrypt(app)
# NOTE(review): the connection string embeds the application user's password in
# source; consider reading it from the environment instead.
client = MongoClient('mongodb://user:ZfQDwQYN4T6PQARY@db:27017/sg-decoding-online') # for docker
# client = MongoClient('mongodb://localhost:27017/sg-decoding-online') # for local development
db = client['sg-decoding-online']
# Unique compound index over username and email; createUser catches the
# DuplicateKeyError raised when an insert violates it.
# NOTE(review): both fields are indexed as 'text' -- confirm that a unique text
# index really enforces "username and email must each be unique" as intended.
db.users.create_index([('username', 'text'), ('email', 'text')], unique=True)
# Shorthand for the users collection used by every route.
users = db.users
@app.route('/')
def hello():
    """Root endpoint: answer every request with a fixed greeting."""
    greeting = "Hello World!"
    return greeting
@app.route('/createUser', methods=['POST'])
def createUser():
    """Register a new user and store a freshly issued access token for them.

    Form fields read from the POST body: ``username``, ``password``, ``email``.
    The password is stored as a bcrypt hash and the role is always ``'user'``.

    Returns:
        A JSON string: ``{"success": true}`` when the user was created, or
        ``{"success": false, "message": ...}`` when the insert violates the
        unique index on the users collection.
    """
    username = request.form['username']
    password = request.form['password']
    email = request.form['email']
    user = {
        "username": username,
        "email": email,
        "password": bcrypt.generate_password_hash(password).decode('utf-8'),
        "role": 'user',
    }

    # Only the insert can hit the unique index, so only it is guarded.
    try:
        user_id = users.insert_one(user).inserted_id
    except pymongo.errors.DuplicateKeyError:
        return json.dumps({
            'success': False,
            'message': 'Username or email already exists',
        })

    # NOTE(review): the signing key is the literal 'secret'; move it to
    # configuration before relying on these tokens.
    access_token = jwt.encode({
        'user_id': str(user_id),
        'exp': datetime.utcnow() + timedelta(seconds=30 * 86400)  # expires after 30 days
    }, 'secret', algorithm='HS256')
    # PyJWT 1.x returns bytes, PyJWT 2.x returns str; normalise to str so the
    # token can be stored and JSON-serialised under either version.
    if isinstance(access_token, bytes):
        access_token = access_token.decode('utf-8')

    # Update by primary key: it identifies exactly the document just inserted.
    users.find_one_and_update(
        {'_id': user_id},
        {'$set': {'access_token': access_token}})
    return json.dumps({'success': True})
@app.route('/generateAccessToken', methods=['POST'])
def generateAccessToken():
    """Issue a new 30-day access token for a user, replacing the stored one.

    Form fields read from the POST body: ``username``, ``password``.

    Returns:
        A JSON string. On success it carries ``success``, ``username`` and the
        new ``access_token``; otherwise ``success`` is false and ``message``
        says whether the username or the password was wrong.
    """
    username = request.form['username']
    password = request.form['password']

    user = users.find_one({"username": username})
    if not user:
        return json.dumps({
            'success': False,
            'message': 'Username doesn\'t exist',
        })

    if not bcrypt.check_password_hash(user['password'], password):
        return json.dumps({
            'success': False,
            'message': 'Password is incorrect',
        })

    # NOTE(review): the signing key is the literal 'secret'; move it to
    # configuration before relying on these tokens.
    access_token = jwt.encode({
        'user_id': str(user['_id']),
        'exp': datetime.utcnow() + timedelta(seconds=30 * 86400)  # expires after 30 days
    }, 'secret', algorithm='HS256')
    # PyJWT 1.x returns bytes, PyJWT 2.x returns str; normalise to str so the
    # token can be stored and JSON-serialised under either version.
    if isinstance(access_token, bytes):
        access_token = access_token.decode('utf-8')

    users.find_one_and_update(
        {'username': username},
        {'$set': {'access_token': access_token}})
    return json.dumps({
        'success': True,
        'username': user['username'],
        'access_token': access_token,
    })
@app.route('/getAccessToken', methods=['POST'])
def getAccessToken():
    """Return the access token currently stored for a user.

    Expects ``username`` and ``password`` form fields and answers with a JSON
    string: the stored token on success, or a failure message naming which of
    the two credentials was rejected.
    """
    form = request.form
    username = form['username']
    password = form['password']

    account = users.find_one({'username': username})
    if not account:
        failure = {
            'success': False,
            'message': 'Username doesn\'t exist',
        }
        return json.dumps(failure)

    stored_hash = account['password']
    if not bcrypt.check_password_hash(stored_hash, password):
        failure = {
            'success': False,
            'message': 'Password is incorrect',
        }
        return json.dumps(failure)

    payload = {
        'success': True,
        'username': account['username'],
        'access_token': account['access_token'],
    }
    return json.dumps(payload)
@app.route('/revokeAccessToken', methods=['POST'])
def revokeAccessToken():
    """Clear the stored access token of a user.

    Expects ``username`` and ``password`` form fields. When the credentials
    match, the user's ``access_token`` is overwritten with an empty string.
    The outcome is reported as a JSON string.
    """
    form = request.form
    username = form['username']
    password = form['password']

    account = users.find_one({'username': username})
    if not account:
        failure = {
            'success': False,
            'message': 'Username doesn\'t exist',
        }
        return json.dumps(failure)

    stored_hash = account['password']
    if not bcrypt.check_password_hash(stored_hash, password):
        failure = {
            'success': False,
            'message': 'Password is incorrect',
        }
        return json.dumps(failure)

    selector = {'username': username}
    change = {'$set': {'access_token': ''}}
    users.find_one_and_update(selector, change)
    return json.dumps({'success': True})
@app.route('/authenticate', methods=['POST'])
def authenticate():
    """Check that an access token is stored for a user and is still valid.

    Form field read from the POST body: ``access_token``.

    Returns:
        A JSON string ``{"success": true}`` when the token is stored for a
        user, its signature verifies, it has not expired and its ``user_id``
        claim matches that user; otherwise ``{"success": false, "message": ...}``.
    """
    def failure(message):
        return json.dumps({'success': False, 'message': message})

    access_token = request.form['access_token']
    # Revoked users store an empty string as their token, so an empty token
    # must never be looked up: it would match any revoked account.
    if not access_token:
        return failure('Access Token doesn\'t exist')

    user = users.find_one({'access_token': access_token})
    if not user:
        return failure('Access Token doesn\'t exist')

    # jwt.decode verifies the signature and the `exp` claim itself and raises
    # on failure; without handling that here an expired or malformed token
    # would surface as an HTTP 500 instead of a JSON answer.
    try:
        result = jwt.decode(access_token, 'secret', algorithms=['HS256'])
    except jwt.ExpiredSignatureError:
        return failure('Token is expired')
    except jwt.InvalidTokenError:
        return failure('Token is invalid')

    if str(user['_id']) != result.get('user_id'):
        return failure('Token does not match with user')

    return json.dumps({'success': True})
# Script entry point: serve the API on every interface, port 8002.
if __name__ == '__main__':
    app.run(port=8002, host='0.0.0.0')
version: '2.1'
services:
  # Decoding server listening on port 8001.
  webserver:
    image: speechlab.azurecr.io/sgenglish-online-decoding-1018nnet3:1.1-mar05
    restart: unless-stopped
    ports:
      - "8001:8001"
    command: /opt/start.sh -p 8001
    healthcheck:
      # The original command began with a stray "[" and therefore could never
      # succeed; run curl directly and map any failure to exit code 1.
      test: ["CMD-SHELL", "curl -f http://localhost:8001/ || exit 1"]
      interval: 30s
      timeout: 30s
      retries: 3
  # Same image as webserver, started with -m <address> -p 8001.
  worker:
    image: speechlab.azurecr.io/sgenglish-online-decoding-1018nnet3:1.1-mar05
    restart: unless-stopped
    command: /opt/start.sh -m 155.69.147.243 -p 8001
  # Flask authentication API (db.py), published on port 8002.
  api:
    image: maitrungduc1410/gstreamer-api:latest
    ports:
      - "8002:8002"
    volumes:
      - ./db.py:/app/db.py
    depends_on:
      - db
    command: python3 db.py
    restart: unless-stopped
  # MongoDB instance backing the api service.
  db:
    container_name: db
    image: mongo
    volumes:
      - ./data:/data/db # data is stored in ./data on the host and is persistent, so it survives container restarts
      - ./db-entrypoint/:/docker-entrypoint-initdb.d/
    ports:
      - "27017:27017"
    environment:
      - MONGO_INITDB_ROOT_USERNAME=${MONGO_ROOT_USERNAME}
      - MONGO_INITDB_ROOT_PASSWORD=${MONGO_ROOT_PASSWORD}
      - APP_MONGO_HOST=${MONGO_HOST}
      - MONGO_PORT=${MONGO_PORT}
      - MONGO_ROOT_USER=${MONGO_ROOT_USERNAME}
      - MONGO_ROOT_PASS=${MONGO_ROOT_PASSWORD}
      - APP_MONGO_USER=${APP_MONGO_USER}
      - APP_MONGO_PASS=${APP_MONGO_PASS}
      - APP_MONGO_DB=${APP_MONGO_DB}
    restart: unless-stopped
# Image for the Flask authentication API (db.py).
FROM python:3.6.6-alpine3.6
WORKDIR /app
# Native build dependencies for the Python packages installed below.
# --no-cache fetches the package index on the fly and leaves no apk cache in the layer.
RUN apk add --no-cache libressl-dev postgresql-dev libffi-dev gcc musl-dev python3-dev
RUN pip3 install --no-cache-dir flask flask-bcrypt pymongo PyJWT
# Copy the source last so that editing db.py does not invalidate the
# dependency layers above. COPY is used because a plain local file needs none
# of ADD's extra behaviour.
COPY ./db.py /app/db.py
#!/usr/bin/env python
#
# Copyright 2013 Tanel Alumae
"""
Reads speech data from clients via websocket or chunked HTTP requests, forwards it to an
available worker over a websocket, and relays the worker's recognition events back to the client
"""
import sys
import logging
import json
import codecs
import os.path
import uuid
import time
import threading
import functools
from Queue import Queue
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.websocket
import tornado.gen
import tornado.concurrent
import concurrent.futures
import settings
import common
import requests
# Base URL of the authentication API that exposes POST /authenticate.
baseUrl = 'http://13.76.82.135:8002'


def authRequest(token, timeout=10):
    """
    Asks the authentication API whether an access token is valid.

    token   -- access token string, sent as the `access_token` form field
    timeout -- seconds to wait for the API before giving up; without a timeout
               an unreachable API would block the calling handler indefinitely

    Returns the decoded JSON body of the response. If the API cannot be
    reached, times out, or answers with something that is not JSON, a
    dict(success=False, message=...) is returned instead of raising, so callers
    can treat every failure as a rejected token.
    """
    print("\033[1;32;40m Start authenticating... \n")
    url = baseUrl + "/authenticate"
    payload = {'access_token': token}
    header = {"Content-type": "application/x-www-form-urlencoded"}
    try:
        res = requests.post(url, data=payload, headers=header, timeout=timeout)
        # res.json() raises ValueError when the body is not JSON (e.g. an HTML error page)
        return res.json()
    except (requests.exceptions.RequestException, ValueError) as exc:
        logging.warning("Authentication request to %s failed: %s", url, exc)
        return {'success': False, 'message': 'Authentication service unavailable'}
def _extract_token(request):
    """
    Returns the `token` request argument, or None when the argument is absent.

    Some clients append "?content-type=..." to a URL that already carries
    "?token=...", so the token value may arrive as "<token>?content-type=...".
    Everything from "content-type" on is cut off, then a trailing "?" is
    removed -- and only a "?", so a token without that suffix stays intact.
    """
    if 'token' not in request.arguments:
        return None
    return request.arguments['token'][0].split('content-type')[0].rstrip('?')


def _reject(actionType, objectClass, reason):
    """
    Reports an authentication failure to the client in the way that fits the handler type.
    """
    message = '\033[1;31;40m ' + reason + '. \n \033[1;33;40m  \033[0m \n'
    if actionType == 'websocket':
        event = dict(status=common.STATUS_NOT_AVAILABLE, message=message)
        if hasattr(objectClass, 'send_event'):
            objectClass.send_event(event)
        else:
            # Not every websocket handler defines send_event; write the event directly
            objectClass.write_message(json.dumps(event))
        objectClass.close()
    else:
        objectClass.write(message)


def authenticate(actionType, objectClass):
    """
    Validates the `token` request argument of a handler against the authentication API.

    actionType  -- 'websocket' for websocket handlers; any other value is treated
                   as a plain HTTP request handler
    objectClass -- the handler instance; its request arguments are read and the
                   error message, if any, is delivered through it

    Returns 0 on success, 1 when no token was provided and 2 when the token was
    rejected. On failure a websocket handler is sent an error event and closed,
    while an HTTP handler gets the message written to its response body (the
    caller decides whether to finish the request).
    """
    token = _extract_token(objectClass.request)
    if token is None:
        _reject(actionType, objectClass, 'No token provided')
        return 1 # No token provided

    response = authRequest(token)
    if not response.get('success'):
        _reject(actionType, objectClass, response.get('message', 'Authentication failed'))
        return 2 # authentication failed

    return 0 # success
class Application(tornado.web.Application):
    """
    Tornado application that wires up all handlers and keeps the shared state:
    the set of idle workers, the set of status listeners and a request counter.
    """

    def __init__(self):
        # NOTE(review): cookie_secret is hard-coded and debug/autoreload are
        # enabled; confirm that is intended outside development.
        settings = dict(
            cookie_secret="43oETzKXQAGaYdkL5gEmGeJJFuYh7EQnp2XdTP1o/Vo=",
            template_path=os.path.join(os.path.dirname(os.path.dirname(__file__)), "templates"),
            static_path=os.path.join(os.path.dirname(os.path.dirname(__file__)), "static"),
            xsrf_cookies=False,
            autoescape=None,
            debug=True,
            autoreload=True
        )

        handlers = [
            (r"/", MainHandler),
            (r"/client/ws/speech", DecoderSocketHandler),
            (r"/client/ws/status", StatusSocketHandler),
            (r"/client/dynamic/reference", ReferenceHandler),
            (r"/client/dynamic/recognize", HttpChunkedRecognizeHandler),
            (r"/worker/ws/speech", WorkerSocketHandler),
            (r"/client/static/(.*)", tornado.web.StaticFileHandler, {'path': settings["static_path"]}),
        ]
        tornado.web.Application.__init__(self, handlers, **settings)
        self.available_workers = set()
        self.status_listeners = set()
        self.num_requests_processed = 0

    def send_status_update_single(self, ws):
        """Sends the current worker count and request counter to one status listener."""
        status = dict(num_workers_available=len(self.available_workers), num_requests_processed=self.num_requests_processed)
        ws.write_message(json.dumps(status))

    def send_status_update(self):
        """Sends the current status to every registered status listener."""
        for ws in self.status_listeners:
            self.send_status_update_single(ws)

    def save_reference(self, content_id, content):
        """
        Stores `content` under `content_id` in reference-content.json (in the
        current working directory), keeping the entries already in that file.
        """
        refs = {}
        try:
            with open("reference-content.json") as f:
                refs = json.load(f)
        except (IOError, OSError, ValueError):
            # A missing, unreadable or corrupt file is expected on first use:
            # start from an empty mapping. Anything else is a real error and
            # is no longer silently swallowed.
            pass
        refs[content_id] = content
        with open("reference-content.json", "w") as f:
            json.dump(refs, f, indent=2)
class MainHandler(tornado.web.RequestHandler):
    """Serves README.md from the directory above this file to authenticated clients."""

    def get(self):
        # authenticate() has already written the error message on failure
        if authenticate('http', self) != 0:
            return
        here = os.path.dirname(os.path.abspath(__file__))
        readme = os.path.join(os.path.join(here, os.pardir), "README.md")
        self.render(readme)
def content_type_to_caps(content_type):
    """
    Converts MIME-style raw audio content type specifier to GStreamer CAPS string.

    For the media types "audio/x-raw" and "audio/x-raw-int" the result is
    "audio/x-raw, key=value, ..." made of the default attributes (rate=16000,
    format=S16LE, channels=1, layout=interleaved) overridden by the attributes
    given in `content_type`. Attributes may be separated by "," or ";".
    Any other content type is returned unchanged.
    """
    attributes = {"rate": 16000, "format": "S16LE", "channels": 1, "layout": "interleaved"}
    media_type, _, attr_string = content_type.replace(";", ",").partition(",")
    if media_type.strip() not in ("audio/x-raw", "audio/x-raw-int"):
        return content_type

    for part in attr_string.split(","):
        key, _, value = part.partition("=")
        key = key.strip()
        # Splitting an empty attribute string (or a trailing separator) yields
        # an empty key; skipping it avoids emitting a bogus "=" attribute.
        if key:
            attributes[key] = value.strip()
    # .items() instead of .iteritems() so this also runs on Python 3
    return "%s, %s" % ("audio/x-raw", ", ".join("%s=%s" % (key, value) for (key, value) in attributes.items()))
@tornado.web.stream_request_body
class HttpChunkedRecognizeHandler(tornado.web.RequestHandler):
    """
    Provides a HTTP POST/PUT interface supporting chunked transfer requests, similar to that provided by
    http://github.com/alumae/ruby-pocketsphinx-server.

    The request body is streamed chunk by chunk to a worker; the final
    hypothesis assembled from the worker's events is returned as JSON once the
    worker closes the session.
    """

    def prepare(self):
        """
        Authenticates the request and reserves a worker before any body data arrives.

        Responds with 401 when authentication fails and with 503 when no worker
        is available; in both cases the request is finished here.
        """
        # Initialise all per-request state first so that every other method
        # can rely on these attributes even when the request is rejected.
        self.id = str(uuid.uuid4())
        self.final_hyp = ""
        self.final_result_queue = Queue()
        self.worker = None
        self.error_status = 0
        self.error_message = None

        # authenticate() writes the error text to the response on failure.
        # The outcome is remembered so that post()/put() need not repeat the
        # round trip to the authentication API.
        self._authenticated = authenticate('http', self) == 0
        if not self._authenticated:
            self.set_status(401)
            self.finish()
            return

        self.user_id = self.request.headers.get("device-id", "none")
        self.content_id = self.request.headers.get("content-id", "none")
        logging.info("%s: OPEN: user='%s', content='%s'" % (self.id, self.user_id, self.content_id))
        #Waiter thread for final hypothesis:
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        try:
            # set.pop() raises KeyError when no worker is idle
            self.worker = self.application.available_workers.pop()
        except KeyError:
            logging.warning("%s: No worker available for client request" % self.id)
            self.set_status(503)
            self.finish("No workers available")
            return

        self.application.send_status_update()
        logging.info("%s: Using worker %s" % (self.id, self.__str__()))
        self.worker.set_client_socket(self)
        content_type = self.request.headers.get("Content-Type", None)
        if content_type:
            content_type = content_type_to_caps(content_type)
            logging.info("%s: Using content type: %s" % (self.id, content_type))
        self.worker.write_message(json.dumps(dict(id=self.id, content_type=content_type, user_id=self.user_id, content_id=self.content_id)))

    def data_received(self, chunk):
        """Forwards one chunk of the request body to the worker as a binary message."""
        assert self.worker is not None
        logging.debug("%s: Forwarding client message of length %d to worker" % (self.id, len(chunk)))
        self.worker.write_message(chunk, binary=True)

    def post(self, *args, **kwargs):
        if self._authenticated:
            self.end_request(args, kwargs)

    def put(self, *args, **kwargs):
        if self._authenticated:
            self.end_request(args, kwargs)

    @tornado.concurrent.run_on_executor
    def get_final_hyp(self):
        """Blocks (on the executor thread) until close() delivers the final hypothesis."""
        logging.info("%s: Waiting for final result..." % self.id)
        return self.final_result_queue.get(block=True)

    @tornado.web.asynchronous
    @tornado.gen.coroutine
    def end_request(self, *args, **kwargs):
        """
        Signals end-of-stream to the worker, waits for the final hypothesis and
        writes the JSON response (the hypothesis, or the worker's error status
        and message).
        """
        logging.info("%s: Handling the end of chunked recognize request" % self.id)
        assert self.worker is not None
        self.worker.write_message("EOS", binary=True)
        logging.info("%s: yielding..." % self.id)
        hyp = yield self.get_final_hyp()
        if self.error_status == 0:
            logging.info("%s: Final hyp: %s" % (self.id, hyp))
            response = {"status" : 0, "id": self.id, "hypotheses": [{"utterance" : hyp}]}
            self.write(response)
        else:
            logging.info("%s: Error (status=%d) processing HTTP request: %s" % (self.id, self.error_status, self.error_message))
            response = {"status" : self.error_status, "id": self.id, "message": self.error_message}
            self.write(response)
        self.application.num_requests_processed += 1
        self.application.send_status_update()
        self.worker.set_client_socket(None)
        self.worker.close()
        self.finish()
        logging.info("Everything done")

    def send_event(self, event):
        """
        Called by the worker handler for each event the worker sends.

        Final results are appended to the running hypothesis; a non-zero status
        is recorded as the error to report to the client.
        """
        event_str = str(event)
        if len(event_str) > 100:
            event_str = event_str[:97] + "..."
        logging.info("%s: Receiving event %s from worker" % (self.id, event_str))
        if event["status"] == 0 and ("result" in event):
            try:
                if len(event["result"]["hypotheses"]) > 0 and event["result"]["final"]:
                    if len(self.final_hyp) > 0:
                        self.final_hyp += " "
                    self.final_hyp += event["result"]["hypotheses"][0]["transcript"]
            except Exception as e:
                # Best effort: a malformed result must not break the session.
                # The previous code concatenated a string with the exception
                # class, which itself raised TypeError inside this handler.
                logging.warning("Failed to extract hypothesis from recognition result: %s", e)
        elif event["status"] != 0:
            self.error_status = event["status"]
            self.error_message = event.get("message", "")

    def close(self):
        """Called when the worker closes; releases the waiter with the hypothesis collected so far."""
        logging.info("%s: Receiving 'close' from worker" % (self.id))
        self.final_result_queue.put(self.final_hyp)
class ReferenceHandler(tornado.web.RequestHandler):
    """Accepts reference text for a piece of content and stores it through the application."""

    def post(self, *args, **kwargs):
        content_id = self.request.headers.get("Content-Id")
        if not content_id:
            # Without a content id there is nothing to file the text under
            self.set_status(400)
            self.finish("No Content-Id specified")
            return

        content = codecs.decode(self.request.body, "utf-8")
        user_id = self.request.headers.get("User-Id", "")
        timestamp = time.strftime("%Y-%m-%dT%H:%M:%S")
        record = dict(content=content, user_id=user_id, time=timestamp)
        self.application.save_reference(content_id, record)
        logging.info("Received reference text for content %s and user %s" % (content_id, user_id))
        self.set_header('Access-Control-Allow-Origin', '*')

    def options(self, *args, **kwargs):
        # CORS preflight response
        cors_headers = [
            ('Access-Control-Allow-Origin', '*'),
            ('Access-Control-Allow-Methods', 'POST, OPTIONS'),
            ('Access-Control-Max-Age', 1000),
            # note that '*' is not valid for Access-Control-Allow-Headers
            ('Access-Control-Allow-Headers', 'origin, x-csrftoken, content-type, accept, User-Id, Content-Id'),
        ]
        for header_name, header_value in cors_headers:
            self.set_header(header_name, header_value)
class StatusSocketHandler(tornado.websocket.WebSocketHandler):
    """Websocket endpoint that pushes server status updates to authenticated listeners."""

    # needed for Tornado 4.0
    def check_origin(self, origin):
        return True

    def open(self):
        """Registers the socket as a status listener once its token has been accepted."""
        checkAuth = authenticate('websocket', self)
        if checkAuth == 0: # in case successfully authenticated
            logging.info("New status listener")
            self.application.status_listeners.add(self)
            self.application.send_status_update_single(self)

    def on_close(self):
        logging.info("Status listener left")
        # discard() rather than remove(): a socket that failed authentication
        # was never added, and remove() would raise KeyError for it.
        self.application.status_listeners.discard(self)
class WorkerSocketHandler(tornado.websocket.WebSocketHandler):
    """Websocket endpoint for workers; relays each worker event to the attached client handler."""

    def __init__(self, application, request, **kwargs):
        super(WorkerSocketHandler, self).__init__(application, request, **kwargs)
        self.client_socket = None

    # needed for Tornado 4.0
    def check_origin(self, origin):
        return True

    def open(self):
        """Adds the worker to the pool of available workers after successful authentication."""
        if authenticate('websocket', self) != 0:
            return
        self.client_socket = None
        pool = self.application.available_workers
        pool.add(self)
        logging.info("New worker available " + self.__str__())
        self.application.send_status_update()

    def on_close(self):
        """Removes the worker from the pool and closes the client it was serving, if any."""
        logging.info("Worker " + self.__str__() + " leaving")
        self.application.available_workers.discard(self)
        client = self.client_socket
        if client:
            client.close()
        self.application.send_status_update()

    def on_message(self, message):
        """Decodes a JSON event from the worker and passes it to the attached client."""
        assert self.client_socket is not None
        self.client_socket.send_event(json.loads(message))

    def set_client_socket(self, client_socket):
        self.client_socket = client_socket
class DecoderSocketHandler(tornado.websocket.WebSocketHandler):
    """
    Websocket endpoint for clients: pairs the client with an available worker,
    forwards the client's messages to it and sends the worker's events back.
    """

    # needed for Tornado 4.0
    def check_origin(self, origin):
        return True

    def send_event(self, event):
        """Stamps the event with this session's id and sends it to the client as JSON."""
        event["id"] = self.id
        event_str = str(event)
        if len(event_str) > 100:
            event_str = event_str[:97] + "..."
        logging.info("%s: Sending event %s to client" % (self.id, event_str))
        self.write_message(json.dumps(event))

    def open(self):
        """
        Authenticates the client and reserves a worker for it.

        When no worker is available the client is sent a STATUS_NOT_AVAILABLE
        event and the connection is closed.
        """
        self.id = str(uuid.uuid4())
        logging.info("%s: OPEN" % (self.id))
        # NOTE(review): this logs every request argument, including the access token
        logging.info("%s: Request arguments: %s" % (self.id, " ".join(["%s=\"%s\"" % (a, self.get_argument(a)) for a in self.request.arguments])))
        self.user_id = self.get_argument("user-id", "none", True)
        self.content_id = self.get_argument("content-id", "none", True)
        self.worker = None
        checkAuth = authenticate('websocket', self)
        if checkAuth != 0:
            # authenticate() has already informed the client and closed the socket
            return

        try:
            # set.pop() raises KeyError when no worker is idle
            self.worker = self.application.available_workers.pop()
        except KeyError:
            logging.warning("%s: No worker available for client request" % self.id)
            event = dict(status=common.STATUS_NOT_AVAILABLE, message="No decoder available, try again later")
            self.send_event(event)
            self.close()
            return

        self.application.send_status_update()
        logging.info("%s: Using worker %s" % (self.id, self.__str__()))
        self.worker.set_client_socket(self)
        content_type = self.get_argument("content-type", None, True)
        if content_type:
            logging.info("%s: Using content type: %s" % (self.id, content_type))
        self.worker.write_message(json.dumps(dict(id=self.id, content_type=content_type, user_id=self.user_id, content_id=self.content_id)))

    def on_connection_close(self):
        """Counts the request as processed and releases the worker when the client goes away."""
        logging.info("%s: Handling on_connection_close()" % self.id)
        self.application.num_requests_processed += 1
        self.application.send_status_update()
        if self.worker:
            try:
                self.worker.set_client_socket(None)
                logging.info("%s: Closing worker connection" % self.id)
                self.worker.close()
            except Exception:
                # Still best effort (the worker may already be gone), but the
                # failure is logged instead of vanishing, and KeyboardInterrupt
                # / SystemExit are no longer swallowed.
                logging.exception("%s: Failed to close worker connection" % self.id)

    def on_message(self, message):
        """Forwards a client message to the worker, keeping its text/binary framing."""
        assert self.worker is not None
        logging.info("%s: Forwarding client message (%s) of length %d to worker" % (self.id, type(message), len(message)))
        # type(u"") is `unicode` on Python 2 and `str` on Python 3, so text
        # frames are recognised on both without referencing the py2-only name
        is_text = isinstance(message, type(u""))
        self.worker.write_message(message, binary=not is_text)
def main():
    """
    Configures logging, parses the command line, starts the application on the
    configured port (with SSL when both a certificate and a key file are given)
    and runs the IOLoop.
    """
    from tornado.options import define, options

    logging.basicConfig(level=logging.DEBUG, format="%(levelname)8s %(asctime)s %(message)s ")
    logging.debug('Starting up server')

    define("certfile", default="", help="certificate file for secured SSL connection")
    define("keyfile", default="", help="key file for secured SSL connection")
    tornado.options.parse_command_line()

    app = Application()
    listen_kwargs = {}
    if options.certfile and options.keyfile:
        listen_kwargs["ssl_options"] = {
            "certfile": options.certfile,
            "keyfile": options.keyfile,
        }
        logging.info("Using SSL for serving requests")
    # NOTE(review): the `port` option is not defined in this function;
    # presumably one of the imported modules defines it -- confirm.
    app.listen(options.port, **listen_kwargs)
    tornado.ioloop.IOLoop.instance().start()


if __name__ == "__main__":
    main()
@maitrungduc1410
Copy link
Author

maitrungduc1410 commented Mar 20, 2019

Prepare: create a new folder (the root folder) and put all of the files below into it. Note that db-entrypoint.sh must be placed in a subfolder named db-entrypoint.

  • Dockerfile (this file is for the api service; its image is already published on Docker Hub, so build your own only if needed)
  • db.py
  • client.py
  • docker-compose.yml
  • .env
  • db-entrypoint/db-entrypoint.sh

Run the app: at root folder, run docker-compose up

Test the app: at root folder

  • run: python client.py -u ws://localhost:8001/client/ws/speech?token=<your_token> -r 32000 <path to your audio file>

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment