Skip to content

Instantly share code, notes, and snippets.

@eddo888
Created January 26, 2024 11:29
Show Gist options
  • Save eddo888/87f1b18fb02a280ddd8eb6481cb18891 to your computer and use it in GitHub Desktop.
Save eddo888/87f1b18fb02a280ddd8eb6481cb18891 to your computer and use it in GitHub Desktop.
# micropython
# Complete project details at https://RandomNerdTutorials.com
import sys, math, ubinascii, ssd1306, max7219, machine, ntptime, time
from machine import Pin, SoftI2C, SPI
from umqttsimple import MQTTClient
from rotary_irq_esp import RotaryIRQ
from stepper import Stepper
# Stepper motor driver. The four numbers are passed straight to Stepper
# (presumably the GPIO numbers of the driver inputs — TODO confirm in stepper.py).
stepper_motor = Stepper(14,15,22,23)
# SPI bus 1 at 10 MHz, clock on GPIO 4 and data-out on GPIO 2, used by the MAX7219.
spi = SPI(1, baudrate=10000000, polarity=1, phase=0, sck=Pin(4), mosi=Pin(2))
# GPIO 5 is handed to the MAX7219 driver next to the SPI bus (presumably chip-select).
ss = Pin(5, Pin.OUT)
# presumably the trailing 1 is the number of chained 8x8 modules — TODO confirm
led_display = max7219.Matrix8x8(spi, ss, 1)
# Fill the matrix buffer with 0 and push it out so the display starts blank.
led_display.fill(0)
led_display.show()
def displays(texts):
    """Render one or more lines of text on the OLED, stacked top to bottom.

    texts: a single string, or a list of strings. The 32 display rows are
        divided evenly between the entries, so each entry is drawn at
        y = (32 // len(texts)) * index.

    Anything that is neither a string nor a list is ignored. An empty list is
    also ignored (the screen is left untouched); previously it raised
    ZeroDivisionError while computing the row spacing.
    """
    if isinstance(texts, str):
        texts = [texts]
    if not isinstance(texts, list) or not texts:
        return
    # Integer division gives the same spacing as int(32 / n) for positive n.
    spacing = 32 // len(texts)
    oled_display.fill(0)
    for row, text in enumerate(texts):
        oled_display.text(text, 0, spacing * row, 1)
    oled_display.show()
def led8x8(lines):
    """Run a list of textual drawing commands against the LED matrix.

    lines: iterable of strings of the form ``method:arg1,arg2,...``. The part
        before the first ':' names a method of ``led_display``; the rest is a
        comma-separated argument list. A line with nothing after the colon
        (or with no colon at all) calls the method without arguments.
        Blank lines are skipped.

    A failing line is reported with sys.print_exception and the remaining
    lines are still processed.
    """
    for line in lines:
        if not line.strip():
            continue
        try:
            # Split only on the first ':' so an argument may itself contain one.
            parts = line.split(':', 1)
            command = parts[0].strip()
            params = parts[1].strip() if len(parts) > 1 else ''
            # SECURITY NOTE(review): the arguments are eval()'d and the method
            # name is looked up with getattr(), and this text arrives over
            # MQTT. Anyone who can publish to the subscribed topic can run
            # arbitrary expressions on the device. Kept because string
            # arguments depend on it; consider a whitelist of commands and
            # literal-only parsing.
            if params:
                args = tuple(eval(param) for param in params.split(','))
            else:
                args = ()
            getattr(led_display, command)(*args)
        except Exception as e:
            sys.print_exception(e)
def _payload_number(msg):
    """Return the integer between the first and second ':' of a bytes payload.

    Returns None (after printing a diagnostic) when the payload cannot be
    decoded or the field is not an integer, so a malformed message cannot
    raise out of the MQTT callback.
    """
    try:
        return int(msg.decode('UTF8').split(':')[1])
    except (ValueError, IndexError) as e:
        print('bad payload %r: %s' % (msg, e))
        return None


def sub_cb(topic, msg):
    """MQTT subscription callback: dispatch an incoming message.

    topic, msg: bytes as delivered by the MQTT client.

    Recognised payloads:
      b'restart'        -> restart_and_reconnect()
      b'rotate:<int>'   -> stepper_motor.run(<int>)
      b'relay:<int>'    -> relay_output.value(<int>)
      b'8x8:' + newline-separated commands -> led8x8(commands)
    Anything else is shown on the OLED and printed.

    The prefixes are compared as bytes. The original compared the bytes
    payload against str prefixes, which raises TypeError on CPython and only
    works where the runtime does not type-check the argument.
    """
    if msg == b'restart':
        restart_and_reconnect()
        return
    if msg.startswith(b'rotate:'):
        number = _payload_number(msg)
        if number is not None:
            print('rotate->%d' % number)
            stepper_motor.run(number)
        return
    if msg.startswith(b'relay:'):
        number = _payload_number(msg)
        if number is not None:
            print('relay->%d' % number)
            relay_output.value(number)
        return
    if msg.startswith(b'8x8:'):
        # The first line is the '8x8:' header; the commands follow it.
        lines = msg.decode('UTF8').split('\n')[1:]
        print('8x8->\n%s' % ('\n'.join(lines)))
        led8x8(lines)
        return
    displays([topic, msg])
    print(f'{topic}:{msg}')
# MQTT broker settings. The %...% values look like placeholders that are
# substituted before deployment — TODO confirm how they are filled in.
mqtt_server = '%HOSTNAME%'
mqtt_port = 1883
mqtt_username = '%USERNAME%'
mqtt_password = '%PASSWORD%'
# OLED: software I2C on GPIO 19 (SDA) / GPIO 21 (SCL), constructed as 128x32.
i2c=SoftI2C(sda=Pin(19), scl=Pin(21))
oled_display=ssd1306.SSD1306_I2C(128,32,i2c)
# Show which NTP host is configured, then set the clock from it.
# NOTE(review): ntptime.settime() is not wrapped in any error handling here,
# so a failure at this point stops the script — confirm that is intended.
displays(['ntptime', str(ntptime.host)])
ntptime.settime()
# MQTT client id: hex encoding of the board's unique id.
client_id = ubinascii.hexlify(machine.unique_id())
topic_sub = b'requests'   # topic subscribed to in connect_and_subscribe()
topic_pub = b'timestamp'  # topic the main loop publishes the clock string to
last_message = 0          # time.time() value recorded at the last timestamp publish
message_interval = 0.9    # minimum gap (in time.time() units) between timestamp publishes
counter = 0               # incremented on every timestamp publish; not read anywhere in view
def connect_and_subscribe():
    """Create the MQTT client, connect it, and subscribe to ``topic_sub``.

    The client is built from the module-level broker settings with a 30 s
    keepalive and ``sub_cb`` as its message callback. Once subscribed, the
    broker host is shown on the OLED.

    Returns the connected MQTTClient. Errors raised by connect()/subscribe()
    are not handled here and propagate to the caller.
    """
    # The module-level settings are only read, so no `global` statement is needed.
    mqtt = MQTTClient(
        client_id,
        mqtt_server,
        port=mqtt_port,
        user=mqtt_username,
        password=mqtt_password,
        keepalive=30,
    )
    mqtt.set_callback(sub_cb)
    mqtt.connect()
    mqtt.subscribe(topic_sub)
    displays(['mqtt server', mqtt_server])
    return mqtt
def restart_and_reconnect():
    """Show 'connecting' on the OLED, wait five seconds, then reset the board.

    Reconnection is not attempted in-process: machine.reset() is called and
    the script is expected to start again from the top.
    """
    displays(['connecting'])
    pause_seconds = 5
    time.sleep(pause_seconds)
    machine.reset()
# Open the MQTT session used by the rest of the script. An OSError while
# connecting goes through restart_and_reconnect(), which resets the board.
try:
    client = connect_and_subscribe()
except OSError:
    restart_and_reconnect()
# Input/output setup. Each input keeps an `*_old` copy of its last seen value
# so the main loop can detect changes, and m0..m3 hold the four text lines the
# main loop passes to displays().

# magnetic sensor (input on GPIO 34)
near_field_sensor = Pin(34, Pin.IN)
near_field_sensor_old = near_field_sensor.value()
# m3 is written by both the switch and the sensor branches of the main loop.
m3 = ''
# encoder button (input on GPIO 26)
encoder_button = Pin(26, Pin.IN)
encoder_button_old = encoder_button.value()
# m0 carries the 'led: N' line set when the button toggles the LED.
m0 = ''
# led output (GPIO 27), driven low at start
led_output = Pin(27, Pin.OUT)
led_output_old = 0
led_output.value(led_output_old)
# relay output (GPIO 25)
relay_output = Pin(25, Pin.OUT)
# toggle switch (input on GPIO 35)
switch = Pin(35, Pin.IN)
switch_old = switch.value()
# encoder inner: bounded to 108..117 on GPIO 12 (clk) / GPIO 13 (dt)
# The None assignment is immediately replaced by the RotaryIRQ instance below.
encoder_inner = None
# m1 carries the 'inner: N' line.
m1 = ''
encoder_inner = RotaryIRQ(
pin_num_clk=12,
pin_num_dt=13,
min_val=108,
max_val=117,
reverse=False,
range_mode=RotaryIRQ.RANGE_BOUNDED
)
encoder_inner_old = encoder_inner.value()
# encoder outer: bounded to 0..9 on GPIO 32 (clk) / GPIO 33 (dt)
# The None assignment is immediately replaced by the RotaryIRQ instance below.
encoder_outer = None
# m2 carries the 'outer: N' line.
m2 = ''
encoder_outer = RotaryIRQ(
pin_num_clk=32,
pin_num_dt=33,
min_val=0,
max_val=9,
reverse=False,
range_mode=RotaryIRQ.RANGE_BOUNDED
)
encoder_outer_old = encoder_outer.value()
def nav():
    """Publish the current encoder selection to the b'esp32' topic.

    The payload is 'NAV1s:' followed by the inner encoder value as three
    digits and the outer encoder value times ten as two digits
    (presumably a NAV1 standby frequency — TODO confirm with the consumer).
    The payload is also printed.
    """
    whole = encoder_inner_old
    fraction = encoder_outer_old * 10
    payload = 'NAV1s:%03d%02d' % (whole, fraction)
    print(payload)
    client.publish(b'esp32', payload)
# Hours added to the NTP-set clock for the displayed and published timestamp.
# Hoisted out of the loop because it never changes.
gmt_plus = 10+1

# Main polling loop. Every 50 ms it samples the switch, the magnetic sensor,
# the encoder button and both encoders, and reports each change on the OLED
# and over MQTT. About once a second it also publishes a timestamp, services
# incoming MQTT messages and, unless the toggle switch reads 1, advances the
# LED "hand" and the stepper.
#
# The whole pass is wrapped in one OSError handler. Originally only the
# timestamp publish was protected, so a network failure during any other
# publish (switch, sensor, button, nav) crashed the script instead of
# rebooting it through restart_and_reconnect().
#
# NOTE(review): the nesting below was reconstructed from unindented text. In
# particular the final stepper/relay statements are assumed to belong to the
# once-per-second block — confirm against the deployed script.
while True:
    try:
        time.sleep_ms(50)

        if switch:
            value = switch.value()
            if value != switch_old:
                switch_old = value
                m3 = 'switch: %d' % switch_old
                print(m3)
                displays([m0, m1, m2, m3])
                client.publish('switch', str(switch_old))

        if near_field_sensor:
            value = near_field_sensor.value()
            if value != near_field_sensor_old:
                near_field_sensor_old = value
                # Plain %-formatting; the original's f-prefix had no placeholders.
                m3 = 'sensor: %d' % value
                print(m3)
                displays([m0, m1, m2, m3])
                client.publish('sensor', str(near_field_sensor_old))

        if encoder_button and led_output:
            value = encoder_button.value()
            if value != encoder_button_old:
                encoder_button_old = value
                print('button: %d' % value)
                # Toggle the LED only on the transition to 0.
                if value == 0:
                    led_output_old = 1 ^ led_output_old
                    m0 = 'led: %d' % led_output_old
                    displays([m0, m1, m2, m3])
                    led_output.value(led_output_old)
                    client.publish('cmnd/tasmota_C9D18B/Power', str(led_output_old))

        if encoder_inner:
            value = encoder_inner.value()
            m1 = 'inner: %d' % value
            if value != encoder_inner_old:
                encoder_inner_old = value
                print(m1)
                displays([m0, m1, m2, m3])
                nav()

        if encoder_outer:
            value = encoder_outer.value()
            m2 = 'outer: %d' % value
            if value != encoder_outer_old:
                encoder_outer_old = value
                print(m2)
                displays([m0, m1, m2, m3])
                nav()

        if (time.time() - last_message) > message_interval:
            now = time.localtime(time.time() + 3600 * gmt_plus)
            date_str = '%4d-%02d-%02d' % now[0:3]
            time_str = '%02d:%02d:%02d' % now[3:6]
            msg = '%s %s' % (date_str, time_str)
            print(msg)
            last_message = time.time()
            counter += 1
            displays(['timestamp', date_str, time_str])

            # Incoming messages are only serviced here, once per interval.
            client.check_msg()
            client.publish(topic_pub, msg)

            # Switch reading 1 skips the LED hand, stepper and relay update.
            if switch_old == 1:
                continue

            # Last digit of the seconds selects one of ten directions for a
            # line drawn outward from the centre of the matrix.
            vector = int(time_str[-1])
            radians = 2 * math.pi * vector / 10
            xo = int(4 + 4 * math.cos(radians))
            yo = int(4 + 4 * math.sin(radians))
            xi = int(4 + 1 * math.cos(radians))
            yi = int(4 + 1 * math.sin(radians))
            led_display.fill(0)
            led_display.line(xi, yi, xo, yo, 1)
            led_display.show()

            # 4200 / 10 = 420 per tick (presumably a tenth of a revolution — TODO confirm).
            stepper_motor.run(int(4200 / 10))
            relay_output.value(near_field_sensor.value())
    except OSError:
        restart_and_reconnect()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment