Skip to content

Instantly share code, notes, and snippets.

@niteshgaba
Created April 12, 2021 08:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save niteshgaba/2148e78ae5276907fc49c4eddb44446e to your computer and use it in GitHub Desktop.
Save niteshgaba/2148e78ae5276907fc49c4eddb44446e to your computer and use it in GitHub Desktop.
Arducam
import numpy as np
import cv2
from multiprocessing import Process, Queue
from threading import Thread
from datetime import datetime
from datetime import timedelta
import os
import time
from timeit import default_timer as timer
import operator
import queue
from multiprocessing import Manager
def gstreamer_pipeline(
    capture_width=1920,
    capture_height=1080,
    display_width=1920,
    display_height=1080,
    framerate=60,
    flip_method=0,
):
    """Build the GStreamer launch string used to open the camera.

    The pipeline starts at ``nvarguscamerasrc aelock=true``, requests NV12
    frames at the capture size and frame rate, runs them through
    ``nvvidconv`` with the given flip method and display size, and converts
    the result to BGR for an ``appsink``.

    Args:
        capture_width: Width placed in the source caps.
        capture_height: Height placed in the source caps.
        display_width: Width placed in the caps after ``nvvidconv``.
        display_height: Height placed in the caps after ``nvvidconv``.
        framerate: Numerator of the ``framerate=(fraction)N/1`` cap.
        flip_method: Value of the ``nvvidconv`` ``flip-method`` property.

    Returns:
        The pipeline description, elements separated by ``" ! "``.
    """
    source_caps = (
        "video/x-raw(memory:NVMM), "
        "width=(int)%d, height=(int)%d, "
        "format=(string)NV12, framerate=(fraction)%d/1"
    ) % (capture_width, capture_height, framerate)
    flip_stage = "nvvidconv flip-method=%d" % (flip_method,)
    scaled_caps = (
        "video/x-raw, width=(int)%d, height=(int)%d, format=(string)BGRx"
    ) % (display_width, display_height)

    stages = [
        "nvarguscamerasrc aelock=true",
        source_caps,
        flip_stage,
        scaled_caps,
        "videoconvert",
        "video/x-raw, format=(string)BGR",
        "appsink",
    ]
    return " ! ".join(stages)
def __process_job(delta_x, delta_y, delta_z, delta_o, speed, iteration, move_output_queue):
    """Stand in for the movement job and report when it is finished.

    None of the movement parameters (``delta_*``, ``speed``, ``iteration``)
    are used here; the function only waits and then signals completion.

    Args:
        move_output_queue: Queue that receives ``True`` once the wait is over.
    """
    wait_seconds = 5
    time.sleep(wait_seconds)
    # The capture loop treats any non-None item on this queue as "stop".
    move_output_queue.put(True)
def __multiple_shots(camera, move_output_queue, frame_output_queue, all_files):
    """Capture frames until the movement job signals that it has finished.

    Each loop iteration first polls ``move_output_queue`` without blocking.
    While it is empty, one frame is read from ``camera``; a successful read
    is copied into ``all_files`` under a timestamp key and that key is put on
    ``frame_output_queue``. When a non-None item arrives on
    ``move_output_queue``, ``None`` is put on ``frame_output_queue`` as the
    end-of-stream marker and the function returns.

    Args:
        camera: Object whose ``read()`` returns ``(ok, frame)``.
        move_output_queue: Queue polled for the stop signal.
        frame_output_queue: Queue receiving frame keys, then ``None``.
        all_files: Mapping from timestamp key to the copied frame.
    """
    while True:
        try:
            command = move_output_queue.get_nowait()
        except queue.Empty:
            pass  # movement still running: fall through and grab a frame
        else:
            if command is None:
                # A None command neither stops the loop nor triggers a capture.
                continue
            frame_output_queue.put(None)
            return

        # The key is the wall-clock time taken just before the read.
        stamp = datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f')
        _, image = camera.read()
        if image is None:
            continue
        all_files[stamp] = image.copy()
        frame_output_queue.put(stamp)
def get_brenner_score(frame):
    """Return the score assigned to *frame*.

    This is a placeholder: ``frame`` is not inspected and every call
    returns the same value.
    """
    placeholder_score = 1
    return placeholder_score
def generate_score(process_count, frame_output_queue, score_output_queue, all_files):
    """Score captured frames until the end-of-stream marker arrives.

    Frame keys are taken from ``frame_output_queue`` one at a time. For each
    key the frame is fetched from ``all_files``, scored with
    ``get_brenner_score`` and reported on ``score_output_queue`` as
    ``{"frame_captured": key, "score": score}``. A key that is not in
    ``all_files`` is put back on ``frame_output_queue``. When ``None`` is
    received, ``None`` is forwarded to ``score_output_queue`` and the
    function returns.

    Args:
        process_count: Worker identifier; only used in the log output.
        frame_output_queue: Queue of frame keys, terminated by ``None``.
        score_output_queue: Queue receiving score records, then ``None``.
        all_files: Mapping from frame key to frame.
    """
    while True:
        print("ALL FILES IN SCORE", len(all_files))
        dt = frame_output_queue.get()
        if dt is None:
            print("frame_output is none")
            score_output_queue.put(None)
            print("generate_score worker exited!", process_count)
            return

        # One look-up instead of "in" followed by "[]": on a Manager proxy
        # each access is a separate round trip, and a single access cannot
        # be invalidated between the check and the read.
        try:
            frame = all_files[dt]
        except KeyError:
            print("Missing- requeue", dt)
            frame_output_queue.put(dt)
            continue

        score = get_brenner_score(frame)
        file_details = {
            "frame_captured": dt,
            "score": score,
        }
        print("Queue", process_count, score)
        score_output_queue.put(file_details)
def __process_frames(
    camera,
    folder_path,
    extension,
    delta_x,
    delta_y,
    delta_z,
    delta_o,
    speed,
    iteration,
    final_output_queue,
    prefix,
    top_percentage,
):
    """Capture frames during one movement and save the best-scoring ones.

    Three threads are started: ``__process_job`` (the movement),
    ``__multiple_shots`` (capture) and ``generate_score`` (scoring). Score
    records are collected until the scorer forwards ``None``. The records are
    then sorted by ``"score"`` in descending order, the top
    ``top_percentage`` percent (rounded down) are written to ``folder_path``
    as ``<prefix>_<frame key>.<extension>``, and the list of
    ``[file_name, score]`` pairs is put on ``final_output_queue``.

    Only files that ``cv2.imwrite`` reports as written are included in the
    result.

    Args:
        camera: Capture object handed to ``__multiple_shots``.
        folder_path: Directory the selected frames are written to.
        extension: File extension, without the dot.
        delta_x, delta_y, delta_z, delta_o, speed, iteration: Forwarded
            unchanged to ``__process_job``.
        final_output_queue: Queue that receives the result list.
        prefix: File-name prefix.
        top_percentage: Percentage of scored frames to keep.
    """
    move_output_queue = Queue(maxsize=0)
    frame_output_queue = Queue(maxsize=0)
    score_output_queue = Queue(maxsize=0)
    # NOTE(review): all workers below are threads, so a plain dict and
    # queue.Queue would presumably work and avoid serialising every frame
    # through the manager process - confirm before changing.
    all_files = Manager().dict()

    workers = [
        # the movement
        Thread(
            target=__process_job,
            args=(delta_x, delta_y, delta_z, delta_o, speed, iteration, move_output_queue),
        ),
        # the capture
        Thread(
            target=__multiple_shots,
            args=(camera, move_output_queue, frame_output_queue, all_files),
        ),
        # the scoring
        Thread(
            target=generate_score,
            args=(0, frame_output_queue, score_output_queue, all_files),
        ),
    ]
    for worker in workers:
        worker.start()

    # Blocks on get() and stops at the None the single scorer sends on exit.
    processed_file_details = list(iter(score_output_queue.get, None))
    print("file worker exit count ", 1)

    # The sentinel has travelled through every queue, so all three threads
    # are finishing; join them before the queues they use are closed.
    for worker in workers:
        worker.join()

    top_x = int(len(processed_file_details) * top_percentage / 100)
    processed_file_details.sort(key=operator.itemgetter("score"), reverse=True)
    print("All work Done, time to store ", top_x, "out of", len(all_files))

    file_details = []
    for entry in processed_file_details[:top_x]:
        key = entry["frame_captured"]
        img_file_name = "{}_{}.{}".format(prefix, key, extension)
        file_name = os.path.join(folder_path, img_file_name)
        print("writing (cap, writing)", file_name, key, datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f'))
        # imwrite signals failure through its return value; do not report a
        # file that was never written.
        if cv2.imwrite(file_name, all_files[key]):
            file_details.append([file_name, entry["score"]])
        else:
            print("failed to write", file_name)

    move_output_queue.close()
    frame_output_queue.close()
    score_output_queue.close()
    all_files.clear()
    final_output_queue.put(file_details)
def __continous_move_internal(cam, folder_path, delta_x, delta_y, delta_z, delta_o, speed, iteration, prefix, top_percentage):
    """Run one capture-and-score pass and return the files it saved.

    Calls ``__process_frames`` with the ``"jpg"`` extension, waits for its
    result on a private queue, prints the elapsed wall-clock time and returns
    the result.

    Args:
        cam: Capture object forwarded to ``__process_frames``.
        folder_path: Directory the selected frames are written to.
        delta_x, delta_y, delta_z, delta_o, speed, iteration: Movement
            parameters, forwarded unchanged.
        prefix: File-name prefix for the saved frames.
        top_percentage: Percentage of scored frames to keep.

    Returns:
        Whatever ``__process_frames`` put on the result queue.
    """
    final_output_queue = Queue(maxsize=0)
    start = time.time()
    __process_frames(cam, folder_path, "jpg", delta_x, delta_y, delta_z, delta_o, speed, iteration, final_output_queue, prefix, top_percentage)
    # A blocking get() waits exactly until the result is available; polling
    # Queue.empty() is documented as unreliable for multiprocessing queues.
    all_files = final_output_queue.get()
    end = time.time()
    print("A----->", (end - start))
    final_output_queue.close()
    return all_files
def main():
    """Open the camera, run one capture pass and print the saved files.

    The user is prompted for a sample code; frames are stored under
    ``<cwd>/testing/<sample code>``. The directory must not exist yet,
    because ``os.makedirs`` is called without ``exist_ok``.

    Returns:
        None. Returns early after printing "Device not found" when the
        camera cannot be opened.
    """
    cam = cv2.VideoCapture(gstreamer_pipeline(flip_method=0), cv2.CAP_GSTREAMER)
    if not cam.isOpened():
        print("Device not found")
        return None
    try:
        cwd = os.getcwd()
        sample_id = input("Start Sample Code : ")
        folder_path = os.path.join(cwd, 'testing', sample_id)
        os.makedirs(folder_path)
        # Positional arguments: delta_x, delta_y, delta_z, delta_o, speed,
        # iteration, then the file prefix and the top percentage to keep.
        all_files = __continous_move_internal(cam, folder_path, 0, 0, 0, 1, 10000, 1, "PRE1", 5)
        print(all_files)
    finally:
        # Release the device even if directory creation or the capture fails.
        cam.release()
    print("Are we oK")
# Run only when executed as a script. The capture code starts a
# multiprocessing Manager; under the spawn/forkserver start methods the main
# module is imported again in the child, which must not re-run main().
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment