Last active
September 21, 2022 18:44
-
-
Save simplymathematics/05ee979bbd9c455501fec570a79365d9 to your computer and use it in GitHub Desktop.
basic watcher script in python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Copyright 2022 @simplymathematics | |
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: | |
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. | |
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import watchdog.events | |
import watchdog.observers | |
import time | |
from pssh.clients import ParallelSSHClient | |
import argparse | |
import json | |
from pathlib import Path | |
import logging | |
from gevent import joinall | |
import sys | |
# Paths of files observed by the watchdog handler; appended on every
# "created" event (useful for debugging / inspection).
events = []

# REGEX (the watch pattern, e.g. '**/*predictions.json') and the optional
# TOTAL / QUEUE line counts are assigned at module level in the
# __main__ block below, before the handler is constructed.

# Module-level logger; level and format are configured in __main__.
logger = logging.getLogger(__name__)
def createSSHClient(hosts, port, user, password):
    """Open parallel SSH connections to *hosts* and verify they work.

    Runs a harmless ``ls -l`` on every host as a connectivity/authentication
    smoke test before returning the client.

    :param hosts: list of hostnames (may repeat a host for parallel sessions)
    :param port: SSH port
    :param user: SSH username
    :param password: SSH password
    :returns: a connected ``ParallelSSHClient``
    :raises ConnectionError: if the test command fails on any host
    """
    client = ParallelSSHClient(hosts, port=port, user=user, password=password)
    output = client.run_command("ls -l")
    # exit_code is only populated once the command has finished, so join
    # before inspecting it; also check every host, not just the first.
    client.join(output)
    for host_out in output:
        if host_out.exit_code != 0:
            # raise instead of assert: asserts are stripped under -O
            raise ConnectionError(
                "SSH connection failed on host {}".format(host_out.host)
            )
    logger.info("Parallel SSH connections established")
    return client
class JSONHandler(watchdog.events.PatternMatchingEventHandler):
    """Watchdog handler that reacts to newly created files matching REGEX.

    On each "created" event it (optionally) transforms the JSON file,
    recalculates queue progress when the module-level TOTAL and QUEUE
    counts exist, and copies the file to the remote host(s) over SCP.
    """

    def __init__(self, servers, port, user, password, filename, destination, **kwargs):
        """Set the watch pattern, open the SSH client, and record paths.

        :param servers: list of hosts to send files to
        :param port: SSH port
        :param user: SSH username
        :param password: SSH password
        :param filename: initial source file path
        :param destination: remote destination directory
        :param kwargs: optional ``recursive`` (bool) to recurse on scp_send
        """
        # NOTE(review): REGEX is a module-level global assigned in the
        # __main__ block before this class is instantiated.
        watchdog.events.PatternMatchingEventHandler.__init__(
            self, patterns=[REGEX], ignore_directories=True, case_sensitive=False
        )
        self.ssh = createSSHClient(servers, port, user, password)
        logger.info("Initiated SSH client")
        self.filename = filename
        self.destination = destination
        # BUG FIX: the original tested for the key "recurse" but read
        # kwargs["recursive"], so the flag passed by __main__ was always
        # silently dropped and self.recurse was always False.
        self.recurse = kwargs.get("recursive", False)
        logger.info(
            "Source file is %s and destination is %s", self.filename, self.destination
        )
        logger.info("Regex is %s", REGEX)

    def on_created(self, event):
        """Handle a newly created file: transform, report progress, upload."""
        logger.info("Watchdog received created event - %s.", event.src_path)
        events.append(event.src_path)
        self.filename = event.src_path
        try:
            self.transform_json()
            logger.info("Transformed JSON")
        except Exception as e:
            logger.warning("Could not transform json")
            logger.warning(e)
        # BUG FIX: the original wrote `if "TOTAL" and "QUEUE" in locals():`,
        # which only ever checked QUEUE ("TOTAL" is a truthy literal) and
        # looked in the wrong namespace — TOTAL/QUEUE are module globals.
        if "TOTAL" in globals() and "QUEUE" in globals():
            try:
                self.calculate_progress(TOTAL, QUEUE)
                logger.info("Calculated progress")
            except Exception as e:
                logger.warning("Could not calculate progress")
                logger.warning(e)
        try:
            self.send_json_with_scp()
            logger.info("Sent JSON")
        except KeyboardInterrupt as e:
            logger.warning("Keyboard interrupt")
            raise e
        except Exception as e:
            logger.warning("Could not send json")
            logger.warning(e)

    def calculate_progress(self, total, queue):
        """Write progress.json with the completed/remaining fractions.

        BUG FIX: the original signature omitted ``self``, so the bound
        call in on_created raised a TypeError (three args for two params).

        :param total: number of lines in the original experiment list
        :param queue: number of lines still remaining in the queue file
        :returns: dict with "complete" and "remaining" fractions
        """
        progress = (total - queue) / total
        dict_ = {"complete": progress, "remaining": 1 - progress}
        with open("progress.json", "w") as f:
            json.dump(dict_, f)
        return dict_

    def transform_json(self):
        """Hook for transforming the JSON file before upload (no-op here)."""
        pass

    def send_json_with_scp(self):
        """Copy self.filename to the destination directory on every host."""
        remotename = Path(self.destination, self.filename).as_posix()
        cmds = self.ssh.scp_send(self.filename, remotename, recurse=self.recurse)
        # scp_send returns one greenlet per host; wait for all of them and
        # surface any transfer error.
        joinall(cmds, raise_error=True)
if __name__ == "__main__":

    def _str2bool(value):
        """Parse a boolean CLI value.

        BUG FIX: the original used ``type=bool``, under which any non-empty
        string (including "False") parses as True.
        """
        return str(value).strip().lower() in ("1", "true", "yes", "y")

    parser = argparse.ArgumentParser(
        description="Process some json files and send them to a server. Or send and then process. Your choice."
    )
    parser.add_argument(
        "--source", "-i", type=str, required=True, help="The source to watch for files."
    )
    parser.add_argument(
        "--destination",
        "-o",
        type=str,
        required=True,
        help="The destination to send the files to.",
    )
    parser.add_argument(
        "--server",
        "-s",
        type=str,
        required=True,
        help="The server to send the files to.",
    )
    # Default to the standard SSH port instead of forwarding None.
    parser.add_argument(
        "--port", "-p", type=int, default=22, help="The port to send the files to."
    )
    parser.add_argument(
        "--user", "-u", type=str, required=True, help="The user to send the files to."
    )
    parser.add_argument(
        "--password",
        "-k",
        type=str,
        required=True,
        help="The password to send the files to.",
    )
    parser.add_argument("--original", type=str, help="The original queue file.")
    parser.add_argument("--queue", type=str, help="The current queue file.")
    parser.add_argument(
        "--regex", "-e", type=str, required=True, help="The regex to watch for."
    )
    parser.add_argument(
        "--recursive",
        "-r",
        type=_str2bool,
        default=True,
        help="Whether to recurse or not.",
    )
    parser.add_argument(
        "--n_jobs",
        "-j",
        type=int,
        default=8,
        help="The number of jobs to run in parallel.",
    )
    parser.add_argument(
        "--log", "-l", type=int, default=logging.INFO, help="The log level."
    )
    args = parser.parse_args()
    # Set up logging
    logging.basicConfig(
        level=args.log, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    # --regex is required=True, so argparse has already rejected a missing
    # value by this point; the original's None-check was unreachable.
    REGEX = args.regex
    # Assuming this is watching some long-running process (like a model training),
    # you may find it beneficial to watch the progress.
    # First, generate an "original" file that contains one line
    # for every experiment configuration you would like to test.
    # The contents don't matter. It only counts lines.
    # Then, when each experiment is complete, pop a line from that file.
    # This is called the "queue" file.
    # If these files exist, you will get a log to stdout and a
    # progress.json file containing the "completed" and "remaining" amounts.
    if args.original is not None:
        with open(args.original, "r") as f:
            TOTAL = len(f.readlines())
    if args.queue is not None:
        with open(args.queue, "r") as f:
            QUEUE = len(f.readlines())
    # SUPPORTS PARALLEL HOSTS. Specify n jobs or write a list of hosts here.
    hosts = [args.server] * args.n_jobs
    # Watch the directory that contains the source file.
    src_path = str(Path(args.source).parent)
    event_handler = JSONHandler(
        hosts,
        args.port,
        args.user,
        args.password,
        args.source,
        args.destination,
        recursive=args.recursive,
    )
    observer = watchdog.observers.Observer()
    observer.schedule(event_handler, path=src_path, recursive=True)
    observer.start()
    # Idle until interrupted; the observer thread does the actual work.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
README.md
Dependencies
Example usage:
Will search for the current working directory (
.
) for all scores.json files. Upon creation, it sends the file to lab.fake.com and puts the file in Alice's home (~/
) directory using the normal ssh port 22. If you pass
--queue
and--original
params, it will read the number of lines in each file and calculate the percent of the queue that remains from the original. Update!
pass the
--n_jobs
or-j
command to set the number of parallel sessions. The default is 8. It will now recursively create directories with the
-r
or --recursive
flags.