Skip to content

Instantly share code, notes, and snippets.

@stek29
Last active August 12, 2021 06:34
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save stek29/876dd36da29223994441cab4e96bc7eb to your computer and use it in GitHub Desktop.
Save stek29/876dd36da29223994441cab4e96bc7eb to your computer and use it in GitHub Desktop.
miot vacuum remote control with JS Gamepad API with analog input

start py server, open web in browser

<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Vacuum Control</title>
</head>
<body>
<input id="triggerControl" type="checkbox" checked>
<label for="triggerControl">accel/decel with triggers instead of left stick</label>
<pre id="status">No Controller Connected</pre>
<pre id="wsstatus">WebSocket not ready</pre>
<script type="text/javascript">
let ws = new WebSocket(`ws://${location.host}/ws`);
let wsStatusElement = document.getElementById('wsstatus');
let wsConfig;
ws.onopen = () => {
wsStatusElement.innerHTML = 'WebSocket connected'
};
ws.onclose = () => {
wsStatusElement.innerHTML = 'WebSocket closed';
wsConfig = null;
};
ws.onmessage = event => {
// yay xss :)
wsStatusElement.innerHTML = event.data;
let msg = JSON.parse(event.data);
if (msg.config) {
wsConfig = msg.config;
}
};
let sendMsg = function (angleCoef, accelCoef) {
if (!wsConfig) {
return;
}
const angle = parseInt(angleCoef * wsConfig.angle);
const velocity = parseInt(accelCoef * wsConfig.velocity);
let msg = JSON.stringify({ angle, velocity });
wsStatusElement.innerHTML = msg;
ws.send(msg);
}
</script>
<script type="text/javascript">
let statusElement = document.getElementById('status');
let gamepadIndex;
let triggerControlEl = document.getElementById('triggerControl');
window.addEventListener('gamepadconnected', (event) => {
gamepadIndex = event.gamepad.index;
});
const axisAngle = { 'index': 0, 'reverse': true, 'deadzone': 0.1 };
const axisAccel = { 'index': 1, 'reverse': true, 'deadzone': 0.1 };
const btnDecel = { 'index': 6 };
const btnAccel = { 'index': 7 };
let getAxis = (axes, cfg) => {
const v = axes[cfg.index];
return (Math.abs(v) < cfg.deadzone) ? 0 : v * (cfg.reverse ? -1 : 1);
}
let sendInput = ({ buttons, axes }) =>
sendMsg(getAxis(axes, axisAngle), (triggerControlEl.checked) ? (
buttons[btnAccel.index].value - buttons[btnDecel.index].value
) : getAxis(axes, axisAccel));
setInterval(() => {
if (gamepadIndex !== undefined) {
const gamepad = navigator.getGamepads()[gamepadIndex];
statusElement.innerHTML = ""; // reset page
statusElement.innerHTML += gamepad.buttons.filter(e => e.pressed).map((_, i) => `button ${i} is pressed`).join('<br>')
statusElement.innerHTML += gamepad.axes.map((v, i) => `axis ${i}: ${v}`).join('<br>')
sendInput(gamepad);
}
}, 100);
</script>
</body>
</html>
from miio import MiotDevice, exceptions
from miio.miioprotocol import MiIOProtocol
import argparse
from datetime import datetime
import json
from typing import Callable
parser = argparse.ArgumentParser(description='dreame remote control')
parser.add_argument('--ip', help='ip addr of robot', required=True)
parser.add_argument('--token', help='miot token', required=True)
parser.add_argument('--angle', help='angle range', default=60, type=int)
parser.add_argument('--velocity', help='velocity range', default=180, type=int)
parser.add_argument('--host', help='http/ws bind host',
default='0.0.0.0', type=str)
parser.add_argument('--port', help='http port', default=8080, type=int)
parser.add_argument('--tty', help='tty mode', default=False, type=bool)
_ACTIONS = {
'step': {'siid': 21, 'aiid': 1},
'stop': {'siid': 21, 'aiid': 2},
}
class RemoteControl:
def __init__(self, get_device: Callable[[], MiotDevice]):
self.get_device = get_device
self.device: MiotDevice = None
self.angle = 0
self.velocity = 0
def step(self, angle=None, velocity=None):
if not self.device:
self.device = self.get_device()
if angle is not None:
self.angle = angle
if velocity is not None:
self.velocity = velocity
try:
res = self.device.call_action_by(**_ACTIONS['step'], params=[
{'piid': 1, 'value': str(self.angle)},
{'piid': 2, 'value': str(self.velocity)},
])
return res['code'] == 0
except exceptions.DeviceException:
# force reconnect
self.device = None
return False
def stop(self):
self.device.call_action_by(**_ACTIONS['stop'])
def start_tty(ctl: RemoteControl, velocity, angle):
import tty
import sys
import termios
fd = termios.tcgetattr(sys.stdin)
tty.setcbreak(sys.stdin)
print('wasd for control, q for exit')
while True:
try:
x = sys.stdin.read(1)[0]
except KeyboardInterrupt:
x = 'q'
if x == 'q':
ctl.stop()
break
elif x == 'w':
ctl.step(angle=0, velocity=+velocity)
elif x == 'a':
ctl.step(angle=+angle, velocity=0)
elif x == 's':
ctl.step(angle=0, velocity=-velocity)
elif x == 'd':
ctl.step(angle=-angle, velocity=0)
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, fd)
def start_http(ctl: RemoteControl, velocity: int, angle: int, host: str, port: int):
import asyncio
import pathlib
import aiohttp
from aiohttp import web
async def handle_ws(request: web.Request):
ws = web.WebSocketResponse()
await ws.prepare(request)
print('new ws connection')
await ws.send_str(json.dumps({'config': {
'angle': angle, 'velocity': velocity,
}}))
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
event = json.loads(msg.data)
mAngle = event.get('angle')
mVelocity = event.get('velocity')
if ctl.angle == 0 and ctl.velocity == 0 and not (mAngle or mVelocity):
# skip stepping if staying zero
ok = True
else:
ok = ctl.step(angle=mAngle, velocity=mVelocity)
await ws.send_str(json.dumps({'status': ok}))
elif msg.type == aiohttp.WSMsgType.ERROR:
print('ws connection closed with exception %s' %
ws.exception())
return ws
def create_runner():
app = web.Application()
app.add_routes([
web.get('/', handle_http),
web.get('/ws', handle_ws),
])
app.router.add_static('/s', pathlib.Path(__file__).parent.resolve())
return web.AppRunner(app)
async def handle_http(_: web.Request):
raise web.HTTPFound('/s/index.html')
async def start_server():
runner = create_runner()
await runner.setup()
site = web.TCPSite(runner, host, port)
await site.start()
print(f'starting server on {host}:{port}')
loop = asyncio.get_event_loop()
loop.run_until_complete(start_server())
loop.run_forever()
class PatchedProtocol(MiIOProtocol):
MAX_ID = 65535
@MiIOProtocol._id.getter
def _id(self):
self._MiIOProtocol__id += 1
if self._MiIOProtocol__id >= PatchedProtocol.MAX_ID:
self._MiIOProtocol__id = 1
return self._MiIOProtocol__id
if __name__ == '__main__':
args = parser.parse_args()
def get_device():
# dirty hack for sequence ids
start_id = int(datetime.utcnow().timestamp()*100) % PatchedProtocol.MAX_ID
device = MiotDevice(ip=args.ip, token=args.token, start_id=start_id)
device.retry_count = 0
device._protocol = PatchedProtocol(ip=args.ip, token=args.token, start_id=start_id)
print('id1', device._protocol._id)
print('id2', device._protocol._id)
return device
ctl = RemoteControl(get_device)
if args.tty:
start_tty(ctl, velocity=args.velocity, angle=args.angle)
else:
start_http(
ctl, velocity=args.velocity, angle=args.angle,
host=args.host, port=args.port)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment