Skip to content

Instantly share code, notes, and snippets.

@MaartenBaert
Created January 27, 2018 14:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MaartenBaert/fb761958b8a122f3af1213ed0d2ae398 to your computer and use it in GitHub Desktop.
Save MaartenBaert/fb761958b8a122f3af1213ed0d2ae398 to your computer and use it in GitHub Desktop.
GLInject to Python (numpy)
import ctypes
import glob
import mmap
import operator
import os.path
import time
import numpy
import matplotlib.pyplot as plt
# Number of GLInjectFrameInfo slots in GLInjectMain; the script also opens one
# "videoframe<N>-*" file per slot.
GLINJECT_RING_BUFFER_SIZE = 4

# Not referenced anywhere else in this file.
# NOTE(review): presumably the magic value expected in
# GLInjectHeader.identifier -- confirm against the producer.
GLINJECT_IDENTIFIER = 0x8AF7A476

# Bit flags; the script ORs them together and writes the result to
# GLInjectHeader.capture_flags.
GLINJECT_FLAG_CAPTURE_ENABLED = 1 << 0
GLINJECT_FLAG_RECORD_CURSOR = 1 << 1
GLINJECT_FLAG_LIMIT_FPS = 1 << 2
class GLInjectHeader(ctypes.Structure):
    """Byte-packed header made of eight consecutive unsigned 32-bit fields.

    The script in this file maps it from the start of the "video-*" file,
    polls ``ring_buffer_read_pos``/``ring_buffer_write_pos`` to detect new
    frames, and writes ``capture_flags`` to start and stop capturing.
    """

    _pack_ = 1
    # Every field has the same type, so only the names are spelled out; the
    # order below is the in-memory order.
    _fields_ = [
        (field_name, ctypes.c_uint32)
        for field_name in (
            "identifier",
            "ring_buffer_read_pos",
            "ring_buffer_write_pos",
            "current_width",
            "current_height",
            "frame_counter",
            "capture_flags",
            "capture_target_fps",
        )
    ]
class GLInjectFrameInfo(ctypes.Structure):
    """Byte-packed description of one frame slot (20 bytes).

    The script in this file reads one of these per captured frame to learn
    how to interpret the matching "videoframe<N>-*" file.
    """

    _pack_ = 1
    _fields_ = [
        # Signed 64-bit value. NOTE(review): unit/epoch not visible here.
        ("timestamp", ctypes.c_int64),
        # Frame size; the script treats each pixel as 4 bytes wide.
        ("width", ctypes.c_uint32),
        ("height", ctypes.c_uint32),
        # Signed row pitch in bytes; the script flips the row order when this
        # is not positive.
        ("stride", ctypes.c_int32),
    ]
class GLInjectMain(ctypes.Structure):
    """Byte-packed layout of the "video-*" file as mapped by this script.

    A GLInjectHeader followed directly by GLINJECT_RING_BUFFER_SIZE
    GLInjectFrameInfo entries, one per ring-buffer slot.
    """

    _pack_ = 1
    _fields_ = [
        ("header", GLInjectHeader),
        # Indexed by ring_buffer_read_pos % GLINJECT_RING_BUFFER_SIZE.
        ("frameinfo", GLInjectFrameInfo * GLINJECT_RING_BUFFER_SIZE),
    ]
def makepagesize(x):
    """Round the integer *x* up to the next multiple of ``mmap.PAGESIZE``.

    Values that are already a multiple of the page size (including 0) are
    returned unchanged.
    """
    full_pages, leftover = divmod(x, mmap.PAGESIZE)
    if leftover:
        # A partially used page still occupies a whole page.
        full_pages += 1
    return full_pages * mmap.PAGESIZE
def releasemem(x):
    """Release every memoryview that the ctypes object *x* keeps alive.

    A ctypes instance created with ``from_buffer()`` holds a memoryview on
    the underlying buffer, and the buffer's owner (an mmap, a bytearray, ...)
    refuses to close or resize while that export exists. Call this before
    closing the owner. *x* must not be accessed afterwards.

    Instances that own their memory have ``_objects`` set to ``None``; for
    those this function does nothing.
    """
    held = x._objects
    if held is None:
        # Nothing is being kept alive.
        return
    if isinstance(held, dict):
        candidates = list(held.values())
    else:
        # Defensive: treat a single kept object the same as a one-entry dict.
        candidates = [held]
    for obj in candidates:
        if isinstance(obj, memoryview):
            obj.release()
def showimage(img, dpi=80):
    """Show *img* in a matplotlib figure named "Image", sized to the image.

    The figure is created with ``figsize = (width / dpi, height / dpi)`` and
    the same ``dpi``, so it measures ``img.shape[1]`` by ``img.shape[0]``
    pixels, and the image is drawn with ``figimage``.

    Args:
        img: Array whose first two dimensions are (rows, columns).
        dpi: Dots per inch used for both the figure size and its resolution.
            Defaults to 80.
    """
    rows, cols = img.shape[0], img.shape[1]
    plt.figure("Image", figsize=(cols / dpi, rows / dpi), dpi=dpi)
    # The figure is looked up by name, so clear whatever a previous call drew.
    plt.clf()
    plt.figimage(img)
    plt.show()
# Directory that holds the shared-memory channel files (hard-coded path).
channel_dir = "/dev/shm/ssr-channel-maarten"

# Every stream is advertised by a "video-*" file. Keep each name split on "-"
# so the matching "videoframe<N>-*" names can be derived from it below.
streamlist = [
    os.path.basename(filename).split("-")
    for filename in glob.glob(os.path.join(channel_dir, "video-*"))
]
if not streamlist:
    print("Could not find any streams!")
    # Stop here: indexing the empty list below would raise IndexError.
    raise SystemExit(1)

# Pick the stream whose second name component sorts last.
# NOTE(review): presumably that component is a creation date, making this
# "the newest stream" -- confirm against the producer's naming scheme.
streamlist.sort(key=operator.itemgetter(1))
selected_stream = streamlist[-1]

fp_video = open(os.path.join(channel_dir, "-".join(selected_stream)), "r+b")
fp_videoframes = []
try:
    # Open the frame files one by one so that the ones already opened are
    # still closed by the outer ``finally`` if a later open() fails.
    for i in range(GLINJECT_RING_BUFFER_SIZE):
        frame_name = "-".join(["videoframe%d" % (i)] + selected_stream[1:])
        fp_videoframes.append(open(os.path.join(channel_dir, frame_name), "r+b"))

    mmap_video = mmap.mmap(fp_video.fileno(), makepagesize(ctypes.sizeof(GLInjectMain)))
    struct_video = None
    try:
        struct_video = GLInjectMain.from_buffer(mmap_video)

        # Discard frames that were queued before this script started.
        print("Clearing old frames", struct_video.header.ring_buffer_read_pos, struct_video.header.ring_buffer_write_pos)
        struct_video.header.ring_buffer_read_pos = struct_video.header.ring_buffer_write_pos

        print("Starting capture")
        struct_video.header.capture_flags = GLINJECT_FLAG_CAPTURE_ENABLED | GLINJECT_FLAG_LIMIT_FPS
        try:
            # Poll (without a timeout) until the write position moves ahead
            # of the read position.
            print("Waiting for frame ... ", end="")
            while struct_video.header.ring_buffer_read_pos == struct_video.header.ring_buffer_write_pos:
                time.sleep(0.01)
            print("got frame %d" % (struct_video.header.ring_buffer_read_pos))

            framenum = struct_video.header.ring_buffer_read_pos % GLINJECT_RING_BUFFER_SIZE
            print("Reading frame %d ... " % (framenum), end="")
            (timestamp, width, height, stride) = (
                struct_video.frameinfo[framenum].timestamp,
                struct_video.frameinfo[framenum].width,
                struct_video.frameinfo[framenum].height,
                struct_video.frameinfo[framenum].stride,
            )
            required_size = abs(stride) * height

            mmap_videoframe = mmap.mmap(fp_videoframes[framenum].fileno(), makepagesize(required_size))
            rawimage = None
            try:
                # View the mapping as rows of abs(stride) bytes, cut each row
                # down to the width * 4 pixel bytes, and flip the row order
                # when the stride is not positive.
                rawimage = numpy.frombuffer(mmap_videoframe, dtype=numpy.uint8, count=required_size).reshape((height, abs(stride)))
                rawimage = rawimage[:, :width*4] if stride > 0 else rawimage[::-1, :width*4]
                # Keep channels 2, 1, 0 (reversing the first three, dropping
                # the fourth) and copy so the result outlives the mapping.
                image = rawimage.reshape((height, width, 4))[:, :, 2::-1].copy()
            finally:
                # Drop the view before closing: an array that still exports
                # the mmap's buffer makes close() raise BufferError (newer
                # NumPy versions keep the export for the array's lifetime).
                rawimage = None
                mmap_videoframe.close()

            # Mark the frame as consumed; the position wraps at twice the
            # ring buffer size.
            struct_video.header.ring_buffer_read_pos = (struct_video.header.ring_buffer_read_pos + 1) % (GLINJECT_RING_BUFFER_SIZE * 2)
            print(" done")
        finally:
            # Always switch capturing off again, even if reading failed.
            print("Stopping capture")
            struct_video.header.capture_flags = 0
    finally:
        print("Cleaning up ...", end="")
        # The ctypes structure holds a memoryview on the mmap; it has to be
        # released before the mmap can be closed.
        if struct_video is not None:
            releasemem(struct_video)
        mmap_video.close()
        print(" done")
finally:
    fp_video.close()
    for fp in fp_videoframes:
        fp.close()

showimage(image)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment