Skip to content

Instantly share code, notes, and snippets.

@sameg14
Created May 8, 2016 20:46
Show Gist options
  • Save sameg14/79ace0f72764b4f806689aa015cd896b to your computer and use it in GitHub Desktop.
Save sameg14/79ace0f72764b4f806689aa015cd896b to your computer and use it in GitHub Desktop.
Example class to authenticate via the ReStream REST API using OAuth2 and sending a sample payload
import urllib
import urllib2
import json
import shelve
import random
from time import gmtime, strftime
from pprint import pprint
from datetime import datetime
class Transmitter:
"""
Class will allow ReStream devices, out in the field, to send sensor readings to the API
The class is hand written in pure python, with no reliance on any external packages
"""
time_format = '%H:%M:%S'
worker_name = 'ProcessMessageWorker'
shelve_file = '/tmp/restream-api-client-shelve'
def __init__(self, api_url, client_id, client_secret):
"""
Called upon class instantiation
:param api_url: The specific ReStream API URL
:param client_id: Provided client identifier that is generated during customer setup
:param client_secret: Secret key for the corresponding client identifier
:return: void
"""
self.api_url = api_url
self.client_id = client_id
self.client_secret = client_secret
self.shelve = shelve.open(self.shelve_file)
if self.shelve.has_key("bearer_token"):
self.log('Found shelved bearer token')
self.bearer_token = self.shelve["bearer_token"]
else:
self.log("did not find shelved bearer token")
self.bearer_token = None
if self.shelve.has_key("bearer_token_expiry"):
self.bearer_token_expiry = self.shelve["bearer_token_expiry"]
self.log("found shelved expiry time" + self.bearer_token_expiry)
else:
self.log('did not find shelved expiry time')
self.bearer_token = None
self.log("Bearer Token: " + self.bearer_token)
def authenticate(self):
"""
This method will attempt to acquire a bearer token from the API via OAuth2
"""
if self.has_token_expired():
self.log("Token has expired, attempting to authenticate")
params = urllib.urlencode({
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': "client_credentials"
})
a = urllib.urlopen(self.api_url + "/token", params)
if a.getcode() == 200:
self.log('Success HTTP 200')
json_response = a.read()
token_response = json.loads(json_response)
self.bearer_token = token_response['access_token']
self.bearer_token_expiry = token_response['token_expiry']
self.shelve["bearer_token"] = self.bearer_token
self.shelve["bearer_token_expiry"] = self.bearer_token_expiry
return True
else:
self.log("Error: " + a.getcode() + "\n" + a.read())
return False
else:
self.log("Token is valid")
return True
def has_token_expired(self):
"""
Has the shelved bearer token expired
:return: boolean
"""
# If we have a shelved token, and it has not expired, do not authenticate again
if self.bearer_token and self.bearer_token_expiry:
token_expire = self.get_token_expire_secs()
self.log('Token will expire in : ' + str(token_expire / 60) + ' minutes')
if token_expire > 0:
return False
else:
return True
else:
return True
def get_token_expire_secs(self):
"""
When will the cached token expire, in seconds
:return: int
"""
time_pieces = self.bearer_token_expiry.split(" ")
expire_time = time_pieces[1]
existing_time = strftime(self.time_format, gmtime())
time_delta = \
datetime.strptime(expire_time, self.time_format) - datetime.strptime(existing_time, self.time_format)
return time_delta.seconds
def post(self, device_data_dict):
"""
Make a post call to the API
:param device_data_dict:
:return:
"""
req = urllib2.Request(self.api_url + "/api/job/" + self.worker_name)
req.add_header('Authorization', 'Bearer: ' + self.bearer_token)
resp = urllib2.urlopen(req, json.dumps(device_data_dict))
json_response = resp.read()
return json.loads(json_response)
def log(self, msg):
"""
Log to stdout
:param msg: message to log
:return: void
"""
pprint(msg) # Usage
# Provide API credentials
transmitter = Transmitter(
'http://test.api.restreamsolutions.com', '7a9743307c5d73b1bec8641b1f4cd43a', 'c83f7b52c032b873a56f49cbf206f877')
# Attempt to authenticate
transmitter.authenticate()
# Craft a payload
payload = {
"customer_name": "diamondback",
"site_name": "testwell123",
"timestamp": "2016-04-03 12:22:32",
"conductivity": random.randint(1, 70),
"orp": random.randint(100, 300),
"corrosion": random.randint(10, 40),
"dissolved_o2": random.randint(1, 7),
"flow_rate": random.randint(2, 12),
"annulus_pressure": random.randint(500, 700),
"choke_pressure": random.randint(20, 70),
"line_pressure": random.randint(30, 65),
"discharge_pressure": random.randint(20, 60),
"tank_level_1": random.randint(50, 80),
"tank_level_2": random.randint(40, 60),
"temperature": random.randint(40, 95)
}
# Send the payload
response = transmitter.post(payload)
# Display API response
print str(response)
@sameg14
Copy link
Author

sameg14 commented May 8, 2016

To run

python restream_api_test.py

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment