Skip to content

Instantly share code, notes, and snippets.

@butla
Last active October 8, 2015 11:43
Show Gist options
  • Save butla/ffe18eab3e71dec295cb to your computer and use it in GitHub Desktop.
Save butla/ffe18eab3e71dec295cb to your computer and use it in GitHub Desktop.
# to install: pip install networkx
__author__ = 'butla'
import sys
import asyncio
import aiohttp
import networkx
import networkx.algorithms
class CfComponent:
    """Base class for a Cloud Foundry component (app or service) used as a node
    in the dependency graph.

    Equality and hashing are both keyed on the concrete type name plus the
    component name, so two instances describing the same component are treated
    as a single graph node.
    """

    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return '{}({})'.format(type(self).__name__, self.name)

    def __eq__(self, other):
        # Bug fix: __hash__ was overridden without __eq__, so two instances
        # with the same name hashed alike but still compared unequal
        # (identity comparison) — equal nodes must compare equal.
        return type(self) is type(other) and self.name == other.name

    def __hash__(self):
        return hash(type(self).__name__ + self.name)
class CfApp(CfComponent):
    """A Cloud Foundry application component."""

    def push(self):
        # Stand-in for a real `cf push`; for now it only announces itself.
        message = "I am {}, I'm an app and I'm being pushed".format(self.name)
        print(message)
class CfService(CfComponent):
    """A Cloud Foundry service component."""

    def push(self):
        # Stand-in for real service provisioning; for now it only announces itself.
        message = "I am {}, I'm a service and I'm being pushed".format(self.name)
        print(message)
def create_graph():
    """
    Nodes (CfComponents) and edges will be normally created during yml parsing, after yml merge.
    This is just an example.
    :return: Graph of apps and services.
    :rtype: networkx.DiGraph
    """
    konsola = CfApp('konsola')
    das = CfApp('das')
    downloader = CfApp('downloader')
    metadata = CfApp('metadata')
    data_catalog = CfApp('data katalog')

    hdfs = CfService('hadeefes')
    redis = CfService('redis')
    kafka = CfService('kawka')
    elastic = CfService('elastik')

    # An edge A -> B reads "A depends on B".
    graph = networkx.DiGraph()
    graph.add_edges_from([
        (konsola, das),
        (konsola, data_catalog),
        (das, downloader),
        (das, metadata),
        (das, redis),
        (das, kafka),
        (downloader, hdfs),
        (metadata, hdfs),
        (metadata, data_catalog),
        (data_catalog, elastic),
        (data_catalog, downloader),
        # add (data_catalog, konsola) here to see a cycle
    ])
    return graph
def get_apps_for_deployment(app_graph):
    """
    Gets iterator that returns apps in the order of deployment: each component
    is yielded only after everything it depends on (its successors in the
    graph, since an edge A -> B means "A depends on B") has been yielded.

    Uses a reversed topological sort, which is linear in nodes + edges — the
    previous leaf-stripping loop was quadratic — and works with networkx >= 2.0,
    where the ``out_degree_iter`` method this code relied on was removed.
    Unlike the old loop, it also fails fast instead of spinning forever when
    the graph contains a cycle.

    :param networkx.DiGraph app_graph: dependency graph; not modified.
    :raises networkx.NetworkXUnfeasible: if the graph contains a cycle.
    """
    # topological_sort yields dependents before their dependencies, so
    # reverse it to get a valid deployment order (dependencies first).
    for component in reversed(list(networkx.topological_sort(app_graph))):
        yield component
#@asyncio.coroutine
#def download_file(url):
# response = yield from aiohttp.get(url)
# while (yield from response.content.read(102400)):
# pass
# return response.status
#
#
#@asyncio.coroutine
#def download_urls():
# #url = 'http://localhost:9090/fake-csv/3000000'
# url = 'http://www.wp.pl'
#
# # Use loop semaphore to throttle simultaneous downloads
# # Is there some readinto buffer?
#
# done, _ = yield from asyncio.wait([download_file(url) for _ in range(10)])
#
# response_codes = []
# for future in done:
# response_codes.append(future.result())
# print(len(response_codes), response_codes)
# an example of how this can work
if __name__ == '__main__':
    deployment_graph = create_graph()

    # Refuse to deploy a graph with circular dependencies.
    found_cycles = list(networkx.algorithms.simple_cycles(deployment_graph))
    if found_cycles:
        print('ERROR, there are cycles:')
        for found_cycle in found_cycles:
            print(found_cycle)
        sys.exit('No sie zjebalo, bo sa cykle, a tego nie zdeployujesz.')

    for app in get_apps_for_deployment(deployment_graph):
        app.push()

    # ioloop = asyncio.get_event_loop()
    # ioloop.run_until_complete(download_urls())

# TODO
# 1. There are only apps (or brokers) and services
# 2. Node may be created as a dependency, but it should be overwritten on reading its config
# 3. Always log to a file
# 4. Leave merged manifests, leave the overall manifest
# 5. Colored, all root logs with time and thread id. colorlog.
# 6. Parallel pushing of all leaves. Logs for each app need to be separate then.
# 7. Downloading of artifacts / unpacking
# 8. Merging yamls
# 9. Identifying differences in the pushed apps. Not just, there are other values. Take defaults into account. Have strategies - update everything, only lower versions, if config different, if config different when version lower (default)
# 10. All calls to CF api can be done on ThreadPoolExecutor concurrently. Or using asyncio
@kkonradI
Copy link

kkonradI commented Oct 8, 2015

If you use topological sorting it will run much faster (linear, whereas it is now quadratic). A nice property of it is that it throws an error when there are cycles.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment