Skip to content

Instantly share code, notes, and snippets.

@mrocklin
Forked from anonymous/log
Last active September 14, 2016 21:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mrocklin/72cfd17a9f097e7880730d66cbde16a0 to your computer and use it in GitHub Desktop.
Save mrocklin/72cfd17a9f097e7880730d66cbde16a0 to your computer and use it in GitHub Desktop.
from collections import deque
import json
from tornado.ioloop import IOLoop
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
from tornado import gen
bytes_buffer = deque()
messages = []
def on_bytes(msg):
""" Append bytes message to buffer, trigger handle_bytes_callback """
bytes_buffer.append(msg)
handle_bytes_buffer()
def handle_bytes_buffer():
""" Convert byte responses into JSON messages
Trigger on_message for each complete response
"""
if not bytes_buffer:
return
length = bytes_buffer[0].split(b'\n', 1)[0]
n = -len(length)
length = int(length)
i = 0
while i < len(bytes_buffer) and n < length:
n += len(bytes_buffer[i])
i += 1
if n < length:
return
msgs = [bytes_buffer.popleft().split(b'\n', 1)[1]]
n = len(msgs[0])
while n < length:
msg = bytes_buffer.popleft()
n += len(msg)
msgs.append(msg)
if n > length:
msg = msgs[-1]
a, b = msg[(length - n):], msg[:(length - n)]
msgs[-1] = a
bytes_buffer.appendleft(b)
msg = b''.join(msgs)
msg = json.loads(msg.decode())
on_message(msg)
return handle_bytes_buffer() # may be more messages, try again
def on_message(msg):
""" Trigger action from incoming subscription message """
print("NEW MESSAGE\n===========")
print(json.dumps(msg, indent=2))
print('\n')
messages.append(msg)
if __name__ == '__main__':
client = AsyncHTTPClient()
d = {
"type": "SUBSCRIBE",
"subscribe" : {
"framework_info" : {
"user" : "username",
"name" : "Example HTTP Framework"
}
}
}
r = HTTPRequest(url='http://localhost:5050/api/v1/scheduler',
method='POST',
headers={'content-type': 'application/json'},
body=json.dumps(d),
streaming_callback=on_bytes,
request_timeout=1e100)
loop = IOLoop.current()
loop.add_callback(client.fetch, r)
IOLoop.current().start()
NEW MESSAGE
===========
{
"type": "SUBSCRIBED",
"subscribed": {
"heartbeat_interval_seconds": 15.0,
"framework_id": {
"value": "b67d0b11-6f5d-43b0-b953-c208e3974f29-0017"
}
}
}
NEW MESSAGE
===========
{
"type": "HEARTBEAT"
}
NEW MESSAGE
===========
{
"offers": {
"offers": [
{
"hostname": "127.0.0.1",
"url": {
"path": "/slave(1)",
"address": {
"hostname": "127.0.0.1",
"ip": "127.0.0.1",
"port": 5051
},
"scheme": "http"
},
"agent_id": {
"value": "81de9ee9-afc5-4d41-9c06-ba9e0c972f68-S0"
},
"framework_id": {
"value": "b67d0b11-6f5d-43b0-b953-c208e3974f29-0017"
},
"resources": [
{
"type": "RANGES",
"ranges": {
"range": [
{
"begin": 11000,
"end": 11999
}
]
},
"name": "ports",
"role": "*"
},
{
"type": "SCALAR",
"scalar": {
"value": 4.0
},
"name": "cpus",
"role": "*"
},
{
"type": "SCALAR",
"scalar": {
"value": 14890.0
},
"name": "mem",
"role": "*"
},
{
"type": "SCALAR",
"scalar": {
"value": 917466.0
},
"name": "disk",
"role": "*"
}
],
"id": {
"value": "b67d0b11-6f5d-43b0-b953-c208e3974f29-O40"
}
},
{
"hostname": "127.0.0.1",
"url": {
"path": "/slave(1)",
"address": {
"hostname": "127.0.0.1",
"ip": "127.0.0.1",
"port": 5052
},
"scheme": "http"
},
"agent_id": {
"value": "16a1e0db-e1da-4c3e-903e-e0fd9867959c-S0"
},
"framework_id": {
"value": "b67d0b11-6f5d-43b0-b953-c208e3974f29-0017"
},
"resources": [
{
"type": "RANGES",
"ranges": {
"range": [
{
"begin": 11000,
"end": 11999
}
]
},
"name": "ports",
"role": "*"
},
{
"type": "SCALAR",
"scalar": {
"value": 4.0
},
"name": "cpus",
"role": "*"
},
{
"type": "SCALAR",
"scalar": {
"value": 14890.0
},
"name": "mem",
"role": "*"
},
{
"type": "SCALAR",
"scalar": {
"value": 917466.0
},
"name": "disk",
"role": "*"
}
],
"id": {
"value": "b67d0b11-6f5d-43b0-b953-c208e3974f29-O41"
}
}
]
},
"type": "OFFERS"
}
NEW MESSAGE
===========
{
"type": "HEARTBEAT"
}
NEW MESSAGE
===========
{
"type": "HEARTBEAT"
}
NEW MESSAGE
===========
{
"type": "HEARTBEAT"
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment