-
-
Save DaAwesomeP/a7b26955f19c26c58ee0bf0fc872cb59 to your computer and use it in GitHub Desktop.
HA Component
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Support for the Arduino-compatible Microcontrollers through Firmata""" | |
import asyncio | |
import ipaddress | |
import logging | |
import voluptuous as vol | |
from homeassistant.const import CONF_HOST | |
from homeassistant.helpers import (config_validation as cv, device_registry as dr) | |
from homeassistant.helpers.discovery import async_load_platform | |
from .const import DOMAIN, CONF_NAME, CONF_PORT, CONF_HANDSHAKE, CONF_SERIAL_PORT | |
from .board import FirmataBoard | |
_LOGGER = logging.getLogger(__name__) | |
# Key under the component's domain that holds the list of board definitions.
CONF_BOARDS = "boards"
# NOTE(review): not referenced anywhere in the visible code.
DATA_CONFIGS = 'board_configs'

# First validation stage: type-check every field. The host and the serial
# port share the 'connect_location' exclusion group, so at most one of the
# two may be supplied.
_BOARD_FIELD_STAGE = {
    vol.Required(CONF_NAME): cv.string,
    # Validate as IP address and then convert back to a string.
    vol.Exclusive(CONF_HOST, 'connect_location'):
        vol.All(ipaddress.ip_address, cv.string),
    vol.Optional(CONF_PORT): cv.port,
    vol.Optional(CONF_HANDSHAKE): cv.string,
    vol.Exclusive(CONF_SERIAL_PORT, 'connect_location'): cv.string,
}

# Second validation stage, applied to the output of the first one.
_BOARD_CONNECTION_STAGE = {
    vol.Required(CONF_NAME): cv.string,
    # Require either a serial port or a host/port
    vol.Required(vol.Any(CONF_HOST, CONF_SERIAL_PORT)): cv.string,
    vol.Optional(CONF_PORT): cv.port,
    vol.Optional(CONF_HANDSHAKE): cv.string,
}

# Schema for one entry of the board list: both stages must pass, in order.
BOARD_CONFIG_SCHEMA = vol.Schema(
    vol.All(_BOARD_FIELD_STAGE, _BOARD_CONNECTION_STAGE),
    required=True,
    extra=vol.ALLOW_EXTRA,
)

_DOMAIN_SCHEMA = vol.Schema({
    vol.Optional(CONF_BOARDS): vol.All(cv.ensure_list, [BOARD_CONFIG_SCHEMA]),
})

# Component-level schema; keys belonging to other domains are left untouched.
CONFIG_SCHEMA = vol.Schema({DOMAIN: _DOMAIN_SCHEMA}, extra=vol.ALLOW_EXTRA)
async def async_setup(hass, config):
    """Set up the Firmata component from the YAML configuration.

    Every configured board is connected in turn and stored in
    ``hass.data[DOMAIN]`` under its name. Once all boards are connected the
    switch platform is loaded a single time.

    Returns True when there is nothing to set up or when every board
    connected, and False as soon as one board fails to connect.
    """
    # ``async def`` already makes this a coroutine function, so no
    # ``@asyncio.coroutine`` decorator is needed (it no longer exists in
    # recent Python versions).
    conf = config.get(DOMAIN)
    if conf is None or CONF_BOARDS not in conf:
        return True

    hass.data[DOMAIN] = {}
    for board_conf in conf[CONF_BOARDS]:
        name = board_conf[CONF_NAME]
        conf_data = {'name': name}
        if CONF_HOST in board_conf:
            # Network board: port and handshake only apply to this transport.
            conf_data['ip_address'] = board_conf[CONF_HOST]
            if CONF_PORT in board_conf:
                conf_data['ip_port'] = board_conf[CONF_PORT]
            if CONF_HANDSHAKE in board_conf:
                conf_data['ip_handshake'] = board_conf[CONF_HANDSHAKE]
        else:
            # No host given, so the board must be reachable on a serial port.
            conf_data['com_port'] = board_conf[CONF_SERIAL_PORT]

        board = FirmataBoard(hass, conf_data)
        if not await board.async_setup():
            return False
        hass.data[DOMAIN][name] = board

    # Load the platform once, after all boards exist, instead of once per
    # board. Skipped when the board list is empty, as before.
    if hass.data[DOMAIN]:
        hass.async_create_task(
            async_load_platform(hass, 'switch', DOMAIN, {}, config))
    return True
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Code to handle a Firmata board""" | |
import asyncio | |
import copy | |
import logging | |
_LOGGER = logging.getLogger(__name__) | |
from pymata_aio.pymata_core import PymataCore | |
#from homeassistant.exceptions import ConfigEntryNotReady | |
from homeassistant.const import DEVICE_DEFAULT_NAME | |
#from homeassistant.helpers import aiohttp_client, config_validation as cv | |
from homeassistant.helpers.entity import Entity | |
#from .const import DOMAIN, LOGGER, CONF_NAME, CONF_PORT, CONF_HANDSHAKE, CONF_SERIAL_PORT | |
#from .errors import AuthenticationRequired, CannotConnect | |
class FirmataBoard:
    """Manage the connection to a single Firmata board."""

    def __init__(self, hass, config):
        """Store the board configuration; no connection is opened yet.

        ``config`` is a dict that must contain a ``'name'`` entry; it is
        handed to ``get_board`` when ``async_setup`` runs.
        """
        self.config = config
        self.hass = hass
        self.available = True
        # Set by async_setup() once the connection is established.
        self.api = None

    @property
    def name(self):
        """Return the name of this board."""
        return self.config['name']

    async def async_setup(self, tries=0):
        """Connect to the board described by ``self.config``.

        ``tries`` is accepted for interface compatibility and is not used.

        Returns True on success and False when the connection attempt
        raises ``RuntimeError``.
        """
        try:
            _LOGGER.info('Attempting Firmata connection')
            self.api = await get_board(self.hass, self.config)
        except RuntimeError as err:
            _LOGGER.error('Error connecting with PyMata board: %r', err)
            return False
        _LOGGER.info('Firmata connection successful')
        return True

    async def async_reset(self):
        """Shut down the board connection and return to the initial state.

        Safe to call on a board that was never set up, and safe to call
        more than once. Always returns True.
        """
        if self.api is None:
            return True
        await self.api.shutdown()
        # Drop the closed handle so a repeated reset does not shut it
        # down a second time.
        self.api = None
        return True
async def get_board(hass, data):
    """Build a PymataCore instance from ``data`` and start it.

    ``data`` is copied, its ``'name'`` entry is removed (a missing entry
    raises ``KeyError``) and ``port_discovery_exceptions=True`` is added;
    the result is passed to ``PymataCore`` as keyword arguments. ``hass`` is
    not used. Returns the started PymataCore object.
    """
    core_options = copy.copy(data)
    # The name only identifies the board on our side; it is not forwarded.
    del core_options['name']
    core_options.update(port_discovery_exceptions=True)
    core = PymataCore(**core_options)
    await core.start_aio()
    return core
class FirmataBoardPin(Entity):
    """Base entity for a single pin of a Firmata board.

    Subclasses override ``setup_pin`` to configure the hardware pin.
    """

    def __init__(self, name, board, pin, mode, **kwargs):
        """Store the pin description; no board I/O happens here.

        ``board`` is either the board object itself or the name under which
        it is stored in ``hass.data``; a name is resolved when the entity is
        added to Home Assistant. ``__init__`` must be a plain function: an
        ``async def __init__`` makes instantiation raise ``TypeError``.
        """
        self._name = name or DEVICE_DEFAULT_NAME
        # Previously the builtin ``type`` was stored here by mistake.
        # NOTE(review): assumes the pin type arrives as the 'type' option.
        self._type = kwargs.get('type')
        self._pin = pin
        self._mode = mode
        self._board = board
        self._kwargs = kwargs

    async def async_added_to_hass(self):
        """Resolve the board and configure the pin once ``self.hass`` is set."""
        await super().async_added_to_hass()
        if isinstance(self._board, str):
            # Imported here because this module does not import DOMAIN at
            # file level.
            from .const import DOMAIN
            self._board = self.hass.data[DOMAIN][self._board]
        await self.setup_pin()

    async def setup_pin(self):
        """Placeholder pin setup: clears the mode. Subclasses override this."""
        self._mode = None

    @property
    def name(self):
        """Return the name given to this pin."""
        return self._name

    @property
    def should_poll(self):
        """No polling needed."""
        return False
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Constants for the Firmata component.""" | |
import logging | |
# Package-level logger. Named after the package rather than the literal
# '.', which is not a meaningful logger name.
LOGGER = logging.getLogger(__package__)

DOMAIN = "firmata"
PLATFORM_NAME = DOMAIN
SWITCH_DEFAULT_NAME = "DigitalOut"

# Board-level configuration keys.
CONF_NAME = "name"
CONF_PORT = "port"
CONF_HANDSHAKE = "handshake"
CONF_SERIAL_PORT = "serialport"

# Pin-level configuration keys.
CONF_PINS = "pins"
CONF_PIN = "pin"
CONF_TYPE = "type"
CONF_DIGITAL_PULLUP = "pullup"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"domain": "firmata", | |
"name": "Firmata", | |
"documentation": "https://www.home-assistant.io/components/firmata", | |
"requirements": [ | |
"pymata-aio==2.30" | |
], | |
"dependencies": [], | |
"codeowners": [ | |
"@DaAwesomeP", | |
"@balloob", | |
"@kane610" | |
] | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Support for Firmata output.""" | |
import asyncio
import logging

import voluptuous as vol
from pymata_aio.constants import PymataConstants

from homeassistant.components.switch import PLATFORM_SCHEMA, SwitchDevice
#from homeassistant.const import CONF_SWITCHES
from homeassistant.helpers import (config_validation as cv, device_registry as dr)

from .board import FirmataBoardPin
from .const import (
    CONF_DIGITAL_PULLUP,
    CONF_NAME,
    CONF_PIN,
    CONF_PINS,
    CONF_TYPE,
    DOMAIN,
    PLATFORM_NAME,
    SWITCH_DEFAULT_NAME,
)
DEFAULT_NAME = SWITCH_DEFAULT_NAME
_LOGGER = logging.getLogger(__name__)

# Schema for one pin entry: every entry needs a type, a pin number and a name.
SWITCH_SCHEMA = vol.Schema({
    vol.Required(CONF_TYPE): cv.string,
    vol.Required(CONF_PIN): cv.positive_int,
    vol.Required(CONF_NAME): cv.string,
})

# Platform configuration: a required list of pin entries.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
    vol.Required(CONF_PINS): vol.Schema([SWITCH_SCHEMA]),
})

# Import-time trace to confirm the module is loaded. Kept at debug level so
# a normal start-up neither reports an error nor writes to stdout.
_LOGGER.debug('OOOH LOOK A SWITCH!')
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
    """Set up the Firmata switch platform.

    Currently this only reports the pins found under the ``pins`` key of
    ``config``; no entities are created yet.
    """
    _LOGGER.debug('OOOH LOOK A SWITCH!2')
    switches = config.get(CONF_PINS)
    if not switches:
        # The key may be absent, so do not assume a value is present.
        _LOGGER.debug('No Firmata switch pins found in the platform config')
        return
    # The schema validates the pins as a list, which has no ``.items()``;
    # log the value itself.
    _LOGGER.info('Firmata switch pins: %s', switches)
    # TODO: create the switch entities and hand them to async_add_entities.
class FirmataDigitalOut(FirmataBoardPin, SwitchDevice):
    """Representation of a Firmata Digital Output Pin."""

    # Option key for the state written to the pin during setup.
    # NOTE(review): the original referenced an undefined CONF_INITIAL_STATE;
    # the key name 'initial' is an assumption - confirm against the config.
    _CONF_INITIAL_STATE = 'initial'

    # Last state written to the pin; class-level default so ``is_on`` is
    # usable before ``setup_pin`` has run.
    _state = False

    async def setup_pin(self):
        """Switch the pin to output mode and write its initial state."""
        self._mode = PymataConstants.OUTPUT
        # Default to off when no initial state was supplied; previously the
        # attribute was left unset and then read under a misspelled name.
        self.initial = bool(self._kwargs.get(self._CONF_INITIAL_STATE, False))
        self._state = self.initial
        # TODO: pull-up handling (CONF_DIGITAL_PULLUP) is not implemented.
        await self._board.api.set_pin_mode(self._pin, self._mode)
        await self._board.api.digital_pin_write(self._pin, self._state)

    @property
    def is_on(self):
        """Return true if switch is on.

        This is a plain property returning the last written state. An async
        property would hand the caller a coroutine object, which is always
        truthy.
        """
        return self._state

    async def async_turn_on(self, **kwargs):
        """Turn on switch."""
        await self._async_write(True)

    async def async_turn_off(self, **kwargs):
        """Turn off switch."""
        await self._async_write(False)

    async def _async_write(self, state):
        """Write ``state`` to the pin, remember it and publish the change."""
        await self._board.api.digital_pin_write(self._pin, state)
        self._state = state
        # The entity is not polled, so the new state has to be pushed.
        self.async_schedule_update_ha_state()
Also - remove `PLATFORM_SCHEMA` from switch. Your config should be at the component level. `PLATFORM_SCHEMA` is for a platform-only integration (where there's no component).
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Couple of thoughts:
- Remove `@asyncio.coroutine` from `async_setup` - it's already a coroutine function (`async def`).
- Don't call `async_load_platform` within the loop. Call it once after setting up all of the boards.
- The early checks in `async_setup` will return `True` without setting up the boards, which should not be necessary if the schema is working correctly. Can you confirm it's making it past those checks and actually setting up a board and calling load platform?