Skip to content

Instantly share code, notes, and snippets.

@nvictus
Created August 26, 2016 21:02
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nvictus/d82cbac6775cfa67cca7f6bd082d9af9 to your computer and use it in GitHub Desktop.
Save nvictus/d82cbac6775cfa67cca7f6bd082d9af9 to your computer and use it in GitHub Desktop.
UCSC fetcher
# -*- coding: utf-8 -*-
"""
Request snapshots from the UCSC Genome Browser.
"""
from __future__ import division, print_function, unicode_literals
from collections import OrderedDict
import sys
import re
if sys.version_info[0] >= 3:
from io import BytesIO
else:
try:
from cStringIO import StringIO as BytesIO
except ImportError:
from StringIO import StringIO as BytesIO
from matplotlib.image import imread
import numpy as np
import requests
import bs4
# European mirror is http://genome-euro.ucsc.edu/
BASE_URL = "https://genome.ucsc.edu"
def _scrape_for_pdf(resp, ideogram=False):
# Scrape the pdf download page
soup = bs4.BeautifulSoup(resp.text, 'html.parser')
if ideogram:
name = 'hgtIdeo_genome'
else:
name = 'hgt_genome'
a_tags = soup.find_all('a')
for a in a_tags:
if name in a.attrs['href']:
pdf_url = BASE_URL + a.attrs['href'][2:]
break
else:
raise ValueError('Link to PDF not found')
r_pdf = requests.get(pdf_url)
return r_pdf.content
def _scrape_for_images(resp):
# Scrape the UCSC browser page
soup = bs4.BeautifulSoup(resp.text, 'html.parser')
# Image of the main panel
img_tag1 = soup.find(id="img_data_ruler")
r_data = requests.get(BASE_URL + img_tag1.attrs['src'][2:])
r_data.raise_for_status()
im_data = imread(BytesIO(r_data.content))
# Image of the side panel
img_tag2 = soup.find(id="img_side_ruler")
r_side = requests.get(BASE_URL + img_tag2.attrs['src'][2:])
r_side.raise_for_status()
im_side = imread(BytesIO(r_side.content))
# Get the width of the side panel
css = img_tag1.attrs['style']
margin_txt = css.split(';')[0].split(':')[1]
margin_px = int(next(re.finditer(r'\d+', margin_txt)).group())
return im_side[:, :margin_px, :], im_data[:, margin_px:, :]
defaults = {
'intronEst': 'hide',
'knownGene': 'hide',
'mrna': 'hide',
'pubs': 'hide',
'refGene': 'hide',
'snp142Common': 'hide',
'cons100way': 'hide',
'wgEncodeReg': 'hide',
'snp144Common': 'hide',
'rmsk': 'hide',
}
def _process_params(params, region, db, session_id):
if isinstance(region, tuple):
params['position'] = '{}:{}-{}'.format(*region)
else:
params['position'] = region
params['db'] = db
if session_id is not None:
params['hgsid'] = session_id
else:
# hide all default tracks not explicitly turned on
for k, v in defaults.items():
params.setdefault(k, v)
def fetch(region, db='hg19', session_id=None, params=None, http_session=None):
"""
Download snapshot from UCSC Genome Browser as RGB array.
Input
-----
region: str
genomic coordinate query string
db: str
name of genome assembly
session_id: str, optional
genome browser session ID (hgsid)
params: dict, optional
url parameters, e.g. names of tracks
Notes
-----
UCSC track display modes:
'hide', 'dense', 'squish', 'pack', 'full'
Returns
-------
Two RGB image arrays (M x N x 3)
im_side: image of the side panel, variable width
im_data: image of the main panel, spans the genomic interval exactly
Example
-------
>>> side, data = fetch('chrX:15,000,000-16,000,000', params={'knownGene': 'full'})
"""
if params is None:
params = {}
_process_params(params, region, db, session_id)
if http_session is not None:
r = http_session.get(BASE_URL + "/cgi-bin/hgTracks", params=params)
else:
r = requests.get(BASE_URL + "/cgi-bin/hgTracks", params=params)
r.raise_for_status()
return _scrape_for_images(r)
def fetch_pdf(filepath, region, db='hg19', session_id=None, params=None, http_session=None):
"""
Download snapshot from UCSC Genome Browser as PDF file.
"""
if params is None:
params = {}
_process_params(params, region, db, session_id)
params['hgt.psOutput'] = 'on'
if http_session is not None:
r = http_session.get(BASE_URL + "/cgi-bin/hgTracks", params=params)
else:
r = requests.get(BASE_URL + "/cgi-bin/hgTracks", params=params)
r.raise_for_status()
content = _scrape_for_pdf(r)
with open(filepath, 'wb') as f:
f.write(content)
def fetch_ideogram(region, db='hg19', session_id=None, params=None):
if params is None:
params = {}
_process_params(params, region, db, session_id)
params['hgt.psOutput'] = 'on'
r = requests.get(BASE_URL + "/cgi-bin/hgTracks", params=params)
r.raise_for_status()
content = _scrape_for_pdf(r, ideogram=True)
return content
class UCSCSession(object):
def __init__(self, db='hg19', params=None):
self._session = requests.Session()
self.params = OrderedDict({'db': db})
self.params.update(defaults)
if params is not None:
self.params.update(params)
self.refresh()
def _from_text(self, text):
params = OrderedDict()
lines = text.split('\n')
for line in lines:
key, _, value = line.partition(' ')
params[key] = value
return params
def _to_text(self, params):
return '\n'.join('{} {}'.format(k, v) for k,v in params.items())
def _push(self):
content = self._to_text(self.params).encode('utf-8')
r = self._session.post(
BASE_URL + "/cgi-bin/hgSession",
data={'hgS_doLoadLocal':'submit'},
files={'hgS_loadLocalFileName': ('hgSession.txt', content, 'text/plain')})
r.raise_for_status()
return r
def _pull(self):
r = self._session.get(BASE_URL + '/cgi-bin/hgSession',
data={'hgS_doSaveLocal': 'submit',
'hgS_saveLocalFileName': '',
'hgS_saveLocalFileCompress': 'plain text'})
r.raise_for_status()
self.params.update(self._from_text(r.text))
return r
def refresh(self):
self._push()
self._pull()
def __getitem__(self, key):
self._pull()
return self.params[key]
def __setitem__(self, key, value):
self.params[key] = value
self.refresh()
def __delitem__(self, key):
del self.params[key]
self.refresh()
def update(self, *args, **kwargs):
self.params.update(*args, **kwargs)
self.refresh()
def loads(self, text):
self.params = self._from_text(text)
def dumps(self):
return self._to_text(self.params)
def load(self, filepath):
with open(filepath, 'r') as f:
self.params = self._from_text(f.read())
self.refresh()
def save(filepath):
with open(filepath, 'w') as f:
filepath.write(self._to_text(self.params))
def fetch(self, region):
r = self._session.get(BASE_URL + "/cgi-bin/hgTracks", data={'position': region})
r.raise_for_status()
return _scrape_for_images(r)
if __name__ == '__main__':
import matplotlib.pyplot as plt
im0, im1 = fetch('chr16:52,800,000-56,000,000', params={'knownGene': 'full'})
plt.imshow(im1)
plt.show()
@Phlya
Copy link

Phlya commented Apr 6, 2017

I have to say, this is amazing. Thank you for sharing, Nezar!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment