Skip to content

Instantly share code, notes, and snippets.

@hbldh
Created February 4, 2015 09:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hbldh/9a9b81b8c199ec41c4eb to your computer and use it in GitHub Desktop.
Save hbldh/9a9b81b8c199ec41c4eb to your computer and use it in GitHub Desktop.
Subprocess with timout. Inspired by http://stackoverflow.com/a/4825933
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
:mod:`subprocess_thread`
========================
.. module:: subprocess_thread
:platform: Unix, Windows
:synopsis:
.. moduleauthor:: hbldh <henrik.blidh@nedomkull.com>
Created on 2014-09-03, 08:45
"""
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from __future__ import absolute_import
import subprocess
import threading
class SubprocessThread(object):
"""Tool for starting a subprocess with a timeout.
Inspired by http://stackoverflow.com/a/4825933
"""
def __init__(self, cmd, cwd=None):
self.cmd = cmd
self.cwd = cwd
self._process = None
self._output = None
def run(self, timeout):
"""Run the new process in a separate thread.
:param timeout: The time that this thread should be allowed to live.
:type timeout: int, float
:return: The Process stdout.
:rtype: unicode
"""
def target():
self._process = subprocess.Popen(self.cmd, stdout=subprocess.PIPE, cwd=self.cwd)
self._output = self._process.communicate()[0]
thread = threading.Thread(target=target)
thread.start()
thread.join(timeout)
if thread.is_alive():
self._process.terminate()
thread.join()
return self._output
def main():
import os
import argparse
parser = argparse.ArgumentParser(description='Running a subprocess call in a separate thread to enable timeout.')
parser.add_argument('cmd', help='The command to run in separate process.')
parser.add_argument('--cwd', action='store', default=os.path.dirname(os.path.abspath(__file__)),
help='The directory to execute the command in.')
parser.add_argument('--timeout', action='store', type=int, default=60,
help='Timeout for the process. Default is 60 seconds.')
args = parser.parse_args()
t = SubprocessThread(args.cmd.split(' '), cwd=args.cwd)
output = t.run(args.timeout)
if output is not None:
print(output)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment