@ppettit
Created January 9, 2019 14:08
quick and dirty rewrite of galicaster/opencast/client.py to use requests instead of pycurl
# -*- coding:utf-8 -*-
# Galicaster, Multistream Recorder and Player
#
# galicaster/opencast/client
#
# Copyright (c) 2016, Teltek Video Research <galicaster@teltek.es>
#
# This work is licensed under the Creative Commons Attribution-
# NonCommercial-ShareAlike 3.0 Unported License. To view a copy of
# this license, visit http://creativecommons.org/licenses/by-nc-sa/3.0/
# or send a letter to Creative Commons, 171 Second Street, Suite 300,
# San Francisco, California, 94105, USA.
# make sure python-requests is installed
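#   (e.g. `pip install requests`, or the python-requests package from your distro)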
# mostly untested and needs some refactoring ;)
import re
import json
import socket
import requests
from requests.auth import HTTPDigestAuth
from collections import OrderedDict
import urlparse
import urllib
from galicaster.utils.miscellaneous import get_timezone
try:
from galicaster import __version__ as version
except ImportError:
version = ""
import logging
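# verbose HTTP debug logging left in for development -- remove or lower the
# level before using this in production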
from httplib import HTTPConnection
HTTPConnection.debuglevel = 1
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
requests_log = logging.getLogger("requests.packages.urllib3")
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True
INIT_ENDPOINT = 'info/me.json'
ME_ENDPOINT = 'info/me.json'
SERVICES_ENDPOINT = 'info/components.json'
SETRECORDINGSTATE_ENDPOINT = 'capture-admin/recordings/{id}'
SETSTATE_ENDPOINT = 'capture-admin/agents/{hostname}'
SETCONF_ENDPOINT = 'capture-admin/agents/{hostname}/configuration'
INGEST_ENDPOINT = 'ingest/addZippedMediaPackage'
ICAL_ENDPOINT = 'recordings/calendars'
SERIES_ENDPOINT = 'series/series.json'
SERVICE_REGISTRY_ENDPOINT = 'services/available.json'
SEARCH_ENDPOINT = 'search/episode.json'
WORKFLOWS_ENDPOINT = 'workflow/definitions.json'
SEARCH_SERVICE_TYPE = 'org.opencastproject.search'
INGEST_SERVICE_TYPE = 'org.opencastproject.ingest'
class OCHTTPClient(object):
def __init__(self, server, user, password, hostname='galicaster', address=None, multiple_ingest=False,
connect_timeout=2, timeout=2, workflow='full', workflow_parameters={'trimHold':'true'},
ca_parameters={}, polling_short=10, polling_long=60, repo=None, logger=None):
"""
Arguments:
server -- Opencast server URL.
user -- Account used to operate the Opencast REST endpoints service.
password -- Password for the account used to operate the Opencast REST endpoints service.
hostname -- Capture agent hostname, optional galicaster by default.
address -- Capture agent IP address, optional socket.gethostbyname(socket.gethostname()) by default.
multiple_ingest -- Use an ingest node, optional False by default.
        connect_timeout -- Connection timeout in seconds (passed to requests).
        timeout -- Read timeout in seconds (passed to requests; the pycurl version treated this as a total timeout).
        workflow -- Name of the workflow used to ingest the recordings, optional `full` by default.
        workflow_parameters -- Dict of parameters used to ingest, optional {'trimHold':'true'} by default.
ca_parameters -- Dict of parameters used as configuration, optional empty by default.
logger -- Logger service.
repo -- Repository service.
"""
self.server = server
self.user = user
self.password = password
self.hostname = hostname
if address:
self.address = address
else:
try:
self.address = socket.gethostbyname(socket.gethostname())
except Exception as exc:
                logger and logger.error('Problem obtaining the IP of "{}", falling back to "127.0.0.1". Exception: {}'.format(socket.gethostname(), exc))
self.address = '127.0.0.1'
self.multiple_ingest = multiple_ingest
self.connect_timeout = connect_timeout
self.timeout = timeout
self.workflow = workflow
self.logger = logger
self.repo = repo
self.workflow_parameters = workflow_parameters
self.workflow_names = []
self.ca_parameters = ca_parameters
self.search_server = None
self.polling_schedule = polling_long
self.polling_state = polling_short
# FIXME should be long? https://github.com/teltek/Galicaster/issues/114
self.polling_caps = polling_short
self.polling_config = polling_long
self.response = {'Status-Code': '', 'Content-Type': '', 'ETag': ''}
self.ical_etag = ''
def __call(self, method, endpoint, path_params={}, query_params={}, postfield={}, urlencode=True, server=None, timeout=True, headers={}, files={}):
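        # Generic helper around requests.request(): builds the final URL from the
        # configured server and the endpoint template, sends a digest-authenticated
        # request and records Status-Code / Content-Type / ETag in self.response.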
theServer = server or self.server
url = list(urlparse.urlparse(theServer, 'http'))
url[2] = urlparse.urljoin(url[2], endpoint.format(**path_params))
final_url = urlparse.urlunparse(url)
headers['User-Agent'] = 'Galicaster' + version
headers['X-Requested-Auth'] = 'Digest'
auth = HTTPDigestAuth(self.user, self.password)
try:
            # honour the timeout flag: ingest() passes timeout=False so that large
            # uploads are not cut short by the normal read timeout
            request_timeout = (self.connect_timeout, self.timeout) if timeout else None
            response = requests.request(method, final_url,
                                        params=query_params, data=postfield,
                                        headers=headers, auth=auth,
                                        timeout=request_timeout,
                                        files=files)
status_code = response.status_code
self.response['Status-Code'] = status_code
self.response['Content-Type'] = response.headers.get('content-type')
self.response['ETag'] = response.headers.get('ETag')
            if status_code not in (200, 302, 304):
                if 200 < status_code < 300:
                    self.logger and self.logger.warning("Opencast client ({}) sent a response with status code {}".format(urlparse.urlunparse(url), status_code))
                else:
                    print response.text
                    title = self.find_between(response.text, "<title>", "</title>")
                    self.logger and self.logger.error('call error in %s, status code %r: %s',
                                                      urlparse.urlunparse(url), status_code, title)
                    raise IOError, 'Error in Opencast client'
return response.text
except IOError:
# Do not wrap the IOError. We raise it ourselves
raise
except Exception as exc:
            self.logger and self.logger.warning("Problem connecting with Opencast: {}".format(exc), exc_info=1)
raise RuntimeError, exc
def whoami(self):
return json.loads(self.__call('GET', ME_ENDPOINT))
def welcome(self):
return self.__call('GET', INIT_ENDPOINT)
def services(self):
return self.__call('GET', SERVICES_ENDPOINT)
def ical(self):
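        # Conditional GET: send the last seen ETag via If-None-Match so Opencast
        # can answer 304 Not Modified when the agent's calendar has not changed.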
icalendar = self.__call('GET', ICAL_ENDPOINT, query_params={'agentid': self.hostname}, headers={'If-None-Match': self.ical_etag})
if self.response['Status-Code'] == 304:
if self.logger:
self.logger.info("iCal Not modified")
return None
self.ical_etag = self.response['ETag']
if self.logger:
self.logger.info("iCal modified")
return icalendar
def setstate(self, state):
"""
        Possible states are: shutting_down, capturing, uploading, unknown, idle
"""
self.logger and self.logger.info("Sending state {}".format(state))
return self.__call('POST', SETSTATE_ENDPOINT, {'hostname': self.hostname},
postfield={'address': self.address, 'state': state})
def setrecordingstate(self, recording_id, state):
"""
        Possible states are: unknown, capturing, capture_finished, capture_error, manifest,
        manifest_error, manifest_finished, compressing, compressing_error, uploading, upload_finished, upload_error
"""
return self.__call('POST', SETRECORDINGSTATE_ENDPOINT, {'id': recording_id}, postfield={'state': state})
def setconfiguration(self, capture_devices):
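        # Push the agent's configuration to capture-admin as a Java-style
        # properties XML document.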
client_conf_xml = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties version="1.0">{0}</properties>"""
client_conf_xml_body = '<entry key="{key}">{value}</entry>'
client_conf = {
'service.pid': 'galicaster',
'capture.confidence.debug': 'false',
'capture.confidence.enable': 'false',
'capture.config.remote.polling.interval': self.polling_config,
'capture.agent.name': self.hostname,
'capture.agent.state.remote.polling.interval': self.polling_state,
'capture.agent.capabilities.remote.polling.interval': self.polling_caps,
'capture.agent.state.remote.endpoint.url': self.server + '/capture-admin/agents',
'capture.recording.shutdown.timeout': '60',
'capture.recording.state.remote.endpoint.url': self.server + '/capture-admin/recordings',
'capture.schedule.event.drop': 'false',
'capture.schedule.remote.polling.interval': int(self.polling_schedule)/60,
'capture.schedule.event.buffertime': '1',
'capture.schedule.remote.endpoint.url': self.server + '/recordings/calendars',
'capture.schedule.cache.url': '/opt/opencast/storage/cache/schedule.ics',
'capture.ingest.retry.interval': '300',
'capture.ingest.retry.limit': '5',
'capture.ingest.pause.time': '3600',
'capture.cleaner.interval': '3600',
'capture.cleaner.maxarchivaldays': '30',
'capture.cleaner.mindiskspace': '0',
'capture.error.messagebody': '&quot;Capture agent was not running, and was just started.&quot;',
'capture.error.subject': '&quot;%hostname capture agent started at %date&quot;',
'org.opencastproject.server.url': 'http://172.20.209.88:8080',
'org.opencastproject.capture.core.url': self.server,
'capture.max.length': '28800',
'capture.device.timezone': get_timezone()
}
if self.repo:
client_conf['capture.cleaner.mindiskspace'] = self.repo.get_free_space()
client_conf.update(capture_devices)
client_conf.update(self.ca_parameters)
xml = ""
for k, v in client_conf.iteritems():
xml = xml + client_conf_xml_body.format(key=k, value=v)
client_conf = client_conf_xml.format(xml)
return self.__call('POST', SETCONF_ENDPOINT, {'hostname': self.hostname}, postfield={'configuration': client_conf})
def _prepare_ingest(self, mp_file, workflow=None, workflow_instance=None, workflow_parameters=None):
"refactor of ingest to unit test"
postdict = OrderedDict()
postdict[u'workflowDefinitionId'] = workflow or self.workflow
if workflow_instance:
postdict['workflowInstanceId'] = str(workflow_instance)
if isinstance(workflow_parameters, basestring) and workflow_parameters != '':
postdict.update(dict(item.split(":") for item in workflow_parameters.split(";")))
elif isinstance(workflow_parameters, dict) and workflow_parameters:
postdict.update(workflow_parameters)
else:
postdict.update(self.workflow_parameters)
return postdict
def _get_endpoints(self, service_type):
if self.logger:
self.logger.info('Looking up Opencast endpoint for %s', service_type)
services = self.__call('GET', SERVICE_REGISTRY_ENDPOINT, {}, {'serviceType': service_type})
services = json.loads(services)
return services['services']['service']
def _get_search_server(self):
if not self.search_server:
service = self._get_endpoints(SEARCH_SERVICE_TYPE)
self.search_server = str(service['host'])
return self.search_server
def search_by_mp_id(self, mp_id):
""" Returns search result from opencast """
search_server = self._get_search_server()
result = self.__call('GET', SEARCH_ENDPOINT, {}, {'id': mp_id}, {}, True, search_server, True)
search_result = json.loads(result)
return search_result['search-results']
def verify_ingest_server(self, server):
""" if we have multiple ingest servers the get_ingest_server should never
return the admin node to ingest to, This is verified by the IP address so
we can meke sure that it doesn't come up through a DNS alias, If all ingest
services are offline the ingest will still fall back to the server provided
to Galicaster as then None will be returned by get_ingest_server """
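        # crude regex to pull the bare host name (and optional port) out of a URL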
p = '(?:http.*://)?(?P<host>[^:/ ]+).?(?P<port>[0-9]*).*'
m = re.search(p,server['host'])
host=m.group('host')
m = re.search(p,self.server)
adminHost=m.group('host')
if (not server['online']):
return False
if (server['maintenance']):
return False
try:
            adminIP = socket.gethostbyname(adminHost)
hostIP = socket.gethostbyname(host)
if (adminIP != hostIP):
return True
else:
if adminHost != host:
                    self.logger and self.logger.info("Shared IP address ({}) between {} and {}; {} will be used".format(adminIP, adminHost, host, server['host']))
return True
except Exception as exc:
            self.logger and self.logger.error("Problem verifying the ingest server {} (while getting the IP of adminHost={} and host={}), exception {}".format(server['host'], adminHost, host, exc))
return False
def get_ingest_server(self):
""" get the ingest server information from the admin node:
if there are more than one ingest servers the first from the list will be used
as they are returned in order of their load, if there is only one returned this
will be the admin node, so we can use the information we already have """
all_servers = self._get_endpoints(INGEST_SERVICE_TYPE)
if type(all_servers) is list:
for serv in all_servers:
if self.verify_ingest_server(serv):
                    return str(serv['host']) # Return the least loaded server
if self.verify_ingest_server(all_servers):
return str(all_servers['host']) # There's only one server
return None # it will use the admin server
def ingest(self, mp_file, mp_id, workflow=None, workflow_instance=None, workflow_parameters=None):
postdict = self._prepare_ingest(mp_file, workflow, workflow_instance, workflow_parameters)
server = self.server if not self.multiple_ingest else self.get_ingest_server()
if self.logger:
self.logger.info( 'Ingesting MP {} to Server {}'.format(mp_id, server) )
        with open(mp_file, 'rb') as mp_zip:
            return self.__call('POST', INGEST_ENDPOINT, {}, {}, postdict.items(), False, server, False, files={'track': ('mp.zip', mp_zip)})
def getseries(self, **query):
""" Get series according to the page count and offset provided"""
return self.__call('GET', SERIES_ENDPOINT, query_params = query)
def get_workflows(self, server=None):
""" Get workflow names """
theServer = server or self.server
validworkflows = ["schedule", "schedule-ng"]
workflows = []
if self.logger:
self.logger.info( 'Getting workflow names for {}'.format(theServer) )
# Get Workflow definitions
definitions = self.__call('GET', WORKFLOWS_ENDPOINT)
# Convert to JSON
if definitions:
definitions = json.loads(definitions)
for d in definitions["definitions"]["definition"]:
if d["tags"]:
if "tag" in d["tags"]:
if any(x in validworkflows for x in d["tags"]["tag"]):
workflows.append({"id" : d["id"],
"title" : d["title"]})
return workflows
def find_between(self, s, first, last):
try:
start = s.index(first) + len(first)
end = s.index(last, start)
return s[start:end]
        except ValueError:
            return ""