Skip to content

Instantly share code, notes, and snippets.

@cocagne
Created October 4, 2013 19:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cocagne/6831642 to your computer and use it in GitHub Desktop.
Save cocagne/6831642 to your computer and use it in GitHub Desktop.
Trivial key-value store for configuration content
#!/usr/bin/env python
import sys
import zmq
import json
import datetime
import argparse
# ZeroMQ ipc endpoints of the configuration daemon.
PUB_ADDR = 'ipc:///tmp/config_daemon_pub'  # watch_cmd connects its SUB socket here
REP_ADDR = 'ipc:///tmp/config_daemon_rep'  # request/reply commands connect here
# A single shared context. The sockets are only created at import time; each
# command function connects the one it needs.
context = zmq.Context()
req_sock = context.socket(zmq.REQ)
sub_sock = context.socket(zmq.SUB)
def pretty_print_json(j):
    '''
    Print a JSON document to stdout in a human-friendly layout.

    j -- JSON-encoded text. It is decoded and re-encoded with sorted keys,
         a four-space indent and ', ' / ': ' style separators.
    '''
    decoded = json.loads(j)
    rendered = json.dumps(decoded, sort_keys=True, indent=4,
                          separators=(',', ': '))
    print(rendered)
def err_exit(msg):
    '''
    Report a fatal error and terminate the process with exit status 1.

    msg -- human-readable error text.

    The message goes to stderr, not stdout, so that it never ends up in
    captured command output (e.g. the value read from ``get-value`` in a
    shell substitution).

    Raises SystemExit(1).
    '''
    sys.stderr.write('{0}\n'.format(msg))
    sys.exit(1)
def check_error(parts):
    '''
    Abort the program if a daemon reply is an error reply.

    parts -- list of message parts received from the daemon. When part 0
             is 'ERROR', part 1 is passed to err_exit(), which does not
             return. Any other reply is accepted silently.
    '''
    status = parts[0]
    if status != 'ERROR':
        return
    err_exit(parts[1])
def get_cmd(args):
    '''
    Implement the ``get`` sub-command.

    Sends a GET request, adding ``args.label`` as a second message part
    when a label was given, and pretty-prints the configuration JSON found
    in part 1 of the reply. An ERROR reply terminates the program.
    '''
    request = ['GET', args.label] if args.label else ['GET']

    req_sock.connect(REP_ADDR)
    req_sock.send_multipart(request)
    reply = req_sock.recv_multipart()

    check_error(reply)
    pretty_print_json(reply[1])
def get_key_cmd(args):
    '''
    Implement the ``get-value`` sub-command.

    Requests the current configuration and prints the value stored under
    ``args.key``. An empty line is printed when the key is absent.
    An ERROR reply terminates the program.
    '''
    req_sock.connect(REP_ADDR)
    req_sock.send_multipart(['GET'])
    reply = req_sock.recv_multipart()
    check_error(reply)

    config = json.loads(reply[1])
    value = config.get(args.key, '')
    print(value)
def set_cmd(args):
    '''
    Implement the ``set`` sub-command.

    ``args.pairs`` is a flat list ``[key1, value1, key2, value2, ...]``.
    The literal value 'NULL' is sent as JSON null, which asks the daemon
    to delete that key. If a key is given more than once, the last value
    wins.

    Exits with an error message when the argument list is not a non-empty,
    even-length list, or when the daemon replies with ERROR.
    '''
    pairs = args.pairs
    # The whole "even and at least one pair" test must be negated. The
    # previous form only negated the parity test, so a single argument
    # slipped through and crashed with IndexError further down.
    if len(pairs) < 2 or len(pairs) % 2 != 0:
        err_exit('Arguments must be an even number of key, value pairs')

    req_sock.connect(REP_ADDR)

    keys = pairs[::2]
    values = pairs[1::2]
    d = {}
    for key, value in zip(keys, values):
        d[key] = None if value == 'NULL' else value

    req_sock.send_multipart(['SET', json.dumps(d)])
    parts = req_sock.recv_multipart()
    check_error(parts)
def watch_cmd(args):
    '''
    Implement the ``watch`` sub-command.

    Subscribes to 'UPDATE' messages on the publish endpoint and loops
    forever, printing part 1 of each message (the changed values) and,
    when ``args.show_current`` is set, part 2 (the full configuration).
    The loop never returns by itself.
    '''
    sub_sock.connect(PUB_ADDR)
    sub_sock.setsockopt(zmq.SUBSCRIBE, 'UPDATE')

    separator = '-' * 80
    while True:
        message = sub_sock.recv_multipart()
        print(separator)
        print('Updated: ')
        pretty_print_json(message[1])
        if not args.show_current:
            continue
        print('Current: ')
        pretty_print_json(message[2])
def list_cmd(args):
    '''
    Implement the ``list`` sub-command.

    Sends a LIST request and prints one line per label: the creation
    timestamp in ISO format followed by the label name. Prints
    'No labels defined' when the daemon returns an empty list.
    An ERROR reply terminates the program.
    '''
    req_sock.connect(REP_ADDR)
    req_sock.send_multipart(['LIST'])
    reply = req_sock.recv_multipart()
    check_error(reply)

    label_list = json.loads(reply[1])
    if label_list:
        for entry in label_list:
            # entry[0] holds the date/time fields, entry[1] the label name.
            created = datetime.datetime(*entry[0])
            print('%s %s' % (created.isoformat(), entry[1]))
    else:
        print('No labels defined')
def label_cmd(args):
    '''
    Implement the ``label`` sub-command.

    Asks the daemon to store the current configuration under
    ``args.label``. The label must begin with a letter.

    Exits with an error message when the label is empty or does not start
    with a letter, or when the daemon replies with ERROR.
    '''
    # Slice rather than index: an empty label (``label ""``) must produce
    # the normal error message instead of an IndexError traceback.
    if not args.label[:1].isalpha():
        err_exit('Label must begin with a letter')

    req_sock.connect(REP_ADDR)
    req_sock.send_multipart(['LABEL', args.label])
    parts = req_sock.recv_multipart()
    check_error(parts)
def restore_cmd(args):
    '''
    Implement the ``restore`` sub-command.

    Asks the daemon to restore the configuration stored under
    ``args.label``. The label must begin with a letter.

    Exits with an error message when the label is empty or does not start
    with a letter, or when the daemon replies with ERROR.
    '''
    # Slice rather than index: an empty label (``restore ""``) must produce
    # the normal error message instead of an IndexError traceback.
    if not args.label[:1].isalpha():
        err_exit('Label must begin with a letter')

    req_sock.connect(REP_ADDR)
    req_sock.send_multipart(['RESTORE', args.label])
    parts = req_sock.recv_multipart()
    check_error(parts)
# ---------------------------------------------------------------------------
# Command-line interface.
#
# Every sub-command gets its own sub-parser and stores the function that
# implements it in the ``sub_command`` default; the dispatch at the bottom
# simply calls that function with the parsed arguments.
# ---------------------------------------------------------------------------
description = '''Configuration daemon CLI'''

top = argparse.ArgumentParser(description=description)
sub = top.add_subparsers()

# get [label]
sub_get = sub.add_parser('get', help='Gets the current configuration')
sub_get.add_argument(
    "label",
    nargs='?',
    help="Label of the configuration. If omitted, the current configuration is obtained")
sub_get.set_defaults(sub_command=get_cmd)

# get-value <key>   (the ``sub_get`` name is re-bound to this parser)
sub_get = sub.add_parser('get-value', help='Prints the value of the requested key')
sub_get.add_argument("key", help="Key to print")
sub_get.set_defaults(sub_command=get_key_cmd)

# set <key> <value> [<key> <value> ...]
sub_set = sub.add_parser(
    'set',
    help='Sets configuration key-value pairs. Values set to NULL will be removed from the configuration database')
sub_set.add_argument(
    "pairs",
    nargs='+',
    help="List of key, value pairs. The number of arguments must be even.")
sub_set.set_defaults(sub_command=set_cmd)

# watch [-s]
sub_watch = sub.add_parser('watch', help='Watches for configuration updates')
sub_watch.add_argument(
    "-s", '--show-current',
    action='store_true',
    help="Shows the full current configuration in addition to the changed values on each update.")
sub_watch.set_defaults(sub_command=watch_cmd)

# list
sub_list = sub.add_parser('list', help='Lists the labled configurations')
sub_list.set_defaults(sub_command=list_cmd)

# label <label>
sub_label = sub.add_parser('label', help='Labels the current configuration')
sub_label.add_argument(
    "label",
    help="Label for the current configuration. Must begin with a letter")
sub_label.set_defaults(sub_command=label_cmd)

# restore <label>
sub_restore = sub.add_parser('restore', help='Restores a labeled configuration')
sub_restore.add_argument("label", help="Labled configuration")
sub_restore.set_defaults(sub_command=restore_cmd)

args = top.parse_args()

try:
    args.sub_command(args)
except KeyboardInterrupt:
    # Ctrl-C is how the endless ``watch`` loop is left; exit quietly.
    pass
#!/usr/bin/env python
'''
= Simple Configuration Daemon =
This module implements a low-performance but very simple configuration daemon
that is built on top of three core technologies:
* ZeroMQ for Inter-Process Communication
* JSON for message encoding
* Zipfiles for configuration archival and labeling
Configuration content is small (less than 10k on average) and updates are rare
(10s of operations during a busy week). Consequently, performance and scalability
are effectively non-issues. Robust operation and easy client use are the
primary goals.
One aspect of this daemon that will not be immediately apparent to those not
familiar with ZeroMQ is that client applications do not need to worry about
bootstrapping considerations. For example, a client application may send a GET
request message before the configuration daemon starts and it will be correctly
serviced once the configuration daemon starts up. Similarly, if the
configuration daemon is restarted on-the-fly, client applications will notice
nothing more than delayed responses to their queries. The built-in reconnection
and retry logic in ZeroMQ insulates the clients from having to deal with these
issues themselves.
== Design ==
Single-file database::
A single JSON file is used to store the current configuration. Updates
are done by way of a full re-write to a temp file followed by an
os.rename()
Atomic updates::
Multiple key-value pairs may be updated in a single request. Requests
are single-threaded so no intermediate values are possible.
No partial reads::
Requests for configuration return the entire configuration as a
single JSON object.
Zip-file archival of old configurations::
Each time the configuration is modified, the previous configuration
is appended, in its entirety, to an +archive.zip+ file in the
same directory as the JSON file.
Configuration Label/Restore::
Clients can assign labels to configurations and later request that
the configuration be restored to the value associated with that
label.
Completely passive::
The daemon does nothing but respond to external requests.
ZeroMQ is used for communication::
* Easy to use
* Language bindings exist for all programming languages
* zmq's built-in queueing and retry mechanisms simplify client code
JSON encoding used for all complex data structures::
Every language has an easy-to-use JSON library
Publish-Subscribe used for notifying clients of configuration updates::
* All changes to the current configuration result in an UPDATE message
being published
* The UPDATE message contains two JSON encoded objects:
.. A dictionary of the updates where the keys are the modified attributes
and the values are tuples of (previous_value, new_value)
.. The complete current configuration
== Bootstrapping ==
On initial start, the daemon will ensure that the database directory
exists and will create an empty initial configuration "{}" as well
as an empty archive. The initial content must be inserted by an
external entity.
== Protocol ==
=== Message Encoding ===
ZeroMQ multi-part messages are used to separate the parts of messages.
=== Requests ===
==== Request Encoding ====
Client requests place the request type in part 0 and follow it with any
request-specific parts.
All requests will receive a reply. If the requested operation was successful,
the reply will contain 'OK' in part 0 and the subsequent parts (if any) will be
formatted on a per-request basis. If an error occurred, the reply will consist
of two message parts with 'ERROR' contained in part 0 and a human-readable
error message in part 1.
==== GET ====
Additional Request Parts: [label]
Additional Response parts: json-encoded-configuration
Obtains the configuration for the requested label or the current configuration
if +[label]+ is omitted
==== SET ====
Additional Request Parts: json-encoded-dictionary
Additional Response parts:
Sets one or more values in the current configuration. The encoded dictionary
must contain string keys mapping to primitive values (numbers and strings
only). If the value is encoded as a JSON null, the corresponding entry in the
config database will be deleted.
==== LABEL ====
Additional Request Parts: label
Additional Response parts:
Sets the supplied label to reference the current configuration
==== LIST ====
Additional Request Parts:
Additional Response parts: json-encoded-list
Obtains a list of all available configuration labels. The entries of
the encoded list are tuples of: ( (year,month,day,hours,minutes,seconds), label )
The first element of the tuple is the time at which the label was created.
Each entry in the timestamp tuple is an integer.
==== RESTORE ====
Additional Request Parts: label
Additional Response parts:
Restores the configuration to the labeled value
=== Publishes ===
==== UPDATE ====
Part 0: 'UPDATE'
Part 1: json-encoded-deltas-dictionary
Part 2: json-encoded-current-configuration-dictionary
The deltas dictionary maps the modified keys to tuples of (previous_value, current_value)
'''
import os
import zmq
import json
import time
import os.path
import zipfile
import tempfile
# --- Locations -------------------------------------------------------------
DB_DIR = '/tmp/db_dir'
PUB_ADDR = 'ipc:///tmp/config_daemon_pub'
REP_ADDR = 'ipc:///tmp/config_daemon_rep'
CURRENT_FN = os.path.join(DB_DIR, 'current')
ARCHIVE_FN = os.path.join(DB_DIR, 'archive.zip')

# --- In-memory state -------------------------------------------------------
# ``current`` is the live configuration; ``jcurrent`` is its JSON-serialized
# form, kept as a cache so replies do not re-serialize it every time.
current = dict()
jcurrent = '{}'

# --- Startup: make sure the database directory exists, load saved state ----
if not os.path.exists(DB_DIR):
    os.makedirs(DB_DIR, mode=0o755)

if os.path.exists(CURRENT_FN):
    with open(CURRENT_FN) as f:
        jcurrent = f.read()
    current = json.loads(jcurrent)

# --- Sockets: PUB for update notifications, REP for client requests --------
context = zmq.Context()

pub_sock = context.socket(zmq.PUB)
pub_sock.bind(PUB_ADDR)

rep_sock = context.socket(zmq.REP)
rep_sock.bind(REP_ADDR)
def archive_and_update(new_values, save_name=None, restore=False):
    '''
    Archive the current configuration, then apply ``new_values`` to it.

    new_values -- dict mapping keys to their new values; a value of None
                  deletes the key. When ``restore`` is true this dict is
                  modified in place (missing keys are added as None).
    save_name  -- archive entry name for the snapshot, i.e. a label. When
                  None, ``str(time.time())`` is used.
    restore    -- when true, ``new_values`` is the complete target
                  configuration: keys that are currently set but absent
                  from it are deleted.

    The pre-change configuration is appended to ARCHIVE_FN on every call,
    even when nothing changes (labelling relies on exactly that). If at
    least one value changes, the new configuration is written to a temp
    file in DB_DIR, renamed over CURRENT_FN, the in-memory state is
    updated, and an UPDATE message carrying ``{key: (old, new)}`` and the
    full configuration is published.

    Raises the archive error when ``save_name`` was given and the snapshot
    could not be written; raises when the new configuration cannot be
    persisted (in-memory state is then left untouched).
    '''
    global jcurrent

    entry_name = str(time.time()) if save_name is None else save_name
    try:
        # ``with`` closes the archive even when writestr() fails.
        with zipfile.ZipFile(ARCHIVE_FN, 'a', zipfile.ZIP_DEFLATED) as archive:
            archive.writestr(entry_name, jcurrent)
    except Exception:
        # Timestamped history snapshots are best-effort, but an explicit
        # label IS the requested operation: failing silently would tell the
        # client 'OK' for a label that does not exist.
        if save_name is not None:
            raise

    if restore:
        for k in set(current) - set(new_values):
            new_values[k] = None

    updated = {}
    for k, v in new_values.items():
        old = current.get(k)
        if old != v:
            updated[k] = (old, v)

    if not updated:
        return

    # Build the new state on the side so a failed disk write cannot leave
    # the in-memory configuration out of sync with CURRENT_FN.
    merged = dict(current)
    merged.update(new_values)
    merged = dict((k, v) for k, v in merged.items() if v is not None)
    new_json = json.dumps(merged)

    fd, abs_fn = tempfile.mkstemp(dir=DB_DIR)
    persisted = False
    try:
        with os.fdopen(fd, 'w') as f:
            f.write(new_json)
            f.flush()
            os.fsync(f.fileno())
        os.chmod(abs_fn, 0o644)
        os.rename(abs_fn, CURRENT_FN)
        persisted = True
    finally:
        if not persisted:
            # Do not litter DB_DIR with orphaned temp files.
            try:
                os.unlink(abs_fn)
            except OSError:
                pass

    # Mutate ``current`` in place; it is a shared module-level dict.
    current.clear()
    current.update(merged)
    jcurrent = new_json

    pub_sock.send_multipart(['UPDATE', json.dumps(updated), jcurrent])
def get_labeled_config(label):
    '''
    Load an archived configuration from ARCHIVE_FN.

    label -- name of the archive entry to read.

    Returns the decoded JSON content of that entry.

    Raises Exception with a descriptive message when the archive or the
    entry cannot be read or decoded.
    '''
    try:
        # ``with`` guarantees the archive is closed even when the entry is
        # missing or does not contain valid JSON.
        with zipfile.ZipFile(ARCHIVE_FN) as archive:
            return json.loads(archive.read(label))
    except Exception as e:
        raise Exception('Failed to load labled configuration {0}: {1}'.format(label, str(e)))
def restore(label):
    '''
    Replace the current configuration with the archived one named ``label``.

    The archived values are loaded with get_labeled_config() and applied
    through archive_and_update() in restore mode.
    '''
    labeled_values = get_labeled_config(label)
    archive_and_update(labeled_values, restore=True)
def list_labels():
    '''
    Return a JSON-encoded list of the labelled configurations.

    Each element is ``(date_time, name)`` taken from the archive's entry
    table. Only entries whose name begins with a letter are labels; the
    other entries are automatic snapshots named by timestamp.

    Returns the JSON encoding of an empty list when no archive file exists
    yet.

    Raises Exception with a descriptive message when the archive cannot be
    read.
    '''
    if not os.path.exists(ARCHIVE_FN):
        # Nothing has been archived yet, so there are no labels.
        return json.dumps([])

    try:
        with zipfile.ZipFile(ARCHIVE_FN) as archive:
            # Slice instead of indexing so an entry with an empty name
            # cannot make every LIST request fail with IndexError.
            ret = [(zi.date_time, zi.filename)
                   for zi in archive.infolist()
                   if zi.filename[:1].isalpha()]
        return json.dumps(ret)
    except Exception as e:
        raise Exception('Failed to obtain archived configurations: {0}'.format(str(e)))
def _handle_request(parts):
    '''
    Execute one client request and return the reply message parts.

    parts -- list of request message parts; part 0 is the request type.

    Returns ``['OK', payload]``. The payload is the requested labelled
    configuration for ``GET label``, the label list for ``LIST``, and the
    (possibly just updated) current configuration for everything else.

    Raises Exception with a human-readable message for malformed or
    unsupported requests.
    '''
    command = parts[0]

    if command == 'GET':
        if len(parts) > 1:
            return ['OK', json.dumps(get_labeled_config(parts[1]))]

    elif command == 'SET':
        try:
            new_values = json.loads(parts[1])
        except Exception:
            # Also covers a SET sent without a payload part (IndexError).
            raise Exception('Failed to parse json value')

        if not isinstance(new_values, dict):
            raise Exception('JSON value must be a dictionary')

        for k, v in new_values.items():
            if not isinstance(k, basestring):
                raise Exception('Keys must be strings')
            if isinstance(v, (list, dict)):
                raise Exception('Values must be integers or strings')

        archive_and_update(new_values)

    elif command == 'LIST':
        return ['OK', list_labels()]

    elif command == 'LABEL':
        if len(parts) != 2:
            raise Exception('LABEL requires two message parts')
        label = parts[1]
        if len(label) > 64:
            raise Exception('Label name exceeds 64 characters')
        # Labels share the archive with timestamp-named snapshots and are
        # told apart by their first character, so enforce the rule here
        # and not only in the command-line client.
        if not label[:1].isalpha():
            raise Exception('Label must begin with a letter')
        archive_and_update(dict(), label)

    elif command == 'RESTORE':
        if len(parts) != 2:
            raise Exception('RESTORE requires two message parts')
        restore(parts[1])

    else:
        raise Exception('Unsupported request type')

    # Default reply: the current configuration, read after any update.
    return ['OK', jcurrent]


# Serve requests until interrupted. Every request gets exactly one reply:
# 'OK' plus a payload, or 'ERROR' plus a message.
while True:
    try:
        parts = rep_sock.recv_multipart()
    except KeyboardInterrupt:
        break

    try:
        rep_sock.send_multipart(_handle_request(parts))
    except Exception as e:
        import traceback
        traceback.print_exc()
        rep_sock.send_multipart(['ERROR', str(e)])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment