Created
September 22, 2021 13:27
-
-
Save k0d/f13caebaef4a818f1e8d6f9ad3abc0f4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from ws_connection import ClientClosedError | |
from ws_server import WebSocketServer, WebSocketClient | |
import network | |
import machine | |
import neopixel | |
import time | |
import uasyncio | |
np = neopixel.NeoPixel(machine.Pin(27), 50) | |
c = (255, 0, 0) | |
c2 = (0, 0, 255) | |
async def blink(): | |
global c, c2 | |
while True: | |
np[0] = c | |
np.write() | |
await uasyncio.sleep(0.5) | |
np[0] = c2 | |
np.write() | |
await uasyncio.sleep(0.5) | |
def hex_to_rgb(hex): | |
rgb = [] | |
for i in (1, 3, 5): | |
decimal = int(hex[i:i+2], 16) | |
rgb.append(decimal) | |
return tuple(rgb) | |
class TestClient(WebSocketClient): | |
def __init__(self, conn): | |
super().__init__(conn) | |
def process(self): | |
global c, c2 | |
try: | |
msg = self.connection.read() | |
if not msg: | |
return | |
msg = msg.decode("utf-8") | |
items = msg.split(" ") | |
cmd = items[0] | |
if cmd == "Hello": | |
self.connection.write(cmd + " World") | |
print("Hello World") | |
if cmd == "change_color": | |
c = hex_to_rgb(items[1]) | |
print("Changing colour to ", c) | |
except ClientClosedError: | |
self.connection.close() | |
class TestServer(WebSocketServer): | |
def __init__(self): | |
super().__init__("main.html", 2) | |
def _make_client(self, conn): | |
return TestClient(conn) | |
sta_if = network.WLAN(network.STA_IF) | |
if not sta_if.isconnected(): | |
print('connecting to network...') | |
sta_if.active(True) | |
sta_if.connect('', "") | |
while not sta_if.isconnected(): | |
pass | |
print('Network config:', sta_if.ifconfig()) | |
# ap = network.WLAN(network.AP_IF) | |
# ap.active(True) | |
# ap.config(essid="mpy_001", password="foo") | |
# print('Network config:', ap.ifconfig()) | |
async def wifi(): | |
server = TestServer() | |
server.start() | |
try: | |
while True: | |
server.process_all() | |
await uasyncio.sleep_ms(1) | |
except KeyboardInterrupt: | |
pass | |
server.stop() | |
async def main(): | |
uasyncio.create_task(blink()) | |
uasyncio.create_task(wifi()) | |
while True: | |
await uasyncio.sleep_ms(1) | |
uasyncio.run(main()) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment