Created
March 25, 2020 21:44
-
-
Save Porter97/22f3257f2f8f3874a8fdd8c7183b67c0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime | |
import hashlib | |
from werkzeug.security import generate_password_hash, check_password_hash | |
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer | |
from flask import current_app | |
from flask_login import UserMixin, AnonymousUserMixin | |
from . import db, login_manager | |
import stream | |
class Permission: | |
FOLLOW = 1 | |
COMMENT = 2 | |
WRITE = 4 | |
MODERATE = 8 | |
ADMIN = 16 | |
class Role(db.Model): | |
__tablename__ = 'roles' | |
id = db.Column(db.Integer, primary_key=True) | |
name = db.Column(db.String(64), unique=True) | |
default = db.Column(db.Boolean, default=False, index=True) | |
permissions = db.Column(db.Integer) | |
users = db.relationship('User', backref='role', lazy='dynamic') | |
def __init__(self, **kwargs): | |
super(Role, self).__init__(**kwargs) | |
if self.permissions is None: | |
self.permissions = 0 | |
@staticmethod | |
def insert_roles(): | |
roles = { | |
'User': [Permission.FOLLOW, Permission.COMMENT, Permission.WRITE], | |
'Moderator': [Permission.FOLLOW, Permission.COMMENT, | |
Permission.WRITE, Permission.MODERATE], | |
'Administrator': [Permission.FOLLOW, Permission.COMMENT, | |
Permission.WRITE, Permission.MODERATE, | |
Permission.ADMIN], | |
} | |
default_role = 'User' | |
for r in roles: | |
role = Role.query.filter_by(name=r).first() | |
if role is None: | |
role = Role(name=r) | |
role.reset_permissions() | |
for perm in roles[r]: | |
role.add_permission(perm) | |
role.default = (role.name == default_role) | |
db.session.add(role) | |
db.session.commit() | |
def add_permission(self, perm): | |
if not self.has_permission(perm): | |
self.permissions += perm | |
def remove_permission(self, perm): | |
if self.has_permission(perm): | |
self.permissions -= perm | |
def reset_permissions(self): | |
self.permissions = 0 | |
def has_permission(self, perm): | |
return self.permissions & perm == perm | |
def __repr__(self): | |
return '<Role %r>' % self.name | |
class CollectionFollow(db.Model): | |
__tablename__ = 'collection_follows' | |
follower_id = db.Column(db.Integer, db.ForeignKey('users.id'), | |
primary_key=True) | |
collection_id = db.Column(db.Integer, db.ForeignKey('collections.id'), | |
primary_key=True) | |
timestamp = db.Column(db.DateTime, default=datetime.utcnow) | |
class Follow(db.Model): | |
__tablename__ = 'follows' | |
follower_id = db.Column(db.Integer, db.ForeignKey('users.id'), | |
primary_key=True) | |
followed_id = db.Column(db.Integer, db.ForeignKey('users.id'), | |
primary_key=True) | |
timestamp = db.Column(db.DateTime, default=datetime.utcnow) | |
class User(db.Model, UserMixin): | |
__tablename__ = 'users' | |
id = db.Column(db.Integer, primary_key=True) | |
email = db.Column(db.String(64), unique=True, index=True) | |
username = db.Column(db.String(64), unique=True, index=True) | |
role_id = db.Column(db.Integer, db.ForeignKey('roles.id')) | |
password_hash = db.Column(db.String(128)) | |
confirmed = db.Column(db.Boolean, default=False) | |
name = db.Column(db.String(64)) | |
about_me = db.Column(db.Text()) | |
member_since = db.Column(db.DateTime(), default=datetime.utcnow) | |
last_seen = db.Column(db.DateTime(), default=datetime.utcnow) | |
avatar_hash = db.Column(db.String(32)) | |
collections = db.relationship('Collection', backref='author', lazy='dynamic', cascade='all, delete-orphan') | |
followed = db.relationship('Follow', | |
foreign_keys=[Follow.follower_id], | |
backref=db.backref('follower', lazy='joined'), | |
lazy='dynamic', | |
cascade='all, delete-orphan') | |
followers = db.relationship('Follow', | |
foreign_keys=[Follow.followed_id], | |
backref=db.backref('followed', lazy='joined'), | |
lazy='dynamic', | |
cascade='all, delete-orphan') | |
followed_collection = db.relationship('CollectionFollow', | |
foreign_keys=[CollectionFollow.follower_id], | |
backref=db.backref('c_follower', lazy='joined'), | |
lazy='dynamic', | |
cascade='all, delete-orphan') | |
def __init__(self, **kwargs): | |
super(User, self).__init__(**kwargs) | |
if self.role is None: | |
if self.email == current_app.config['OFFBRAND_ADMIN']: | |
self.role = Role.query.filter_by(name='Administrator').first() | |
if self.role is None: | |
self.role = Role.query.filter_by(default=True).first() | |
if self.email is not None and self.avatar_hash is None: | |
self.avatar_hash = self.gravatar_hash() | |
self.follow(self) | |
@staticmethod | |
def add_self_follows(): | |
for user in User.query.all(): | |
if not user.is_following(user): | |
user.follow(user) | |
db.session.add(user) | |
db.session.commit() | |
@staticmethod | |
def add_self_collection_follows(): | |
for user in User.query.all(): | |
for collection in user.collections: | |
if not user.is_following_collection(collection): | |
user.follow_collection(collection) | |
db.session.add(user) | |
db.session.commit() | |
@property | |
def password(self): | |
raise AttributeError('password is not a readable attribute') | |
@password.setter | |
def password(self, password): | |
self.password_hash = generate_password_hash(password) | |
def verify_password(self, password): | |
return check_password_hash(self.password_hash, password) | |
def generate_confirmation_token(self, expiration=3600): | |
s = Serializer(current_app.config['SECRET_KEY'], expiration) | |
return s.dumps({'confirm': self.id}).decode('utf-8') | |
def confirm(self, token): | |
s = Serializer(current_app.config['SECRET_KEY']) | |
try: | |
data = s.loads(token.encode('utf-8')) | |
except: | |
return False | |
if data.get('confirm') != self.id: | |
return False | |
try: | |
# Attempt to add user to Stream | |
client = stream.connect(current_app.config["STREAM_API_KEY"], current_app.config['STREAM_SECRET']) | |
client.users.add(str(self.id), {"username": self.username, "gravatar": self.gravatar(size=40)}) | |
except: | |
# If attempt is unsuccessful, return False | |
return False | |
self.confirmed = True | |
db.session.add(self) | |
return True | |
def generate_reset_token(self, expiration=3600): | |
s = Serializer(current_app.config['SECRET_KEY'], expiration) | |
return s.dumps({'reset': self.id}).decode('utf-8') | |
@staticmethod | |
def reset_password(token, new_password): | |
s = Serializer(current_app.config['SECRET_KEY']) | |
try: | |
data = s.loads(token.encode('utf-8')) | |
except: | |
return False | |
user = User.query.get(data.get('reset')) | |
if user is None: | |
return False | |
user.password = new_password | |
db.session.add(user) | |
return True | |
def generate_email_change_token(self, new_email, expiration=3600): | |
s = Serializer(current_app.config['SECRET_KEY'], expiration) | |
return s.dumps( | |
{'change_email': self.id, 'new_email': new_email}).decode('utf-8') | |
def change_email(self, token): | |
s = Serializer(current_app.config['SECRET_KEY']) | |
try: | |
data = s.loads(token.encode('utf-8')) | |
except: | |
return False | |
if data.get('change_email') != self.id: | |
return False | |
new_email = data.get('new_email') | |
if new_email is None: | |
return False | |
if self.query.filter_by(email=new_email).first() is not None: | |
return False | |
self.email = new_email | |
self.avatar_hash = self.gravatar_hash() | |
db.session.add(self) | |
try: | |
# Attempt to add edited user to Stream | |
client = stream.connect(current_app.config["STREAM_API_KEY"], current_app.config['STREAM_SECRET']) | |
client.users.update(str(self.id), {"username": self.username, "gravatar": self.gravatar(size=40)}) | |
except: | |
# If attempt is unsuccessful, return False | |
return False | |
return True | |
def can(self, perm): | |
return self.role is not None and self.role.has_permission(perm) | |
def is_administrator(self): | |
return self.can(Permission.ADMIN) | |
def ping(self): | |
self.last_seen = datetime.utcnow() | |
db.session.add(self) | |
def gravatar_hash(self): | |
return hashlib.md5(self.email.lower().encode('utf-8')).hexdigest() | |
def gravatar(self, size=100, default='identicon', rating='g'): | |
url = 'https://secure.gravatar.com/avatar' | |
hash = self.avatar_hash or self.gravatar_hash() | |
return '{url}/{hash}?s={size}&d={default}&r={rating}'.format( | |
url=url, hash=hash, size=size, default=default, rating=rating) | |
def stream_user_token(self): | |
client = stream.connect(current_app.config['STREAM_API_KEY'], current_app.config['STREAM_SECRET']) | |
return client.create_user_token(str(self.id)) | |
def follow(self, user): | |
client = stream.connect(current_app.config['STREAM_API_KEY'], current_app.config['STREAM_SECRET']) | |
user_feed = client.feed("Notifications", str(self.id)) | |
if not self.is_following(user): | |
user_feed.follow("User", str(user.id)) | |
f = Follow(follower=self, followed=user) | |
db.session.add(f) | |
return True | |
def unfollow(self, user): | |
client = stream.connect(current_app.config['STREAM_API_KEY'], current_app.config['STREAM_SECRET']) | |
user_feed = client.feed("Notifications", str(self.id)) | |
f = self.followed.filter_by(followed_id=user.id).first() | |
if f: | |
user_feed.unfollow("User", str(user.id)) | |
db.session.delete(f) | |
return True | |
def is_following(self, user): | |
if user.id is None: | |
return False | |
return self.followed.filter_by( | |
followed_id=user.id).first() is not None | |
def is_followed_by(self, user): | |
if user.id is None: | |
return False | |
return self.followers.filter_by( | |
follower_id=user.id).first() is not None | |
def is_following_collection(self, collection): | |
if collection is None: | |
return False | |
if collection.id is None: | |
return False | |
return self.followed_collection.filter_by( | |
collection_id=collection.id).first() is not None | |
def follow_collection(self, collection): | |
client = stream.connect(current_app.config['STREAM_API_KEY'], current_app.config['STREAM_SECRET']) | |
user_feed = client.feed("Timeline", str(self.id)) | |
if not self.is_following_collection(collection): | |
user_feed.follow("Collections", str(collection.id)) | |
f = CollectionFollow(c_follower=self, following=collection) | |
db.session.add(f) | |
return True | |
def unfollow_collection(self, collection): | |
client = stream.connect(current_app.config['STREAM_API_KEY'], current_app.config['STREAM_SECRET']) | |
user_feed = client.feed("Timeline", str(self.id)) | |
f = self.followed_collection.filter_by( | |
collection_id=collection.id).first() | |
if f: | |
user_feed.unfollow("Collections", str(collection.id)) | |
db.session.delete(f) | |
return True | |
def __repr__(self): | |
return '<User %r>' % self.username | |
class AnonymousUser(AnonymousUserMixin): | |
def can(self, permissions): | |
return False | |
def is_administrator(self): | |
return False | |
def stream_user_token(self): | |
return None | |
login_manager.anonymous_user = AnonymousUser | |
@login_manager.user_loader | |
def load_user(user_id): | |
return User.query.get(int(user_id)) | |
class Collection(db.Model): | |
__tablename__ = 'collections' | |
id = db.Column(db.Integer, primary_key=True) | |
stream_id = db.Column(db.String(64)) | |
name = db.Column(db.String(64)) | |
description = db.Column(db.String(256)) | |
timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow) | |
author_id = db.Column(db.Integer, db.ForeignKey('users.id')) | |
content = db.relationship('Content', backref='collection', lazy='dynamic', cascade='all, delete-orphan') | |
user_followers = db.relationship('CollectionFollow', | |
foreign_keys=[CollectionFollow.collection_id], | |
backref=db.backref('following', lazy='joined'), | |
lazy='dynamic', | |
cascade='all, delete-orphan') | |
def has_content(self, url): | |
return self.content.filter_by(url=url).first() is not None | |
def add_to_stream(self): | |
# Initialize client and add activity to user feed | |
try: | |
client = stream.connect(current_app.config['STREAM_API_KEY'], current_app.config['STREAM_SECRET']) | |
client.feed("User", str(self.author_id)).add_activity({'actor': client.users.create_reference(str(self.author_id)), | |
'verb': 'create', | |
'object': 'Collection:' + str(self.id), | |
'create': self.name, | |
'foreign_id': 'Collection:' + str(self.id), | |
'description': self.description, | |
'time': self.timestamp | |
}) | |
return True | |
except: | |
# If the Stream Client throws an exception or there is a network issue | |
return False | |
def update_stream(self): | |
# Initialize client and update activity | |
try: | |
client = stream.connect(current_app.config['STREAM_API_KEY'], current_app.config['STREAM_SECRET']) | |
client.update_activity({'actor': client.users.create_reference(str(self.author_id)), | |
'verb': 'create', | |
'object': 'Collection:' + str(self.id), | |
'create': self.name, | |
'foreign_id': 'Collection:' + str(self.id), | |
'description': self.description, | |
'time': self.timestamp | |
}) | |
return True | |
except: | |
return False | |
def delete_from_stream(self): | |
# Initialize client and delete activity | |
try: | |
client = stream.connect(current_app.config['STREAM_API_KEY'], current_app.config['STREAM_SECRET']) | |
user_feed = client.feed('User', str(self.author_id)) | |
user_feed.remove_activity(foreign_id="Collection:" + str(self.id)) | |
return True | |
except: | |
return False | |
class Content(db.Model): | |
__tablename__ = 'content' | |
id = db.Column(db.Integer, primary_key=True) | |
stream_id = db.Column(db.String(64)) | |
image = db.Column(db.String, default=None) | |
title = db.Column(db.String(64)) | |
url = db.Column(db.String(128)) | |
description = db.Column(db.String(256)) | |
timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow) | |
collection_id = db.Column(db.Integer, db.ForeignKey('collections.id')) | |
def add_to_stream(self): | |
# Initialize client and add activity to user feed | |
try: | |
client = stream.connect(current_app.config['STREAM_API_KEY'], current_app.config['STREAM_SECRET']) | |
client.feed("Collections", str(self.collection_id)).add_activity({'actor': client.users.create_reference(str(self.collection.author_id)), | |
'verb': 'post', | |
'object': 'Content:' + str(self.id), | |
'post': self.title, | |
'url': self.url, | |
'image': self.image, | |
'description': self.description, | |
'time': self.timestamp, | |
'collection': { | |
'id': self.collection_id, | |
'name': self.collection.name | |
}, | |
'foreign_id': 'Content:' + str(self.id) | |
}) | |
return True | |
except: | |
# If the Stream Client throws an exception or there is a network issue | |
return False | |
def add_fields_to_stream(self, **kwargs): | |
# Update Stream with new/removed fields | |
try: | |
client = stream.connect(current_app.config['STREAM_API_KEY'], current_app.config['STREAM_SECRET']) | |
client.activity_partial_update(foreign_id='Content:' + str(self.id), | |
time=self.timestamp, | |
set=kwargs | |
) | |
return True | |
except: | |
return False | |
def remove_fields_from_stream(self, **kwargs): | |
try: | |
client = stream.connect(current_app.config['STREAM_API_KEY'], current_app.config['STREAM_SECRET']) | |
client.activity_partial_update(foreign_id='Content:' + str(self.id), | |
time=self.timestamp, | |
unset=kwargs | |
) | |
return True | |
except: | |
return False | |
def update_stream(self): | |
# Initialize client and update activity | |
try: | |
client = stream.connect(current_app.config['STREAM_API_KEY'], current_app.config['STREAM_SECRET']) | |
client.update_activity({'actor': client.users.create_reference(str(self.collection.author_id)), | |
'verb': 'post', | |
'object': 'Content:' + str(self.id), | |
'post': self.title, | |
'url': self.url, | |
'description': self.description, | |
'time': self.timestamp, | |
'collection': {'id': self.collection_id, | |
'name': self.collection.name}, | |
'foreign_id': 'Content:' + str(self.id) | |
}) | |
return True | |
except: | |
return False | |
def delete_from_stream(self): | |
# Initialize client and delete activity | |
try: | |
client = stream.connect(current_app.config['STREAM_API_KEY'], current_app.config['STREAM_SECRET']) | |
user_feed = client.feed('Collections', str(self.collection_id)) | |
user_feed.remove_activity(foreign_id="Content:" + str(self.id)) | |
return True | |
except: | |
return False |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment