Created
September 23, 2018 11:33
-
-
Save 0xIslamTaha/15ae96f3e1cbb4c3af5ccb284b7e200d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import math
import time

import gevent
import gevent.pool
import netaddr
import requests
from jumpscale import j
from zerorobot.service_collection import ServiceNotFoundError
from zerorobot.template.base import TemplateBase
from zerorobot.template.decorator import timeout
from zerorobot.template.state import StateCheckError
# UIDs of the 0-templates services this template creates and talks to.
_TEMPLATES_REPO = 'github.com/threefoldtech/0-templates'
VM_TEMPLATE_UID = '{}/dm_vm/0.0.1'.format(_TEMPLATES_REPO)
GATEWAY_TEMPLATE_UID = '{}/gateway/0.0.1'.format(_TEMPLATES_REPO)
MINIO_TEMPLATE_UID = '{}/minio/0.0.1'.format(_TEMPLATES_REPO)
NS_TEMPLATE_UID = '{}/namespace/0.0.1'.format(_TEMPLATES_REPO)
class S3(TemplateBase):
    """
    Deploy an S3 endpoint: a minio service running inside a zero-os VM,
    backed by zero-db namespaces spread over the nodes of a farm.
    """

    version = '0.0.1'
    template_name = "s3"

    def __init__(self, name=None, guid=None, data=None):
        super().__init__(name=name, guid=guid, data=data)
        self.recurring_action('_monitor', 30)  # every 30 seconds
        # nodes of the farm, loaded by validate()
        self._nodes = []
        # id of the node the minio VM is deployed on, set by _deploy_minio_vm()
        self.selected_node = None

    def validate(self):
        """
        Validate the service data and load the nodes of the farm.

        Generates a random namespace password when none is given.

        :raises ValueError: on invalid erasure coding settings, a too short
                            minio password, or a farm without nodes
        """
        # dataShards is used as a divisor when sizing the namespaces
        if self.data['dataShards'] <= 0:
            raise ValueError('dataShards must be greater than 0')
        if self.data['parityShards'] > self.data['dataShards']:
            raise ValueError('parityShards must be equal to or less than dataShards')
        if len(self.data['minioPassword']) < 8:
            raise ValueError("minio password need to be at least 8 characters")

        self._nodes = list_farm_nodes(self.data['farmerIyoOrg'])
        if not self._nodes:
            raise ValueError('There are no nodes in this farm')

        if not self.data['nsPassword']:
            self.data['nsPassword'] = j.data.idgenerator.generateXCharID(32)

    def _monitor(self):
        """
        Recurring check of the minio service.

        Mirrors minio's 'status'/'running' state on this service. When minio
        is not running and every namespace reports running, the current list
        of zerodb connection strings is sent to minio via 'update_zerodbs'.
        Any failure while checking clears the 'status'/'running' state.
        """
        self.logger.info('Monitor s3 %s', self.name)
        self.state.check('actions', 'install', 'ok')

        @timeout(10)
        def update_state():
            minio = self._minio()
            try:
                minio.state.check('status', 'running', 'ok')
                self.state.set('status', 'running', 'ok')
                return
            except StateCheckError:
                self.state.delete('status', 'running')

            zdbs_connection = []
            for namespace in self.data['namespaces']:
                robot = get_zrobot(namespace['node'], namespace['url'])
                ns = robot.services.get(template_uid=NS_TEMPLATE_UID, name=namespace['name'])
                try:
                    ns.state.check('status', 'running', 'ok')
                    zdbs_connection.append(namespace_connection_info(ns))
                except StateCheckError:
                    # a namespace is not running: leave minio untouched for now
                    return
            minio.schedule_action('update_zerodbs', args={'zerodbs': zdbs_connection}).wait(die=True)

        try:
            update_state()
        except (Exception, gevent.Timeout) as err:
            # best effort: if we cannot check (VM unreachable, timeout, ...)
            # we cannot claim minio is running. gevent.Timeout derives from
            # BaseException so it is listed explicitly; GreenletExit and
            # KeyboardInterrupt are allowed to propagate.
            self.logger.warning('failed to update state of s3 %s: %s', self.name, err)
            self.state.delete('status', 'running')

    def install(self):
        """
        Deploy the namespaces and the minio VM concurrently, then create,
        install and start the minio service inside the VM and add a port
        forward for it.

        :raises RuntimeError: if the minio service could not be created
                              before the wait timeout
        """
        def deploy_namespaces():
            namespaces = self._deploy_namespaces()
            return namespaces_connection_info(namespaces)

        def deploy_vm():
            self._deploy_minio_vm()
            return self._vm_robot_and_ip()

        # deploy all namespaces and vm concurrently
        ns_gl = gevent.spawn(deploy_namespaces)
        vm_gl = gevent.spawn(deploy_vm)
        gevent.wait([ns_gl, vm_gl])

        if ns_gl.exception:
            raise ns_gl.exception
        namespaces_connections = ns_gl.value

        if vm_gl.exception:
            raise vm_gl.exception
        vm_robot, _ = vm_gl.value

        self.logger.info("create the minio service on the vm")
        minio_data = {
            'zerodbs': namespaces_connections,
            'namespace': self.guid,
            'nsSecret': self.data['nsPassword'],
            'login': self.data['minioLogin'],
            'password': self.data['minioPassword'],
            'dataShard': self.data['dataShards'],
            'parityShard': self.data['parityShards'],
        }

        wait_timeout = 2400  # seconds
        self.logger.info("wait up to %d mins for zerorobot until it downloads the repos and starts accepting requests",
                         wait_timeout // 60)
        deadline = time.time() + wait_timeout
        minio = None
        while time.time() < deadline:
            try:
                minio = vm_robot.services.find_or_create(MINIO_TEMPLATE_UID, self.guid, minio_data)
                break
            except requests.ConnectionError:
                # the robot inside the VM is not listening yet
                self.logger.info("vm not up yet...waiting some more")
                time.sleep(10)

        if not minio:
            raise RuntimeError('Failed to create minio service')

        self.logger.info("install minio")
        minio.schedule_action('install').wait(die=True)
        minio.schedule_action('start').wait(die=True)
        self.logger.info("minio installed")

        port = minio.schedule_action('node_port').wait(die=True).result
        self.logger.info("open port %s on minio vm", port)
        self._vm().schedule_action('add_portforward', args={'name': 'minio', 'target': port, 'source': None}).wait(die=True)

        self.state.set('actions', 'install', 'ok')

    def uninstall(self):
        """
        Uninstall and delete every namespace recorded in
        self.data['namespaces'], then the minio VM, and clear the
        install/running states.
        """
        def delete_namespace(namespace):
            self.logger.info("deleting namespace %s on node %s", namespace['name'], namespace['node'])
            robot = get_zrobot(namespace['node'], namespace['url'])
            try:
                ns = robot.services.get(template_uid=NS_TEMPLATE_UID, name=namespace['name'])
            except ServiceNotFoundError:
                # NOTE(review): assumes remote robots raise ServiceNotFoundError
                # for unknown services, like self.api does; then it is already gone
                ns = None
            if ns is not None:
                ns.schedule_action('uninstall').wait(die=True)
                ns.delete()
            self.data['namespaces'].remove(namespace)

        # Iterate over a copy: delete_namespace removes entries from the list.
        # Consuming the iterator waits for every deletion and re-raises the
        # first failure; group.join() alone can return before the mapping
        # greenlet has spawned any worker into the group.
        group = gevent.pool.Group()
        list(group.imap_unordered(delete_namespace, list(self.data['namespaces'])))

        try:
            # uninstall and delete the minio vm
            self.logger.info("deleting minio vm")
            vm = self._vm()
            vm.schedule_action('uninstall').wait(die=True)
            vm.delete()
        except ServiceNotFoundError:
            pass

        self.state.delete('actions', 'install')
        self.state.delete('status', 'running')

    def url(self):
        """
        Return the URLs minio can be reached on.

        :return: dict with 'public' (zerotier ip of the VM and minio's node
                 port) and 'storage' (the host storage address and the port
                 whose 'ports' entry maps to minio's port, or '' when either
                 is unknown)
        """
        vm_robot, public_ip = self._vm_robot_and_ip()
        minio = vm_robot.services.get(template_uid=MINIO_TEMPLATE_UID, name=self.guid)
        public_port = minio.schedule_action('node_port').wait(die=True).result

        vm_info = self._vm().schedule_action('info').wait(die=True).result
        storage_ip = vm_info['host']['storage_addr']
        storage_port = None
        for src, dest in vm_info['ports'].items():
            if dest == public_port:
                storage_port = int(src)
                break

        output = {
            'public': 'http://{}:{}'.format(public_ip, public_port),
            'storage': '',
        }
        if storage_ip and storage_port:
            output['storage'] = 'http://{}:{}'.format(storage_ip, storage_port)
        return output

    def start(self):
        """Start the minio service (requires a successful install)."""
        self.state.check('actions', 'install', 'ok')
        self._minio().schedule_action('start').wait(die=True)

    def stop(self):
        """Stop the minio service (requires a successful install)."""
        self.state.check('actions', 'install', 'ok')
        self._minio().schedule_action('stop').wait(die=True)

    def upgrade(self):
        """Restart minio by stopping then starting it."""
        self.stop()
        self.start()

    def _minio(self):
        """Return the minio service proxy living on the VM's robot."""
        vm_robot, _ = self._vm_robot_and_ip()
        return vm_robot.services.get(template_uid=MINIO_TEMPLATE_UID, name=self.guid)

    def _vm(self):
        """Return the local VM service named after this service's guid."""
        return self.api.services.get(template_uid=VM_TEMPLATE_UID, name=self.guid)

    def _gateway(self):
        """Return the gateway service configured in the data."""
        robot = j.clients.zrobot.robots[self.data['gatewayRobot']]
        return robot.services.get(template_uid=GATEWAY_TEMPLATE_UID, name=self.data['gateway'])

    def _vm_robot_and_ip(self):
        """
        Return (zrobot client of the minio VM, zerotier ip of the VM).

        :raises RuntimeError: if the VM has no zerotier ip assigned
        """
        vm = self._vm()
        vminfo = vm.schedule_action('info', args={'timeout': 1200}).wait(die=True).result
        mgmt_ip = vminfo['zerotier'].get('ip')
        if not mgmt_ip:
            raise RuntimeError('VM has no ip assignments in zerotier network')
        return get_zrobot(vm.name, 'http://{}:6600'.format(mgmt_ip)), mgmt_ip

    def _deploy_namespaces(self):
        """
        Make sure enough zero-db namespaces exist to back minio.

        Namespaces recorded in self.data['namespaces'] by a previous install
        attempt are reused; the missing ones are created on the farm nodes,
        preferring nodes with the most storage of the requested type. A node
        that fails to deploy a namespace is not used again during this call.

        :return: the namespace services (reused and newly created)
        :raises RuntimeError: if every candidate node failed before enough
                              namespaces were deployed
        """
        self.logger.info("create namespaces to be used as a backend for minio")
        self.logger.info("compute how much zerodb are required")
        required_nr_namespaces = compute_minumum_namespaces(total_size=self.data['storageSize'],
                                                            data=self.data['dataShards'],
                                                            parity=self.data['parityShards'],
                                                            shard_size=self.data['shardSize'])

        deployed_namespaces = []
        # Check if namespaces have already been created in a previous install attempt
        for namespace in self.data['namespaces']:
            robot = get_zrobot(namespace['node'], namespace['url'])
            deployed_namespaces.append(robot.services.get(template_uid=NS_TEMPLATE_UID, name=namespace['name']))

        self.logger.info("namespaces required %d", required_nr_namespaces)
        self.logger.info("namespaces already deployed %d", len(deployed_namespaces))

        storage_key = 'sru' if self.data['storageType'] == 'ssd' else 'hru'
        # candidate nodes; built once so that failed nodes stay excluded
        nodes = list(self._nodes)

        remaining = required_nr_namespaces - len(deployed_namespaces)
        while remaining > 0:
            if not nodes:
                raise RuntimeError('could not deploy enough namespaces')

            # sort nodes by the amount of storage available
            nodes.sort(key=lambda k: k['total_resources'][storage_key], reverse=True)

            gls = set()
            for i in range(remaining):
                node = nodes[i % len(nodes)]
                gls.add(gevent.spawn(install_namespace,
                                     node=node,
                                     name=self.guid,
                                     disk_type=self.data['storageType'],
                                     size=self.data['shardSize'],
                                     password=self.data['nsPassword']))

            for g in gevent.wait(gls):
                if g.exception:
                    failed_node = getattr(g.exception, 'node', None)
                    if failed_node in nodes:
                        # we could not deploy on this node, remove it from the possible node to use
                        nodes.remove(failed_node)
                else:
                    namespace, node = g.value
                    deployed_namespaces.append(namespace)
                    self.data['namespaces'].append({'name': namespace.name,
                                                    'url': node['robot_address'],
                                                    'node': node['node_id']})
                    # update amount of resource so the next round sorts the nodes properly
                    node['total_resources'][storage_key] -= self.data['shardSize']
                    self.save()  # to save the already deployed namespaces

            remaining = required_nr_namespaces - len(deployed_namespaces)
            self.logger.info("%d namespaces deployed, remaining %s", len(deployed_namespaces), max(remaining, 0))

        return deployed_namespaces

    def _deploy_minio_vm(self):
        """
        Create (if needed) and install the zero-os VM that will host minio.

        :return: the VM service
        :raises RuntimeError: if no node with a reachable robot was found
        """
        self.logger.info("create the zero-os vm on which we will create the minio container")
        vm_node_id = pick_vm_node(self._nodes)
        if vm_node_id is None:
            raise RuntimeError("no node found to deploy vm on it")
        self.selected_node = vm_node_id

        mgmt_nic = {
            'id': self.data['mgmtNic']['id'],
            'ztClient': self.data['mgmtNic']['ztClient'],
            'type': 'zerotier',
        }
        vm_data = {
            'cpu': 2,
            'memory': 4000,
            'image': 'zero-os',
            'mgmtNic': mgmt_nic,
            'disks': [{
                'diskType': 'ssd',
                'size': 10,  # FIXME: need to compute how much storage is needed on the disk to support X number of files in minio
                'label': 's3vm',
            }],
            'nodeId': vm_node_id,
        }
        vm = self.api.services.find_or_create(VM_TEMPLATE_UID, self.guid, vm_data)
        try:
            vm.state.check('actions', 'install', 'ok')
        except StateCheckError:
            vm.schedule_action('install').wait(die=True)
        return vm
def list_farm_nodes(farm_organization):
    """
    Query the threefold directory for the capacity of every node of a farm.

    :param farm_organization: IYO organization of the farm
    :type farm_organization: str
    :return: the node details decoded from the directory's JSON response
    :rtype: list
    :raises: whatever ``raise_for_status`` raises on an HTTP error response
    """
    directory = j.clients.threefold_directory.get(interactive=False)
    query = {'farmer': farm_organization}
    response = directory.api.ListCapacity(query_params=query)[1]
    response.raise_for_status()
    nodes = response.json()
    return nodes
def install_namespace(node, name, disk_type, size, password):
    """
    Create and install a private zero-db namespace on ``node``.

    :param node: node detail (uses 'node_id' and 'robot_address')
    :param name: value passed as the namespace's ``nsName``
    :param disk_type: disk type to create the namespace on
    :param size: size of the namespace
    :param password: password of the namespace
    :return: tuple (namespace service, node)
    :raises NamespaceDeployError: if the namespace could not be deployed on
        this node for any reason, including when the install reports
        'python.NoNamespaceAvailability' (the namespace service is then
        deleted before raising)
    """
    try:
        robot = get_zrobot(node['node_id'], node['robot_address'])
        data = {
            'diskType': disk_type,
            'mode': 'direct',
            'password': password,
            'public': False,
            'size': size,
            'nsName': name,
        }
        namespace = robot.services.create(template_uid=NS_TEMPLATE_UID, data=data)
        task = namespace.schedule_action('install').wait()
        if task.eco:
            if task.eco.category == 'python.NoNamespaceAvailability':
                namespace.delete()
            # never hand back a namespace whose install failed (or that was
            # just deleted): the caller would record it as deployed
            raise NamespaceDeployError(task.eco.message, node)
        return namespace, node
    except NamespaceDeployError:
        # already carries the message and node, don't re-wrap it
        raise
    except Exception as err:
        raise NamespaceDeployError(str(err), node) from err
def compute_minumum_namespaces(total_size, data, parity, shard_size=2000, spare_ratio=0.25):
    """
    Compute the minimum number of zerodb namespaces required to store
    ``total_size`` with the given erasure coding policy.

    :param total_size: amount of data to store (same unit as shard_size)
    :type total_size: int or float
    :param data: data shards number
    :type data: int
    :param parity: parity shards number
    :type parity: int
    :param shard_size: size of one namespace
    :type shard_size: int or float
    :param spare_ratio: extra fraction of namespaces added on top of the strict
        minimum, so data can still be written if some shards are down
    :type spare_ratio: float
    :return: minimum number of zerodb required
    :rtype: int
    :raises ValueError: if data or shard_size is not strictly positive, or
        spare_ratio is negative
    """
    if data <= 0:
        raise ValueError('data must be greater than 0')
    if shard_size <= 0:
        raise ValueError('shard_size must be greater than 0')
    if spare_ratio < 0:
        raise ValueError('spare_ratio must not be negative')

    # minimum number of shards to fulfill the erasure coding policy:
    # every `data` units of payload are stored as `data + parity` units
    minimum = math.ceil(((total_size * (data + parity)) / data) / shard_size)
    # add spare shards (25% by default) so we have more shards than needed;
    # this is needed to still be able to write data if some shards are down
    return math.ceil(minimum + (minimum * spare_ratio))
def namespaces_connection_info(namespaces):
    """
    Fetch the connection string of every namespace concurrently.

    :param namespaces: namespace services
    :return: list of connection strings, in completion order (not input order)
    """
    workers = gevent.pool.Group()
    pending = workers.imap_unordered(namespace_connection_info, namespaces)
    return list(pending)
def namespace_connection_info(namespace):
    """
    Return the ``'<storage_ip>:<port>'`` address of a namespace, as reported
    by its 'connection_info' action.

    When no special storage network is configured, the reported storage_ip
    is the zerotier address.
    """
    task = namespace.schedule_action('connection_info')
    info = task.wait(die=True).result
    return '{ip}:{port}'.format(ip=info['storage_ip'], port=info['port'])
def get_zrobot(name, url):
    """
    Get the zrobot client named ``name`` configured with ``url`` and return
    the matching robot from ``j.clients.zrobot.robots``.
    """
    j.clients.zrobot.get(name, data={'url': url})
    robots = j.clients.zrobot.robots
    return robots[name]
def pick_vm_node(nodes):
    """
    Try to find a node where we can deploy the minio VM.

    Nodes are tried in decreasing order of 'sru' capacity. The first one whose
    zrobot answers a services listing is selected.

    :param nodes: list of nodes from the farm
    :type nodes: list
    :return: the node id of the selected node, or None if no robot answered
    :rtype: str or None
    """
    # TODO: better sorting logic taking in account memory and CPU available too
    candidates = sorted(nodes, key=lambda k: k['total_resources']['sru'], reverse=True)
    for node in candidates:
        robot = get_zrobot(node['node_id'], node['robot_address'])
        try:
            # make sure the robot is reachable
            robot.services.find()
        except Exception:
            continue
        # return the id directly: this is a module-level function, there is
        # no `self` to store it on
        return node['node_id']
    return None
def get_selected_node(service=None):
    """
    Return the ``selected_node`` attribute of an S3 service.

    This is a module-level function, so the service must be passed in
    explicitly.

    :param service: S3 service instance (or any object with a
                    ``selected_node`` attribute)
    :return: the selected node id, or None when no service is given or it
             has no ``selected_node`` attribute
    """
    if service is None:
        return None
    return getattr(service, 'selected_node', None)
class NamespaceDeployError(RuntimeError):
    """
    Raised when a namespace could not be deployed on a node.

    :param msg: error message
    :param node: node detail on which the deployment failed; kept on
                 ``self.node`` so callers can stop using that node
    """

    def __init__(self, msg, node):
        # pass only the message: passing `self` too would put the exception
        # inside its own args and garble str(err)
        super().__init__(msg)
        self.node = node
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment