Created
January 7, 2019 18:56
-
-
Save com4/836dfb6dc2b2130f7ac1b89ab8c6fd21 to your computer and use it in GitHub Desktop.
A utility for managing EventMQ schedules stored in Redis.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2 | |
# LocalWords: redis Namespace bool Args str STDOUT JSON myhost usr oldhost | |
# LocalWords: newhost env LocalWords argparse kwargs args func kwarg stdout | |
# LocalWords: tPayload namespace tElement deserializing url unix clusterunix | |
# LocalWords: rediscluster py EventMQ | |
import argparse | |
from importlib import import_module | |
import json | |
import sys | |
from urlparse import urlparse | |
# Placeholders for modules that are imported later | |
redis = None | |
rediscluster = None | |
__version__ = "0.1.3" | |
def _make_kwargs(args, skip_func_kwarg=True): | |
"""Convert argparse Namespace object into dictionary containing kwargs. | |
Args: | |
args (Namespace): the namespace object from parser.parse_args() | |
skip_func_kwarg (bool): Don't add ``func`` to the dictionary. | |
*Default is True* | |
Returns: | |
(dict) Dictionary containing the arguments contained in the provided | |
``Namespace``. | |
""" | |
retval = {} | |
for k, v in args._get_kwargs(): | |
if skip_func_kwarg == True and k == "func": | |
continue | |
retval[k] = v | |
return retval | |
def get_schedules_generator(from_): | |
"""Generator to retrieve schedules from the provided redis address. | |
Args: | |
from_ (str): connection string for redis host to read schedules from. | |
Returns: | |
generator: Yields scheduled jobs in the requested redis host as a tuple | |
with the form (id<str>, payload<str>) | |
""" | |
server = get_redis_connection_from_url(from_) | |
retval = {} | |
for id_ in server.lrange("interval_jobs", 0, -1): | |
payload = server.get(id_) | |
yield (id_, payload) | |
def get_redis_connection_from_url(url): | |
"""Create connection to redis based on url. | |
There are two different modules that need to be used depending on if you're | |
connecting to a redis host or a redis cluster. This checks the scheme and | |
passes the url to the proper low level class. | |
Args: | |
url (str): URL to connect to | |
Return: | |
A client object connected to the proper redis host with the correct class | |
""" | |
global redis, rediscluster | |
if url.startswith("redis") or url.startswith("unix"): | |
if not redis: | |
try: | |
redis = import_module("redis") | |
except ImportError: | |
sys.stdout.write("Missing redis module. pip install redis\n") | |
sys.exit(1) | |
return redis.StrictRedis.from_url(url) | |
elif url.startswith("cluster"): | |
if not rediscluster: | |
try: | |
rediscluster = import_module("rediscluster") | |
except ImportError: | |
sys.stdout.write("Missing rediscluster module. pip install redis-py-cluster\n") | |
sys.exit(1) | |
if url.startswith("clusterunix"): | |
url = url.replace("clusterunix", "unix") | |
else: | |
url = url.replace("cluster", "redis") | |
return rediscluster.StrictRedisCluster.from_url(url, skip_full_coverage_check=True) | |
else: | |
sys.stderr.write("Unrecognized schema in url: {}".format(url)) | |
sys.exit(1) | |
def save_schedule(id_, payload, server): | |
"""Set and save schedule to the provided redis connection | |
Args: | |
id_ (str): Hash of the job identifier | |
payload (str): JSON encoded string of job parameters | |
server (redis): client object used to write the information to | |
""" | |
server.lpush("interval_jobs", id_) | |
server.set(id_, payload) | |
def dump_schedules(from_): | |
"""Write JSON formatted schedules to STDOUT. | |
Args: | |
from_ (str): connection string for redis host to read schedules from. | |
""" | |
for id_, payload in get_schedules_generator(from_): | |
# Note: value is a JSON encoded string, so the back up will correctly | |
# have double encoded it. | |
print json.dumps([id_, payload]) | |
def load_schedules(to, source_file): | |
"""Load schedules from JSON formatted ``source_file`` to provided host. | |
Args: | |
to (str): connection string for redis host to write schedules to. | |
source_file (str): file path containing JSON encoded schedules to read | |
from. | |
""" | |
to_server = get_redis_connection_from_url(to) | |
with open(source_file, "r") as f: | |
print "Loading from {}...".format(source_file) | |
for line in f.readlines(): | |
if not line.strip(): | |
continue | |
value = json.loads(line) | |
id_ = value.pop(0) | |
payload = value[0] | |
# Note: Value is a JSON encoded string and should be written to | |
# redis as such | |
try: | |
json.loads(payload) | |
save_schedule(id_, payload, to_server) | |
except ValueError as e: | |
sys.stderr.write("Error parsing payload for id {}".format(id_)) | |
sys.stderr.write("\tPayload: {}".format(payload)) | |
sys.stderr.write("\tPayload Type: {}".format(type(payload))) | |
def migrate_schedules(from_, to): | |
"""Read schedules from ``from_`` host and write them to ``to`` host. | |
Args: | |
from_ (str): connection string of source redis host to migrate | |
schedules from. | |
to (str): connection string of destination redis host to migrate | |
schedules to. | |
""" | |
print "Migrating..." | |
to_server = get_redis_connection_from_url(to) | |
for id_, payload in get_schedules_generator(from_): | |
save_schedule(id_, payload, to_server) | |
def remove_schedule(from_, id_): | |
"""Remove a single schedule from a redis host. | |
Args: | |
from_ (str): connection string of redis host to remove schedule from | |
id_ (str): the id of the schedule to remove | |
""" | |
server = get_redis_connection_from_url(from_) | |
print "Removing id_ {}".format(id_) | |
server.lrem("interval_jobs", 0, id_) | |
server.delete(id_) | |
def purge_schedules(from_): | |
"""Remove all schedules from a redis host. | |
Args: | |
from_ (str): connection string of source redis host to purge | |
schedules from. | |
""" | |
server = get_redis_connection_from_url(from_) | |
for id_ in server.lrange("interval_jobs", 0, -1): | |
server.delete(id_) | |
server.delete("interval_jobs") | |
def validate_schedules(from_): | |
"""Validate JSON payload for schedules on a redis host. | |
Args: | |
from_ (str): connection string of source redis host to validate | |
schedules from. | |
""" | |
error_found = False | |
for id_, value in get_schedules_generator(from_): | |
# Note: value is a JSON encoded string, so the back up will correctly | |
# have double encoded it. | |
try: | |
payload = json.loads(value) | |
except ValueError: | |
error_found = True | |
sys.stderr.write("Error deserializing schedule with id: " | |
"{}\n".format(id_)) | |
sys.stderr.write("\tPayload: {}\n".format(value)) | |
sys.stderr.write("\tPayload Type: {}\n".format(type(value))) | |
continue | |
if not isinstance(payload, list): | |
error_found = True | |
sys.stderr.write("Invalid payload type for schedule with id: " | |
"{}\n".format(id_)) | |
sys.stderr.write("\tPayload: {}\n".format(payload)) | |
sys.stderr.write("\tPayload Type: {}\n".format(type(payload))) | |
continue | |
if len(payload) != 5: | |
error_found = True | |
sys.stderr.write("Invalid element count for schedule with id: " | |
"{}\n".format(id_)) | |
sys.stderr.write("\tPayload: {}\n".format(payload)) | |
sys.stderr.write("\tElement Count (5 expected): {}\n".format(len(payload))) | |
continue | |
try: | |
job_params = json.loads(payload[3]) | |
except ValueError: | |
error_found = True | |
sys.stderr.write("Error deserializing job from schedule with id: " | |
"{}\n".format(id_)) | |
sys.stderr.write("\tPayload: {}\n".format(value)) | |
sys.stderr.write("\tPayload Type: {}\n".format(type(value))) | |
continue | |
if not error_found: | |
print "No JSON errors found on {}".format(from_) | |
def print_version(): | |
"""Print version of this utility""" | |
print __version__ | |
if __name__ == "__main__": | |
import textwrap | |
parser = argparse.ArgumentParser( | |
formatter_class=argparse.RawTextHelpFormatter, | |
description="Utility for managing EventMQ schedules stored in redis.\n\n" | |
"Valid schemes:\n" | |
"- redis://\n" | |
"- rediss://\n" | |
"- unix://\n" | |
"- cluster://\n" | |
"- clusterunix://\n" | |
) | |
subparsers = parser.add_subparsers() | |
# ---- load | |
load_parser = subparsers.add_parser( | |
"load", | |
help="load schedules to a redis host from a file") | |
load_parser.set_defaults(func=load_schedules) | |
load_parser.add_argument( | |
"--to", | |
action="store", | |
required=True, | |
help="the redis host to load schedules into " | |
"from. E.g. redis://redis.myhost.com:6379/0") | |
load_parser.add_argument( | |
"--source_file", | |
action="store", | |
required=True, | |
help="the source file with JSON formatted schedules to load into --to") | |
# ---- dump | |
dump_parser = subparsers.add_parser( | |
"dump", | |
help="dump the schedules from a redis host to stdout") | |
dump_parser.set_defaults(func=dump_schedules) | |
dump_parser.add_argument( | |
"--from", | |
dest="from_", | |
action="store", | |
required=True, | |
help="the redis host to dump schedules " | |
"from. E.g. redis://redis.myhost.com:6379/0") | |
# ---- migrate | |
migrate_parser = subparsers.add_parser( | |
"migrate", | |
help="copy schedules from one redis host to another") | |
migrate_parser.set_defaults(func=migrate_schedules) | |
migrate_parser.add_argument( | |
"--from", | |
dest="from_", | |
action="store", | |
required=True, | |
help="the redis host to copy schedules " | |
"from. E.g. redis://redis.oldhost.com:6379/0") | |
migrate_parser.add_argument( | |
"--to", | |
required=True, | |
action="store", | |
help="the redis host to copy schedules " | |
"to. E.g. redis://redis.newhost.com:6379/0") | |
# ---- remove schedule | |
remove_parser = subparsers.add_parser( | |
"remove", | |
help="remove a particular schedule from redis by id" | |
) | |
remove_parser.set_defaults(func=remove_schedule) | |
remove_parser.add_argument( | |
"--from", | |
dest="from_", | |
action="store", | |
required=True, | |
help="the redis host to remove a schedule from " | |
"E.g. redis://redis.myhost.com:6379/0") | |
remove_parser.add_argument( | |
"--id", | |
dest="id_", | |
action="store", | |
required=True, | |
help="the schedule id to remove") | |
# ---- purge | |
purge_parser = subparsers.add_parser( | |
"purge", | |
help="purge all schedules from a redis host") | |
purge_parser.set_defaults(func=purge_schedules) | |
purge_parser.add_argument( | |
"--from", | |
dest="from_", | |
action="store", | |
required=True, | |
help="the redis host to purge schedules " | |
"from. E.g. redis://redis.myhost.com:6379/0") | |
# ---- validate | |
validate_parser = subparsers.add_parser( | |
"validate", | |
help="validate the serialization of schedules on a redis host") | |
validate_parser.set_defaults(func=validate_schedules) | |
validate_parser.add_argument( | |
"--from", | |
dest="from_", | |
action="store", | |
required=True, | |
help="the redis host to validate schedules on. " | |
"E.g. redis://redis.myhost.com:6379/0") | |
# ---- version | |
version_parser = subparsers.add_parser( | |
"version", | |
help="print version and exit") | |
version_parser.set_defaults(func=print_version) | |
args = parser.parse_args() | |
args.func(**_make_kwargs(args)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment