Created
February 12, 2021 00:01
-
-
Save atrisovic/a285e5d8e592cf6df68b3fa81aefd9c2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# run this as "python clean_code.py $PWD"
import os | |
import re | |
import sys | |
import glob | |
import codecs | |
import chardet | |
import fileinput | |
# Collect the R scripts in the current directory.  Both spellings of the
# extension are globbed; where glob matching ignores case (e.g. on Windows)
# the two patterns return the same files, so duplicates are dropped while
# keeping first-seen order.  Otherwise a script would be cleaned twice.
list_of_r_files = []
for _pattern in ("*.R", "*.r"):
    for _path in glob.glob(_pattern):
        if _path not in list_of_r_files:
            list_of_r_files.append(_path)
def detect_encoding(input):
    """Guess the character encoding of *input* with ``chardet.detect``.

    Returns an ``(encoding, confidence)`` tuple read from the ``'encoding'``
    and ``'confidence'`` entries of the detection result.
    """
    guess = chardet.detect(input)
    encoding = guess['encoding']
    confidence = guess['confidence']
    return encoding, confidence
def parse_dependencies(line):
    """Turn a ``library(ABC)`` / ``require(ABC)`` call into an install-on-demand call.

    The package name is taken from between the first ``(`` and the following
    ``)`` of *line*, with spaces and surrounding quotes removed.

    Returns ``if (!require("ABC")) install.packages("ABC")`` followed by a
    newline.  *line* is returned unchanged when no single package name can be
    extracted: missing parentheses, an empty argument, or extra arguments
    (a comma between the parentheses).
    """
    # Search the space-free copy so "library( ABC )" yields "ABC"; the
    # original computed this copy but then sliced the raw line instead.
    compact = line.replace(" ", "")
    start = compact.find("(")
    end = compact.find(")", start + 1)
    if start == -1 or end == -1:
        return line

    lib = compact[start + 1:end].strip('"').strip("'")
    if not lib or "," in lib:  # nothing to install, or unparsed arguments
        return line
    return 'if (!require("{}")) install.packages("{}")\n'.format(lib, lib)
def fix_abs_paths(line, index):
    """Wrap the expression starting at ``line[index]`` in ``basename(...)``.

    The expression runs from *index* up to the first ``)`` that has no
    matching ``(`` after *index* (i.e. the parenthesis closing an enclosing
    call).  If there is no such parenthesis it runs to the last
    non-whitespace character of *line*.

    Returns the rewritten line, or *line* unchanged when there is nothing to
    wrap (the original raised UnboundLocalError on an empty remainder).
    """
    rest = line[index:]
    # Fallback end: everything except trailing whitespace / the newline.
    end = len(rest.rstrip())
    depth = 0  # drops to -1 on the ")" that closes the enclosing call
    for pos, char in enumerate(rest):
        if char == "(":
            depth += 1
        elif char == ")":
            depth -= 1
            if depth == -1:
                end = pos
                break

    if not rest[:end].strip():
        return line
    return line[:index] + "basename(" + rest[:end] + ")" + rest[end:]
def fix_abs_path_in_readcsv(line):
    """Wrap the first argument of the first ``read.csv(...)`` call in ``basename(...)``.

    The first argument ends at the first top-level ``,`` (further arguments
    follow) or at the ``)`` closing the call, whichever comes first.  If
    neither is found it runs to the last non-whitespace character of *line*.

    Returns the rewritten line, or *line* unchanged when there is no
    ``read.csv`` call with an opening parenthesis or the first argument is
    empty.
    """
    marker = line.find('read.csv')
    if marker == -1:
        return line
    paren = line.find("(", marker + len('read.csv'))
    if paren == -1:
        return line

    start = paren + 1
    rest = line[start:]
    # Fallback end: everything except trailing whitespace / the newline.
    end = len(rest.rstrip())
    depth = 0  # nesting level relative to the read.csv( parenthesis
    for pos, char in enumerate(rest):
        if char == "," and depth == 0:  # more arguments follow the path
            end = pos
            break
        if char == "(":
            depth += 1
        elif char == ")":
            depth -= 1
            if depth == -1:  # the parenthesis closing read.csv(
                end = pos
                break

    if not rest[:end].strip():
        return line
    return line[:start] + "basename(" + rest[:end] + ")" + rest[end:]
def main():
    """Clean every R script in ``list_of_r_files`` in place.

    For each script:

    1. Re-encode it as ASCII (dropping characters that cannot be
       represented) unless it is already detected as ASCII.
    2. Rewrite it line by line through ``fileinput`` in-place mode, where
       everything printed becomes the new file content:
       * a ``setwd('<wd>')`` call is inserted before the first line, with
         ``<wd>`` taken from ``sys.argv[1]``;
       * existing ``setwd(`` lines are blanked;
       * ``library(...)`` / ``require(...)`` calls are passed to
         ``parse_dependencies``;
       * ``file.path(``, ``source(`` and ``read.csv(`` calls are wrapped in
         ``basename(...)`` by the ``fix_abs_*`` helpers;
       * every other line is written back with trailing whitespace removed.

    NOTE: rewritten lines keep their own newline and ``print`` adds another,
    so each rewritten line is followed by a blank line in the output.

    Exits with a usage message when scripts were found but no working
    directory argument was given.
    """
    if not list_of_r_files:
        return
    # Read the argument before any file is opened for in-place editing, so a
    # missing argument cannot abort in the middle of a rewrite.
    if len(sys.argv) < 2:
        sys.exit('usage: python clean_code.py $PWD')
    wd = sys.argv[1]

    for r_file in list_of_r_files:
        # encode to ascii; read bytes so detection sees the raw file content
        with open(r_file, 'rb') as raw_file:
            raw = raw_file.read()
        encoding, _ = detect_encoding(raw)
        # encoding is None when detection fails (e.g. an empty file)
        if encoding is not None and encoding.lower() != 'ascii':
            text = raw.decode(encoding, 'ignore')
            text = text.encode('ascii', 'ignore').decode('ascii')
            with codecs.open(r_file, 'w', encoding='ascii') as f:
                f.write(text)

        try:
            for linenum, line in enumerate(fileinput.input(r_file, inplace=True)):
                # avoid setwd error
                if linenum == 0:
                    print("setwd('{}')\n".format(wd))

                # The original never defined temp_line; a space-free copy is
                # the presumed intent, so "library (x)" matches "library(".
                temp_line = line.replace(" ", "")
                stripped = line.strip()
                unchanged = line.rstrip()

                # setwd already set, remove it to avoid absolute paths
                if "setwd(" in temp_line:
                    print('')
                    continue
                # ignore empty lines
                if not stripped:
                    print(unchanged)
                    continue
                # ignore comments
                if stripped.startswith('#') or stripped.startswith('"'):
                    print(unchanged)
                    continue

                # ignore cases that already have install.packages command
                installs = "install.packages" in line and "#" not in line
                if "library(" in temp_line and installs:
                    print(unchanged)
                    continue
                elif "require" in line and installs:
                    print(unchanged)
                    continue
                elif "library(" in temp_line:
                    # one rewritten line per library call found on the line
                    for match in re.finditer("library", line):
                        print(parse_dependencies(line[match.start():]))
                    continue
                elif stripped.startswith("install.packages"):
                    # install packages should work as is when default CRAN mirror is set
                    print(unchanged)
                    continue
                elif temp_line.strip().startswith("require("):
                    print(parse_dependencies(line))
                    continue

                # all following IFs are for fixing fixed paths
                if "file.path(" in temp_line:
                    index = line.find('file.path')
                    if index == -1:  # only matched after removing spaces
                        print(unchanged)
                    else:
                        print(fix_abs_paths(line, index))
                elif "source(" in temp_line and "/" in line:
                    index = line.find('source')
                    if "http" in line or index == -1:
                        # Remote sources are kept as they are; the original
                        # printed nothing here, deleting the line.
                        print(unchanged)
                    else:
                        index += len('source')
                        # move past the opening parenthesis
                        index += line[index:].find("(") + 1
                        print(fix_abs_paths(line, index))
                elif "read.csv(" in temp_line and "/" in line:
                    print(fix_abs_path_in_readcsv(line))
                else:  # for all other lines
                    print(unchanged)
        finally:
            # Restores sys.stdout and closes the file even if a line fails.
            fileinput.close()
# Run the cleaner only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment