Loom del Funcionamiento: https://www.loom.com/share/f5a413c7a96e4de5aad02b2fcd653668
# LANDGRAVE REZA ANGEL ELOY
# 20211798
# Instituto Tecnologico de tijuana
# Practica: Enviar informacion de sensores de un MicroBit a traves del puerto serial
# -*- coding: utf-8 -*-
from microbit import *
import radio
# No necesitamos 'machine' aquí si la calibración es estable
# --- CONFIGURACIÓN INICIAL ---
radio.on()
radio.config(group=1)
print("Radio encendido. Grupo 1.") # Mensajes de consola iniciales
if not compass.is_calibrated():
print("Iniciando calibración compás...")
try:
compass.calibrate()
except OSError as e:
# Usamos .format() para compatibilidad
print("Error crítico calibración: {}".format(e))
display.show(Image.SAD)
sleep(3000)
# Considera qué hacer si falla, ¿reiniciar? ¿continuar sin compás?
if compass.is_calibrated():
print("Compás calibrado.")
display.show(Image.YES)
else:
print("Fallo calibración compás.")
display.show(Image.NO)
sleep(1000)
display.clear()
print("--- Iniciando envío de datos seriales CSV ---")
# --- BUCLE PRINCIPAL ---
while True:
# 1. Leer Sensores
estado_boton_a = 1 if button_a.is_pressed() else 0 # Enviar como 0 o 1
estado_boton_b = 1 if button_b.is_pressed() else 0 # Enviar como 0 o 1
temperatura_actual = temperature()
valores_acelerometro = accelerometer.get_values() # Tupla (x, y, z)
accel_x, accel_y, accel_z = valores_acelerometro # Desempaquetar
# Manejar lectura del compás
rumbo_brujula_str = "-1" # Valor numérico por defecto para error/no calibrado
if compass.is_calibrated():
try:
rumbo_brujula = compass.heading()
rumbo_brujula_str = str(rumbo_brujula) # Convertir a string
except OSError:
rumbo_brujula_str = "-2" # Otro código para error de lectura
# Leer Radio y manejar None
mensaje_recibido = radio.receive()
if mensaje_recibido is None:
mensaje_recibido_str = "" # String vacío si no hay mensaje
else:
# Limpiar comas y comillas para evitar romper el CSV (o usar formato más robusto como JSON)
mensaje_recibido_str = str(mensaje_recibido).replace(',', ';').replace('"', "'")
# 2. Formatear como CSV
# Orden: boton_a,boton_b,temp,accel_x,accel_y,accel_z,brujula,radio_msg
linea_csv = "{},{},{},{},{},{},{},{}".format(
estado_boton_a,
estado_boton_b,
temperatura_actual,
accel_x,
accel_y,
accel_z,
rumbo_brujula_str,
mensaje_recibido_str
)
# 3. Enviar por Serial
print(linea_csv) # print() en MicroPython envía por el USB serial
# 4. Espera
sleep(5000) # Envía datos cada 5 segundos (ajusta según necesidad)
Codigo Python 'Intermediario' para el envio desde el puerto serial, a la computadora, y despues al servidor AWS
# LANDGRAVE REZA ANGEL ELOY
# 20211798
#Instituto Tecnologico de tijuana
# Practica: Enviar informacion serial de MicroBit a AWS
import serial
import requests
import time
import sys
import os
# --- Configuración ---
# Puerto serial - AJUSTA ESTO a tu puerto correcto
# Linux: suele ser /dev/ttyACM0, /dev/ttyACM1, etc.
# macOS: suele ser /dev/cu.usbmodemXXXXX
# Windows: suele ser COM1, COM3, COM4, etc.
SERIAL_PORT = 'COM9' # ¡¡¡CAMBIA ESTO!!!
BAUD_RATE = 115200 # Baud rate por defecto del REPL de MicroPython
# Configuración de InfluxDB v2.x - AJUSTA ESTO
INFLUXDB_URL = 'http://54.175.226.192:8086' # URL de tu InfluxDB en AWS
INFLUXDB_ORG = 'studio' # Tu organización en InfluxDB
INFLUXDB_BUCKET = 'mainb' # El bucket que creaste
INFLUXDB_TOKEN = 'gu2rXpUW6ygIMvm0AWYHh0k4mJI9B5-AZhMG0AxLB6vAU_She_8u4RSMMCHMPuMm0TEz_nbmgdcknXibyYm2Ug==' # El token que generaste
# Nombre de la "medición" (como una tabla) en InfluxDB
MEASUREMENT_NAME = 'sensor_data'
# Tags opcionales para identificar el origen de los datos
DEVICE_TAG = 'microbit_01'
# --- Fin Configuración ---
def format_influx_line(csv_data):
"""Convierte una línea CSV del micro:bit a InfluxDB Line Protocol."""
try:
parts = csv_data.split(',')
if len(parts) != 8:
print(f"Error: Línea CSV inesperada - {len(parts)} partes: {csv_data}")
return None
# Extraer y convertir datos (añadir validación si es necesario)
btn_a = int(parts[0])
btn_b = int(parts[1])
temp = float(parts[2])
acc_x = int(parts[3])
acc_y = int(parts[4])
acc_z = int(parts[5])
compass_heading = int(parts[6]) # -1 o -2 si hay error/no calibrado
radio_msg = parts[7].strip() # Mensaje de radio (limpiado en microbit)
# Construir la cadena de campos (fields)
fields = f"button_a={btn_a}i," \
f"button_b={btn_b}i," \
f"temperature={temp}," \
f"accel_x={acc_x}i," \
f"accel_y={acc_y}i," \
f"accel_z={acc_z}i," \
f"compass={compass_heading}i" # Guardar como entero
# Añadir campo de mensaje de radio solo si no está vacío
# Los strings en InfluxDB necesitan comillas
if radio_msg:
# Escapar comillas dobles y barras invertidas en el mensaje
escaped_msg = radio_msg.replace('\\', '\\\\').replace('"', '\\"')
fields += f',radio_message="{escaped_msg}"'
# Obtener timestamp actual en nanosegundos
timestamp_ns = time.time_ns()
# Formato Line Protocol: measurement,tag=value field=value[,field2=value2] timestamp
line = f"{MEASUREMENT_NAME},device={DEVICE_TAG} {fields} {timestamp_ns}"
return line
except ValueError as e:
print(f"Error convirtiendo datos: {e} en línea: {csv_data}")
return None
except Exception as e:
print(f"Error inesperado formateando línea: {e}")
return None
def send_to_influx(line_protocol_data):
"""Envía datos formateados a InfluxDB."""
if not line_protocol_data:
return
headers = {
'Authorization': f'Token {INFLUXDB_TOKEN}',
'Content-Type': 'text/plain; charset=utf-8',
'Accept': 'application/json'
}
write_url = f"{INFLUXDB_URL}/api/v2/write?org={INFLUXDB_ORG}&bucket={INFLUXDB_BUCKET}&precision=ns"
try:
response = requests.post(write_url, headers=headers, data=line_protocol_data.encode('utf-8'), timeout=5)
response.raise_for_status() # Lanza excepción si hay error HTTP (4xx o 5xx)
print(f"Datos enviados OK: {line_protocol_data}")
# print(f"Respuesta InfluxDB ({response.status_code})") # Descomentar para debug
except requests.exceptions.RequestException as e:
print(f"Error al enviar a InfluxDB: {e}")
except Exception as e:
print(f"Error inesperado en envío a InfluxDB: {e}")
def main():
"""Función principal para leer serial y enviar a InfluxDB."""
print(f"Intentando conectar al puerto serial: {SERIAL_PORT} a {BAUD_RATE} baud")
ser = None
try:
ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=1)
print("Conectado al puerto serial.")
while True:
if ser.in_waiting > 0:
try:
# Leer línea, decodificar de bytes a string, quitar espacios/saltos
line_raw = ser.readline()
line_str = line_raw.decode('utf-8', errors='ignore').strip()
if line_str: # Asegurarse de que no esté vacía
# print(f"Recibido: {line_str}") # Descomentar para debug
influx_line = format_influx_line(line_str)
if influx_line:
send_to_influx(influx_line)
except serial.SerialException as e:
print(f"Error de lectura serial: {e}")
# Intentar reconectar o salir
break
except UnicodeDecodeError as e:
print(f"Error decodificando datos seriales (datos corruptos?): {e} - Datos: {line_raw}")
except KeyboardInterrupt:
print("\nInterrupción por teclado. Saliendo.")
break
except Exception as e:
print(f"Error inesperado en bucle principal: {e}")
time.sleep(0.1) # Pequeña pausa para no consumir 100% CPU
except serial.SerialException as e:
print(f"Error al abrir puerto serial {SERIAL_PORT}: {e}")
print("Verifica que el puerto sea correcto, que el micro:bit esté conectado")
print("y que ningún otro programa (Mu, Thonny REPL) lo esté usando.")
except Exception as e:
print(f"Error inesperado al iniciar: {e}")
finally:
if ser and ser.is_open:
ser.close()
print("Puerto serial cerrado.")
print("Script terminado.")
if __name__ == "__main__":
main()