-
-
Save justmobilize/f0ee9668d7aab6d91524b3ab53953d2c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# SPDX-FileCopyrightText: Copyright (c) 2024 Justin Myers | |
# | |
# SPDX-License-Identifier: MIT | |
import gc | |
print(f"load: {gc.mem_free()}") | |
import binascii | |
import os | |
import wifi | |
import adafruit_connection_manager | |
import adafruit_requests | |
from adafruit_io.adafruit_io import IO_HTTP, AdafruitIO_RequestError, validate_feed_key | |
print(f"imports: {gc.mem_free()}") | |
class Base64Stream:
    """A read-only file wrapper that returns base64 encoded data when read.

    The wrapped handle is read sequentially and its bytes are encoded on the
    fly, so the whole file never has to be held in memory. Only the
    operations needed for sequential streaming are supported: seeking to the
    very start or very end, ``tell``, ``read`` and ``readinto``.
    """

    def __init__(self, file_handle):
        """Wrap *file_handle*, a binary file-like object opened for reading."""
        self._file_handle = file_handle
        # Raw (not yet encoded) bytes carried over from the previous read.
        # Base64 encodes 3 raw bytes as 4 characters, so up to 2 trailing
        # bytes are held back until they can be encoded as a full group.
        self._read_buffer = b""

    def seek(self, target, whence=0):
        """Change the stream position to the given offset.

        Only ``seek(0)`` / ``seek(0, 0)`` (start) and ``seek(0, 2)`` (end)
        are supported.

        Returns whatever the wrapped handle's ``seek`` returns.
        Raises ValueError for any other target/whence combination.
        """
        if target != 0 or whence not in (0, 2):
            raise ValueError("Seeking to the middle of a file is not supported.")
        # Held-back bytes belong to the old position; drop them so they are
        # not emitted after the stream has been repositioned.
        self._read_buffer = b""
        return self._file_handle.seek(target, whence)

    def tell(self):
        """Return the current stream position as an opaque number.

        The value is the padded base64 length of the raw bytes consumed so
        far from the wrapped handle; at end-of-file this is the total
        encoded size.
        """
        raw_position = self._file_handle.tell()
        # ceil(raw / 3) groups of 4 characters, in pure integer arithmetic
        # so large positions are not rounded by float precision.
        return (raw_position + 2) // 3 * 4

    def read(self, size=-1):
        """Read and return at most *size* base64 encoded bytes.

        If size is negative or None, reads until EOF and returns the
        complete encoding (including padding) of the remaining data.
        Returns ``b""`` when size is 0 or no data is left.

        Raises ValueError if size is between 1 and 3, because a base64
        group is always 4 characters long.
        """
        if size == 0:
            return b""

        read_until_eof = size is None or size < 0
        if read_until_eof:
            new_data = self._file_handle.read(-1)
        else:
            if size < 4:
                raise ValueError(
                    "To correctly encode, you must request at lease 4 bytes."
                )
            # Every 4 requested characters correspond to 3 raw bytes.
            raw_size = (size - len(self._read_buffer)) * 3 // 4
            new_data = self._file_handle.read(raw_size)

        data = self._read_buffer + new_data
        self._read_buffer = b""
        if not data:
            return b""

        # The chunk is final (and may be padded) when everything was read,
        # when the handle had nothing new, or when at most one group is
        # available. Otherwise only whole 3-byte groups are encoded and the
        # remainder is held back so padding never appears mid-stream.
        is_final_chunk = read_until_eof or not new_data or len(data) <= 3
        if not is_final_chunk:
            leftover = len(data) % 3
            if leftover:
                self._read_buffer = data[-leftover:]
                data = data[:-leftover]

        # b2a_base64 appends a newline; strip it.
        return binascii.b2a_base64(data).strip()

    def readinto(self, buffer):
        """Fill *buffer* with base64 encoded data from the stream.

        At most ``len(buffer)`` bytes are written, starting at index 0.
        Returns the number of bytes written (0 at end of stream).
        Raises ValueError if the buffer length is between 1 and 3.
        """
        encoded = self.read(len(buffer))
        count = len(encoded)
        buffer[:count] = encoded
        return count
class IO_HTTP_FILE(IO_HTTP):
    """IO_HTTP client extended with feed webhooks and raw file upload."""

    def _webhooks_path(self, feed_key):
        """Validate *feed_key* and return the composed path of its webhooks."""
        validate_feed_key(feed_key)
        return self._compose_path("feeds/{0}/webhooks".format(feed_key))

    def send_file(self, webhook, file_handle):
        """POST the contents of *file_handle*, base64 encoded, to a webhook.

        :param webhook: mapping with a ``"hook_url"`` entry; the upload goes
            to that URL with ``/raw`` appended.
        :param file_handle: binary file-like object; it is wrapped in a
            Base64Stream and handed to the HTTP session as the request body.
        """
        raw_url = f'{webhook["hook_url"]}/raw'
        encoded_body = Base64Stream(file_handle)
        with self._http.post(raw_url, data=encoded_body) as resp:
            # Inherited helper; presumably raises on an error status.
            self._handle_error(resp)

    def create_new_feed(self, feed_key, feed_desc=None, feed_license=None, history=True):
        """Create a feed named *feed_key* and return the POST result.

        :param feed_key: key of the feed, validated with validate_feed_key.
        :param feed_desc: optional description sent with the request.
        :param feed_license: optional license sent with the request.
        :param history: value sent as the feed's ``"history"`` field.
        """
        validate_feed_key(feed_key)
        feeds_path = self._compose_path("feeds")
        body = {
            "name": feed_key,
            "description": feed_desc,
            "license": feed_license,
            "history": history,
        }
        return self._post(feeds_path, body)

    def get_webhook(self, feed_key):
        """Return the result of a GET on the webhooks path of *feed_key*."""
        return self._get(self._webhooks_path(feed_key))

    def create_new_webhook(self, feed_key):
        """Create a webhook for *feed_key* and return the POST result."""
        webhooks_path = self._webhooks_path(feed_key)
        body = {"webhook": {"data_rate_limit": ""}}
        return self._post(webhooks_path, body)
print(f"Custom code: {gc.mem_free()}")

# Adafruit IO credentials and WiFi settings are read from the environment.
aio_username = os.getenv("AIO_USERNAME")
aio_key = os.getenv("AIO_KEY")

wifi.radio.connect(
    os.getenv("CIRCUITPY_WIFI_SSID"),
    os.getenv("CIRCUITPY_WIFI_PASSWORD"),
)

# Build an HTTPS-capable requests session on top of the WiFi radio.
pool = adafruit_connection_manager.get_radio_socketpool(wifi.radio)
ssl_context = adafruit_connection_manager.get_radio_ssl_context(wifi.radio)
requests = adafruit_requests.Session(pool, ssl_context)
io = IO_HTTP_FILE(aio_username, aio_key, requests)
print(f"io init: {gc.mem_free()}")

# Make sure the feed exists; a request error is treated as
# "feed already exists" and ignored.
try:
    feed_camera = io.create_new_feed("birdfeeder", history=False)
except AdafruitIO_RequestError:
    pass

# Use the feed's first webhook, creating one when none is returned.
existing_webhooks = io.get_webhook("birdfeeder")
if not existing_webhooks:
    webhook_camera = io.create_new_webhook("birdfeeder")
else:
    webhook_camera = existing_webhooks[0]
print(f"feed validation: {gc.mem_free()}")

# Stream the image to the webhook, base64 encoded on the fly.
with open("bird.jpg", "rb") as image_file:
    r = io.send_file(webhook_camera, image_file)
print(f"sent image: {gc.mem_free()}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment