Skip to content

Instantly share code, notes, and snippets.

@ihodes
Created July 10, 2015 16:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ihodes/4ddc302a76b66a286014 to your computer and use it in GitHub Desktop.
Save ihodes/4ddc302a76b66a286014 to your computer and use it in GitHub Desktop.
# coding: utf-8
"""Module with methods for ingesting VCFs into Impala tables.
Requires an open SSH tunnel to the Impala host
(e.g. ``ssh demeter -L 21050:demeter-csmau08-10:21050``) with ControlMaster
enabled.
"""
import base64
import subprocess

import ibis

try:  # Python 3.3+
    from shlex import quote as shell_quote
except ImportError:  # Python 2
    from pipes import quote as shell_quote
class SSHConnection(object):
    """Run commands on a remote host by shelling out to the system ``ssh``.

    Intended for use over an existing SSH connection with ControlMaster
    enabled; this class itself only invokes ``ssh <hostname> <command>``.

    Attributes:
        hostname: Host name (or ssh alias) passed to ``ssh``.
        hdfs_client: An HDFSClient that issues its commands through this
            connection.
    """

    def __init__(self, hostname):
        """Record the target host and attach an HDFS helper bound to it.

        Args:
            hostname: Host name (or ssh alias) passed to ``ssh``.
        """
        self.hostname = hostname
        self.hdfs_client = HDFSClient(self)

    def exec_command(self, command, **kwargs):
        """Run ``command`` on the remote host and return its output.

        Args:
            command: Command line handed to ``ssh`` as a single argument.
            **kwargs: Forwarded unchanged to ``subprocess.check_output``.

        Returns:
            The value returned by ``subprocess.check_output``.
        """
        argv = ['ssh', self.hostname, command]
        return subprocess.check_output(argv, **kwargs)
class HDFSClient(object):
    """Class which executes HDFS commands via an SSHConnection (over ssh).

    Each command is sent as one string that the remote shell parses, so every
    path argument is shell-quoted first; otherwise a path containing spaces
    or shell metacharacters would be split or interpreted by that shell.
    """

    def __init__(self, ssh_connection):
        """Store the connection used to run commands.

        Args:
            ssh_connection: Object exposing ``exec_command(command)``,
                e.g. an SSHConnection.
        """
        self.ssh_connection = ssh_connection

    def _run(self, subcommand, *paths):
        """Run ``hdfs dfs -<subcommand>`` with shell-quoted path arguments."""
        parts = ['hdfs', 'dfs', '-' + subcommand]
        parts.extend(shell_quote(path) for path in paths)
        return self.ssh_connection.exec_command(' '.join(parts))

    def copy(self, src, dest):
        """Run ``hdfs dfs -cp src dest``.

        Args:
            src: Source path.
            dest: Destination path.

        Returns:
            Whatever the connection's ``exec_command`` returns.
        """
        return self._run('cp', src, dest)

    def copy_from_local(self, src, dest):
        """Run ``hdfs dfs -copyFromLocal src dest``.

        Args:
            src: Source path on the remote host's local file system.
            dest: Destination path.

        Returns:
            Whatever the connection's ``exec_command`` returns.
        """
        return self._run('copyFromLocal', src, dest)

    def mkdir(self, path):
        """Run ``hdfs dfs -mkdir path``.

        Args:
            path: Directory to create.

        Returns:
            Whatever the connection's ``exec_command`` returns.
        """
        return self._run('mkdir', path)
def create_ibis_client(impala_host, impala_port, webhdfs_host, webhdfs_port):
    """Return an ibis client connected to Impala and HDFS (via WebHDFS).

    Args:
        impala_host: Host passed to ``ibis.impala_connect``.
        impala_port: Port passed to ``ibis.impala_connect``.
        webhdfs_host: Host passed to ``ibis.hdfs_connect``.
        webhdfs_port: Port passed to ``ibis.hdfs_connect``.

    Returns:
        The client built by ``ibis.make_client`` from the two connections.
    """
    # Use the caller's endpoints; previously the arguments were ignored in
    # favour of hard-coded values.
    impala_connection = ibis.impala_connect(host=impala_host, port=impala_port)
    hdfs = ibis.hdfs_connect(host=webhdfs_host, port=webhdfs_port)
    return ibis.make_client(impala_connection, hdfs_client=hdfs)
def ingest_vcf(ssh_connection, ibis_client, vcf_path):
    """Copy a VCF into the ingest folder and register it as a partition.

    The file is copied to ``/datasets/vcfs/ingest/{base64partition}/``, where
    the partition name is the base64 encoding of the path with any
    ``file://`` or ``hdfs://`` prefix removed. The partition is then added to
    the ``_vcfs.ingest`` table and the table is refreshed.

    Args:
        ssh_connection: An existing SSHConnection.
        ibis_client: An existing IbisClient.
        vcf_path: If the path starts with ``file://`` it is copied from the
            local file system (NFS). If it starts with ``hdfs://`` or has no
            scheme it is copied from HDFS. The path should be absolute.
    """
    on_nfs = vcf_path.startswith('file://')
    if on_nfs:
        vcf_path = vcf_path[len('file://'):]
    elif vcf_path.startswith('hdfs://'):
        vcf_path = vcf_path[len('hdfs://'):]

    # b64encode needs bytes on Python 3 and returns bytes there; normalise
    # to the native str type so it can be formatted into paths and SQL.
    raw_path = vcf_path if isinstance(vcf_path, bytes) else vcf_path.encode('utf-8')
    partition = base64.b64encode(raw_path)
    if not isinstance(partition, str):
        partition = partition.decode('ascii')

    # NOTE(review): the standard base64 alphabet includes '/', so some
    # partition names would span several path components -- confirm whether
    # that matters for the ingest layout.
    path = '/datasets/vcfs/ingest/{}/'.format(partition)

    hdfs = ssh_connection.hdfs_client
    hdfs.mkdir(path)
    if on_nfs:
        hdfs.copy_from_local(vcf_path, path)
    else:
        hdfs.copy(vcf_path, path)

    # The partition value is a string, so it is quoted the same way
    # process_ingested_vcf quotes it.
    add_part_q = 'alter table _vcfs.ingest add partition (base64path="{}")'
    ibis_client.con.execute(add_part_q.format(partition))
    ibis_client.con.execute('refresh _vcfs.ingest')
def process_ingested_vcf(client, vcfpath):
    """Transform and copy the VCF records from a VCF already ingested
    with `ingest_vcf`.

    Rows of ``_vcfs.ingest`` in the partition for ``vcfpath`` that do not
    start with ``#`` are split into columns with a regular expression and
    inserted into the matching partition of ``_vcfs.processed``.

    The schema this results in is:
    ```
    chrom STRING
    pos BIGINT
    id STRING
    ref STRING
    alt STRING
    qual STRING
    filter STRING
    info STRING
    format_samples STRING -- (this is the format
                              and sample columns
                              concatenated into
                              one string)
    ```

    Args:
        client: An IbisClient; the query is run through ``client.con``.
        vcfpath: Path of the VCF as given to `ingest_vcf`. A leading
            ``file://`` or ``hdfs://`` is ignored, as it is there.
    """
    # ingest_vcf names the partition after the path *without* its scheme;
    # strip it here too so both functions address the same partition.
    for scheme in ('file://', 'hdfs://'):
        if vcfpath.startswith(scheme):
            vcfpath = vcfpath[len(scheme):]
            break

    # b64encode needs bytes on Python 3 and returns bytes there; normalise
    # to the native str type before formatting it into the query.
    raw_path = vcfpath if isinstance(vcfpath, bytes) else vcfpath.encode('utf-8')
    base64path = base64.b64encode(raw_path)
    if not isinstance(base64path, str):
        base64path = base64path.decode('ascii')

    # Eight tab-separated fields, then an optional tab and the rest of the
    # line as a ninth group.
    vcf_line_regexp = ('^([^\\t]*)\\t([^\\t]*)\\t([^\\t]*)\\t([^\\t]*)\\t([^\\t]'
                       '*)\\t([^\\t]*)\\t([^\\t]*)\\t([^\\t]*)\\t?(.*)')
    etl_query = """insert into _vcfs.processed
partition (base64path="{base64path}")
select
regexp_extract(line, '{regexp}', 1) `chrom`,
cast(regexp_extract(line, '{regexp}', 2) as bigint) `pos`,
regexp_extract(line, '{regexp}', 3) `id`,
regexp_extract(line, '{regexp}', 4) `ref`,
regexp_extract(line, '{regexp}', 5) `alt`,
regexp_extract(line, '{regexp}', 6) `qual`,
regexp_extract(line, '{regexp}', 7) `filter`,
regexp_extract(line, '{regexp}', 8) `info`,
regexp_extract(line, '{regexp}', 9) `format_samples`
from _vcfs.ingest
where base64path="{base64path}"
and line not rlike '^#.*'
""".format(regexp=vcf_line_regexp, base64path=base64path)
    client.con.execute(etl_query)
def get_vcfs_table(client):
    """Look up the processed VCF ibis table.

    Args:
        client: An IbisClient.

    Returns:
        The result of ``client.table('_vcfs.processed')``.
    """
    processed = client.table('_vcfs.processed')
    return processed
def vcf_to_ibis(vcf_path, ibis_client, ssh_connection):
    """Return an ibis Table with the records in the VCF at vcf_path.

    Ingests the VCF, processes the ingested rows, and returns the processed
    table filtered down to this VCF's partition.

    Args:
        vcf_path: Path of the VCF, optionally prefixed with ``file://`` or
            ``hdfs://`` (interpreted by `ingest_vcf`).
        ibis_client: An existing IbisClient.
        ssh_connection: An existing SSHConnection.

    Returns:
        The processed-VCFs table filtered on ``base64path``.
    """
    ingest_vcf(ssh_connection, ibis_client, vcf_path)

    # ingest_vcf names the partition after the path without its scheme, so
    # the later steps must use the same scheme-less path.
    bare_path = vcf_path
    for scheme in ('file://', 'hdfs://'):
        if bare_path.startswith(scheme):
            bare_path = bare_path[len(scheme):]
            break
    process_ingested_vcf(ibis_client, bare_path)

    # b64encode needs bytes on Python 3 and returns bytes there; normalise
    # to the native str type before comparing against the column.
    raw_path = bare_path if isinstance(bare_path, bytes) else bare_path.encode('utf-8')
    partition = base64.b64encode(raw_path)
    if not isinstance(partition, str):
        partition = partition.decode('ascii')

    vcfs = get_vcfs_table(ibis_client)
    return vcfs.filter(vcfs.base64path == partition)
def demeter_ibis_client():
    """Build an ibis client using the demeter endpoints.

    Returns:
        The result of ``create_ibis_client`` called with Impala at
        ``localhost:21050`` and WebHDFS at ``demeter.hpc.mssm.edu:14000``.
    """
    return create_ibis_client(
        impala_host='localhost',
        impala_port=21050,
        webhdfs_host='demeter.hpc.mssm.edu',
        webhdfs_port=14000)
def demeter_ssh_connection(named_connection='demeter'):
    """Build an SSHConnection for the demeter host.

    Args:
        named_connection: Host name or ssh alias handed to SSHConnection;
            defaults to ``'demeter'``.

    Returns:
        A new SSHConnection for ``named_connection``.
    """
    connection = SSHConnection(named_connection)
    return connection
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment