# -*- coding:utf-8 -*-
"""
GEO file downloader from query search result CSV

Requirements
* Python 2
* beautifulsoup4 (pip install lxml beautifulsoup4)
* wget

Usage
1. Get a CSV file from the GEO browse page.
   http://www.ncbi.nlm.nih.gov/geo/browse/?view=series&tax=9606 is the URL
   for human series. On that page, click Export and download the CSV file.
2. python geo_downloader.py < above.csv
   This command creates a "series/<geo_accession>" directory, downloads the
   series matrix file and raw files from the server, and writes a
   "platform/<geo_accession>" file with that platform's probe information.
"""
import os
import csv
import time
import urllib2
from ftplib import FTP
import subprocess
from bs4 import BeautifulSoup

NETWORK_WAIT_SEC = 30


def urlopen(url, wait_sec=NETWORK_WAIT_SEC):
    """Open a URL, retrying indefinitely on network errors."""
    result = None
    while not result:
        try:
            result = urllib2.urlopen(url)
        except Exception, e:
            print 'url %s has problems, try again after %s sec...' % (
                url, wait_sec)
            print str(e)
            time.sleep(wait_sec)
    return result


class GeoParser(object):
    """Base parser for a GEO accession display page."""
    geo_url = 'http://www.ncbi.nlm.nih.gov'
    geo_query_url = geo_url + '/geo/query/'

    def __init__(self, geo_id):
        self.geo_id = geo_id
        self.url = '%sacc.cgi?acc=%s' % (self.geo_query_url, geo_id)
        html = urlopen(self.url)
        self.soup = BeautifulSoup(html, 'lxml')

    def parse_main_table(self):
        # The accession page keeps its metadata in a two-column table;
        # walk the rows and collect {label: value} pairs.
        table = self.soup.find('table', cellpadding='2', cellspacing='0',
                               width='600')
        tr = table.tr.next_sibling.next_sibling
        data = {}
        while True:
            try:
                key = tr.td.getText()
                value = tr.td.next_sibling.next_sibling.getText()
                data[key] = value
                tr = tr.next_sibling.next_sibling
            except AttributeError:
                break
        return data
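
# A sketch of what parse_main_table() returns for a series page; the keys
# follow the row labels GEO displays, and the values here are made up:
#
#   {'Title': 'Expression data from ...',
#    'Summary': '...',
#    'Overall design': '...',
#    'Contributor(s)': 'Doe J, Roe R',
#    'Experiment type': 'Expression profiling by array',
#    'Sample organism': 'Homo sapiens'}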


class SampleParser(GeoParser):
    """Parser for a GSM sample page.

    Not called from main(), but usable programmatically.
    """

    def __init__(self, sample_id, series_id, platform_id):
        super(SampleParser, self).__init__(sample_id)
        self.sample_id = sample_id
        self.series_id = series_id
        self.platform_id = platform_id

    def get_sample_protocol_data(self):
        d = self.parse_main_table()
        sample_data = {
            'sample_nm': self.sample_id,
            'title': d.get('Title'),
            'source_nm': d.get('Source name'),
            'organism': d.get('Organism'),
            'characteristics_tag': d.get('Characteristics'),
            'biomaterial_provider': d.get('Biomaterial provider'),
            'molecule': d.get('Extracted molecule'),
            'label': d.get('Label'),
            'description': d.get('Description'),
        }
        protocol_data = {
            'growth': d.get('Growth protocol'),
            'treatment': d.get('Treatment protocol'),
            'extract': d.get('Extraction protocol'),
            'label': d.get('Label protocol'),
            'hybridization': d.get('Hybridization protocol'),
            'scan': d.get('Scan protocol'),
            'data_processing': d.get('Data processing'),
            'value_definition': d.get('Value definition'),
        }
        return sample_data, protocol_data
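
# SampleParser usage sketch (the accessions are illustrative):
#
#   parser = SampleParser('GSM345678', 'GSE12345', 'GPL570')
#   sample_data, protocol_data = parser.get_sample_protocol_data()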


class SeriesParser(GeoParser):
    """Parser for a GSE series page."""

    def __init__(self, geo_id):
        super(SeriesParser, self).__init__(geo_id)
        self.series_id = geo_id
        self.target_path = os.path.join('series', self.series_id)

    def get_series_data(self):
        d = self.parse_main_table()
        series_data = {
            'geo_series_id': self.series_id,
            'title': d.get('Title'),
            'summary': d.get('Summary'),
            'overall_design': d.get('Overall design'),
            'contributor': d.get('Contributor(s)'),
            'url': self.url,
            'type': d.get('Experiment type', ''),
            'organism': d.get('Sample organism'),
        }
        print 'series data...', series_data
        return series_data

    def get_platform_id(self):
        table = self.soup.find('table',
                               style='position:relative;top:-5px;left:-5px')
        platform_id = table.tr.td.string
        return platform_id

    def get_sample_ids(self):
        # The sample list is split across two tables: the visible one and a
        # second, script-hidden table behind the "More..." link.
        table = self.soup.find('table',
                               style='position:relative;top:-5px;left:-5px')
        tr = table.parent.parent.next_sibling.next_sibling.tr
        sample_ids = []
        while tr:
            sample_id = tr.td.string
            sample_ids.append(sample_id)
            tr = tr.next_sibling.next_sibling
        script = table.parent.parent.next_sibling.next_sibling.script
        tr = script.next_sibling.next_sibling.table.tr
        while tr:
            sample_id = tr.td.string
            sample_ids.append(sample_id)
            try:
                tr = tr.next_sibling.next_sibling
            except AttributeError:
                break
        return sample_ids
    def get_matrix_url(self):
        # The series matrix lives on the NCBI FTP server; list the series
        # directory and take the first file in it.
        domain = 'ftp.ncbi.nlm.nih.gov'
        path = 'pub/geo/DATA/SeriesMatrix/%s' % self.series_id
        files = []
        ftp = None
        while not ftp:
            try:
                ftp = FTP(domain)
            except Exception:
                print 'ftp connection has a problem. retry...'
                time.sleep(NETWORK_WAIT_SEC)
        ftp.login()
        ftp.set_pasv(True)
        ftp.cwd(path)
        ftp.dir(files.append)
        filename = files[0].split()[-1]
        return "ftp://%s/%s/%s" % (domain, path, filename)
    def get_matrix_filename(self):
        # Download the gzipped matrix with wget, unpack it with gunzip, and
        # return the path of the unpacked file.
        url = self.get_matrix_url()
        print 'matrix_url...', url
        p = subprocess.Popen(['wget', '-P', self.target_path, url],
                             stdout=subprocess.PIPE)
        output = p.communicate()[0]
        filename = url.split('/')[-1]
        p = subprocess.Popen(['gunzip', '-f',
                              os.path.join(self.target_path, filename)],
                             stdout=subprocess.PIPE)
        output = p.communicate()[0]
        filename = filename.replace('.gz', '')
        return os.path.join(self.target_path, filename)
    def get_matrix_rawfiles(self):
        # Read the series matrix: remember the supplementary (raw) file URL
        # from the header and collect the data rows into a list of lists.
        rawfile_url = None
        matrix = []
        with open(self.get_matrix_filename()) as f:
            for line in f:
                if line.startswith('!Series_supplementary_file'):
                    rawfile_url = line.split()[-1].replace('"', '')
                if not line.startswith('!'):
                    line = line.strip()
                    words = [w.replace('"', '') for w in line.split()]
                    if words:
                        matrix.append(words)
        if rawfile_url:
            self.download_rawfiles(rawfile_url)
        return matrix

    def download_rawfiles(self, rawfile_url):
        p = subprocess.Popen(['wget', '-P', self.target_path, rawfile_url],
                             stdout=subprocess.PIPE)
        output = p.communicate()[0]
        print output


class PlatformParser(GeoParser):
    """Parser for a GPL platform page."""

    def __init__(self, geo_id):
        super(PlatformParser, self).__init__(geo_id)
        self.platform_id = geo_id

    def get_platform_data(self):
        # The platform page's metadata table has a fixed row order, so walk
        # the rows positionally rather than by label.
        table = self.soup.find('table', cellpadding='2', cellspacing='0',
                               width='600')
        tr = table.tr.next_sibling.next_sibling
        status = tr.contents[2].contents[0]
        tr = tr.next_sibling.next_sibling
        title = tr.contents[2].contents[0]
        tr = tr.next_sibling.next_sibling
        technology_type = tr.contents[2].contents[0]
        tr = tr.next_sibling.next_sibling
        distribution = tr.contents[2].contents[0]
        tr = tr.next_sibling.next_sibling
        organism = tr.contents[2].contents[0].string
        tr = tr.next_sibling.next_sibling
        manufacturer = tr.contents[2].contents[0]
        tr = tr.next_sibling.next_sibling
        manufacture_protocol = tr.contents[2].contents[0]
        tr = tr.next_sibling.next_sibling.next_sibling.next_sibling
        description = '\n'.join(str(s) for s in tr.contents[2])
        tr = tr.next_sibling.next_sibling.next_sibling.next_sibling
        web_link = tr.contents[2].contents[0].string
        platform_data = {
            'geo_platform_id': self.platform_id,
            'title': title,
            'technology': technology_type,
            'distribution': distribution,
            'organism': organism,
            'manufacturer': manufacturer,
            'manufacture_protocol': manufacture_protocol,
            'description': description,
            'url': web_link,
        }
        print 'platform data...', platform_data
        return platform_data
    def get_probe_file(self):
        # The "Download full table..." button carries the probe table URL in
        # its onclick handler; extract it from the OpenLink('...') call.
        button = self.soup.find('input', value='Download full table...')
        onclick = button.attrs['onclick']
        probe_url = self.geo_url + onclick.split(',')[0].replace(
            "OpenLink('", '')[:-1]
        print 'probe_url...', probe_url
        return urlopen(probe_url)

    def write_probe_file(self):
        try:
            os.mkdir('platform')
        except OSError:
            pass  # directory already exists
        with open(os.path.join('platform', self.platform_id), 'w') as outfile:
            outfile.write(self.get_probe_file().read())


def main(csvfile):
    csvfile.next()  # skip the CSV header line
    platforms = []
    for record in csv.reader(csvfile):
        geo_id = record[0]
        print 'get geo series %s...' % geo_id
        sp = SeriesParser(geo_id)
        sp.get_matrix_rawfiles()
        platform_id = sp.get_platform_id()
        if platform_id not in platforms:
            pp = PlatformParser(platform_id)
            pp.write_probe_file()
            platforms.append(platform_id)
        print ' %s done!' % geo_id
        print '----------------------------'


if __name__ == '__main__':
    import sys
    main(sys.stdin)
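
# A sketch of programmatic use, bypassing the CSV (the accession is
# illustrative; network access to NCBI is assumed):
#
#   sp = SeriesParser('GSE12345')
#   matrix = sp.get_matrix_rawfiles()       # downloads matrix and raw files
#   pp = PlatformParser(sp.get_platform_id())
#   pp.write_probe_file()                   # writes platform/<platform id>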