Skip to content

Instantly share code, notes, and snippets.

@jpr71
Last active November 24, 2015 19:59
Show Gist options
  • Save jpr71/71463a989ca4415265fc to your computer and use it in GitHub Desktop.
Save jpr71/71463a989ca4415265fc to your computer and use it in GitHub Desktop.
import gzip
import os.path
from peewee import *
from subprocess import check_output
from collections import OrderedDict
import re
build = "WS245"
gff_url = "ftp://ftp.wormbase.org/pub/wormbase/releases/{build}/species/c_elegans/PRJNA13758/c_elegans.PRJNA13758.{build}.annotations.gff3.gz".format(build = build)
gff = "c_elegans.{build}.gff3.gz".format(build = build)
if not os.path.exists(gff):
comm = "curl {gff_url} > {gff}".format(**locals())
print(check_output(comm, shell = True))
os.remove("worms.db")
db = SqliteDatabase('worms.db')
db.connect()
class BaseModel(Model):
class Meta:
database = db
class Region(BaseModel):
chrom= CharField()
type_of = CharField(null = True)
start = IntegerField()
end = IntegerField()
strand = CharField()
reading_frame = CharField()
class Meta:
order_by = ('chrom',)
class Id_Set(BaseModel):
region = ForeignKeyField(Region, related_name="ID", null=True )
id_key = CharField(null=True)
id_value = CharField(null=True)
class Meta:
database = db
db.create_tables([Region,Id_Set])
def boolify(s):
if s == 'True':
return True
if s == 'False':
return False
raise ValueError("huh?")
def autoconvert(s):
for fn in (boolify, int, float):
try:
return fn(s)
except ValueError:
pass
return s
correct_ids = set(["locus", "Name", "ID", "sequence_name"])
with gzip.open(gff, 'rb') as f:
c = 0
while True:
c += 1
if c > 2000000:
break
line = f.next().strip().split("\t")
if line[0].startswith("#"):
continue
header = ["chrom", "source", "type_of", "start", "end", "score", "strand", "reading_frame", "attributes"]
if line[1] == "WormBase":
#Put in bulk, every 1000th iteration
record = OrderedDict(zip(header, map(autoconvert, line)))
record = {k:v for k,v in record.items() if k not in ["source", "score", "strand", "reading_frame"]}
attributes = {}
region_save = {k:v for k,v in record.items() if k != "attributes"}
region_id = Region.create(**region_save)
region_id.save()
for ID, VAL in re.findall("([^=;]+)=([^; ]+)", record["attributes"]):
if ID in correct_ids:
if VAL.find(":") > 0:
VAL = VAL.split(":")[1]
attributes["id_key"]= ID
attributes["id_value"] = VAL
attributes["region"] = region_id.get_id()
#print attributes
id_save = {k:v for k,v in attributes.items()}
id_set = Id_Set.create(**id_save)
id_set.save()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment