Skip to content

Instantly share code, notes, and snippets.

@JonasPf
Created April 9, 2013 20:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JonasPf/5349108 to your computer and use it in GitHub Desktop.
Save JonasPf/5349108 to your computer and use it in GitHub Desktop.
Collection of functions and classes I use to automate console applications. Tested on Python 2.7 and Windows 7.
import os
import re
import sys
import zipfile
import shutil
import subprocess
import time
import threading
import Queue
# Platform line terminator: CRLF on Windows ('win32'), LF everywhere else.
eol = '\r\n' if sys.platform == 'win32' else '\n'
def q(mystring):
    """
    Converts mystring to str and returns it wrapped in single quotes.
    """
    return "'{0}'".format(str(mystring))
def qq(mystring):
    """
    Converts mystring to str and returns it wrapped in double quotes.
    """
    return ''.join(('"', str(mystring), '"'))
def create_prompt(cmd, success_output=None, error_output=None, expect_output=None, time_until_read=1):
    """
    Starts a prompt-like command (e.g. shell, repl, sqlplus) and returns a tuple
    (prompt_object, first_output). The prompt object is a NonBlockingPopen; further
    commands are sent through its execute() method.

    Example:
        p = create_prompt('sqlplus ' + oracle_username + '/' + oracle_password + '@' + oracle_servername)
        print(p[1])
        print(p[0].execute('SELECT * FROM users;'))
        p[0].execute('exit', success_output='Disconnected')

    Args:
    cmd: The command to start; handed unchanged to NonBlockingPopen.
    success_output (str): None or the success message that the first output _must_ contain.
    error_output (str): None or the error message that the first output _must not_ contain.
    expect_output (bool): Determines whether there must be any output or not. None means no check.
    time_until_read (long): Seconds to wait before attempting to read the first output.
    """
    print("Execute " + q(cmd))
    prompt = NonBlockingPopen(cmd)
    first_output = prompt.read_delayed_and_validate(
        success_output, error_output, expect_output, time_until_read)
    return (prompt, first_output)
def execute(cmd, success_output=None, error_output=None, expect_output=None):
    """
    Runs cmd to completion via subprocess.check_output, validates what it printed and
    returns that output.

    Args:
    cmd: The command to run; handed unchanged to subprocess.check_output.
    success_output (str): None or the success message that the output _must_ contain.
    error_output (str): None or the error message that the output _must not_ contain.
    expect_output (bool): Determines whether there must be any output or not. None means no check.
    """
    print("Execute " + q(cmd))
    captured = subprocess.check_output(cmd)
    _validate_output(captured, success_output, error_output, expect_output)
    return captured
def force_copy(src, dst):
    """
    Copies the file src to dst. An already existing dst is removed first.
    """
    destination_taken = os.path.exists(dst)
    if destination_taken:
        print("Delete " + q(dst))
        os.remove(dst)
    copy_message = "".join(["Copy '", src, "' to '", dst, "'"])
    print(copy_message)
    shutil.copy(src, dst)
def force_move(src, dst):
    """
    Moves the file src to dst. An already existing dst is removed first.
    """
    destination_taken = os.path.exists(dst)
    if destination_taken:
        print("".join(["Delete '", dst, "'"]))
        os.remove(dst)
    move_message = "".join(["Move '", src, "' to '", dst, "'"])
    print(move_message)
    shutil.move(src, dst)
def extract(filename, to_dir=None):
    """
    Extracts a zip file. Creates the destination directory if necessary.

    Args:
    filename (str): Path of the zip archive to unpack.
    to_dir (str): Directory to unpack into. When None (or empty) the archive is
    unpacked into the current working directory.
    """
    if to_dir:
        print("Extract '" + str(filename) + "' to '" + str(to_dir) + "'")
        if not os.path.exists(to_dir):
            os.makedirs(to_dir)
    else:
        print("Extract '" + str(filename) + "'")
    # The context manager closes the archive handle even when extraction fails;
    # an open handle would keep the zip file locked on Windows.
    with zipfile.ZipFile(filename) as archive:
        # extractall(None) unpacks into the current working directory.
        archive.extractall(to_dir or None)
class ConfigFile(object):
    """
    Helps to easily modify a default configuration file. Saves backup of the original file to
    [filename].orig. If that backup already exists, the file is restored from it on construction,
    so the modifications are always applied to the original content.

    The lines are read when the with-block is entered and written back when it is left.

    Example:
        with ConfigFile(directory + '/bin/test.cmd') as cfg:
            cfg.replace('set value={{{default_value}}}', '12345')
            cfg.add_after('echo off', 'set PATH="' + java + r'";%PATH%')
    """

    def __init__(self, filename):
        self._filename = filename
        self._prepare_file()

    def __enter__(self):
        self._read_lines()
        return self

    def __exit__(self, *_):
        # Written back unconditionally, i.e. also when the with-body raised.
        self._write_lines()

    def _prepare_file(self):
        # First use: create the backup. Later uses: start over from the backup.
        filename_orig = self._filename + '.orig'
        if os.path.exists(filename_orig):
            force_copy(filename_orig, self._filename)
        else:
            force_copy(self._filename, filename_orig)

    def _read_lines(self):
        print("Read from '" + self._filename + "'")
        with open(self._filename, 'r') as f:
            self._lines = f.readlines()

    def _write_lines(self):
        print("Write to '" + self._filename + "'")
        with open(self._filename, 'w') as f:
            f.writelines(self._lines)

    def replace(self, pattern, replacement):
        """
        Replaces every line that matches pattern (see replace_pattern) by the pattern with
        its {{{...}}} part substituted by replacement. Prints a note if no line matched.

        Args:
        pattern (str): The expected line; the default value is surrounded by {{{ }}}.
        replacement (str): The new value.
        """
        result = []
        any_replaced = False
        for line in self._lines:
            replaced = replace_pattern(pattern, replacement, line)
            # Compare against None: an empty string is a valid replacement result.
            if replaced is not None:
                print("Replace '" + line.strip() + "' by '" + replaced.strip() + "'")
                result.append(replaced)
                any_replaced = True
            else:
                result.append(line)
        if not any_replaced:
            print("Not replacing " + q(pattern) + ". Pattern not found.")
        self._lines = result

    def add_after(self, after, additional_line):
        """
        Inserts additional_line as a new line behind every line that equals after
        (surrounding whitespace is ignored for the comparison).

        Args:
        after (str): Content of the line(s) to insert behind.
        additional_line (str): The text to insert, without trailing line break.
        """
        wanted = after.strip()
        result = []
        for line in self._lines:
            if wanted != line.strip():
                result.append(line)
                continue
            # The last line of a file may lack its line break; terminate it so the
            # inserted text does not end up on the same line.
            if not line.endswith('\n'):
                line += '\n'
            result.append(line)
            result.append(additional_line + '\n')
            print("Add '" + additional_line + "' after '" + after + "'")
        self._lines = result
def replace_pattern(pattern, replacement, line):
    """
    Replaces everything in line that's surrounded by {{{}}} in pattern with replacement. This can
    be useful to replace values in default configuration files.

    The line matches when, ignoring leading and trailing whitespace, it equals the pattern with
    the {{{ }}} markers removed. The whitespace around the line is preserved in the result.

    Example:
        pattern = 'set value={{{12345}}}'
        replacement = '54321'
        line = 'set value=12345'
        returns: 'set value=54321'

    See also how the class ConfigFile uses this function.

    Args:
    pattern (str): The expected line with the default value surrounded by {{{ }}}.
    replacement: The new value. It is inserted literally (backslashes are not interpreted);
    values that are not strings are converted with str(). A callable is handed to re.sub
    unchanged.
    line (str): The line to check and rewrite.

    Returns:
    The rewritten line, or None if line does not match pattern.
    """
    placeholder = r'\{\{\{(.*)\}\}\}'
    if line.strip() != re.sub(placeholder, r'\1', pattern):
        return None

    if callable(replacement):
        substitute = replacement
    else:
        # type(u'') is unicode on Python 2 and str on Python 3.
        if not isinstance(replacement, (str, type(u''))):
            replacement = str(replacement)

        def substitute(_match):
            # A function result is inserted as-is, whereas a plain string would be
            # treated as a template and mangle values such as 'C:\new\dir'.
            return replacement

    whitespace_start = re.match(r'\s*', line).group(0)
    whitespace_end = re.search(r'\s*$', line).group(0)
    return whitespace_start + re.sub(placeholder, substitute, pattern) + whitespace_end
def _validate_output(output, success_output = None, error_output = None, expect_output = None):
if success_output is not None:
if success_output not in output:
print "Error, expected " + q(success_output) + " but the output was: " + output
exit()
if error_output is not None:
if error_output in output:
print "Error found in output: " + output
exit()
if expect_output is not None:
if expect_output and len(output) <= 0:
print "Error, expected some output but got none"
exit()
elif not expect_output and len(output) > 0:
print "Error, didn't expect any output but got: " + output
exit()
class NonBlockingPopen(subprocess.Popen):
    """
    Helper class for controlling prompt-like (e.g. shell, sqlplus, ...) commands.

    It does so by creating a separate thread that continuously reads from the stdout
    of the process and puts every line into a queue. Only stdin and stdout of the
    process are piped.
    """

    def __init__(self, cmd):
        subprocess.Popen.__init__(self, cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        self._create_read_queue()

    def _enqueue_output(self):
        # Keep reading until readline returns '' (end of stream) and put every
        # line into the queue.
        for line in iter(self.stdout.readline, ''):
            self.reading_queue.put(line)

    def _create_read_queue(self):
        # Create a thread that continuously listens at the process stdout. It is a
        # daemon thread so that it does not keep the script alive.
        self.reading_queue = Queue.Queue()
        reader = threading.Thread(target=self._enqueue_output)
        reader.daemon = True
        reader.start()

    def execute(self, cmd, success_output=None, error_output=None, expect_output=None, time_until_read=1):
        """
        Executes a command at this prompt and returns the output. Performs basic checks on the output,
        depending on the parameter.

        Args:
        cmd (str): The command to type into the prompt; the line ending is appended automatically.
        success_output (str): None or the success message that the output _must_ contain.
        error_output (str): None or the error message that the output _must not_ contain.
        expect_output (bool): Determines whether there must be any output or not. None means no check.
        time_until_read (long): Time to wait before attempting to read the output
        """
        print("Execute " + q(cmd))
        self.stdin.write(cmd + eol)
        # Make sure the command reaches the process before we start waiting for its output.
        self.stdin.flush()
        return self.read_delayed_and_validate(success_output, error_output, expect_output, time_until_read)

    def read_delayed_and_validate(self, success_output=None, error_output=None, expect_output=None, time_until_read=1):
        """
        Waits time_until_read seconds, then reads and validates everything the process has
        printed so far. Returns that output.
        """
        time.sleep(time_until_read)  # wait until we (hopefully) have some output
        output = self.read()
        _validate_output(output, success_output, error_output, expect_output)
        return output

    def read(self):
        """
        Returns everything that's in the queue right now. Note that the application may
        need some time to produce an output so it may be necessary to wait before calling
        this method.
        """
        lines = []
        try:
            while True:
                # readline keeps the line terminator, so the queued lines are joined as they are.
                lines.append(self.reading_queue.get_nowait())
        except Queue.Empty:
            return ''.join(lines)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment