Skip to content

Instantly share code, notes, and snippets.

@dusual
Created September 18, 2013 12:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dusual/6608184 to your computer and use it in GitHub Desktop.
Save dusual/6608184 to your computer and use it in GitHub Desktop.
flask test
from flask import Flask, request, render_template,session
import plivohelper
import os
from simplekv.memory import DictStore
from flaskext.kvsession import KVSessionExtension
from datetime import datetime
import flask
import pickle
import urllib
# Key/value store (from simplekv.memory) handed to KVSessionExtension below.
# Presumably this is the same store the route handlers reach through
# flask.session.store -- TODO confirm.
store = DictStore()
app = Flask(__name__)
# NOTE(review): debug mode is hard-coded on -- confirm this is never deployed
# as-is.
app.debug = True
# Connection settings passed to plivohelper.REST(...) in dial().
REST_API_URL = 'http://127.0.0.1:8088'
API_VERSION = 'v0.1'
SID = '1234'
AUTH_TOKEN = '1234'
# Rebind stdout to stderr so that every print in this module is written to
# the error stream.
import sys
sys.stdout = sys.stderr
# NOTE(review): hard-coded secret key -- looks like a placeholder; confirm.
app.config['SECRET_KEY'] = 'topsecret'
# Attach the key/value-backed session extension to the app, using `store`.
app.kvsession = KVSessionExtension(store, app)
@app.errorhandler(404)
def page_not_found(error):
    """Handle a 404: log the miss and answer with a plain-text body.

    :param error: the error object Flask passes to the handler (unused).
    :returns: a ``(body, status)`` tuple with status 404.
    """
    print("404 page not found")
    body = 'This URL does not exist'
    return body, 404
@app.route('/ringing/', methods=['GET', 'POST'])
def ringing():
    """Ringing callback: log the notification and acknowledge it.

    Per the original author's note the request carries 'to' (the ringing
    number) and 'request_uuid' (the id given at the time of the API call);
    neither value is read here.

    :returns: the literal body ``"OK"``.
    """
    acknowledgement = "OK"
    print("We got a ringing notification")
    return acknowledgement
@app.route('/action/', methods=['GET', 'POST'])
def action():
    """Store the digit the caller pressed and play the follow-up prompt.

    This is the ``action`` URL given to ``addGetDigits`` in ``answered()``.
    The pressed digit is saved under ``KEY_PRESSED`` in the per-call data
    kept in the session store, keyed by the posted ``ALegRequestUUID``.

    :returns: the rendered ``response_template.xml`` on success;
        ``("No Get Allowed", 403)`` for non-POST requests;
        a 400 response when the ``Digits`` form field is absent.
    :raises: whatever the store lookup raises when no data exists for the
        posted ``ALegRequestUUID`` (unchanged from the original behaviour).
    """
    if request.method != 'POST':
        return "No Get Allowed", 403

    # .get() yields the first posted value, or None when the field is absent.
    # The previous getlist(...)[0] raised IndexError (an unhandled 500) here.
    digits = request.form.get('Digits')
    if digits is None:
        return "Digits parameter missing", 400

    current_call_uuid = request.form.get('ALegRequestUUID')

    # NOTE(review): pickle.loads is only safe while the store holds nothing
    # but data pickled by this application -- confirm nothing else writes it.
    session_args = pickle.loads(flask.session.store.get(current_call_uuid))
    session_args['KEY_PRESSED'] = digits
    flask.session.store.put(current_call_uuid, pickle.dumps(session_args))

    r = plivohelper.Response()
    r.addPlay("/usr/local/freeswitch/sounds/algo/order-script-part-3-if-order-placed.wav")
    print("RESTXML Response => %s" % r)
    return render_template('response_template.xml', response=r)
@app.route('/hangup/', methods=['GET', 'POST'])
def hangup():
    """Hangup callback: report the finished call to the external endpoint.

    Combines the per-call data kept in the session store (looked up by the
    posted ``ALegRequestUUID``) with the call id, end time and hangup cause,
    URL-encodes everything into a query string and issues a GET to the
    reporting URL. The outbound call is best effort: any error it raises is
    printed and the callback still answers ``"OK"``.

    NOTE(review): the original author's comment listed the posted params as
    'request_uuid', 'CallUUID' and 'reason', but the code reads
    ``RequestUUID``, ``HangupCause`` and ``ALegRequestUUID`` -- confirm which
    names are actually posted.

    :returns: the literal body ``"OK"``.
    :raises KeyError: if ``RequestUUID`` or ``HangupCause`` is not posted.
    """
    from urllib2 import urlopen

    request_dict = request.form.to_dict()
    hangup_cause = request_dict['HangupCause']
    response_dict = {
        'call_uuid': request_dict['RequestUUID'],
        # Drop the fractional seconds from the timestamp.
        'END_TIME': str(datetime.now()).split('.')[0],
        'STATUS': hangup_cause,
    }

    current_call_uuid = request.form.get('ALegRequestUUID')
    # NOTE(review): pickle.loads is only safe while the store holds nothing
    # but data pickled by this application -- confirm nothing else writes it.
    session_args = pickle.loads(flask.session.store.get(current_call_uuid))

    all_args = urllib.urlencode(session_args) + '&' + urllib.urlencode(response_dict)
    base_url = "https://iorder-in.rder.com//mos-ivr.in/receivedIvr.html?"
    url = base_url + all_args
    print("calling following url with params " + url)

    try:
        response = urlopen(url)
    except Exception as e:
        # Deliberately broad: a failed notification must not fail the callback.
        print("%s error raised from the called url" % e)
    else:
        # The handle was previously never closed, leaking the connection.
        response.close()

    print("Recieved hangup , the cause was %s" % (hangup_cause))
    return "OK"
@app.route('/dhanyawad/', methods=['GET', 'POST'])
def dhanyawad():
    """Play the closing clip and record that no key was pressed.

    ``answered()`` redirects here after its GetDigits element. The per-call
    data stored under the posted ``ALegRequestUUID`` gets ``KEY_PRESSED``
    set to ``None`` and is written back to the session store.

    :returns: the rendered ``response_template.xml``.
    """
    reply = plivohelper.Response()
    reply.addPlay("/usr/local/freeswitch/sounds/algo/dhanyawad.wav")

    call_uuid = request.form.get('ALegRequestUUID')
    call_state = pickle.loads(flask.session.store.get(call_uuid))
    call_state['KEY_PRESSED'] = None
    flask.session.store.put(call_uuid, pickle.dumps(call_state))

    print("RESTXML Response => %s" % reply)
    return render_template('response_template.xml', response=reply)
@app.route('/answered/', methods=['GET', 'POST'])
def answered():
    """Answer callback: build the prompt that reads out the product list.

    Per the original author's note the request carries 'CallUUID',
    'Direction', 'To' and 'From', plus 'ALegUUID' and 'ALegRequestUUID' for
    outbound calls. Only ``ALegRequestUUID`` is read here, to look up the
    per-call data in the session store.

    The response contains a GetDigits element (one digit, 10 second timeout,
    with beep) that plays the intro clip followed by one clip per entry of
    the comma-separated ``PRODUCT_LIST``, then a redirect to ``/dhanyawad/``.

    :returns: the rendered ``response_template.xml``.
    """
    current_call_uuid = request.form.get('ALegRequestUUID')

    r = plivohelper.Response()
    d = r.addGetDigits(action="http://127.0.0.1:5000/action/",
                       timeout=10, numDigits=1, playBeep=True)
    d.addPlay("/usr/local/freeswitch/sounds/algo/order-script-part-1.wav")

    root_fs_algo = "/usr/local/freeswitch/sounds/algo/"
    # NOTE(review): pickle.loads is only safe while the store holds nothing
    # but data pickled by this application -- confirm nothing else writes it.
    session_args = pickle.loads(flask.session.store.get(current_call_uuid))

    for file_name in session_args['PRODUCT_LIST'].split(','):
        # An empty list or a stray comma yields '' entries; joining those
        # onto the root would queue the bare directory as a clip, so skip.
        if not file_name.strip():
            continue
        d.addPlay(os.path.join(root_fs_algo, file_name))

    r.addRedirect("http://127.0.0.1:5000/dhanyawad/")
    print("RESTXML Response => %s" % r)
    return render_template('response_template.xml', response=r)
@app.route('/dial/', methods=['GET', 'POST'])
def dial():
    """Place the outbound call and stash the order details for later callbacks.

    Required query-string parameters: ``MOBILE_NUMBER``, ``from``,
    ``TRANSACTION_ID``, ``CALL_TYPE``, ``PRODUCT_LIST``, ``CUSTOMER_CODE``.
    The order details plus a start timestamp are pickled into the session
    store for the other handlers to read.

    :returns: the rendered ``response_template.xml`` on success; a 400
        response when any required parameter is absent.
    :raises: any exception raised by ``plivo.call`` is printed and re-raised.
    """
    required = ('MOBILE_NUMBER', 'from', 'TRANSACTION_ID', 'CALL_TYPE',
                'PRODUCT_LIST', 'CUSTOMER_CODE')
    if any(name not in request.args for name in required):
        # Other error paths in this file send an explicit status (404, 403);
        # this one used to answer with an implicit 200. The message text is
        # kept verbatim, spelling included, in case a caller matches on it.
        return "One of the required varibales missing", 400

    to_number = request.args.get('MOBILE_NUMBER')
    from_number = request.args.get('from')
    transaction_id = request.args.get('TRANSACTION_ID')
    call_type = request.args.get('CALL_TYPE')
    product_list = request.args.get('PRODUCT_LIST')
    customer_code = request.args.get('CUSTOMER_CODE')

    plivo = plivohelper.REST(REST_API_URL, SID, AUTH_TOKEN, API_VERSION)
    call_params = {
        'From': '%s' % from_number,
        'To': '%s' % to_number,
        'Gateways': "user/,user/",
        # Codec string as needed by FS for each gateway, separated by comma.
        'GatewayCodecs': "'PCMA,PCMU','PCMA,PCMU'",
        'AnswerUrl': "http://127.0.0.1:5000/answered/",
        'HangupUrl': "http://127.0.0.1:5000/hangup/",
        'RingUrl': "http://127.0.0.1:5000/ringing/",
        'extra_dial_string': "hangup_after_bridge=true",
    }

    try:
        r = plivo.call(call_params)
    except Exception as e:
        print(e)
        raise

    # Presumably this starts a session whose id is the request uuid, which is
    # what the callbacks later look up via ALegRequestUUID -- TODO confirm.
    flask.session.generate(r['RequestUUID'])
    session_args = {
        'MOBILE_NUMBER': to_number,
        'TRANSACTION_ID': transaction_id,
        'CALL_TYPE': call_type,
        'PRODUCT_LIST': product_list,
        'CUSTOMER_CODE': customer_code,
        # Drop the fractional seconds from the timestamp.
        'START_TIME': str(datetime.now()).split('.')[0],
    }
    flask.session.store.put(session.sid_s, pickle.dumps(session_args))

    print("Call executed , call uuid is %s " % (r['RequestUUID']))
    return render_template('response_template.xml', response=r)
# Start the Flask server only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    app.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment