Skip to content

Instantly share code, notes, and snippets.

@abhinavlal
Created November 21, 2013 11:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save abhinavlal/7579915 to your computer and use it in GitHub Desktop.
Save abhinavlal/7579915 to your computer and use it in GitHub Desktop.
import sqlite3
import urllib, re
from flask import Flask, request, jsonify
from dbapi import g, closing, create_contact, get_user_by_email, get_contacts_by_created_by_id, \
get_contact_by_id, update_contacts, delete_entity_contact, get_contacts_by_phone, search_contacts
# Configuration: the UPPERCASE module-level names below are picked up by the
# app.config.from_object(__name__) call at the end of this section.
# Path of the SQLite database file that connect_db() opens.
DATABASE = '/home/ubuntu/ui-project/ui-project.db'
# NOTE(review): debug mode is hard-coded on; presumably intended for
# development only -- confirm before exposing this service.
DEBUG = True
app = Flask(__name__)
# Load this module's settings (DATABASE, DEBUG) into the Flask config.
app.config.from_object(__name__)
def connect_db():
    """Open and return a new SQLite connection to the configured database."""
    database_path = app.config['DATABASE']
    return sqlite3.connect(database_path)
def init_db():
    """Run the statements in ``schema.sql`` against the database.

    A fresh connection is opened, the script shipped as the application
    resource ``schema.sql`` is executed on it, the result is committed, and
    the connection is closed again even if a step fails.
    """
    connection = connect_db()
    try:
        with app.open_resource('schema.sql', mode='r') as schema_file:
            schema_sql = schema_file.read()
        connection.cursor().executescript(schema_sql)
        connection.commit()
    finally:
        connection.close()
@app.before_request
def before_request():
    """Attach a new database connection to ``g`` as ``g.db`` for this request."""
    connection = connect_db()
    g.db = connection
@app.teardown_request
def teardown_request(exception):
    """Close the connection stored on ``g.db``, if there is one.

    ``exception`` is supplied by Flask and is not inspected here.
    """
    connection = getattr(g, 'db', None)
    if connection is None:
        # No connection was attached to g for this request.
        return
    connection.close()
@app.route('/contacts', methods=['POST'])
def add_contact():
    """Create a contact for the user identified by the ``X-USER`` header.

    Form fields read: ``contact_name`` (required), ``contact_phone``
    (required; ten digits, or ``+`` followed by twelve digits) and
    ``contact_email`` (optional; must contain ``word@word`` when given).

    Returns:
        ``({'contacts': {...}}, 201)`` describing the stored contact, or an
        error payload with status 401 (header missing), 404 (no user found
        for the header value), 400 (missing or invalid field) or 409 (the
        phone number is already stored).
    """
    # Every error response carries the same CORS headers.
    cors_headers = {
        'Access-Control-Allow-Origin': '*',
        'Access-Control-Allow-Headers': 'X-USER',
    }

    created_by = request.headers.get('X-USER')
    if not created_by:
        return jsonify({'X-USER': 'Missing X-USER in headers'}), 401, cors_headers

    user = get_user_by_email(created_by)
    if not user:
        return jsonify({'404': 'Not exists'}), 404, cors_headers
    # First column of the first matching row is used as the owner id.
    created_by_id = user[0][0]

    form = request.form
    contact_phone = form.get('contact_phone')
    contact_email = form.get('contact_email')

    if not form.get('contact_name'):
        return jsonify({'contact_name': 'Contact name is a required param'}), 400, cors_headers
    if not contact_phone:
        return jsonify({'contact_phone': 'Contact phone is a required param'}), 400, cors_headers
    # \Z rather than $: "$" also matches just before a trailing newline, so
    # a value such as "1234567890\n" would otherwise pass validation.
    if not re.search(r'^(\+\d{12}|\d{10})\Z', contact_phone):
        return jsonify({'contact_phone': 'Enter a valid number'}), 400, cors_headers
    if contact_email and not re.search(r'\w+@\w+', contact_email):
        return jsonify({'contact_email': 'Enter a valid email'}), 400, cors_headers

    if get_contacts_by_phone(contact_phone):
        return jsonify({'contact_phone': 'Phone number in use'}), 409, cors_headers

    contact_id = create_contact(form, created_by_id)
    contact = get_contact_by_id(contact_id, created_by_id)[0]
    # Replaces a leftover Python-2-only ``print contact`` debug statement;
    # only the id is logged so contact details are not written to output.
    app.logger.debug('Created contact %s', contact_id)

    response_keys = {
        'id': contact[0],
        'contact_name': contact[1],
        'contact_phone': contact[2],
        'contact_email': contact[3],
    }
    return jsonify({'contacts': response_keys}), 201
@app.route('/contacts', methods=['GET'])
def get_contact():
    """List the contacts of the user identified by the ``X-USER`` header.

    When the query string carries a non-empty ``q`` the result comes from
    ``search_contacts``; otherwise every contact created by the user is
    returned.

    Returns:
        ``({'contacts': [...]}, 200)``, or an error payload with status 401
        (header missing) or 404 (no user found for the header value).
    """
    cors = {'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Headers': 'X-USER'}

    requester = request.headers.get('X-USER')
    if not requester:
        return jsonify({'X-USER': 'Missing X-USER in headers'}), 401, cors

    matches = get_user_by_email(requester)
    if len(matches) == 0:
        return jsonify({'404': 'User Not found'}), 404, cors
    owner_id = matches[0][0]

    query = request.args.get('q')
    rows = (search_contacts(owner_id, query) if query
            else get_contacts_by_created_by_id(owner_id))

    payload = [
        {
            'id': row[0],
            'contact_name': row[1],
            'contact_phone': row[2],
            'contact_email': row[3],
        }
        for row in rows
    ]
    return jsonify({'contacts': payload}), 200
@app.route('/contacts/<contact_id>', methods=['PUT'])
def update_contact(contact_id):
    """Update contact ``contact_id`` owned by the ``X-USER`` user.

    Form fields read: ``contact_name`` (required), ``contact_phone``
    (required; ten digits, or ``+`` followed by twelve digits) and
    ``contact_email`` (optional; must contain ``word@word`` when given).
    The phone-in-use check is skipped when the submitted phone equals the
    one already stored on this contact.

    Returns:
        ``({'contacts': {...}}, 200)`` with the contact as re-read after the
        update, or an error payload with status 401 (header missing), 404
        (unknown user, or no such contact for that user), 400 (missing or
        invalid field) or 409 (phone number already stored).
    """
    # Every error response carries the same CORS headers.
    cors_headers = {
        'Access-Control-Allow-Origin': '*',
        'Access-Control-Allow-Headers': 'X-USER',
    }
    not_found = {'404': 'Page Not Found'}

    created_by = request.headers.get('X-USER')
    if not created_by:
        return jsonify({'X-USER': 'Missing X-USER in headers'}), 401, cors_headers

    user = get_user_by_email(created_by)
    if not user:
        return jsonify(not_found), 404, cors_headers
    created_by_id = user[0][0]

    existing = get_contact_by_id(contact_id, created_by_id)
    if not existing:
        return jsonify(not_found), 404, cors_headers
    stored_phone = existing[0][2]

    form = request.form
    new_phone = form.get('contact_phone')
    new_email = form.get('contact_email')

    if not form.get('contact_name'):
        return jsonify({'contact_name': 'Contact name is a required param'}), 400, cors_headers
    if not new_phone:
        return jsonify({'contact_phone': 'Contact phone is a required param'}), 400, cors_headers
    # \Z rather than $: "$" also matches just before a trailing newline, so
    # a value such as "1234567890\n" would otherwise pass validation.
    if not re.search(r'^(\+\d{12}|\d{10})\Z', new_phone):
        return jsonify({'contact_phone': 'Enter a valid number'}), 400, cors_headers
    if new_email and not re.search(r'\w+@\w+', new_email):
        return jsonify({'contact_email': 'Enter a valid email'}), 400, cors_headers

    # Only look for a clash when the phone number is actually changing;
    # otherwise the contact would collide with its own stored number.
    if new_phone != stored_phone and get_contacts_by_phone(new_phone):
        return jsonify({'contacts': 'Phone number in use'}), 409, cors_headers

    update_contacts(form, str(contact_id))
    contact = get_contact_by_id(contact_id, created_by_id)[0]
    response_keys = {
        'id': contact[0],
        'contact_name': contact[1],
        'contact_phone': contact[2],
        'contact_email': contact[3],
    }
    # 200, not 201: an existing contact was modified, nothing was created.
    return jsonify({'contacts': response_keys}), 200
@app.route('/contacts/<contact_id>', methods=['DELETE'])
def delete_contact(contact_id):
    """Delete contact ``contact_id`` owned by the ``X-USER`` user.

    The contact is looked up together with the user's id first, so a contact
    that is not found for that user yields 404 and nothing is deleted.

    Returns:
        ``({'contacts': 'Contact successfully deleted.'}, 200)``, or an error
        payload with status 401 (header missing) or 404 (unknown user, or no
        such contact for that user).
    """
    # Every error response carries the same CORS headers.
    cors_headers = {
        'Access-Control-Allow-Origin': '*',
        'Access-Control-Allow-Headers': 'X-USER',
    }
    not_found = {'404': 'Page Not Found'}

    created_by = request.headers.get('X-USER')
    if not created_by:
        return jsonify({'X-USER': 'Missing X-USER in headers'}), 401, cors_headers

    user = get_user_by_email(created_by)
    if not user:
        return jsonify(not_found), 404, cors_headers

    if not get_contact_by_id(contact_id, user[0][0]):
        return jsonify(not_found), 404, cors_headers

    delete_entity_contact(contact_id)
    # 200, not 201: the request removed a resource, it did not create one.
    return jsonify({'contacts': 'Contact successfully deleted.'}), 200
@app.route('/contacts/<contact_id>', methods=['GET'])
def get_single_contact(contact_id):
    """Return contact ``contact_id`` owned by the ``X-USER`` user.

    Returns:
        ``({'contacts': {...}}, 200)`` with the contact's id, name, phone and
        email, or an error payload with status 401 (header missing) or 404
        (unknown user, or no such contact for that user).
    """
    # Every error response carries the same CORS headers.
    cors_headers = {
        'Access-Control-Allow-Origin': '*',
        'Access-Control-Allow-Headers': 'X-USER',
    }
    not_found = {'404': 'Page Not Found'}

    created_by = request.headers.get('X-USER')
    if not created_by:
        return jsonify({'X-USER': 'Missing X-USER in headers'}), 401, cors_headers

    user = get_user_by_email(created_by)
    if not user:
        return jsonify(not_found), 404, cors_headers

    matches = get_contact_by_id(contact_id, user[0][0])
    if not matches:
        return jsonify(not_found), 404, cors_headers

    contact = matches[0]
    response_keys = {
        'id': contact[0],
        'contact_name': contact[1],
        'contact_phone': contact[2],
        'contact_email': contact[3],
    }
    # 200, not 201: a read creates nothing; this also matches the status
    # returned by the list endpoint GET /contacts.
    return jsonify({'contacts': response_keys}), 200
if __name__ == '__main__':
    # Script entry point: start Flask's built-in server on 127.0.0.1:3010.
    run_options = {'host': '127.0.0.1', 'port': 3010}
    app.run(**run_options)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment