Skip to content

Instantly share code, notes, and snippets.

@hrishikeshrt
Created March 23, 2021 14:59
Show Gist options
  • Save hrishikeshrt/231e91dbc364b50916f1d465afee18bb to your computer and use it in GitHub Desktop.
Save hrishikeshrt/231e91dbc364b50916f1d465afee18bb to your computer and use it in GitHub Desktop.
Running Samsaadhanii Parser From CLI
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Oct 17 20:06:38 2019
@author: Hrishikesh Terdalkar
General purpose decorators
https://wiki.python.org/moin/PythonDecoratorLibrary
# Function_Timeout
# Retry
"""
import time
import math
import random
import signal
import logging
import functools
###############################################################################
def both_please(decorator):
    '''
    Decorator for decorator factories with optional parameters

    Lets the wrapped factory be applied either bare (`@deco`) or called
    (`@deco(...)`).

    The bare form is detected when the call receives exactly one positional
    argument, no keyword arguments, and that argument is callable. In that
    case the factory is invoked with its defaults and the result is applied
    to the callable. Every other call is forwarded to the factory unchanged.
    '''
    # ----------------------------------------------------------------------- #

    @functools.wraps(decorator)
    def dispatch(*args, **kwargs):
        used_bare = not kwargs and len(args) == 1 and callable(args[0])
        if not used_bare:
            return decorator(*args, **kwargs)
        (target,) = args
        return decorator()(target)

    # ----------------------------------------------------------------------- #
    return dispatch
###############################################################################
@both_please
def timeout(seconds=60, error="Function '{}' aborted after {} seconds."):
    '''
    Function Timeout Decorator

    After a designated amount of time has passed, the function call will
    terminate by raising a TimeoutError.

    The timeout is implemented with SIGALRM, so the decorated function has
    to be called from the main thread of a platform that provides SIGALRM.

    @params:
        seconds: number of seconds for timeout (int or float, must be > 0)
        error: error message to be shown
               Error message can be static or can contain at most two
               placeholders. The first one will be replaced by the name of
               the decorated function and the second one will be replaced by
               the number of seconds.

    @raises:
        ValueError: if seconds is not greater than 0 (at decoration time)
        TimeoutError: from the decorated call, once the time is up
    '''
    if seconds <= 0:
        raise ValueError("seconds must be greater than 0")

    def decorator(func):
        def _handle_timeout(signum, frame):
            raise TimeoutError(error.format(func.__name__, seconds))

        # ------------------------------------------------------------------- #

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Remember whatever handler was active so that it can be put
            # back afterwards instead of leaving ours installed forever.
            previous_handler = signal.signal(signal.SIGALRM, _handle_timeout)
            # setitimer() accepts fractional seconds, alarm() does not.
            signal.setitimer(signal.ITIMER_REAL, seconds)
            try:
                return func(*args, **kwargs)
            finally:
                # Cancel the pending timer before touching the handler
                signal.setitimer(signal.ITIMER_REAL, 0)
                # signal.signal() returns None for a handler that was not
                # installed from Python; such a handler cannot be restored.
                if previous_handler is not None:
                    signal.signal(signal.SIGALRM, previous_handler)

        # ------------------------------------------------------------------- #
        return wrapper
    return decorator
###############################################################################
def exception_hook(tries_remaining, exception, delay):
    """
    Simple Exception Hook

    Reports a caught exception as a single error-level message through the
    logging module.

    @params:
        tries_remaining: number of tries remaining
        exception: exception instance which was raised
        delay: delay in seconds
    """
    report = ', '.join((
        f"Caught '{exception}'",
        f"{tries_remaining} tries remaining",
        f"Sleeping for {delay} seconds",
    ))
    logging.error(report)
###############################################################################
@both_please
def timer(log=False, with_args=False):
    '''
    Decorator to print runtime of a function

    The runtime is measured with time.perf_counter() and printed after the
    decorated function returns; the return value is passed through.

    @params:
        log: boolean, if True, the message is also logged (level INFO)
             using the logging module
        with_args: boolean, if True, the message shows the call with the
                   repr() of every positional and keyword argument instead
                   of just the function name
    '''
    def decorator(func):
        # ------------------------------------------------------------------- #

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            started = time.perf_counter()
            result = func(*args, **kwargs)
            finished = time.perf_counter()
            elapsed = finished - started

            if with_args:
                shown = [repr(value) for value in args]
                shown.extend(f"{k}={repr(v)}" for k, v in kwargs.items())
                label = f'{func.__name__}({", ".join(shown)})'
            else:
                label = func.__name__

            msg = f"Finished '{label}' in {elapsed:.4f} seconds."
            print(msg)
            if log:
                logging.info(msg)
            return result

        # ------------------------------------------------------------------- #
        return wrapper
    return decorator
###############################################################################
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Feb 16 10:49:33 2020
@author: Hrishikesh Terdalkar
Command Line Interface for Samsaadhanii Parser
Credits: Sanal Vikram (initial version of run_parser() function)
"""
import os
import shutil
import subprocess
from decorators import timer
###############################################################################
@timer(with_args=True)
def run_parser(wx_sent, sent_id, scl_path, tmp_path, text_type='Prose'):
    """
    Run samsaadhanii parser for a sentence in 'WX' encoding scheme

    Parameters
    ----------
    wx_sent : string
        Sanskrit sentence in WX encoding scheme
    sent_id : string
        Unique sentence ID
        This is used to store the detailed parser output
    scl_path : string (path)
        Path to the SCL installation directory
        Its 'paths.sh' file has to define 'LTPROCBIN' and 'GraphvizDot'.
    tmp_path : string (path)
        Path of the folder for parser output
        Needs to be a writeable location for the user running the command.
    text_type : string (optional)
        Type of text.
        Options: Sloka, Prose, Vedic
        The default is 'Prose'

    Returns
    -------
    bool
        Always True. The exit status of the parser script is not inspected.

    Raises
    ------
    KeyError
        If 'paths.sh' does not define 'LTPROCBIN' or 'GraphvizDot'.
    FileExistsError
        If the output folder for `sent_id` already exists in `tmp_path`.
    """
    paths_file = os.path.join(scl_path, 'paths.sh')

    # Read KEY=VALUE assignments. Only the first '=' separates key and
    # value; blank lines, comments and lines without '=' are ignored rather
    # than breaking the whole run.
    paths = {}
    with open(paths_file, 'r') as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith('#') or '=' not in line:
                continue
            key, _, value = line.partition('=')
            paths[key.strip()] = value.strip()

    ltp_path = paths['LTPROCBIN']
    dot_path = paths['GraphvizDot']

    # Not using exist_ok: reusing a folder would mix in files left behind
    # by an earlier run with the same sentence ID.
    sent_dir = os.path.join(tmp_path, f'tmp_in{sent_id}')
    os.makedirs(sent_dir)

    with open(os.path.join(sent_dir, f'wor.{sent_id}'), 'w') as f:
        f.write(wx_sent + '\n')

    subprocess.run([
        os.path.join(scl_path, 'SHMT', 'prog', 'shell', 'callmtshell.pl'),
        tmp_path,
        scl_path,
        dot_path,
        wx_sent,
        'WX',  # encoding ['VH', 'KH', 'SLP', 'Itrans', 'Unicode', 'IAST']
        # Note: Devanagari is "Unicode"
        # The transliteration is handled by {scl_path}/converters/convert.pl
        # Unicode does not work
        # It is safer to convert to "WX" scheme and pass
        str(sent_id),
        'DEV',  # script ['DEV', 'IAST', 'VH']
        'NO',  # sandhi
        'UoHyd',  # morph ['UoHyd', 'GH']
        'Full',  # parser ['NO', 'Partial', 'Full']
        text_type,  # text_type ['Prose', 'Sloka', 'Vedic']
        ltp_path
    ])
    return True
def interpret_output(sent_id, tmp_path):
    """
    Interpret the output of the parser for the given sentence id

    Parameters
    ----------
    sent_id : string
        Unique sentence ID
        This should be a valid ID of sentence for which the parser has
        already produced output.
    tmp_path : string (path)
        Path of the folder for parser output
        tmp_path location used during the generation of output of parser

    Returns
    -------
    answer : tuple
        (sentence, morph, relations)
        sentence : contents of the sentence file 'wor.<sent_id>'
        morph : parsed contents of 'table.csv'
                (empty list if the file is missing)
        relations : parsed contents of 'parser_files/graph.txt'
                    (empty list if the file is missing)
    success : bool
        returns True if the parse tree ('1.1.svg') was successfully formed
    errors : string
        errors encountered, if any
        (empty string if the parser left no error file)

    Raises
    ------
    FileNotFoundError
        If the sentence file does not exist, i.e. `sent_id` is unknown.
    """
    sent_dir = os.path.join(tmp_path, f'tmp_in{sent_id}')
    table_file = os.path.join(sent_dir, 'table.csv')
    graph_file = os.path.join(sent_dir, 'parser_files', 'graph.txt')
    sent_file = os.path.join(sent_dir, f'wor.{sent_id}')
    error_file = os.path.join(sent_dir, f'err{sent_id}')
    svg_file = os.path.join(sent_dir, '1.1.svg')

    try:
        with open(table_file, 'r') as f:
            morph = parse_morphological_analysis(f.read().strip())
    except FileNotFoundError:
        morph = []

    try:
        with open(graph_file, 'r') as f:
            relations = parse_dependency_graph(f.read().strip(), morph)
    except FileNotFoundError:
        relations = []

    with open(sent_file, 'r') as f:
        line = f.read().strip()

    # Treated like the other optional outputs: a missing error file means
    # there is nothing to report, it must not abort the interpretation.
    try:
        with open(error_file, 'r', errors='ignore') as f:
            errors = f.read().strip()
    except FileNotFoundError:
        errors = ''

    success = os.path.isfile(svg_file)
    answer = line, morph, relations
    return answer, success, errors
def parse_morphological_analysis(morph_text):
    """
    Parse the tab-separated morphological analysis table

    The first line of the text is skipped (presumably a header row).
    Every following line becomes one dictionary built from its columns:
    column 0 -> 'anv_id', 1 -> 'word', 3 -> 'sandhied',
    column 4 -> 'morph' and column 5 -> 'context', the last two split on '/'.
    When the first 'morph' entry contains no '{', both 'morph' and
    'context' are replaced by empty lists.

    Parameters
    ----------
    morph_text : string
        Contents of the table

    Returns
    -------
    list of dict
        One entry per line, in the order of the lines
    """
    sentence_analysis = []
    for row in morph_text.split('\n')[1:]:
        columns = row.split('\t')
        anv_id, word, sandhied = columns[0], columns[1], columns[3]
        morph = columns[4].split('/')
        context = columns[5].split('/')

        # No '{' in the first entry: the word carries no usable analysis
        if '{' not in morph[0]:
            morph, context = [], []

        sentence_analysis.append({
            'anv_id': anv_id,
            'word': word,
            'sandhied': sandhied,
            'morph': morph,
            'context': context,
        })
    return sentence_analysis
def parse_dependency_graph(graph_text, analysis):
    """
    Parse the whitespace-separated dependency graph into a list of relations

    Every non-blank line describes one edge. Fields are read by position:
    field 0 and 1 are the 1-based word and morph numbers of the destination,
    field 3 and 4 those of the source, field 2 is the relation id, the
    second to last field is the relation name and the last field is the
    reference. Word and morph numbers are resolved against `analysis`.

    An edge whose numbers fall outside `analysis` is not added; the
    offending indices are printed instead.

    Parameters
    ----------
    graph_text : string
        Contents of the graph file
    analysis : list of dict
        Entries with at least the keys 'word' and 'morph'

    Returns
    -------
    list of dict
        Keys: 'src', 'relation', 'dst', 'src_morph', 'dst_morph',
        'relation_id', 'reference'
    """
    relations = []
    for raw_line in graph_text.split('\n'):
        if not raw_line.strip():
            continue
        fields = raw_line.split()

        # Convert to 0-based positions (source word, destination word,
        # source morph, destination morph)
        src_idx, dst_idx, src_morph_idx, dst_morph_idx = (
            int(fields[position]) - 1 for position in (3, 0, 4, 1)
        )

        try:
            source = analysis[src_idx]
            target = analysis[dst_idx]
            edge = {
                'src': source['word'],
                'relation': fields[-2],
                'dst': target['word'],
                'src_morph': source['morph'][src_morph_idx],
                'dst_morph': target['morph'][dst_morph_idx],
                'relation_id': fields[2],
                'reference': fields[-1]
            }
        except IndexError:
            print(f"src_idx = {src_idx}, src_morph_idx = {src_morph_idx}")
            print(f"dst_idx = {dst_idx}, dst_morph_idx = {dst_morph_idx}")
            print(f"#(Analyses): {len(analysis)}")
        else:
            relations.append(edge)
    return relations
def save_output(sent_id, tmp_path, output_path):
    """
    Copy the parser output of one sentence to another folder

    Copies the folder 'tmp_in<sent_id>' and the files 'in<sent_id>',
    'in<sent_id>_trnsltn' and 'in<sent_id>.html' from `tmp_path` to
    `output_path`. The output folder is created if it does not exist.

    Parameters
    ----------
    sent_id : string
        Sentence ID whose output is to be copied
    tmp_path : string (path)
        Folder containing the parser output
    output_path : string (path)
        Destination folder
        If empty, an error is printed and nothing is copied.
    """
    if not output_path:
        print("Error: no output path provided.")
        return

    if not os.path.isdir(output_path):
        os.makedirs(output_path)
        print(f"Created directory '{output_path}'.")

    # Detailed output folder first, then the loose per-sentence files
    folder = f'tmp_in{sent_id}'
    shutil.copytree(os.path.join(tmp_path, folder),
                    os.path.join(output_path, folder))

    loose_files = (f'in{sent_id}', f'in{sent_id}_trnsltn', f'in{sent_id}.html')
    for name in loose_files:
        source = os.path.join(tmp_path, name)
        target = os.path.join(output_path, name)
        shutil.copyfile(source, target)
###############################################################################
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Dec 31 23:34:50 2019
@author: Hrishikesh Terdalkar
Command Line Interface for Samsaadhanii Parser in the form of REPL
"""
import os
import cmd
import time
import json
import glob
import random
import shutil
import tempfile
import pandas as pd
from natsort import natsorted
from indic_transliteration.sanscript import transliterate
from scl_parser import run_parser, interpret_output
from decorators import timeout
###############################################################################
VERSION = '2020.09.19.1528'
# Number of seconds a single parser run may take before it is aborted with
# a TimeoutError. A falsy value (0, None) disables the limit.
TIMEOUT = 300
###############################################################################
if TIMEOUT:
    @timeout(TIMEOUT)
    def timeout_run_parser(*args, **kwargs):
        """Call run_parser() under the time limit and return its result"""
        return run_parser(*args, **kwargs)
else:
    def timeout_run_parser(*args, **kwargs):
        """Call run_parser() without a time limit and return its result"""
        return run_parser(*args, **kwargs)
###############################################################################
class BasicShell(cmd.Cmd):
    # Minimal command shell: ignores empty input, can run shell commands
    # and can be left with `exit` or Ctrl + D.
    # NOTE: docstrings of the do_* methods double as the `help` text of the
    # corresponding command, hence they are kept short.

    def emptyline(self):
        # Overridden so that an empty line does nothing (cmd.Cmd would
        # otherwise repeat the last non-empty command).
        return None

    def do_shell(self, commad):
        """Execute shell commands"""
        # The line is handed to the system shell as typed
        os.system(commad)

    def do_exit(self, arg):
        """Exit the shell"""
        print("Bye")
        # A true return value makes cmdloop() stop
        return True

    # do_EOF corresponds to Ctrl + D
    do_EOF = do_exit
###############################################################################
class ParserShell(BasicShell):
    """
    Interactive shell (REPL) around the Samsaadhanii dependency parser

    Every input line that is not a shell command is treated as a sentence in
    the current input scheme. It is transliterated to WX, handed to the
    parser and the interpreted output is recorded under a running sentence
    ID.

    Attributes
    ----------
    log : dict
        sentence ID -> input line, scheme, abort flag, runtime and success
        of the sentences typed in this session
    answer_log : dict
        sentence ID -> interpreted parser output (typed or loaded)
    """
    intro = "Sanskrit Dependency Parser"
    desc = "Type any sentence in the proprer input scheme to parse it. " \
           " (help or ? for list of options)"
    prompt = "(parser) "

    def __init__(self, scl_path, tmp_path):
        """
        Parameters
        ----------
        scl_path : string (path)
            Path to the SCL installation directory
        tmp_path : string (path)
            Path of the folder for parser output
        """
        # Zero-argument super(): super(self.__class__, self) recurses
        # endlessly as soon as this class is subclassed.
        super().__init__()
        self.debug = False
        # Random starting point, incremented before every parsed sentence
        self.sent_id = int(random.random() * 5000)
        self.scl_path = scl_path
        self.tmp_path = tmp_path
        self.schemes = ['hk', 'velthuis', 'itrans', 'slp1', 'wx', 'devanagari']
        self.input_scheme = 'devanagari'
        self.text_types = ['Sloka', 'Prose', 'Vedic']
        self.text_type = 'Prose'
        self.log = {}
        self.answer_log = {}
        self.status = {}

    # ----------------------------------------------------------------------- #
    # Starting ID

    def do_setid(self, sent_id):
        """Set the current sentence ID (the next sentence gets the ID + 1)"""
        try:
            sent_id = int(sent_id)
        except ValueError:
            # A typo must not terminate the whole session
            print(f"Invalid sentence id: '{sent_id}'.")
            return
        self.sent_id = sent_id
        print(f"Sentence ID: {sent_id}")

    # ----------------------------------------------------------------------- #
    # Debug Mode

    def do_debug(self, arg):
        """Turn debug mode on/off"""
        if arg.lower() in ["true", "on", "yes"]:
            self.debug = True
        if arg.lower() in ["false", "off", "no"]:
            self.debug = False
        print(f"Debug: {self.debug}")

    # ----------------------------------------------------------------------- #
    # Text Type - Prose, Vedic, Sloka

    def complete_text_type(self, text, line, begidx, endidx):
        return [tt for tt in self.text_types if tt.startswith(text)]

    def do_text_type(self, text_type):
        """Set text type"""
        if not text_type:
            print(f"Text-type: {self.text_type}")
        else:
            if text_type not in self.text_types:
                print(f"Invalid text_type (valid types are {self.text_types}.)")
            else:
                self.text_type = text_type
                print(f"Text-type: {self.text_type}")

    # ----------------------------------------------------------------------- #
    # Input Transliteration Scheme

    def complete_scheme(self, text, line, begidx, endidx):
        return [sch for sch in self.schemes if sch.startswith(text)]

    def do_scheme(self, scheme):
        """Change the input transliteration scheme"""
        if not scheme:
            print(f"Input scheme: {self.input_scheme}")
        else:
            if scheme not in self.schemes:
                print(f"Invalid scheme. (valid schemes are {self.schemes})")
            else:
                self.input_scheme = scheme
                print(f"Input scheme: {self.input_scheme}")

    # ----------------------------------------------------------------------- #
    # Visualize Output

    def do_view_tree(self, sent_id, viewer='xdg-open'):
        """Open the dependency tree with the default image viewer"""
        sent_id = self.validate_id(sent_id)
        if sent_id:
            # The instance attribute is used on purpose: a module-level
            # `tmp_path` only exists when this file is run as a script.
            image_file = os.path.join(
                self.tmp_path, f'tmp_in{sent_id}', '1.1.svg'
            )
            if not os.path.isfile(image_file):
                print(f"Error: no dependency tree for sentence {sent_id}.")
            else:
                os.system(f"{viewer} {image_file}")

    def do_view_analysis(self, sent_id):
        """View the morphological analysis of the sentence"""
        sent_id = self.validate_id(sent_id)
        if sent_id:
            # Aborted sentences are known but have no parser output
            if sent_id not in self.answer_log:
                print(f"Error: no parser output for sentence {sent_id}.")
                return
            analysis = self.answer_log[sent_id]['morphological_analysis']
            print("--- Morphological Analysis ---")
            print(json.dumps(analysis, ensure_ascii=False, indent=2))
            print("------------------------------")

    def do_view_edges(self, sent_id):
        """View the possible directed edges"""
        sent_id = self.validate_id(sent_id)
        if sent_id:
            # Aborted sentences are known but have no parser output
            if sent_id not in self.answer_log:
                print(f"Error: no parser output for sentence {sent_id}.")
                return
            edges = self.answer_log[sent_id]['directed_edges']
            print("------- Possible Edges -------")
            print(json.dumps(edges, ensure_ascii=False, indent=2))
            print("------------------------------")

    def do_view_summary(self, sent_id):
        """View summary of the result"""
        summary = self.summary(sent_id)
        print("----------- Summary ----------")
        print(json.dumps(summary, ensure_ascii=False, indent=2))
        print("------------------------------")

    # ----------------------------------------------------------------------- #

    def complete_session_summary(self, text, line, begidx, endidx):
        return self.autocomplete_path(text, line, begidx, endidx)

    def do_session_summary(self, path):
        """Save session summary"""
        summaries = [self.summary(sent_id)
                     for sent_id in self.answer_log]
        if not summaries:
            # Without rows there is no 'id' column to index by
            print("Error: no parsed sentences in the current session.")
            return
        summary = pd.DataFrame.from_dict(summaries).set_index('id')
        print(summary)
        if path:
            if not path.endswith('.xlsx'):
                path = f'{path}.xlsx'
            try:
                summary.to_excel(path)
                print(f"Summary has been saved to '{path}'.")
            except Exception as e:
                print("Error: could not save the summary.")
                print(e)

    # ----------------------------------------------------------------------- #

    def complete_load(self, text, line, begidx, endidx):
        return self.autocomplete_path(text, line, begidx, endidx)

    def do_load(self, path):
        """Load a past folder in the history"""
        if not path:
            print("Error: no path provided.")
        else:
            if not os.path.isdir(path):
                print("Error: no such folder.")
            else:
                # Layout written by `save`: <path>/sent_<id>/tmp_in<id>/...
                sent_dirs = natsorted(glob.glob(os.path.join(path, 'sent_*')))
                sent_ids = []
                print(f"Found {len(sent_dirs)} outputs.")
                for sent_dir in sent_dirs:
                    try:
                        sent_id = int(sent_dir.split('sent_')[-1])
                    except ValueError:
                        print(f"Skipping '{sent_dir}': no sentence id.")
                        continue
                    sent_ids.append(sent_id)
                    self.process_output(sent_id, sent_dir)
                print(f"Loaded {len(sent_ids)} outputs.")
                print(f"Sentence IDs: {sent_ids}")

    # ----------------------------------------------------------------------- #

    def complete_input(self, text, line, begidx, endidx):
        return self.autocomplete_path(text, line, begidx, endidx)

    def do_input(self, path):
        """Load input from a file"""
        if not path:
            print("Error: no input file provided.")
        else:
            if not os.path.isfile(path):
                print(f"Error: no such file. ('{path}')")
            else:
                with open(path, 'r') as f:
                    sentences = [s for s in f.read().split('\n') if s.strip()]
                print(f"Loaded {len(sentences)} sentences.")
                # Every line is executed as if it had been typed
                for sentence in sentences:
                    self.onecmd(sentence)

    # ----------------------------------------------------------------------- #

    def do_save(self, path):
        """Save the output of all sentences in the current session"""
        if not path:
            print("Error: no output path provided.")
        else:
            if not os.path.isdir(path):
                os.makedirs(path)
                print(f"Created directory '{path}'.")
            for sent_id in self.answer_log:
                sent_dir = os.path.join(path, f"sent_{sent_id}")
                folder = f'tmp_in{sent_id}'
                files = [f'in{sent_id}',
                         f'in{sent_id}_trnsltn',
                         f'in{sent_id}.html']
                # A sentence that cannot be copied (already saved, parser
                # left no files, ...) is reported and skipped instead of
                # terminating the session.
                try:
                    os.makedirs(sent_dir)
                    shutil.copytree(os.path.join(self.tmp_path, folder),
                                    os.path.join(sent_dir, folder))
                    for file in files:
                        shutil.copyfile(os.path.join(self.tmp_path, file),
                                        os.path.join(sent_dir, file))
                except OSError as e:
                    print(f"Error: could not save sentence {sent_id}.")
                    print(e)
            self.do_session_summary(f'{path}.summary')
            with open(os.path.join(path, 'session_log.json'), 'w') as f:
                json.dump(self.log, f, ensure_ascii=False, indent=2)

    # ----------------------------------------------------------------------- #

    def do_reset(self, line):
        """Forget all sentences of the current session"""
        self.log = {}
        self.answer_log = {}
        self.status = {}

    # ----------------------------------------------------------------------- #

    def default(self, line):
        """Parse a line that is not a shell command as a sentence"""
        aborted = False
        success = False
        self.sent_id += 1
        t_start = time.time()
        self.log[self.sent_id] = {
            'scheme': self.input_scheme,
            'line': line,
        }
        if self.input_scheme == 'wx':
            wx_sent = line
        else:
            wx_sent = transliterate(line, self.input_scheme, 'wx')
        try:
            timeout_run_parser(wx_sent, self.sent_id,
                               self.scl_path, self.tmp_path,
                               text_type=self.text_type)
        except TimeoutError as e:
            print(e)
            aborted = True
        t_finish = time.time()
        self.log[self.sent_id].update({
            'abort': aborted,
            'runtime': t_finish - t_start
        })
        if not aborted:
            success = self.process_output(self.sent_id, self.tmp_path)
            print(f"\n(sentence id {self.sent_id}."
                  f" finished in {t_finish - t_start} seconds)")
            print(f"Detailed Output: '{self.tmp_path}/tmp_in{self.sent_id}'")
        else:
            print(f"\n(sentence id {self.sent_id}."
                  f" aborted after {t_finish - t_start} seconds)")
        self.log[self.sent_id].update({
            'success': success,
        })
        print(f'Parse {"successful" if success else "failed"}.')

    # ----------------------------------------------------------------------- #

    def cmdloop(self, intro=None):
        """Run the command loop; Ctrl + C returns to the prompt"""
        print(self.intro)
        print(self.desc)
        while True:
            try:
                # intro is printed above, once, not on every restart
                super().cmdloop(intro="")
                break
            except KeyboardInterrupt:
                print("\nKeyboardInterrupt")

    # ----------------------------------------------------------------------- #

    def do_version(self, text):
        """Show the version"""
        print(VERSION)

    # ----------------------------------------------------------------------- #

    def process_output(self, sent_id, tmp_path):
        """Process output and log the results"""
        answer, success, errors = interpret_output(sent_id, tmp_path)
        sent, analysis, edges = answer
        total = len(analysis)
        # Words for which the parser produced no morphological analysis
        missing_analysis = [_wa['word'] for _wa in analysis
                            if not _wa['morph']]
        # Words that are not an end point of any edge
        nodes = {_e[x] for _e in edges for x in ['src', 'dst']}
        missing_nodes = [_wa['word'] for _wa in analysis
                         if _wa['word'] not in nodes]
        sent = transliterate(sent, 'wx', self.input_scheme)
        for edge in edges:
            relation = edge['relation'].replace('#', '')
            edge['relation'] = transliterate(relation, 'wx', 'devanagari')
        self.answer_log[sent_id] = {
            'id': sent_id,
            'sentence': sent,
            'morphological_analysis': analysis,
            'directed_edges': edges,
            'success': success,
            'analysis_stats': f"{total - len(missing_analysis)}/{total}",
            'missing_analysis': missing_analysis,
            'missing_nodes': missing_nodes,
            'node_stats': f"{total - len(missing_nodes)}/{total}"
        }
        self.do_view_summary(sent_id)
        if self.debug:
            print("---------- Sentence ----------")
            self.do_view_analysis(sent_id)
            self.do_view_edges(sent_id)
            print("----------- Errors -----------")
            print(errors)
            print("------------------------------")
        return success

    def summary(self, sent_id):
        """
        Build the summary of a sentence

        Returns an empty dictionary for an invalid sentence ID. Values are
        None where nothing is recorded, e.g. the parser output of an aborted
        sentence or the runtime of a loaded sentence.
        """
        sent_id = self.validate_id(sent_id)
        summary = {}
        if sent_id:
            keys = ['id', 'sentence', 'success',
                    'analysis_stats', 'node_stats',
                    'missing_analysis', 'missing_nodes']
            if sent_id in self.answer_log:
                summary = {k: self.answer_log[sent_id][k] for k in keys}
            else:
                summary = dict.fromkeys(keys)
                summary['id'] = sent_id
            # Loaded sentences have parser output but no entry in the log
            run_info = self.log.get(sent_id, {})
            summary['abort'] = run_info.get('abort')
            summary['runtime'] = run_info.get('runtime')
        return summary

    def autocomplete_path(self, text, line, begidx, endidx):
        """Path autocompletion"""
        before_arg = line.rfind(" ", 0, begidx)
        if before_arg == -1:
            # Completion functions have to return a list of matches
            return []
        fixed = line[before_arg+1:begidx]  # fixed portion of the arg
        arg = line[before_arg+1:endidx]
        pattern = arg + '*'
        completions = []
        for path in glob.glob(pattern):
            if os.path.isdir(path):
                path = os.path.join(path, '')
            completions.append(path.replace(fixed, "", 1))
        return completions

    def validate_id(self, sent_id):
        """
        Resolve the sentence ID given to a command

        An empty argument refers to the current sentence. Returns the ID as
        an integer if it belongs to a sentence typed in this session or
        loaded from a folder. Otherwise prints an error and returns False.
        """
        if not sent_id:
            sent_id = self.sent_id
        else:
            try:
                sent_id = int(sent_id)
            except (TypeError, ValueError):
                sent_id = -1
        if sent_id not in self.log and sent_id not in self.answer_log:
            print(f"Invalid sentence id: '{sent_id}'.")
            return False
        return sent_id
###############################################################################
if __name__ == '__main__':
    # SCL installation: ~/git/samsaadhanii/scl
    scl_path = os.path.join(
        os.path.expanduser('~'), 'git', 'samsaadhanii', 'scl'
    )
    # Parser output: SCL_DIR inside the system's temporary directory
    tmp_path = os.path.join(tempfile.gettempdir(), 'SCL_DIR')

    shell = ParserShell(scl_path, tmp_path)
    shell.cmdloop()
###############################################################################
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment