Last active
July 25, 2017 17:42
-
-
Save dylanbstorey/9d914d6c781c480ab3ec3ed9eee52868 to your computer and use it in GitHub Desktop.
SQL Alchemy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dateutil.parser import parse | |
from skyscout.db_models import * | |
from sqlalchemy.exc import IntegrityError | |
import logging | |
logger = logging.getLogger(__name__) | |
def drop_all():
    """Drop the tables by delegating to ``db.drop_all()``."""
    db.drop_all()
def init_db():
    """Create the tables by delegating to ``db.create_all()``."""
    db.create_all()
def insert_record(obj, json_return=True):
    """
    Convenience method for inserting objects.

    Every object is added to the session and the whole batch is committed
    once.  If adding or committing fails, the session is rolled back before
    the exception is re-raised so that the shared session stays usable for
    later calls.

    :param obj: A described declarative_base object from the described tables.
                Can be a list of objects.
    :param json_return: whether or not to return as json (default True)
    :rtype: list
    :return: a list of the passed objects, formatted as either the raw
             objects or their ``as_json()`` strings
    :raises Exception: whatever the session raised while adding/committing
                       (re-raised unchanged after the rollback)
    """
    records = obj if isinstance(obj, list) else [obj]

    try:
        db.session.add_all(records)
        db.session.commit()
    except Exception:
        # Without a rollback the session remains in a failed state and every
        # subsequent operation on it errors out as well.
        db.session.rollback()
        raise

    if json_return:
        return [record.as_json() for record in records if record is not None]
    return records
def get_record(obj, pk=None, json_return=True, filter=None, filter_by=None):
    """
    Convenience function for getting objects on primary key or by simple
    column-equality filters.

    :param obj: The described declarative_base object (model class) from the
                described tables.
    :param pk: value for the primary key, if not set all keys will be returned
    :param json_return: when true, return each match as its ``as_json()``
                        string; otherwise return the raw objects
    :param filter: a dict (or list of key/value pairs) of column names and the
                   values they must equal. If pk is passed, will return only
                   pk matches.
    :param filter_by: additional column/value pairs, same shape as ``filter``;
                      merged on top of ``filter``
    :returns list: objects that matched the get. With ``json_return=False`` a
                   primary-key lookup that finds nothing yields ``[None]``
                   (kept for backward compatibility); with
                   ``json_return=True`` missing rows are dropped.

    .. todo: need to add more complex filtering
    """
    # NOTE: truthiness is kept on purpose so that falsy pk values (None, 0,
    # '') continue to mean "no primary key given", as they always have.
    if pk:
        matches = [obj.query.get(pk)]
    elif filter or filter_by:
        # Query.filter() only accepts SQL expressions, not keyword arguments;
        # keyword equality tests belong to Query.filter_by().
        criteria = dict(filter or {})
        criteria.update(dict(filter_by or {}))
        # .all() executes the query so that a list of rows, not the Query
        # object itself, is handed back to the caller.
        matches = list(obj.query.filter_by(**criteria).all())
    else:
        matches = list(obj.query.all())

    if json_return:
        return [match.as_json() for match in matches if match is not None]
    return matches
def update_record(obj, pk=None, json_return=True, update_info=None):
    """
    Convenience function for updating objects.

    :param obj: Table (model) class whose record should be updated.
    :param pk: primary key of the record to update; required.
    :param json_return: when true the refreshed record is returned as JSON,
                        otherwise as the raw object (both wrapped in a list by
                        ``get_record``).
    :param update_info: dict mapping column names to their new values;
                        required. Values for ``DATETIME`` columns are parsed
                        with ``dateutil``.
    :return: the re-fetched record as returned by ``get_record``, or ``{}``
             when nothing could be updated (no ``update_info``, no ``pk`` or
             no matching record).
    :raises KeyError: if ``update_info`` names a column the table lacks.
    :raises NotImplementedError: if the lookup yields more than one record.
    """
    # Absolutely need information to update
    if not update_info:
        logger.warning("update_record(%r): no update_info supplied, nothing to update", obj)
        return {}

    # And a primary key
    if not pk:
        logger.warning("update_record(%r): no primary key supplied, nothing to update", obj)
        return {}

    # do some introspection on the table to find out how things are defined
    table_info = {column.name: str(column.type) for column in obj.__table__.c}

    # Reject unknown columns before touching the record, so a bad key cannot
    # leave it half-modified in the session.
    unknown = [key for key in update_info if key not in table_info]
    if unknown:
        raise KeyError(
            "Unknown column(s) for update: %s" % ", ".join(str(key) for key in unknown)
        )

    # fetch record
    matches = get_record(obj, pk=pk, json_return=False)
    if len(matches) > 1:
        raise NotImplementedError("Updating multiple records is not supported at this time")

    record = matches[0] if matches else None
    if record is None:
        logger.warning("update_record(%r): no record with primary key %r", obj, pk)
        return {}

    # use a setattr to override the ORM then commit
    for key, value in update_info.items():
        if table_info[key] == 'DATETIME':
            value = parse(str(value))
        setattr(record, key, value)

    try:
        db.session.commit()
    except Exception:
        # Keep the shared session usable after a failed commit.
        db.session.rollback()
        raise

    return get_record(obj, pk=pk, json_return=json_return)
def delete_record(obj, pk=None):
    """
    Convenience function for deleting objects.

    Looks the record up by primary key, marks it for deletion on the session
    and commits.

    :param obj: Table class to utilize
    :param pk: primary key for deletion
    :return: None
    """
    doomed = obj.query.get(pk)
    db.session.delete(doomed)
    db.session.commit()
    return None
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
from dateutil.parser import parse | |
import json | |
from flask_sqlalchemy import SQLAlchemy | |
from sqlalchemy.types import TEXT,BIGINT,DATETIME,INTEGER, FLOAT, DATE | |
from sqlalchemy.dialects import postgresql, mysql, sqlite | |
from skyscout import flask_app | |
__all__ = [ 'db' , 'StitchedImage' , 'SegmentedImage' , 'Flight', 'AerialPhoto' , 'Policies' , 'Event'] | |
""" | |
Create a column type for auto-incrementing columns, changing the data type based on | |
which engine is being used. | |
https://stackoverflow.com/a/23175518/2350761 | |
""" | |
# Generic BIGINT with one variant per supported dialect; each with_variant()
# call returns a new type, so the calls can simply be chained.
AUTOINCREMENT = (
    BIGINT()
    .with_variant(postgresql.BIGINT(), 'postgresql')
    .with_variant(mysql.BIGINT(), 'mysql')
    .with_variant(sqlite.INTEGER(), 'sqlite')
)

# Flask-SQLAlchemy handle built from the application object.
db = SQLAlchemy(flask_app)
class StitchedImage(db.Model):
    """ORM model mapped to the ``stitched_image`` table."""

    __tablename__ = 'stitched_image'

    image_id = db.Column(AUTOINCREMENT, nullable=False, autoincrement=True, primary_key=True)
    event_id = db.Column(AUTOINCREMENT, db.ForeignKey('event.event_id'), nullable=False)
    policy_number = db.Column(TEXT, db.ForeignKey('policies.policy_number'), nullable=False)
    path_to_photo = db.Column(TEXT, nullable=False, unique=True)
    checksum = db.Column(TEXT, nullable=False)

    def __init__(self, event_id, policy_number, path_to_photo, checksum, image_id=None):
        """
        Store the given column values on the new instance.

        :param event_id: value for the ``event_id`` foreign key
        :param policy_number: value for the ``policy_number`` foreign key
        :param path_to_photo: value for ``path_to_photo``
        :param checksum: value for ``checksum``
        :param image_id: optional explicit primary key
        """
        # Explicit assignments replace the former ``vars().iteritems()`` loop,
        # which only ran on Python 2 and also copied its local ``table_info``
        # dict onto the instance.
        self.image_id = image_id
        self.event_id = event_id
        self.policy_number = policy_number
        self.path_to_photo = path_to_photo
        self.checksum = checksum

    def as_json(self):
        """Return ``as_dict()`` serialised as a JSON string."""
        return json.dumps(self.as_dict())

    def as_dict(self):
        """Return a dict mapping each column name to ``str()`` of its value."""
        # Read through getattr() on the mapped columns instead of
        # self.__dict__: SQLAlchemy expires loaded attributes on commit by
        # default, which leaves __dict__ empty until they are accessed again.
        return {
            column.name: str(getattr(self, column.name))
            for column in self.__table__.columns
        }
class SegmentedImage(db.Model):
    """ORM model mapped to the ``segmented_image`` table."""

    __tablename__ = 'segmented_image'

    photo_id = db.Column(AUTOINCREMENT, primary_key=True, autoincrement=True, nullable=False)
    event_id = db.Column(AUTOINCREMENT, db.ForeignKey('event.event_id'), nullable=False)
    aerial_photo_id = db.Column(AUTOINCREMENT, db.ForeignKey('aerial_photo.photo_id'), nullable=False)
    # The referenced policies.policy_number column is TEXT, so this column
    # must be TEXT as well (it was declared as a BIGINT variant before, which
    # does not match the column it points at).
    policy_number = db.Column(TEXT, db.ForeignKey('policies.policy_number'))
    path_to_photo = db.Column(TEXT, nullable=False, unique=True)
    checksum = db.Column(TEXT, nullable=False)

    def __init__(self, event_id, aerial_photo_id, policy_number, path_to_photo, checksum, photo_id=None):
        """
        Store the given column values on the new instance.

        :param event_id: value for the ``event_id`` foreign key
        :param aerial_photo_id: value for the ``aerial_photo_id`` foreign key
        :param policy_number: value for the ``policy_number`` foreign key
        :param path_to_photo: value for ``path_to_photo``
        :param checksum: value for ``checksum``
        :param photo_id: optional explicit primary key
        """
        # Explicit assignments replace the former ``vars().iteritems()`` loop,
        # which only ran on Python 2 and also copied its local ``table_info``
        # dict onto the instance.
        self.photo_id = photo_id
        self.event_id = event_id
        self.aerial_photo_id = aerial_photo_id
        self.policy_number = policy_number
        self.path_to_photo = path_to_photo
        self.checksum = checksum

    def as_json(self):
        """Return ``as_dict()`` serialised as a JSON string."""
        return json.dumps(self.as_dict())

    def as_dict(self):
        """Return a dict mapping each column name to ``str()`` of its value."""
        # Read through getattr() on the mapped columns instead of
        # self.__dict__: SQLAlchemy expires loaded attributes on commit by
        # default, which leaves __dict__ empty until they are accessed again.
        return {
            column.name: str(getattr(self, column.name))
            for column in self.__table__.columns
        }
class Flight(db.Model):
    """ORM model mapped to the ``flight`` table."""

    __tablename__ = 'flight'

    flight_id = db.Column(AUTOINCREMENT, primary_key=True, nullable=False, autoincrement=True)
    event_id = db.Column(AUTOINCREMENT, db.ForeignKey('event.event_id'), nullable=False)
    flight_plan = db.Column(TEXT, nullable=True)  # <- this should be a format or a link to a flight plan in the future

    def __init__(self, event_id, flight_plan, flight_id=None):
        """
        Store the given column values on the new instance.

        :param event_id: value for the ``event_id`` foreign key
        :param flight_plan: value for ``flight_plan``
        :param flight_id: optional explicit primary key
        """
        # Explicit assignments replace the former ``vars().iteritems()`` loop,
        # which only ran on Python 2 and also copied its local ``table_info``
        # dict onto the instance.
        self.flight_id = flight_id
        self.event_id = event_id
        self.flight_plan = flight_plan

    def as_json(self):
        """Return ``as_dict()`` serialised as a JSON string."""
        return json.dumps(self.as_dict())

    def as_dict(self):
        """Return a dict mapping each column name to ``str()`` of its value."""
        # Read through getattr() on the mapped columns instead of
        # self.__dict__: SQLAlchemy expires loaded attributes on commit by
        # default, which leaves __dict__ empty until they are accessed again.
        return {
            column.name: str(getattr(self, column.name))
            for column in self.__table__.columns
        }
class AerialPhoto(db.Model):
    """ORM model mapped to the ``aerial_photo`` table."""

    __tablename__ = 'aerial_photo'

    photo_id = db.Column(AUTOINCREMENT, primary_key=True, autoincrement=True, nullable=False)
    event_id = db.Column(AUTOINCREMENT, db.ForeignKey('event.event_id'), nullable=False)
    # NOTE(review): a string default ('x') on an integer-typed column looks
    # unintended; left as-is because changing it alters what gets stored.
    flight_id = db.Column(AUTOINCREMENT, default='x')  # db.ForeignKey()) <- if we get a flight_table we'll want this
    path_to_geotiff = db.Column(TEXT, nullable=False, unique=True)
    check_sum = db.Column(TEXT, nullable=False)

    def __init__(self, event_id, flight_id, path_to_geotiff, check_sum, photo_id=None):
        """
        Store the given column values on the new instance.

        :param event_id: value for the ``event_id`` foreign key
        :param flight_id: value for ``flight_id``
        :param path_to_geotiff: value for ``path_to_geotiff``
        :param check_sum: value for ``check_sum``
        :param photo_id: optional explicit primary key
        """
        # Explicit assignments replace the former ``vars().iteritems()`` loop,
        # which only ran on Python 2 and also copied its local ``table_info``
        # dict onto the instance.
        self.photo_id = photo_id
        self.event_id = event_id
        self.flight_id = flight_id
        self.path_to_geotiff = path_to_geotiff
        self.check_sum = check_sum

    def as_json(self):
        """Return ``as_dict()`` serialised as a JSON string."""
        return json.dumps(self.as_dict())

    def as_dict(self):
        """Return a dict mapping each column name to ``str()`` of its value."""
        # Read through getattr() on the mapped columns instead of
        # self.__dict__: SQLAlchemy expires loaded attributes on commit by
        # default, which leaves __dict__ empty until they are accessed again.
        return {
            column.name: str(getattr(self, column.name))
            for column in self.__table__.columns
        }
class Policies(db.Model):
    """ORM model mapped to the ``policies`` table (composite primary key)."""

    __tablename__ = 'policies'

    event_id = db.Column(AUTOINCREMENT, db.ForeignKey('event.event_id'), nullable=False)
    policy_number = db.Column(TEXT, nullable=False)
    parcel_boundry = db.Column(TEXT)
    policy_lat = db.Column(FLOAT, nullable=False)
    policy_long = db.Column(FLOAT, nullable=False)
    policy_active_start = db.Column(DATE, nullable=False)
    policy_active_end = db.Column(DATE, nullable=False)

    __table_args__ = (
        db.PrimaryKeyConstraint('event_id', 'policy_number'),
        {}
    )

    def __init__(self, event_id, policy_number, parcel_boundry, policy_lat, policy_long,
                 policy_active_start, policy_active_end):
        """
        Store the given column values on the new instance.

        :param event_id: value for the ``event_id`` foreign key
        :param policy_number: value for ``policy_number``
        :param parcel_boundry: value for ``parcel_boundry``
        :param policy_lat: value for ``policy_lat``
        :param policy_long: value for ``policy_long``
        :param policy_active_start: a date, datetime or date string
        :param policy_active_end: a date, datetime or date string
        """
        # The former generic loop could never finish: it referenced an
        # undefined name (``vale``), looked up its own ``table_info`` local as
        # if it were a column, and relied on the Python 2 only ``iteritems``.
        self.event_id = event_id
        self.policy_number = policy_number
        self.parcel_boundry = parcel_boundry
        self.policy_lat = policy_lat
        self.policy_long = policy_long
        self.policy_active_start = self._as_date(policy_active_start)
        self.policy_active_end = self._as_date(policy_active_end)

    @staticmethod
    def _as_date(value):
        """Coerce a date, datetime or parseable string into a ``datetime.date``."""
        # datetime.datetime is a subclass of datetime.date, so test it first.
        if isinstance(value, datetime.datetime):
            return value.date()
        if isinstance(value, datetime.date):
            return value
        return parse(value).date()

    def as_json(self):
        """Return ``as_dict()`` serialised as a JSON string."""
        return json.dumps(self.as_dict())

    def as_dict(self):
        """Return a dict mapping each column name to ``str()`` of its value."""
        # Read through getattr() on the mapped columns instead of
        # self.__dict__: SQLAlchemy expires loaded attributes on commit by
        # default, which leaves __dict__ empty until they are accessed again.
        return {
            column.name: str(getattr(self, column.name))
            for column in self.__table__.columns
        }
class Event(db.Model):
    """ORM model mapped to the ``event`` table."""

    __tablename__ = 'event'

    event_id = db.Column(AUTOINCREMENT, primary_key=True, autoincrement=True)
    event_name = db.Column(TEXT, nullable=False)
    event_description = db.Column(TEXT, nullable=False)
    event_shape = db.Column(TEXT)
    event_start = db.Column(DATETIME, nullable=False)
    event_end = db.Column(DATETIME, nullable=False)

    def __init__(self, event_name, event_description, event_shape, event_start, event_end, event_id=None):
        """
        Store the given column values on the new instance.

        :param event_name: value for ``event_name``
        :param event_description: value for ``event_description``
        :param event_shape: value for ``event_shape``
        :param event_start: a datetime or a date/time string
        :param event_end: a datetime or a date/time string
        :param event_id: optional explicit primary key
        """
        self.event_id = event_id
        self.event_name = event_name
        self.event_description = event_description
        self.event_shape = event_shape
        self.event_start = self._as_datetime(event_start)
        self.event_end = self._as_datetime(event_end)

    @staticmethod
    def _as_datetime(value):
        """Return datetimes unchanged; parse anything else with ``dateutil``."""
        # dateutil's parse() rejects datetime objects, so only strings are
        # sent through it.
        if isinstance(value, datetime.datetime):
            return value
        return parse(value)

    def as_json(self):
        """Return ``as_dict()`` serialised as a JSON string."""
        return json.dumps(self.as_dict())

    def as_dict(self):
        """Return a dict mapping each column name to ``str()`` of its value."""
        # Read through getattr() on the mapped columns instead of
        # self.__dict__: SQLAlchemy expires loaded attributes on commit by
        # default, which leaves __dict__ empty until they are accessed again.
        return {
            column.name: str(getattr(self, column.name))
            for column in self.__table__.columns
        }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment