Skip to content

Instantly share code, notes, and snippets.

@jtprince
Last active July 31, 2016 23:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jtprince/506016cfc0b5111c7ad48757a6620670 to your computer and use it in GitHub Desktop.
Save jtprince/506016cfc0b5111c7ad48757a6620670 to your computer and use it in GitHub Desktop.
import subprocess
from concurrent.futures import ThreadPoolExecutor
class SaveIndixDataError(Exception):
    """Base error raised when indix data could not be saved to the database."""
class SaveIndixDataMismatchError(SaveIndixDataError):
    """Raised when the processed-record count differs from the submitted count."""
class IndixDataSubmitter:
    r"""Submits data to the legacy DB via a specified process.

    Expects the last line of output from the process's stdout to indicate the
    number of records successfully processed and take the form::

        ^(\d+) <Optional Text>

    For example::

        12 objects processed successfully.
    """

    # Shell command used when the caller does not supply one.
    DEFAULT_UPLOAD_CMD = (
        "cd /home/build/suppliers.doba.com/automate && execphp ./loadIndixData.php"
    )

    @classmethod
    def submit_filename(cls, filename, upload_cmd=DEFAULT_UPLOAD_CMD):
        """Convenience method to submit a file to a process specified by some command.

        Args:
            filename (str): The name of the file with the data.
            upload_cmd (str): The shell command to use for uploading the data
                from the file.

        Raises:
            SaveIndixDataError: if the upload was not successful.
        """
        # Every line is re-encoded as UTF-8 before it is piped to the process,
        # so decode with the same codec rather than the locale-dependent default.
        with open(filename, encoding="utf-8") as datafile:
            # shell=True is needed because the command is a shell string (the
            # default chains `cd` and the loader with `&&`).
            with subprocess.Popen(
                upload_cmd,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                shell=True,
            ) as proc:
                cls().submit(datafile, proc)

    def submit(self, infile, upload_process):
        """Submits infile data to upload_process and verifies success.

        After submitting data will call upload_process.stdin.close() to signal
        EOF. stdout and stderr are drained on background threads while stdin is
        being fed: writing everything and waiting for exit before reading
        deadlocks as soon as the process fills one of its output pipes.

        Args:
            infile (io): the object with data to write. One complete json
                object per line.
            upload_process (io): process with pipes to stdin, stdout, and stderr

        Raises:
            SaveIndixDataError: if the upload was not successful.
        """
        with ThreadPoolExecutor(max_workers=2) as readers:
            # The stderr check reads the pipe to EOF in the worker; any error it
            # raises is re-raised below by .result().
            stderr_check = readers.submit(
                self._raise_if_any_errors, upload_process.stderr
            )
            stdout_read = readers.submit(upload_process.stdout.read)
            try:
                number_submitted = self._submit_data(infile, upload_process.stdin)
            finally:
                # Always signal EOF so the process can finish and both readers
                # reach end-of-stream, even if feeding the data failed.
                upload_process.stdin.close()
        # Leaving the executor block has joined both readers.
        upload_process.wait()
        stderr_check.result()
        reply = stdout_read.result().decode()
        reply_lines = reply.strip().splitlines()
        last_line = reply_lines[-1] if reply_lines else ""
        self._verify_success(last_line, number_submitted)

    @staticmethod
    def _verify_success(reply, number_submitted):
        """Checks the processed count at the start of reply against number_submitted.

        Args:
            reply (str): last stdout line, of the form "<count> <optional text>".
            number_submitted (int): number of data lines that were written.

        Raises:
            SaveIndixDataError: if reply does not start with an integer count.
            SaveIndixDataMismatchError: if the count differs from number_submitted.
        """
        try:
            num_successfully_processed = int(reply.split()[0])
        except (IndexError, ValueError) as err:
            # An empty or non-numeric reply is a failed upload, not a parsing
            # detail callers should have to catch separately.
            raise SaveIndixDataError(
                "could not read the number processed from reply: {!r}".format(reply)
            ) from err
        if num_successfully_processed != number_submitted:
            raise SaveIndixDataMismatchError(
                "number submitted ({}) did not match stated number processed ({})!".format(
                    number_submitted,
                    num_successfully_processed,
                )
            )

    @staticmethod
    def _submit_data(infile, pipe):
        """Writes data in infile to pipe and returns number of data lines submitted.

        Blank (whitespace-only) lines are neither written nor counted.
        """
        num_submitted = 0
        for line in infile:
            # Lines read from a file keep their newline, so a plain truth test
            # never filters anything; strip before deciding a line is empty.
            if not line.strip():
                continue
            pipe.write(line.encode())
            num_submitted += 1
        return num_submitted

    @staticmethod
    def _raise_if_any_errors(error_pipe):
        """Reads error_pipe to EOF and raises SaveIndixDataError if it held any output."""
        error_reply = error_pipe.read().decode()
        if error_reply:
            raise SaveIndixDataError(error_reply)
if __name__ == '__main__':
    # Manual smoke test: pipe a small sample file through a stand-in receiver
    # script instead of the real loader.
    #
    # data.json is just this line repeated 4 times:
    # {"min_price": 23.22, "last_update": "2011-01-01", "most_recent": true}
    IndixDataSubmitter.submit_filename(
        filename="data.json",
        upload_cmd="python3 receiver.py",
    )
# contents of receiver.py
"""
try:
    num_taken = 0
    while True:
        print('****'+input())
        #raise Exception("There was a problem people")
        num_taken += 1
except EOFError:
    print("{} lines submitted".format(num_taken))
"""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment