Skip to content

Instantly share code, notes, and snippets.

@odashi
Last active December 30, 2015 11:09
Show Gist options
  • Save odashi/7820750 to your computer and use it in GitHub Desktop.
Save odashi/7820750 to your computer and use it in GitHub Desktop.
IBM Translation Model 1
# coding: utf-8
import codecs
import sys
from collections import defaultdict
def _tokenize(line):
    """Split a corpus line on single spaces, dropping empty tokens."""
    # Blank lines and runs of spaces would otherwise yield '' "words".
    return [w for w in line.strip().split(' ') if w]


def ibm1(fname_e, fname_f, num_iteration, add_null=True):
    """Estimate IBM Model 1 word translation probabilities with EM.

    The two corpus files are read in lockstep, one sentence per line, with
    words separated by spaces.  Reading stops at the end of the shorter file.
    Sentence pairs in which either side has no words are skipped, so they
    never add entries to the vocabularies.

    Params:
        fname_e: <str> name of corpus file in target language
        fname_f: <str> name of corpus file in foreign language
        num_iteration: <int> number of EM iterations to run
        add_null: <bool> whether to prepend the '(null)' word (id 0) to
            every foreign sentence

    Returns: (t, wid_e, wid_f)
        t: <{(int e, int f): float}> translation probability for every
            (target id, foreign id) pair; after at least one iteration the
            values for a fixed f sum to 1 over e.  With zero iterations the
            table holds the uniform initial value 1 / len(wid_e).
        wid_e: <{str: int}> word id table in target language
        wid_f: <{str: int}> word id table in foreign language
        Both id tables are defaultdicts: looking up an unknown word assigns
        it the next free id.
    """
    # Each unseen word receives the next free integer id on first lookup.
    wid_e = defaultdict(lambda: len(wid_e))
    wid_f = defaultdict(lambda: len(wid_f))
    corpus_e = []
    corpus_f = []

    # Register the null word first so that it is guaranteed to get id 0.
    if add_null:
        _ = wid_f['(null)']

    # Read the parallel corpus and map words to ids.
    with codecs.open(fname_e, 'r', 'utf-8') as file_e, \
            codecs.open(fname_f, 'r', 'utf-8') as file_f:
        for str_e, str_f in zip(file_e, file_f):
            words_e = _tokenize(str_e)
            words_f = _tokenize(str_f)
            # Check before assigning ids so that skipped pairs leave the
            # vocabularies untouched.
            if not words_e or not words_f:
                continue
            es = [wid_e[w] for w in words_e]
            fs = [wid_f[w] for w in words_f]
            if add_null:
                fs = [0] + fs
            corpus_e.append(es)
            corpus_f.append(fs)

    num_e = len(wid_e)
    num_f = len(wid_f)

    # Uniform initialisation over the full (dense) table; the dense layout is
    # part of the returned structure, so it is kept as is.
    t = {}
    if num_e:
        uniform = 1.0 / num_e
        for e in range(num_e):
            for f in range(num_f):
                t[e, f] = uniform

    # EM training.
    for iteration in range(num_iteration):
        sys.stderr.write('iteration %d/%d...\n' % (iteration + 1, num_iteration))
        count = defaultdict(float)
        total = defaultdict(float)

        # E-step: collect expected alignment counts.
        for fs, es in zip(corpus_f, corpus_e):
            for e in es:
                # The normaliser is computed per target *position*, so a word
                # that occurs k times in a sentence contributes k expected
                # counts rather than sharing a single one.
                norm = sum(t[e, f] for f in fs)
                for f in fs:
                    x = t[e, f] / norm
                    count[e, f] += x
                    total[f] += x

        # M-step: renormalise per foreign word.  count.get() avoids inserting
        # a zero entry into `count` for every pair that never co-occurred.
        for e in range(num_e):
            for f in range(num_f):
                t[e, f] = count.get((e, f), 0.0) / total[f]

    return t, wid_e, wid_f
def parse_options():
    """Parse and validate the command-line options.

    Options are read from sys.argv via optparse.  The three path options
    (-e, -f, -o) must be non-empty, -I (default 100) must be positive and
    -T (default 1e-8) must lie in [0.0, 1.0].  -N switches off the null
    word, which is otherwise enabled.

    Returns the parsed options object, or None after writing a one-line
    message to stderr for the first check that fails.
    """
    import optparse

    parser = optparse.OptionParser(
        'Usage: python ibm1.py -e PATH -f PATH -o PATH [options]')

    # The three path options differ only in flags, destination and help text.
    path_options = (
        ('-e', '--target', 'fname_e',
         '[IN] path of corpus file in target language'),
        ('-f', '--foreign', 'fname_f',
         '[IN] path of corpus file in foreign language'),
        ('-o', '--output', 'fname_o',
         '[OUT] path of translation probability file'),
    )
    for short_flag, long_flag, dest, help_text in path_options:
        parser.add_option(
            short_flag, long_flag,
            action='store', type='str', dest=dest, default='',
            metavar='PATH', help=help_text)

    parser.add_option(
        '-I', '--iteration',
        action='store', type='int', dest='num_iteration', default=100,
        metavar='INT',
        help='number of learning iteration (must be positive)')
    parser.add_option(
        '-T', '--threshold',
        action='store', type='float', dest='threshold', default=1e-8,
        metavar='FLOAT',
        help='threshold of word translation table (must be in range [0.0, 1.0])')
    parser.add_option(
        '-N', '--no-null',
        action='store_false', dest='add_null', default=True,
        help='Never insert NULL word into foreign corpus')

    parsed, _ = parser.parse_args()

    # (failed?, message) pairs, listed in the order they are reported.
    checks = (
        (not parsed.fname_e, 'option -e must be set.\n'),
        (not parsed.fname_f, 'option -f must be set.\n'),
        (not parsed.fname_o, 'option -o must be set.\n'),
        (parsed.num_iteration <= 0, 'option -I must be positive.\n'),
        (parsed.threshold < 0.0 or parsed.threshold > 1.0,
         'option -T must be in range [0.0, 1.0].\n'),
    )
    for failed, message in checks:
        if failed:
            sys.stderr.write(message)
            return None
    return parsed
def main():
    """Train IBM Model 1 from the command line and write the results.

    Three files are written: the translation table at the -o path (one
    "e_id<TAB>f_id<TAB>probability" line per entry whose probability is
    strictly above the threshold, ordered by id pair), and a word id table
    next to each input corpus ("<corpus path>.wid", ordered by id).
    When option parsing fails, a message is written to stderr and nothing
    else happens.
    """
    options = parse_options()
    if options is None:
        sys.stderr.write('insufficient options.\n')
        return

    def write_word_ids(path, word_ids):
        # One "word<TAB>id" line per vocabulary entry, in ascending id order.
        with codecs.open(path, 'w', 'utf-8') as out:
            for word, index in sorted(word_ids.items(),
                                      key=lambda item: item[1]):
                out.write('%s\t%d\n' % (word, index))

    t, wid_e, wid_f = ibm1(
        options.fname_e,
        options.fname_f,
        options.num_iteration,
        options.add_null,
    )

    # Word translation probabilities: drop entries at or below the threshold,
    # then order the survivors by their (e, f) id pair.
    kept = [(pair, prob) for pair, prob in t.items()
            if prob > options.threshold]
    kept.sort(key=lambda entry: entry[0])
    with codecs.open(options.fname_o, 'w', 'utf-8') as out:
        for pair, prob in kept:
            out.write('%d\t%d\t%.10f\n' % (pair[0], pair[1], prob))

    # Word id tables, target language first, then foreign language.
    write_word_ids(options.fname_e + '.wid', wid_e)
    write_word_ids(options.fname_f + '.wid', wid_f)


# Run the tool only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment