Skip to content

Instantly share code, notes, and snippets.

@jmcmellen
Last active August 29, 2015 14:13
Show Gist options
  • Save jmcmellen/c8c98a59663601ef3c5c to your computer and use it in GitHub Desktop.
Save jmcmellen/c8c98a59663601ef3c5c to your computer and use it in GitHub Desktop.
Use the Stream API to get realtime info from FlowDock API
import sseclient
import requests
import getpass
def main():
print "hello world"
# This uses the code you get from the app
# fields = {'code': code,
# 'client_id': id,
# 'client_secret': secret,
# 'redirect_uri': 'urn:ietf:wg:oauth:2.0:oob',
# 'grant_type': "authorization_code"
# }
password = getpass.getpass("Password: ")
# This uses your username and password
fields = {'client_id': id,
'client_secret': secret,
'grant_type': 'password',
'username': username,
'password': password,
'scope': "flow private"}
r = requests.post("https://api.flowdock.com/oauth/token",
json=fields)
for key in r.json().keys():
print key, " === ", r.json()[key]
last_response = r.json()
headers = {'Authorization': "Bearer {}".format(last_response['access_token'])}
r = requests.get("https://api.flowdock.com/flows/ksmu/main/messages",
headers=headers)
for item in r.json():
print item
print "\n"
messages = sseclient.SSEClient("https://stream.flowdock.com/flows/ksmu/main",
headers=headers)
for msg in messages: # This will loop forever
if msg.data != '':
processEvent(msg.data)
# msg = messages.next() # This will just get one message
# print msg.dump()
r = requests.post("https://api.flowdock.com/flows/ksmu/main/messages",
headers=headers,
json={"event": "message",
"content": "I'm posting from the Python"})
print r.content
def processEvent(event):
event = json.loads(event)
if event['event'] in ("message", "line", "status", 'mail', 'legacy_source', "rss", "message-delete" ):
print event['content']
elif event['event'] in ('activity.user'):
if event['content'].get('typing') is False:
print "John stopped typing"
elif event['content'].get('typing') is not None:
print "John is typing..."
else:
pass
# print event['content']
else:
print event['event']
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment