Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
miot vacuum remote control with JS Gamepad API with analog input

Start the Python server, then open the web page in a browser.

<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Vacuum Control</title>
</head>
<body>
<input id="triggerControl" type="checkbox" checked>
<label for="triggerControl">accel/decel with triggers instead of left stick</label>
<pre id="status">No Controller Connected</pre>
<pre id="wsstatus">WebSocket not ready</pre>
<script type="text/javascript">
// WebSocket link to the control server. The server first sends a
// `{config: {angle, velocity}}` message; until it arrives (and again after
// the socket closes) `wsConfig` is empty and no commands are sent.
// Use wss:// when the page is served over HTTPS: browsers block plain ws:// there.
const wsScheme = location.protocol === 'https:' ? 'wss' : 'ws';
let ws = new WebSocket(`${wsScheme}://${location.host}/ws`);
let wsStatusElement = document.getElementById('wsstatus');
let wsConfig;
ws.onopen = () => {
    wsStatusElement.textContent = 'WebSocket connected';
};
ws.onclose = () => {
    wsStatusElement.textContent = 'WebSocket closed';
    wsConfig = null;
};
ws.onmessage = event => {
    // Show the raw payload as text. Assigning it to innerHTML would let the
    // server (or anything able to inject into the stream) run markup here.
    wsStatusElement.textContent = event.data;
    let msg;
    try {
        msg = JSON.parse(event.data);
    } catch (err) {
        // Not JSON: it is still displayed above, but there is nothing to apply.
        return;
    }
    if (msg && msg.config) {
        wsConfig = msg.config;
    }
};
// Scale the two normalized inputs by the server-provided ranges and send them
// as one `{angle, velocity}` JSON message. Does nothing until the server
// config has been received or while the socket is not open.
let sendMsg = function (angleCoef, accelCoef) {
    if (!wsConfig || ws.readyState !== WebSocket.OPEN) {
        return;
    }
    // Math.trunc, not parseInt: parseInt stringifies its argument first, so a
    // tiny value such as 1e-7 would be parsed as 1 instead of 0.
    const angle = Math.trunc(angleCoef * wsConfig.angle);
    const velocity = Math.trunc(accelCoef * wsConfig.velocity);
    let msg = JSON.stringify({ angle, velocity });
    wsStatusElement.textContent = msg;
    ws.send(msg);
}
</script>
<script type="text/javascript">
// Page elements and the index of the controller being polled.
let statusElement = document.getElementById('status');
let gamepadIndex;
let triggerControlEl = document.getElementById('triggerControl');
// Track the most recently connected controller.
window.addEventListener('gamepadconnected', (event) => {
    gamepadIndex = event.gamepad.index;
});
// Forget the controller when it is unplugged so the poll loop stops reading
// a slot that no longer holds a gamepad.
window.addEventListener('gamepaddisconnected', (event) => {
    if (event.gamepad.index === gamepadIndex) {
        gamepadIndex = undefined;
        statusElement.textContent = 'No Controller Connected';
    }
});
// Input mapping: which axis/button index drives which command.
// NOTE(review): indices presumably follow the "standard" gamepad layout
// (left stick X/Y, left/right trigger) - confirm for your controller.
const axisAngle = { index: 0, reverse: true, deadzone: 0.1 };
const axisAccel = { index: 1, reverse: true, deadzone: 0.1 };
const btnDecel = { index: 6 };
const btnAccel = { index: 7 };
// Read one axis: values inside the deadzone collapse to 0, everything else is
// passed through, sign-flipped when the axis is configured as reversed.
let getAxis = (axes, cfg) => {
    const raw = axes[cfg.index];
    if (Math.abs(raw) < cfg.deadzone) {
        return 0;
    }
    const sign = cfg.reverse ? -1 : 1;
    return raw * sign;
}
// Turn one gamepad snapshot into a command: steering always comes from the
// angle axis; acceleration comes from the triggers (accel minus decel) when
// the checkbox is ticked, otherwise from the acceleration axis.
let sendInput = ({ buttons, axes }) => {
    // Controllers with fewer buttons have no entry at the trigger indices;
    // treat a missing button as released instead of throwing.
    const buttonValue = (cfg) => (buttons[cfg.index] ? buttons[cfg.index].value : 0);
    const steer = getAxis(axes, axisAngle);
    const accel = triggerControlEl.checked
        ? buttonValue(btnAccel) - buttonValue(btnDecel)
        : getAxis(axes, axisAccel);
    return sendMsg(steer, accel);
};
// Poll the tracked controller every 100 ms: refresh the on-page status and
// forward the current input to the server.
setInterval(() => {
    if (gamepadIndex === undefined) {
        return;
    }
    const gamepad = navigator.getGamepads()[gamepadIndex];
    if (!gamepad) {
        // The slot is empty once the controller has been unplugged.
        statusElement.textContent = 'No Controller Connected';
        return;
    }
    const lines = [];
    // Keep each button's real index: filtering first and numbering afterwards
    // would label the pressed buttons 0, 1, 2, ... regardless of which they are.
    gamepad.buttons.forEach((button, i) => {
        if (button.pressed) {
            lines.push(`button ${i} is pressed`);
        }
    });
    gamepad.axes.forEach((v, i) => {
        lines.push(`axis ${i}: ${v}`);
    });
    // Only numbers and fixed text are interpolated, so the markup is safe.
    statusElement.innerHTML = lines.join('<br>');
    sendInput(gamepad);
}, 100);
</script>
</body>
</html>
from miio import MiotDevice, exceptions
from miio.miioprotocol import MiIOProtocol
import argparse
from datetime import datetime
import json
from typing import Callable
def _str2bool(value: str) -> bool:
    """Parse a command-line boolean such as ``true``/``false``/``1``/``0``.

    ``type=bool`` cannot be used for this: ``bool('False')`` is ``True``
    because every non-empty string is truthy.
    """
    lowered = value.strip().lower()
    if lowered in ('1', 'true', 't', 'yes', 'y', 'on'):
        return True
    if lowered in ('', '0', 'false', 'f', 'no', 'n', 'off'):
        return False
    raise argparse.ArgumentTypeError(
        f'expected a boolean value, got {value!r}')


# Command-line interface: robot address/token, command ranges, HTTP bind
# address, and the switch between terminal and web control.
parser = argparse.ArgumentParser(description='dreame remote control')
parser.add_argument('--ip', help='ip addr of robot', required=True)
parser.add_argument('--token', help='miot token', required=True)
parser.add_argument('--angle', help='angle range', default=60, type=int)
parser.add_argument('--velocity', help='velocity range', default=180, type=int)
parser.add_argument('--host', help='http/ws bind host',
                    default='0.0.0.0', type=str)
parser.add_argument('--port', help='http port', default=8080, type=int)
# A bare ``--tty`` enables tty mode; ``--tty true`` / ``--tty false`` are
# still accepted so existing invocations keep working.
parser.add_argument('--tty', help='tty mode', nargs='?', const=True,
                    default=False, type=_str2bool)
_ACTIONS = {
'step': {'siid': 21, 'aiid': 1},
'stop': {'siid': 21, 'aiid': 2},
}
class RemoteControl:
    """Send remote-control actions to the robot and remember the last command.

    The device handle is created lazily through ``get_device`` and dropped
    after a ``DeviceException`` so the next call builds a fresh one.
    """

    def __init__(self, get_device: Callable[[], MiotDevice]):
        self.get_device = get_device
        self.device: MiotDevice = None  # created on first use
        # Last commanded values; reused when a step omits one of them.
        self.angle = 0
        self.velocity = 0

    def _ensure_device(self):
        """Create the device handle if there is none (first use or after an error)."""
        if not self.device:
            self.device = self.get_device()
        return self.device

    def step(self, angle=None, velocity=None):
        """Issue one ``step`` action.

        ``angle`` / ``velocity`` left as ``None`` keep their previous value.
        Returns ``True`` when the device answers with ``code == 0`` and
        ``False`` when the call raised ``DeviceException``.
        """
        device = self._ensure_device()
        if angle is not None:
            self.angle = angle
        if velocity is not None:
            self.velocity = velocity
        try:
            # Both parameters are sent as strings: piid 1 = angle, piid 2 = velocity.
            res = device.call_action_by(**_ACTIONS['step'], params=[
                {'piid': 1, 'value': str(self.angle)},
                {'piid': 2, 'value': str(self.velocity)},
            ])
        except exceptions.DeviceException:
            # force reconnect
            self.device = None
            return False
        return res['code'] == 0

    def stop(self):
        """Issue the ``stop`` action.

        Connects first when no device handle exists (before any step, or after
        a failed step reset it); previously that case raised ``AttributeError``.
        Device errors still propagate to the caller.
        """
        self._ensure_device().call_action_by(**_ACTIONS['stop'])
def start_tty(ctl: RemoteControl, velocity, angle):
    """Drive the robot from the terminal with the w/a/s/d keys; q quits.

    stdin is switched to cbreak mode so single key presses are read without
    waiting for Enter. The previous terminal attributes are always restored,
    even when a command raises. Ctrl-C and end-of-input are treated like ``q``.
    """
    import tty
    import sys
    import termios

    # key -> (angle, velocity) passed to ctl.step
    moves = {
        'w': (0, +velocity),
        'a': (+angle, 0),
        's': (0, -velocity),
        'd': (-angle, 0),
    }
    saved_attrs = termios.tcgetattr(sys.stdin)
    try:
        tty.setcbreak(sys.stdin)
        print('wasd for control, q for exit')
        while True:
            try:
                key = sys.stdin.read(1)
            except KeyboardInterrupt:
                key = 'q'
            # read() returns '' at end of input; indexing it used to raise
            # IndexError, so treat it as a request to quit instead.
            if key in ('q', ''):
                ctl.stop()
                break
            move = moves.get(key)
            if move is not None:
                step_angle, step_velocity = move
                ctl.step(angle=step_angle, velocity=step_velocity)
    finally:
        # Without this an exception would leave the shell in cbreak mode.
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, saved_attrs)
def start_http(ctl: RemoteControl, velocity: int, angle: int, host: str, port: int):
    """Serve the web UI and a WebSocket endpoint that forwards commands to ``ctl``.

    ``/`` redirects to ``/s/index.html``, ``/s`` serves the directory containing
    this script, and ``/ws`` accepts JSON messages of the form
    ``{"angle": ..., "velocity": ...}``. Each new WebSocket client first
    receives the configured ``angle`` / ``velocity`` ranges. This function
    runs the event loop forever and does not return.
    """
    import asyncio
    import pathlib
    import aiohttp
    from aiohttp import web

    def parse_command(raw):
        """Return ``(angle, velocity)`` from a client message, or ``None`` if malformed."""
        try:
            event = json.loads(raw)
        except ValueError:
            return None
        if not isinstance(event, dict):
            return None
        m_angle = event.get('angle')
        m_velocity = event.get('velocity')
        for value in (m_angle, m_velocity):
            # Only numbers (or a missing value) are forwarded to the robot;
            # bool is excluded explicitly because it is an int subclass.
            if value is not None and (
                    isinstance(value, bool)
                    or not isinstance(value, (int, float))):
                return None
        return m_angle, m_velocity

    async def handle_ws(request: web.Request):
        ws = web.WebSocketResponse()
        await ws.prepare(request)
        print('new ws connection')
        await ws.send_str(json.dumps({'config': {
            'angle': angle, 'velocity': velocity,
        }}))
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                command = parse_command(msg.data)
                if command is None:
                    # A bad message is reported to the client instead of
                    # raising and tearing down the whole connection.
                    ok = False
                else:
                    m_angle, m_velocity = command
                    if ctl.angle == 0 and ctl.velocity == 0 and not (m_angle or m_velocity):
                        # skip stepping if staying zero
                        ok = True
                    else:
                        # NOTE(review): ctl.step is a synchronous call made
                        # from a coroutine, so the event loop waits on it.
                        ok = ctl.step(angle=m_angle, velocity=m_velocity)
                await ws.send_str(json.dumps({'status': ok}))
            elif msg.type == aiohttp.WSMsgType.ERROR:
                print('ws connection closed with exception %s' %
                      ws.exception())
        return ws

    async def handle_http(_: web.Request):
        raise web.HTTPFound('/s/index.html')

    def create_runner():
        app = web.Application()
        app.add_routes([
            web.get('/', handle_http),
            web.get('/ws', handle_ws),
        ])
        # NOTE(review): this exposes every file next to the script (including
        # the script itself) under /s - keep nothing sensitive in that directory.
        app.router.add_static('/s', pathlib.Path(__file__).parent.resolve())
        return web.AppRunner(app)

    async def start_server():
        runner = create_runner()
        await runner.setup()
        site = web.TCPSite(runner, host, port)
        await site.start()
        print(f'starting server on {host}:{port}')

    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated and no longer creates one on recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(start_server())
    loop.run_forever()
class PatchedProtocol(MiIOProtocol):
    """``MiIOProtocol`` whose ``_id`` getter wraps before reaching ``MAX_ID``.

    Every read of ``_id`` advances the counter by one; once the incremented
    value reaches ``MAX_ID`` the counter restarts at 1.
    """

    MAX_ID = 65535

    @MiIOProtocol._id.getter
    def _id(self):
        # The counter lives in the base class' name-mangled ``__id`` attribute,
        # so it has to be addressed by its mangled name from this subclass.
        next_id = self._MiIOProtocol__id + 1
        if next_id >= PatchedProtocol.MAX_ID:
            next_id = 1
        self._MiIOProtocol__id = next_id
        return next_id
if __name__ == '__main__':
    args = parser.parse_args()

    def get_device():
        """Build a fresh device handle whose protocol uses ``PatchedProtocol``."""
        # dirty hack for sequence ids: derive the start id from the clock.
        # datetime.now() replaces the deprecated utcnow(), whose naive result
        # .timestamp() interpreted as local time.
        start_id = int(datetime.now().timestamp() * 100) % PatchedProtocol.MAX_ID
        device = MiotDevice(ip=args.ip, token=args.token, start_id=start_id)
        device.retry_count = 0
        device._protocol = PatchedProtocol(
            ip=args.ip, token=args.token, start_id=start_id)
        # Debug output; note that each read of ``_id`` advances the counter.
        print('id1', device._protocol._id)
        print('id2', device._protocol._id)
        return device

    ctl = RemoteControl(get_device)
    if args.tty:
        start_tty(ctl, velocity=args.velocity, angle=args.angle)
    else:
        start_http(
            ctl, velocity=args.velocity, angle=args.angle,
            host=args.host, port=args.port)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment