Skip to content

Instantly share code, notes, and snippets.

@danking
Last active August 7, 2018 18:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Save danking/863b66cbe0e7a650701c7550795161fa to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"Running on Apache Spark version 2.2.1\n",
"SparkUI available at http://10.240.0.21:4041\n",
"Welcome to\n",
" __ __ <>__\n",
" / /_/ /__ __/ /\n",
" / __ / _ `/ / /\n",
" /_/ /_/\\_,_/_/_/ version devel-30bc2bcdf2ba\n",
"NOTE: This is a beta version. Interfaces may change\n",
" during the beta period. We recommend pulling\n",
" the latest changes weekly.\n"
]
}
],
"source": [
"import hail as hl\n",
"import pickle\n",
"import time\n",
"\n",
"phenotype = 'skin_color'\n",
"\n",
"sumstats_text_file = 'gs://armartin/pigmentation/pheno_31063_eur_gwas_skin_color.clumped2.gz'\n",
"\n",
"generate_prs_loci_table = True\n",
"prs_loci_table_location = 'gs://danking/alicia/ukb-'+phenotype+'-prs-loci-locus-allele-keyed.kt'\n",
"generate_contig_row_dict = True\n",
"contig_row_dict_location = 'gs://danking/alicia/contig_row_dict'+phenotype\n",
"\n",
"output_location = 'gs://danking/caitlin/UKB_'+phenotype+'_PRS.txt'\n",
"\n",
"contigs = {'0{}'.format(x):str(x) for x in range(1, 10)}\n",
"\n",
"bgen_files = 'gs://fc-7d5088b4-7673-45b5-95c2-17ae00a04183/imputed/ukb_imp_chr{1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22}_v3.bgen'\n",
"\n",
"start = time.time()\n",
"# large block size because we read very little data (due to filtering & ignoring genotypes)\n",
"hl.init(branching_factor=10, min_block_size=3000)\n",
"\n",
"def flip_text(base):\n",
" \"\"\"\n",
" :param StringExpression base: Expression of a single base\n",
" :return: StringExpression of flipped base\n",
" :rtype: StringExpression\n",
" \"\"\"\n",
" return (hl.switch(base)\n",
" .when('A', 'T')\n",
" .when('T', 'A')\n",
" .when('C', 'G')\n",
" .when('G', 'C')\n",
" .default(base))\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2018-08-07 18:02:12 Hail: INFO: Reading table to impute column types\n",
"2018-08-07 18:02:23 Hail: INFO: Finished type imputation\n",
" Loading column '' as type 'str' (imputed)\n",
" Loading column 'CHR' as type 'int32' (imputed)\n",
" Loading column 'F' as type 'int32' (imputed)\n",
" Loading column 'SNP' as type 'str' (imputed)\n",
" Loading column 'BP' as type 'int32' (imputed)\n",
" Loading column 'P' as type 'float64' (imputed)\n",
" Loading column 'TOTAL' as type 'int32' (imputed)\n",
" Loading column 'NSIG' as type 'int32' (imputed)\n",
" Loading column 'S05' as type 'int32' (imputed)\n",
" Loading column 'S01' as type 'int32' (imputed)\n",
" Loading column 'S001' as type 'int32' (imputed)\n",
" Loading column 'S0001' as type 'int32' (imputed)\n",
" Loading column 'SP2' as type 'str' (imputed)\n",
"2018-08-07 18:02:35 Hail: INFO: Ordering unsorted dataset with network shuffle\n",
"2018-08-07 18:03:05 Hail: INFO: wrote 354495 items in 1 partition\n"
]
}
],
"source": [
"################################################################################\n",
"### set up the sumstats table\n",
"if (generate_prs_loci_table):\n",
" t = hl.import_table(sumstats_text_file, \n",
" delimiter='\\s+',\n",
" impute=True)\n",
" t = t.select(locus = hl.locus(hl.str(t.CHR), t.BP))\n",
" t = t.key_by('locus')\n",
" t.write(prs_loci_table_location, overwrite=True)\n",
"\n",
"ss = hl.read_table(prs_loci_table_location)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"################################################################################\n",
"### determine the file locations of the prs variants\n",
"if (generate_contig_row_dict):\n",
" mt = hl.methods.import_bgen(bgen_files,\n",
" [],\n",
" contig_recoding=contigs,\n",
" _row_fields=['file_row_idx'])\n",
" prs_rows = mt.filter_rows(hl.is_defined(ss[mt.locus])).rows()\n",
" print('about to collect')\n",
" # remove all unnecessary data, dropping keys and other irrelevant fields\n",
" prs_rows = prs_rows.key_by()\n",
" prs_rows = prs_rows.select(contig=prs_rows.locus.contig,\n",
" file_row_index=prs_rows.file_row_idx)\n",
" contig_row_list = prs_rows.collect()\n",
" print('finished collecting')\n",
" contig_reformed = [(x['contig'], x['file_row_index']) for x in contig_row_list]\n",
" print('reformed')\n",
" from collections import defaultdict\n",
" contig_row_dict = defaultdict(list)\n",
" for k, v in contig_reformed:\n",
" contig_row_dict[k].append(v)\n",
" print('dictionary created')\n",
"\n",
" with hl.hadoop_open(contig_row_dict_location, 'wb') as f:\n",
" pickle.dump(contig_row_dict, f)\n",
"else:\n",
" with hl.hadoop_open(contig_row_dict_location, 'rb') as f:\n",
" contig_row_dict = pickle.load(f)\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# NOTE(review): ss is later indexed with ss[mt.locus]; this table is imported without\n",
"# a locus key — presumably it needs .annotate(locus=...).key_by('locus') like the\n",
"# clumped table above. Confirm against the file's columns before running.\n",
"ss = hl.import_table('gs://armartin/pigmentation/pheno_31063_eur_gwas_skin_color.txt.gz',\n",
" delimiter='\\s+',\n",
" impute=True)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"################################################################################\n",
"### Run the PRS\n",
"contig_row_dict2 = {'gs://fc-7d5088b4-7673-45b5-95c2-17ae00a04183/imputed/ukb_imp_chr{contig}_v3.bgen'.format(contig=k): v for k, v in contig_row_dict.items()}\n",
"mt = hl.methods.import_bgen(bgen_files,\n",
" ['dosage'],\n",
" sample_file='gs://phenotype_31063/ukb31063.autosomes.sample',\n",
" contig_recoding=contigs,\n",
" _variants_per_file=contig_row_dict2,\n",
" _row_fields=[])\n",
"\n",
"sampleids = hl.import_table('gs://ukb31063-mega-gwas/hail-0.1/qc/ukb31063.gwas_samples.txt', delimiter='\\s+').key_by('s')\n",
"\n",
"mt = mt.filter_cols(hl.is_defined(sampleids[mt.s]))\n",
"mt = mt.annotate_rows(ss=ss[mt.locus])\n",
"\n",
"mt = mt.annotate_rows(\n",
" beta = (hl.case()\n",
" .when(((mt.alleles[0] == mt.ss.nt1) &\n",
" (mt.alleles[1] == mt.ss.nt2)) |\n",
" ((flip_text(mt.alleles[0]) == mt.ss.nt1) &\n",
" (flip_text(mt.alleles[1]) == mt.ss.nt2)),\n",
" (-1*mt.ss.ldpred_inf_beta))\n",
" .when(((mt.alleles[0] == mt.ss.nt2) &\n",
" (mt.alleles[1] == mt.ss.nt1)) |\n",
" ((flip_text(mt.alleles[0]) == mt.ss.nt2) &\n",
" (flip_text(mt.alleles[1]) == mt.ss.nt1)),\n",
" mt.ss.ldpred_inf_beta)\n",
" .or_missing()))\n",
"\n",
"mt = mt.filter_rows(hl.is_defined(mt.beta))\n",
"\n",
"mt = mt.annotate_cols(prs=hl.agg.sum(mt.beta * mt.dosage))\n",
"\n",
"mt.cols().export(output_location)\n",
"end = time.time()\n",
"print(\"Success! Job was completed in %s\" % time.strftime(\"%H:%M:%S\", time.gmtime(end - start)))\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Hail",
"language": "python",
"name": "hail"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.4"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment