Skip to content

Instantly share code, notes, and snippets.

@acidzebra
Last active February 6, 2022 11:07
Show Gist options
  • Save acidzebra/c866794aaf16da707f83894cb3dd8b74 to your computer and use it in GitHub Desktop.
Save acidzebra/c866794aaf16da707f83894cb3dd8b74 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import anki_vector
import json
import paho.mqtt.client as mqtt
import requests
url = "YOURDISCORDWEBHOOKURL"
MQTT_HOST = "YOURMQTTHOST"
MQTT_PORT = 1883
MQTT_KEEPALIVE_INTERVAL = 20
MQTT_TOPIC = "YOURMQTTTOPIC"
vectorserial = "YOURVECTORSERIAL"
myrobot = anki_vector.Robot(vectorserial,default_logging=False,behavior_control_level=None)
robot_voltage = 0
robot_batlevel = 0
robot_charging = 0
robot_docked = 0
robot_discordmessage = ""
try:
myrobot.connect(timeout=12)
myrobot_battery_state = myrobot.get_battery_state()
myrobot_carryblock = myrobot.status.is_carrying_block
if myrobot_carryblock:
robot_discordmessage = ":robot::blue_square: Bluey is carrying a cube!"
myrobot_cliffdetect = myrobot.status.is_cliff_detected
if myrobot_cliffdetect:
robot_discordmessage = ":robot::blue_square: Bluey is getting himself into trouble!"
myrobot_docking = myrobot.status.is_docking_to_marker
if myrobot_docking:
robot_discordmessage = ":robot::blue_square: Bluey is docking!"
myrobot_falling = myrobot.status.is_falling
if myrobot_falling:
robot_discordmessage = ":robot::blue_square: Bluey is falling!"
myrobot_pickup = myrobot.status.is_picked_up
if myrobot_pickup:
robot_discordmessage = ":robot::blue_square: Bluey is getting himself into trouble!"
if myrobot_battery_state:
robot_voltage = round(myrobot_battery_state.battery_volts,2)
robot_batlevel = myrobot_battery_state.battery_level
robot_charging = myrobot_battery_state.is_charging
robot_docked = myrobot_battery_state.is_on_charger_platform
myrobot.disconnect()
except:
print("unable to get data from robot")
if robot_discordmessage:
robotdcdata = {
"content" : robot_discordmessage
}
try:
result = requests.post(url, json = robotdcdata)
except:
print("unable to post robot discord message")
data = {}
if robot_voltage and robot_voltage != 0:
data['robots'] = []
data['robots'].append({
'name': 'robot',
'voltage': robot_voltage,
'batlevel': robot_batlevel,
'charging': robot_charging,
'docked': robot_docked
})
if data:
MQTT_MSG = str(data)
# Define on_publish event function
def on_publish(client, userdata, mid):
print("Robot Message Published...")
# Initiate MQTT Client
mqttc = mqtt.Client()
# Register publish callback function
mqttc.on_publish = on_publish
try:
# Connect with MQTT Broker
mqttc.connect(MQTT_HOST, MQTT_PORT, MQTT_KEEPALIVE_INTERVAL)
# Publish message to MQTT Broker
mqttc.publish(MQTT_TOPIC,MQTT_MSG)
# Disconnect from MQTT_Broker
mqttc.disconnect()
except:
print("issue publishing robot data")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment