Skip to content

Instantly share code, notes, and snippets.

@mliberty1
Created November 5, 2021 12:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mliberty1/60e9af101d424b0fb63e0b90fa1603fc to your computer and use it in GitHub Desktop.
Save mliberty1/60e9af101d424b0fb63e0b90fa1603fc to your computer and use it in GitHub Desktop.
Joulescope JS110: capture data at a configurable sampling_frequency using a stream-process callback
# Created for https://forum.joulescope.com/t/nans-during-10-minutes-recordings/408
# Parts provided by user Andersbg, copyright unknown
# Progress: Copyright 2016 Vladimir Ignatev, MIT license (see below)
# Remainder Copyright 2021 Jetperch LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import joulescope
import logging
import matplotlib.pyplot as plt
import numpy as np
import os
import signal
import sys
import time
class StreamProcess:
    """Collect one signal field from a streaming buffer into a preallocated array.

    An instance is handed to the device as a stream-process callback.  Each
    call to :meth:`stream_notify` copies any newly available whole chunks of
    the selected field into an internal ``(N, 1)`` float32 buffer until that
    buffer is full.

    Attributes:
        fs: Sampling frequency passed to the constructor.
        f: Name of the field that is captured (e.g. ``'power'``).
        chunk_size: Number of samples copied per ``samples_get`` call.
        idx: Number of samples stored so far (also the next sample id to read).
        time: Time axis of the captured data; set by :meth:`export_data`.
    """

    def __init__(self, field, duration, sampling_frequency):
        """Allocate the capture buffer.

        Args:
            field: Name of the signal field to capture.
            duration: Capture length; ``duration * sampling_frequency`` gives
                the requested number of samples.
            sampling_frequency: Number of samples per unit of ``duration``.
        """
        self.fs = sampling_frequency
        self.f = field
        samples = int(duration * sampling_frequency)
        # One chunk is 1/100 of the samples per unit time.  Clamp to at least
        # one sample: for rates below 100 the integer division yields 0, which
        # would divide by zero below and stall the copy loop in stream_notify.
        self.chunk_size = max(1, int(sampling_frequency // 100))
        # Round the buffer length up to a whole number of chunks.
        length = ((samples + self.chunk_size - 1) // self.chunk_size) * self.chunk_size
        self._buffer = np.zeros((length, 1), dtype=np.float32)
        self.idx = 0

    def __str__(self):
        return 'StreamProcess'

    def __len__(self):
        """Return the number of samples captured so far."""
        return min(self.idx, len(self._buffer))

    @property
    def buffer_len(self):
        """Total capacity of the capture buffer, in samples."""
        return len(self._buffer)

    @property
    def data(self):
        """View of the captured samples, shape ``(idx, 1)``."""
        return self._buffer[:self.idx, :]

    def _time_axis(self):
        """Return one timestamp per captured sample, starting at 0.

        Built from integer sample indices so the result always has exactly
        ``len(self.data)`` entries; a float-step ``np.arange`` can come out
        one element longer or shorter because of rounding.
        """
        return np.arange(len(self.data)) / self.fs

    def stream_notify(self, stream_buffer):
        """Copy newly available whole chunks out of ``stream_buffer``.

        Args:
            stream_buffer: Object providing ``sample_id_range`` and
                ``samples_get(start, stop, fields=...)``.

        Returns:
            True once the capture buffer is full, False otherwise.
            NOTE(review): presumably a True return tells the driver to stop
            streaming; confirm against the Joulescope driver documentation.
        """
        _start_id, end_id = stream_buffer.sample_id_range
        idx_next = self.idx + self.chunk_size
        # Only copy complete chunks that are both available in the stream
        # buffer and still fit in the capture buffer.
        while idx_next <= end_id and idx_next <= len(self._buffer):
            data = stream_buffer.samples_get(self.idx, idx_next, fields=[self.f])
            self._buffer[self.idx:idx_next, 0] = data['signals'][self.f]['value']
            self.idx = idx_next
            idx_next += self.chunk_size
        return self.idx >= len(self._buffer)

    def count_nan(self):
        """Return the number of NaN values among the captured samples."""
        return np.isnan(self.data).sum()

    def export_data(self, outfolder, filename):
        """Save the captured samples to ``<outfolder>/<filename>.npz``.

        The array is stored under the captured field's name.  The folder is
        created when it does not exist.  Also refreshes ``self.time``.

        Args:
            outfolder: Destination directory.
            filename: File name without the ``.npz`` extension.
        """
        os.makedirs(outfolder, exist_ok=True)
        fpath = os.path.join(outfolder, filename + '.npz')
        print("Saving capture to file")
        self.time = self._time_axis()
        np.savez(fpath, **{self.f: self.data})
        print("Capture saved")

    def plot_data(self):
        """Plot the captured samples against time and block until the window closes."""
        print("---- Plotting captured data ----")
        print("Close plot to continue")
        # Compute the axis here instead of relying on export_data() having
        # been called first.
        time_axis = self._time_axis()
        plt.plot(time_axis, self.data)
        plt.xlabel('Time (s)')
        plt.ylabel(self.f)
        plt.title("Captured raw data")
        plt.show()
def progress(count, total, status=''):
    """Draw a single-line text progress bar on stdout.

    The line ends with a carriage return so the next call overwrites it.

    Args:
        count: Amount of work completed.
        total: Amount of work overall.  A ``total`` of 0 is shown as
            complete instead of raising ``ZeroDivisionError``.
        status: Optional text appended after the percentage.
    """
    # The MIT License (MIT)
    # Copyright (c) 2016 Vladimir Ignatev
    #
    # Permission is hereby granted, free of charge, to any person obtaining
    # a copy of this software and associated documentation files (the "Software"),
    # to deal in the Software without restriction, including without limitation
    # the rights to use, copy, modify, merge, publish, distribute, sublicense,
    # and/or sell copies of the Software, and to permit persons to whom the Software
    # is furnished to do so, subject to the following conditions:
    #
    # The above copyright notice and this permission notice shall be included
    # in all copies or substantial portions of the Software.
    #
    # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
    # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
    # PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE
    # FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT
    # OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE
    # OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
    #
    # https://gist.github.com/vladignatyev/06860ec2040cb497f0f3
    bar_len = 60
    if total:
        total_f = float(total)
        filled_len = int(round(bar_len * count / total_f))
        percents = round(100.0 * count / total_f, 1)
    else:
        # Nothing to do counts as done.
        filled_len = bar_len
        percents = 100.0
    # Keep the bar exactly bar_len characters wide even when count is
    # negative or exceeds total.
    filled_len = max(0, min(bar_len, filled_len))
    bar = '=' * filled_len + '-' * (bar_len - filled_len)
    sys.stdout.write(f'[{bar}] {percents}% ...{status}\r')
    # Flush so the bar shows up immediately: the line has no trailing newline.
    sys.stdout.flush()
def capture(case, config):
    """Run one capture on the connected Joulescope, then save and plot it.

    Exits the process with status 1 when ``joulescope.scan_require_one``
    raises ``RuntimeError``.  The capture loop ends when the stop callback
    fires or the user presses CTRL-C.

    Args:
        case: Base name (without extension) of the exported ``.npz`` file.
        config: Dict with the keys ``'duration'``, ``'sampling_frequency'``,
            ``'measurement'``, ``'wait'`` (delay passed to ``time.sleep``
            before streaming starts) and ``'resultfolder'``.
    """
    _quit = False

    def on_stop(*args, **kwargs):
        # Shared by the device stop callback and the SIGINT handler.
        nonlocal _quit
        _quit = True

    duration = config['duration']
    f_s = config['sampling_frequency']
    field = config['measurement']
    try:
        device = joulescope.scan_require_one(config='auto')
    except RuntimeError:
        print("Connect a Joulescope and restart")
        sys.exit(1)
    print("JouleScope connected")
    device.parameter_set('buffer_duration', 2)
    device.parameter_set('sampling_frequency', f_s)
    stream_process = StreamProcess(field, duration, f_s)

    # Route CTRL-C to on_stop only while capturing.  The previous handler is
    # put back afterwards so CTRL-C works normally during export and while
    # the (blocking) plot window is open.
    previous_handler = signal.signal(signal.SIGINT, on_stop)
    try:
        print("Start wait of:", config['wait'])
        time.sleep(config['wait'])
        with device:
            device.stream_process_register(stream_process)
            device.start(stop_fn=on_stop)
            print("Capture started")
            while not _quit:
                time.sleep(0.1)
                progress(len(stream_process), stream_process.buffer_len)
            device.stop()  # for CTRL-C handling (safe for duplicate calls)
    finally:
        # signal.signal() returns None when the old handler was not installed
        # from Python; that value cannot be passed back in.
        if previous_handler is not None:
            signal.signal(signal.SIGINT, previous_handler)

    print("Capture finished")
    print(f'Found {stream_process.count_nan()} NaNs in {len(stream_process)} samples')
    stream_process.export_data(config['resultfolder'], case)
    stream_process.plot_data()
def run():
    """Start the example capture: 'power' for 600 s at 1000 samples/s.

    The result is written to the current working directory under the case
    name ``hello``.
    """
    settings = dict(
        duration=10 * 60,
        sampling_frequency=1000,
        measurement='power',
        wait=0,
        resultfolder=os.getcwd(),
    )
    capture('hello', settings)
if __name__ == "__main__":
    # Enable the next line to get timestamped log output at INFO level.
    # logging.basicConfig(format='%(asctime)s %(levelname)-8s %(name)s %(message)s', level=logging.INFO)
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment