Skip to content

Instantly share code, notes, and snippets.

@mingyuan-xia
Created March 11, 2017 18:13
Show Gist options
  • Save mingyuan-xia/f8f6b517f5cba55191137121638d0c5a to your computer and use it in GitHub Desktop.
Save mingyuan-xia/f8f6b517f5cba55191137121638d0c5a to your computer and use it in GitHub Desktop.
Control a process that produces JSON output in Python
# -*- coding: utf-8 -*-
import subprocess
import time
import os
import fcntl
import json
from datastruct import truncate_string
from colorprint import print_warning, print_emph
def non_block_read(fobj):
    """ Read whatever is currently available from a file object without blocking.

    The descriptor behind ``fobj`` is switched to O_NONBLOCK and is left in
    that mode afterwards, so later reads on the same object are non-blocking
    as well.

    :param fobj: A file-like object backed by a real file descriptor
        (it must support ``fileno()`` and ``read()``)
    :return: the data returned by ``fobj.read()`` (same type as that call
        yields), or an empty string when nothing could be read right now
    """
    fd = fobj.fileno()
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    # Only touch the descriptor when the flag is not set yet; repeated polling
    # would otherwise issue a redundant F_SETFL on every call.
    if not flags & os.O_NONBLOCK:
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
    try:
        data = fobj.read()
    except (IOError, OSError):
        # "Would block" (EAGAIN) surfaces as an I/O error on Python 2 file
        # objects; treat every I/O failure as "nothing available".
        return ""
    # Python 3 buffered readers signal "no data available" with None instead
    # of raising; normalise that so callers can always concatenate the result.
    if data is None:
        return ""
    return data
class JSONProcess:
    """ Run a child process and interpret its stdout as one JSON document.

    Typical use: ``start()``, then either poll ``is_finished()`` /
    ``get_result()`` or block in ``wait_task(timeout)``. The pipes are only
    drained when ``is_finished()`` is called, so it has to be polled while the
    process runs.
    """

    PROCESS_BUF_SIZE = 1024*256
    # If anything wrong happens, only preserve 4KB output from the output
    MAX_JSON_PROCESS_RESULT_LENGTH = 4096
    # How long terminate() waits for the child to exit after SIGTERM before
    # escalating to kill()
    TERMINATE_GRACE_SECONDS = 2.0

    def __init__(self, name, cmdline, cwd=None):
        """
        :param name: label used in the warning printed when JSON parsing fails
        :param cmdline: command line handed to subprocess.Popen
        :param cwd: working directory for the child, None to inherit
        """
        self.name = name
        self.cmdline = cmdline
        self.cwd = cwd
        self.out, self.err = '', ''
        self.result = None
        self.proc = None
        # Lazily created incremental decoders, keyed by stream name; only
        # needed when the pipes deliver bytes that are not str (Python 3).
        self._decoders = {}

    def start(self):
        """ Launch the process with stdout and stderr captured through pipes. """
        self.proc = subprocess.Popen(self.cmdline, stdout=subprocess.PIPE,
                                     stderr=subprocess.PIPE, cwd=self.cwd,
                                     bufsize=JSONProcess.PROCESS_BUF_SIZE)

    def _to_text(self, stream, chunk, final=False):
        """ Convert one chunk read from a pipe into text that can be appended
        to self.out / self.err.

        :param stream: key identifying the pipe ('stdout' or 'stderr')
        :param chunk: value returned by non_block_read (str, bytes or None)
        :param final: True when no more data will follow, flushing any
            partially received multi-byte character
        :return: a str (possibly empty)
        """
        if isinstance(chunk, str):
            return chunk
        decoder = self._decoders.get(stream)
        if decoder is None:
            if not chunk:
                return ''
            import codecs
            # An incremental decoder keeps a multi-byte character that was
            # split across two reads intact.
            decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')
            self._decoders[stream] = decoder
        return decoder.decode(chunk or b'', final)

    def is_finished(self):
        """
        Check if the process is finished, collecting any pending output
        :return: True if the process is terminated, not started or finished normally
        """
        if self.proc is None:
            return True
        # Poll BEFORE draining: if the exit is observed first, the reads below
        # run after the child has closed its pipes and therefore pick up the
        # complete tail of its output. Draining first could miss data written
        # between the read and the exit.
        finished = self.proc.poll() is not None
        self.out += self._to_text('stdout', non_block_read(self.proc.stdout), finished)
        self.err += self._to_text('stderr', non_block_read(self.proc.stderr), finished)
        return finished

    def terminate(self):
        """ Stop the process (if one was started) and mark the result as an
        empty dict. The child is reaped and its pipes are closed; output
        collected so far stays in self.out / self.err but is not parsed.
        """
        proc = self.proc
        if proc is None:
            return
        self.proc = None
        self.result = {}
        if proc.poll() is None:
            try:
                proc.terminate()
            except OSError:
                pass  # the child exited between poll() and the signal
            waited, step = 0.0, 0.05
            while proc.poll() is None and waited < JSONProcess.TERMINATE_GRACE_SECONDS:
                time.sleep(step)
                waited += step
            if proc.poll() is None:
                # SIGTERM was ignored or is handled slowly; force the exit so
                # that the wait() below cannot hang.
                try:
                    proc.kill()
                except OSError:
                    pass
            proc.wait()  # reap the child so it does not linger as a zombie
        for pipe in (proc.stdout, proc.stderr):
            if pipe is not None:
                pipe.close()

    def get_result(self):
        """
        Retrieve the result of the JSON process
        :return:
        None if not finished or not started;
        an empty dict if the process is terminated;
        a dict with only stdout and stderr if failed to parse JSON from the process stdout;
        the parsed JSON from the process stdout otherwise
        """
        # terminated or the result is already parsed
        if self.result is not None:
            return self.result
        # not started
        if self.proc is None:
            return None
        # still running
        if not self.is_finished():
            return None
        out, err = self.out, self.err
        try:
            self.result = json.loads(out)
        except ValueError:
            print_warning('failed to parse JSON from a JSONProcess: ' + self.name)
            limit = JSONProcess.MAX_JSON_PROCESS_RESULT_LENGTH
            self.result = {'stderr': truncate_string(err, limit),
                           'stdout': truncate_string(out, limit)}
            # NOTE(review): the original indentation was lost; this print is
            # assumed to belong to the failure path - confirm.
            print_emph(self.result)
        return self.result

    def wait_task(self, timeout):
        """ Wait until the JSON process is finished, with a timeout
        :param timeout: in seconds
        :return: status (True is successful), result (result object from the process)
        """
        # Prefer a monotonic clock where the interpreter offers one.
        clock = getattr(time, 'monotonic', time.time)
        deadline = clock() + timeout
        # Completion is checked before the deadline, so a process that
        # finished during the last sleep is still reported as successful.
        while not self.is_finished():
            remaining = deadline - clock()
            if remaining <= 0:
                self.terminate()
                return False, self.get_result()
            time.sleep(min(1, remaining))
        return True, self.get_result()
@ulrick65
Copy link

ulrick65 commented Mar 3, 2018

This is exactly what I need for a small project I am working on but I am on Python 2.7 on Windows and having some trouble adapting your code to work. I am stuck on fcntl as that is not available on Windows Python. Would you be able to suggest a way to get this working on Windows?

I'm new to Python and not sure exactly what is going on with the non-blocking read.

Thanks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment