Skip to content

Instantly share code, notes, and snippets.

@paul121
Last active January 5, 2022 20:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save paul121/17a3dda773c41074fc71128769cb2193 to your computer and use it in GitHub Desktop.
Save paul121/17a3dda773c41074fc71128769cb2193 to your computer and use it in GitHub Desktop.
farmOS.py aggregator preliminary
import os
from requests.sessions import Session
from farmOS.client_2 import ResourceBase
# Allow testing via http.
os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
class AggregatorSession(Session):
def __init__(self, hostname):
super().__init__()
self.hostname = hostname
self._content_type = "application/vnd.api+json"
def authorize(self, username, password, scope=None):
self.headers.update({'api-key': '12345'})
def http_request(self, path, method="GET", options=None, params=None, headers=None):
"""Raw HTTP request helper function.
Keyword arguments:
:param path: the URL path.
:param method: the HTTP method.
:param options: a dictionary of data and parameters to pass on to the request.
:param params: URL query parameters.
:return: requests response object.
"""
# Strip protocol, hostname, leading/trailing slashes, and whitespace from the path.
path = path.strip("/")
path = path.strip()
# Assemble the URL.
#url = f"{self.hostname}/api/v2/farms/resources/relay/{path}"
return self._http_request(
url=path, method=method, options=options, params=params, headers=headers
)
def _http_request(self, url, method="GET", options=None, params=None, headers=None):
url = f"{self.hostname}/api/v2/farms/resources/relay/{url}"
# Automatically follow redirects, unless this is a POST request.
# The Python requests library converts POST to GET during a redirect.
# Allow this to be overridden in options.
allow_redirects = True
if method in ["POST", "PUT"]:
allow_redirects = False
if options and "allow_redirects" in options:
allow_redirects = options["allow_redirects"]
if headers is None:
headers = {}
# If there is data to be sent, include it.
data = None
if options and "data" in options:
data = options["data"]
headers["Content-Type"] = self._content_type
# If there is a json data to be sent, include it.
json = None
if options and "json" in options:
json = options["json"]
if "Content-Type" not in headers:
headers["Content-Type"] = self._content_type
# Perform the request.
response = self.request(
method,
url,
headers=headers,
allow_redirects=allow_redirects,
data=data,
json=json,
params=params,
)
# If this is a POST request, and a redirect occurred, attempt to re-POST.
redirect_codes = [300, 301, 302, 303, 304, 305, 306, 307, 308]
if method in ["POST", "PUT"] and response.status_code in redirect_codes:
if response.headers["Location"]:
response = self.request(
method,
response.headers["Location"],
allow_redirects=True,
data=data,
json=json,
params=params,
)
# Raise exception if error.
response.raise_for_status()
# Return the response.
return response
hostname = 'http://localhost'
session = AggregatorSession(hostname)
resources = ResourceBase(session)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment