Skip to content

Instantly share code, notes, and snippets.

@javrasya
Created October 21, 2014 06:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save javrasya/ee0acec94291c9465409 to your computer and use it in GitHub Desktop.
Save javrasya/ee0acec94291c9465409 to your computer and use it in GitHub Desktop.
proxy
import httplib2
from river_io_python.decorators.river_io_client import river_io_client
from river_io_python.urls import OBJECT_COUNT_WAITING_FOR_APPROVAL_URL, OBJECTS_WAITING_FOR_APPROVAL_URL, REGISTER_OBJECT_URL, PROCESSES_TRANSITION__URL, IS_USER_AUTHORIZED_URL
__author__ = 'ahmetdal'
class RiverIORestClient():
def __init__(self, base_url, auth_token):
self.base_url = base_url
self.auth_token = auth_token
self.connection = httplib2.Http()
self.headers = {
'AUTHORIZATION': self.auth_token,
'Content-type': 'application/x-www-form-urlencoded'
}
def proxy(self, requested_url, method, data=None, extra_headers=None):
response, content = self.request(requested_url, method, data=data, extra_headers=extra_headers)
return response, content
@river_io_client
def request(self, url, method, data=None, extra_headers=None):
headers = self.headers
if extra_headers:
headers.update(extra_headers)
return self.connection.request('%s%s' % (self.base_url, url), method, body=data, headers=headers)
def get_on_approval_object_count(self, content_type_id, field_id, user_id):
url = OBJECT_COUNT_WAITING_FOR_APPROVAL_URL % {
'content_type_id': content_type_id,
'field_id': field_id,
'user_id': user_id
}
content = self.request(url, 'GET')
return content['count']
def get_on_approval_objects(self, content_type_id, field_id, user_id):
url = OBJECTS_WAITING_FOR_APPROVAL_URL % {
'content_type_id': content_type_id,
'field_id': field_id,
'user_id': user_id
}
content = self.request(url, 'GET')
return content
def is_user_authorized(self, content_type_id, field_id, user_id):
url = IS_USER_AUTHORIZED_URL % {
'content_type_id': content_type_id,
'field_id': field_id,
'user_id': user_id
}
content = self.request(url, 'GET')
return content['is_authorized']
def register_object(self, content_type_id, object_id, field_id):
url = REGISTER_OBJECT_URL
data = {
'content_type_id': content_type_id,
'field_id': field_id,
'object_id': object_id
}
self.request(url, 'POST', data=data)
def process_transition(self, process_type, content_type_id, object_id, field_id, user_id, next_state_id=None):
url = PROCESSES_TRANSITION__URL
data = {
'process_type': process_type,
'content_type_id': content_type_id,
'field_id': field_id,
'object_id': object_id,
'user_id': user_id
}
if next_state_id:
data['next_state_id'] = next_state_id
self.request(url, 'POST', data=data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment