Skip to content

Instantly share code, notes, and snippets.

@adongy
Last active August 29, 2015 14:01
Show Gist options
  • Save adongy/4d70c4981a05b34a845c to your computer and use it in GitHub Desktop.
Save adongy/4d70c4981a05b34a845c to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
import logging
import os
import tempfile
import time
from fractions import Fraction
import usb
from jpegtran import JPEGImage
from spreads.vendor.pathlib import Path
from spreads.config import OptionTemplate
from spreads.plugin import DevicePlugin, DeviceFeatures
from spreads.util import DeviceException
import lupa
# White balance mode names (offered as the selectable 'wb_mode' option) mapped
# to the numeric value written to the camera's WB_MODE property.
WHITEBALANCE_MODES = dict([
    ('Auto', 0),
    ('Daylight', 1),
    ('Cloudy', 2),
    ('Tungsten', 3),
    ('Fluorescent', 4),
    ('Fluorescent H', 5),
    ('Custom', 7),
])
# Lua bootstrap that is executed once in the embedded Lua runtime to obtain an
# environment equivalent to an interactive chdkptp shell. It:
#   * loads the chdkptp Lua modules (util, prefs, chdku, cli, dng, ...),
#   * registers a 'core_verbose' preference backed by corevar and sets both
#     'core_verbose' and 'cli_verbose' to 2,
#   * creates the global connection object `con` and initializes the DNG CLI.
# The text below is passed to Lua verbatim; do not reformat it.
INIT_SCRIPT = '''
require('lfs')
require('chdkptp')
require('lbuf')
require('rawimg')
require('io')
require('os')
util=require('util')
util:import()
ustime=require('ustime')
fsutil=require('fsutil')
prefs=require('prefs')
chdku=require('chdku')
cli=require('cli')
exp=require('exposure')
dng=require('dng')
dngcli=require('dngcli')
--set debug
prefs._add('core_verbose','number','ptp core verbosity',0,
function(self)
return corevar.get_verbose()
end,
function(self,val)
corevar.set_verbose(val)
end
)
prefs._set('core_verbose', 2)
prefs._set('cli_verbose', 2)
--global connection variable
con=chdku.connection()
dngcli.init_cli()
'''
class CHDKPTPException(Exception):
    """ Raised when a command executed through the chdkptp CLI reports
    a failure status.
    """
class CHDKCameraDevice(DevicePlugin):
    """ Plugin for digital cameras running the CHDK firmware.

    Commands are issued through an embedded Lua runtime (``lupa``) that is
    bootstrapped with ``INIT_SCRIPT``.  ``_execute_lua_local`` hands a command
    line to ``cli:execute`` and ``_execute_lua_camera`` sends a script to the
    camera through the ``lua``/``luar`` CLI commands.
    """

    features = (DeviceFeatures.PREVIEW, DeviceFeatures.IS_CAMERA)

    # u"odd"/u"even" once known, otherwise None
    target_page = None
    _cli_flags = None
    # Build revision reported by the camera, filled in by __init__
    _chdk_buildnum = None
    # True when the build supports the 'remoteshoot' command
    _can_remote = False
    # Number of zoom steps reported by the camera
    _zoom_steps = 0

    # Values written to the RESOLUTION/QUALITY properties in prepare_capture;
    # subclasses override them for specific models.
    MAX_RESOLUTION = 0
    MAX_QUALITY = 0

    @classmethod
    def configuration_template(cls):
        """ Extend the base configuration template with the camera options.

        :returns: Mapping of option names to their templates
        """
        conf = super(CHDKCameraDevice, cls).configuration_template()
        conf.update(
            {'sensitivity': OptionTemplate(80, "The ISO sensitivity value"),
             'shutter_speed': OptionTemplate(
                 u"1/25", "The shutter speed as a fraction"),
             'zoom_level': OptionTemplate(3, "The default zoom level"),
             'dpi': OptionTemplate(300, "The capturing resolution"),
             'shoot_raw': OptionTemplate(False, "Shoot in RAW format (DNG)",
                                         advanced=True),
             'focus_distance': OptionTemplate(0, "Set focus distance"),
             'monochrome': OptionTemplate(
                 False, "Shoot in monochrome mode (reduces file size)"),
             'wb_mode': OptionTemplate(value=sorted(WHITEBALANCE_MODES),
                                       docstring='White balance mode',
                                       selectable=True, advanced=True),
             'chdkptp_path': OptionTemplate(u"/usr/local/lib/chdkptp",
                                            "Path to CHDKPTP binary/libraries",
                                            advanced=True),
             })
        return conf

    @classmethod
    def yield_devices(cls, config):
        """ Search for usable devices, yield one at a time

        Every attached USB device whose first interface is a PTP interface
        (class 6, subclass 1) is wrapped in a device object.  Models listed
        in ``SPECIAL_CASES`` get their dedicated subclass.

        :param config: spreads configuration
        :type config: spreads.config.ConfigView
        """
        SPECIAL_CASES = {
            # (idVendor, idProduct): SpecialClass
            (0x4a9, 0x31ef): QualityFix,  # not r47, but has the same bug
            (0x4a9, 0x3218): QualityFix,
            (0x4a9, 0x3223): QualityFix,
            (0x4a9, 0x3224): QualityFix,
            (0x4a9, 0x3225): QualityFix,
            (0x4a9, 0x3226): QualityFix,
            (0x4a9, 0x3227): QualityFix,
            (0x4a9, 0x3228): QualityFix,
            (0x4a9, 0x3229): QualityFix,
            (0x4a9, 0x322a): A2200,
            (0x4a9, 0x322b): QualityFix,
            (0x4a9, 0x322c): QualityFix,
        }

        for dev in usb.core.find(find_all=True):
            cfg = dev.get_active_configuration()[(0, 0)]
            ids = (dev.idVendor, dev.idProduct)
            # Compare the numeric values directly instead of their hex()
            # text, which differs between int and long on Python 2.
            is_ptp = (cfg.bInterfaceClass == 0x06
                      and cfg.bInterfaceSubClass == 0x01)
            if not is_ptp:
                continue
            device_class = SPECIAL_CASES.get(ids, cls)
            yield device_class(config, dev)

    def __init__(self, config, device):
        """ Set connection information and try to obtain target page.

        :param config: spreads configuration
        :type config: spreads.config.ConfigView
        :param device: USB device to use for the object
        :type device: `usb.core.Device <http://github.com/walac/pyusb>`_
        """
        super(CHDKCameraDevice, self).__init__(config, device)
        chdkptp_path = Path(self.config["chdkptp_path"].get(unicode))
        # Must be set before the runtime is created so that the require()
        # calls in INIT_SCRIPT can locate the chdkptp Lua modules.
        os.environ['LUA_PATH'] = unicode(chdkptp_path / "lua" / "?.lua")
        self._lua = lupa.LuaRuntime(unpack_returned_tuples=True)
        # run init script
        self._lua.execute(INIT_SCRIPT)
        self.logger = logging.getLogger('ChdkCamera')
        self._usbport = (device.bus, device.address)
        self._serial_number = (
            usb.util.get_string(device, 256, device.iSerialNumber)
            .strip('\x00'))
        self.logger.debug("Device has serial number {0}"
                          .format(self._serial_number))

        # connect to camera
        self._connect()

        # NOTE(review): this indexes the value handed back by
        # _execute_lua_camera, which is the raw output line; confirm the
        # returned shape supports ["build_revision"].
        self._chdk_buildnum = self._execute_lua_camera(
            "get_buildinfo()", get_result=True)["build_revision"]
        # PTP remote shooting is available starting from SVN r2927
        self._can_remote = self._chdk_buildnum >= 2927
        self._zoom_steps = self._execute_lua_camera("get_zoom_steps()",
                                                    get_result=True)
        try:
            self.target_page = self._get_target_page()
        except ValueError:
            self.target_page = None

        # Set camera to highest quality
        self._execute_lua_camera('exit_alt(); set_config_value(291, 0);'
                                 'enter_alt();')
        self.logger = logging.getLogger('ChdkCamera[{0}]'
                                        .format(self.target_page))

    def _connect(self):
        """ Open the PTP connection to the camera at the stored USB port.

        ``connect`` is a command of the chdkptp CLI itself, so it is run
        locally and not sent to the camera as a Lua script.
        """
        # _usbport is (bus, address): -d receives the address, -b the bus
        self._execute_lua_local(
            "connect -d={1:03} -b={0:03}".format(*self._usbport))

    def connected(self):
        """ connected is taken as the chdkptp-style connected,
        i.e. there is an active PTP connection to the camera

        If the camera is no longer found at the stored bus/address, a Canon
        device with the same serial number is searched for and, when found,
        the connection is re-established on its new port.

        :returns: True if the camera is (again) reachable, False otherwise
        :rtype: bool
        """
        def match_serial(dev):
            serial = (
                usb.util.get_string(dev, 256, dev.iSerialNumber)
                .strip('\x00'))
            return serial == self._serial_number

        # Check if device is still attached
        unchanged = usb.core.find(bus=self._usbport[0],
                                  address=self._usbport[1],
                                  custom_match=match_serial) is not None
        if unchanged:
            return True
        new_device = usb.core.find(idVendor=0x04a9,  # Canon vendor ID
                                   custom_match=match_serial)
        if new_device is None:
            return False

        self._usbport = (new_device.bus, new_device.address)
        self._connect()
        return True

    def set_target_page(self, target_page):
        """ Set the device target page.

        The value is written to a temporary file that is uploaded to the
        camera as ``OWN.LUA``.

        :param target_page: The target page name
        :type target_page: unicode in (u"odd", u"even")
        :raises ValueError: if the page name is neither u"odd" nor u"even"
        """
        if target_page not in (u"odd", u"even"):
            raise ValueError('Target page must be u"odd" or u"even"')
        fd, tmp_path = tempfile.mkstemp(text=True)
        try:
            try:
                # NOTE(review): _get_target_page evaluates
                # 'return(loadfile("OWN.LUA")())'; confirm that a script
                # consisting of print(ODD)/print(EVEN) yields the page name.
                os.write(fd, "print({0})".format(target_page.upper()))
            finally:
                # Close the descriptor before uploading; it used to leak
                os.close(fd)
            self._execute_lua_local(
                "upload {0} \"OWN.LUA\"".format(tmp_path))
        finally:
            # Remove the temporary file even when the upload fails
            os.remove(tmp_path)
        self.target_page = target_page

    def prepare_capture(self, path):
        """ Put the camera into a state ready for shooting.

        Enters alt mode, switches to record mode, then applies zoom, ND
        filter, focus, optional monochrome mode, white balance, flash,
        quality and resolution settings.

        :param path: Not used by this implementation
        """
        shoot_monochrome = self.config['monochrome'].get(bool)
        # Try to go into alt mode to prevent weird behaviour
        self._execute_lua_camera("enter_alt()")
        # Try to put into record mode
        try:
            self._execute_lua_local("rec")
        except CHDKPTPException as e:
            self.logger.debug(e)
            self.logger.info("Camera already seems to be in recording mode")
        self._set_zoom(int(self.config['zoom_level'].get()))
        # Disable ND filter
        self._execute_lua_camera("set_nd_filter(2)")
        self._set_focus()
        if shoot_monochrome:
            rv = self._execute_lua_camera(
                "capmode = require(\"capmode\")\n"
                "return capmode.set(\"SCN_MONOCHROME\")",
                get_result=True
            )
            if not rv:
                self.logger.warning("Monochrome mode not supported on this "
                                    "device, will be disabled.")
        # Set White Balance mode
        self._set_wb()
        # Disable flash
        self._execute_lua_camera(
            "props = require(\"propcase\")\n"
            "if(get_flash_mode()~=2) then set_prop(props.FLASH_MODE, 2) end")
        # Set Quality
        self._execute_lua_camera("set_prop(require('propcase').QUALITY, {0})"
                                 .format(self.MAX_QUALITY))
        self._execute_lua_camera(
            "set_prop(require('propcase').RESOLUTION, {0})"
            .format(self.MAX_RESOLUTION))

    def finish_capture(self):
        """ Switch the camera back to play mode after capturing. """
        # Switch camera back to play mode.
        # This will retract the lens and protect it from dust.
        self._execute_lua_local("play")

    def get_preview_image(self):
        """ Grab a single live view frame from the camera.

        :returns: Content of the file written by ``dumpframes``
        """
        fd, fpath = tempfile.mkstemp()
        # Only the path is needed; close the descriptor so it does not leak
        os.close(fd)
        cmd = "dumpframes -count=1 -nobm -nopal"
        try:
            self._execute_lua_local("{0} {1}".format(cmd, fpath))
            with open(fpath, 'rb') as fp:
                return fp.read()
        finally:
            # Clean up even when the dump or the read fails
            os.remove(fpath)

    def capture(self, path):
        """ Shoot an image and store it locally.

        Uses ``remoteshoot`` when the CHDK build supports it, ``shoot``
        otherwise.  When the camera reports that it is not in record mode,
        the capture is prepared again and retried.  Finally the EXIF
        orientation is set according to the target page.

        :param path: Local target path without file extension
        :raises CHDKPTPException: if the capture command fails for any
                                  reason other than 'not in rec mode'
        """
        # NOTE: To obtain the "real" Canon ISO value, we multiply the
        #       "market" value from the config by 0.65.
        #       See core/shooting.c#~l150 in the CHDK source for more details
        sensitivity = int(self.config["sensitivity"].get())
        shutter_speed = float(Fraction(self.config["shutter_speed"]
                                       .get(unicode)))
        shoot_raw = self.config['shoot_raw'].get(bool)
        if self._can_remote:
            cmd = ("remoteshoot -tv={0} -sv={1} {2} \"{3}\""
                   .format(shutter_speed, sensitivity * 0.65,
                           "-dng" if shoot_raw else "", path))
        else:
            cmd = ("shoot -tv={0} -sv={1} -dng={2} -rm -dl \"{3}\""
                   .format(shutter_speed, sensitivity * 0.65,
                           int(shoot_raw), path))
        try:
            self._execute_lua_local(cmd)
        except CHDKPTPException as e:
            message = e.args[0] if e.args else ""
            if 'not in rec mode' in message:
                # NOTE(review): the retry is not bounded; it repeats for as
                # long as the camera keeps reporting 'not in rec mode'.
                self.prepare_capture(None)
                self.capture(path)
                # The retry already handled the orientation
                return
            self.logger.warning("Capture command failed.")
            raise

        extension = 'dng' if shoot_raw else 'jpg'
        local_path = "{0}.{1}".format(path, extension)
        # Set EXIF orientation
        # NOTE(review): this also runs for .dng files; confirm JPEGImage
        # can handle them.
        self.logger.debug("Setting EXIF orientation on captured image")
        img = JPEGImage(local_path)
        if self.target_page == 'odd':
            img.exif_orientation = 6  # -90 degrees
        else:
            img.exif_orientation = 8  # 90 degrees
        img.save(local_path)

    def show_textbox(self, message):
        """ Draw a message on the camera screen.

        The screen is filled with a rectangle and every line of the message
        is drawn one tenth of the screen height below the previous one.

        :param message: Text to display, lines separated by newlines
        """
        messages = message.split("\n")
        script = [
            'screen_width = get_gui_screen_width();',
            'screen_height = get_gui_screen_height();',
            'draw_rect_filled(0, 0, screen_width, screen_height, 256, 256);'
        ]
        script.extend(
            ['draw_string(0, 0+(screen_height/10)*{0}, "{1}", 258, 256);'
             .format(idx, msg) for idx, msg in enumerate(messages, 1)]
        )
        self._execute_lua_camera("\n".join(script), wait=False,
                                 get_result=False)

    def _execute_lua_local(self, command):
        """ execute lua command just as if we were in a chdkptp shell

        :param command: chdkptp CLI command line
        :returns: Output lines without 'connected:' messages
        :rtype: list
        :raises CHDKPTPException: if the CLI reports a failure status
        """
        # The chunk has to 'return' the call's values, otherwise the runtime
        # hands nothing back and there is no status/output pair to unpack.
        command = "return cli:execute([[{0}]])".format(command)
        self.logger.debug("Input Lua: {0}".format(command))
        # status = True if okay, False else
        status, output = self._lua.execute(command)
        self.logger.debug("Lua returned: status {0}\n{1}"
                          .format(status, output))
        # NOTE(review): the output is presumably a single string; split it
        # so the filter below works on lines rather than on characters. Any
        # other iterable is used as it is.
        if output is None:
            lines = []
        elif hasattr(output, 'splitlines'):
            lines = output.splitlines()
        else:
            lines = list(output)
        # Filter out connected message (why?)
        lines = [x for x in lines if not x.startswith('connected:')]
        # Check for possible CHDKPTP errors
        if not status:
            raise CHDKPTPException("\n".join(lines))
        return lines

    def _execute_lua_camera(self, script, wait=True, get_result=False,
                            timeout=256):
        """ execute lua commands on the camera

        :param script: Lua code to run on the camera
        :param wait: Use ``luar`` (True) or ``lua`` (False) to send the
                     script
        :param get_result: Wrap the script in ``return(...)`` if it has no
                           'return' yet and hand back the result line
        :param timeout: Currently unused
        :returns: First output line containing ':return:' if ``get_result``
                  is set (None if there is none), otherwise None
        """
        if get_result and "return" not in script:
            script = "return({0})".format(script)
        cmd = "luar" if wait else "lua"
        # _execute_lua_local adds the [[...]] quoting itself; quoting here
        # as well would nest the long brackets and break the Lua chunk.
        output = self._execute_lua_local("{0} {1}".format(cmd, script))
        if not get_result:
            return None
        # str.find() returns -1 (truthy) for "not found", so test membership
        results = [x for x in output if ":return:" in x]
        return results[0] if results else None

    def _get_target_page(self):
        """ determine if the camera shoots odd or even pages

        'odd' are ALWAYS the right pages, 'even' left ones

        :returns: Result of running ``OWN.LUA`` on the camera, or None if
                  it is empty
        :raises ValueError: if running ``OWN.LUA`` fails
        """
        try:
            target_page = self._execute_lua_camera('loadfile("OWN.LUA")()',
                                                   get_result=True)
        except (CHDKPTPException, DeviceException):
            # The execution helpers raise CHDKPTPException; translate it so
            # that __init__ can fall back to "no target page".
            raise ValueError("Could not find OWN.LUA")
        return target_page if target_page else None

    def _set_zoom(self, level):
        """ Set the zoom level.

        :param level: The zoom level to be used
        :type level: int
        :raises ValueError: if the level is not below the number of zoom
                            steps
        """
        if level >= self._zoom_steps:
            raise ValueError("Zoom level {0} exceeds the camera's range!"
                             " (max: {1})".format(level, self._zoom_steps - 1))
        self._execute_lua_camera("set_zoom({0})".format(level), wait=True)

    def _acquire_focus(self):
        """ Acquire auto focus and lock it.

        :returns: Result of ``get_focus()`` on the camera
        """
        self._execute_lua_camera("enter_alt()")
        # Try to put into record mode
        try:
            self._execute_lua_local("rec")
        except CHDKPTPException as e:
            self.logger.debug(e)
            self.logger.info("Camera already seems to be in recording mode")
        self._set_zoom(int(self.config['zoom_level'].get()))
        self._execute_lua_camera("set_aflock(0)")
        self._execute_lua_camera("press('shoot_half')")
        time.sleep(0.8)
        self._execute_lua_camera("release('shoot_half')")
        time.sleep(0.5)
        return self._execute_lua_camera("get_focus()", get_result=True)

    def _set_focus(self):
        """ Apply the configured focus distance and lock the focus.

        The auto focus lock is always released first.  With a configured
        distance of 0 nothing else happens.
        """
        focus_distance = int(self.config['focus_distance'].get())
        self._execute_lua_camera("set_aflock(0)")
        if focus_distance == 0:
            return
        self._execute_lua_camera("set_focus({0:.0f})".format(focus_distance))
        time.sleep(0.5)
        self._execute_lua_camera("press('shoot_half')")
        time.sleep(0.25)
        self._execute_lua_camera("release('shoot_half')")
        time.sleep(0.25)
        self._execute_lua_camera("set_aflock(1)")

    def _set_wb(self):
        """ Write the configured white balance mode to the camera. """
        value = WHITEBALANCE_MODES.get(self.config['wb_mode'].get())
        self._execute_lua_camera("set_prop(require('propcase').WB_MODE, {0})"
                                 .format(value))
class A2200(CHDKCameraDevice):
    """ Driver for the Canon A2200.

    Overrides a few operations to work around quirks of that CHDK port.
    """

    MAX_RESOLUTION = 0
    MAX_QUALITY = 1

    def __init__(self, config, device):
        """ Initialize the device and give the logger a model-specific name.

        :param config: spreads configuration
        :param device: USB device to use for the object
        """
        super(A2200, self).__init__(config, device)
        logger_name = 'A2200Device'
        if self.target_page is not None:
            logger_name = 'A2200Device[{0}]'.format(self.target_page)
        self.logger = logging.getLogger(logger_name)

    def finish_capture(self):
        """ Do nothing.

        Putting the device back into play mode crashes the a2200 with
        chdk 1.3, this is why it is stubbed out here.
        """

    def _set_zoom(self, level):
        """ Set zoom level.

        The A2200 currently has a bug, where setting the zoom level
        directly via set_zoom crashes the camera quite frequently, so
        we work around that by simulating button presses.

        :param level: The zoom level to be used
        :type level: int
        :raises ValueError: if the level is not below the number of zoom
                            steps
        """
        if level >= self._zoom_steps:
            message = ("Zoom level {0} exceeds the camera's range!"
                       " (max: {1})".format(level, self._zoom_steps - 1))
            raise ValueError(message)
        current = self._execute_lua_camera("get_zoom()", get_result=True)
        # NOTE(review): both loops are bounded by level + 1, not level;
        # confirm this is intended.
        bound = level + 1
        if current < level:
            script = ("while(get_zoom()<{0}) do click(\"zoom_in\") end"
                      .format(bound))
        elif current > level:
            script = ("while(get_zoom()>{0}) do click(\"zoom_out\") end"
                      .format(bound))
        else:
            # Nothing to do
            return
        self._execute_lua_camera(script, wait=True)
class QualityFix(CHDKCameraDevice):
    """ Fixes a bug that prevents remote capture with the highest resolution
    and quality from succeeding. See this CHDK forum post for more details:
    http://chdk.setepontos.com/index.php?topic=4338.msg111318#msg111318
    """

    MAX_RESOLUTION = 0
    MAX_QUALITY = 1

    def __init__(self, config, device):
        """ Initialize the device and give the logger a class-specific name.

        :param config: spreads configuration
        :param device: USB device to use for the object
        """
        super(QualityFix, self).__init__(config, device)
        logger_name = 'QualityFixDevice'
        if self.target_page is not None:
            logger_name = 'QualityFixDevice[{0}]'.format(self.target_page)
        self.logger = logging.getLogger(logger_name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment