Skip to content

Instantly share code, notes, and snippets.

@com4
Created January 7, 2019 18:56
Show Gist options
  • Save com4/836dfb6dc2b2130f7ac1b89ab8c6fd21 to your computer and use it in GitHub Desktop.
Save com4/836dfb6dc2b2130f7ac1b89ab8c6fd21 to your computer and use it in GitHub Desktop.
A utility for managing EventMQ schedules stored in Redis.
#!/usr/bin/env python2
# LocalWords: redis Namespace bool Args str STDOUT JSON myhost usr oldhost
# LocalWords: newhost env LocalWords argparse kwargs args func kwarg stdout
# LocalWords: tPayload namespace tElement deserializing url unix clusterunix
# LocalWords: rediscluster py EventMQ
import argparse
from importlib import import_module
import json
import sys
from urlparse import urlparse
# Placeholders for modules that are imported later. They stay ``None`` until
# get_redis_connection_from_url() imports the matching module on first use, so
# only the client library that is actually needed has to be installed.
redis = None
rediscluster = None
# Version string printed by print_version().
__version__ = "0.1.3"
def _make_kwargs(args, skip_func_kwarg=True):
"""Convert argparse Namespace object into dictionary containing kwargs.
Args:
args (Namespace): the namespace object from parser.parse_args()
skip_func_kwarg (bool): Don't add ``func`` to the dictionary.
*Default is True*
Returns:
(dict) Dictionary containing the arguments contained in the provided
``Namespace``.
"""
retval = {}
for k, v in args._get_kwargs():
if skip_func_kwarg == True and k == "func":
continue
retval[k] = v
return retval
def get_schedules_generator(from_):
    """Generator to retrieve schedules from the provided redis address.

    The ids are read from the ``interval_jobs`` list and the payload of each
    id is fetched with a separate ``GET``. The connection is only opened once
    the generator is first advanced.

    Args:
        from_ (str): connection string for redis host to read schedules from.

    Returns:
        generator: Yields scheduled jobs in the requested redis host as a tuple
        with the form (id<str>, payload<str>). The payload is passed through
        exactly as returned by the client (presumably ``None`` when the id has
        no stored value -- confirm against the redis client in use).
    """
    server = get_redis_connection_from_url(from_)

    for id_ in server.lrange("interval_jobs", 0, -1):
        yield (id_, server.get(id_))
def get_redis_connection_from_url(url):
    """Create connection to redis based on url.

    There are two different modules that need to be used depending on if you're
    connecting to a redis host or a redis cluster. This checks the scheme and
    passes the url to the proper low level class. The client module is
    imported on first use and cached in the module level ``redis`` /
    ``rediscluster`` placeholders.

    Args:
        url (str): URL to connect to. ``redis``/``rediss``/``unix`` schemes
            use the ``redis`` module; ``cluster``/``clusterunix`` schemes use
            the ``rediscluster`` module.

    Return:
        A client object connected to the proper redis host with the correct class

    Raises:
        SystemExit: (exit code 1) when the required client module is not
            installed or the scheme of ``url`` is not recognized.
    """
    global redis, rediscluster

    if url.startswith(("redis", "unix")):
        if not redis:
            try:
                redis = import_module("redis")
            except ImportError:
                # Errors go to STDERR so they never end up inside the output
                # of ``dump`` when STDOUT is redirected to a backup file.
                sys.stderr.write("Missing redis module. pip install redis\n")
                sys.exit(1)
        return redis.StrictRedis.from_url(url)

    if url.startswith("cluster"):
        if not rediscluster:
            try:
                rediscluster = import_module("rediscluster")
            except ImportError:
                sys.stderr.write(
                    "Missing rediscluster module. "
                    "pip install redis-py-cluster\n")
                sys.exit(1)

        # Translate the custom scheme into one the client understands. Only
        # the first occurrence (the scheme itself) is replaced so that a host
        # name or path that contains the word "cluster" is left untouched.
        if url.startswith("clusterunix"):
            url = url.replace("clusterunix", "unix", 1)
        else:
            url = url.replace("cluster", "redis", 1)
        return rediscluster.StrictRedisCluster.from_url(
            url, skip_full_coverage_check=True)

    sys.stderr.write("Unrecognized schema in url: {}\n".format(url))
    sys.exit(1)
def save_schedule(id_, payload, server):
    """Set and save schedule to the provided redis connection

    The id is registered in the ``interval_jobs`` list and the payload is
    stored under the id itself. Saving an id that is already registered
    replaces its payload without adding a second entry to the list, so loading
    or migrating the same schedules more than once does not create duplicates.

    Args:
        id_ (str): Hash of the job identifier
        payload (str): JSON encoded string of job parameters
        server (redis): client object used to write the information to
    """
    # Drop any existing occurrences first (count 0 == all occurrences) so the
    # list holds each id at most once after the push below.
    server.lrem("interval_jobs", 0, id_)
    server.lpush("interval_jobs", id_)
    server.set(id_, payload)
def dump_schedules(from_):
    """Write JSON formatted schedules to STDOUT, one schedule per line.

    Args:
        from_ (str): connection string for redis host to read schedules from.
    """
    for schedule_id, schedule_payload in get_schedules_generator(from_):
        # The payload is itself a JSON encoded string, so it intentionally
        # ends up double encoded in the backup line.
        backup_line = json.dumps([schedule_id, schedule_payload])
        print(backup_line)
def load_schedules(to, source_file):
    """Load schedules from JSON formatted ``source_file`` to provided host.

    Every non-blank line of the file is expected to be a JSON list whose first
    two elements are the schedule id and its JSON encoded payload (the format
    written by ``dump_schedules``). Lines that cannot be parsed, and payloads
    that are not valid JSON strings, are reported on STDERR and skipped; all
    other schedules are still loaded.

    Args:
        to (str): connection string for redis host to write schedules to.
        source_file (str): file path containing JSON encoded schedules to read
            from.
    """
    to_server = get_redis_connection_from_url(to)

    with open(source_file, "r") as f:
        print("Loading from {}...".format(source_file))

        for line_number, line in enumerate(f, 1):
            if not line.strip():
                continue

            try:
                record = json.loads(line)
                id_, payload = record[0], record[1]
            except (ValueError, TypeError, IndexError, KeyError):
                # Not JSON at all, or not a list with at least two elements.
                sys.stderr.write("Error parsing line {} of {}\n".format(
                    line_number, source_file))
                continue

            # Note: the payload is a JSON encoded string and should be written
            # to redis as such. It is decoded here only to verify it is valid.
            # TypeError covers payloads that are not strings (e.g. null).
            try:
                json.loads(payload)
            except (TypeError, ValueError):
                sys.stderr.write(
                    "Error parsing payload for id {}\n".format(id_))
                sys.stderr.write("\tPayload: {}\n".format(payload))
                sys.stderr.write(
                    "\tPayload Type: {}\n".format(type(payload)))
                continue

            # Kept outside of the try block so that an error raised while
            # writing is not misreported as a payload parsing error.
            save_schedule(id_, payload, to_server)
def migrate_schedules(from_, to):
    """Copy every schedule found on the ``from_`` host over to the ``to`` host.

    Args:
        from_ (str): connection string of source redis host to migrate
            schedules from.
        to (str): connection string of destination redis host to migrate
            schedules to.
    """
    print("Migrating...")

    destination = get_redis_connection_from_url(to)
    for schedule in get_schedules_generator(from_):
        schedule_id, schedule_payload = schedule
        save_schedule(schedule_id, schedule_payload, destination)
def remove_schedule(from_, id_):
    """Delete one schedule, identified by its id, from a redis host.

    Both the entry in the ``interval_jobs`` list and the value stored under
    the id are removed.

    Args:
        from_ (str): connection string of redis host to remove schedule from
        id_ (str): the id of the schedule to remove
    """
    connection = get_redis_connection_from_url(from_)

    notice = "Removing id_ {}".format(id_)
    print(notice)

    # A count of 0 removes every occurrence of the id from the list.
    connection.lrem("interval_jobs", 0, id_)
    connection.delete(id_)
def purge_schedules(from_):
    """Delete every schedule stored on a redis host.

    The value stored for each id listed in ``interval_jobs`` is deleted first,
    followed by the ``interval_jobs`` list itself.

    Args:
        from_ (str): connection string of source redis host to purge
            schedules from.
    """
    connection = get_redis_connection_from_url(from_)

    schedule_ids = connection.lrange("interval_jobs", 0, -1)
    for schedule_id in schedule_ids:
        connection.delete(schedule_id)

    connection.delete("interval_jobs")
def _report_schedule_error(headline, id_, payload, detail_label, detail):
    """Write a three line description of an invalid schedule to STDERR."""
    sys.stderr.write("{}{}\n".format(headline, id_))
    sys.stderr.write("\tPayload: {}\n".format(payload))
    sys.stderr.write("\t{}: {}\n".format(detail_label, detail))


def validate_schedules(from_):
    """Validate JSON payload for schedules on a redis host.

    A schedule is considered valid when its stored value is a JSON encoded
    list of exactly 5 elements and the 4th element is itself a JSON encoded
    string. Every invalid schedule is described on STDERR; a confirmation is
    printed to STDOUT only when no problem was found.

    Args:
        from_ (str): connection string of source redis host to validate
            schedules from.
    """
    error_found = False

    for id_, value in get_schedules_generator(from_):
        # Note: value is a JSON encoded string. TypeError is caught as well so
        # that a value that is not a string at all is reported as an invalid
        # schedule instead of aborting the whole validation run.
        try:
            payload = json.loads(value)
        except (TypeError, ValueError):
            error_found = True
            _report_schedule_error(
                "Error deserializing schedule with id: ", id_, value,
                "Payload Type", type(value))
            continue

        if not isinstance(payload, list):
            error_found = True
            _report_schedule_error(
                "Invalid payload type for schedule with id: ", id_, payload,
                "Payload Type", type(payload))
            continue

        if len(payload) != 5:
            error_found = True
            _report_schedule_error(
                "Invalid element count for schedule with id: ", id_, payload,
                "Element Count (5 expected)", len(payload))
            continue

        # The 4th element must itself be JSON; the decoded value is not
        # needed, only the fact that it can be decoded.
        try:
            json.loads(payload[3])
        except (TypeError, ValueError):
            error_found = True
            _report_schedule_error(
                "Error deserializing job from schedule with id: ", id_, value,
                "Payload Type", type(value))

    if not error_found:
        print("No JSON errors found on {}".format(from_))
def print_version():
    """Write the version string of this utility to STDOUT."""
    print(__version__)
def _build_parser():
    """Create the command line parser with one sub-command per operation.

    Every sub-command stores the function implementing it as the ``func``
    default; the remaining parsed options match that function's parameters.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawTextHelpFormatter,
        description="Utility for managing EventMQ schedules stored in redis.\n\n"
        "Valid schemes:\n"
        "- redis://\n"
        "- rediss://\n"
        "- unix://\n"
        "- cluster://\n"
        "- clusterunix://\n"
    )
    subparsers = parser.add_subparsers()

    def add_from_argument(command_parser, help_text):
        # ``from`` is a reserved word, so the value is stored as ``from_``.
        command_parser.add_argument(
            "--from",
            dest="from_",
            action="store",
            required=True,
            help=help_text)

    # ---- load
    load_parser = subparsers.add_parser(
        "load",
        help="load schedules to a redis host from a file")
    load_parser.set_defaults(func=load_schedules)
    load_parser.add_argument(
        "--to",
        action="store",
        required=True,
        help="the redis host to load schedules "
        "into. E.g. redis://redis.myhost.com:6379/0")
    load_parser.add_argument(
        "--source_file",
        action="store",
        required=True,
        help="the source file with JSON formatted schedules to load into --to")

    # ---- dump
    dump_parser = subparsers.add_parser(
        "dump",
        help="dump the schedules from a redis host to stdout")
    dump_parser.set_defaults(func=dump_schedules)
    add_from_argument(
        dump_parser,
        "the redis host to dump schedules "
        "from. E.g. redis://redis.myhost.com:6379/0")

    # ---- migrate
    migrate_parser = subparsers.add_parser(
        "migrate",
        help="copy schedules from one redis host to another")
    migrate_parser.set_defaults(func=migrate_schedules)
    add_from_argument(
        migrate_parser,
        "the redis host to copy schedules "
        "from. E.g. redis://redis.oldhost.com:6379/0")
    migrate_parser.add_argument(
        "--to",
        required=True,
        action="store",
        help="the redis host to copy schedules "
        "to. E.g. redis://redis.newhost.com:6379/0")

    # ---- remove schedule
    remove_parser = subparsers.add_parser(
        "remove",
        help="remove a particular schedule from redis by id")
    remove_parser.set_defaults(func=remove_schedule)
    add_from_argument(
        remove_parser,
        "the redis host to remove a schedule "
        "from. E.g. redis://redis.myhost.com:6379/0")
    remove_parser.add_argument(
        "--id",
        dest="id_",
        action="store",
        required=True,
        help="the schedule id to remove")

    # ---- purge
    purge_parser = subparsers.add_parser(
        "purge",
        help="purge all schedules from a redis host")
    purge_parser.set_defaults(func=purge_schedules)
    add_from_argument(
        purge_parser,
        "the redis host to purge schedules "
        "from. E.g. redis://redis.myhost.com:6379/0")

    # ---- validate
    validate_parser = subparsers.add_parser(
        "validate",
        help="validate the serialization of schedules on a redis host")
    validate_parser.set_defaults(func=validate_schedules)
    add_from_argument(
        validate_parser,
        "the redis host to validate schedules on. "
        "E.g. redis://redis.myhost.com:6379/0")

    # ---- version
    version_parser = subparsers.add_parser(
        "version",
        help="print version and exit")
    version_parser.set_defaults(func=print_version)

    return parser


def main(argv=None):
    """Parse the command line and run the selected sub-command.

    Args:
        argv (list): arguments to parse. *Default is None*, which makes
            argparse read them from ``sys.argv``.
    """
    parser = _build_parser()
    args = parser.parse_args(argv)

    # Without a sub-command there is no ``func`` default to call; report a
    # usage error instead of failing with an AttributeError.
    if not hasattr(args, "func"):
        parser.error("a command is required")

    args.func(**_make_kwargs(args))


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment