Skip to content

Instantly share code, notes, and snippets.

@jlstanus
Created April 29, 2021 14:14
Show Gist options
  • Save jlstanus/6b79181ca56ffd598a646eb70b20520d to your computer and use it in GitHub Desktop.
Save jlstanus/6b79181ca56ffd598a646eb70b20520d to your computer and use it in GitHub Desktop.
import time
from network import WLAN
from machine import Pin, PWM
from umqtt.simple import MQTTClient
from ubinascii import hexlify
import sys
# MQTT connection settings passed to MQTTClient further down.
CLIENT_ID = "smashing-sub"
MQTT_SERVER = "192.168.1.10"
# Set to None if not needed
MQTT_USER = None
MQTT_PSWD = None
# PWM duty values: avancer()/reculer() step the duty from cycle_start up to
# (but not including) cycle_end, then hold it at cycle_end.
cycle_start = 480
cycle_end = 1023
# Seconds slept between two consecutive duty updates in avancer()/reculer().
waiting_time = 0.01
# avancer()/reculer() keep holding cycle_end while
# time.time() - start_time is below this many seconds.
time_out = 10
# Output pins switched on/off by activer() and release()
# (presumably the driver's enable lines, judging by the names - TODO confirm wiring).
rpin_en = Pin(14, Pin.OUT)
lpin_en = Pin(12, Pin.OUT)
# Output pins wrapped in PWM objects by activer().
rpin_pwm = Pin(13, Pin.OUT)
lpin_pwm = Pin(15, Pin.OUT)
# PWM frequency re-applied by avancer()/reculer() on every ramp step.
# NOTE(review): activer() uses its own default of 100 when called without argument.
freq = 150
def activer(freq=100):
    """Switch both enable pins on and create the two PWM channels.

    Args:
        freq: Frequency applied to both PWM objects (passed to PWM.freq()).

    Returns:
        A ``(rpwm, lpwm)`` tuple: the PWM object built on rpin_pwm first,
        then the one built on lpin_pwm.
    """
    for enable_pin in (rpin_en, lpin_en):
        enable_pin.on()

    # The left channel is created first, but the right one is configured
    # (and returned) first.
    left_channel = PWM(lpin_pwm)
    right_channel = PWM(rpin_pwm)
    for channel in (right_channel, left_channel):
        channel.freq(freq)

    print("motor ready")
    return right_channel, left_channel
def release():
    """Switch both enable pins off for half a second, then back on."""
    enable_pins = (rpin_en, lpin_en)

    for enable_pin in enable_pins:
        enable_pin.off()

    time.sleep(0.5)

    for enable_pin in enable_pins:
        enable_pin.on()
def stop():
    """Deinitialise the module-level rpwm and lpwm PWM objects."""
    for channel in (rpwm, lpwm):
        channel.deinit()
def avancer(rpwm):
    """Ramp the given PWM channel up, hold it at full duty, then pause.

    Args:
        rpwm: PWM object exposing freq() and duty().

    Reads the module-level cycle_start, cycle_end, freq, waiting_time,
    time_out and start_time. The duty is left at its last value on return.
    """
    print("avancer")

    # Ramp phase: one duty step every waiting_time seconds, stopping just
    # below cycle_end.
    step = cycle_start
    while step < cycle_end:
        rpwm.freq(freq)
        rpwm.duty(step)
        print(str(step))
        time.sleep(waiting_time)
        step += 1

    # Hold phase: keep full duty until time_out seconds have passed since
    # the module-level start_time.
    while True:
        elapsed = time.time() - start_time
        if not elapsed < time_out:
            break
        rpwm.duty(cycle_end)
        time.sleep(waiting_time)

    time.sleep(1)
def reculer(lpwm):
    """Ramp the given PWM channel up, hold it at full duty, then pause.

    Args:
        lpwm: PWM object exposing freq() and duty().

    Reads the module-level cycle_start, cycle_end, freq, waiting_time,
    time_out and start_time. The duty is left at its last value on return.
    """
    print("reculer")

    # Ramp phase: one duty step every waiting_time seconds, stopping just
    # below cycle_end.
    level = cycle_start
    while level < cycle_end:
        lpwm.freq(freq)
        lpwm.duty(level)
        print(str(level))
        time.sleep(waiting_time)
        level += 1

    # Hold phase: keep full duty until time_out seconds have passed since
    # the module-level start_time.
    while True:
        since_start = time.time() - start_time
        if not since_start < time_out:
            break
        lpwm.duty(cycle_end)
        time.sleep(waiting_time)

    time.sleep(1)
# Start-up sequence: switch the enable pins on and create both PWM objects.
rpwm, lpwm = activer()
# Reference time read by the hold loops of avancer()/reculer().
start_time = time.time()
# Run one forward pass, then one backward pass, at boot.
# NOTE(review): start_time is not refreshed between the two calls and
# avancer() only returns once time_out has elapsed, so reculer() ramps up
# here but never enters its hold loop.
avancer(rpwm)
reculer(lpwm)
# stop() is left disabled; sub_cb() still uses rpwm/lpwm afterwards.
# stop()
def sub_cb(topic, msg):
    """MQTT subscription callback: run the motor on 'sy/smash' commands.

    Args:
        topic: Topic of the received message, as bytes.
        msg: Payload of the received message, as bytes.

    On topic 'sy/smash', the payload "avancer" runs avancer(rpwm) and the
    payload "reculer" runs reculer(lpwm); each run is followed by release().
    Any other topic or payload is only printed.
    """
    # avancer()/reculer() read the module-level start_time to decide how long
    # to hold full duty. Without this declaration the assignments below would
    # only bind a local, leaving the stale boot-time value in place and the
    # hold phase skipped.
    global start_time

    t = topic.decode('utf8')
    m = msg.decode('utf8')
    print('-'*20)
    print('topic: %s' % t)
    print('message: %s' % m)

    if t != 'sy/smash':
        return

    print("changement etat ")
    if m == "avancer":
        start_time = time.time()
        avancer(rpwm)
        release()
    elif m == "reculer":
        start_time = time.time()
        reculer(lpwm)
        release()
# Create the MQTT client, connect to the broker and subscribe to the
# command topic.
try:
    print("Creation MQTTClient")
    q = MQTTClient(client_id=CLIENT_ID, server=MQTT_SERVER, user=MQTT_USER, password=MQTT_PSWD)
    q.set_callback(sub_cb)
    # NOTE(review): confirm against the umqtt.simple docs what a non-zero
    # return from connect() means; refused connections may raise instead.
    if q.connect() != 0:
        print("erreur connexion")
        sys.exit()
    print("Connected")
    q.subscribe('sy/smash')
    print("souscription OK")
except Exception as e:
    # Without a connected, subscribed client the message loop below cannot
    # work (q may not even exist), so report the error and stop here rather
    # than failing later on an unusable client.
    print(e)
    sys.exit()

# Announce this device on the broker (best effort: a failure is only printed).
try:
    sMac = hexlify(WLAN().config('mac')).decode()
    q.publish("sy/connect/%s" % CLIENT_ID, sMac)
except Exception as e:
    print(e)

# Message processing loop.
try:
    while True:
        # Blocking wait for the next MQTT message; sub_cb() handles it.
        # Use q.check_msg() here instead for a non-blocking poll.
        q.wait_msg()
finally:
    # Reached when the loop is interrupted or wait_msg() raises; the original
    # disconnect call sat after the infinite loop and could never run.
    try:
        q.disconnect()
    except Exception as e:
        print(e)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment