Skip to content

Instantly share code, notes, and snippets.

@hbldh
Last active April 20, 2018 09:56
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save hbldh/a16017394dba559ae0b094514465ef79 to your computer and use it in GitHub Desktop.
Logging in PyMetaWear
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
:mod:`accelerometer_logging`
==================
Created by hbldh <henrik.blidh@nedomkull.com>
Created on 2018-04-20
"""
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import
import time
from pymetawear.discover import select_device
from pymetawear.client import MetaWearClient
address = select_device()
c = MetaWearClient(str(address), debug=True)
print("New client created: {0}".format(c))
print("Get possible accelerometer settings...")
settings = c.accelerometer.get_possible_settings()
print(settings)
time.sleep(1.0)
print("Write accelerometer settings...")
c.accelerometer.set_settings(data_rate=3.125, data_range=8.0)
time.sleep(1.0)
print("Check accelerometer settings...")
settings = c.accelerometer.get_current_settings()
print(settings)
# Here be dragons... (new logging code starts)
from ctypes import byref
from threading import Event
from pymetawear.client import libmetawear
from pymetawear.modules.base import data_handler
from mbientlab.metawear.cbindings import FnVoid_VoidP, FnVoid_DataP, LogDownloadHandler, FnVoid_UInt_UInt, FnVoid_UByte_Long_UByteP_UByte
logger_ready_event = Event()
download_done = Event()
logger_address = None
def _logger_ready(address):
if address:
global logger_address
logger_address = address
print("Logger address: {0}".format(logger_address))
else:
print('failed to start logging accelerometer')
logger_ready_event.set()
signal = c.accelerometer.data_signal
logger_ready = FnVoid_VoidP(_logger_ready)
libmetawear.mbl_mw_datasignal_log(signal, logger_ready)
libmetawear.mbl_mw_logging_start(c.board, 0)
libmetawear.mbl_mw_acc_enable_acceleration_sampling(c.board)
libmetawear.mbl_mw_acc_start(c.board)
logger_ready_event.wait()
if logger_address is None:
libmetawear.mbl_mw_debug_reset(c.board)
time.sleep(2.0)
c.disconnect()
raise Exception("Aborting due failed initiation of logger.")
print("Logging...")
time.sleep(5.0)
print("Stop logging.")
# Shutdown accelerometer
libmetawear.mbl_mw_acc_stop(c.board)
libmetawear.mbl_mw_acc_disable_acceleration_sampling(c.board)
# Shutdown log
libmetawear.mbl_mw_logging_stop(c.board)
def _data_point_handler(*args):
print(args)
data_point_handler = FnVoid_DataP(data_handler(_data_point_handler))
print("Subscribe to data.")
libmetawear.mbl_mw_logger_subscribe(logger_address, data_point_handler)
def _progress_update(entriesLeft, totalEntries):
print('received_progress_update entriesLeft:' + str(entriesLeft) + ' totalEntries:' + str(totalEntries))
if entriesLeft == 0:
download_done.set()
progress_update = FnVoid_UInt_UInt(_progress_update)
def _unknown_entry(id, epoch, data, length):
# I have no idea what this data is. Needs further investigation.
# Might work to parse data with this:
#
# from pymetawear.modules.base import DATA_HANDLERS, _error_handler
# DATA_HANDLERS.get(data.contents.type_id, _error_handler)(data)
print('received_unknown_entry: id: {0}, epoch: {1}, '
'data: {2}, length: {3}'.format(id, epoch, data, length))
unknown_entry = FnVoid_UByte_Long_UByteP_UByte(_unknown_entry)
def _unhandled_entry(data):
print('received_unhandled_entry: ' + str(data))
unhandled_entry = FnVoid_DataP(data_handler(_unhandled_entry))
log_download_handler = LogDownloadHandler(
received_progress_update = progress_update,
received_unknown_entry = unknown_entry,
received_unhandled_entry = unhandled_entry
)
# Actually start the log download, this will cause all the handlers we setup to be invoked
libmetawear.mbl_mw_logging_download(c.board, 100, byref(log_download_handler))
print("Waiting for completed download...")
download_done.wait()
time.sleep(0.1)
print("Download done")
print("Remove logger")
libmetawear.mbl_mw_logger_remove(logger_address)
time.sleep(0.2)
print("Tear down")
libmetawear.mbl_mw_metawearboard_tear_down(c.board)
time.sleep(0.2)
print("Disconnecting")
c.disconnect()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment