Skip to content

Instantly share code, notes, and snippets.

@nv1t
Last active March 12, 2024 06:33
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nv1t/3856d1f23c7d7696bdaf6ba0d4d1c17b to your computer and use it in GitHub Desktop.
Save nv1t/3856d1f23c7d7696bdaf6ba0d4d1c17b to your computer and use it in GitHub Desktop.
Walking Pad Logger
import csv
import sys
import os
from datetime import datetime
def convert_seconds_to_hms(seconds):
    """Render a duration in seconds as "<h> hours <m> minutes <s> seconds".

    All three components are always printed (no pluralisation, no omission of
    zero fields); each one is converted with ``int()`` before formatting.
    """
    whole_hours, leftover = divmod(seconds, 3600)
    whole_minutes, leftover = divmod(leftover, 60)
    return f"{int(whole_hours)} hours {int(whole_minutes)} minutes {int(leftover)} seconds"
def convert_unix_timestamp(timestamp):
    """Format a Unix timestamp as "YYYY-MM-DD HH:MM:SS" in local time."""
    moment = datetime.fromtimestamp(timestamp)
    return f"{moment:%Y-%m-%d %H:%M:%S}"
# --- Daily summary: aggregate today's log into per-workout and combined totals ---

# Today's log is expected in a "logs" directory next to this script.
current_dir = os.path.dirname(os.path.abspath(__file__))
current_date = datetime.now().strftime("%Y-%m-%d")
file_name = f"{current_dir}/logs/{current_date}.log.csv"

# No log for today: exit quietly, there is nothing to summarise.
if not os.path.exists(file_name):
    sys.exit()

starting_timestamp = None  # timestamp of the first data row
workout_data = {}  # workout id -> {"distance": max seen, "time": max seen}

# newline="" is what the csv module recommends for file objects it reads.
with open(file_name, newline="") as csvfile:
    reader = csv.reader(csvfile)
    # Skip the header; the default avoids StopIteration on an empty file.
    next(reader, None)

    for row in reader:
        try:
            workout, timestamp, _speed, distance, elapsed = row
            timestamp = int(timestamp)
            distance = int(distance)
            elapsed = int(elapsed)
        except ValueError:
            # Blank or partially written row (the log may still be appended
            # to while it is read): skip it instead of aborting the summary.
            continue

        if starting_timestamp is None:
            starting_timestamp = timestamp

        # Keep the largest distance and time seen for each workout.
        totals = workout_data.setdefault(workout, {"distance": 0, "time": 0})
        totals["distance"] = max(totals["distance"], distance)
        totals["time"] = max(totals["time"], elapsed)

# Header-only (or otherwise empty) log: nothing to report, and there is no
# starting timestamp that could be formatted.
if starting_timestamp is None:
    sys.exit()

print("Final Workout Data:")
for workout, data in workout_data.items():
    print(f"\tWorkout: {workout}")
    print(f"\t\tDistance: {data['distance']} meters")
    print(f"\t\tTime: {data['time']} seconds")

# Combine all workouts; distance is converted from meters to kilometers.
total_distance = sum(data["distance"] for data in workout_data.values()) / 1000
total_time = sum(data["time"] for data in workout_data.values())

starting_time = convert_unix_timestamp(starting_timestamp)
total_distance_km = f"{total_distance:.2f} km"  # Format to 2 decimal places
total_time_hms = convert_seconds_to_hms(total_time)

print("Combined Workout Data:")
print(f"\tStarting Time: {starting_time}")
print(f"\tTotal Distance: {total_distance_km}")
print(f"\tTotal Time: {total_time_hms}")
"""
This script connects to a specified Bluetooth Low Energy (BLE) device and logs data related
to speed, distance, and time intervals from notifications. It uses the bleak library for
BLE communication, handles SIGINT signals for graceful shutdown, and logs data to a CSV file
with a daily log rotation. It is loosely based on FTMS (the Bluetooth Fitness Machine Service) and may
work with other walking pads as well, but this has not been tested.
## Prerequisites
- Python 3.6+
- `bleak` library (install using `pip install bleak`)
## Usage
1. Clone the repository or download the script.
2. Install the required dependencies using `pip install -r requirements.txt`.
3. Run the script by executing `python ble_device_data_logger.py`.
## Configuration
- You may need to modify the `addr` parameter in the `main` function to match the address of your BLE device.
"""
import argparse
import asyncio
import logging
import signal
import sys
import time
import os
import datetime
import struct
import uuid
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
async def main(addr):
    """
    Connect to the BLE device at ``addr`` and log its notifications to CSV.

    Scans for the device, logs a few of its information characteristics,
    subscribes to notifications on characteristic 0x2ACD and appends one CSV
    row per notification to ``logs/<YYYY-MM-DD>.log.csv`` until SIGINT
    (Ctrl+C) is received.

    Args:
        addr: Address of the BLE device to look for.
    """
    stopped = False  # Flag to control the main loop
    workout_id = uuid.uuid4()  # Identifies all rows written by this run

    def signal_handler(sig, frame):
        """
        Handle SIGINT (Ctrl+C) by asking the main loop to stop.

        The first signal only sets the flag, so the main loop can stop the
        notifications and the client can disconnect. A second signal exits
        immediately, in case the shutdown hangs.

        Args:
            sig: The signal number
            frame: The current stack frame
        """
        # Without nonlocal the assignment below would only create a local
        # variable and the main loop would never observe the stop request.
        nonlocal stopped
        if stopped:
            print('Forcing exit')
            sys.exit()
        print('You pressed Ctrl+C!')
        print('Stopping... press Ctrl+C again to force exit')
        stopped = True

    signal.signal(signal.SIGINT, signal_handler)

    def notification_handler(characteristic: BleakGATTCharacteristic, data: bytearray):
        """
        Handle notifications by logging the received data and appending it to a CSV file.

        Args:
            characteristic (BleakGATTCharacteristic): The characteristic the data is received from.
            data (bytearray): The data received.

        Returns:
            None
        """
        # Log the received data
        logger.info("%s: %r", characteristic.description, data.hex())

        # The fields below are read up to byte 14; ignore shorter payloads
        # instead of raising struct.error inside the callback.
        if len(data) < 15:
            logger.warning("ignoring short notification (%d bytes)", len(data))
            return

        # Extract speed, distance and elapsed time. Explicit little-endian
        # unsigned 16-bit reads do not depend on the host byte order and cannot
        # go negative.
        # NOTE(review): the byte offsets are taken as-is from the original
        # implementation - confirm them for other devices.
        speed, distance = struct.unpack_from("<HH", data, 2)
        elapsed = struct.unpack_from("<H", data, 13)[0]

        # One log file per day; make sure the target directory exists.
        current_date = datetime.datetime.now().strftime("%Y-%m-%d")
        os.makedirs("logs", exist_ok=True)
        file_name = f"logs/{current_date}.log.csv"
        write_header = not os.path.exists(file_name)

        # Append the data to the log file, preceded by the header if it is new.
        # The header names all five columns of a data row.
        with open(file_name, "a") as f:
            if write_header:
                f.write(
                    "Workout (UUID),Timestamp (unix),Speed (n/100 km/h),"
                    "Distance (meter),Time (seconds)\n"
                )
            f.write(
                "%s,%d,%d,%d,%d\n"
                % (str(workout_id), int(time.time()), speed, distance, elapsed)
            )

    logger.info("starting scan...")

    # Find the BLE device by address
    device = await BleakScanner.find_device_by_address(
        addr, cb=dict(use_bdaddr=False)
    )
    if device is None:
        # Report the address that was actually searched for.
        logger.error("could not find device with address '%s'", addr)
        return

    logger.info("connecting to device...")

    # Connect to the device
    async with BleakClient(device) as client:
        logger.info("Connected")

        # Read device characteristics
        manufacturer = await client.read_gatt_char("00002a29-0000-1000-8000-00805f9b34fb")
        model = await client.read_gatt_char("00002a24-0000-1000-8000-00805f9b34fb")
        hardware_revision = await client.read_gatt_char("00002a27-0000-1000-8000-00805f9b34fb")
        firmware_revision = await client.read_gatt_char("00002a26-0000-1000-8000-00805f9b34fb")
        fitness_machine_feature = await client.read_gatt_char("00002acc-0000-1000-8000-00805f9b34fb")
        supported_speed_range = await client.read_gatt_char("00002ad4-0000-1000-8000-00805f9b34fb")

        # Log the device characteristics
        logger.info("Manufacturer: %s", str(manufacturer, "utf-8"))
        logger.info("Model: %s", str(model, "utf-8"))
        logger.info("Hardware Revision: %s", str(hardware_revision, "utf-8"))
        logger.info("Firmware Revision: %s", str(firmware_revision, 'utf-8'))
        logger.info("Supported Speed Range: %s", supported_speed_range.hex())
        logger.info("Fitness Machine Feature: %s", fitness_machine_feature.hex())

        # Start notifications; every notification is handled by notification_handler
        await client.start_notify("00002acd-0000-1000-8000-00805f9b34fb", notification_handler)

        # Main loop to keep the program running until stopped
        while not stopped:
            await asyncio.sleep(1)

        # Stop notification for a specific characteristic
        await client.stop_notify("00002acd-0000-1000-8000-00805f9b34fb")
if __name__ == "__main__":
    # The device address may be given on the command line; without an
    # argument the previously hard-coded address is used.
    parser = argparse.ArgumentParser(
        description="Log walking pad data received over BLE to daily CSV files."
    )
    parser.add_argument(
        "addr",
        nargs="?",
        default="69:82:20:D9:27:90",
        help="address of the BLE device (default: %(default)s)",
    )
    parser.add_argument(
        "-d",
        "--debug",
        action="store_true",
        help="set the log level to DEBUG instead of INFO",
    )
    args = parser.parse_args()

    log_level = logging.DEBUG if args.debug else logging.INFO
    logging.basicConfig(
        level=log_level,
        format="%(asctime)-15s %(name)-8s %(levelname)s: %(message)s",
    )
    asyncio.run(main(args.addr))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment