Skip to content

Instantly share code, notes, and snippets.

@biggers
Last active August 6, 2018 15:30
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save biggers/d5ac9306309cd0a9bb32560041f8af02 to your computer and use it in GitHub Desktop.
Save biggers/d5ac9306309cd0a9bb32560041f8af02 to your computer and use it in GitHub Desktop.
"pets" example from peewee "quickstart" / documentation
# Refs:
# (primary) http://peewee.readthedocs.io/en/latest/peewee/quickstart.html
# https://peewee.readthedocs.io/en/2.0.2/peewee/cookbook.html
from datetime import date
import peewee
from peewee import ( # noqa: F401
MySQLDatabase,
Model,
BooleanField,
CharField,
DateField,
DateTimeField,
ForeignKeyField,
IntegerField,
TextField,
fn,
JOIN,
)
import sys
from pprint import pprint as ppr
# Shared database handle for the models below. It is created without a
# database name; main() supplies the connection settings later through
# mysql_db.init() before connecting.
mysql_db = MySQLDatabase(None)
class Person(Model):
    """A person row: name, birthday and whether they are a relative.

    Pets refer back to a person through ``Pet.owner``.
    """

    name = CharField()
    # TODO: add an ``email = CharField()`` column.
    birthday = DateField()
    is_relative = BooleanField()

    class Meta:
        # Bind the model to the module-level database handle.
        database = mysql_db
class Pet(Model):
    """A pet row: its owning Person, its name and its animal type."""

    # related_name='pets' names the reverse accessor, so the pets of a
    # Person instance are reachable as ``person.pets``.
    owner = ForeignKeyField(Person, related_name='pets')
    name = CharField()
    animal_type = CharField()

    class Meta:
        # Bind the model to the module-level database handle.
        database = mysql_db
def creat_folks():
    """Insert the four sample people and return them keyed by nickname.

    Two rows are written by instantiating ``Person`` and calling ``save()``,
    the other two through ``Person.create()``. Grandma is renamed after the
    insert and saved again.

    Returns:
        dict mapping ``'uncle_bob'``, ``'herb'``, ``'billy'`` and
        ``'grandma'`` to the corresponding saved ``Person`` instances.
    """
    people = {}

    people['uncle_bob'] = Person(
        name='Bob Billy',
        birthday=date(1960, 1, 15),
        is_relative=True,
    )
    people['uncle_bob'].save()

    people['herb'] = Person.create(
        name='Herb Spicer',
        birthday=date(1950, 5, 5),
        is_relative=False,
    )

    people['billy'] = Person(
        name='Billy Dill',
        birthday=date(1945, 3, 21),
        is_relative=False,
    )
    people['billy'].save()

    granny = Person.create(
        name='Grandma Jones',
        birthday=date(1935, 3, 1),
        is_relative=True,
    )
    # Rename the row that was just inserted and write it back.
    granny.name = 'Granny Jones'
    granny.save()
    people['grandma'] = granny

    return people
def print_pets_for_owner(owner, status='are'):
    """Pretty-print the names of the pets whose owner is named ``owner``.

    Args:
        owner: value compared against ``Person.name``.
        status: word shown in parentheses in the printed line
            (``'are'`` by default).
    """
    owned = Pet.select().join(Person).where(Person.name == owner)

    # Iterating the query is what runs the INNER JOIN.
    pet_names = []
    for pet in owned.order_by(Pet.name):
        pet_names.append(pet.name)

    summary = "{}'s pets ({}): {}".format(owner, status, pet_names)
    ppr(summary)
def assign_pets(owners):
    """Create the sample pets, then move Fido from Herb to Uncle Bob.

    Pet lists for Herb and Uncle Bob are printed before the move (status
    ``'were'``) and again after it (default status).

    Args:
        owners: mapping with at least the keys ``'uncle_bob'`` and
            ``'herb'``, each holding a saved ``Person``.
    """
    bob = owners['uncle_bob']
    Pet.create(owner=bob, name='Kitty', animal_type='cat')

    herb = owners['herb']
    fido = Pet.create(owner=herb, name='Fido', animal_type='dog')
    for cat_name in ('Mittens', 'Mittens Jr'):
        Pet.create(owner=herb, name=cat_name, animal_type='cat')

    # http://peewee.readthedocs.io/en/latest/peewee/quickstart.html#lists-of-records
    for person in (herb, bob):
        print_pets_for_owner(person.name, status='were')

    # Re-home Fido.
    fido.owner = bob
    fido.save()

    # Expected for Herb now: "Herb's pets (are): ['Mittens', 'Mittens Jr']"
    for person in (herb, bob):
        print_pets_for_owner(person.name)
def do_person_queries():
    """Print two Person reports: by first letter of name and by birthday.

    The first report lists people whose name starts with ``'b'``, compared
    on the lower-cased first character. The second lists people born
    strictly after 1940-01-01 and strictly before 1960-01-01, ordered by
    birthday.
    """
    initial = 'b'
    print("\nPersons with names starting with: {}".format(initial))
    starts_with_initial = fn.Lower(fn.Substr(Person.name, 1, 1)) == initial
    matching_people = Person.select().where(starts_with_initial)
    for match in matching_people:
        print('\t', match.name)

    earliest, latest = date(1940, 1, 1), date(1960, 1, 1)
    born_after = Person.birthday > earliest
    born_before = Person.birthday < latest
    in_range = Person.select().where(born_after & born_before)
    in_range = in_range.order_by(Person.birthday)

    header = "\nPersons with birthdays between {} and {}"
    print(header.format(earliest, latest))
    for match in in_range:
        print('\t', match.name, match.birthday)
def main(create=False, do_folks=False):
    """Run the peewee "pets" walkthrough against a local MySQL database.

    Initialises and connects ``mysql_db``, optionally rebuilds the tables
    and loads the sample data, then prints each person with their pet count
    and pets, followed by the person queries. The connection is left open;
    closing it is the caller's job.

    Args:
        create: when truthy, drop and re-create the Person and Pet tables.
        do_folks: when truthy, insert the sample people and pets.
    """
    # NOTE(review): connection settings, including the password, are
    # hard-coded for this example -- do not reuse outside a local sandbox.
    connargs = dict(database='adminstack', user='mrsql', password='mysqueel',
                    host='localhost', charset='utf8')
    mysql_db.init(**connargs)
    mysql_db.connect()

    if create:
        # safe=True asks peewee to drop only tables that exist, so a first
        # run against an empty schema succeeds without catching a driver
        # error and matching on its "Unknown table" message text.
        mysql_db.drop_tables([Pet, Person], safe=True)
        mysql_db.create_tables([Person, Pet])

    if do_folks:
        folks = creat_folks()
        for person in Person.select().order_by(Person.birthday):
            print(person.name, person.birthday)
        assign_pets(folks)

    # NOTE(review): the pasted source had lost its indentation; the report
    # below is assumed to run whether or not do_folks is set -- confirm.

    # Get the Pet count per Person via a subquery correlated on the owner.
    subquery = Pet.select(fn.COUNT(Pet.id)).where(Pet.owner == Person.id)
    query = (Person
             .select(Person, Pet, subquery.alias('pet_count'))
             .join(Pet, JOIN.LEFT_OUTER)
             .order_by(Person.name))
    ppr(subquery)
    ppr(query)

    # Note: aggregate_rows() is required here. Presumably it is what lets
    # person.pets be read from the joined rows (peewee 2.x API) -- confirm
    # against the installed peewee version.
    for person in query.aggregate_rows():
        print(person.name, person.pet_count, 'pets')
        for pet in person.pets:
            print('\t', pet.name, pet.animal_type)

    do_person_queries()
if __name__ == '__main__':
    # Usage: <script> [create] [do_folks] -- each an integer flag (0 or 1).
    # A flag that is not given falls back to 0, which is falsy like main()'s
    # own defaults, instead of failing with IndexError.
    cli_flags = [int(arg) for arg in sys.argv[1:3]]
    cli_flags += [0] * (2 - len(cli_flags))
    try:
        main(*cli_flags)
    finally:
        # Release the connection even when main() raises; skip the close if
        # the connection was never opened.
        if not mysql_db.is_closed():
            mysql_db.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment