Skip to content

Instantly share code, notes, and snippets.

Created January 4, 2018 06:36
Show Gist options
  • Save hliang/f0029acb31a8e8632c0f04545faaf75b to your computer and use it in GitHub Desktop.
Save hliang/f0029acb31a8e8632c0f04545faaf75b to your computer and use it in GitHub Desktop.
#! /usr/bin/env python
retrieve pictures from the UCSC Genome Browser based on coordinates
specified from BED3+ file and a session ID (hgsid).
UCSC Genome Browser should be set up with tracks prior to using the
utility and saved as a session. This includes all track settings, sizes
etc. Once these are setup, load the page source and identify the hgsid by
searching for "hgsid="
Supply the hgsid and a BED file. Images will be retrieved based on the
coordinates in the BED file and saved to the directory as::
.. note::
beware of multiple procs accessessing the same hgsid at
the same time, they will affect each other's strand settings
.. warning::
if you have many regions, you will need to run this overnight (e.g.
via a job submitted to the LSF night queue), otherwise UCSC will
throttle your connection (limit 1 request / 15 sec, 5000 requests per
import sys
import re
import time
import ucscsession
from pybedtools import BedTool
from cyvcf2 import VCF
import os
from path import Path as path
__author__ = 'Jay Hesselberth'
__contact__ = ''
__version__ = '0.1.8'
def ucsc_snapshots(bedfilename, session_id, rev_display, dir_annots,
upstream, downstream, img_types, no_delay, verbose):
ucscsession.settings.hgsid = session_id
session = Session(rev_display, no_delay, verbose)
# for each BED field, set params on the UCSC browser and fetch the
# images
for region in BedTool(bedfilename):
if region.file_type == "vcf":
position = '%s:%s-%s' % (region.chrom, region.start - upstream - 1, region.start + downstream)
highlightregions = [[session.cart['db'], region.chrom, region.start, region.start, "#aaedff"]]
# TODO highlight indel region instead of just the start site
elif region.file_type == "bed":
position = '%s:%s-%s' % (region.chrom, region.start, region.end)
raise OSError, ">> target region type %s not supported. BED3+ or VCF are supported" % region.file_type
if verbose:
print >>sys.stderr, ">> set position %s" % position
#print "session.cart=", session.cart
for imgtype in img_types:
filename = getfilename(session_id,
position=position if region.file_type == "bed" else '%s:%s' % (region.chrom, region.start), if False else ".",
score=region.score if region.file_type == "bed" else ".",
annots=dir_annots, ext=imgtype)
session.write_image(imgtype=imgtype, filename=filename,
def ucsc_snapshots_vcf(vcffilename, session_id, rev_display, dir_annots,
upstream, downstream, img_types, no_delay, verbose):
ucscsession.settings.hgsid = session_id
session = Session(rev_display, no_delay, verbose)
# for each entry in vcf, set params on the UCSC browser and fetch the
# images
if not os.path.isfile(vcffilename):
print >>sys.stderr, ">> WARNING vcf file %s not found" % vcffilename
for variant in VCF(vcffilename):
position = '%s:%s-%s' % (variant.CHROM, variant.start - upstream, variant.start + downstream)
highlightregions = [[session.cart['db'], variant.CHROM, variant.start + 1, variant.end, "#aaedff"]] # highlight indel region instead of just the start site
if verbose:
print >>sys.stderr, ">> set position %s" % position
#print "session.cart=", session.cart
print "INFO updated session.tracks_url=", session.tracks_url
print "INFO session.cart position=", session.cart["position"]
print "INFO session.cart highligh=", session.cart["highlight"]
for imgtype in img_types:
filename = getfilename(session_id,
position='%s:%s' % (variant.CHROM, variant.start + 1),
annots=dir_annots, ext=imgtype)
session.write_image(imgtype=imgtype, filename=filename,
class Session(ucscsession.session._UCSCSession):
def __init__(self, rev_display, no_delay, verbose):
self.verbose = verbose
# keep track of the flipped state in the session. if flipped is
# True, will show neg strand genes 5'->3'
self.rev_display = rev_display
flipped = self._find_flipped_state()
self.flipped = flipped
# XXX no_delay for debugging only
if no_delay:
self._SLEEP = 1
# required minimum time between requests per UCSC
#self._SLEEP = 15
self._SLEEP = 3
def write_image(self, imgtype, filename, strand):
wrapper for image generation. writes requested content to
only PDF and PNG are currently supported.
imgtype (str): pdf or png
filename (str): name of file to write
strand (str): optional strand argument
# XXX could add more image capability by converting retrieved png
# files to jpeg, tif etc.
if imgtype.lower() == 'pdf':
content = self.pdf(strand)
elif imgtype.lower() == 'png':
content = self.png(strand)
raise OSError, ">> image type %s not supported" % imgtype
with open(filename, 'w') as fileout:
if self.verbose:
print >>sys.stderr, ">> wrote %s" % filename
def pdf(self, strand):
retrieve PDF of the current browser view for currently set
strand (str)
pdf content (str)
payload = {'hgt.psOutput':'on'}
# deal with display flipping
payload = self._flip_display(strand, payload)
response =, data=payload)
link = ucscsession.helpers.pdf_link(response)
content = self.session.get(link).content
return content
def png(self, strand):
retrieve merged PNG of the current browser view for currently set
strand (str)
png content (str)
# these CGI settings ensure the browser returns a single, merged
# PNG for the viewport
payload = {'hgt.imageV1':1,
# deal with display flipping
payload = self._flip_display(strand, payload)
# example: <IMG SRC='../trash/hgt/hgt_genome_6243_68cdb0.png'
#img_regex = re.compile('<IMG SRC = \"\.\.\/(trash\/hgt\/hgt_.*\.png)')
img_regex = re.compile('<IMG SRC\s*=\s*.*\.\.\/(trash.*png)')
response =, data=payload)
#print type(self.session)
#print img_regex.findall(response.text)[0]
imgfilename = img_regex.findall(response.text)[0]
link = self._mirror + "/" + imgfilename
content = self.session.get(link).content
return content
def _flip_display(self, strand, payload):
flip display for such that all genes (pos and neg strands) are
shown 5'->3'
adds hgt.toggleRevCmplDisp to CGI payload if required
beware of multiple procs accessessing the same hgsid at the
same time, they will affect each other's strand settings
strand (str)
payload (dict): updated payload with toggle key
payload (dict)
# the CGI var is a true toggle so must be flipped back to e.g.
# pos strand genes after a neg strand gene is retrieved and vice
# versa
if strand == '-' and self.rev_display and not self.flipped:
payload['hgt.toggleRevCmplDisp'] = '1'
self.flipped = True
elif strand == '+' and self.rev_display and self.flipped:
payload['hgt.toggleRevCmplDisp'] = '1'
self.flipped = False
return payload
def _find_flipped_state(self):
Determine whether the display is currently flipped.
bool: Whether state is flipped
cart_info = self.cart_info()
db = cart_info['db']
comp_key = 'hgt.revCmplDisp_%s' % db
state = cart_info[comp_key]
except KeyError:
msg = ">> forcing display to + strand ... "
self._flip_display(strand = '+', payload = {})
state = 0
# possible vals are 0 and 1
if int(state) == 0:
return False
return True
def getfilename(session_id, position, name, score, annots, ext='pdf'):
Generate a filename for the image output. creates directory if it does
not exist.
position (str): e.g. chr1:1-100)
name (str): gene name
score (str) score
annots (str): annots
filename (str): <name>-<score>-<pos>.pdf
#imgdir = 'ucsc-snapshots-hgsid-%s' % session_id
imgdir = 'ucsc-snapshots'
if annots:
for key, val in annots:
imgdir += '-%s-%s' % (key, val)
imgdir = path(imgdir)
if not imgdir.exists():
# chr:1-2 -> chr-1-2
pos = position.replace(':','-')
fname = pos
if score and score != '.':
fname = '%s-%s' % (score, fname)
if name and name != '.':
fname = '%s-%s' % (name, fname)
imgfilename = path('%s.%s' % (fname, ext.lower()))
imgfilepath = imgdir.joinpath(imgfilename)
return imgfilepath
def parse_options(args):
from optparse import OptionParser
#usage = "%prog [OPTION]... BED3+ UCSC-hgsid"
usage = "%prog <--sessionid XXXX> <--bedfile YYYY.bed | --vcffile ZZZZ.vcf> [OPTION]..."
version = "%%prog %s" % __version__
description = ("retrieve images from UCSC based on BED regions and "
"UCSC session ID ('hgsid')")
parser = OptionParser(usage=usage, version=version,
parser.add_option("--sessionid", action="store", dest="sessionid",
default=None, help="session id, UCSC-hgsid")
parser.add_option("--bedfile", action="store", dest="bedfile",
default=None, help="BED3+ file.")
parser.add_option("--vcffile", action="store", dest="vcffile",
default=None, help="vcf file.")
parser.add_option("--upstream", action="store_const", dest="upstream",
default=25, help="upstream bp to show. "
"(default: %default)")
parser.add_option("--downstream", action="store_const", dest="downstream",
default=25, help="downstream bp to show. "
"(default: %default)")
parser.add_option("--img-types", action="append",
default=[], help="image types to write. "
"(default: PNG, PDF)")
parser.add_option("--dir-annotations", action="append",
default=[], help="additional key=value pairs for annotating "
"output directory. (default: None)")
parser.add_option("--no-delay", action="store_true",
default=False, help="no delay between requests. for debugging "
"only. "
"(default: %default)")
parser.add_option("--reverse-display", action="store_true",
default=False, help="reverse display for neg strand genes "
"(default: %default)")
parser.add_option("-v", "--verbose", action="store_true",
default=False, help="verbose output (default: %default)")
options, args = parser.parse_args(args)
# if len(args) < 2:
# parser.error("specify BED filename and UCSC session ID")
if options.sessionid is None:
parser.error("specify one UCSC session ID")
if options.bedfile is None and options.vcffile is None:
parser.error("specify one BED filename or one VCF filename")
if len(options.img_types) == 0:
return options, args
def main(args=sys.argv[1:]):
options, args = parse_options(args)
# bedfilename = args[0]
# sessionid = args[1]
bedfilename = options.bedfile
vcffilename = options.vcffile
sessionid = options.sessionid
if options.dir_annotations:
annots = [(i.split('=')) for i in options.dir_annotations]
annots = []
kwargs = {'verbose':options.verbose,
if bedfilename is not None:
return ucsc_snapshots(bedfilename, sessionid, **kwargs)
return ucsc_snapshots_vcf(vcffilename, sessionid, **kwargs)
if __name__ == '__main__':
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment