Skip to content

Instantly share code, notes, and snippets.

@whitmo
Forked from lazypower/hooks.py
Last active August 29, 2015 14:19
Show Gist options
  • Save whitmo/d4a4e026d9c61013a155 to your computer and use it in GitHub Desktop.
Save whitmo/d4a4e026d9c61013a155 to your computer and use it in GitHub Desktop.
def register_machine(apiserver, retry=False):
    ''' Register this unit as a minion with the API server.

    Gathers the unit's private address, the memory figure from the first
    line of /proc/meminfo and the number of online CPUs, fills in a
    Registrator payload and POSTs it to /api/v1beta2/nodes.

    :param apiserver: URL of the API server; only its hostname and port
        are used.
    :param retry: not referenced by this function; kept so existing callers
        that pass it keep working.
    :raises RuntimeError: when the API server answers with a status that
        command_succeeded() treats as a hard failure.
    '''
    parsed = urlparse.urlparse(apiserver)
    # identity = hookenv.local_unit().replace('/', '-')
    private_address = hookenv.unit_private_ip()

    # Only the first line of /proc/meminfo is needed; it is expected to look
    # like "<name>: <amount> <unit>" and we keep just the amount.
    with open('/proc/meminfo') as fh:
        info = fh.readline()
    mem = info.strip().split(":")[1].strip().split()[0]
    cpus = os.sysconf("SC_NPROCESSORS_ONLN")

    # Registrator is not a context manager, so it is used directly.
    registration_request = Registrator()
    data = registration_request.data
    data['Kind'] = 'minion'
    data['id'] = private_address
    data['name'] = private_address
    data['metadata']['name'] = private_address
    # The payload template keeps capacity under 'spec'; it has no 'resources'
    # key, so indexing data['resources'] raised KeyError.
    data['spec']['externalID'] = private_address
    data['spec']['capacity']['mem'] = mem + ' K'
    data['spec']['capacity']['cpu'] = cpus
    data['status']['hostIP'] = private_address

    # register() returns the decoded JSON body and already prints the
    # status/reason/body line; the HTTP response itself is kept on the
    # Registrator instance.
    result = registration_request.register(parsed.hostname, parsed.port,
                                           "/api/v1beta2/nodes")
    response = registration_request.response
    print(response)
    try:
        registration_request.command_succeeded(response, result)
    except ValueError:
        # This happens when we have already registered (409), or when the
        # API server returned its transient "resource does not exist" 500.
        # Both are tolerated here on purpose.
        pass


try:
    import httplib
except ImportError:  # the module is named http.client on Python 3
    import http.client as httplib
import json
import time


class Registrator:
    ''' Builds a minion registration payload and submits it to the API. '''

    def __init__(self):
        # HTTP response object of the most recent register() call, or None
        # when no request has been made yet.
        self.response = None
        self.ds = {
            "creationTimestamp": "",
            "kind": "Minion",
            "name": "",  # private_address
            "metadata": {
                "name": "",  # private_address,
            },
            "spec": {
                "externalID": "",  # private_address
                "capacity": {
                    "mem": "",  # mem + ' K',
                    "cpu": "",  # cpus
                }
            },
            "status": {
                "conditions": [],
                "hostIP": "",  # private_address
            }
        }

    @property
    def data(self):
        ''' Returns a data-structure for population to make a request. '''
        return self.ds

    def register(self, hostname, port, api_path):
        ''' Contact the API Server for a new registration

        POSTs the JSON-encoded payload to api_path on hostname:port.

        :returns: the decoded JSON body of the reply. The HTTP response
            object is additionally stored on self.response so callers can
            inspect its status.
        '''
        headers = {"Content-type": "application/json",
                   "Accept": "application/json"}
        connection = httplib.HTTPConnection(hostname, port)
        print('CONN {}'.format(connection))
        try:
            connection.request("POST", api_path, json.dumps(self.data),
                               headers)
            response = connection.getresponse()
            # Read the body before closing; status and reason stay available
            # on the response object afterwards.
            body = response.read()
        finally:
            # Always release the socket, even when the request fails.
            connection.close()
        self.response = response
        print(body)
        result = json.loads(body)
        print("Response status:%s reason:%s body:%s" % (
            response.status, response.reason, result))
        return result

    def update(self):
        ''' Contact the API Server to update a registration '''
        # do a get on the API for the node
        # repost to the API with any modified data
        pass

    def save(self):
        ''' Marshall the registration data '''
        # TODO
        pass

    def command_succeeded(self, response, result):
        ''' Evaluate response data to determine if the command was successful

        :param response: object exposing the HTTP status as .status
        :param result: decoded JSON body of the reply
        :returns: True for a 200 or 201 status
        :raises ValueError: for a 409 conflict, and for the 500 reply whose
            message starts with 'The requested resource does not exist'
        :raises RuntimeError: for every other status
        '''
        if response.status in (200, 201):
            print("Registered")
            return True
        elif response.status == 409:
            print("Status Conflict")
            # Suggested return a PUT instead of a POST with this response
            # code, this predicates use of the UPDATE method
            # TODO
            # Until update() exists, report the conflict with the exception
            # type callers already catch for "already registered".
            raise ValueError("Registration returned 409, already registered")
        elif response.status == 500 and result.get(
                'message', '').startswith(
                    'The requested resource does not exist'):
            # There's something fishy in the kube api here (0.4 dev), first
            # time we go to register a new minion, we always seem to get
            # this error.
            # https://github.com/GoogleCloudPlatform/kubernetes/issues/1995
            time.sleep(1)
            print("Retrying registration...")
            raise ValueError("Registration returned 500, retry")
            # return register_machine(apiserver, retry=True)
        else:
            print("Registration error")
            # TODO - get request data
            raise RuntimeError(
                "Unable to register machine with status %s" % response.status)
import json
from mock import MagicMock, patch, call
from path import Path
import pytest
import sys
# Prepend a 'hooks' directory to sys.path; presumably this is what lets the
# 'lib.registrator' import that follows resolve.
# NOTE(review): Path('__file__') wraps the literal string '__file__', not this
# module's __file__ variable, so the directory is presumably resolved against
# the current working directory rather than this file's location -- confirm
# that this is intended before changing it.
d = Path('__file__').parent.abspath() / 'hooks'
sys.path.insert(0, d.abspath())
from lib.registrator import Registrator
class TestRegistrator():
    ''' Unit tests for Registrator. '''

    def setup_method(self, method):
        # Give every test its own, untouched Registrator.
        self.r = Registrator()

    def test_data_type(self):
        ''' The payload exposed through .data is exactly a dict. '''
        payload_is_dict = type(self.r.data) is dict
        if not payload_is_dict:
            pytest.fail("Invalid type")

    @patch('json.loads')
    @patch('httplib.HTTPConnection')
    def test_register(self, httplibmock, jsonmock):
        ''' register() opens a connection and POSTs the JSON payload. '''
        expected_headers = {"Content-type": "application/json",
                            "Accept": "application/json"}

        self.r.register('foo', 80, '/v1beta1/test')

        httplibmock.assert_called_with('foo', 80)
        requestmock = httplibmock().request
        requestmock.assert_called_with(
            "POST", "/v1beta1/test", json.dumps(self.r.data),
            expected_headers)

    def test_command_succeeded(self):
        ''' Status codes map onto success, RuntimeError and ValueError. '''
        body = '{"status": "Failure", "kind": "Status", "code": 409, "apiVersion": "v1beta2", "reason": "AlreadyExists", "details": {"kind": "minion", "id": "10.200.147.200"}, "message": "minion \\"10.200.147.200\\" already exists", "creationTimestamp": null}'
        result = json.loads(body)
        response = MagicMock()

        # 200 must pass without raising.
        response.status = 200
        self.r.command_succeeded(response, result)

        # A 500 whose message is not the known transient one is a hard error.
        response.status = 500
        with pytest.raises(RuntimeError):
            self.r.command_succeeded(response, result)

        # A conflict is expected to surface as ValueError.
        response.status = 409
        with pytest.raises(ValueError):
            self.r.command_succeeded(response, result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment