Last active
September 29, 2022 16:45
-
-
Save techieforgood/421844ccbb949402a078c708bde96a09 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from ctypes import * | |
from time import sleep | |
from mcculw import ul | |
from mcculw.enums import ScanOptions, FunctionType, Status, InterfaceType, ULRange, ErrorCode | |
from mcculw.device_info import DaqDeviceInfo | |
def config_first_detected_device(board_num, dev_id_list=None): | |
"""Adds the first available device to the UL. If a types_list is specified, | |
the first available device in the types list will be add to the UL. | |
Parameters | |
---------- | |
board_num : int | |
The board number to assign to the board when configuring the device. | |
dev_id_list : list[int], optional | |
A list of product IDs used to filter the results. Default is None. | |
See UL documentation for device IDs. | |
""" | |
ul.ignore_instacal() | |
devices = ul.get_daq_device_inventory(InterfaceType.ANY) | |
if not devices: | |
raise Exception('Error: No DAQ devices found') | |
print('Found', len(devices), 'DAQ device(s):') | |
for device in devices: | |
print(' ', device.product_name, ' (', device.unique_id, ') - ', | |
'Device ID = ', device.product_id, sep='') | |
device = devices[0] | |
if dev_id_list: | |
device = next((device for device in devices | |
if device.product_id in dev_id_list), None) | |
if not device: | |
err_str = 'Error: No DAQ device found in device ID list: ' | |
err_str += ','.join(str(dev_id) for dev_id in dev_id_list) | |
raise Exception(err_str) | |
# Add the first DAQ device to the UL with the specified board number | |
ul.create_daq_device(board_num, device) | |
def send_data_to_daq(): | |
# By default, the example detects and displays all available devices and | |
# selects the first device listed. Use the dev_id_list variable to filter | |
# detected devices by device ID (see UL documentation for device IDs). | |
# If use_device_detection is set to False, the board_num variable needs to | |
# match the desired board number configured with Instacal. | |
use_device_detection = True | |
dev_id_list = [] | |
board_num = 0 | |
memhandle = None | |
try: | |
if use_device_detection: | |
config_first_detected_device(board_num, dev_id_list) | |
daq_dev_info = DaqDeviceInfo(board_num) | |
if not daq_dev_info.supports_analog_output: | |
raise Exception('Error: The DAQ device does not support ' | |
'analog output') | |
print('\nActive DAQ device: ', daq_dev_info.product_name, ' (', | |
daq_dev_info.unique_id, ')\n', sep='') | |
ao_info = daq_dev_info.get_ao_info() | |
low_chan = 0 | |
high_chan = min(3, ao_info.num_chans - 1) | |
num_chans = high_chan - low_chan + 1 | |
rate = 400 | |
points_per_channel = 136*200 | |
total_count = points_per_channel * num_chans | |
print("Total count = ", total_count) | |
ao_range = ULRange.BIP10VOLTS | |
# ao_range = ULRange.BIP10VOLTS | |
# Allocate a buffer for the scan | |
# | |
memhandle = ul.win_buf_alloc(total_count) | |
# Convert the memhandle to a ctypes array | |
# Note: the ctypes array will no longer be valid after win_buf_free | |
# is called. | |
# A copy of the buffer can be created using win_buf_to_array | |
# before the memory is freed. The copy can be used at any time. | |
ctypes_array = cast(memhandle, POINTER(c_ushort)) | |
# Check if the buffer was successfully allocated | |
if not memhandle: | |
raise Exception('Error: Failed to allocate memory') | |
frequencies = add_example_data(board_num, ctypes_array, ao_range, | |
num_chans, rate, points_per_channel) | |
# Start the scan | |
s = ul.a_out_scan(board_num, low_chan, high_chan, total_count, rate, | |
ao_range, memhandle, ScanOptions.BACKGROUND) | |
print("rate = ", s) | |
ul.a | |
# Wait for the scan to complete | |
print('Waiting for output scan to complete...', end='') | |
status = Status.RUNNING | |
while status != Status.IDLE: | |
print('.', end='') | |
# Slow down the status check so as not to flood the CPU | |
sleep(0.5) | |
# Update status | |
status, _, _ = ul.get_status(board_num, FunctionType.AOFUNCTION) | |
print('') | |
print('Scan completed successfully') | |
except Exception as e: | |
print('\n', e) | |
finally: | |
if memhandle: | |
# Free the buffer in a finally block to prevent a memory leak. | |
ul.win_buf_free(memhandle) | |
if use_device_detection: | |
ul.release_daq_device(board_num) | |
def add_example_data(board_num, data_array, ao_range, num_chans, rate, | |
points_per_channel): | |
# Note that since we are using the SCALEDATA option, the values | |
# added to data_array are the actual voltage values that the device1000 | |
# will output | |
array1 = [1,1.1,1.2,1.3,1.4,1.5,1.6,1.7,1.8,1.9,1.10,1.11,1.12,1.13,1.14,1.15,1.16,1.17,1.18,1.19,1.20,1.21,1.22,1.23,1.24,1.25,1.26,1.27,1.28,1.29,1.30,1.31,1.32,1.33,1.34,1.35,1.36,1.37,1.38,1.39,1.40,1.41,1.42,1.43,1.44,1.45,1.46,1.47,1.48,1.49,1.50,1.51,1.52,1.53,1.54,1.55,1.56,1.57,1.58,1.59,1.60,1.61,1.62,1.63,1.64,1.65,1.66,1.67,1.68,1.69,1.70,1.71,1.72,1.73,1.74,1.75,1.76,1.77,1.78,1.79,1.80,1.81,1.82,1.83,1.84,1.85,1.86,1.87,1.88,1.89,1.90,1.91,1.92,1.93,1.94,1.95,1.96,1.97,1.98,1.99,2.00,2.01,2.02,2.03,2.04,2.05,2.06,2.07,2.08,2.09,2.10,2.11,2.12,2.13,2.14,2.15,2.16,2.17,2.18,2.19,2.20,2.21,2.22,2.23,2.24,2.25,2.26,2.27,2.28,2.29,2.30,2.31,2.32,2.33,2.34,2.35,2.36] | |
array2 = [3.1,3.2,3.3,3.4,3.5,3.6,3.7,3.8,3.9,3.10,3.11,3.12,3.13,3.14,3.15,3.16,3.17,3.18,3.19,3.20,3.21,3.22,3.23,3.24,3.25,3.26,3.27,3.28,3.29,3.30,3.31,3.32,3.33,3.34,3.35,3.36,3.37,3.38,3.39,3.40,3.41,3.42,3.43,3.44,3.45,3.46,3.47,3.48,3.49,3.50,3.51,3.52,3.53,3.54,3.55,3.56,3.57,3.58,3.59,3.60,3.61,3.62,3.63,3.64,3.65,3.66,3.67,3.68,3.69,3.70,3.71,3.72,3.73,3.74,3.75,3.76,3.77,3.78,3.79,3.80,3.81,3.82,3.83,3.84,3.85,3.86,3.87,3.88,3.89,3.90,3.91,3.92,3.93,3.94,3.95,3.96,3.97,3.98,3.99,4.00,4.01,4.02,4.03,4.04,4.05,4.06,4.07,4.08,4.09,4.10,4.11,4.12,4.13,4.14,4.15,4.16,4.17,4.18,4.19,4.20,4.21,4.22,4.23,4.24,4.25,4.26,4.27,4.28,4.29,4.30,4.31,4.32,4.33,4.34,4.35,4.36] | |
# The daq accepts only one array | |
# The single array contains info for both channels | |
# The 1st value is for 1st channel, the 2nd value is for 2nd channel (interleaving) | |
pre_data_array = [] | |
joined_array=list(zip(array1,array2)) | |
frequency=200 | |
for val in joined_array: | |
for freq in range(frequency): | |
if freq%2==0: | |
pre_data_array.extend([val[0],val[1]]) | |
else: | |
pre_data_array.extend([-val[0],-val[1]]) | |
data_index = 0 | |
for channel_num in pre_data_array: | |
value = channel_num | |
raw_value = ul.from_eng_units(board_num, ao_range, value) | |
data_array[data_index] = raw_value | |
data_index += 1 | |
frequencies = [] | |
return pre_data_array | |
if __name__ == '__main__': | |
send_data_to_daq() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment