Skip to content

Instantly share code, notes, and snippets.

@justmobilize
Last active June 20, 2024 00:39
Show Gist options
  • Save justmobilize/f0ee9668d7aab6d91524b3ab53953d2c to your computer and use it in GitHub Desktop.
Save justmobilize/f0ee9668d7aab6d91524b3ab53953d2c to your computer and use it in GitHub Desktop.
# SPDX-FileCopyrightText: Copyright (c) 2024 Justin Myers
#
# SPDX-License-Identifier: MIT
import gc
print(f"load: {gc.mem_free()}")
import binascii
import os
import wifi
import adafruit_connection_manager
import adafruit_requests
from adafruit_io.adafruit_io import IO_HTTP, AdafruitIO_RequestError, validate_feed_key
print(f"imports: {gc.mem_free()}")
class Base64Stream:
"""A file wrapper that retuns base64 encoded data when read."""
def __init__(self, file_handle):
self._file_handle = file_handle
self._read_buffer = b""
def seek(self, target, whence=0):
"""Change the stream position to the given offset.
Behaviour depends on the whence parameter."""
if target == whence == 0:
# move to start
self._read_buffer = b""
elif target == 0 and whence == 2:
# move to end
pass
else:
raise ValueError("Seeking to the middle of a file is not supported.")
return self._file_handle.seek(target, whence)
def tell(self):
"""Return the current stream position as an opaque number"""
pos = self._file_handle.tell()
pos = pos * 4 / 3
if pos % 4:
pos += 4 - pos % 4
return int(pos)
def read(self, size=-1):
"""Read and return at most size characters from the stream as a single str.
If size is negative or None, reads until EOF."""
encoded_data = b""
if size == 0:
return encoded_data
if size in (-1, None):
read_size = -1
else:
if size < 4:
raise ValueError(
"To correctly encode, you must request at lease 4 bytes."
)
read_size = int((size - len(self._read_buffer)) * 3 / 4)
data = self._read_buffer
data += self._file_handle.read(read_size)
len_data = len(data)
if len_data == 0:
# no data return b""
return encoded_data
if len_data == len(self._read_buffer):
# no new data, clear out the _read_buffer and use
self._read_buffer = b""
elif len_data <= 3:
# if we have less then 4 bytes, clear out the _read_buffer and use
self._read_buffer = b""
else:
# calculate the extra bytes (we don't want to encode multiples that aren't of 3)
extra_data = -1 * (len_data % 3)
if extra_data != 0:
self._read_buffer = data[extra_data:]
data = data[:extra_data]
else:
self._read_buffer = b""
encoded_data = binascii.b2a_base64(data).strip()
return encoded_data
def readinto(self, buffer):
"""Read and fill the supplied buffer with at most size characters from the stream."""
size = len(buffer)
data = self.read(size)
read_size = len(data)
buffer[0:read_size] = data
return read_size
class IO_HTTP_FILE(IO_HTTP):
def send_file(self, webhook, file_handle):
path = f'{webhook["hook_url"]}/raw'
wrapped_file_handle=Base64Stream(file_handle)
with self._http.post(path, data=wrapped_file_handle) as response:
self._handle_error(response)
def create_new_feed(self, feed_key, feed_desc=None, feed_license=None, history=True):
validate_feed_key(feed_key)
path = self._compose_path("feeds")
payload = {"name": feed_key, "description": feed_desc, "license": feed_license, "history": history}
return self._post(path, payload)
def get_webhook(self, feed_key):
validate_feed_key(feed_key)
path = self._compose_path("feeds/{0}/webhooks".format(feed_key))
return self._get(path)
def create_new_webhook(self, feed_key):
validate_feed_key(feed_key)
path = self._compose_path("feeds/{0}/webhooks".format(feed_key))
payload = {"webhook": {"data_rate_limit": ""}}
return self._post(path, payload)
print(f"Custom code: {gc.mem_free()}")
aio_username = os.getenv("AIO_USERNAME")
aio_key = os.getenv("AIO_KEY")
wifi.radio.connect(os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv("CIRCUITPY_WIFI_PASSWORD"))
pool = adafruit_connection_manager.get_radio_socketpool(wifi.radio)
ssl_context = adafruit_connection_manager.get_radio_ssl_context(wifi.radio)
requests = adafruit_requests.Session(pool, ssl_context)
io = IO_HTTP_FILE(aio_username, aio_key, requests)
print(f"io init: {gc.mem_free()}")
try:
feed_camera = io.create_new_feed("birdfeeder", history=False)
except AdafruitIO_RequestError:
# feed already exists.
pass
webhook_camera = io.get_webhook("birdfeeder")
if webhook_camera:
webhook_camera = webhook_camera[0]
else:
webhook_camera = io.create_new_webhook("birdfeeder")
print(f"feed validation: {gc.mem_free()}")
with open("bird.jpg", "rb") as file_handle:
r = io.send_file(webhook_camera, file_handle)
print(f"sent image: {gc.mem_free()}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment