Skip to content

Instantly share code, notes, and snippets.

@angel-land
Created May 20, 2025 00:33
Show Gist options
  • Save angel-land/1407027455989db2219fe34d85a7d8f1 to your computer and use it in GitHub Desktop.
Save angel-land/1407027455989db2219fe34d85a7d8f1 to your computer and use it in GitHub Desktop.
INFLUX + PICO

Practica de InfluxDB con MicroBit


Loom del Funcionamiento: https://www.loom.com/share/f5a413c7a96e4de5aad02b2fcd653668

Codigo MicroPython para el funcionamiento del MicroBit

# LANDGRAVE REZA ANGEL ELOY
# 20211798
# Instituto Tecnologico de tijuana
# Practica: Enviar informacion de sensores de un MicroBit a traves del puerto serial
# -*- coding: utf-8 -*-
from microbit import *
import radio
# No necesitamos 'machine' aquí si la calibración es estable

# --- CONFIGURACIÓN INICIAL ---
radio.on()
radio.config(group=1)
print("Radio encendido. Grupo 1.") # Mensajes de consola iniciales

if not compass.is_calibrated():
   print("Iniciando calibración compás...")
   try:
       compass.calibrate()
   except OSError as e:
       # Usamos .format() para compatibilidad
       print("Error crítico calibración: {}".format(e))
       display.show(Image.SAD)
       sleep(3000)
       # Considera qué hacer si falla, ¿reiniciar? ¿continuar sin compás?

if compass.is_calibrated():
   print("Compás calibrado.")
   display.show(Image.YES)
else:
   print("Fallo calibración compás.")
   display.show(Image.NO)

sleep(1000)
display.clear()
print("--- Iniciando envío de datos seriales CSV ---")

# --- BUCLE PRINCIPAL ---
while True:
   # 1. Leer Sensores
   estado_boton_a = 1 if button_a.is_pressed() else 0 # Enviar como 0 o 1
   estado_boton_b = 1 if button_b.is_pressed() else 0 # Enviar como 0 o 1
   temperatura_actual = temperature()
   valores_acelerometro = accelerometer.get_values() # Tupla (x, y, z)
   accel_x, accel_y, accel_z = valores_acelerometro # Desempaquetar

   # Manejar lectura del compás
   rumbo_brujula_str = "-1" # Valor numérico por defecto para error/no calibrado
   if compass.is_calibrated():
       try:
           rumbo_brujula = compass.heading()
           rumbo_brujula_str = str(rumbo_brujula) # Convertir a string
       except OSError:
           rumbo_brujula_str = "-2" # Otro código para error de lectura

   # Leer Radio y manejar None
   mensaje_recibido = radio.receive()
   if mensaje_recibido is None:
       mensaje_recibido_str = "" # String vacío si no hay mensaje
   else:
       # Limpiar comas y comillas para evitar romper el CSV (o usar formato más robusto como JSON)
       mensaje_recibido_str = str(mensaje_recibido).replace(',', ';').replace('"', "'")

   # 2. Formatear como CSV
   # Orden: boton_a,boton_b,temp,accel_x,accel_y,accel_z,brujula,radio_msg
   linea_csv = "{},{},{},{},{},{},{},{}".format(
       estado_boton_a,
       estado_boton_b,
       temperatura_actual,
       accel_x,
       accel_y,
       accel_z,
       rumbo_brujula_str,
       mensaje_recibido_str
   )

   # 3. Enviar por Serial
   print(linea_csv) # print() en MicroPython envía por el USB serial

   # 4. Espera
   sleep(5000) # Envía datos cada 5 segundos (ajusta según necesidad)

Codigo Python 'Intermediario' para el envio desde el puerto serial, a la computadora, y despues al servidor AWS

# LANDGRAVE REZA ANGEL ELOY
# 20211798
#Instituto Tecnologico de tijuana
# Practica: Enviar informacion serial de MicroBit a AWS

import serial
import requests
import time
import sys
import os

# --- Configuración ---
# Puerto serial - AJUSTA ESTO a tu puerto correcto
# Linux: suele ser /dev/ttyACM0, /dev/ttyACM1, etc.
# macOS: suele ser /dev/cu.usbmodemXXXXX
# Windows: suele ser COM1, COM3, COM4, etc.

SERIAL_PORT = 'COM9' # ¡¡¡CAMBIA ESTO!!!
BAUD_RATE = 115200 # Baud rate por defecto del REPL de MicroPython

# Configuración de InfluxDB v2.x - AJUSTA ESTO
INFLUXDB_URL = 'http://54.175.226.192:8086' # URL de tu InfluxDB en AWS
INFLUXDB_ORG = 'studio'        # Tu organización en InfluxDB
INFLUXDB_BUCKET = 'mainb'   # El bucket que creaste
INFLUXDB_TOKEN = 'gu2rXpUW6ygIMvm0AWYHh0k4mJI9B5-AZhMG0AxLB6vAU_She_8u4RSMMCHMPuMm0TEz_nbmgdcknXibyYm2Ug==' # El token que generaste

# Nombre de la "medición" (como una tabla) en InfluxDB
MEASUREMENT_NAME = 'sensor_data'
# Tags opcionales para identificar el origen de los datos
DEVICE_TAG = 'microbit_01'

# --- Fin Configuración ---

def format_influx_line(csv_data):
   """Convierte una línea CSV del micro:bit a InfluxDB Line Protocol."""
   try:
       parts = csv_data.split(',')
       if len(parts) != 8:
           print(f"Error: Línea CSV inesperada - {len(parts)} partes: {csv_data}")
           return None

       # Extraer y convertir datos (añadir validación si es necesario)
       btn_a = int(parts[0])
       btn_b = int(parts[1])
       temp = float(parts[2])
       acc_x = int(parts[3])
       acc_y = int(parts[4])
       acc_z = int(parts[5])
       compass_heading = int(parts[6]) # -1 o -2 si hay error/no calibrado
       radio_msg = parts[7].strip() # Mensaje de radio (limpiado en microbit)

       # Construir la cadena de campos (fields)
       fields = f"button_a={btn_a}i," \
                f"button_b={btn_b}i," \
                f"temperature={temp}," \
                f"accel_x={acc_x}i," \
                f"accel_y={acc_y}i," \
                f"accel_z={acc_z}i," \
                f"compass={compass_heading}i" # Guardar como entero

       # Añadir campo de mensaje de radio solo si no está vacío
       # Los strings en InfluxDB necesitan comillas
       if radio_msg:
           # Escapar comillas dobles y barras invertidas en el mensaje
           escaped_msg = radio_msg.replace('\\', '\\\\').replace('"', '\\"')
           fields += f',radio_message="{escaped_msg}"'

       # Obtener timestamp actual en nanosegundos
       timestamp_ns = time.time_ns()

       # Formato Line Protocol: measurement,tag=value field=value[,field2=value2] timestamp
       line = f"{MEASUREMENT_NAME},device={DEVICE_TAG} {fields} {timestamp_ns}"
       return line

   except ValueError as e:
       print(f"Error convirtiendo datos: {e} en línea: {csv_data}")
       return None
   except Exception as e:
       print(f"Error inesperado formateando línea: {e}")
       return None

def send_to_influx(line_protocol_data):
   """Envía datos formateados a InfluxDB."""
   if not line_protocol_data:
       return

   headers = {
       'Authorization': f'Token {INFLUXDB_TOKEN}',
       'Content-Type': 'text/plain; charset=utf-8',
       'Accept': 'application/json'
   }
   write_url = f"{INFLUXDB_URL}/api/v2/write?org={INFLUXDB_ORG}&bucket={INFLUXDB_BUCKET}&precision=ns"

   try:
       response = requests.post(write_url, headers=headers, data=line_protocol_data.encode('utf-8'), timeout=5)
       response.raise_for_status() # Lanza excepción si hay error HTTP (4xx o 5xx)
       print(f"Datos enviados OK: {line_protocol_data}")
       # print(f"Respuesta InfluxDB ({response.status_code})") # Descomentar para debug

   except requests.exceptions.RequestException as e:
       print(f"Error al enviar a InfluxDB: {e}")
   except Exception as e:
        print(f"Error inesperado en envío a InfluxDB: {e}")


def main():
   """Función principal para leer serial y enviar a InfluxDB."""
   print(f"Intentando conectar al puerto serial: {SERIAL_PORT} a {BAUD_RATE} baud")
   ser = None
   try:
       ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=1)
       print("Conectado al puerto serial.")

       while True:
           if ser.in_waiting > 0:
               try:
                   # Leer línea, decodificar de bytes a string, quitar espacios/saltos
                   line_raw = ser.readline()
                   line_str = line_raw.decode('utf-8', errors='ignore').strip()

                   if line_str: # Asegurarse de que no esté vacía
                       # print(f"Recibido: {line_str}") # Descomentar para debug
                       influx_line = format_influx_line(line_str)
                       if influx_line:
                           send_to_influx(influx_line)

               except serial.SerialException as e:
                   print(f"Error de lectura serial: {e}")
                   # Intentar reconectar o salir
                   break
               except UnicodeDecodeError as e:
                   print(f"Error decodificando datos seriales (datos corruptos?): {e} - Datos: {line_raw}")
               except KeyboardInterrupt:
                   print("\nInterrupción por teclado. Saliendo.")
                   break
               except Exception as e:
                   print(f"Error inesperado en bucle principal: {e}")

           time.sleep(0.1) # Pequeña pausa para no consumir 100% CPU

   except serial.SerialException as e:
       print(f"Error al abrir puerto serial {SERIAL_PORT}: {e}")
       print("Verifica que el puerto sea correcto, que el micro:bit esté conectado")
       print("y que ningún otro programa (Mu, Thonny REPL) lo esté usando.")
   except Exception as e:
       print(f"Error inesperado al iniciar: {e}")
   finally:
       if ser and ser.is_open:
           ser.close()
           print("Puerto serial cerrado.")
       print("Script terminado.")

if __name__ == "__main__":
   main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment