-
-
Save rlander/9d3eb2c436057cc31208 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import os | |
import csv | |
import collections | |
from datetime import datetime | |
from peewee import * | |
class BackupModel(Model): | |
@classmethod | |
def update(cls, **update): | |
update['modified'] = datetime.now() | |
return super(BackupModel, cls).update(**update) | |
@classmethod | |
def backup_table(cls, csvfile): | |
""" | |
Create a schema less backup of this model in a csv file. | |
""" | |
query = cls.select() | |
if csvfile.tell(): | |
desc = csvfile.fileno() | |
modified = datetime.fromtimestamp(os.path.getmtime(desc)) | |
query = query.where(cls.modified > modified) | |
writer = csv.writer(csvfile) | |
writer.writerows(query.naive().tuples()) | |
@classmethod | |
def restore_table(cls, csvfile): | |
""" | |
Restore this model from a csv file. | |
""" | |
latest_id = cls.select(fn.Max(cls.id).alias('latest_id')).scalar() | |
# last_id = cls.select(cls.id). | |
# reader = csv.reader(csvfile) | |
# for row in reader: | |
# print(row[0]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
from datetime import datetime | |
import random | |
from peewee import * | |
from backup_model import BackupModel | |
db = SqliteDatabase('test.db') | |
class BaseModel(BackupModel): | |
id = PrimaryKeyField() | |
created = DateTimeField(default=datetime.now) | |
modified = DateTimeField(default=datetime.now) | |
class Meta: | |
database = db # this model uses the people database | |
class User(BaseModel): | |
name = CharField() | |
birthday = DateField() | |
is_relative = BooleanField() | |
class Entry(BaseModel): | |
parent = ForeignKeyField('self', related_name='children', null=True) | |
author = ForeignKeyField(User) | |
severity = CharField() | |
message = TextField() | |
timestamp = DateTimeField(default=datetime.now) | |
if __name__ == '__main__': | |
started = datetime.now() | |
db.connect() | |
if True: | |
if User.table_exists(): | |
User.drop_table() | |
if Entry.table_exists(): | |
Entry.drop_table() | |
User.create_table() | |
Entry.create_table() | |
if True: | |
entries = [None] | |
with db.transaction(): | |
for user_number in range(3): | |
print("User #%.4d" % (user_number)) | |
user = User.create(name="Otto%.4d" % (user_number, ), | |
birthday='03.01.1987', | |
is_relative=False) | |
for entry_number in range(random.randint(5,10)): | |
entries.append(Entry.create( | |
parent=random.choice(entries), | |
author=user, | |
message='What up?', | |
severity=random.choice(('Info', 'Error')))) | |
if False: | |
q = Entry.select().where(Entry.parent >> None).order_by(Entry.timestamp) | |
children = [(0, e) for e in q] | |
while children: | |
level, entry = children.pop(0) | |
print(" "*level + "Entry @ %s from %s" % (entry.timestamp, entry.author.name, )) | |
q = Entry.select().where(Entry.parent >> entry).order_by(Entry.timestamp) | |
children = [(level + 1, e) for e in q] + children | |
q = Entry.update(message="CENSORED").where(Entry.id <= 10) | |
q.execute() | |
print(Entry.select().count()) | |
with open('test.csv', 'a+') as csvfile: | |
Entry.backup_table(csvfile) | |
print("Finished csv in %s" % (datetime.now() - started, )) | |
# import io | |
# import gzip | |
# with gzip.open('test.csv.gz', 'wb+') as gzfile: | |
# csvfile = io.TextIOWrapper(gzfile, newline="", write_through=True) | |
# Entry.backup_table(csvfile) | |
# with open('test.csv', 'r') as csvfile: | |
# Entry.restore_table(csvfile) | |
# print("Finished gzip in %s" % (datetime.datetime.now() - started, )) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment