Skip to content

Instantly share code, notes, and snippets.

@metadaddy
Last active December 14, 2015 08:59
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save metadaddy/5061951 to your computer and use it in GitHub Desktop.
Save metadaddy/5061951 to your computer and use it in GitHub Desktop.
Controlling physical devices from a Raspberry Pi and approvals in Force.com!
import json
import os
import sys
import time
import urllib
import urllib2

import RPi.GPIO as GPIO
# Address pins by their Broadcom (BCM) channel numbers.
GPIO.setmode(GPIO.BCM)
# Pin 24 is the button: the polling loop reads it with GPIO.input(24), so it
# has to be configured as an input (it was previously set up as an output).
GPIO.setup(24, GPIO.IN)
# NOTE(review): `count` and `url` are not referenced anywhere in the visible
# script; they are kept in case other code relies on them.
count = 0
# Last observed level of the button pin; used for change detection.
last = 0
url = 'http://home.superpat.com:5000/pins/25'
class ForceClient:
    """Small Force.com client that logs in with the OAuth username/password grant.

    Constructing an instance validates the supplied options and immediately
    requests an access token; the parsed token response is stored in
    ``self.oauth`` and used by :meth:`postToApexRest`.
    """

    # Options that have no default; checked in this order.
    REQUIRED_OPTIONS = ('client_id', 'client_secret', 'username', 'password')

    def __init__(self, options):
        """Validate ``options`` and obtain an OAuth token.

        ``options`` is a dict that must contain ``client_id``,
        ``client_secret``, ``username`` and ``password``. ``login_server`` is
        optional and defaults to ``https://login.salesforce.com``.

        Prints a message and exits the process with status 1 if a required
        option is missing (or ``None``).
        """
        for name in self.REQUIRED_OPTIONS:
            value = options.get(name, None)
            if value is None:
                print('You MUST set the %s' % name)
                sys.exit(1)
            setattr(self, name, value)
        self.login_server = options.get('login_server', 'https://login.salesforce.com')
        self.oauth = self.get_token()

    def get_token(self):
        """Request an OAuth token and return the decoded JSON response.

        POSTs the credentials to ``<login_server>/services/oauth2/token``.
        On a request failure a diagnostic is printed and the process exits
        with status 1, because there is no token to continue with.
        """
        # Do OAuth username/password
        token_url = '%s/services/oauth2/token' % self.login_server
        params = urllib.urlencode({
            'grant_type': 'password',
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'username': self.username,
            'password': self.password
        })
        print('Getting token from %s' % token_url)
        try:
            response = urllib2.urlopen(token_url, params)
        except urllib2.URLError as e:
            if hasattr(e, 'code'):
                # The server answered, but with an HTTP error status.
                if e.code == 400:
                    print('Check the client_id, client_secret, username and password')
                else:
                    print('HTTP status %d' % e.code)
            elif hasattr(e, 'reason'):
                # The request never got an HTTP answer. `reason` may be a
                # string or an exception object, so format it with %s rather
                # than indexing into it.
                print('Error posting to URL %s: %s. Check the login_server' %
                      (token_url, e.reason))
            else:
                print('Unknown error %s' % e)
            # Without a response there is nothing to parse below.
            sys.exit(1)
        try:
            data = response.read()
        finally:
            response.close()
        # The token itself is deliberately not printed.
        print('Got token')
        oauth = json.loads(data)
        print('Logged in to %s as %s' % (self.login_server, self.username))
        return oauth

    def postToApexRest(self, method, data):
        """POST ``data`` to the Apex REST resource ``method``.

        The URL is ``<instance_url>/services/apexrest/<method>`` and the
        request carries the stored access token as a Bearer token with a JSON
        content type. Returns the raw response body. Errors raised by
        ``urllib2.urlopen`` propagate to the caller.
        """
        query_url = '%s/services/apexrest/%s' % (self.oauth['instance_url'], method)
        headers = {
            'Authorization': 'Bearer %s' % self.oauth['access_token'],
            'Content-type': 'application/json'
        }
        req = urllib2.Request(query_url, data, headers)
        f = urllib2.urlopen(req)
        try:
            return f.read()
        finally:
            # Close the connection even if reading the body fails.
            f.close()
# Credentials come from the environment; a missing variable raises KeyError.
client = ForceClient({
    'client_id': os.environ['CLIENT_ID'],
    'client_secret': os.environ['CLIENT_SECRET'],
    'username': os.environ['USERNAME'],
    'password': os.environ['PASSWORD'],
})

try:
    # Poll the button pin and report every level change to the 'button'
    # Apex REST resource as a JSON boolean.
    while True:
        inputValue = GPIO.input(24)
        if inputValue != last:
            pressed = inputValue == 1
            print("Button %s" % ("pressed" if pressed else "released"))
            client.postToApexRest('button', 'true' if pressed else 'false')
            last = inputValue
        # Short pause between polls so the loop does not spin the CPU.
        time.sleep(.01)
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop the script.
    pass
finally:
    # Release the GPIO channels this script configured.
    GPIO.cleanup()
/**
 * Apex REST resource mapped to /button/*.
 *
 * pressButton() receives button events as a JSON boolean in the request body
 * and moves the Light__c record between states; light() performs the HTTP
 * callout that switches the physical pin.
 */
@RestResource(urlMapping='/button/*')
global class ButtonController {
    /**
     * Sends the desired light state to the pin server.
     *
     * Runs asynchronously with callouts enabled. POSTs the JSON literal
     * 'true' or 'false' and logs the response body when the server answers
     * with anything other than 204.
     *
     * @param state true to switch the light on, false to switch it off
     */
    @future(callout=true)
    public static void light(Boolean state) {
        HTTPRequest outreq = new HTTPRequest();
        outreq.setEndpoint('http://api.superpat.com:5000/pins/25');
        outreq.setMethod('POST');
        outreq.setHeader('Content-Type', 'application/json');
        outreq.setBody(state ? 'true' : 'false');

        Http h = new Http();
        HttpResponse outres = h.send(outreq);
        if (outres.getStatusCode() != 204) {
            System.debug(outres.getBody());
        }
    }

    /**
     * Handles a button event POSTed by the client.
     *
     * Only a "pressed" event (body 'true') has an effect:
     *  - light is 'Off': set it to 'Request On' and submit it for approval;
     *  - light is 'On':  set it to 'Off'.
     * Any other state is left unchanged. Responds 204 on success and 404
     * when there is no Light__c record to act on.
     */
    @HttpPost
    global static void pressButton() {
        RestRequest req = RestContext.request;
        RestResponse res = RestContext.response;

        Boolean state = Boolean.valueOf(req.requestBody.toString());
        System.debug('Button pressed - state = ' + state);

        if (state) {
            // Just one light for now. Query into a list: assigning an empty
            // result directly to a single sObject throws a QueryException.
            List<Light__c> lights = [SELECT Id, State__c FROM Light__c LIMIT 1];
            if (lights.isEmpty()) {
                System.debug('No Light__c record found');
                res.statusCode = 404;
                return;
            }

            Light__c lightRecord = lights[0];
            if (lightRecord.State__c == 'Off') {
                lightRecord.State__c = 'Request On';
                update lightRecord;

                // Create an approval request
                Approval.ProcessSubmitRequest req1 =
                    new Approval.ProcessSubmitRequest();
                req1.setComments('Please turn the light on, Dad!');
                req1.setObjectId(lightRecord.Id);

                // Submit the approval request for the light
                Approval.ProcessResult result = Approval.process(req1);
                System.debug('Submitted for approval successfully: '+result.isSuccess());
            } else if (lightRecord.State__c == 'On') {
                lightRecord.State__c = 'Off';
                update lightRecord;
            }
        }

        res.statusCode = 204;
    }
}
/**
 * Before-update trigger on Light__c.
 *
 * Whenever a record's State__c changes to 'On' or to 'Off', asks
 * ButtonController.light() to switch the physical light accordingly.
 * Other state changes are ignored.
 */
trigger LightTrigger on Light__c (before update) {
    for (Light__c updated : Trigger.new) {
        Light__c previous = Trigger.oldMap.get(updated.Id);

        // Nothing to do unless the state actually changed.
        if (updated.State__c == previous.State__c) {
            continue;
        }

        if (updated.State__c == 'On') {
            ButtonController.light(true);
        } else if (updated.State__c == 'Off') {
            ButtonController.light(false);
        }
    }
}
import RPi.GPIO as GPIO
import time
# Address pins by their Broadcom (BCM) channel numbers.
GPIO.setmode(GPIO.BCM)

# We only work with pin 25 for now, but it would be easy to extend this to
# as many pins as desired.
LIGHT_PIN = 25
GPIO.setup(LIGHT_PIN, GPIO.OUT)

# Drive the pin high for one second at startup, then leave it low
# (presumably as a visible power-on check).
GPIO.output(LIGHT_PIN, GPIO.HIGH)
time.sleep(1)
GPIO.output(LIGHT_PIN, GPIO.LOW)
from flask import Flask, request
app = Flask(__name__)


@app.route('/pins/<int:pin>', methods=['POST'])
def pins(pin):
    """Set GPIO pin ``pin`` from the JSON body of a POST request.

    A truthy JSON body drives the pin high; anything else drives it low.
    Returns an empty 204 response on success, or 404 when the pin cannot be
    driven as an output.
    """
    state = GPIO.HIGH if request.json else GPIO.LOW
    print("Setting pin %d to %s" % (pin, state))
    try:
        GPIO.output(pin, state)
    except (RuntimeError, ValueError) as e:
        # The URL accepts any integer, but only pins that were set up as
        # outputs can be driven. RPi.GPIO is expected to raise one of these
        # for an unconfigured or invalid channel; report that to the client
        # instead of failing with a 500.
        print("Cannot set pin %d: %s" % (pin, e))
        return 'Pin %d is not available as an output' % pin, 404
    return '', 204
if __name__ == "__main__":
    # Bind to all network interfaces so other hosts can reach the server.
    # NOTE(review): the endpoint has no authentication.
    app.run(host='0.0.0.0')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment