Skip to content

Instantly share code, notes, and snippets.

@dlech
Created August 1, 2017 15:42
Show Gist options
  • Save dlech/6877e0d3f96ba563b2ca5dfded651d48 to your computer and use it in GitHub Desktop.
Save dlech/6877e0d3f96ba563b2ca5dfded651d48 to your computer and use it in GitHub Desktop.
NXTCam5 streaming (uses unreleased GRX library, so won't work unless you compile GRX yourself)
#!/usr/bin/env python3
import fcntl
import os
import subprocess
import sys

import gi
gi.require_version('GLib', '2.0')
from gi.repository import GLib
gi.require_version('Grx', '3.0')
from gi.repository import Grx
# Device node that App.do_activate() puts into raw mode and opens for both
# reading and writing.
# NOTE(review): presumably the camera's USB serial port as it enumerates on
# the host - adjust if it shows up under a different name.
PORT = '/dev/ttyACM0'
class App(Grx.Application):
    """Show frames requested from the camera on the GRX screen.

    Every 100 ms the application writes the request byte ``b's'`` to PORT,
    reads back a decimal byte count terminated by a newline, reads that many
    bytes and hands them to the screen context as JPEG data. A touch-down
    event quits the application.
    """

    # Unbuffered read and write handles on PORT; opened in do_activate().
    _r = None
    _w = None

    def __init__(self):
        # Zero-argument super() starts the MRO lookup right after App, so an
        # initializer defined on Grx.Application itself is not skipped
        # (super(Grx.Application, self) would start one class too late).
        super().__init__()
        self.init()
        self.hold()

    def do_activate(self):
        """Prepare the serial port, drop stale input and start polling."""
        # super important!
        # NOTE(review): presumably raw mode keeps the tty layer from altering
        # the binary image bytes - confirm against the stty documentation.
        # An argument list is used instead of a shell command string so that
        # PORT is never interpreted by a shell.
        result = subprocess.run(['stty', '--file={}'.format(PORT), 'raw'])
        if result.returncode != 0:
            print('stty failed for {} (exit status {})'.format(
                PORT, result.returncode), file=sys.stderr)

        # using 'rb+' gives an error because file is not seekable
        # so using separate read and write file handles
        self._r = open(PORT, 'rb', buffering=0)
        self._w = open(PORT, 'wb', buffering=0)

        # drain any old data: switch the read handle to non-blocking mode
        # temporarily so that read() returns a falsy value instead of
        # waiting once nothing is pending.
        fd = self._r.fileno()
        orig_fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, orig_fl | os.O_NONBLOCK)
        try:
            while self._r.read(4096):
                pass
        finally:
            # Always restore the original flags, even if a read fails.
            fcntl.fcntl(fd, fcntl.F_SETFL, orig_fl)

        GLib.timeout_add(100, self._update)

    def _update(self):
        """Request one frame, read it and draw it.

        Returns:
            GLib.SOURCE_CONTINUE to keep the timeout running, or
            GLib.SOURCE_REMOVE when the port reports end-of-file, because
            no further frames can arrive in that case.
        """
        self._w.write(b's')

        header = self._r.readline()
        if not header:
            print('{}: end of file while waiting for frame size'.format(PORT),
                  file=sys.stderr)
            return GLib.SOURCE_REMOVE
        try:
            size = int(header)
        except ValueError:
            # A garbled size line must not raise out of the timeout
            # callback; skip this frame and try again on the next tick.
            print('Ignoring invalid frame size {!r}'.format(header),
                  file=sys.stderr)
            return GLib.SOURCE_CONTINUE

        # doesn't seem to be able to read the data all at once
        # so we have to keep reading until we get all of the data
        # e.g. `data = self._r.read(size)` should work but doesn't
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self._r.read(remaining)
            if not chunk:
                # An empty read means end-of-file; without this check the
                # loop would never terminate.
                print('{}: end of file with {} bytes of frame missing'.format(
                    PORT, remaining), file=sys.stderr)
                return GLib.SOURCE_REMOVE
            chunks.append(chunk)
            remaining -= len(chunk)
        data = b''.join(chunks)

        try:
            Grx.get_screen_context().load_from_jpeg_data(data, 1)
        except Exception as err:
            # Best effort: a frame that cannot be drawn is dropped, but the
            # reason is reported. KeyboardInterrupt and SystemExit are no
            # longer swallowed.
            print('Dropping frame: {}'.format(err), file=sys.stderr)
        return GLib.SOURCE_CONTINUE

    def do_event(self, event):
        """Quit on touch-down unless the base class already handled the event.

        Returns:
            True when the event was handled here or by Grx.Application,
            False otherwise.
        """
        if Grx.Application.do_event(self, event):
            return True
        if event.type == Grx.EventType.TOUCH_DOWN:
            self.quit()
            return True
        return False
if __name__ == '__main__':
    # Register the program and application names with GLib, then create the
    # application and enter its main loop.
    GLib.set_prgname('camstream.py')
    GLib.set_application_name('NXTCam5 streaming test')
    App().run()
import pyb
import sensor
import time
# Sensor setup: RGB565 pixels at QVGA frame size; 30 frames are skipped
# before streaming starts.
sensor.reset()
sensor.set_pixformat(sensor.RGB565)
sensor.set_framesize(sensor.QVGA)
sensor.skip_frames(30)

fps_clock = time.clock()
clock_started = False
serial = pyb.USB_VCP()

# Serve one frame for every b's' request byte received over the USB serial
# port; anything else that is received is ignored.
while True:
    request = serial.recv(1)
    if request != b's':
        continue

    # Tick once before the very first frame so that fps() is not queried on
    # a clock that has never been ticked.
    if not clock_started:
        fps_clock.tick()
        clock_started = True

    frame = sensor.snapshot()
    frame.draw_string(10, 10, '{} FPS'.format(fps_clock.fps()), color=(0, 0, 0))
    fps_clock.tick()

    # NOTE(review): compress() presumably converts the frame to JPEG in
    # place - confirm against the OpenMV image documentation.
    frame.compress()

    # Wire format: decimal byte count, a newline, then the frame bytes.
    serial.write(str(frame.size()))
    serial.write('\n')
    serial.send(frame)
@dlech
Copy link
Author

dlech commented Aug 1, 2017

camstream.py runs on the host system. main.py runs on the NXTCam.

@ec-m
Copy link

ec-m commented Nov 13, 2017

Would it be possible to communicate over the regular cables that connect the NXTCam-v5 to one of the input ports of the EV3, instead of communicating via USB? If so, how would I have to change the code above to achieve this?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment