Skip to content

Instantly share code, notes, and snippets.

@Lewiscowles1986
Last active May 14, 2017 17:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save Lewiscowles1986/ea7d3245fdbb07c55d07 to your computer and use it in GitHub Desktop.
Save Lewiscowles1986/ea7d3245fdbb07c55d07 to your computer and use it in GitHub Desktop.
Python Flask REST server (Works for me on 2.7 & 3.4. 2.7 used to run tests)
from flask import Flask
from flask import g, abort, redirect, Response, request
import json
import sqlite3
import datetime
app = Flask(__name__)
@app.before_request
def db_connect():
g.conn = sqlite3.connect('people.db', detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES)
# MySQLdb.connect(host='127.0.0.1', user='test', passwd='password', db='test')
g.cursor = g.conn.cursor()
@app.after_request
def db_disconnect(response):
g.conn.commit()
g.cursor.close()
# g.conn.close()
return response
def query_db(query, data=(), none=False):
#print( json.dumps( data ) )
g.cursor.execute(query, data)
result = g.cursor.fetchall()
return None if none else result
@app.route("/")
def index():
obj = {
"operations":[
{"method":"GET","pattern":"/","description":"This list of operations"},
{"method":"GET","pattern":"/people","description":"Lists people"},
{"method":"GET","pattern":"/people/id/<int:id>","description":"Retrieves specified person from List"},
{"method":"POST","pattern":"/people","description":"adds person to List"},
{"method":"PUT","pattern":"/people/id/<int:id>","description":"updates person on List"},
{"method":"DELETE","pattern":"/people/id/<int:id>","description":"deletes person from List"},
{"method":"DELETE","pattern":"/people","description":"emptys List"}
]
}
data = json.dumps(obj)
return Response(''.join(['{"Message":"Operations List","result":',data,'}']), status=200, mimetype='application/json')
@app.route("/people", methods=['GET'])
def people():
result = query_db("SELECT * FROM people")
data = json.dumps(result)
resp = Response(''.join(['{"Message":"Records","results":',data,'}']), status=200, mimetype='application/json')
return resp
@app.route("/people/id/<int:id>", methods=['GET'])
def get_person(id):
result = query_db("SELECT * FROM people WHERE id = ?", [str(id)] )
if len( result ) > 0:
data = json.dumps(result)
resp = Response(''.join(['{"Message":"Record","result":"',data,'"}']), status=200, mimetype='application/json')
else:
abort(404)
return resp
@app.route("/people", methods=['POST'])
def add_person():
req_json = request.get_json()
query_db("INSERT INTO people (firstname, lastname) VALUES (?,?)", [ req_json['firstname'], req_json['lastname'] ], True )
resp = Response('{"Message":"Added"}', status=201, mimetype='application/json')
return resp
@app.route("/people/id/<int:id>", methods=['PUT'])
def update_person(id):
req_json = request.get_json()
query_db("UPDATE people SET firstname = ?, lastname = ? WHERE id = ?", [req_json['firstname'], req_json['lastname'], str(id) ], True )
resp = Response('{"Message":"Updated"}', status=202, mimetype='application/json')
return resp
@app.route("/people/id/<int:id>", methods=['DELETE'])
def remove_person(id):
query_db("DELETE FROM people WHERE id = ?", [str(id)], True )
resp = Response('{"Message":"Deleted"}', status=202, mimetype='application/json')
return resp
@app.route("/people", methods=['DELETE'])
def empty_people():
query_db("DELETE FROM people")
query_db("DELETE from sqlite_sequence where name=?", ['people']); # Reset Auto-Increment
resp = Response('{"Message":"Deleted All"}', status=202, mimetype='application/json')
return resp
@app.errorhandler(404)
def page_not_found(e):
return Response('{"Message":"Resource Does Not Exist"}', status=404, mimetype='application/json')
if __name__ == "__main__":
app.run(host='0.0.0.0',port=8000,debug=True)
CREATE TABLE `people` (
`id` INTEGER PRIMARY KEY AUTOINCREMENT,
`firstname` TEXT,
`lastname` TEXT
);
window.CD2 = window.CD2 || {};
window.CD2.utils = window.CD2.utils || {};
window.CD2.utils.form = window.CD2.utils.form || {};
window.CD2.utils.xhr = window.CD2.utils.xhr || {};
window.CD2.utils.xhr.CB = window.CD2.utils.xhr.CB || {};
//
// Default callback to manipulate xhr by adding onloadend event handler
//
window.CD2.utils.xhr.CB.logMsgOnDone = function( ) {
var done = function(){ if(this.readyState == this.DONE) { console.log( 'data sent', this ); } };
this.onreadystatechange = done;
};
//
// ensures xhrCB is valid or default
//
window.CD2.utils.xhr.ensureXHRCB = function() {
if ( typeof xhrCB !== 'function' ) {
xhrCB = window.CD2.utils.xhr.CB.logMsgOnDone;
}
};
//
// Send JSON via xhr
//
window.CD2.utils.xhr.sendJSON = function( data, action, method, xhrCB ) {
if ( typeof xhrCB !== 'function' ) {
xhrCB = window.CD2.utils.xhr.CB.logMsgOnDone;
}
// construct an HTTP request
var xhr = new XMLHttpRequest();
xhr.open(method, action, true);
xhr.setRequestHeader('Content-Type', 'application/json; charset=UTF-8');
// send the collected data as JSON
if( data != null ) {
xhr.send( data );
} else {
xhr.send();
}
xhrCB.call( xhr );
};
//
// Accepts xhrCB callable so you can operate directly on xhr, by default xhrCB
//
window.CD2.utils.form.postJSONBodyOnSubmit = function( selector, xhrCB ) {
form = document.querySelector( selector );
window.CD2.utils.xhr.ensureXHRCB.apply( this );
if ( obj instanceof HTMLElement ) {
form.onsubmit = function (e) {
// stop the regular form submission
e.preventDefault();
// collect the form data while iterating over the inputs
var data = {};
for (var i = 0; i < form.length; i++) {
var input = form[i];
if (input.name) {
data[input.name] = input.value;
}
}
window.CD2.utils.xhr.sendJSON(
JSON.stringify(data),
form.action,
form.method,
xhrCB
);
};
}
};
// window.CD2.utils.xhr.sendJSON( null, 'http://127.0.0.1:8000', 'GET' ); // get service description
// window.CD2.utils.xhr.sendJSON( null, 'http://127.0.0.1:8000/', 'GET' ); // get service description
// window.CD2.utils.xhr.sendJSON( null, 'http://127.0.0.1:8000/people', 'GET' ); // get list of people
// window.CD2.utils.xhr.sendJSON( '{"firstname": "FirstName", "lastname":"LastName"}', 'http://127.0.0.1:8000/people', 'POST' ); // add a new person
// window.CD2.utils.xhr.sendJSON( null, 'http://127.0.0.1:8000/people', 'GET' ); // get list of people
// window.CD2.utils.xhr.sendJSON( null, 'http://127.0.0.1:8000/people/id/1', 'GET' ); // get specific person
// window.CD2.utils.xhr.sendJSON( '{"firstname": "Lewis", "lastname":"Cowles"}', 'http://127.0.0.1:8000/people/id/1', 'PUT' ); // modify a new person
// window.CD2.utils.xhr.sendJSON( '{"firstname": "Another", "lastname":"Person"}', 'http://127.0.0.1:8000/people', 'POST' ); // add a new person
// window.CD2.utils.xhr.sendJSON( '{"firstname": "Some", "lastname":"Person"}', 'http://127.0.0.1:8000/people', 'POST' ); // add a new person
// window.CD2.utils.xhr.sendJSON( null, 'http://127.0.0.1:8000/people', 'GET' ); // get list of people
// window.CD2.utils.xhr.sendJSON( null, 'http://127.0.0.1:8000/people/id/2', 'DELETE' ); // delete specific person
// window.CD2.utils.xhr.sendJSON( null, 'http://127.0.0.1:8000/people', 'GET' ); // get list of people
// window.CD2.utils.xhr.sendJSON( null, 'http://127.0.0.1:8000/people', 'DELETE' ); // delete all people
// window.CD2.utils.xhr.sendJSON( null, 'http://127.0.0.1:8000/people', 'GET' ); // get list of people
import unittest, logging, sys, time, json
import app
class AppTestCase( unittest.TestCase ):
def setUp( self ):
#self.db_fd, flaskr.app.config['DATABASE'] = tempfile.mkstemp()
app.app.config[ 'TESTING' ] = True
self.app = app.app.test_client()
self.app.delete( '/people' )
#flaskr.init_db()
def tearDown( self ):
pass
def checkResponse( self, resp, status = '200 OK', mime='application/json' ):
#log = logging.getLogger()
#log.debug( "%r", resp.status )
return status == resp.status and mime == resp.mimetype
def clearCreateMin( self, num ):
self.app.delete( '/people' ) # clear record-set
target = int( num ) + 1
for i in range( 1, target ): # create at least this many records
headers = [ ( 'Content-Type', 'application/json' ) ]
data = ''.join( [ '{"firstname": "Lewis', str( i ), '", "lastname":"Cowles', str( i ), '"}' ] )
headers.append( ( 'Content-Length', len( data ) ) )
resp = self.app.post( '/people', data=data, headers=headers )
self.assertTrue( self.checkResponse( resp, '201 CREATED' ) )
def test_non_exist_operations( self ):
resp = self.app.get( '/nonexist' )
self.assertIn( '404', resp.status )
def test_get_operations( self ):
resp = self.app.get('/')
#log = logging.getLogger()
#log.debug( "%r", resp.data )
self.assertIn( '{', resp.data )
self.assertIn( '}', resp.data )
self.assertIn( '"Message":"Operations List"', resp.data )
self.assertTrue( self.checkResponse( resp ) )
def test_multi_inserts( self ):
for i in range( 1, 26 ):
headers = [ ( 'Content-Type', 'application/json' ) ]
data = ''.join( [ '{"firstname": "Lewis', str( i ), '", "lastname":"Cowles', str( i ), '"}' ] )
headers.append( ( 'Content-Length', len( data ) ) )
resp = self.app.post( '/people', data=data, headers=headers )
self.assertTrue( self.checkResponse( resp, '201 CREATED' ) )
resp = self.app.get( '/people' )
self.assertTrue( self.checkResponse( resp ) )
self.assertIn( '"Lewis', resp.data )
self.assertIn( '"Cowles', resp.data )
def test_get_single_record_exists( self ):
self.clearCreateMin( 3 )
resp = self.app.get( '/people/id/3' )
self.assertTrue( self.checkResponse( resp ) )
self.assertIn( '"Lewis3', resp.data )
self.assertIn( '"Cowles3', resp.data )
def test_update_multi_records( self ):
self.clearCreateMin( 11 )
for i in range( 3, 10 ):
headers = [ ( 'Content-Type', 'application/json' ) ]
data = ''.join( [ '{"firstname": "Updated', str( i ), '", "lastname":"Updaterson', str( i ), '"}' ] )
headers.append( ( 'Content-Length', len( data ) ) )
resp = self.app.put( ''.join(['/people/id/',str(i)]), data=data, headers=headers )
self.assertTrue( self.checkResponse( resp, '202 ACCEPTED' ) )
resp = self.app.get( '/people' )
self.assertTrue( self.checkResponse( resp ) )
self.assertIn( '"Updated3', resp.data )
self.assertIn( '"Updaterson3', resp.data )
self.assertIn( '"Updated4', resp.data )
self.assertIn( '"Updaterson4', resp.data )
self.assertIn( '"Updated5', resp.data )
self.assertIn( '"Updaterson5', resp.data )
self.assertIn( '"Updated6', resp.data )
self.assertIn( '"Updaterson6', resp.data )
self.assertIn( '"Updated7', resp.data )
self.assertIn( '"Updaterson7', resp.data )
self.assertIn( '"Updated8', resp.data )
self.assertIn( '"Updaterson8', resp.data )
self.assertIn( '"Updated9', resp.data )
self.assertIn( '"Updaterson9', resp.data )
def test_get_single_record_noexists( self ):
resp = self.app.get( '/people/id/9999' )
self.assertTrue( self.checkResponse( resp, '404 NOT FOUND' ) )
def test_delete_record( self ):
resp = self.app.delete( '/people/id/7' )
self.assertTrue( self.checkResponse( resp, '202 ACCEPTED' ) )
def test_delete_all_records( self ):
resp = self.app.delete( '/people' )
self.assertTrue( self.checkResponse( resp, '202 ACCEPTED' ) )
if __name__ == '__main__':
logging.basicConfig( stream=sys.stderr )
logging.getLogger( ).setLevel( logging.DEBUG )
unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment