Skip to content

Instantly share code, notes, and snippets.

@falsetru
Last active August 29, 2015 13:56
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save falsetru/9323541 to your computer and use it in GitHub Desktop.
Save falsetru/9323541 to your computer and use it in GitHub Desktop.
touchtracer
#!/usr/bin/kivy
import kivy
#kivy.require('1.0.6')
from kivy.app import App
from kivy.uix.floatlayout import FloatLayout
from kivy.uix.label import Label
from kivy.graphics import Color, Rectangle, Point, GraphicException
from kivy.clock import Clock
from kivy.logger import Logger
from random import random
from math import sqrt
import socket
import struct
import time
import threading
import Queue
import socket
def get_ip():
    """Return the local IPv4 address used for outgoing traffic.

    A throw-away UDP socket is connected to ``google.com`` and the local
    end of that socket is read back with ``getsockname()``.  The socket is
    always closed, whether or not the connect succeeds.

    Raises whatever ``socket`` raises if the name cannot be resolved or
    the connect fails.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        probe.connect(('google.com', 0))
        # AF_INET socket names are (host, port) pairs; only the host matters.
        host, _port = probe.getsockname()
    finally:
        probe.close()
    return host
def get_broadcast_addr(ip, netmask='255.255.255.0'):
    """Return the directed broadcast address of the subnet containing *ip*.

    ip      -- dotted-quad IPv4 address string, e.g. ``'192.168.1.23'``.
    netmask -- dotted-quad subnet mask.  The default (a /24 network) gives
               the same result as the previous hard-coded ``'.255'``
               suffix for any valid IPv4 address.

    Returns the broadcast address as a dotted-quad string.
    Raises ``socket.error`` if either argument is not a valid IPv4 address.
    """
    host_bits, = struct.unpack('>I', socket.inet_aton(ip))
    mask_bits, = struct.unpack('>I', socket.inet_aton(netmask))
    # Broadcast address = network part kept, every host bit set to 1.
    broadcast_bits = host_bits | (~mask_bits & 0xFFFFFFFF)
    return socket.inet_ntoa(struct.pack('>I', broadcast_bits))
# Address of this host; the receiver compares packet sources against it
# to drop datagrams that this host sent itself.
ip = get_ip()
# Datagrams go to the subnet's own broadcast address: because of
# rp_filter, neither '<broadcast>' nor 255.255.255.255 can be used.
broadcast_ip = get_broadcast_addr(ip)
# Current send target as a (host, port) pair.
addr = (broadcast_ip, 9876)
# Wire format of one point: big-endian unsigned 64-bit integer followed
# by two doubles (packed from uid, x, y by the sender).
coord_struct = struct.Struct('>Q2d')
# Carries decoded points from the receiver to the clock callback.
q = Queue.Queue()
def remake_socket():
    """(Re)create the module-level UDP socket ``s`` and reset ``addr``.

    ``addr`` goes back to the broadcast address on port 9876, and ``s``
    becomes a fresh datagram socket with address reuse and broadcasting
    enabled, bound to port 9876 on all interfaces.  Any previous socket
    object is simply replaced; it is not closed here.
    """
    global s, addr
    addr = (broadcast_ip, 9876)
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    for option in (socket.SO_REUSEADDR, socket.SO_BROADCAST):
        s.setsockopt(socket.SOL_SOCKET, option, 1)
    s.bind(('', 9876))
# Create the shared socket `s` once at module load, so it exists before
# the receiver thread or any flush() tries to use it.
remake_socket()
def calculate_points(x1, y1, x2, y2, steps=5):
    """Interpolate points on the segment from (x1, y1) to (x2, y2).

    Returns ``None`` when the two end points are closer than ``steps``.
    Otherwise returns a flat list ``[x, y, x, y, ...]`` of points spaced
    ``steps`` apart along the segment; both end points are excluded, so
    the list may be empty.
    """
    delta_x = x2 - x1
    delta_y = y2 - y1
    length = sqrt(delta_x * delta_x + delta_y * delta_y)
    if length < steps:
        return None

    # Number of `steps`-sized pieces that fit in the segment (a float).
    pieces = length / steps
    coords = []
    for k in range(1, int(pieces)):
        fraction = k / pieces
        coords.append(x1 + delta_x * fraction)
        coords.append(y1 + delta_y * fraction)
    return coords
def recv_loop():
    """Run the receiver forever, rebuilding the socket after socket errors.

    Each time ``recv_loop_real()`` fails with ``socket.error`` the error is
    printed, the shared socket is recreated with ``remake_socket()`` and
    receiving starts again.  Any other exception propagates to the caller.
    """
    while True:
        try:
            recv_loop_real()
        except socket.error as err:
            print(err)
            remake_socket()
def recv_loop_real():
    """Announce this peer, then receive datagrams until the socket fails.

    Sends ``'START'`` to the current ``addr`` and then loops on the shared
    socket ``s``:

    * datagrams whose source host equals ``ip`` are ignored;
    * datagrams beginning with ``'START'`` (this includes ``'STARTED'``)
      are logged once and make their sender the new send target ``addr``;
      an exact ``'START'`` is additionally answered with ``'STARTED'``;
    * anything else is decoded as consecutive ``coord_struct`` records and
      each decoded tuple is put on ``q``.  Trailing bytes that do not fill
      a whole record are ignored.

    The loop only ends by exception; ``s`` is closed on the way out.
    """
    global addr
    s.sendto('START', addr)
    try:
        while True:
            data, a = s.recvfrom(65536)
            if a[0] == ip:
                # Ignore packets that came from this same host.
                continue
            if data.startswith('START'):
                Logger.info('Received {} from {}'.format(data, a))
                # From now on, send to the peer that announced itself.
                addr = a
                if data == 'START':
                    s.sendto('STARTED', a)
                continue
            record_size = coord_struct.size
            # Step through whole records only; a short tail yields nothing.
            for offset in range(0, len(data) - record_size + 1, record_size):
                q.put(coord_struct.unpack_from(data, offset))
    finally:
        s.close()
def my_callback(dt):
    """Draw every point currently waiting in ``q``.

    dt -- time delta supplied by the clock that schedules this callback;
          it is not used.

    Each queued ``(uid, x, y)`` tuple is drawn through
    ``app.tracer.add_line`` with ``0x00100000`` added to the uid.  The
    queue is drained without blocking, and ``task_done()`` is called even
    if drawing raises, so the queue's unfinished-task counter cannot drift
    away from its real size.
    """
    while True:
        try:
            uid, x, y = q.get_nowait()
        except Queue.Empty:
            return
        try:
            app.tracer.add_line(uid + 0x00100000, x, y)
        finally:
            q.task_done()
class Touchtracer(FloatLayout):
    """Widget that draws local touches and trails received from a peer.

    A local touch is drawn as a crosshair (two 1-pixel rectangles) plus a
    trail of points.  Every trail point is also packed with
    ``coord_struct`` and buffered, and the buffer is sent as one datagram
    over the module-level socket ``s`` to ``addr``.  Points coming from the
    network are drawn through :meth:`add_line`.
    """

    def __init__(self, *args, **kwargs):
        super(Touchtracer, self).__init__(*args, **kwargs)
        # uid -> Point instruction holding the trail drawn by add_line().
        self.lines = {}
        # Packed points waiting to be sent together by flush().
        self.buf = []

    def add_line(self, uid, x, y):
        """Add the point (x, y) to the trail identified by ``uid``.

        The first point seen for a uid creates a new ``Point`` instruction
        on the canvas; later points are appended to that instruction.
        """
        with self.canvas:
            if uid not in self.lines:
                self.lines[uid] = Point(points=(x, y), source='particle.png',
                                        pointsize=5)
            else:
                self.lines[uid].add_point(x, y)

    def on_touch_down(self, touch):
        """Start a crosshair and a trail for ``touch`` and grab the touch.

        The drawing instructions are stored in ``touch.ud`` for the move
        and up handlers.  Always returns True.
        """
        win = self.get_parent_window()
        ud = touch.ud
        ud['group'] = g = str(touch.uid)
        with self.canvas:
            ud['color'] = Color(random(), 1, 1, mode='hsv', group=g)
            # The Point is created without a group, so remove_group() in
            # on_touch_up() removes the crosshair but not the trail.
            ud['lines'] = (
                Rectangle(pos=(touch.x, 0), size=(1, win.height), group=g),
                Rectangle(pos=(0, touch.y), size=(win.width, 1), group=g),
                Point(points=(touch.x, touch.y), source='particle.png',
                      pointsize=5))
        self.send_point(touch.uid, touch.x, touch.y)
        touch.grab(self)
        return True

    def send_point(self, uid, x, y):
        """Buffer one point for sending; flush once 100 points are queued."""
        dgram = coord_struct.pack(uid, x, y)
        self.buf.append(dgram)
        if len(self.buf) >= 100:
            self.flush()

    def flush(self):
        """Send all buffered points as a single datagram to ``addr``.

        Does nothing when the buffer is empty.  Sending is best effort: the
        buffer is emptied first, and a socket error is logged instead of
        raised, so the points of a failed send are dropped.
        """
        if not self.buf:
            return
        buf, self.buf = self.buf, []
        dgram = b''.join(buf)
        try:
            s.sendto(dgram, addr)
        except socket.error as e:
            Logger.warning('Touchtracer: dropped {} buffered points: {}'.format(
                len(buf), e))

    def on_touch_move(self, touch):
        """Move the crosshair and extend the trail of a grabbed touch.

        Touches not grabbed by this widget are ignored.  Points are
        interpolated between the last trail point and the new position
        with ``calculate_points``; each one is buffered for sending and
        then added to the trail.
        """
        if touch.grab_current is not self:
            return
        ud = touch.ud
        ud['lines'][0].pos = touch.x, 0
        ud['lines'][1].pos = 0, touch.y
        trail = ud['lines'][2]
        previous = trail.points
        oldx, oldy = previous[-2], previous[-1]
        points = calculate_points(oldx, oldy, touch.x, touch.y)
        if points:
            try:
                for idx in range(0, len(points), 2):
                    self.send_point(touch.uid, points[idx], points[idx + 1])
                    trail.add_point(points[idx], points[idx + 1])
            except GraphicException:
                # Best effort: stop extending this trail if the Point
                # instruction refuses further points.
                pass

    def on_touch_up(self, touch):
        """Finish a grabbed touch: drop its crosshair, then clear or flush.

        Releasing the touch inside the 100x100 region at the origin clears
        the whole canvas; anywhere else the buffered points are sent.
        """
        if touch.grab_current is not self:
            return
        touch.ungrab(self)
        ud = touch.ud
        self.canvas.remove_group(ud['group'])
        if touch.x < 100 and touch.y < 100:
            self.canvas.clear()
            # The Point instructions in self.lines were just removed from
            # the canvas; forget them so add_line() creates fresh ones
            # instead of appending to instructions that are not drawn.
            self.lines.clear()
        else:
            self.flush()
class TouchtracerApp(App):
    """Application whose root widget is a single ``Touchtracer``."""

    title = 'Touchtracer'
    icon = 'icon.png'

    def build(self):
        """Create the root widget, keep it on ``self.tracer`` and return it."""
        root = Touchtracer()
        self.tracer = root
        return root

    def on_pause(self):
        """Always return True when the application is asked to pause."""
        return True
if __name__ == '__main__':
    # `app` has to be a module-level name: my_callback() reaches the
    # tracer widget through it.
    app = TouchtracerApp()

    # Receive remote points in the background; as a daemon thread it does
    # not keep the process alive once the application has exited.
    receiver = threading.Thread(target=recv_loop)
    receiver.daemon = True
    receiver.start()

    # Hand queued points to the tracer every 0.01 seconds.
    Clock.schedule_interval(my_callback, 0.01)
    app.run()
#:kivy 1.0
#:import kivy kivy
# Rule applied to Touchtracer widgets.
# NOTE(review): the original indentation was lost; the nesting below is
# reconstructed from kv syntax - confirm against the original .kv file.
<Touchtracer>:
    canvas:
        # Drawing colour for the widget's canvas: white.
        Color:
            rgb: 1, 1, 1
        #Rectangle:
        # source: 'data/images/background.jpg'
        # size: self.size
    # Full-width bar, 44dp high, placed at the top of the widget.
    # Presumably the commented-out Image and Label below were its
    # children; with them disabled the bar has no content.
    BoxLayout:
        padding: '10dp'
        spacing: '10dp'
        size_hint: 1, None
        pos_hint: {'top': 1}
        height: '44dp'
        #Image:
        # size_hint: None, None
        # size: '24dp', '24dp'
        # source: 'data/logo/kivy-icon-64.png'
        # mipmap: True
        #Label:
        # height: '24dp'
        # text_size: self.width, None
        # color: (1, 1, 1, .8)
        # text: 'Kivy %s - Touchtracer' % kivy.__version__
        # valign: 'middle'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment