Skip to content

Instantly share code, notes, and snippets.

@dims
Last active March 20, 2017 07:43
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save dims/19ceaf9472ef54aa3011d7a11e809371 to your computer and use it in GitHub Desktop.
import base64
import json
import requests
import six
import time
import uuid
# Default TTL used when a Lock or Client.lease() asks for a new lease.
# NOTE(review): unit is presumably seconds -- confirm against the server API.
DEFAULT_TIMEOUT = 30
# Prefix prepended to a Lock's name to build the key the lock is stored under.
LOCK_PREFIX = '/locks/'
def _encode(data):
if isinstance(data, six.string_types):
data = six.b(data)
return base64.b64encode(data).decode("utf-8")
def _decode(data):
if isinstance(data, six.string_types):
data = six.b(data)
return base64.b64decode(data).decode("utf-8")
class Lease(object):
    """A lease identified by ``id`` and operated on through ``client``.

    ``client`` must provide ``get_url(path)`` and ``post(url, json=...)``;
    every method here issues exactly one POST through it and lets any
    exception raised by ``client.post`` propagate.
    """

    def __init__(self, id, client=None):
        """Remember the lease id and the client used to talk to the server.

        :param id: lease identifier (name shadows the builtin, but is kept
            because it is part of the public signature)
        :param client: object providing ``get_url`` and ``post``
        """
        self.id = id
        self.client = client

    def revoke(self):
        """Ask the server to revoke this lease.

        :returns: always ``True``; failures surface as exceptions from
            ``client.post``
        """
        self.client.post(self.client.get_url("/kv/lease/revoke"),
                         json={"ID": self.id})
        return True

    def ttl(self):
        """Return the ``TTL`` field of the time-to-live response as an int.

        :raises KeyError: if the response carries no ``TTL`` field
        """
        result = self.client.post(
            self.client.get_url("/kv/lease/timetolive"),
            json={"ID": self.id})
        return int(result['TTL'])

    def refresh(self):
        """Send a keep-alive for this lease and return the reported TTL.

        The TTL is read from the nested ``result`` object of the response.

        :raises KeyError: if ``result`` or ``TTL`` is missing from the
            response
        """
        result = self.client.post(self.client.get_url("/lease/keepalive"),
                                  json={"ID": self.id})
        return int(result['result']['TTL'])

    def keys(self):
        """Return the keys attached to this lease, decoded to text.

        :returns: list of key names; empty when the response has no
            ``keys`` field
        """
        result = self.client.post(
            self.client.get_url("/kv/lease/timetolive"),
            json={"ID": self.id, "keys": True})
        # A lease without attached keys may come back with the 'keys' field
        # left out entirely; treat that as "no keys" instead of failing.
        return [base64.b64decode(key).decode('utf-8')
                for key in result.get('keys', [])]
class Lock(object):
    """A named lock stored as a leased key under ``LOCK_PREFIX``.

    The lock is held by whoever managed to create the key; the key's value
    is a per-acquire UUID so that only the holder can delete it again.

    Used as a context manager, ``__enter__`` calls :meth:`acquire` but does
    NOT check its result -- the ``with`` body runs even if the lock was not
    obtained. Call :meth:`is_acquired` inside the body when that matters.
    """

    def __init__(self, name, ttl=DEFAULT_TIMEOUT, client=None):
        """Set up the lock without contacting the server.

        :param name: lock name; the stored key is ``LOCK_PREFIX + name``
        :param ttl: TTL requested for the lease created by :meth:`acquire`
        :param client: object providing ``lease``, ``transaction`` and ``get``
        """
        self.name = name
        self.ttl = ttl
        self.client = client
        self.key = LOCK_PREFIX + self.name
        self.lease = None   # set by acquire()
        self.uuid = None    # set by acquire(); identifies this holder

    def acquire(self):
        """Try to take the lock.

        Grants a fresh lease, then runs a transaction that writes the key
        (bound to that lease) only if the key has never been created.

        :returns: the transaction's ``succeeded`` flag, or ``False`` when
            the response does not contain one
        """
        self.lease = self.client.lease(self.ttl)
        self.uuid = str(uuid.uuid1())
        base64_key = _encode(self.key)
        base64_value = _encode(self.uuid)
        txn = {
            # create_revision == 0 means the key does not exist yet.
            'compare': [{
                'key': base64_key,
                'result': 'EQUAL',
                'target': 'CREATE',
                'create_revision': 0
            }],
            'success': [{
                'request_put': {
                    'key': base64_key,
                    'value': base64_value,
                    'lease': self.lease.id
                }
            }],
            'failure': [{
                'request_range': {
                    'key': base64_key
                }
            }]
        }
        result = self.client.transaction(txn)
        return result.get('succeeded', False)

    def release(self):
        """Delete the lock key if it still holds this lock's UUID.

        :returns: the transaction's ``succeeded`` flag; ``False`` when the
            response has none or when the lock was never acquired
        """
        if self.uuid is None:
            # Never acquired: there is no value to compare against, and
            # base64-encoding None would raise instead of reporting False.
            return False
        base64_key = _encode(self.key)
        base64_value = _encode(self.uuid)
        txn = {
            # Only delete when the stored value is our own UUID, so another
            # holder's lock is never removed.
            'compare': [{
                'key': base64_key,
                'result': 'EQUAL',
                'target': 'VALUE',
                'value': base64_value
            }],
            'success': [{
                'request_delete_range': {
                    'key': base64_key
                }
            }]
        }
        result = self.client.transaction(txn)
        return result.get('succeeded', False)

    def refresh(self):
        """Refresh the lease backing this lock and return what it reports.

        :raises ValueError: if :meth:`acquire` has not created a lease yet
        """
        if not self.lease:
            raise ValueError('No lease associated with this lock')
        return self.lease.refresh()

    def is_acquired(self):
        """Return True if the stored key currently holds this lock's UUID."""
        values = self.client.get(self.key)
        return self.uuid in values

    def __enter__(self):
        # The acquire() result is intentionally left unchecked here to keep
        # the existing behaviour; see the class docstring.
        self.acquire()
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        # release() only deletes the key when we are the holder, so this is
        # safe even if acquire() did not succeed.
        self.release()
class Client(object):
    """Minimal HTTP/JSON client built on a shared ``requests.Session``.

    All request paths are placed under ``/v3alpha/`` on the configured
    host and port (the prefix and the default port 2379 suggest etcd's
    JSON gateway -- NOTE(review): confirm the targeted server version).
    """

    def __init__(self, host="localhost", port=2379, protocol="http"):
        """Store connection settings and open a reusable HTTP session.

        :param host: server host name
        :param port: server port
        :param protocol: URL scheme, e.g. ``"http"``
        """
        self.host = host
        self.port = port
        self.protocol = protocol
        self.session = requests.Session()

    def get_url(self, path):
        """Return the absolute URL for *path* under the ``/v3alpha/`` root.

        Leading slashes on *path* are ignored.
        """
        base_url = self.protocol + '://' + self.host + ':' + str(self.port)
        return base_url + '/v3alpha/' + path.lstrip("/")

    def post(self, *args, **kwargs):
        """POST through the session and return the decoded JSON body.

        Arguments are forwarded unchanged to ``Session.post``.

        :raises requests.exceptions.RequestException: if the status code is
            not 200; the response is attached as ``.response``
        """
        resp = self.session.post(*args, **kwargs)
        if resp.status_code != 200:
            # Attach the response so callers can inspect the error body.
            raise requests.exceptions.RequestException(
                'Bad response code : %d' % resp.status_code,
                response=resp)
        return resp.json()

    def status(self):
        """Return the server's maintenance status response."""
        return self.post(self.get_url("/maintenance/status"),
                         json={})

    def lease(self, ttl=DEFAULT_TIMEOUT):
        """Request a new lease with the given TTL.

        :param ttl: TTL to request for the lease
        :returns: a :class:`Lease` bound to this client
        """
        result = self.post(self.get_url("/lease/grant"),
                           json={"TTL": ttl, "ID": 0})
        return Lease(int(result['ID']), client=self)

    def put(self, key, value, lease=None):
        """Store *value* under *key*, optionally attached to *lease*.

        :param key: key name (text or bytes)
        :param value: value to store (text or bytes)
        :param lease: optional :class:`Lease` whose id is sent along
        :returns: always ``True``; failures surface as exceptions
        """
        payload = {
            "key": _encode(key),
            "value": _encode(value)
        }
        if lease:
            payload['lease'] = lease.id
        self.post(self.get_url("/kv/put"), json=payload)
        return True

    def get(self, key, **kwargs):
        """Return the decoded values found for *key*.

        Extra keyword arguments are merged into the request payload as-is.

        :returns: list of values as text; empty when the response has no
            ``kvs`` field
        """
        payload = {
            "key": _encode(key),
        }
        payload.update(kwargs)
        result = self.post(self.get_url("/kv/range"),
                           json=payload)
        # An item may come back without a 'value' field (e.g. when the
        # stored value is empty); decode that as '' rather than failing.
        return [_decode(item.get('value', ''))
                for item in result.get('kvs', [])]

    def delete(self, key, **kwargs):
        """Delete *key*.

        Extra keyword arguments are merged into the request payload as-is.

        :returns: ``True`` if the response contains a ``deleted`` field,
            ``False`` otherwise
        """
        payload = {
            "key": _encode(key),
        }
        payload.update(kwargs)
        result = self.post(self.get_url("/kv/deleterange"),
                           json=payload)
        return 'deleted' in result

    def transaction(self, txn):
        """Submit the transaction dict *txn* and return the JSON response.

        :raises requests.exceptions.RequestException: if the status code is
            not 200
        """
        # Same body as before (pre-serialised JSON via ``data=``), but routed
        # through post() so the status handling lives in one place.
        return self.post(self.get_url("/kv/txn"),
                         data=json.dumps(txn))
def main():
    """Exercise status, lease, key/value and lock calls and print results.

    Talks to the server at the ``Client()`` defaults (localhost:2379), so a
    running server is required; any request failure propagates.
    """
    client = Client()

    print('>>>> Status')
    result = client.status()
    print("cluster id : %r" % result['header']['cluster_id'])

    print('>>>> Lease')
    lease = client.lease()
    print("Lease id : %r" % lease.id)
    print("Lease ttl : %r" % lease.ttl())
    print("Lease refresh : %r" % lease.refresh())

    result = client.put('foo2', 'bar2', lease)
    print("Key put foo2 : %r" % result)
    result = client.put('foo3', 'bar3', lease)
    print("Key put foo3 : %r" % result)
    print("Lease Keys : %r" % lease.keys())

    result = lease.revoke()
    print("Lease Revoke : %r" % result)

    result = client.get('foox')
    print("Key get foox : %r" % result)

    result = client.put('foo', 'bar')
    print("Key put foo : %r" % result)
    result = client.get('foo')
    print("Key get foo : %r" % result)
    result = client.delete('foo')
    print("Key delete foo : %r" % result)
    result = client.delete('foo-unknown')
    print("Key delete foo-unknown : %r" % result)

    print('>>>> Lock')
    # time.clock() was removed in Python 3.8; time.time() works on every
    # version and still gives a fresh lock name for each run.
    lock = Lock('xyz-%s' % time.time(), ttl=10000, client=client)
    result = lock.acquire()
    print("acquire : %r" % result)
    result = lock.refresh()
    print("refresh : %r" % result)
    result = lock.is_acquired()
    print("is_acquired : %r" % result)
    result = lock.release()
    print("release : %r" % result)
    result = lock.is_acquired()
    print("is_acquired : %r" % result)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment