Created
July 10, 2015 16:56
-
-
Save ihodes/4ddc302a76b66a286014 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8
"""Module with methods for ingesting VCFs into Impala tables.

An SSH tunnel with ControlMaster enabled must already be open, e.g.
``ssh demeter -L 21050:demeter-csmau08-10:21050``.
"""
import base64
import subprocess

import ibis
class SSHConnection(object):
    """Run commands on a remote host by calling the system ``ssh`` binary.

    Intended for use over an already-established SSH session
    (ControlMaster), so each command is a plain ``ssh <host> <command>``.
    """

    def __init__(self, hostname):
        self.hostname = hostname
        # HDFS helper that sends its commands back through this connection.
        self.hdfs_client = HDFSClient(self)

    def exec_command(self, command, **kwargs):
        """Run ``command`` on the remote host and return its output.

        Args:
            command: The command line, passed to ``ssh`` as one argument.
            **kwargs: Forwarded unchanged to ``subprocess.check_output``.

        Returns:
            Whatever ``subprocess.check_output`` returns for the ssh call.
        """
        argv = ['ssh', self.hostname, command]
        return subprocess.check_output(argv, **kwargs)
class HDFSClient(object):
    """Class which executes HDFS commands via an SSHConnection (over ssh).

    Each command is built as a single string and handed to the connection's
    ``exec_command``. Path arguments are shell-quoted first so that spaces
    or shell metacharacters in a path cannot split or alter the command.
    """

    def __init__(self, ssh_connection):
        self.ssh_connection = ssh_connection

    @staticmethod
    def _quote(arg):
        """Return ``arg`` escaped so a shell treats it as one literal word."""
        # Imported here so the class works on both Python 2 and Python 3
        # without requiring a change to the module's import block.
        try:
            from shlex import quote  # Python 3.3+
        except ImportError:
            from pipes import quote  # Python 2 fallback
        return quote(arg)

    def _run(self, subcommand, *paths):
        """Run ``hdfs dfs <subcommand> <paths...>`` and return the output."""
        words = ['hdfs', 'dfs', subcommand]
        words.extend(self._quote(path) for path in paths)
        return self.ssh_connection.exec_command(' '.join(words))

    def copy(self, src, dest):
        """Copy ``src`` to ``dest`` within HDFS (``hdfs dfs -cp``)."""
        return self._run('-cp', src, dest)

    def copy_from_local(self, src, dest):
        """Copy ``src`` from the remote host's local file system into HDFS
        at ``dest`` (``hdfs dfs -copyFromLocal``)."""
        return self._run('-copyFromLocal', src, dest)

    def mkdir(self, path):
        """Create the HDFS directory ``path`` (``hdfs dfs -mkdir``)."""
        return self._run('-mkdir', path)
def create_ibis_client(impala_host, impala_port, webhdfs_host, webhdfs_port):
    """Return an ibis client connected to Impala and HDFS (via WebHDFS).

    Args:
        impala_host: Host name of the Impala daemon to connect to.
        impala_port: Port of the Impala daemon.
        webhdfs_host: Host name of the WebHDFS endpoint.
        webhdfs_port: Port of the WebHDFS endpoint.

    Returns:
        The client produced by ``ibis.make_client`` for the Impala
        connection, with the HDFS connection attached.
    """
    # Use the caller-supplied endpoints rather than hard-coded values, so
    # the function can target something other than the demeter cluster.
    impala = ibis.impala_connect(host=impala_host, port=impala_port)
    hdfs = ibis.hdfs_connect(host=webhdfs_host, port=webhdfs_port)
    return ibis.make_client(impala, hdfs_client=hdfs)
def ingest_vcf(ssh_connection, ibis_client, vcf_path):
    """Copy VCF from local file system or HDFS to the ingest folder
    (/datasets/vcfs/ingest/{base64partition}/) and add it to the ingest
    table.

    The partition name is the base64 encoding of the path with any
    ``file://`` / ``hdfs://`` prefix removed.

    Args:
        ssh_connection: An existing SSHConnection.
        ibis_client: An existing IbisClient.
        vcf_path: If the path starts with file://, it's copied from NFS. If it
            starts hdfs:// or no scheme it is copied from HDFS. The path
            should be absolute.
    """
    on_nfs = vcf_path.startswith('file://')  # otherwise assume HDFS
    if on_nfs or vcf_path.startswith('hdfs://'):
        # Both scheme prefixes are exactly 7 characters long.
        vcf_path = vcf_path[7:]

    # b64encode needs bytes; decode the result so it formats as plain text
    # (works on both Python 2 and Python 3).
    raw_path = vcf_path if isinstance(vcf_path, bytes) else vcf_path.encode('utf-8')
    partition = base64.b64encode(raw_path).decode('ascii')
    # NOTE(review): standard base64 output may contain '/', which would add
    # extra directory levels to this path -- confirm whether a URL-safe
    # alphabet should be used here and wherever the partition key is derived.
    path = '/datasets/vcfs/ingest/{}/'.format(partition)

    hdfs = ssh_connection.hdfs_client
    hdfs.mkdir(path)
    if on_nfs:
        hdfs.copy_from_local(vcf_path, path)
    else:
        hdfs.copy(vcf_path, path)

    # The partition value is a string and must be quoted in the statement.
    add_part_q = 'alter table _vcfs.ingest add partition (base64path="{}")'
    ibis_client.con.execute(add_part_q.format(partition))
    ibis_client.con.execute('refresh _vcfs.ingest')
def process_ingested_vcf(client, vcfpath):
    """Transforms and copies the VCF records from a
    VCF already ingested with `ingest_vcf`.

    Header lines (those starting with ``#``) are skipped. Rows are read
    from the ``_vcfs.ingest`` partition for this VCF and inserted into the
    matching partition of ``_vcfs.processed``.

    The schema this results in is:
    ```
    chrom STRING
    pos BIGINT
    id STRING
    ref STRING
    alt STRING
    qual STRING
    filter STRING
    info STRING
    format_samples STRING -- (this is the format
                              and sample columns
                              concatenated into
                              one string)
    ```

    Args:
        client: An IbisClient; the query is run through ``client.con``.
        vcfpath: The path the VCF was ingested from. A leading ``file://``
            or ``hdfs://`` is ignored, matching how `ingest_vcf` derives
            the partition name.
    """
    # ingest_vcf names the partition after the scheme-less path, so drop the
    # scheme here too or the partition key would never match.
    for scheme in ('file://', 'hdfs://'):
        if vcfpath.startswith(scheme):
            vcfpath = vcfpath[len(scheme):]
            break

    # b64encode needs bytes; decode the result so it formats as plain text
    # (works on both Python 2 and Python 3).
    raw_path = vcfpath if isinstance(vcfpath, bytes) else vcfpath.encode('utf-8')
    base64path = base64.b64encode(raw_path).decode('ascii')

    # Eight tab-separated fields, then everything after an optional ninth tab.
    vcf_line_regexp = ('^([^\\t]*)\\t([^\\t]*)\\t([^\\t]*)\\t([^\\t]*)\\t([^\\t]'
                       '*)\\t([^\\t]*)\\t([^\\t]*)\\t([^\\t]*)\\t?(.*)')
    etl_query = """insert into _vcfs.processed
    partition (base64path="{base64path}")
    select
        regexp_extract(line, '{regexp}', 1) `chrom`,
        cast(regexp_extract(line, '{regexp}', 2) as bigint) `pos`,
        regexp_extract(line, '{regexp}', 3) `id`,
        regexp_extract(line, '{regexp}', 4) `ref`,
        regexp_extract(line, '{regexp}', 5) `alt`,
        regexp_extract(line, '{regexp}', 6) `qual`,
        regexp_extract(line, '{regexp}', 7) `filter`,
        regexp_extract(line, '{regexp}', 8) `info`,
        regexp_extract(line, '{regexp}', 9) `format_samples`
    from _vcfs.ingest
    where base64path="{base64path}"
    and line not rlike '^#.*'
    """.format(regexp=vcf_line_regexp, base64path=base64path)
    client.con.execute(etl_query)
def get_vcfs_table(client):
    """Look up and return the ``_vcfs.processed`` table.

    Args:
        client: An IbisClient.

    Returns:
        The result of ``client.table('_vcfs.processed')``.
    """
    processed = client.table('_vcfs.processed')
    return processed
def vcf_to_ibis(vcf_path, ibis_client, ssh_connection):
    """Return an ibis Table with the records in the VCF at vcf_path.

    Ingests the VCF, processes it into ``_vcfs.processed``, and returns
    that table filtered down to this VCF's partition.

    Args:
        vcf_path: Absolute path to the VCF; see `ingest_vcf` for how a
            ``file://`` or ``hdfs://`` prefix is interpreted.
        ibis_client: An existing IbisClient.
        ssh_connection: An existing SSHConnection.

    Returns:
        The processed VCF table filtered on this VCF's ``base64path``.
    """
    # ingest_vcf names the partition after the path without its scheme, so
    # derive the same scheme-less path for processing and filtering.
    bare_path = vcf_path
    for scheme in ('file://', 'hdfs://'):
        if bare_path.startswith(scheme):
            bare_path = bare_path[len(scheme):]
            break

    # ingest_vcf still needs the original path: the scheme tells it whether
    # to copy from the local file system or from HDFS.
    ingest_vcf(ssh_connection, ibis_client, vcf_path)
    process_ingested_vcf(ibis_client, bare_path)

    # b64encode needs bytes; decode so the key compares as plain text.
    raw_path = bare_path if isinstance(bare_path, bytes) else bare_path.encode('utf-8')
    partition = base64.b64encode(raw_path).decode('ascii')

    vcfs = get_vcfs_table(ibis_client)
    return vcfs.filter(vcfs.base64path == partition)
def demeter_ibis_client():
    """Return an ibis client built with the demeter endpoints.

    Passes Impala at ``localhost:21050`` and WebHDFS at
    ``demeter.hpc.mssm.edu:14000`` to `create_ibis_client`.
    """
    return create_ibis_client(
        impala_host='localhost',
        impala_port=21050,
        webhdfs_host='demeter.hpc.mssm.edu',
        webhdfs_port=14000,
    )
def demeter_ssh_connection(named_connection='demeter'):
    """Return an SSHConnection for ``named_connection``.

    Args:
        named_connection: Host name handed to ``SSHConnection``; defaults
            to ``'demeter'``.
    """
    connection = SSHConnection(named_connection)
    return connection
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment