Skip to content

Instantly share code, notes, and snippets.

@rlander
Forked from zdxerr/backup_model.py
Created July 8, 2014 14:23
Show Gist options
  • Save rlander/9d3eb2c436057cc31208 to your computer and use it in GitHub Desktop.
Save rlander/9d3eb2c436057cc31208 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import csv
import collections
from datetime import datetime
from peewee import *
class BackupModel(Model):
    """Peewee model base that stamps modifications and writes CSV backups.

    The methods below reference ``cls.id`` and ``cls.modified``; neither is
    declared here, so concrete subclasses must provide both columns.
    """

    @classmethod
    def update(cls, **update):
        """Build an UPDATE query that also sets ``modified`` to the current time.

        A ``modified`` value passed by the caller is overwritten.
        """
        update['modified'] = datetime.now()
        return super(BackupModel, cls).update(**update)

    @classmethod
    def backup_table(cls, csvfile):
        """
        Create a schema less backup of this model in a csv file.

        When the current position of ``csvfile`` is non-zero (for example a
        non-empty file opened in append mode), only rows whose ``modified``
        value is later than the file's last modification time are written.
        Otherwise every row is written. In the incremental case ``csvfile``
        must be backed by a real file descriptor.
        """
        query = cls.select()
        if csvfile.tell():
            # The file already holds data: treat its mtime as the time of the
            # previous backup. os.fstat is the descriptor-based stat call, so
            # no path lookup (or fd support in os.stat) is required.
            last_backup = datetime.fromtimestamp(
                os.fstat(csvfile.fileno()).st_mtime)
            query = query.where(cls.modified > last_backup)
        writer = csv.writer(csvfile)
        writer.writerows(query.naive().tuples())

    @classmethod
    def restore_table(cls, csvfile):
        """
        Restore this model from a csv file.

        NOTE: unfinished. Only the highest stored id is looked up so far;
        ``csvfile`` is not read and nothing is written to the table.
        """
        latest_id = cls.select(fn.Max(cls.id).alias('latest_id')).scalar()
        # TODO: iterate csv.reader(csvfile) and re-create the rows, using
        # latest_id to decide which ids are already present.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from datetime import datetime
import random
from peewee import *
from backup_model import BackupModel
# Database handle for the SQLite file 'test.db'.
db = SqliteDatabase('test.db')
class BaseModel(BackupModel):
    """Common base for the demo tables: primary key plus two timestamps."""

    # Field order is kept as declared; it determines the column order.
    id = PrimaryKeyField()
    created = DateTimeField(default=datetime.now)
    modified = DateTimeField(default=datetime.now)

    class Meta:
        # Bind this model (and its subclasses) to the module-level ``db``.
        database = db
class User(BaseModel):
    """A user record with a name, a birthday and an ``is_relative`` flag."""

    name = CharField()
    birthday = DateField()
    is_relative = BooleanField()
class Entry(BaseModel):
    """A message written by a User, optionally nested under another Entry."""

    # Self-referencing and nullable: an entry without a parent is a root.
    parent = ForeignKeyField('self', related_name='children', null=True)
    author = ForeignKeyField(User)
    severity = CharField()
    message = TextField()
    timestamp = DateTimeField(default=datetime.now)
if __name__ == '__main__':
    # Demo driver: rebuild the tables, seed them with random entries, rewrite
    # the message of the first entries and append a CSV backup of Entry to
    # test.csv.
    started = datetime.now()
    db.connect()

    # Switches for the individual demo steps.
    reset_tables = True
    seed_data = True
    print_tree = False

    if reset_tables:
        if User.table_exists():
            User.drop_table()
        if Entry.table_exists():
            Entry.drop_table()
        User.create_table()
        Entry.create_table()

    if seed_data:
        # None stays in the pool so that some entries are created as roots.
        entries = [None]
        with db.transaction():
            for user_number in range(3):
                print("User #%.4d" % (user_number))
                # NOTE(review): '03.01.1987' is not an ISO date string;
                # confirm DateField stores and reads it back as intended.
                user = User.create(name="Otto%.4d" % (user_number, ),
                                   birthday='03.01.1987',
                                   is_relative=False)
                for _ in range(random.randint(5, 10)):
                    entries.append(Entry.create(
                        parent=random.choice(entries),
                        author=user,
                        message='What up?',
                        severity=random.choice(('Info', 'Error'))))

    if print_tree:
        # Depth-first dump of the entry tree, roots first.
        q = Entry.select().where(Entry.parent >> None).order_by(Entry.timestamp)
        children = [(0, e) for e in q]
        while children:
            level, entry = children.pop(0)
            print(" "*level + "Entry @ %s from %s" % (entry.timestamp, entry.author.name, ))
            # `>>` renders as SQL IS and is meant for NULL checks only; a
            # comparison against a concrete parent row needs `==`.
            q = Entry.select().where(Entry.parent == entry).order_by(Entry.timestamp)
            children = [(level + 1, e) for e in q] + children

    q = Entry.update(message="CENSORED").where(Entry.id <= 10)
    q.execute()

    print(Entry.select().count())

    # newline='' lets the csv module control line endings itself, as its
    # documentation requires for files handed to csv.writer.
    with open('test.csv', 'a+', newline='') as csvfile:
        Entry.backup_table(csvfile)
    print("Finished csv in %s" % (datetime.now() - started, ))

    # Alternative kept for reference: gzip-compressed backup and restore.
    # import io
    # import gzip
    # with gzip.open('test.csv.gz', 'wb+') as gzfile:
    #     csvfile = io.TextIOWrapper(gzfile, newline="", write_through=True)
    #     Entry.backup_table(csvfile)
    # with open('test.csv', 'r') as csvfile:
    #     Entry.restore_table(csvfile)
    # print("Finished gzip in %s" % (datetime.now() - started, ))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment