Skip to content

Instantly share code, notes, and snippets.

@otger
Created July 3, 2017 09:42
Show Gist options
  • Save otger/88b34cfec41da34a2d4fd8731eb42fbb to your computer and use it in GitHub Desktop.
Save otger/88b34cfec41da34a2d4fd8731eb42fbb to your computer and use it in GitHub Desktop.
Script to download an image from a webcam and upload it to Google Drive using gdrive if it meets certain criteria. The criteria that decide whether an image is uploaded are its brightness and how much it has changed since the last uploaded image.
#!/usr/bin/env python
import math
import subprocess
from PIL import Image, ImageStat, ImageChops
import numpy as np
import datetime
import urllib
import os
import glob
import shutil
import logging
# Value passed to `gdrive upload -p` -- presumably the ID of the destination
# Google Drive folder; confirm against the gdrive CLI.
# NOTE(review): no value is present for the next three settings in this
# listing; each must be filled in before the module can be parsed.
TIMELAPSE_GDRIVE_ID =
# URL the webcam image is downloaded from.
TIMELAPSE_URL =
# Root directory holding the 'tmp' and 'saved' image folders and grabber.log.
TIMELAPSE_STORAGE =
# Brightness threshold handed to Grabber as min_bright.
TIMELAPSE_MIN_BRIGHTNESS = 30
# Change threshold handed to Grabber as min_change.
TIMELAPSE_MIN_CHANGE = 10000
# Frame size in pixels -- presumably the webcam's resolution; TODO confirm.
DEFAULT_DEVICE_WIDTH = 1920
DEFAULT_DEVICE_HEIGHT = 1080
# Module-wide logger named 'lst1_lapse'; the logger itself lets every record
# from DEBUG upwards through.
log = logging.getLogger('lst1_lapse')
log.setLevel(logging.DEBUG)
# Attach a NullHandler so the logger always has at least one handler, even if
# no file handler is ever added.
nh = logging.NullHandler()
log.addHandler(nh)
# Record layout: timestamp - logger name - level - message.
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
def add_file_handler(file_path, level=logging.DEBUG):
    """Attach a file handler to the module logger.

    :param file_path: path of the log file to write to.
    :param level: minimum level of the records written to the file.
    :return: the handler that was added, so the caller can later detach or
        close it.
    """
    # ``log`` and ``formatter`` are only read here, so no ``global``
    # declaration is required.
    fh = logging.FileHandler(filename=file_path)
    fh.setLevel(level)
    fh.setFormatter(formatter)
    log.addHandler(fh)
    return fh
def now_string(when=None):
    """Format a timestamp as ``YYYYmmdd_HHMMSS``.

    The fixed-width, most-significant-first layout means that sorting the
    resulting strings alphabetically also sorts them chronologically.

    :param when: ``datetime.datetime`` to format; defaults to the current
        local time when omitted.
    :return: the formatted timestamp string.
    """
    if when is None:
        when = datetime.datetime.now()
    return when.strftime('%Y%m%d_%H%M%S')
class Grabber(object):
    """Download one webcam frame and upload it when it is worth keeping.

    A frame is uploaded when it is at least ``min_bright`` bright and either
    no previously saved frame exists or it differs from the newest saved
    frame by at least ``min_change``. Frames are downloaded into
    ``<storage_path>/tmp``; frames that were uploaded are then moved to
    ``<storage_path>/saved``, which is where the comparison frame is looked
    up on construction.

    :param url: address the frame is downloaded from.
    :param storage_path: root directory for the ``tmp`` and ``saved`` folders.
    :param min_bright: minimum value of :meth:`curr_brightness` for upload.
    :param min_change: minimum value of :meth:`compare2` for upload.
    """

    def __init__(self, url, storage_path, min_bright, min_change):
        self.url = url
        self.storage_path = storage_path
        self.storage_tmp_path = os.path.join(self.storage_path, 'tmp')
        self.storage_save_path = os.path.join(self.storage_path, 'saved')
        self.min_bright = min_bright
        self.min_change = min_change
        self.last_path = None
        self.last_im = None
        self.current_filename = None
        self.current_path = None
        self.current_im = None
        self.set_last()

    @staticmethod
    def _ensure_dir(path):
        """Create directory *path* (and its parents) if it does not exist."""
        if not os.path.isdir(path):
            os.makedirs(path)

    @staticmethod
    def _download(url, dest):
        """Fetch *url* into the file *dest* on either Python 2 or Python 3."""
        legacy_opener = getattr(urllib, 'URLopener', None)
        if legacy_opener is not None:
            # Python 2: same call the script has always used.
            legacy_opener().retrieve(url, dest)
        else:
            # Python 3: ``urllib`` is a package without ``URLopener``.
            from urllib.request import urlretrieve
            urlretrieve(url, dest)

    @staticmethod
    def _should_upload(brightness, change, min_bright, min_change):
        """Decide whether a frame qualifies for upload.

        :param brightness: measured brightness of the frame.
        :param change: measured change against the last saved frame, or
            ``None`` when there is no saved frame to compare with.
        :param min_bright: brightness threshold.
        :param min_change: change threshold.
        :return: ``True`` when the frame is bright enough and is either the
            first frame or has changed enough.
        """
        if brightness < min_bright:
            return False
        return change is None or change >= min_change

    def grab_image(self):
        """Download the current frame into the tmp folder and open it.

        Sets ``current_filename``, ``current_path`` and ``current_im``.

        :raises Exception: whatever the download raised, after logging it and
            resetting the three ``current_*`` attributes to ``None``.
        """
        self.current_filename = '{}.jpg'.format(now_string())
        self.current_path = os.path.join(self.storage_tmp_path,
                                         self.current_filename)
        try:
            self._ensure_dir(self.storage_tmp_path)
            self._download(self.url, self.current_path)
        except Exception:
            log.exception("Failed to retrieve image: {}".format(self.current_filename))
            self.current_filename = None
            self.current_path = None
            self.current_im = None
            raise
        self.current_im = Image.open(self.current_path)

    def set_last(self):
        """Load the newest ``*.jpg`` of the saved folder as comparison frame.

        File names are timestamps, so the alphabetically greatest path is the
        most recent one. Sets ``last_path``/``last_im`` to ``None`` when the
        folder holds no JPEG (or does not exist).
        """
        saved = glob.glob(os.path.join(self.storage_save_path, '*.jpg'))
        if not saved:
            self.last_path = None
            self.last_im = None
            return
        self.last_path = max(saved)
        self.last_im = Image.open(self.last_path)

    def save_current(self):
        """Move the current frame from the tmp folder into the saved folder."""
        # Without an existing target directory shutil.move() would rename the
        # image to a plain file called 'saved' instead of moving it inside.
        self._ensure_dir(self.storage_save_path)
        shutil.move(self.current_path, self.storage_save_path)

    def remove_current(self):
        """Delete the current frame from disk."""
        os.remove(self.current_path)

    def upload_current(self):
        """Upload the current frame with the ``gdrive`` command line tool.

        :return: exit status of the ``gdrive`` process (0 on success).
        :raises OSError: if the ``gdrive`` executable cannot be started.
        """
        gdrive_call = ["gdrive", "upload", "-p", TIMELAPSE_GDRIVE_ID, self.current_path]
        return subprocess.call(gdrive_call)

    def compare(self):
        """Return the fraction of pixels that differ after 2-colour reduction.

        Both frames are reduced to a two-colour adaptive palette and the
        palette indices are compared position by position. The comparison
        covers the pixels both frames have in common instead of assuming a
        fixed 1920x1080 frame.

        :return: float in ``[0.0, 1.0]``; ``0.0`` when there is nothing to
            compare.
        """
        mono_current = self.current_im.convert("P", palette=Image.ADAPTIVE, colors=2)
        mono_last = self.last_im.convert("P", palette=Image.ADAPTIVE, colors=2)
        current_px = np.asarray(mono_current).ravel()
        last_px = np.asarray(mono_last).ravel()
        total = min(current_px.size, last_px.size)
        if total == 0:
            return 0.0
        changed = int(np.count_nonzero(current_px[:total] != last_px[:total]))
        return float(changed) / float(total)

    def compare2(self):
        """Return a change score between the current and the last frame.

        The score is computed on the per-pixel absolute difference of the two
        frames (both taken as RGB): the difference image is binned into a
        16x16x16 colour histogram and the negated sum of ``log2(p)`` over all
        non-empty bins is returned.

        NOTE: this is not the Shannon entropy (the terms are not weighted by
        ``p``); the formula is kept as is because ``min_change`` is tuned
        against it.
        """
        def image_entropy(image):
            width, height = image.size
            pixels = np.array(image.convert('RGB')).reshape((width * height, 3))
            hist, _edges = np.histogramdd(pixels, bins=(16,) * 3,
                                          range=((0, 256),) * 3)
            prob = hist / np.sum(hist)
            return -np.sum(np.log2(prob[prob > 0]))

        # ImageChops.difference() needs both operands in the same mode.
        diffs = ImageChops.difference(self.current_im.convert('RGB'),
                                      self.last_im.convert('RGB'))
        return image_entropy(diffs)

    def curr_brightness(self):
        """Return the perceived brightness of the current frame.

        Weighted root-mean-square of the mean R, G and B channel values. The
        frame is converted to RGB first so that greyscale or RGBA input still
        yields exactly three channel means.
        """
        stat = ImageStat.Stat(self.current_im.convert('RGB'))
        r, g, b = stat.mean
        return math.sqrt(0.241 * (r ** 2) + 0.691 * (g ** 2) + 0.068 * (b ** 2))

    def run(self):
        """Grab a frame and upload and save it if it passes both thresholds.

        A frame that is rejected, or whose upload fails, stays in the tmp
        folder and is not moved to the saved folder, so the comparison frame
        remains the last one that was actually uploaded.

        :raises Exception: propagated from :meth:`grab_image` when the
            download fails.
        """
        self.grab_image()
        if self.current_im is None:
            return

        first_im = self.last_im is None
        b = self.curr_brightness()
        # There is nothing to compare the very first frame with.
        c = None if first_im else self.compare2()

        log.debug("Previous: {}".format(self.last_path))
        log.debug("{} - brightness: {} - change: {}".format(self.current_filename, b, c))
        if b < self.min_bright:
            log.debug("{} - not enough brightness: {}".format(self.current_filename, b))
        if c is not None and c < self.min_change:
            log.debug("{} - not enough change: {}".format(self.current_filename, c))

        if not self._should_upload(b, c, self.min_bright, self.min_change):
            # Rejected frames are deliberately left in the tmp folder.
            return

        try:
            log.debug('{} - Uploading'.format(self.current_filename))
            status = self.upload_current()
        except Exception:
            log.exception("{} - Failed to upload to gdrive".format(self.current_filename))
            return
        if status:
            log.error("{} - Failed to upload to gdrive (exit status {})".format(
                self.current_filename, status))
            return
        self.save_current()
if __name__ == "__main__":
    # Script entry point: log to <storage>/grabber.log and process one frame.
    add_file_handler(file_path=os.path.join(TIMELAPSE_STORAGE, 'grabber.log'))
    try:
        # Construction opens the last saved image, so it belongs inside the
        # guarded section too.
        g = Grabber(url=TIMELAPSE_URL,
                    storage_path=TIMELAPSE_STORAGE,
                    min_bright=TIMELAPSE_MIN_BRIGHTNESS,
                    min_change=TIMELAPSE_MIN_CHANGE)
        g.run()
    except Exception:
        # Top-level boundary: record the traceback. KeyboardInterrupt and
        # SystemExit are intentionally not intercepted.
        log.exception("Error when grabbing")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment