Skip to content

Instantly share code, notes, and snippets.

@devdave

devdave/data_migration.py

Last active Apr 8, 2020
Embed
What would you like to do?
sqlalchemy alembic data migration example
"""Convert lat/long from float to int
Revision ID: b020841d98e4
Revises: 6e741a21efc8
Create Date: 2019-07-10 20:03:38.282042
Given a source table like
class GPS(Base):
# $--RMC, hhmmss.sss, x, llll.lll, a, yyyyy.yyy, a, x.x, u.u, xxxxxx,, , v * hh < CR > < LF >
__table_args__ = (UniqueConstraint("date_time", name="uix_dt"),)
video_id = Column(Integer, ForeignKey("Video.id"))
video = relationship("Video", back_populates="coordinates")
#time = Column(Time)
date_time = Column(DateTime)
status = Column(String)
latitude = Column(Float)
north_south = Column(String)
longitude = Column(Float)
east_west = Column(String)
speed = Column(Float)
course = Column(Float)
Where I want to convert all of the floating point columns to integer AND convert the data.
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.ext.declarative import declarative_base
# revision identifiers, used by Alembic.
revision = 'b020841d98e4'
down_revision = '6e741a21efc8'
branch_labels = None
depends_on = None
Base = declarative_base()
class GPS(Base):
__tablename__ = "GPS"
id = sa.Column(sa.Integer, primary_key=True)
latitude = sa.Column(sa.Float)
_latitude = sa.Column(sa.Integer)
longitude = sa.Column(sa.Float)
_longitude = sa.Column(sa.Integer)
speed = sa.Column(sa.Float)
_speed = sa.Column(sa.Integer)
course = sa.Column(sa.Float)
_course = sa.Column(sa.Integer)
def upgrade():
with op.batch_alter_table("GPS") as batch_op:
batch_op.add_column(sa.Column("_latitude", sa.Integer))
batch_op.add_column(sa.Column("_longitude", sa.Integer))
batch_op.add_column(sa.Column("_speed", sa.Integer))
batch_op.add_column(sa.Column("_course", sa.Integer))
####
# Here is where you can connect your declarative model to the database going through the migration
bind = op.get_bind()
session = orm.Session(bind=bind)
# now that you've got a session
i = 0
c_count = session.query(GPS).count() #you can query the database table you are working on
seven = 10 ** 7
for coordinate in session.query(GPS): # type: GPS
i += 1
coordinate._latitude = int(coordinate.latitude * seven)
coordinate._longitude = int(coordinate.longitude * seven)
coordinate._course = int(coordinate.course * 1000)
coordinate._speed = int(coordinate.speed * 100)
session.add(coordinate)
if i % 3000 == 0:
print(f"\tProcessed {i}/{c_count}")
session.commit()
session.commit()
with op.batch_alter_table("GPS") as batch_op:
batch_op.drop_column("latitude")
batch_op.drop_column("longitude")
batch_op.drop_column("status")
batch_op.drop_column("speed")
batch_op.drop_column("course")
# noinspection PyProtectedMember
def downgrade():
with op.batch_alter_table("GPS") as batch_op:
batch_op.add_column(sa.Column("latitude", sa.Float))
batch_op.add_column(sa.Column("longitude", sa.Float))
batch_op.add_column(sa.Column("course", sa.Float))
batch_op.add_column(sa.Column("speed", sa.Float))
bind = op.get_bind()
session = orm.Session(bind=bind)
i = 0
for coordinate in session.query(GPS): # type: GPS
i += 1
coordinate.latitude = coordinate._latitude / 10 ** 7
coordinate.longitude = coordinate._longitude / 10 ** 7
coordinate.speed = coordinate._speed / 1000
coordinate.course = coordinate._course / 100
session.add(coordinate)
if i % 1000 == 0:
session.commit()
session.commit()
with op.batch_alter_table("GPS") as batch_op:
batch_op.drop_column("_latitude")
batch_op.drop_column("_longitude")
batch_op.drop_column("_speed")
batch_op.drop_column("_course")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment