Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@akirayou
Created August 25, 2018 15:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save akirayou/05062402d18da87b671b94d11f61c20a to your computer and use it in GitHub Desktop.
Save akirayou/05062402d18da87b671b94d11f61c20a to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
"""
Created on Thu Aug 9 22:50:30 2018
@author: akira
"""
import time
import socket
import numpy as np
import time
from threading import Thread, Lock
# Clear the flag that the LED sender loop polls, then pause briefly before the
# rest of the script runs.
# NOTE(review): presumably this gives a sender thread left over from an earlier
# run in the same interactive session time to exit - confirm.
led_run=False
time.sleep(0.5)
# Destination address and UDP port that every packet below is sent to.
host = '192.168.1.214'
# Alternative destination kept for reference: host = '224.0.0.1'
port = 13531
# Datagram (UDP) socket shared by the send helpers.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Most recently built packet; rebound by send_command() and _send().
d=None
def send_command(sock,data,idx,commandNo):
    """Wrap *data* in a command packet and send it as one UDP datagram.

    Packet layout (5 header bytes followed by the payload):
        byte 0     idx
        bytes 1-2  current time in milliseconds modulo 65536, high byte first
        byte 3     commandNo
        byte 4     always 0
        bytes 5..  the items of data, one byte each

    The packet is stored in the module-level ``d`` and sent to the
    module-level ``(host, port)``.

    Args:
        sock: object whose ``sendto(packet, address)`` is called once.
        data: sized sequence of integers, each in 0..255.
        idx: value written to byte 0 (0..255).
        commandNo: value written to byte 3 (0..255).
    """
    global d
    global port,host
    d = bytearray(5 + len(data))
    stamp = round(time.time() * 1000) % 65536
    d[0] = idx
    d[1], d[2] = divmod(stamp, 256)
    d[3] = commandNo
    d[4] = 0
    for pos, value in enumerate(data):
        d[5 + pos] = value
    sock.sendto(d, (host, port))
def set_offset(sock,offset):
    """Send *offset* as a two-byte, high-byte-first payload via send_command().

    The packet is sent with index 0 and command number 0.
    """
    high, low = divmod(offset, 256)
    send_command(sock, [high, low], 0, 0)
def set_maxdelay(sock,d):
    """Send *d* as a two-byte, high-byte-first payload via send_command().

    The packet is sent with index 0 and command number 0.

    NOTE(review): this is the same command number set_offset() uses, so the
    two packets differ only in their payload value - confirm that 0 is the
    intended command number here.
    """
    high, low = divmod(d, 256)
    send_command(sock, [high, low], 0, 0)
def _send(sock,data,idx,startPos_Q):
    """Send one chunk of LED colour rows as a single UDP datagram.

    Packet layout (5 header bytes followed by 3 bytes per row):
        byte 0     idx
        bytes 1-2  current time in milliseconds modulo 65536, high byte first
        byte 3     startPos_Q
        byte 4     number of rows divided by two
        bytes 5..  columns 0, 1 and 2 of every row, row after row

    The packet is stored in the module-level ``d`` and sent to the
    module-level ``(host, port)``.

    Args:
        sock: object whose ``sendto(packet, address)`` is called once.
        data: 2-D array indexed as ``data[row, column]`` with an even number
            of rows; every value must fit in one byte.
        idx: value written to byte 0 (0..255).
        startPos_Q: value written to byte 3 (0..255).
    """
    global d
    global port,host
    rows = data.shape[0]
    assert rows%2==0, "data len must be even"
    d = bytearray(5 + 3 * rows)
    stamp = round(time.time() * 1000) % 65536
    d[0] = idx
    d[1], d[2] = divmod(stamp, 256)
    d[3] = startPos_Q
    d[4] = rows // 2
    for row in range(rows):
        base = 5 + 3 * row
        for channel in range(3):
            d[base + channel] = data[row, channel]
    sock.sendto(d, (host, port))
def send(sock,data):
    """Split a full LED frame into chunks and transmit every chunk twice.

    Each chunk holds at most ``((1472-6)//3//8)*8`` rows (488) and is passed
    to _send() together with the current frame counter ``send.idx`` and the
    chunk's starting row divided by 8.  After the frame has gone out the
    counter advances by one, wrapping at 256.

    NOTE(review): indentation was lost in this copy of the file.  The counter
    update is placed after both loops so that it never exceeds 255 when it is
    written into the one-byte packet field - confirm against the original.

    Args:
        sock: socket-like object handed through to _send().
        data: 2-D array of LED rows, sliced along axis 0.
    """
    if not hasattr(send, "idx"):
        send.idx = 0
    total = data.shape[0]
    chunk_max = ((1472-6)//3//8)*8
    for _ in range(2):  # duplicate send to avoid packet loss
        for start in range(0, total, chunk_max):
            stop = min(start + chunk_max, total)
            _send(sock, data[start:stop, :], send.idx, start // 8)
    send.idx = (send.idx + 1) % 256
# LED strip geometry.  These names must exist before the frame buffer below is
# allocated and before the sender thread starts: the buffer size uses nofLedR
# and the sender slices frames to nofLed rows.  Without them this point raises
# NameError on a fresh interpreter.  The calibration section further down
# assigns the same names again; keep both places in sync.
nofLed=1250                              # rows of a frame that are actually sent
nofOnLedL=7                              # bits used to encode an LED's index within a block
nofOnLed=2**nofOnLedL                    # LEDs per block
nofBlock=(nofLed+nofOnLed-1)//nofOnLed   # number of blocks, rounded up
nofLedR=nofBlock*nofOnLed                # nofLed rounded up to whole blocks

# Flag polled by the sender loop; setting it to False lets that loop end.
led_run=True
# Guards every read and rebind of ledData.
led_lock=Lock()
# Frame currently being transmitted: one row of three byte values per LED.
ledData=np.zeros( (nofLedR,3),dtype=np.ubyte)
# Value sent through set_offset() when the sender loop starts.
offset_ms=5
# Divisor applied to offset_ms when the sender loop computes its frame delay.
nofAveBuf=5
def led_thread():
    """Sender loop: keep transmitting the current LED frame while led_run is true.

    Sends offset_ms through set_offset() and 200 through set_maxdelay() once,
    then repeatedly copies ledData under led_lock, sends its first nofLed
    rows and sleeps before the next frame.
    """
    global ledData,sock
    set_offset(sock, offset_ms)
    set_maxdelay(sock, 200)
    while led_run:
        with led_lock:
            frame = np.copy(ledData)
        send(sock, frame[:nofLed, :])
        # frame span must be offset_ms/nofAveBuf but 20fps is max
        wanted = offset_ms * 0.001 / nofAveBuf
        time.sleep(wanted if wanted > 1/20.0 else 1/20.0)
# Run the sender as a daemon thread: the loop only ends once led_run becomes
# False, so a non-daemon thread would keep the process alive forever if the
# main script raises or is interrupted before clearing that flag.
Thread(target=led_thread, daemon=True).start()
def send_led(led):
    """Replace the shared frame ``ledData`` with a private copy of *led*.

    The copy is installed while holding led_lock, the same lock the sender
    loop takes when it reads ledData.
    """
    global ledData
    snapshot = np.copy(led)
    with led_lock:
        ledData = snapshot
def cap_delay(dt):
    """Read camera frames for at least *dt* seconds and return the last one.

    At least two frames are always read from the module-level ``cap``, even
    when *dt* is zero.  The success flag returned by ``cap.read()`` is not
    checked.
    NOTE(review): presumably the extra reads discard frames buffered before
    the LEDs changed - confirm.
    """
    began = time.time()
    cap.read()
    frame = cap.read()[1]
    while time.time() - began < dt:
        frame = cap.read()[1]
    return frame
def getDiff(mask):
    """Return the normalised brightness gain caused by lighting the LEDs in *mask*.

    A frame is captured with all LEDs off, then another with the selected
    LEDs set to 5 on all three channels.  The selection is taken from the
    current block, i.e. every nofBlock-th LED starting at nowBlock, and
    *mask* indexes into that subset.  The result is ``lit - dark`` with
    negative values clamped to 0 and scaled so its maximum is 1.  If no pixel
    got brighter the all-zero image is returned unscaled.

    The two frames and the difference are also shown in the OpenCV windows
    "f0", "f1" and "diff".

    Args:
        mask: integer indices into the current block's LEDs.

    Returns:
        Float array with the same shape as a camera frame.
    """
    global nofLed,nofLedR,nowBlock
    delay=0.5
    led=np.zeros( (nofLedR,3),dtype=np.ubyte)
    send_led(led[:nofLed,:])
    f0=cap_delay(delay)
    # The strided slice is a view, so assigning through it updates led itself.
    led[nowBlock::nofBlock,:][mask,:]=5
    send_led(led[:nofLed,:])
    f1=cap_delay(delay)
    # np.float was removed in NumPy 1.24; np.float64 is the type it aliased.
    f1=f1.astype(np.float64)/255
    f0=f0.astype(np.float64)/255
    diff=(f1-f0)
    diff[diff<0]=0
    peak=np.max(diff)
    if peak>0:  # avoid 0/0 -> NaN when nothing got brighter
        diff/=peak
    cv2.imshow("f0",f0)
    cv2.imshow("f1",f1)
    cv2.imshow("diff",diff)
    cv2.waitKey(1)
    return diff
def _getDiff(mask):
    """Threshold-driven variant of getDiff() that polls frames instead of sleeping.

    Not called anywhere in the visible part of this file.

    Steps:
      1. Switch all LEDs off, grab a reference frame, then keep reading
         frames until the summed positive difference ``reference - frame``
         exceeds ``nofOnLed*140``.  That frame becomes the dark frame.
      2. Light the LEDs selected by *mask* in the current block, then keep
         reading frames until the summed positive difference
         ``frame - dark`` exceeds ``nofOnLed*100``.
      3. Scale that last difference so its maximum is 1, show the frames in
         the windows "f0", "f1" and "diff", and return the difference.

    NOTE(review): indentation was lost in this copy of the file; the loop
    bodies below are the reading under which every statement is reachable -
    confirm against the original.

    Args:
        mask: integer indices into the current block's LEDs.

    Returns:
        Float array with the same shape as a camera frame.
    """
    global nofLed,nofLedR,nowBlock
    led=np.zeros( (nofLedR,3),dtype=np.ubyte)
    send_led(led[:nofLed,:])
    ret,f1 = cap.read()
    # np.float was removed in NumPy 1.24; np.float64 is the type it aliased.
    f1=f1.astype(np.float64)/255
    while(True):
        ret,f0 = cap.read()
        f0=f0.astype(np.float64)/255
        diff=(f1-f0)
        diff[diff<0]=0
        if(np.sum(diff)>nofOnLed*140):break
    led[nowBlock::nofBlock,:][mask,:]=5
    send_led(led[:nofLed,:])
    while(True):
        ret, f1 = cap.read()
        f1=f1.astype(np.float64)/255
        diff=(f1-f0)
        diff[diff<0]=0
        if(np.sum(diff)>nofOnLed*100):break
    # The loop above only exits with a positive sum, so the maximum is > 0.
    diff/=np.max(diff)
    if(True):
        cv2.imshow("f0",f0)
        cv2.imshow("f1",f1)
        cv2.imshow("diff",diff)
        cv2.waitKey(1)
    return diff
def show_single(idx):
    """Light one LED (value 10 on all three channels) and switch the rest off.

    *idx* indexes into every nofBlock-th LED starting at LED 0.
    NOTE(review): this always addresses block 0 and ignores nowBlock, unlike
    getDiff() - confirm that is intended.
    """
    frame = np.zeros((nofLedR, 3), dtype=np.ubyte)
    first_block = frame[::nofBlock, :]  # view: writes land in frame
    first_block[idx, :] = 10
    send_led(frame[:nofLed, :])
def get_masks():
    """Capture one difference image per bit and polarity of the in-block LED index.

    First every nofBlock-th LED (starting at LED 0) is lit at value 5 and the
    function waits half a second.  Then, for each of the nofOnLedL bits and
    for both polarities, the mask of in-block indices whose bit is clear
    (polarity 0) or set (polarity 1) is built, printed, and passed to
    getDiff().

    Returns:
        (masks, imgs): two parallel lists with ``2 * nofOnLedL`` entries, the
        index lists and the images getDiff() returned for them.
    """
    # led on and wait for camera
    warmup = np.zeros((nofLedR, 3), dtype=np.ubyte)
    warmup[::nofBlock, :] = 5
    send_led(warmup)
    time.sleep(0.5)

    masks = []
    imgs = []
    for bit in range(nofOnLedL):
        weight = 2**bit
        for polarity in range(2):  # 0: bit is low, 1: bit is high
            mask = [i for i in range(nofOnLed)
                    if bool(weight & i) == bool(polarity)]
            print(mask)
            masks.append(mask)
            imgs.append(getDiff(mask))
    return masks, imgs
def getSingle(masks,imgs,idx):
    """Combine the images of every mask that contains *idx* into one image.

    Each selected image is divided by its own maximum, the scaled images are
    multiplied element-wise, and the product is raised to the power
    ``1 / len(masks)``.  The exponent uses the total number of masks, not the
    number of selected ones.  The positions of the selected masks are printed.

    Args:
        masks: list of index collections, parallel to *imgs*.
        imgs: list of arrays of identical shape.
        idx: index to look for in each mask.

    Returns:
        A new float array; the arrays in *imgs* are not modified.

    Raises:
        TypeError: if no mask contains *idx* (there is nothing to combine).
    """
    hits = [k for k, members in enumerate(masks) if idx in members]
    combined = None
    for k in hits:
        scaled = imgs[k] / np.max(imgs[k])
        if combined is None:
            combined = scaled
        else:
            combined *= scaled
    combined = combined ** (1.0 / len(masks))
    print(hits)
    return combined
def rgb_to_gray(src):
    """Convert a 3-channel image to a single-channel grey image.

    Channel 0 is weighted as blue, channel 1 as green and channel 2 as red:
    ``0.2989*R + 0.5870*G + 0.1140*B``.

    Args:
        src: array indexed as ``src[row, column, channel]``.

    Returns:
        2-D array of the weighted sums.
    """
    # Split into channels.
    blue = src[:, :, 0]
    green = src[:, :, 1]
    red = src[:, :, 2]
    # Weighted sum of R, G and B gives the grey value.
    return 0.2989 * red + 0.5870 * green + 0.1140 * blue
import cv2

# Drop a capture object left over from an earlier run in the same interpreter
# session before a new one is opened.
if "cap" in globals(): del cap

# LED strip geometry: LEDs are handled in nofBlock interleaved blocks of
# nofOnLed LEDs each; the LED with in-block index i of block a is LED number
# a + nofBlock*i.
nofLed=1250
nofOnLedL=7
nofOnLed=2**nofOnLedL
nofBlock=(nofLed+nofOnLed-1)//nofOnLed   # rounded up
nofLedR=nofBlock*nofOnLed                # nofLed rounded up to whole blocks
nowBlock=0
count=0

cap = cv2.VideoCapture(1)
# Camera settings, see https://qiita.com/DoChi_72/items/a9fd82c1870b45aaf136
cap.set(cv2.CAP_PROP_BRIGHTNESS,1)  # property id 10
cap.set(cv2.CAP_PROP_GAIN,1)        # property id 14
cap.set(cv2.CAP_PROP_EXPOSURE,1)    # property id 15

# NOTE(review): indentation was lost in this copy of the file; the nesting
# below (debug display inside the disabled branch, overlay and animation after
# the calibration loops) is the most coherent reading - confirm against the
# original.
try:
    if not cap.isOpened():
        raise RuntimeError("camera 1 could not be opened")

    # One throw-away pass before the measurements that are actually used.
    print("aging")
    get_masks()

    print("real run")
    ledPos=[(0,0)]*nofLed
    for a in range(nofBlock):
        nowBlock=a
        masks,imgs=get_masks()
        for i in range(nofOnLed):
            ledId=nowBlock+nofBlock*i
            if(nofLed<=ledId):continue  # padding index beyond the real strip
            simg=getSingle(masks,imgs,i)
            simg = rgb_to_gray(simg)
            bimg=cv2.GaussianBlur(simg,(5,5),0)
            bimg/=np.max(bimg)
            # Only the location of the brightest pixel is needed.
            _, _, _, max_loc = cv2.minMaxLoc(bimg)
            print(ledId,max_loc)
            ledPos[ledId]=max_loc
            if(False):  # per-LED debug view, disabled
                cv2.line(bimg, (max_loc[0], 0), (max_loc[0], bimg.shape[0]), 255, 1)
                cv2.line(bimg, (0,max_loc[1]), ( bimg.shape[1],max_loc[1]), 255, 1)
                show_single(i)
                f1=cap_delay(1)
                cv2.imshow("f1",f1)
                cv2.imshow("tt",bimg)
                cv2.waitKey(1)

    # Mark every detected position on a fresh camera frame.
    f=cap_delay(0.1)
    for px, py in ledPos:
        f[py,px,2]=255
        try:
            f[py+1,px+1,2]=255
            f[py+1,px+1,0:2]=0
        except IndexError:
            # The diagonal neighbour lies outside the frame; skip it.
            pass
    cv2.imshow("final",f)
    cv2.waitKey(1)

    # Bounding box of the detected positions.
    ledPos=np.array(ledPos)
    xmin=np.min(ledPos[:,0])
    xmax=np.max(ledPos[:,0])
    xl=xmax-xmin
    ymin=np.min(ledPos[:,1])
    ymax=np.max(ledPos[:,1])
    yl=ymax-ymin

    # Endless demo: sweep a column (channel 1) and a row (channel 2) across
    # the bounding box, lighting LEDs whose position falls in the same
    # 3-pixel band as the sweep coordinate.
    count=0
    while(True):
        led=np.zeros( (nofLed,3),dtype=np.ubyte)
        x=count%xl+xmin
        y=count%yl+ymin
        led[np.nonzero( ledPos[:,0]//3==x//3)[0],1]=10
        led[np.nonzero(ledPos[:,1]//3==y//3)[0],2]=10
        send_led(led)
        count +=3
        time.sleep(1.0/20)
finally:
    # Reached on any exception or interrupt, so the sender loop is always told
    # to stop; the loop above never finishes on its own.
    led_run=False
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment