Linkage Disequilibrium Calculation

This is complete taken from Ryan D on biostar:

It takes a region in a format like "chr2:1234-3456". If an rs number is specified after the region, it will output the R^2 for every SNP in that region with the requested SNP; otherwise, it is all-vs-all.

It requires plink, vcftools, and tabix

Also requires toolshed for python which can be installed with:

sudo pip install toolshed


sudo easy_install toolshed

Call like:

python chr11:1240203-1247497  > link.txt

and output looks like:

CHR_A   BP_A       SNP_A         CHR_B   BP_B     SNP_B         R2
11      1240338    rs2672792     11      1240338  rs2672792            1
11      1240338    rs2672792     11      1240381  rs112249530   0.00675058
11      1240338    rs2672792     11      1240435  rs115815572    0.0066929
11      1240338    rs2672792     11      1240439  rs146776493   0.00099445
11      1240338    rs2672792     11      1240485  rs72636989    0.0270542
11      1240338    rs2672792     11      1240533  rs189679987  3.71248e-05
11      1240338    rs2672792     11      1240626  rs192732186   0.00099445
11      1240338    rs2672792     11      1240628  rs185161871  5.51292e-05
11      1240338    rs2672792     11      1240713  rs150416385  0.000905582

where the final column is the R^2 value.

Linkage Blocks

with the output like that, we can calculate "blocks" in a simple way by finding an R2 value to seed a block and then a distance to search for more values. To do this, the command is:

python muc5b-linkage.txt > blocks.bed

The parameters including distance to search, seed and number of SNPs required to be a valid region are all hard-coded in the script.

import os
import sys
from itertools import groupby
from toolshed import reader
from cpv.peaks import peaks
import tempfile
import atexit
import shutil
import operator
THRESH = 0.002
SEED = 0.03
DIST = 350
def temp():
tmp = tempfile.mktemp()
atexit.register(os.unlink, tmp)
return open(tmp, 'w')
def dist(snp):
return abs(int(snp['BP_B']) - int(snp['BP_A']))
fout = temp()
input = sys.argv[1]
for snp, snps in groupby(reader("|sort -k1,1 -k2,2n -k4,4 -k5,5n %s" % input), lambda row: (row['CHR_A'], row['BP_A'])):
tmp = temp()
for snp in snps:
if snp['SNP_A'] == snp['SNP_B']: continue
if dist(snp) > DIST: continue
line = [snp['CHR_B'], str(int(snp['BP_B']) - 1), snp['BP_B'], snp['R2']]
print >>tmp, "\t".join(line)
list(peaks(, 3, THRESH, SEED, DIST, fout,
shutil.copyfile(, "/tmp/peaks.bed")
print "chrom\tstart\tend\tblock_size"
for toks in reader("|bedtools merge -i <(sort -k1,1 -k2,2n %s) -d %i -n -scores sum" %
(, DIST), header=False):
if int(toks[-1]) < 5: continue
print "\t".join(toks[:3] + [str(int(toks[2]) - int(toks[1]))])
from toolshed import nopen
import sys
import os
import glob
import tempfile
def main(args=sys.argv[1:]):
region = args[0].replace("chr", "")
chrom = "chr%s" % region.split(":")[0]
ld_snp = args[1] if len(args) > 1 else None
vcf = pull_vcf(chrom, region)
plink = plink_convert(gen_plink(vcf))
for i, line in enumerate(open(gen_ld(vcf, plink, ld_snp))):
if i == 0: line = "#" + line.strip()
print "\t".join(line.strip().split())
for pf in glob.glob("%s.%s"):
def gen_ld(vcf, plink, ld_snp):
print >>sys.stderr, "calculating LD..."
out = tempfile.mktemp(suffix=".ld.txt")
rs_list = tempfile.mktemp(suffix=".rs.txt")
seen = {}
with open(rs_list, "w") as fhrs:
j = 0
for toks in (line.rstrip().split("\t") for line in open(vcf)):
if toks[0][0] == "#": continue
if toks[2] in seen: continue
seen[toks[2]] = True
fhrs.write("%s\n" % toks[2])
j += 1
print >>sys.stderr, j, "SNPs"
cmd = "|plink --bfile %s --r2 --ld-window-kb 10000 --ld-window 99999 --ld-window-r2 0"
cmd += " --out %s "
if ld_snp is None:
cmd += " --inter-chr --ld-snp-list %s"
cmd %= (plink, out, rs_list)
cmd += "--ld-snp %s "
cmd %= (plink, out, ld_snp)
return out + ".ld"
def plink_convert(plink_in):
print >>sys.stderr, "converting to plink binary format ..."
plink_out = tempfile.mktemp(suffix=".plink")
cmd = "|plink --tped %(plink_in)s.tped --tfam %(plink_in)s.tfam --make-bed --out %(plink_out)s"
cmd %= locals()
return plink_out
def gen_plink(vcf):
print >>sys.stderr, "converting to plink format with vcftools..."
plinkout = tempfile.mktemp(suffix=".plink")
cmd = "|vcftools --vcf %s --plink-tped --out %s" % (vcf, plinkout)
return plinkout
def pull_vcf(chrom, region):
print >>sys.stderr, "downloading with tabix"
tmpvcf = tempfile.mktemp(suffix=".vcf")
cmd = "|tabix -p vcf -h %s > %s" \
% (chrom, region, tmpvcf)
return tmpvcf
if __name__ == "__main__":
