Skip to content

Instantly share code, notes, and snippets.

@rnixx
Last active August 29, 2015 14:23
Show Gist options
  • Save rnixx/1f26328044b6e8996c6e to your computer and use it in GitHub Desktop.
Save rnixx/1f26328044b6e8996c6e to your computer and use it in GitHub Desktop.
Kivy: Widget Covering MJPEG Video based on https://gist.github.com/tito/d6eb474543c32eb33504
import io
import urllib
import threading
from kivy.uix.image import Image
from kivy.properties import StringProperty
from kivy.core.image import Image as CoreImage
from kivy.clock import Clock
from coverbehavior import CoverBehavior
class CoverMjpeg(CoverBehavior, Image):
    """Image widget fed by an MJPEG stream read on a background thread.

    ``start()`` spawns a daemon thread running ``read_stream()``, which pulls
    JPEG frames out of the stream at ``url`` and leaves the latest decoded
    frame in ``_image_buffer``.  ``update_image()`` is scheduled through
    kivy's ``Clock`` and moves that frame's texture onto the widget.
    ``unload()`` stops both.
    """

    # Address of the MJPEG stream; opened by ``read_stream``.
    url = StringProperty()

    def start(self):
        """Start the reader thread and schedule the periodic texture update."""
        self.quit = False
        # Shared state must exist *before* the thread starts; otherwise the
        # reader can touch ``_image_lock`` / ``_image_buffer`` before they
        # have been assigned and die with an AttributeError.
        self._image_lock = threading.Lock()
        self._image_buffer = None
        self._thread = threading.Thread(target=self.read_stream)
        self._thread.daemon = True
        self._thread.start()
        # Poll for a new frame roughly 30 times per second.
        Clock.schedule_interval(self.update_image, 1 / 30.)

    def unload(self):
        """Stop the reader thread and cancel the periodic texture update.

        Calling this before ``start()`` is a no-op for the thread part.
        """
        self.quit = True
        thread = getattr(self, '_thread', None)
        if thread is not None:
            # The reader only re-checks ``quit`` between reads, so this may
            # wait until its pending ``stream.read()`` returns.
            thread.join()
        Clock.unschedule(self.update_image)

    def read_stream(self):
        """Read the MJPEG stream and publish decoded frames (thread target).

        Runs until ``quit`` is set or the stream ends.  A frame that arrives
        while the previous one has not been consumed yet is dropped.
        """
        # Function-local import so the module works on Python 2 and 3
        # without touching the file-level imports.
        try:
            from urllib.request import urlopen
        except ImportError:
            from urllib import urlopen

        # JPEG start-of-image / end-of-image markers.  Bytes literals keep
        # the search valid on Python 3, where ``read()`` returns bytes.
        soi = b'\xff\xd8'
        eoi = b'\xff\xd9'

        stream = urlopen(self.url)
        try:
            bytes_ = b''
            while not self.quit:
                chunk = stream.read(1024)
                if not chunk:
                    # End of stream: stop instead of spinning on empty reads.
                    break
                bytes_ += chunk
                a = bytes_.find(soi)
                if a == -1:
                    continue
                # Look for the end marker only *after* the start marker so a
                # stray earlier end marker cannot yield an empty frame.
                b = bytes_.find(eoi, a + 2)
                if b == -1:
                    continue
                jpg = bytes_[a:b + 2]
                bytes_ = bytes_[b + 2:]
                with self._image_lock:
                    # previous image not consumed yet
                    if self._image_buffer is not None:
                        continue
                    data = io.BytesIO(jpg)
                    im = CoreImage(data, ext="jpeg", nocache=True)
                    self._image_buffer = im
        finally:
            # Release the HTTP connection however the loop ended.
            stream.close()

    def update_image(self, *args):
        """Take the pending frame, if any, and show it on the widget.

        Scheduled via ``Clock.schedule_interval`` in ``start()``; ``args``
        receives whatever the Clock passes and is ignored.
        """
        with self._image_lock:
            im = self._image_buffer
            self._image_buffer = None
        if im is not None:
            # ``cover_origin_size`` / ``calculate_cover`` are not defined in
            # this class -- presumably provided by CoverBehavior.
            self.cover_origin_size = im.texture.size
            self.calculate_cover()
            self.texture = im.texture
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment