Skip to content

Instantly share code, notes, and snippets.

@ddemidov
Created December 15, 2017 15:00
Show Gist options
  • Save ddemidov/75195832da558a240302e796b9819ca2 to your computer and use it in GitHub Desktop.
Save ddemidov/75195832da558a240302e796b9819ca2 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import qt5reactor
import sys
from PyQt5 import QtWidgets, QtSvg
import argparse
import os
import sys
import threading
import graphviz as gv
# Appearance settings handed to fq.get_config('graph', ...) in render();
# presumably used as the fallback when the configuration has no 'graph'
# section -- confirm against fq.get_config.
default_settings = {
    "color": {
        "compute": "#F7B5CC",
        "merge": "#B6B9ED",
        "queue": "#95E1F7",
        "package": "#F5F7B8",
    },
    "shape": {
        "compute": "box3d",
        "merge": "cylinder",
        "queue": "folder",
    },
}

# Sets of package paths, keyed by the path of the node that owns them.
packages = {}

# Graphviz cluster for each node, keyed by node path; render() fills it in.
subgraph = {}
# Command-line interface: server location and the configuration file to use.
parser = argparse.ArgumentParser(prog=sys.argv[0])
parser.add_argument(
    '--host',
    default='localhost',
    help='server address',
)
parser.add_argument(
    '--port',
    type=int,
    default=9999,
    help='server port',
)
parser.add_argument(
    '-c', '--config',
    required=True,
    help='path to configuration file',
)
# Without an explicit argument list argparse reads sys.argv[1:].
args = parser.parse_args()
def render():
    """Rebuild the graph from the tracked packages and show it.

    A filled Graphviz cluster is created for every node returned by
    ``fq.get_nodes(root)`` and stored in the module-level ``subgraph`` dict
    under the node's path.  Each path in ``packages`` is then added as a
    graph node to its owner's cluster, labelled with the path relative to
    the owner.  The graph is piped to SVG and loaded into the module-level
    ``svg`` widget.
    """
    root = fq.get_config('root')
    settings = fq.get_config('graph', default_settings)
    graph = gv.Digraph(format='svg')

    # One cluster per node; an existing entry for the same path is replaced.
    for index, node in enumerate(fq.get_nodes(root)):
        cluster = gv.Digraph('cluster_{}'.format(index))
        cluster.attr(style='filled')
        cluster.attr(color='black')
        cluster.attr(fillcolor=settings['color'][node.type])
        cluster.attr(label=str(node))
        cluster.attr(labeljust='l')
        cluster.attr('edge', color='black')
        subgraph[node.path] = cluster

    for owner_path, owned in packages.items():
        # The result is not used; the call itself is kept as the original
        # made it, since whether it can raise is not visible from here.
        fq.get_node(owner_path)
        for package_path in owned:
            # The cluster is looked up per package so that an owner with no
            # packages never touches ``subgraph``.
            subgraph[owner_path].node(
                package_path,
                label=os.path.relpath(package_path, owner_path),
            )

    for cluster in subgraph.values():
        graph.subgraph(cluster)

    svg.load(graph.pipe())
    svg.show()
# Qt requires the QApplication to exist before any widget is constructed.
app = QtWidgets.QApplication(sys.argv)
# Widget that displays the graph; render() loads freshly generated SVG into it.
svg = QtSvg.QSvgWidget()
svg.setGeometry(50, 50, 600, 400)
svg.show()
# Install the Qt-based Twisted reactor.
# NOTE(review): qt5reactor expects install() to run before
# twisted.internet.reactor is imported, which is presumably why the imports
# below are not at the top of the file -- keep this order.
qt5reactor.install()
import fq
import fq.network
from twisted.internet import reactor, protocol
# Pass the --config path to fq; presumably this populates what
# fq.get_config() returns later -- confirm against fq.
fq.read_config(args.config)
class WatchProtocol(fq.network.ClientProtocol):
    """Client protocol that tracks package events and redraws the graph.

    Package paths are kept in the module-level ``packages`` dict, grouped
    into sets keyed by the path of the owning node (``fq.get_owner(...)``).
    Every package event updates that dict and calls ``render()``.
    """

    def connectionMade(self):
        """Call ``connect(authorize=True)`` when the connection is made."""
        self.connect(authorize=True)

    def on_connection(self):
        """Send a 'subscribe' message with the factory's config, then report."""
        self.send_message('subscribe', config=self.factory.config)
        print('connected')

    def do_subscription_failed(self, message):
        """Print the failure message and drop the connection."""
        print('Subscription failed:', message)
        self.transport.loseConnection()

    def do_package_created(self, path):
        """Record ``path`` under its owner and redraw."""
        owner_path = fq.get_owner(path).path
        packages.setdefault(owner_path, set()).add(path)
        render()

    def do_package_moved(self, src, dst):
        """Remove ``src`` from its owner, record ``dst`` under its owner, redraw."""
        source_owner = fq.get_owner(src).path
        target_owner = fq.get_owner(dst).path
        # Both entries are created (old owner first) before either is changed.
        previous = packages.setdefault(source_owner, set())
        current = packages.setdefault(target_owner, set())
        previous.discard(src)
        current.add(dst)
        render()

    def do_package_deleted(self, path):
        """Forget ``path`` (a no-op if it was not tracked) and redraw."""
        owner_path = fq.get_owner(path).path
        # The owner's entry is created even if it ends up empty.
        packages.setdefault(owner_path, set()).discard(path)
        render()
class WatchFactory(protocol.ClientFactory):
    """Client factory that builds ``WatchProtocol`` instances.

    It carries the configuration value the protocol sends when subscribing,
    and stops the reactor when the connection fails or is lost.
    """

    protocol = WatchProtocol

    def __init__(self, config):
        """Store ``config`` for the protocol to read via ``self.factory``."""
        self.config = config

    def clientConnectionFailed(self, connector, reason):
        """Report the failed connection and stop the reactor if it runs."""
        print("Connection failed")
        if not reactor.running:
            return
        reactor.stop()

    def clientConnectionLost(self, connector, reason):
        """Report the closed connection and stop the reactor if it runs."""
        print("Connection closed")
        if not reactor.running:
            return
        reactor.stop()
# Connect to the server given on the command line; the factory receives the
# absolute path of the configuration file.
reactor.connectTCP(
    args.host,
    args.port,
    WatchFactory(os.path.abspath(args.config)),
)
# Run the reactor; the WatchFactory callbacks stop it when the connection
# fails or is lost.
reactor.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment