-
-
Save noxeee/d326251662083348fefd2da72dba1ab5 to your computer and use it in GitHub Desktop.
Trying to figure out what is wrong with my code. Help appreciated.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from playhouse.apsw_ext import APSWDatabase | |
from playhouse.sqlite_ext import SearchField, FTS5Model | |
import peewee as pw | |
import logging | |
# Module-wide logger. Records are handed to a StreamHandler, which writes to
# stderr unless told otherwise.
# NOTE(review): no level is set here, so whether the `log.debug(...)` calls
# in this file are emitted depends on the logging configuration in effect.
log = logging.getLogger("test_indexing")
log.addHandler(logging.StreamHandler())
# app = Flask(__name__) | |
# SQLite database (via the APSW driver) that the models in this file bind to.
db = APSWDatabase(
    "test.db",
    pragmas=(
        # Negative cache_size values are interpreted by SQLite as KiB.
        ("cache_size", -1024 * 64),
        # Write-ahead logging.
        ("journal_mode", "wal"),
        # Turn on foreign-key enforcement.
        ("foreign_keys", 1),
    ),
)
# db_wrapper = FlaskDB(app, db) | |
class BaseModel(pw.Model):
    """Common base class that binds subclasses to the module-level ``db``."""

    class Meta:
        database = db
class Name(BaseModel):
    """A named entity, identified externally by a unique ``id_link``."""

    # External identifier; the inserts in this file pass identifiers.org URLs.
    id_link = pw.CharField(unique=True)
    # Human-readable label for the entity.
    name = pw.TextField()

    def __repr__(self):
        """Return a debug string with the primary key, ``id_link`` and ``name``."""
        return f"Names(generated id, {self.get_id()!r}, id_link={self.id_link!r}, name={self.name!r})"
class Triplet(BaseModel):
    """A (head, relation, tail) statement linking two ``Name`` rows."""

    # Subject of the statement.
    head = pw.ForeignKeyField(Name, backref="head_id")
    # Relation label stored as plain text.
    relation = pw.CharField()
    # Object of the statement.
    tail = pw.ForeignKeyField(Name, backref="tail_id")
class Relation(BaseModel):
    """A relation type: its name plus a free-text description."""

    relation_name = pw.CharField()
    description = pw.CharField()
class Rubbish(BaseModel):
    """A table with the same column layout as ``Triplet``.

    NOTE(review): the backref names below are the same ones ``Triplet``
    declares on ``Name`` ("head_id" / "tail_id"). Presumably one of the two
    models should use different backref names -- confirm before relying on
    ``Name.head_id`` / ``Name.tail_id``.
    """

    head = pw.ForeignKeyField(Name, backref="head_id")
    relation = pw.CharField()
    tail = pw.ForeignKeyField(Name, backref="tail_id")
# full text search support | |
class TripletIndex(FTS5Model):
    """FTS5 full-text index with one searchable column per triplet part.

    The indexing functions in this file write rows whose ``rowid`` equals the
    id of the ``Triplet`` they describe, and ``search`` joins on that.
    """

    # Text of the head entity's name.
    head = SearchField()
    # Relation label.
    relation = SearchField()
    # Text of the tail entity's name.
    tail = SearchField()

    class Meta:
        database = db
def index_triplets(triplets: list[Triplet] = None):
    """Bulk (re)index triplets into the ``TripletIndex`` full-text table.

    Args:
        triplets: Triplets to index. When empty or ``None`` the selection is
            whatever ``get_triplets_with_names()`` returns without a filter.

    Peewee errors are logged and swallowed; nothing is returned.
    """

    def _display_name(related, label):
        # The helper query labels the joined name columns
        # ``head_name`` / ``tail_name``. Use that label when the related
        # object carries it, otherwise fall back to the row's own ``name``.
        value = getattr(related, label, None)
        if value is None:
            value = related.name
        return "not found" if value is None else value

    with db.atomic():
        try:
            # The helper already returns a complete SELECT, so it is iterated
            # directly: an extra argument-less ``.select()`` would at best be
            # a no-op and at worst reset the selected columns.
            named_triplets = get_triplets_with_names(triplets)
            if named_triplets is None:
                # The helper logged its own error; nothing to index.
                return

            rows = [
                {
                    TripletIndex.rowid: triplet.id,
                    TripletIndex.head: _display_name(triplet.head, "head_name"),
                    TripletIndex.relation: triplet.relation,
                    TripletIndex.tail: _display_name(triplet.tail, "tail_name"),
                }
                for triplet in named_triplets
            ]
            if rows:
                # INSERT OR REPLACE: ``insert_triplet`` indexes each triplet
                # as it is created, so a plain INSERT of the same rowid fails
                # with a constraint error.
                TripletIndex.replace_many(rows).execute()
            TripletIndex.rebuild()
            TripletIndex.optimize()
        except pw.PeeweeException:
            log.error("In index_triplets:", exc_info=True)
def names_contains(id_link) -> bool:
    """Return True when at least one ``Name`` row has the given ``id_link``.

    A peewee error is logged and treated as "not present".
    """
    matches = 0
    with db.atomic():
        try:
            matches = Name.select().where(Name.id_link == id_link).count()
        except pw.PeeweeException:
            log.error("In names_contains", exc_info=True)
    return matches > 0
# INSERTING | |
def insert_name(id_link: str, name: str):
    """Create a ``Name`` row unless one with this ``id_link`` already exists.

    Args:
        id_link: Unique external identifier of the entity.
        name: Human-readable label to store.

    An existing ``id_link`` only produces a warning. Peewee errors during the
    insert are logged and the transaction is rolled back.
    """
    if names_contains(id_link):
        log.warning(f"{id_link} already in database.")
        return

    with db.atomic() as txn:
        try:
            Name.create(id_link=id_link, name=name)
            txn.commit()
            log.debug(f"Inserted: {id_link, name}")
        except pw.PeeweeException:
            log.error("In insert_name:", exc_info=True)
            txn.rollback()
def get_name_id(id_link) -> int:
    """Return the primary key of the ``Name`` row with the given ``id_link``.

    Args:
        id_link: External identifier to look up.

    Returns:
        The row's id, or ``None`` when no such row exists (the miss is logged
        and the enclosing transaction block is rolled back).
    """
    with db.atomic() as txn:
        try:
            # Filter on the *column*. The previous `id_link == id_link`
            # compared the argument with itself, i.e. it was always True and
            # did not restrict the lookup at all.
            return Name.get(Name.id_link == id_link).get_id()
        except pw.DoesNotExist:
            log.error(f"{id_link} does not exist in the databsase", exc_info=True)
            txn.rollback()
            return None
def insert_triplet(head: str, rel: str, tail: str):
    """Insert a triplet between two existing names and add it to the index.

    Args:
        head: ``id_link`` of the head entity.
        rel: Relation label.
        tail: ``id_link`` of the tail entity.

    If either ``id_link`` is unknown, nothing is inserted and a warning is
    logged. Peewee errors are logged and the transaction is rolled back.
    """
    with db.atomic() as txn:
        try:
            head_id = get_name_id(head)
            tail_id = get_name_id(tail)
            # get_name_id() returns None for an unknown id_link; bail out with
            # a clear message instead of pushing NULL foreign keys at the
            # database and relying on a constraint failure.
            if head_id is None or tail_id is None:
                log.warning(
                    f"Not inserting {head, rel, tail}: head or tail is not a known name."
                )
                return

            new_triplet = Triplet(head=head_id, relation=rel, tail=tail_id)
            rows_modified = new_triplet.save()
            log.debug(f"Inserted: {head, rel, tail}, Rows modified: {rows_modified}")
            # Keep the full-text index in step with the table.
            index_triplet(new_triplet)
        except pw.IntegrityError:
            log.warning("Integrity error happened", exc_info=True)
            txn.rollback()
        except pw.PeeweeException:
            log.error("In insert_triplet:", exc_info=True)
            txn.rollback()
# TODO add log messages to deletion, creation and updating
def index_triplet(triplet: Triplet):
    """Write one triplet, with its head/tail names, into ``TripletIndex``.

    Args:
        triplet: A saved ``Triplet``; only its ``id`` is used to re-read the
            row together with both related ``Name`` rows.

    A triplet that cannot be found is logged and skipped. Peewee errors are
    logged and swallowed.
    """
    with db.atomic():
        try:
            # Two aliases because Name is joined twice (head and tail).
            Head = Name.alias("head_obj")
            Tail = Name.alias("tail_obj")
            query = (
                # Select the complete related rows so `.head.name` and
                # `.tail.name` are populated on the result.
                Triplet.select(Triplet, Head, Tail)
                .join(Head, on=(Triplet.head == Head.id), attr="head")
                # Return the join context to Triplet so the tail row is
                # attached to the triplet, not to the head row.
                .switch(Triplet)
                .join(Tail, on=(Triplet.tail == Tail.id), attr="tail")
                .where(Triplet.id == triplet.id)
            )
            to_index = query.first()
            if to_index is None:
                log.warning(f"No triplet with id {triplet.id!r} found to index.")
                return

            # INSERT OR REPLACE keeps re-indexing the same rowid idempotent.
            TripletIndex.replace(
                {
                    TripletIndex.rowid: to_index.id,
                    TripletIndex.head: to_index.head.name,
                    TripletIndex.relation: to_index.relation,
                    TripletIndex.tail: to_index.tail.name,
                }
            ).execute()
        except pw.PeeweeException:
            log.error("In index_triplet:", exc_info=True)
def get_triplets_with_names(triplets=None):
    """Build a query for triplets joined to the names of their head and tail.

    Besides the ``Triplet`` columns the query selects the joined rows' ``name``
    and ``id_link`` under the labels ``head_name``, ``tail_name``,
    ``head_id_link`` and ``tail_id_link``.

    Args:
        triplets: Optional iterable of ``Triplet`` instances or triplet ids to
            restrict the query to. When empty or ``None`` the query is
            unfiltered but capped at 40 rows.

    Returns:
        The (not yet executed) select query, or ``None`` if building it raised
        a peewee error, which is logged.
    """
    with db.atomic():
        try:
            # Two aliases because Name is joined twice (head and tail).
            Head = Name.alias("head_obj")
            Tail = Name.alias("tail_obj")
            # NOTE(review): the second join is chained while the join context
            # is the Head alias rather than Triplet. The ON clauses are
            # explicit, but confirm the joined rows are attached where
            # callers expect them.
            named_triplets = (
                Triplet.select(
                    Triplet,
                    Head.name.alias("head_name"),
                    Tail.name.alias("tail_name"),
                    Head.id_link.alias("head_id_link"),
                    Tail.id_link.alias("tail_id_link"),
                )
                .join(Head, on=(Triplet.head == Head.id), attr="head_id")
                .join(Tail, on=(Triplet.tail == Tail.id), attr="tail_id")
            )
            if triplets:
                # Accept model instances as well as raw ids: the IN (...)
                # list is compared against the integer primary key.
                wanted_ids = [
                    item.get_id() if isinstance(item, pw.Model) else item
                    for item in triplets
                ]
                return named_triplets.where(Triplet.id.in_(wanted_ids))
            return named_triplets.limit(40)
        except pw.PeeweeException:
            log.error("In get_triplets_with_names:", exc_info=True)
            return None
def search(query: str):
    """Full-text search the triplet index and return the matching triplets.

    Args:
        query: FTS5 search expression matched against ``TripletIndex``.

    Returns:
        A query yielding one dict per matching ``Triplet`` row, best match
        first, or ``None`` if building the query raised a peewee error, which
        is logged.
    """
    try:
        results = (
            Triplet.select()
            .join(TripletIndex, on=(Triplet.id == TripletIndex.rowid))
            # `TripletIndex.search(...)` builds a whole SELECT and so cannot
            # serve as a WHERE condition; `match()` is the boolean MATCH
            # expression intended for that.
            .where(TripletIndex.match(query))
            # Order by the BM25 score, best match first.
            .order_by(TripletIndex.bm25())
        )
        return results.dicts()
    except pw.PeeweeException:
        log.error("In search:", exc_info=True)
        return None
if __name__ == "__main__":
    # Demo run: create the schema, load a few names, link two of them with
    # one triplet, then query the full-text index.
    db.create_tables([Name, Triplet, Relation, Rubbish, TripletIndex])

    # `names` and `id_links` are parallel lists: entry i of one belongs to
    # entry i of the other.
    names = [
        "epithelium of mammary gland",
        "adrenal cortex",
        "testis",
        "large intestine",
        "TJP1 tight junction protein 1",
    ]
    id_links = [
        "https://identifiers.org/UBERON:0003244",
        "https://identifiers.org/UBERON:0001235",
        "https://identifiers.org/UBERON:0000473",
        "https://identifiers.org/UBERON:0000059",
        "https://identifiers.org/NCBIGENE:7082",
    ]
    for name, id_link in zip(names, id_links):
        insert_name(id_link=id_link, name=name)

    insert_triplet(
        head="https://identifiers.org/NCBIGENE:7082",
        rel="GENE_EXPRESSED_ANATOMY",
        tail="https://identifiers.org/UBERON:0003244",
    )

    # NOTE(review): the only triplet inserted above links the TJP1 gene to
    # "epithelium of mammary gland"; neither name contains "cortex", so this
    # search is presumably expected to print nothing -- confirm.
    results = search("cortex")
    for result in results:
        print(result)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment