Skip to content

Instantly share code, notes, and snippets.

@yekibud
Created August 6, 2019 02:43
Show Gist options
  • Save yekibud/af9c980ff6fdbf131211dab703f829f0 to your computer and use it in GitHub Desktop.
Save yekibud/af9c980ff6fdbf131211dab703f829f0 to your computer and use it in GitHub Desktop.
Python SCIM server
import os
import re
import uuid
from flask import Flask
from flask import render_template
from flask import request
from flask import url_for
from flask_socketio import SocketIO
from flask_socketio import emit
from flask_sqlalchemy import SQLAlchemy
import flask
app = Flask(__name__)
database_url = os.getenv('DATABASE_URL', 'sqlite:///test-users.db')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_DATABASE_URI'] = database_url
db = SQLAlchemy(app)
socketio = SocketIO(app)
class ListResponse():
def __init__(self, list, start_index=1, count=None, total_results=0):
self.list = list
self.start_index = start_index
self.count = count
self.total_results = total_results
def to_scim_resource(self):
rv = {
"schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
"totalResults": self.total_results,
"startIndex": self.start_index,
"Resources": []
}
resources = []
for item in self.list:
resources.append(item.to_scim_resource())
if self.count:
rv['itemsPerPage'] = self.count
rv['Resources'] = resources
return rv
class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.String(36), primary_key=True)
active = db.Column(db.Boolean, default=False)
userName = db.Column(db.String(250),
unique=True,
nullable=False,
index=True)
familyName = db.Column(db.String(250))
middleName = db.Column(db.String(250))
givenName = db.Column(db.String(250))
def __init__(self, resource):
self.update(resource)
def update(self, resource):
for attribute in ['userName', 'active']:
if attribute in resource:
setattr(self, attribute, resource[attribute])
for attribute in ['givenName', 'middleName', 'familyName']:
if attribute in resource['name']:
setattr(self, attribute, resource['name'][attribute])
def to_scim_resource(self):
rv = {
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"],
"id": self.id,
"userName": self.userName,
"name": {
"familyName": self.familyName,
"givenName": self.givenName,
"middleName": self.middleName,
},
"active": self.active,
"meta": {
"resourceType": "User",
"location": url_for('user_get',
user_id=self.id,
_external=True),
# "created": "2010-01-23T04:56:22Z",
# "lastModified": "2011-05-13T04:42:34Z",
}
}
return rv
def scim_error(message, status_code=500):
rv = {
"schemas": ["urn:ietf:params:scim:api:messages:2.0:Error"],
"detail": message,
"status": str(status_code)
}
return flask.jsonify(rv), status_code
def send_to_browser(obj):
socketio.emit('user',
{'data': obj},
broadcast=True,
namespace='/test')
def render_json(obj):
rv = obj.to_scim_resource()
send_to_browser(rv)import os
import re
import uuid
from flask import Flask
from flask import render_template
from flask import request
from flask import url_for
from flask_socketio import SocketIO
from flask_socketio import emit
from flask_sqlalchemy import SQLAlchemy
import flask
app = Flask(__name__)
database_url = os.getenv('DATABASE_URL', 'sqlite:///test-users.db')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_DATABASE_URI'] = database_url
db = SQLAlchemy(app)
socketio = SocketIO(app)
class ListResponse():
def __init__(self, list, start_index=1, count=None, total_results=0):
self.list = list
self.start_index = start_index
self.count = count
self.total_results = total_results
def to_scim_resource(self):
rv = {
"schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
"totalResults": self.total_results,
"startIndex": self.start_index,
"Resources": []
}
resources = []
for item in self.list:
resources.append(item.to_scim_resource())
if self.count:
rv['itemsPerPage'] = self.count
rv['Resources'] = resources
return rv
class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.String(36), primary_key=True)
active = db.Column(db.Boolean, default=False)
userName = db.Column(db.String(250),
unique=True,
nullable=False,
index=True)
familyName = db.Column(db.String(250))
middleName = db.Column(db.String(250))
givenName = db.Column(db.String(250))
def __init__(self, resource):
self.update(resource)
def update(self, resource):
for attribute in ['userName', 'active']:
if attribute in resource:
setattr(self, attribute, resource[attribute])
for attribute in ['givenName', 'middleName', 'familyName']:
if attribute in resource['name']:
setattr(self, attribute, resource['name'][attribute])
def to_scim_resource(self):
rv = {
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:User"],
"id": self.id,
"userName": self.userName,
"name": {
"familyName": self.familyName,
"givenName": self.givenName,
"middleName": self.middleName,
},
"active": self.active,
"meta": {
"resourceType": "User",
"location": url_for('user_get',
user_id=self.id,
_external=True),
# "created": "2010-01-23T04:56:22Z",
# "lastModified": "2011-05-13T04:42:34Z",
}
}
return rv
def scim_error(message, status_code=500):
rv = {
"schemas": ["urn:ietf:params:scim:api:messages:2.0:Error"],
"detail": message,
"status": str(status_code)
}
return flask.jsonify(rv), status_code
def send_to_browser(obj):
socketio.emit('user',
{'data': obj},
broadcast=True,
namespace='/test')
def render_json(obj):
rv = obj.to_scim_resource()
send_to_browser(rv)
return flask.jsonify(rv)
@socketio.on('connect', namespace='/test')
def test_connect():
for user in User.query.filter_by(active=True).all():
emit('user', {'data': user.to_scim_resource()})
@socketio.on('disconnect', namespace='/test')
def test_disconnect():
print('Client disconnected')
@app.route('/')
def hello():
return render_template('base.html')
@app.route("/scim/v2/Users/<user_id>", methods=['GET'])
def user_get(user_id):
try:
user = User.query.filter_by(id=user_id).one()
except:
return scim_error("User not found", 404)
return render_json(user)
@app.route("/scim/v2/Users", methods=['POST'])
def users_post():
user_resource = request.get_json(force=True)
user = User(user_resource)
user.id = str(uuid.uuid4())
db.session.add(user)
db.session.commit()
rv = user.to_scim_resource()
send_to_browser(rv)
resp = flask.jsonify(rv)
resp.headers['Location'] = url_for('user_get',
user_id=user.userName,
_external=True)
return resp, 201
@app.route("/scim/v2/Users/<user_id>", methods=['PUT'])
def users_put(user_id):
user_resource = request.get_json(force=True)
user = User.query.filter_by(id=user_id).one()
user.update(user_resource)
db.session.add(user)
db.session.commit()
return render_json(user)
@app.route("/scim/v2/Users/<user_id>", methods=['PATCH'])
def users_patch(user_id):
patch_resource = request.get_json(force=True)
for attribute in ['schemas', 'Operations']:
if attribute not in patch_resource:
message = "Payload must contain '{}' attribute.".format(attribute)
return message, 400
schema_patchop = 'urn:ietf:params:scim:api:messages:2.0:PatchOp'
if schema_patchop not in patch_resource['schemas']:
return "The 'schemas' type in this request is not supported.", 501
user = User.query.filter_by(id=user_id).one()
for operation in patch_resource['Operations']:
if 'op' not in operation and operation['op'] != 'replace':
continue
value = operation['value']
for key in value.keys():
setattr(user, key, value[key])
db.session.add(user)
db.session.commit()
return render_json(user)
@app.route("/scim/v2/Users", methods=['GET'])
def users_get():
query = User.query
request_filter = request.args.get('filter')
match = None
if request_filter:
match = re.match('(\w+) eq "([^"]*)"', request_filter)
if match:
(search_key_name, search_value) = match.groups()
search_key = getattr(User, search_key_name)
query = query.filter(search_key == search_value)
count = int(request.args.get('count', 100))
start_index = int(request.args.get('startIndex', 1))
if start_index < 1:
start_index = 1
start_index -= 1
query = query.offset(start_index).limit(count)
total_results = query.count()
found = query.all()
rv = ListResponse(found,
start_index=start_index,
count=count,
total_results=total_results)
return flask.jsonify(rv.to_scim_resource())
@app.route("/scim/v2/Groups", methods=['GET'])
def groups_get():
rv = ListResponse([])
return flask.jsonify(rv.to_scim_resource())
@app.route("/db", methods=['POST'])
def create_db():
db.create_all()
return "create_all OK"
if __name__ == "__main__":
try:
User.query.one()
except:
db.create_all()
# app.debug = True
socketio.run(app)
return flask.jsonify(rv)
@socketio.on('connect', namespace='/test')
def test_connect():
for user in User.query.filter_by(active=True).all():
emit('user', {'data': user.to_scim_resource()})
@socketio.on('disconnect', namespace='/test')
def test_disconnect():
print('Client disconnected')
@app.route('/')
def hello():
return render_template('base.html')
@app.route("/scim/v2/Users/<user_id>", methods=['GET'])
def user_get(user_id):
try:
user = User.query.filter_by(id=user_id).one()
except:
return scim_error("User not found", 404)
return render_json(user)
@app.route("/scim/v2/Users", methods=['POST'])
def users_post():
user_resource = request.get_json(force=True)
user = User(user_resource)
user.id = str(uuid.uuid4())
db.session.add(user)
db.session.commit()
rv = user.to_scim_resource()
send_to_browser(rv)
resp = flask.jsonify(rv)
resp.headers['Location'] = url_for('user_get',
user_id=user.userName,
_external=True)
return resp, 201
@app.route("/scim/v2/Users/<user_id>", methods=['PUT'])
def users_put(user_id):
user_resource = request.get_json(force=True)
user = User.query.filter_by(id=user_id).one()
user.update(user_resource)
db.session.add(user)
db.session.commit()
return render_json(user)
@app.route("/scim/v2/Users/<user_id>", methods=['PATCH'])
def users_patch(user_id):
patch_resource = request.get_json(force=True)
for attribute in ['schemas', 'Operations']:
if attribute not in patch_resource:
message = "Payload must contain '{}' attribute.".format(attribute)
return message, 400
schema_patchop = 'urn:ietf:params:scim:api:messages:2.0:PatchOp'
if schema_patchop not in patch_resource['schemas']:
return "The 'schemas' type in this request is not supported.", 501
user = User.query.filter_by(id=user_id).one()
for operation in patch_resource['Operations']:
if 'op' not in operation and operation['op'] != 'replace':
continue
value = operation['value']
for key in value.keys():
setattr(user, key, value[key])
db.session.add(user)
db.session.commit()
return render_json(user)
@app.route("/scim/v2/Users", methods=['GET'])
def users_get():
query = User.query
request_filter = request.args.get('filter')
match = None
if request_filter:
match = re.match('(\w+) eq "([^"]*)"', request_filter)
if match:
(search_key_name, search_value) = match.groups()
search_key = getattr(User, search_key_name)
query = query.filter(search_key == search_value)
count = int(request.args.get('count', 100))
start_index = int(request.args.get('startIndex', 1))
if start_index < 1:
start_index = 1
start_index -= 1
query = query.offset(start_index).limit(count)
total_results = query.count()
found = query.all()
rv = ListResponse(found,
start_index=start_index,
count=count,
total_results=total_results)
return flask.jsonify(rv.to_scim_resource())
@app.route("/scim/v2/Groups", methods=['GET'])
def groups_get():
rv = ListResponse([])
return flask.jsonify(rv.to_scim_resource())
@app.route("/db", methods=['POST'])
def create_db():
db.create_all()
return "create_all OK"
if __name__ == "__main__":
try:
User.query.one()
except:
db.create_all()
# app.debug = True
socketio.run(app)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment