Skip to content

Instantly share code, notes, and snippets.

@naiieandrade
Created June 13, 2018 11:35
Show Gist options
  • Save naiieandrade/1860841cc22a7f7d9e5dafc89fec31e8 to your computer and use it in GitHub Desktop.
Save naiieandrade/1860841cc22a7f7d9e5dafc89fec31e8 to your computer and use it in GitHub Desktop.
anitya.py
"""Fetcher to monitor Anitya events related to packages updates."""
import tempfile
import zmq
import yaml
from packaging import version
import fedmsg.consumers
import time
import kiskadee.fetchers
import kiskadee.queue
SLEEP_TIME = 1
class Fetcher(kiskadee.fetchers.Fetcher):
"""Fetcher to monitor Anitya (https://release-monitoring.org) Events."""
def watch(self):
"""Start the monitoring process for Anitya reports.
Each package monitored by the fetcher will be
queued by calling the watch parent method,
passing the package data as argument.
The fetcher will use zmq as messaging protocol to receive
the fedmsg-hub events. kiskadee and fedmsg-hub runs in different
processes, so we need something to enable the
comunication between then. When a message come to fedmsg-hub,
the AnityaConsumer instance, will publish this event to zmq server,
and kiskadee will consume this message.
"""
kiskadee.logger.debug("Starting anitya fetcher")
socket = self._connect_to_zmq(
self.config["zmq_port"],
self.config["zmq_topic"])
if socket:
while True:
msg = socket.recv_string()
package = self.package_to_enqueue(msg)
def get_sources(self, source_data):
"""Download packages from some Anitya Backend."""
tmp_path = tempfile.gettempdir()
path = tempfile.mkdtemp(dir=tmp_path)
backend_name = source_data.get('meta').get('backend').lower()
run_backend = self._load_backend(backend_name)
if run_backend:
return run_backend(self, source_data, path)
else:
return {}
def compare_versions(self, new, old):
"""Compare anitya source versions. If new > old, returns true."""
return version.parse(new) > version.parse(old)
def _load_backend(self, backend_name):
try:
backend = Backends()
return getattr(backend, backend_name)
except Exception:
kiskadee.logger.debug(
"Backend not suported: {}".format(str(backend_name))
)
return {}
def _connect_to_zmq(self, port, topic):
try:
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:{}".format(port))
socket.setsockopt_string(
zmq.SUBSCRIBE, topic)
kiskadee.logger.debug("Connecting to 0mq server")
return socket
except Exception as err:
kiskadee.logger.debug("Could not connect to zmq server")
kiskadee.logger.debug(err)
return False
def package_to_enqueue(self, fedmsg_event):
event = self._event_to_dict(fedmsg_event)
if event:
package = event.get('body').get('msg').get('package')
source_dict = {}
if package:
kiskadee_package = {
'name': package.get('name'),
'version': package.get('version'),
'fetcher': __name__,
'meta': {
'backend': package.get('backend'),
'homepage': package.get('homepage')
}
}
super().watch(**kiskadee_package)
def _event_to_dict(self, msg):
msg = msg[msg.find(" ")+1::]
event = None
try:
event = yaml.load(msg)
return event
except Exception as err:
kiskadee.logger.debug("Something went wrong on Anitya event")
kiskadee.logger.debug(err)
return event
class Backends():
"""Class to implement Anitya Backends.
Each method implemented in this class, should return an absolute path
to the downloaded source, or a empty dict if the download could
not be made.
"""
def github(self, fetcher, source_data, path):
"""Backend implementation to download github sources."""
source_version = ''.join([source_data.get('version'), '.tar.gz'])
homepage = source_data.get('meta').get('homepage')
url = ''.join([homepage, '/archive/', source_version])
return fetcher.download(path, url, source_version)
class AnityaConsumer(fedmsg.consumers.FedmsgConsumer):
"""Consumer used by fedmsg-hub to subscribe to fedmsg bus."""
topic = 'org.release-monitoring.prod.anitya.package.version.update'
config_key = 'anityaconsumer'
validate_signatures = False
def __init__(self, *args, **kw):
"""Anityaconsumer constructor."""
super().__init__(*args, **kw)
context = zmq.Context()
self.socket = context.socket(zmq.PUB)
self.socket.bind("tcp://*:5556")
def consume(self, msg):
"""Consume events from fedmsg-hub."""
self.socket.send_string("{} {}".format("anitya", str(msg)))
time.sleep(SLEEP_TIME)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment