Skip to content

Instantly share code, notes, and snippets.

@legnaleurc
Created February 3, 2016 09:19
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save legnaleurc/2924ba4bf5f4f034ee67 to your computer and use it in GitHub Desktop.
Save legnaleurc/2924ba4bf5f4f034ee67 to your computer and use it in GitHub Desktop.
devtools_client.py
#! /usr/bin/env python3
import asyncio
import json
import sys
import yaml
def main(args=None):
if args is None:
args = sys.argv
main_loop = asyncio.get_event_loop()
main_loop.run_until_complete(normal())
return 0
# for normal mode
async def normal():
reader, writer = await asyncio.open_connection('localhost', 6000)
# welcome message
s_json = await read_json(reader)
s_yaml = yaml_from_json(s_json)
print(s_yaml)
# get actor
command = {
'to': 'root',
'type': 'listTabs',
}
s_json = json.dumps(command)
await write_json(writer, s_json)
s_json = await read_json(reader)
data = json.loads(s_json)
#tab = data['tabs'][0]
actor_name = data['eventPlayerActor']
print(actor_name)
# wait for start
input('press any key to start')
# send start command
command = {
'to': actor_name,
'type': 'start',
}
s_json = json.dumps(command)
await write_json(writer, s_json)
s_json = await read_json(reader)
s_yaml = yaml_from_json(s_json)
print(s_yaml)
# wait for stop
input('press any key to stop')
# send stop command
command = {
'to': actor_name,
'type': 'stop',
}
s_json = json.dumps(command)
await write_json(writer, s_json)
s_json = await read_json(reader)
data = json.loads(s_json)
events = data['events']
print(events)
# wait for stop
input('press any key to play')
# send stop command
command = {
'to': actor_name,
'type': 'play',
'events': events,
}
s_json = json.dumps(command)
await write_json(writer, s_json)
s_json = await read_json(reader)
s_yaml = yaml_from_json(s_json)
print(s_yaml)
# for layout debug
async def layout_debug():
reader, writer = await asyncio.open_connection('localhost', 6000)
# welcome message
s_json = await read_json(reader)
s_yaml = yaml_from_json(s_json)
print(s_yaml)
# get actor
command = {
'to': 'root',
'type': 'listTabs',
}
s_json = json.dumps(command)
await write_json(writer, s_json)
s_json = await read_json(reader)
command = json.loads(s_json)
actor_name = command['eventPlayerActor']
print(actor_name)
command = {
'to': actor_name,
'type': sys.argv[1],
}
s_json = json.dumps(command)
await write_json(writer, s_json)
s_json = await read_json(reader)
s_yaml = yaml_from_json(s_json)
print(s_yaml)
# for raw interactive UI
async def interact():
reader, writer = await asyncio.open_connection('localhost', 6000)
while True:
s_json = await read_json(reader)
s_yaml = yaml_from_json(s_json)
print(s_yaml)
s_yaml = get_user_input()
s_json = json_from_yaml(s_yaml)
await write_json(writer, s_json)
def get_user_input():
data = ''
while True:
line = input()
if not line:
return data
data += line + '\n'
async def read_json(reader):
packet = await reader.read(64 * 1024)
length, b_json = packet.split(b':', 1)
length = int(length)
while len(b_json) != length:
packet = await reader.read(64 * 1024)
b_json += packet
return b_json.decode('utf-8')
def yaml_from_json(s_json):
data = json.loads(s_json)
s_yaml = yaml.dump(data, default_flow_style=False)
return s_yaml
def json_from_yaml(s_yaml):
data = yaml.safe_load(s_yaml)
s_json = json.dumps(data)
return s_json
async def write_json(writer, s_json):
b_json = s_json.encode('utf-8')
packet = b'%d:%s' % (len(b_json), b_json)
writer.write(packet)
await writer.drain()
if __name__ == '__main__':
exit_code = main()
sys.exit(exit_code)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment