Last active
March 20, 2017 07:43
Star
You must be signed in to star a gist
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import json | |
import requests | |
import six | |
import time | |
import uuid | |
DEFAULT_TIMEOUT = 30 | |
LOCK_PREFIX = '/locks/' | |
def _encode(data): | |
if isinstance(data, six.string_types): | |
data = six.b(data) | |
return base64.b64encode(data).decode("utf-8") | |
def _decode(data): | |
if isinstance(data, six.string_types): | |
data = six.b(data) | |
return base64.b64decode(data).decode("utf-8") | |
class Lease(object): | |
def __init__(self, id, client=None): | |
self.id = id | |
self.client = client | |
def revoke(self): | |
self.client.post(self.client.get_url("/kv/lease/revoke"), | |
json={"ID": self.id}) | |
return True | |
def ttl(self): | |
result = self.client.post(self.client.get_url("/kv/lease/timetolive"), | |
json={"ID": self.id}) | |
return int(result['TTL']) | |
def refresh(self): | |
result = self.client.post(self.client.get_url("/lease/keepalive"), | |
json={"ID": self.id}) | |
return int(result['result']['TTL']) | |
def keys(self): | |
result = self.client.post(self.client.get_url("/kv/lease/timetolive"), | |
json={"ID": self.id, | |
"keys": True}) | |
return [base64.b64decode(six.b(key)).decode('utf-8') | |
for key in result['keys']] | |
class Lock(object): | |
def __init__(self, name, ttl=DEFAULT_TIMEOUT, client=None): | |
self.name = name | |
self.ttl = ttl | |
self.client = client | |
self.key = LOCK_PREFIX + self.name | |
self.lease = None | |
self.uuid = None | |
def acquire(self): | |
self.lease = self.client.lease(self.ttl) | |
self.uuid = str(uuid.uuid1()) | |
base64_key = _encode(self.key) | |
base64_value = _encode(self.uuid) | |
txn = { | |
'compare': [{ | |
'key': base64_key, | |
'result': 'EQUAL', | |
'target': 'CREATE', | |
'create_revision': 0 | |
}], | |
'success': [{ | |
'request_put': { | |
'key': base64_key, | |
'value': base64_value, | |
'lease': self.lease.id | |
} | |
}], | |
'failure': [{ | |
'request_range': { | |
'key': base64_key | |
} | |
}] | |
} | |
result = self.client.transaction(txn) | |
if 'succeeded' in result: | |
return result['succeeded'] | |
return False | |
def release(self): | |
base64_key = _encode(self.key) | |
base64_value = _encode(self.uuid) | |
txn = { | |
'compare': [{ | |
'key': base64_key, | |
'result': 'EQUAL', | |
'target': 'VALUE', | |
'value': base64_value | |
}], | |
'success': [{ | |
'request_delete_range': { | |
'key': base64_key | |
} | |
}] | |
} | |
result = self.client.transaction(txn) | |
if 'succeeded' in result: | |
return result['succeeded'] | |
return False | |
def refresh(self): | |
if self.lease: | |
return self.lease.refresh() | |
else: | |
raise ValueError('No lease associated with this lock') | |
def is_acquired(self): | |
values = self.client.get(self.key) | |
return self.uuid in values | |
def __enter__(self): | |
self.acquire() | |
return self | |
def __exit__(self, exception_type, exception_value, traceback): | |
self.release() | |
class Client(object): | |
def __init__(self, host="localhost", port=2379, protocol="http"): | |
self.host = host | |
self.port = port | |
self.protocol = protocol | |
self.session = requests.Session() | |
def get_url(self, path): | |
base_url = self.protocol + '://' + self.host + ':' + str(self.port) | |
return base_url + '/v3alpha/' + path.lstrip("/") | |
def post(self, *args, **kwargs): | |
resp = self.session.post(*args, **kwargs) | |
if resp.status_code != 200: | |
raise requests.exceptions.RequestException( | |
'Bad response code : %d' % resp.status_code) | |
return resp.json() | |
def status(self): | |
return self.post(self.get_url("/maintenance/status"), | |
json={}) | |
def lease(self, ttl=DEFAULT_TIMEOUT): | |
result = self.post(self.get_url("/lease/grant"), | |
json={"TTL": ttl, "ID": 0}) | |
return Lease(int(result['ID']), client=self) | |
def put(self, key, value, lease=None): | |
payload = { | |
"key": _encode(key), | |
"value": _encode(value) | |
} | |
if lease: | |
payload['lease'] = lease.id | |
self.post(self.get_url("/kv/put"), json=payload) | |
return True | |
def get(self, key, **kwargs): | |
payload = { | |
"key": _encode(key), | |
} | |
payload.update(kwargs) | |
result = self.post(self.get_url("/kv/range"), | |
json=payload) | |
if 'kvs' not in result: | |
return [] | |
return [base64.b64decode(six.b(item['value'])).decode('utf-8') | |
for item in result['kvs']] | |
def delete(self, key, **kwargs): | |
payload = { | |
"key": _encode(key), | |
} | |
payload.update(kwargs) | |
result = self.post(self.get_url("/kv/deleterange"), | |
json=payload) | |
if 'deleted' in result: | |
return True | |
return False | |
def transaction(self, txn): | |
resp = self.session.post(self.get_url("/kv/txn"), | |
data=json.dumps(txn)) | |
if resp.status_code != 200: | |
raise requests.exceptions.RequestException( | |
'Bad response code : %d' % resp.status_code) | |
return resp.json() | |
def main(): | |
client = Client() | |
print('>>>> Status') | |
result = client.status() | |
print("cluster id : %r" % result['header']['cluster_id']) | |
print('>>>> Lease') | |
lease = client.lease() | |
print("Lease id : %r" % lease.id) | |
print("Lease ttl : %r" % lease.ttl()) | |
print("Lease refresh : %r" % lease.refresh()) | |
result = client.put('foo2', 'bar2', lease) | |
print("Key put foo2 : %r" % result) | |
result = client.put('foo3', 'bar3', lease) | |
print("Key put foo3 : %r" % result) | |
print("Lease Keys : %r" % lease.keys()) | |
result = lease.revoke() | |
print("Lease Revoke : %r" % result) | |
result = client.get('foox') | |
print("Key get foox : %r" % result) | |
result = client.put('foo', 'bar') | |
print("Key put foo : %r" % result) | |
result = client.get('foo') | |
print("Key get foo : %r" % result) | |
result = client.delete('foo') | |
print("Key delete foo : %r" % result) | |
result = client.delete('foo-unknown') | |
print("Key delete foo-unknown : %r" % result) | |
print ('>>>> Lock') | |
lock = Lock('xyz-%s' % time.clock(), ttl=10000, client=client) | |
result = lock.acquire() | |
print("acquire : %r" % result) | |
result = lock.refresh() | |
print("refresh : %r" % result) | |
result = lock.is_acquired() | |
print("is_acquired : %r" % result) | |
result = lock.release() | |
print("release : %r" % result) | |
result = lock.is_acquired() | |
print("is_acquired : %r" % result) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment