Skip to content

Instantly share code, notes, and snippets.

@lozybean
Last active July 5, 2016 09:30
Show Gist options
  • Save lozybean/2f9d20320c9c4eb1d3c8c98616236b80 to your computer and use it in GitHub Desktop.
Save lozybean/2f9d20320c9c4eb1d3c8c98616236b80 to your computer and use it in GitHub Desktop.
CIViC local based
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@author = 'liangzb'
@date = '2016/7/5 0005'
"""
import abc
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError
from CIViC_spider import (GeneListSpider, VariantsSpider,
VariantsGroupSpider, EvidenceItemsSpider,
VariantsDetailSpider)
# Module-level MongoDB connection, created at import time; Base.__init__
# reads the CIViC database from it.
# NOTE(review): 'x.x.x.x' looks like a redacted placeholder host - confirm the
# real address before running.
client = MongoClient('x.x.x.x', 27017)
def admin_acquire(fn):
    """Decorate a method so it runs while ``self.db`` is logged in as admin.

    The wrapper logs the current session out, authenticates as ``admin``,
    calls the wrapped method, and afterwards logs out again and
    re-authenticates as ``user``.

    The user login is restored in a ``finally`` clause, so it happens even
    when the wrapped method raises, and the wrapped method's return value is
    passed through to the caller.

    :param fn: method taking ``self`` (an object exposing ``self.db``) first.
    :return: the wrapping function, carrying ``fn``'s name and docstring.
    """
    # Local import keeps this block self-contained without touching the
    # file-level import list.
    from functools import wraps

    # NOTE(review): credentials are hard-coded here; Database.authenticate and
    # Database.logout were removed in PyMongo 4 - confirm the pinned version.
    @wraps(fn)
    def wrapper(self, *args, **kwargs):
        self.db.logout()
        self.db.authenticate(name='admin', password='123')
        try:
            return fn(self, *args, **kwargs)
        finally:
            # Always drop admin rights, even if fn failed half-way.
            self.db.logout()
            self.db.authenticate(name='user', password='123')

    return wrapper
class Base(object):
    """Shared plumbing for one MongoDB collection filled from a spider.

    Subclasses must assign ``self.collection`` and ``self.spider`` in their
    own ``__init__`` and provide ``create_index``.
    """

    def __init__(self):
        database = client.CIViC
        self.db = database
        database.authenticate(name='user', password='123')
        # Placeholders: both must be declared in the subclass.
        self.collection = None
        self.spider = None

    @abc.abstractmethod
    def create_index(self):
        """Create the indexes required by ``self.collection``."""
        # NOTE(review): Base does not use abc.ABCMeta, so this decorator does
        # not prevent instantiating a class that lacks an override.
        pass

    @admin_acquire
    def db_insert(self):
        """Create the indexes, then insert every spider record.

        Records rejected with ``DuplicateKeyError`` are skipped silently.
        """
        self.create_index()
        for document in self.spider.records:
            try:
                self.collection.insert_one(document)
            except DuplicateKeyError:
                pass
class GeneList(Base):
    """Loader for the ``gene_list`` collection, fed by ``GeneListSpider``."""

    def __init__(self):
        super(GeneList, self).__init__()
        self.collection = self.db.gene_list
        self.spider = GeneListSpider()

    def create_index(self):
        """Create unique indexes on ``id``, ``name`` and ``entrez_id``."""
        for field_name in ('id', 'name', 'entrez_id'):
            self.collection.create_index(field_name, unique=True)
class Variants(Base):
    """Loader for the ``variants`` collection, fed by ``VariantsSpider``."""

    def __init__(self):
        super(Variants, self).__init__()
        self.collection = self.db.variants
        self.spider = VariantsSpider()

    def create_index(self):
        """Create a unique index on ``id``."""
        self.collection.create_index('id', unique=True)

    @property
    def ids(self):
        """Lazily yield the ``id`` field of every document in the collection.

        The query is only issued once iteration starts.
        """
        yield from (document['id'] for document in self.collection.find())
class VariantsDetail(Base):
    """Loader for the ``variants_detail`` collection.

    There is no list spider here: one ``VariantsDetailSpider`` is created per
    variant id taken from the ``variants`` collection.
    """

    def __init__(self):
        super().__init__()
        self.collection = self.db.variants_detail
        variants = Variants()
        # Lazy generator over the ids stored in the ``variants`` collection.
        self.variants_ids = variants.ids
        self.__exists_ids = None

    def create_index(self):
        """Create a unique index on ``id``."""
        self.collection.create_index('id', unique=True)

    @property
    def exists_ids(self):
        """Return the list of ``id`` values already stored (cached on first use)."""
        if self.__exists_ids is None:
            cursor = self.collection.find()
            self.__exists_ids = [item['id'] for item in cursor]
        return self.__exists_ids

    @admin_acquire
    def db_insert(self):
        """Fetch and insert the detail record of every variant not yet stored.

        Variants whose detail could not be downloaded (spider record is
        ``None``) and records rejected with ``DuplicateKeyError`` are skipped.
        """
        self.create_index()
        # Build the lookup set once: membership tests against the list were
        # linear per variant. Assumes ids are hashable (e.g. ints) - TODO confirm.
        known_ids = set(self.exists_ids)
        for variant_id in self.variants_ids:
            if variant_id in known_ids:
                continue
            record = VariantsDetailSpider(variant_id).record
            if record is None:
                continue
            try:
                self.collection.insert_one(record)
            except DuplicateKeyError:
                continue
class VariantsGroup(Base):
    """Loader for the ``variants_group`` collection, fed by ``VariantsGroupSpider``."""

    def __init__(self):
        super(VariantsGroup, self).__init__()
        self.collection = self.db.variants_group
        self.spider = VariantsGroupSpider()

    def create_index(self):
        """Create a unique index on ``id``."""
        target = self.collection
        target.create_index('id', unique=True)
class EvidenceItems(Base):
    """Loader for the ``evidence_items`` collection, fed by ``EvidenceItemsSpider``."""

    def __init__(self):
        super(EvidenceItems, self).__init__()
        self.collection = self.db.evidence_items
        self.spider = EvidenceItemsSpider()

    def create_index(self):
        """Create a unique index on ``id``."""
        target = self.collection
        target.create_index('id', unique=True)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@author = 'liangzb'
@date = '2016/7/5 0005'
"""
import json
from http.client import IncompleteRead
from socket import timeout
from urllib import request
from urllib.error import URLError
def try_to_get_response(req, max_retries=10, timeout_seconds=10):
    """Open *req* and return the raw response body, retrying on failure.

    :param req: a ``urllib.request.Request`` (its ``full_url`` is used in the
        retry message).
    :param max_retries: maximum number of attempts; defaults to the former
        hard-coded value of 10.
    :param timeout_seconds: per-attempt timeout passed to ``urlopen``;
        defaults to the former hard-coded value of 10.
    :return: the response body as ``bytes``, or ``None`` when every attempt
        failed (or ``max_retries`` is not positive).

    Only ``URLError`` (which includes ``HTTPError``), ``IncompleteRead``,
    ``ConnectionResetError`` and ``socket.timeout`` trigger a retry; any
    other exception propagates to the caller.
    """
    for try_count in range(1, max_retries + 1):
        try:
            with request.urlopen(req, timeout=timeout_seconds) as fp:
                return fp.read()
        except (URLError,
                IncompleteRead,
                ConnectionResetError,
                timeout):
            print('error occurred, reloading {try_count} time: {req.full_url}... '
                  .format(try_count=try_count, req=req))
    return None
def get_req(url):
    """Build a ``urllib.request.Request`` for *url* with a fixed header set.

    :param url: address to request.
    :return: the prepared ``Request`` object (nothing is sent here).
    """
    # NOTE(review): 'Accept - Encoding' / 'Accept - Language' contain spaces
    # and 'X-requrested-With' / 'XMLHttpRequested' look misspelled, so these
    # are probably not the standard headers. They are kept verbatim: turning
    # on a real gzip Accept-Encoding would hand compressed bodies to callers
    # that decode the response as UTF-8 without decompressing.
    header_pairs = (
        ('Accept', '*/*'),
        ('Accept - Encoding', 'gzip, deflate'),
        ('Accept - Language', 'zh-CN,zh;q=0.8,en;q=0.6,zh-TW;q=0.4'),
        ('cache-control', 'no-cache'),
        ('Host', 'civic.genome.wustl.edu'),
        ('Content-Type', 'application/x-www-form-urlencoded;'
                         'charset=utf-8'),
        ('User-Agent', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_0) '
                       'AppleWebKit/601.1.56 (KHTML, like Gecko) '
                       'Version/9.0 Safari/601.1.56'),
        ('X-requrested-With', 'XMLHttpRequested'),
    )
    prepared = request.Request(url=url)
    for header_name, header_value in header_pairs:
        prepared.add_header(header_name, header_value)
    return prepared
class GeneListSpider(object):
    """Fetch the full record list from a CIViC listing endpoint.

    ``total_count`` is read from the ``_meta`` section of the plain endpoint
    response; ``records`` then requests ``?count=<total_count>`` so that all
    records arrive in one response. Both values are cached after the first
    successful fetch. Subclasses only override ``start_url``.
    """

    def __init__(self):
        self.start_url = 'https://civic.genome.wustl.edu/api/genes'
        self.__total_count = None
        self.__records = None

    def _fetch_json(self, url):
        """Download *url* and return the parsed JSON document.

        :raises ConnectionError: when ``try_to_get_response`` gives up and
            returns ``None`` (previously this surfaced as an opaque
            ``AttributeError`` on ``None.decode``).
        """
        payload = try_to_get_response(get_req(url))
        if payload is None:
            raise ConnectionError('no response received from {}'.format(url))
        return json.loads(payload.decode('utf-8'))

    @property
    def records(self):
        """Return the ``records`` list of the endpoint (cached)."""
        if self.__records is None:
            url = '{}?count={}'.format(self.start_url, self.total_count)
            self.__records = self._fetch_json(url)['records']
        return self.__records

    @property
    def total_count(self):
        """Return ``_meta.total_count`` reported by the endpoint (cached)."""
        if self.__total_count is None:
            meta = self._fetch_json(self.start_url)['_meta']
            self.__total_count = meta['total_count']
        return self.__total_count
class VariantsSpider(GeneListSpider):
    """``GeneListSpider`` pointed at the ``/api/variants`` endpoint."""

    def __init__(self):
        super(VariantsSpider, self).__init__()
        # Only the endpoint differs from the parent class.
        self.start_url = 'https://civic.genome.wustl.edu/api/variants'
class VariantsDetailSpider(object):
    """Fetch the detail document of a single variant."""

    def __init__(self, variant_id):
        self.variant_id = variant_id
        self.url = 'https://civic.genome.wustl.edu/api/variants/{variant_id}'.format(
            variant_id=variant_id)
        self.__record = None

    @property
    def record(self):
        """Return the parsed JSON document for this variant.

        The result is cached once obtained. ``None`` is returned when the
        download failed; a later access then tries the download again.
        """
        if self.__record is not None:
            return self.__record
        payload = try_to_get_response(get_req(self.url))
        if payload is None:
            return None
        self.__record = json.loads(payload.decode('utf-8'))
        return self.__record
class VariantsGroupSpider(GeneListSpider):
    """``GeneListSpider`` pointed at the ``/api/variant_groups`` endpoint."""

    def __init__(self):
        super(VariantsGroupSpider, self).__init__()
        # Only the endpoint differs from the parent class.
        self.start_url = 'https://civic.genome.wustl.edu/api/variant_groups'
class EvidenceItemsSpider(GeneListSpider):
    """``GeneListSpider`` pointed at the ``/api/evidence_items`` endpoint."""

    def __init__(self):
        super(EvidenceItemsSpider, self).__init__()
        # Only the endpoint differs from the parent class.
        self.start_url = 'https://civic.genome.wustl.edu/api/evidence_items'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment