Skip to content

Instantly share code, notes, and snippets.

@mivanov
Forked from JasperVanDenBosch/fexpect.py
Created April 5, 2012 20:38
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mivanov/2313914 to your computer and use it in GitHub Desktop.
Save mivanov/2313914 to your computer and use it in GitHub Desktop.
Fexpect module
from fexpect import expect, expecting, run, sudo
# fabric extension to enable handling expected prompts
#
# Read more at http://ilogue.com/jasper/blog/fexpect--dealing-with-prompts-in-fabric-with-pexpect/
#
# This file Copyright (c) Jasper van den Bosch, ilogue, jasper@ilogue.com
# Pexpect Copyright (c) 2012 Noah Spurrier, see: http://www.noah.org/wiki/pexpect#License
from fabric.state import env
import fabric.api
import shortuuid
from StringIO import StringIO
def expect(promptexpr, response, exitAfter=-1):
    """Describe one expected prompt and the reply to send for it.

    Args:
        promptexpr: Pattern of the prompt to wait for.
        response: Text to send once the prompt is seen.
        exitAfter: When given (anything other than the default ``-1``),
            it is carried along as a third tuple element.

    Returns:
        A one-element list holding ``(promptexpr, response)`` or
        ``(promptexpr, response, exitAfter)``; lists from several calls
        can be concatenated with ``+``.
    """
    if exitAfter == -1:
        # Default sentinel: no exit behaviour requested.
        return [(promptexpr, response)]
    return [(promptexpr, response, exitAfter)]
def expecting(e):
    """Wrap the expectation list ``e`` in a context manager.

    Intended for use as ``with expecting(...):`` around calls to this
    module's ``run`` / ``sudo``.

    Args:
        e: List of expectation tuples, as built by ``expect``.

    Returns:
        An ``ExpectationContext`` for ``e``.
    """
    context = ExpectationContext(e)
    return context
class ExpectationContext(object):
    """Context manager that publishes prompt expectations on ``env``.

    On entry ``env.expectations`` is set to the list given at
    construction.  On exit the value that was active before entry is put
    back, so nested ``with`` blocks do not wipe the outer block's
    expectations.  If nothing was set before entry, the attribute is
    reset to an empty list.
    """

    def __init__(self, expectations):
        """Remember ``expectations`` (a list of expectation tuples)."""
        self.expectations = expectations
        # Values displaced by __enter__, newest last.  A stack keeps
        # re-entering the same instance correct as well.
        self._displaced = []

    def __enter__(self):
        """Activate this context's expectations and return ``self``."""
        # env may not carry the attribute yet when no context has run.
        self._displaced.append(getattr(env, 'expectations', []))
        env.expectations = self.expectations
        return self

    def __exit__(self, type, value, tb):
        """Restore the expectations that were active before entry."""
        env.expectations = self._displaced.pop() if self._displaced else []
def run(cmd, **kwargs):
    """Run ``cmd`` through ``fabric.api.run`` with prompt handling.

    The command is first wrapped by ``wrapExpectations`` using the
    current ``env``; the wrapped command is what actually gets executed.

    Args:
        cmd: Shell command line to execute.
        **kwargs: Passed unchanged to ``fabric.api.run``.

    Returns:
        Whatever ``fabric.api.run`` returns for the wrapped command.
    """
    return fabric.api.run(wrapExpectations(cmd, env), **kwargs)
def sudo(cmd, **kwargs):
    """Run ``cmd`` through ``fabric.api.sudo`` with prompt handling.

    The command is first wrapped by ``wrapExpectations`` using the
    current ``env``; the wrapped command is what actually gets executed.

    Args:
        cmd: Shell command line to execute.
        **kwargs: Passed unchanged to ``fabric.api.sudo``.

    Returns:
        Whatever ``fabric.api.sudo`` returns for the wrapped command.
    """
    return fabric.api.sudo(wrapExpectations(cmd, env), **kwargs)
def wrapExpectations(cmd, env):
    """Upload a pexpect driver for ``cmd`` and return how to invoke it.

    A driver script is generated with ``createScript``, uploaded under a
    unique ``/tmp/fexpect_<uuid>`` name, and the local ``pexpect`` source
    file is uploaded to ``/tmp/`` next to it.

    Args:
        cmd: Shell command line the driver should run.
        env: Object handed on to ``createScript``.

    Returns:
        The command line ``'python <remote script path>'``.
    """
    driver_source = createScript(cmd, env)
    remote_path = '/tmp/fexpect_' + shortuuid.uuid()

    # Imported here so pexpect is only needed when a command is wrapped.
    import pexpect
    local_pexpect = pexpect.__file__
    if local_pexpect.endswith('.pyc'):
        # Upload the source file rather than the compiled one.
        local_pexpect = local_pexpect[:-1]

    fabric.api.put(local_pexpect, '/tmp/')
    fabric.api.put(StringIO(driver_source), remote_path)
    return 'python ' + remote_path
def createScript(cmd, env):
    """Build the source of the pexpect driver script for ``cmd``.

    The generated script spawns ``cmd`` via ``/bin/bash -c``, logs the
    child's output to stdout and, in a loop, answers whichever expected
    prompt shows up until the child reaches EOF or an expectation with
    an ``exitAfter`` value has been answered.

    Args:
        cmd: Shell command line to run under pexpect.
        env: Object whose ``expectations`` attribute is a sequence of
            ``(prompt, response)`` or ``(prompt, response, exitAfter)``
            tuples as produced by ``expect``.  A missing or empty
            attribute means "no expectations".

    Returns:
        The script source as a string.
    """
    timeout = 30 * 60  # pexpect timeout in seconds (30 minutes)
    expectations = getattr(env, 'expectations', None) or []

    def literal(value):
        # Stringify exactly as str.format would, then emit an escaped
        # Python literal so quotes, backslashes or newlines in the value
        # cannot break the generated source.
        return repr('{0}'.format(value))

    lines = [
        '#!/usr/bin/python',
        'import sys',
        'from time import sleep',
        'import pexpect',
        'expectations=[{0}]'.format(
            ','.join(literal(e[0]) for e in expectations)),
        # The command is passed as an argument list so bash receives it
        # verbatim instead of pexpect re-splitting an embedded string.
        "child = pexpect.spawn('/bin/bash', ['-c', {0}], timeout={1})".format(
            literal(cmd), timeout),
        'child.logfile = sys.stdout',
        'while True:',
        '\ttry:',
        '\t\ti = child.expect(expectations)',
    ]
    for index, e in enumerate(expectations):
        keyword = 'if' if index == 0 else 'elif'
        lines.append('\t\t{0} i == {1}:'.format(keyword, index))
        lines.append('\t\t\tchild.sendline({0})'.format(literal(e[1])))
        if len(e) > 2:
            # Third element: seconds to wait before leaving the loop.
            lines.append('\t\t\tsleep({0})'.format(e[2]))
            lines.append("\t\t\tprint('Exiting fexpect for expected exit.')")
            lines.append('\t\t\tbreak')
    lines.extend([
        '\texcept pexpect.EOF:',
        "\t\tprint('Exiting fexpect for EOF.')",
        '\t\tbreak',
        '',
    ])
    return '\n'.join(lines) + '\n'
# fabric extension to enable handling expected prompts
#
# Read more at http://ilogue.com/jasper/blog/fexpect--dealing-with-prompts-in-fabric-with-pexpect/
#
# This file Copyright (c) Jasper van den Bosch, ilogue, jasper@ilogue.com
# Pexpect Copyright (c) 2012 Noah Spurrier, see: http://www.noah.org/wiki/pexpect#License
# Run from a fabfile.py with:
#@task()
#def test():
# from tests import FexpectTests, runtest
# runtest(FexpectTests)
import unittest
from fabric.api import *
def runtest(testclass):
    """Run every test method of ``testclass`` with verbose text output.

    Args:
        testclass: A ``unittest.TestCase`` subclass.

    Returns:
        The result object from ``TextTestRunner.run`` so the caller can
        check ``wasSuccessful()`` instead of relying on printed output.
    """
    suite = unittest.TestLoader().loadTestsFromTestCase(testclass)
    return unittest.TextTestRunner(verbosity=2).run(suite)
class FexpectTests(unittest.TestCase):
    """Tests for fexpect that execute real commands through Fabric.

    They need a Fabric host context, so they are meant to be started
    from a fabfile task via ``runtest(FexpectTests)``.
    """

    def test_one_expectation(self):
        """A single expected prompt receives its reply."""
        command = 'echo "Hello" && read NAME && echo "Hi $NAME."'
        from fexpect import expect, expecting, run
        prompts = expect('Hello', 'answer')
        with expecting(prompts):
            result = run(command)
        self.assertIn('answer', result)

    def test_two_expectations(self):
        """Two prompts listed in order of appearance both get replies."""
        command = 'echo "Hello" && read ONE && echo "bladiebla" && read TWO && echo "First $ONE than $TWO."'
        from fexpect import expect, expecting, run
        first = expect('Hello', '111')
        second = expect('bladiebla', '222')
        with expecting(first + second):
            result = run(command)
        self.assertIn('111', result)
        self.assertIn('222', result)

    def test_order_inconsequential(self):
        """Listing the expectations in reverse order must not matter."""
        command = 'echo "Hello" && read ONE && echo "bladiebla" && read TWO && echo "First $ONE than $TWO."'
        from fexpect import expect, expecting, run
        first = expect('Hello', '111')
        second = expect('bladiebla', '222')
        with expecting(second + first):
            result = run(command)
        self.assertIn('111', result)
        self.assertIn('222', result)

    def test_exit_after_expectation(self):
        """An ``exitAfter`` expectation ends the run before the command does."""
        import time
        from StringIO import StringIO
        # Remote helper that prints 1..7, one number per second.
        counter_source = "#!/usr/bin/python\nimport time\nfor i in range(1,8):\n\tprint(i)\n\ttime.sleep(1)"
        command = 'python /tmp/test.py'
        put(StringIO(counter_source), '/tmp/test.py')
        from fexpect import expect, expecting, run
        never_seen = expect('Hello', '111')
        stop_at_three = expect('3', 'expected', exitAfter=0)
        started = time.time()
        with expecting(never_seen + stop_at_three):
            output = run(command)
        elapsed = time.time() - started
        # "3" is printed after roughly two seconds; the full run takes ~7.
        self.assertGreater(elapsed, 2)
        self.assertLess(elapsed, 4)

    def tryOrFailOnPrompt(self, method, args):
        """Call ``method(*args)`` with ``abort_on_prompts=True`` settings.

        A ``SystemExit`` raised during the call is reported as a test
        failure (presumably Fabric aborts this way on an unexpected
        prompt -- confirm against the Fabric docs).

        Returns:
            The value returned by ``method``.
        """
        try:
            with settings(abort_on_prompts=True):
                outcome = method(*args)
        except SystemExit:
            self.fail("There was an unexpected (password) prompt.")
        return outcome
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment