Created
April 9, 2013 20:29
-
-
Save JonasPf/5349108 to your computer and use it in GitHub Desktop.
Collection of functions and classes I use to automate console applications. Tested on Python 2.7 and Windows 7.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import re | |
import sys | |
import zipfile | |
import shutil | |
import subprocess | |
import time | |
import threading | |
import Queue | |
# Line terminator appended to commands written to a child process' stdin:
# CRLF on Windows, LF everywhere else.
eol = '\r\n' if sys.platform == 'win32' else '\n'
def q(mystring):
    """
    Returns the string form of mystring surrounded by single quotes.

    Example:
        q('abc') returns "'abc'"
    """
    return "'{0}'".format(str(mystring))
def qq(mystring):
    """
    Returns the string form of mystring surrounded by double quotes.

    Example:
        qq('abc') returns '"abc"'
    """
    return '"{0}"'.format(str(mystring))
def create_prompt(cmd, success_output = None, error_output = None, expect_output = None, time_until_read = 1):
    """
    Starts a prompt-like command (e.g. shell, repl, sqlplus) and returns an object to control it.

    The command is launched as a NonBlockingPopen. After waiting time_until_read seconds,
    whatever the command has printed so far is read and validated.

    Example:
        prompt, first_output = create_prompt('sqlplus ' + oracle_username + '/' + oracle_password + '@' + oracle_servername)
        print first_output
        print prompt.execute('SELECT * FROM users;')
        prompt.execute('exit', success_output='Disconnected')

    Args:
        cmd: The command to start, passed on to NonBlockingPopen.
        success_output (str): None or the success message that the output _must_ contain.
        error_output (str): None or the error message that the output _must not_ contain.
        expect_output (bool): Determines whether there must be any output or not. None means no check.
        time_until_read (long): Seconds to wait before attempting to read the output.

    Returns:
        A tuple (prompt object, first output).
    """
    print("Execute " + q(cmd))
    process = NonBlockingPopen(cmd)
    first_output = process.read_delayed_and_validate(
        success_output, error_output, expect_output, time_until_read)
    return process, first_output
def execute(cmd, success_output = None, error_output = None, expect_output = None):
    """
    Runs a command to completion and returns what it printed to stdout.

    The captured output is passed through _validate_output, which performs the
    checks selected by the optional parameters.

    Args:
        cmd: The command, handed unchanged to subprocess.check_output.
        success_output (str): None or the success message that the output _must_ contain.
        error_output (str): None or the error message that the output _must not_ contain.
        expect_output (bool): Determines whether there must be any output or not. None means no check.

    Returns:
        The output captured by subprocess.check_output.
    """
    print("Execute " + q(cmd))
    captured = subprocess.check_output(cmd)
    _validate_output(
        captured,
        success_output=success_output,
        error_output=error_output,
        expect_output=expect_output)
    return captured
def force_copy(src, dst):
    """
    Copies the file src to dst, removing dst first when it already exists.
    """
    destination_present = os.path.exists(dst)
    if destination_present:
        # Get rid of the old destination so the copy starts from a clean slate.
        print("Delete " + q(dst))
        os.remove(dst)

    print("Copy '" + src + "' to '" + dst + "'")
    shutil.copy(src, dst)
def force_move(src, dst):
    """
    Moves the file src to dst, removing dst first when it already exists.
    """
    destination_present = os.path.exists(dst)
    if destination_present:
        # Get rid of the old destination so the move cannot collide with it.
        print("Delete '" + dst + "'")
        os.remove(dst)

    print("Move '" + src + "' to '" + dst + "'")
    shutil.move(src, dst)
def extract(filename, to_dir = None):
    """
    Extracts a zip file. Creates the destination directory if necessary.

    Args:
        filename (str): Path of the zip archive.
        to_dir (str): Directory to extract into. When None (or empty) the archive is
            extracted into the current working directory.
    """
    if to_dir:
        print("Extract " + q(filename) + " to " + q(to_dir))
        if not os.path.exists(to_dir):
            os.makedirs(to_dir)
    else:
        print("Extract '" + filename + "'")

    # Open the archive in a with-block so its file handle is closed as soon as the
    # extraction is done; otherwise the archive stays open (and locked on Windows)
    # until the garbage collector gets around to it.
    with zipfile.ZipFile(filename) as archive:
        # extractall(None) extracts into the current working directory.
        archive.extractall(to_dir if to_dir else None)
class ConfigFile(object):
    """
    Helps to easily modify a default configuration file. Saves a backup of the original file
    to [filename].orig.

    When the object is created and the backup already exists, the file is first restored
    from the backup, so every run starts from the original content. Otherwise the backup
    is created from the current file.

    Entering the with-block reads the file into memory; leaving it writes the (modified)
    lines back.

    Example:
        with ConfigFile(directory + '/bin/test.cmd') as cfg:
            cfg.replace('set value={{{default_value}}}', '12345')
            cfg.add_after('echo off', 'set PATH="' + java + r'";%PATH%')
    """

    def __init__(self, filename):
        self._filename = filename
        self._prepare_file()

    def __enter__(self):
        self._read_lines()
        return self

    def __exit__(self, *_):
        # The lines are written back even when the with-body raised; nothing is
        # returned, so such an exception still propagates to the caller.
        self._write_lines()

    def _prepare_file(self):
        """
        Restores the file from its .orig backup, or creates the backup if there is none yet.
        """
        filename_orig = self._filename + '.orig'
        if os.path.exists(filename_orig):
            force_copy(filename_orig, self._filename)
        else:
            force_copy(self._filename, filename_orig)

    def _read_lines(self):
        """
        Loads all lines of the file (line endings included) into self._lines.
        """
        print("Read from '" + self._filename + "'")
        with open(self._filename, 'r') as f:
            self._lines = f.readlines()

    def _write_lines(self):
        """
        Overwrites the file with the content of self._lines.
        """
        print("Write to '" + self._filename + "'")
        with open(self._filename, 'w') as f:
            f.writelines(self._lines)

    def replace(self, pattern, replacement):
        """
        Replaces the {{{...}}} part of every line that matches pattern with replacement.

        See replace_pattern for the pattern syntax. Prints a notice when no line matched.
        """
        result = []
        any_replaced = False
        for line in self._lines:
            replaced = replace_pattern(pattern, replacement, line)
            # replace_pattern signals "no match" with None. A successful replacement
            # may legitimately be an empty string, so test for None explicitly.
            if replaced is not None:
                print("Replace '" + line.strip() + "' by '" + replaced.strip() + "'")
                result.append(replaced)
                any_replaced = True
            else:
                result.append(line)
        if not any_replaced:
            print("Not replacing " + q(pattern) + ". Pattern not found.")
        self._lines = result

    def add_after(self, after, additional_line):
        """
        Inserts additional_line after every line that equals after (ignoring surrounding whitespace).
        """
        wanted = after.strip()
        result = []
        for line in self._lines:
            if wanted == line.strip():
                # The last line of a file may lack a line ending; terminate it so the
                # inserted line does not end up glued onto it.
                if not line.endswith('\n'):
                    line += '\n'
                result.append(line)
                result.append(additional_line + '\n')
                print("Add '" + additional_line + "' after '" + after + "'")
            else:
                result.append(line)
        self._lines = result
def replace_pattern(pattern, replacement, line):
    """
    Replaces everything in line that's surrounded by {{{}}} in pattern with replacement.
    This can be useful to replace values in default configuration files.

    The line only matches when, ignoring leading and trailing whitespace, it equals the
    pattern with the {{{ and }}} markers removed. The whitespace around the line
    (including its line ending) is preserved in the result.

    Example:
        pattern = 'set value={{{12345}}}'
        replacement = '54321'
        line = 'set value=12345'
        returns: 'set value=54321'

    See also how the class ConfigFile uses this function.

    Args:
        pattern (str): Expected line content with the replaceable part wrapped in {{{ }}}.
        replacement: The new value. It is inserted literally; non-string values
            (e.g. numbers) are converted to text.
        line (str): The line to examine.

    Returns:
        The rewritten line, or None if line does not match pattern.
    """
    placeholder = r'\{\{\{(.*)\}\}\}'

    expected = re.sub(placeholder, r'\1', pattern)
    if line.strip() != expected:
        return None

    # '%s' formatting turns non-string replacements into text without forcing an
    # already-textual value through str().
    literal = '%s' % (replacement,)

    leading = re.match(r'\s*', line).group(0)
    # Look for trailing whitespace only behind the leading whitespace, so a
    # whitespace-only line is not counted twice.
    trailing = re.search(r'\s*$', line[len(leading):]).group(0)

    # A function replacement is inserted as-is. A plain string would be treated as a
    # regex template, which mangles backslashes (Windows paths, '\1', '\n', ...).
    return leading + re.sub(placeholder, lambda _match: literal, pattern) + trailing
def _validate_output(output, success_output = None, error_output = None, expect_output = None): | |
if success_output is not None: | |
if success_output not in output: | |
print "Error, expected " + q(success_output) + " but the output was: " + output | |
exit() | |
if error_output is not None: | |
if error_output in output: | |
print "Error found in output: " + output | |
exit() | |
if expect_output is not None: | |
if expect_output and len(output) <= 0: | |
print "Error, expected some output but got none" | |
exit() | |
elif not expect_output and len(output) > 0: | |
print "Error, didn't expect any output but got: " + output | |
exit() | |
class NonBlockingPopen(subprocess.Popen):
    """
    Helper class for controlling prompt-like (e.g. shell, sqlplus, ...) commands.

    It does so by creating a separate thread that continuously reads lines from the
    stdout of the process and stores them in a queue, from which read() collects them
    without blocking.
    """

    def __init__(self, cmd):
        subprocess.Popen.__init__(self, cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        self._create_read_queue()

    def _enqueue_output(self):
        # Keep reading lines and put them into the queue; readline returns '' at the
        # end of the stream, which ends the iteration and with it the thread.
        for line in iter(self.stdout.readline, ''):
            self.reading_queue.put(line)

    def _create_read_queue(self):
        # Create a thread that continuously listens at the process stdout.
        self.reading_queue = Queue.Queue()
        reader = threading.Thread(target=self._enqueue_output)
        # Daemon thread: it must not keep the interpreter alive on its own.
        reader.daemon = True
        reader.start()

    def execute(self, cmd, success_output = None, error_output = None, expect_output = None, time_until_read = 1):
        """
        Executes a command at this prompt and returns the output. Performs basic checks on the output,
        depending on the parameter.

        Args:
            cmd (str): The command; the platform line ending is appended before it is sent.
            success_output (str): None or the success message that the output _must_ contain.
            error_output (str): None or the error message that the output _must not_ contain.
            expect_output (bool): Determines whether there must be any output or not. None means no check.
            time_until_read (long): Seconds to wait before attempting to read the output.
        """
        print("Execute " + q(cmd))
        self.stdin.write(cmd + eol)
        # Make sure the command actually reaches the process instead of sitting in a buffer.
        self.stdin.flush()
        return self.read_delayed_and_validate(success_output, error_output, expect_output, time_until_read)

    def read_delayed_and_validate(self, success_output = None, error_output = None, expect_output = None, time_until_read = 1):
        """
        Waits time_until_read seconds, then reads and validates the output gathered so far.
        """
        time.sleep(time_until_read) # wait until we (hopefully) have some output
        output = self.read()
        _validate_output(output, success_output, error_output, expect_output)
        return output

    def read(self):
        """
        Returns everything that's in the queue right now. Note that the application may
        need some time to produce an output so it may be necessary to wait before calling
        this method.
        """
        lines = []
        while True:
            try:
                line = self.reading_queue.get_nowait()
            except Queue.Empty:
                break
            # Lines coming from readline already carry their line ending. Only
            # terminate a line that lacks one (the last line before end of stream),
            # so the result has no doubled line breaks.
            if not line.endswith('\n'):
                line += eol
            lines.append(line)
        return ''.join(lines)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment