Skip to content

Instantly share code, notes, and snippets.

@DaAwesomeP
Last active May 19, 2019 17:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save DaAwesomeP/a7b26955f19c26c58ee0bf0fc872cb59 to your computer and use it in GitHub Desktop.
Save DaAwesomeP/a7b26955f19c26c58ee0bf0fc872cb59 to your computer and use it in GitHub Desktop.
HA Component
"""Support for Arduino-compatible microcontrollers through Firmata."""
import asyncio
import ipaddress
import logging
import voluptuous as vol
from homeassistant.const import CONF_HOST
from homeassistant.helpers import (config_validation as cv, device_registry as dr)
from homeassistant.helpers.discovery import async_load_platform
from .const import DOMAIN, CONF_NAME, CONF_PORT, CONF_HANDSHAKE, CONF_SERIAL_PORT
from .board import FirmataBoard
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# Key inside this component's configuration section that holds the boards.
CONF_BOARDS = "boards"
# NOTE(review): not referenced anywhere in the visible code - confirm it is
# still needed.
DATA_CONFIGS = 'board_configs'
# Schema for a single board entry. Two dict schemas are combined with
# vol.All: the first declares the known keys and puts the host and the serial
# port in the same exclusion group ('connect_location'); the second requires
# that a host or a serial port is present.
# NOTE(review): the exact interaction of the two chained dict schemas (and of
# the second one re-validating the host with cv.string) depends on
# voluptuous internals - confirm with a config containing each variant.
BOARD_CONFIG_SCHEMA = vol.Schema(vol.All(
{
vol.Required(CONF_NAME): cv.string,
# Validate as IP address and then convert back to a string.
vol.Exclusive(CONF_HOST, 'connect_location'): vol.All(ipaddress.ip_address, cv.string),
vol.Optional(CONF_PORT): cv.port,
vol.Optional(CONF_HANDSHAKE): cv.string,
vol.Exclusive(CONF_SERIAL_PORT, 'connect_location'): cv.string,
},
{
vol.Required(CONF_NAME): cv.string,
# Require either a serial port or a host/port
vol.Required(vol.Any(CONF_HOST, CONF_SERIAL_PORT)): cv.string,
vol.Optional(CONF_PORT): cv.port,
vol.Optional(CONF_HANDSHAKE): cv.string,
}
), required=True, extra=vol.ALLOW_EXTRA)
# Top-level configuration schema: the component's section may carry an
# optional list of boards, each validated by BOARD_CONFIG_SCHEMA. A single
# board entry is wrapped into a list by cv.ensure_list. Keys belonging to
# other components are allowed through (extra=vol.ALLOW_EXTRA).
CONFIG_SCHEMA = vol.Schema({
DOMAIN: vol.Schema({
vol.Optional(CONF_BOARDS):
vol.All(cv.ensure_list, [BOARD_CONFIG_SCHEMA]),
}),
}, extra=vol.ALLOW_EXTRA)
async def async_setup(hass, config):
    """Set up the Firmata component from the YAML configuration.

    Connects to every configured board, stores the resulting FirmataBoard
    objects in ``hass.data[DOMAIN]`` keyed by board name, and then schedules
    the switch platform to be loaded once.

    Returns True when there is nothing to set up or when every board
    connected, and False as soon as one board fails to connect.
    """
    # ``async def`` already makes this a coroutine function, so the legacy
    # ``@asyncio.coroutine`` decorator (removed in Python 3.11) is not used.
    conf = config.get(DOMAIN)
    if conf is None or CONF_BOARDS not in conf:
        # No Firmata section or no boards listed: nothing to do.
        return True

    hass.data[DOMAIN] = {}
    for board_conf in conf[CONF_BOARDS]:
        name = board_conf[CONF_NAME]
        conf_data = {'name': name}
        if CONF_HOST in board_conf:
            # Network connection; port and handshake are optional overrides.
            conf_data['ip_address'] = board_conf[CONF_HOST]
            if CONF_PORT in board_conf:
                conf_data['ip_port'] = board_conf[CONF_PORT]
            if CONF_HANDSHAKE in board_conf:
                conf_data['ip_handshake'] = board_conf[CONF_HANDSHAKE]
        else:
            # Serial connection.
            conf_data['com_port'] = board_conf[CONF_SERIAL_PORT]

        board = FirmataBoard(hass, conf_data)
        if not await board.async_setup():
            return False
        hass.data[DOMAIN][name] = board

    # Load the platform once, after all boards are registered, instead of
    # scheduling one load per board.
    hass.async_create_task(
        async_load_platform(hass, 'switch', DOMAIN, {}, config))
    return True
"""Code to handle a Firmata board"""
import asyncio
import copy
import logging
_LOGGER = logging.getLogger(__name__)
from pymata_aio.pymata_core import PymataCore
#from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.const import DEVICE_DEFAULT_NAME
#from homeassistant.helpers import aiohttp_client, config_validation as cv
from homeassistant.helpers.entity import Entity
#from .const import DOMAIN, LOGGER, CONF_NAME, CONF_PORT, CONF_HANDSHAKE, CONF_SERIAL_PORT
#from .errors import AuthenticationRequired, CannotConnect
class FirmataBoard:
    """Manages a single Firmata board."""

    def __init__(self, hass, config):
        """Store the Home Assistant instance and the board settings.

        ``config`` is a dict that must contain at least a ``'name'`` key;
        the remaining keys are forwarded to the board connection.
        """
        self.config = config
        self.hass = hass
        self.available = True
        # Connection object; stays None until async_setup() succeeds.
        self.api = None

    @property
    def name(self):
        """Return the name of this board."""
        return self.config['name']

    async def async_setup(self, tries=0):
        """Set up a Firmata instance based on the stored parameters.

        ``tries`` is accepted for interface compatibility but is not used.
        Returns True when the connection was established and False when the
        connection attempt raised RuntimeError.
        """
        try:
            _LOGGER.info('Attempting Firmata connection')
            self.api = await get_board(self.hass, self.config)
        except RuntimeError as err:
            _LOGGER.error('Error connecting with PyMata board: %s', repr(err))
            return False
        _LOGGER.info('Firmata connection successful')
        return True

    async def async_reset(self):
        """Shut down the board connection, if there is one.

        Always returns True. Calling it on a board that was never set up,
        or calling it a second time, is a no-op.
        """
        # If the board was never set up (or was already reset).
        if self.api is None:
            return True
        # NOTE(review): confirm that the connection object's shutdown() is
        # safe to await inside Home Assistant's running event loop.
        await self.api.shutdown()
        # Forget the connection so a repeated reset does not shut down twice.
        self.api = None
        return True
async def get_board(hass, data):
    """Create, start and return a board connection built from ``data``.

    ``data`` is copied, its ``'name'`` entry is removed, and the remaining
    entries are passed as keyword arguments to ``PymataCore``. Port
    discovery exceptions are always enabled. ``hass`` is not used.
    """
    settings = copy.copy(data)
    # 'name' is only meaningful to this component, not to PymataCore.
    settings.pop('name')
    settings['port_discovery_exceptions'] = True
    connection = PymataCore(**settings)
    await connection.start_aio()
    return connection
class FirmataBoardPin(Entity):
    """Base entity for one pin on a Firmata board.

    Subclasses override ``setup_pin`` to configure the hardware pin.
    """

    def __init__(self, name, board, pin, mode, **kwargs):
        """Store the pin settings.

        ``board`` is either the name under which the board was registered in
        ``hass.data`` or the board object itself. ``__init__`` must be a
        plain (non-async) method, so the board lookup and the pin setup are
        deferred to ``async_added_to_hass``.
        """
        self._name = name or DEVICE_DEFAULT_NAME
        # NOTE(review): the original stored the builtin ``type`` here; no
        # visible code reads this attribute, so it is kept as a placeholder.
        self._type = None
        self._pin = pin
        self._mode = mode
        self._board = board
        self._kwargs = kwargs

    async def async_added_to_hass(self):
        """Resolve the board and configure the pin once hass is available."""
        if isinstance(self._board, str):
            # Imported here because this module does not import DOMAIN at
            # the top of the file.
            from .const import DOMAIN
            self._board = self.hass.data[DOMAIN][self._board]
        await self.setup_pin()

    async def setup_pin(self):
        """Configure the pin; the base implementation only clears the mode."""
        self._mode = None

    @property
    def name(self):
        """Return the name given to this pin."""
        return self._name

    @property
    def should_poll(self):
        """No polling needed."""
        return False
"""Constants for the Firmata component."""
import logging
# Package-level logger. Named after the package rather than the literal ".",
# which would create a logger outside the component's logger hierarchy.
LOGGER = logging.getLogger(__package__)

# Component domain; also used as the platform name.
DOMAIN = "firmata"
PLATFORM_NAME = DOMAIN

# Default name for switch entities.
SWITCH_DEFAULT_NAME = "DigitalOut"

# Configuration keys for a board entry.
CONF_NAME = "name"
CONF_PORT = "port"
CONF_HANDSHAKE = "handshake"
CONF_SERIAL_PORT = "serialport"

# Configuration keys for pin entries.
CONF_PINS = "pins"
CONF_PIN = "pin"
CONF_TYPE = "type"
CONF_DIGITAL_PULLUP = "pullup"
{
"domain": "firmata",
"name": "Firmata",
"documentation": "https://www.home-assistant.io/components/firmata",
"requirements": [
"pymata-aio==2.30"
],
"dependencies": [],
"codeowners": [
"@DaAwesomeP",
"@balloob",
"@kane610"
]
}
"""Support for Firmata output."""
import asyncio
import logging

import voluptuous as vol
from pymata_aio.constants import PymataConstants

from homeassistant.components.switch import PLATFORM_SCHEMA, SwitchDevice
#from homeassistant.const import CONF_SWITCHES
from homeassistant.helpers import (config_validation as cv, device_registry as dr)

from .const import (
    DOMAIN, PLATFORM_NAME, SWITCH_DEFAULT_NAME, CONF_NAME, CONF_PIN,
    CONF_PINS, CONF_TYPE, CONF_DIGITAL_PULLUP)
from .board import FirmataBoardPin
# Alias of the component-wide default switch name.
DEFAULT_NAME = SWITCH_DEFAULT_NAME
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# Schema for one switch pin entry: all three keys are mandatory.
SWITCH_SCHEMA = vol.Schema({
    vol.Required(CONF_TYPE): cv.string,
    vol.Required(CONF_PIN): cv.positive_int,
    vol.Required(CONF_NAME): cv.string,
})

# Platform schema: a list of pin entries. A single entry is wrapped into a
# list by cv.ensure_list, matching how the component schema treats boards.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
    vol.Required(CONF_PINS): vol.All(cv.ensure_list, [SWITCH_SCHEMA]),
})
# Import-time trace, kept at debug level so that loading the platform neither
# reports a spurious error nor writes to stdout.
_LOGGER.debug('Firmata switch platform module loaded')
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
    """Set up the Firmata switch platform.

    Reads the configured pins from ``config`` and logs them. No entities
    are created yet, so ``async_add_entities`` is not called.
    """
    switches = config.get(CONF_PINS)
    if not switches:
        # The key is absent when the platform is loaded with an empty
        # platform config, so there is nothing to report.
        _LOGGER.debug('No Firmata switch pins configured')
        return
    # The pins are a list per the platform schema, so log them directly
    # instead of calling ``.items()`` on them.
    _LOGGER.info('Firmata switch pins: %s', switches)
# Keyword under which an optional initial output level is passed to the pin.
# NOTE(review): this name was used but never defined or imported; the key
# string is an assumption - confirm against the intended configuration.
CONF_INITIAL_STATE = "initial_state"


class FirmataDigitalOut(FirmataBoardPin, SwitchDevice):
    """Representation of a Firmata Digital Output Pin."""

    # Last level written to the pin; False until the pin has been set up.
    _state = False

    async def setup_pin(self):
        """Configure the pin as an output and drive it to its initial level.

        The initial level comes from the ``CONF_INITIAL_STATE`` keyword
        argument when present and defaults to off otherwise.
        """
        # NOTE(review): confirm that pymata_aio.constants really exports
        # ``PymataConstants``; its documentation examples use ``Constants``.
        self._mode = PymataConstants.OUTPUT
        # TODO: pull-up support (CONF_DIGITAL_PULLUP) is not implemented.
        self.initial = bool(self._kwargs.get(CONF_INITIAL_STATE, False))
        self._state = self.initial
        await self._board.api.set_pin_mode(self._pin, self._mode)
        await self._board.api.digital_pin_write(self._pin, self.initial)

    @property
    def is_on(self):
        """Return true if switch is on.

        This is a plain property backed by the last written level; an async
        property would hand callers a coroutine, which is always truthy.
        """
        return self._state

    async def async_turn_on(self, **kwargs):
        """Turn on switch."""
        await self._board.api.digital_pin_write(self._pin, True)
        self._state = True
        # The entity is not polled, so push the new state explicitly.
        self.async_schedule_update_ha_state()

    async def async_turn_off(self, **kwargs):
        """Turn off switch."""
        await self._board.api.digital_pin_write(self._pin, False)
        self._state = False
        self.async_schedule_update_ha_state()
@andrewsayre
Copy link

andrewsayre commented May 19, 2019

Couple of thoughts:

  1. Make sure this is contained within a folder that matches the domain.
  2. Remove the @asyncio.coroutine from async_setup - it's already a coroutine function (async def)
  3. Don't call async_load_platform within the loop. Call it once after setting up all of the boards.
  4. You have a few conditions where async_setup will return True without setting up the boards, which should not be necessary if the schema is working correctly. Can you confirm it's making it past those checks and actually setting up a board and calling load platform?

@andrewsayre
Copy link

andrewsayre commented May 19, 2019

Also - remove PLATFORM_SCHEMA from switch. Your config should be at the component level. PLATFORM_SCHEMA is for a platform-only integration (where there's no component)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment