Skip to content

Instantly share code, notes, and snippets.

@techieforgood
Last active September 29, 2022 16:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save techieforgood/421844ccbb949402a078c708bde96a09 to your computer and use it in GitHub Desktop.
Save techieforgood/421844ccbb949402a078c708bde96a09 to your computer and use it in GitHub Desktop.
from ctypes import *
from time import sleep
from mcculw import ul
from mcculw.enums import ScanOptions, FunctionType, Status, InterfaceType, ULRange, ErrorCode
from mcculw.device_info import DaqDeviceInfo
def config_first_detected_device(board_num, dev_id_list=None):
    """Register one detected DAQ device with the UL under ``board_num``.

    The device inventory is queried for every interface type and each
    detected device is printed. When ``dev_id_list`` is empty or None the
    first detected device is used; otherwise the first detected device whose
    product ID appears in ``dev_id_list`` is used.

    Parameters
    ----------
    board_num : int
        The board number to assign to the board when configuring the device.
    dev_id_list : list[int], optional
        A list of product IDs used to filter the results. Default is None.
        See UL documentation for device IDs.

    Raises
    ------
    Exception
        If no device is detected, or if ``dev_id_list`` is given and none of
        the detected devices has a product ID in it.
    """
    ul.ignore_instacal()
    inventory = ul.get_daq_device_inventory(InterfaceType.ANY)
    if not inventory:
        raise Exception('Error: No DAQ devices found')

    # Report everything that was detected before choosing one.
    print('Found', len(inventory), 'DAQ device(s):')
    for entry in inventory:
        print(' ', entry.product_name, ' (', entry.unique_id, ') - ',
              'Device ID = ', entry.product_id, sep='')

    if dev_id_list:
        # Pick the first detected device whose product ID is in the filter.
        selected = None
        for entry in inventory:
            if entry.product_id in dev_id_list:
                selected = entry
                break
        if not selected:
            wanted_ids = ','.join(str(dev_id) for dev_id in dev_id_list)
            raise Exception(
                'Error: No DAQ device found in device ID list: ' + wanted_ids)
    else:
        selected = inventory[0]

    # Add the chosen DAQ device to the UL with the specified board number
    ul.create_daq_device(board_num, selected)
def send_data_to_daq():
    """Run one background analog-output scan of the example data.

    The function configures the board (optionally via device detection),
    allocates a UL buffer, fills it through ``add_example_data``, starts a
    background ``a_out_scan`` and polls ``ul.get_status`` every 0.5 s until
    the scan reports ``Status.IDLE``.

    Any ``Exception`` raised while setting up or running the scan is printed
    instead of propagated. On every exit path the background scan (if one
    was started) is stopped before the buffer is freed, and the device is
    released when device detection was used.
    """
    # By default, the example detects and displays all available devices and
    # selects the first device listed. Use the dev_id_list variable to filter
    # detected devices by device ID (see UL documentation for device IDs).
    # If use_device_detection is set to False, the board_num variable needs to
    # match the desired board number configured with Instacal.
    use_device_detection = True
    dev_id_list = []
    board_num = 0
    memhandle = None
    # Tracks whether a background scan may still be using the buffer.
    scan_started = False

    try:
        if use_device_detection:
            config_first_detected_device(board_num, dev_id_list)

        daq_dev_info = DaqDeviceInfo(board_num)
        if not daq_dev_info.supports_analog_output:
            raise Exception('Error: The DAQ device does not support '
                            'analog output')

        print('\nActive DAQ device: ', daq_dev_info.product_name, ' (',
              daq_dev_info.unique_id, ')\n', sep='')

        ao_info = daq_dev_info.get_ao_info()

        # NOTE(review): add_example_data always produces two interleaved
        # channels of 136*200 points each. If the device reports a channel
        # count other than 2, the buffer size and the data layout will not
        # match -- confirm the intended channel range.
        low_chan = 0
        high_chan = min(3, ao_info.num_chans - 1)
        num_chans = high_chan - low_chan + 1

        rate = 400
        points_per_channel = 136*200
        total_count = points_per_channel * num_chans
        print("Total count = ", total_count)

        ao_range = ULRange.BIP10VOLTS

        # Allocate a buffer for the scan
        memhandle = ul.win_buf_alloc(total_count)
        # Check if the buffer was successfully allocated before using it
        if not memhandle:
            raise Exception('Error: Failed to allocate memory')

        # Convert the memhandle to a ctypes array
        # Note: the ctypes array will no longer be valid after win_buf_free
        # is called.
        # A copy of the buffer can be created using win_buf_to_array
        # before the memory is freed. The copy can be used at any time.
        ctypes_array = cast(memhandle, POINTER(c_ushort))

        add_example_data(board_num, ctypes_array, ao_range,
                         num_chans, rate, points_per_channel)

        # Start the scan
        actual_rate = ul.a_out_scan(board_num, low_chan, high_chan,
                                    total_count, rate, ao_range, memhandle,
                                    ScanOptions.BACKGROUND)
        scan_started = True
        # The value returned by a_out_scan is reported as the rate
        # (presumably the actual rate chosen by the driver -- confirm).
        print("rate = ", actual_rate)

        # Wait for the scan to complete
        print('Waiting for output scan to complete...', end='')
        status = Status.RUNNING
        while status != Status.IDLE:
            print('.', end='')

            # Slow down the status check so as not to flood the CPU
            sleep(0.5)

            # Update status
            status, _, _ = ul.get_status(board_num, FunctionType.AOFUNCTION)
        print('')

        print('Scan completed successfully')
    except Exception as e:
        print('\n', e)
    finally:
        try:
            if scan_started:
                # Stop the background operation before the buffer goes away;
                # otherwise an interrupted scan could still be reading from
                # memory that is about to be freed.
                ul.stop_background(board_num, FunctionType.AOFUNCTION)
        finally:
            if memhandle:
                # Free the buffer in a finally block to prevent a memory leak.
                ul.win_buf_free(memhandle)
            if use_device_detection:
                ul.release_daq_device(board_num)
def add_example_data(board_num, data_array, ao_range, num_chans, rate,
                     points_per_channel):
    """Fill ``data_array`` with an interleaved two-channel example waveform.

    Each pair of voltages taken from the two built-in tables is emitted 200
    times with alternating sign (``+v, -v, +v, ...``). The samples are
    interleaved as ``[chan0, chan1, chan0, chan1, ...]``. Every voltage is
    converted with ``ul.from_eng_units`` before being stored in
    ``data_array``.

    Parameters
    ----------
    board_num : int
        Board number passed to ``ul.from_eng_units``.
    data_array : indexable, writable buffer
        Destination for the converted samples. It is assumed to hold
        ``num_chans * points_per_channel`` elements (this is how the caller
        in this file allocates it).
    ao_range : ULRange
        Output range passed to ``ul.from_eng_units``.
    num_chans : int
        Number of channels the buffer was sized for.
    rate : int
        Unused; kept for interface compatibility.
    points_per_channel : int
        Number of points per channel the buffer was sized for.

    Returns
    -------
    list
        The unconverted voltage values, in the interleaved order written.

    Raises
    ------
    ValueError
        If the generated data would not fit in
        ``num_chans * points_per_channel`` elements. This is checked before
        anything is written so that a raw buffer is never overrun.
    """
    array1 = [1,1.1,1.2,1.3,1.4,1.5,1.6,1.7,1.8,1.9,1.10,1.11,1.12,1.13,1.14,1.15,1.16,1.17,1.18,1.19,1.20,1.21,1.22,1.23,1.24,1.25,1.26,1.27,1.28,1.29,1.30,1.31,1.32,1.33,1.34,1.35,1.36,1.37,1.38,1.39,1.40,1.41,1.42,1.43,1.44,1.45,1.46,1.47,1.48,1.49,1.50,1.51,1.52,1.53,1.54,1.55,1.56,1.57,1.58,1.59,1.60,1.61,1.62,1.63,1.64,1.65,1.66,1.67,1.68,1.69,1.70,1.71,1.72,1.73,1.74,1.75,1.76,1.77,1.78,1.79,1.80,1.81,1.82,1.83,1.84,1.85,1.86,1.87,1.88,1.89,1.90,1.91,1.92,1.93,1.94,1.95,1.96,1.97,1.98,1.99,2.00,2.01,2.02,2.03,2.04,2.05,2.06,2.07,2.08,2.09,2.10,2.11,2.12,2.13,2.14,2.15,2.16,2.17,2.18,2.19,2.20,2.21,2.22,2.23,2.24,2.25,2.26,2.27,2.28,2.29,2.30,2.31,2.32,2.33,2.34,2.35,2.36]
    array2 = [3.1,3.2,3.3,3.4,3.5,3.6,3.7,3.8,3.9,3.10,3.11,3.12,3.13,3.14,3.15,3.16,3.17,3.18,3.19,3.20,3.21,3.22,3.23,3.24,3.25,3.26,3.27,3.28,3.29,3.30,3.31,3.32,3.33,3.34,3.35,3.36,3.37,3.38,3.39,3.40,3.41,3.42,3.43,3.44,3.45,3.46,3.47,3.48,3.49,3.50,3.51,3.52,3.53,3.54,3.55,3.56,3.57,3.58,3.59,3.60,3.61,3.62,3.63,3.64,3.65,3.66,3.67,3.68,3.69,3.70,3.71,3.72,3.73,3.74,3.75,3.76,3.77,3.78,3.79,3.80,3.81,3.82,3.83,3.84,3.85,3.86,3.87,3.88,3.89,3.90,3.91,3.92,3.93,3.94,3.95,3.96,3.97,3.98,3.99,4.00,4.01,4.02,4.03,4.04,4.05,4.06,4.07,4.08,4.09,4.10,4.11,4.12,4.13,4.14,4.15,4.16,4.17,4.18,4.19,4.20,4.21,4.22,4.23,4.24,4.25,4.26,4.27,4.28,4.29,4.30,4.31,4.32,4.33,4.34,4.35,4.36]

    # The daq accepts only one array
    # The single array contains info for both channels
    # The 1st value is for 1st channel, the 2nd value is for 2nd channel
    # (interleaving)
    frequency = 200
    pre_data_array = []
    # zip stops at the shorter table, so any surplus entry is ignored.
    for first_volts, second_volts in zip(array1, array2):
        for step in range(frequency):
            # Alternate the sign on every sample.
            if step % 2 == 0:
                pre_data_array.extend([first_volts, second_volts])
            else:
                pre_data_array.extend([-first_volts, -second_volts])

    # data_array may be a raw ctypes pointer with no bounds checking, so
    # refuse to write more samples than the buffer was sized for.
    capacity = num_chans * points_per_channel
    if len(pre_data_array) > capacity:
        raise ValueError(
            'Error: example data has {} samples but the buffer only holds '
            '{} ({} channel(s) x {} points)'.format(
                len(pre_data_array), capacity, num_chans,
                points_per_channel))

    for data_index, volts in enumerate(pre_data_array):
        data_array[data_index] = ul.from_eng_units(board_num, ao_range, volts)

    return pre_data_array
# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    send_data_to_daq()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment