Skip to content

Instantly share code, notes, and snippets.

@dylanbstorey
Last active July 25, 2017 17:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dylanbstorey/9d914d6c781c480ab3ec3ed9eee52868 to your computer and use it in GitHub Desktop.
Save dylanbstorey/9d914d6c781c480ab3ec3ed9eee52868 to your computer and use it in GitHub Desktop.
SQL Alchemy
from dateutil.parser import parse
from skyscout.db_models import *
from sqlalchemy.exc import IntegrityError
import logging
logger = logging.getLogger(__name__)
def drop_all():
db.drop_all()
def init_db():
db.create_all()
def insert_record(obj, json_return=True):
"""
Convenience method for inserting objects.
:param obj: A described declarative_base object from the described tables. Can be a list of objects.
:param json_return: whether or not to return as json (default True)
:rtype : list
:return: a list of passed formatted as either the raw object or json
"""
if not isinstance(obj, list):
obj = [obj]
for o in obj:
db.session.add(o)
db.session.commit()
if json_return:
obj = [o.as_json() for o in obj if o is not None]
return obj
def get_record(obj, pk=None, json_return=True , filter = None , filter_by = None):
"""
Convenience function for getting objects on primary key
:param json_return:
:param obj: The described declarative_base object from the described tables.
:param pk: value for the primary key, if not set all keys will be returned
:param filter: a list of key value pairs to filter on. If pk is passed, will return only pk matches.
:returns list: objects that matched the get
.. todo: need to add more complex filtering
"""
if pk:
o = obj.query.get(pk)
elif filter:
o = obj.query.filter(**filter)
else:
o = obj.query.all()
if not isinstance(o, list):
o = [o]
if json_return:
o = [obj.as_json() for obj in o if obj is not None]
return o
def update_record(obj, pk=None, json_return=True, update_info=None):
"""
Convenience function for updating objects.
:param obj:
:param pk:
:param json_return:
:param update_info:
:return:
"""
# do some introspection on the table to find out how things are defined
table_info = {column.name: str(column.type) for column in obj.__table__.c}
#Absolutely need information to update
if not update_info:
logger.warning()
return {}
#And a primary key
if not pk:
logger.warning()
return {}
#fetch record
record = get_record(obj, pk=pk, json_return=False)
if len(record) > 1:
raise NotImplementedError("Updating multiple records is not supported at this time")
record = record[0]
# use a settr to override the ORM then commit
for k, v in update_info.items():
if table_info[k] == 'DATETIME':
v = parse(str(v))
setattr(record, k, v)
db.session.commit()
if json_return:
record = get_record(obj, pk=pk)
else:
record = get_record(obj, pk=pk, json_return=False)
return record
def delete_record(obj, pk=None):
"""
Convenience function for deleting objects.
:param obj: Table class to utilize
:param pk: primary key for deletion
:return: None
"""
db.session.delete(obj.query.get(pk))
db.session.commit()
return
import datetime
from dateutil.parser import parse
import json
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.types import TEXT,BIGINT,DATETIME,INTEGER, FLOAT, DATE
from sqlalchemy.dialects import postgresql, mysql, sqlite
from skyscout import flask_app
__all__ = [ 'db' , 'StitchedImage' , 'SegmentedImage' , 'Flight', 'AerialPhoto' , 'Policies' , 'Event']
"""
Create a column type for auto-incrementing columns, changing the data type based on
which engine is being used.
https://stackoverflow.com/a/23175518/2350761
"""
AUTOINCREMENT = BIGINT()
AUTOINCREMENT = AUTOINCREMENT.with_variant(postgresql.BIGINT(), 'postgresql')
AUTOINCREMENT = AUTOINCREMENT.with_variant(mysql.BIGINT(), 'mysql')
AUTOINCREMENT = AUTOINCREMENT.with_variant(sqlite.INTEGER(), 'sqlite')
db = SQLAlchemy(flask_app)
class StitchedImage(db.Model):
__tablename__ = 'stitched_image'
image_id = db.Column(AUTOINCREMENT, nullable=False, autoincrement=True,primary_key=True)
event_id = db.Column(AUTOINCREMENT, db.ForeignKey('event.event_id'),nullable = False)
policy_number = db.Column(TEXT, db.ForeignKey('policies.policy_number'), nullable = False)
path_to_photo = db.Column(TEXT, nullable=False, unique=True)
checksum = db.Column(TEXT, nullable=False)
def __init__(self, event_id, policy_number, path_to_photo, checksum, image_id=None):
table_info = {column.name: str(column.type) for column in self.__table__.c}
for name, value in vars().iteritems():
if name != 'self':
setattr(self, name, value)
def as_json(self):
return json.dumps(self.as_dict())
def as_dict(self):
return {k: str(v) for k, v in self.__dict__.items() if k != '_sa_instance_state'}
class SegmentedImage(db.Model):
__tablename__ = 'segmented_image'
photo_id = db.Column(AUTOINCREMENT, primary_key=True, autoincrement=True, nullable=False)
event_id = db.Column(AUTOINCREMENT, db.ForeignKey('event.event_id'), nullable=False)
aerial_photo_id = db.Column(AUTOINCREMENT, db.ForeignKey('aerial_photo.photo_id'),nullable=False)
policy_number = db.Column(AUTOINCREMENT, db.ForeignKey('policies.policy_number'))
path_to_photo = db.Column(TEXT, nullable=False, unique=True)
checksum = db.Column(TEXT, nullable=False)
def __init__(self, event_id, aerial_photo_id, policy_number, path_to_photo, checksum,photo_id = None,):
table_info = {column.name: str(column.type) for column in self.__table__.c}
for name, value in vars().iteritems():
if name != 'self':
setattr(self, name, value)
def as_json(self):
return json.dumps(self.as_dict())
def as_dict(self):
return {k: str(v) for k, v in self.__dict__.items() if k != '_sa_instance_state'}
class Flight(db.Model):
__tablename__ = 'flight'
flight_id = db.Column(AUTOINCREMENT, primary_key=True, nullable=False, autoincrement=True)
event_id = db.Column(AUTOINCREMENT, db.ForeignKey('event.event_id'),nullable=False)
flight_plan = db.Column(TEXT, nullable=True) # <- this should be a format or a link to a flight plan in the future
def __init__(self, event_id, flight_plan , flight_id = None):
table_info = {column.name: str(column.type) for column in self.__table__.c}
for name, value in vars().iteritems():
if name != 'self':
setattr(self, name, value)
def as_json(self):
return json.dumps(self.as_dict())
def as_dict(self):
return {k: str(v) for k, v in self.__dict__.items() if k != '_sa_instance_state'}
class AerialPhoto(db.Model):
__tablename__ = 'aerial_photo'
photo_id = db.Column(AUTOINCREMENT, primary_key=True, autoincrement=True, nullable=False)
event_id = db.Column(AUTOINCREMENT, db.ForeignKey('event.event_id'), nullable=False)
flight_id = db.Column(AUTOINCREMENT, default = 'x') #db.ForeignKey()) <- if we get a flight_table we'll want this
path_to_geotiff = db.Column(TEXT, nullable=False, unique=True)
check_sum = db.Column(TEXT, nullable=False)
def __init__(self, event_id, flight_id, path_to_geotiff, check_sum, photo_id=None,):
table_info = {column.name: str(column.type) for column in self.__table__.c}
for name, value in vars().iteritems():
if name != 'self':
setattr(self, name, value)
def as_json(self):
return json.dumps(self.as_dict())
def as_dict(self):
return {k: str(v) for k, v in self.__dict__.items() if k != '_sa_instance_state'}
class Policies(db.Model):
__tablename__ = 'policies'
event_id = db.Column(AUTOINCREMENT, db.ForeignKey('event.event_id'), nullable=False)
policy_number = db.Column(TEXT, nullable=False)
parcel_boundry = db.Column(TEXT)
policy_lat = db.Column(FLOAT, nullable=False)
policy_long = db.Column(FLOAT, nullable=False)
policy_active_start = db.Column(DATE, nullable=False)
policy_active_end = db.Column(DATE, nullable=False)
__table_args__ = (
db.PrimaryKeyConstraint('event_id','policy_number'),
{}
)
def __init__(self, event_id, policy_number, parcel_boundry, policy_lat, policy_long, policy_active_start, policy_active_end):
table_info = {column.name: str(column.type) for column in self.__table__.c}
for name, value in vars().iteritems():
if name != 'self':
if table_info[name] == 'DATE':
value = parse(vale)
setattr(self, name, value)
def as_json(self):
return json.dumps(self.as_dict())
def as_dict(self):
return {k: str(v) for k, v in self.__dict__.items() if k != '_sa_instance_state'}
class Event(db.Model):
"""
#
"""
__tablename__ = 'event'
event_id = db.Column(AUTOINCREMENT, primary_key=True,autoincrement=True)
event_name = db.Column(TEXT, nullable = False)
event_description = db.Column(TEXT, nullable = False)
event_shape = db.Column(TEXT)
event_start = db.Column(DATETIME, nullable = False)
event_end = db.Column(DATETIME, nullable = False)
def __init__(self, event_name, event_description, event_shape, event_start, event_end, event_id = None,):
table_info = {column.name: str(column.type) for column in self.__table__.c}
for name, value in vars().items():
if name not in ['self' ,'table_info']:
if table_info[name] == 'DATETIME':
value = parse(value)
setattr(self, name, value)
def as_json(self):
return json.dumps(self.as_dict())
def as_dict(self):
return {k: str(v) for k, v in self.__dict__.items() if k != '_sa_instance_state'}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment