Skip to content

Instantly share code, notes, and snippets.

@aleksas
Last active November 24, 2023 12:59
Show Gist options
  • Save aleksas/e50d0567b03a762e2e46bf141be10868 to your computer and use it in GitHub Desktop.
Save aleksas/e50d0567b03a762e2e46bf141be10868 to your computer and use it in GitHub Desktop.
Analog-to-digital conversion and acquisition (Arduino DUE + Python)

Analog-to-digital conversion and acquisition (Arduino DUE + Python)

Copied and translated from original published by Frédéric Legrand (archived copy).

1. Introduction

This page shows how to operate the analog-to-digital converter of the Arduino Due, in order to perform sampled acquisitions. The sampling will be done by interrupts triggered by a stopwatch. The samples will be transmitted in a continuous stream to the computer, in order to be plotted in a graphics window or to undergo various treatments.

The arduino code presented can be used as a basis for an on-board application dealing with audio signals.

2. Analog-to-digital converter

2.a. Characteristics of the converter

The SAM3X8E microcontroller features a 12-bit analog-to-digital converter (ADC). The sampling frequency can in principle reach 1 MHz  . A multiplexer allows the use of up to 12 analog inputs (terminals A0 to A11). The voltages applied to these inputs must be between 0 and 3.3 V  . When the gain of the input amplifier is equal to 1, this voltage range gives numbers between 0 and 4095. More precisely, the converter operates with a reference voltage VREF (very close to 3.3 V ) and a voltage equal to VREF gives the number 4096. In practice, we often have to digitize alternating signals (for example an audio signal). Here is an analog circuit allowing to make the conversion, which we will call shifter circuit :

interfaceArduinoDue-ADC svg

Inputs IN0 and IN1 are connected to a unity gain inverting amplifier. The potentiometer is used to adjust the offset so that a zero voltage becomes 1.65 V  on inputs A0 and A1 of the arduino (which will give the median value 2048 after conversion). The amplifier is supplied directly from the 3.3 V  terminal of the arduino. It must be able to operate in rail to rail (full scale output); we have chosen the TL2372 dual amplifier for this. The voltages applied to the inputs must be between -1.65 V  and 1.65 V , which is roughly the maximum amplitude of a signal from the audio output of a computer. If exceeded, the amplifier saturates at 0 or 3.3 V  , which provides protection for inputs A0 and A1 (which are directly connected to the microcontroller).

2.b. ADC programming

To program the ADC, three programming levels can be used:

  • The analogRead function of the Arduino API.
  • The functions of the API Atmel, whose source code is in hardware / Arduino / Sat / system / libsam / source / adc.c .
  • Direct access to the microcontroller programming registers.

The analogRead function is much too slow to perform digitizations sampled at several kHz. We will therefore use the Atmel API, and direct access to the registers for certain operations.

To obtain the maximum sample rate (1 MHz), the ADC must be operated with direct memory access (DMA). We will use another method, consisting in triggering the conversions with a stopwatch (Timer), and in calling an interrupt function which will be responsible for storing the result of the conversion in a buffer. This method will allow you to perform operations on the samples (such as filtering). On the other hand, the maximum sampling frequency will be lower.

To configure the ADC, we start by activating the clock for this device, by calling the following function, defined in hardware / arduino / sam / system / libsam / source / pmc.c (Power management controler):

pmc_enable_periph_clk (ID_ADC);

The ADC is initialized by the following function (defined in adc.c ):

adc_init (ADC, VARIANT_MCK, ADC_FREQ_MAX, 0);

The second argument defines the frequency of the clock, here equal to the frequency of the main clock (84 MHz). The third argument defines the analog-to-digital conversion frequency, which is taken at its maximum value. The last argument is the start time of the ADC, here equal to 0.

The following function configures the ADC timing:

adc_configure_timing (ADC, 0, ADC_SETTLING_TIME_3, 1);

The second argument sets the tracking time , here equal to (0 + 1) clock ticks. The third argument is the settling time , equal here to 3 clock ticks (the other possible values ​​are 5.9 and 17). The last argument fixes the transfer time , here equal to (0 * 2 + 3) clock ticks.

The following function sets the resolution (10 or 12 bits). We choose 12 bits:

adc_set_resolution (ADC, ADC_12_BITS);

An interrupt should be generated at the end of each analog-to-digital conversion. It is therefore necessary to activate the interrupts for the ADC device:

NVIC_EnableIRQ (ADC_IRQn);

The NVIC_EnableIRQ function is part of the Cortex Microcontrolers Software Interface Standard (CMSIS) library, used by microcontrollers that include a Cortex microprocessor. The source code for SAM3 microcontrollers can be found in hardware / arduino / sam / system / CMSIS / CMSIS / Include / core_cm3.h .

Several types of interrupts can be associated with the ADC. The one we are interested in here is EOC (end of conversion), which is triggered when the conversion is complete. First, we deactivate all the interrupts by setting all the bits of the ADC_IDR (Interrupt Disable Register) register to 1:

ADC-> ADC_IDR = 0xFFFFFFFF;

All the channels are deactivated by setting all the bits of the ADC_CHDR (Channel Disable Register) register to 1:

DC-> ADC_CHDR = 0xFFFF;

then we set to 0 all the bits of the ADC_CHSR register (Channel Status Register):

ADC-> ADC_CHSR = 0x0000;

All the bits of the ADC_CHER (Channel Enable Register) register are set to 0:

ADC-> ADC_CHER = 0x0000;

To program the conversion on several channels, the most efficient method is to define a sequence of channels to be acquired successively. To do this, we write in the ADC_SEQR1 register (Sequence 1 register). The first 4 bits contain the number of the first channel to be digitized, the next 4 bits the number of the second channel to be digitized, etc. This 32-bit register therefore makes it possible to define a sequence of at most 8 channels. For each programmed channel, it must also be activated in the ADC_CHER register (Channel Enable Register). Suppose that the numbers of the channels to be acquired are in a channels array . Here is the loop that defines the sequence:

ADC-> ADC_SEQR1 = 0x00000000;
for (k = 0; k <nchannels; k ++) {
      ADC-> ADC_SEQR1 | = ((channels [k] & 0xF) << (4 * k));
      ADC-> ADC_CHER | = (1 << k);
}

The gain can be configured on each channel. The gains are defined in the ADC_CGR register (Channel Gain Register), at the rate of 2 bits for each channel (00: gain = 1, 01: gain = 1, 10: gain = 2, 11: gain = 4). In order to apply a gain (2 or 4) to an AC signal that has been shifted, the A / D converter can subtract the voltage VREF / 2 (in principle 1.65 V  ) before applying the gain. To do this, the bit corresponding to the channel must be activated in the ADC_COR register (Channel Offset Register). Suppose the gains are in a gain array. The following loop configures the gains then activates the offet if necessary:

ADC-> ADC_CGR = 0x00000000;
ADC-> ADC_COR = 0x00000000;
for (k = 0; k <nchannels; k ++) {
    if (gain [k] == 2) ADC-> ADC_CGR | = 2 << (2 * k);
    if (gain [k] == 4) ADC-> ADC_CGR | = 3 << (2 * k);
    if (offset [k]) ADC-> ADC_COR | = 1 << k;
}

The ADC operating mode is configured in the ADC_MR register:

#define ADC_MR_TRIG1 (1 << 1)
ADC-> ADC_MR = (ADC-> ADC_MR & 0xFFFFFFF0) | ADC_MR_TRIG1 | ADC_MR_TRGEN | ADC_MR_USEQ;

The ADC_MR_USEQ bit selects the use of the sequence defined in ADC_SEQR1 and ADC_SEQR2. The ADC_MR_TRGEN bit selects the hardware trigger of the conversion. The 3 bits ADC_MR_TRIG1 configure the triggering by the TIOA output of the Timer TC0 (oddly, the masks for triggering are not defined in the sources).

The last step is to activate triggering of an interrupt when the analog-to-digital conversion is complete. The ADC_IDR (Interrupt Disable Register) and ADC_IER (Interrupt Enable Register) registers are used for this. The first 16 bits of these two registers deactivate and activate the EOC (End Of Conversion) interrupt for each of the channels.

ADC-> ADC_IDR = ~ (1 << channels [0]);
ADC-> ADC_IER = 1 << channels [0]; 

The function called by the interrupt must be defined by:

void ADC_Handler () {
    // operation to do after a conversion
}

The result of the conversion must be read in this function, by reading the ADC_CDRn (Channel Data Register) registers. Here is how we access the result of the conversion for the channel defined in the element k of the channels array :

uint16_t x = * (ADC-> ADC_CDR + channels [k]);

At this stage, the ADC is not yet working because it is also necessary to configure and start the timer (Timer) which will trigger the ADC periodically.

3. Programming the stopwatch

The analog-to-digital converter (ADC) is programmed to be periodically triggered by a stopwatch. It is therefore necessary to configure a chronometer (or Timer) so that it generates a square wave whose period is equal to the sampling period.

The SAM3X8E microcontroller has 9 32-bit timer-counters (Timer-Counter or TC). Each has three lanes, each lane having its own counter. We are going to use channel 0 of the TC0 stopwatch. We start by activating the clock for this stopwatch:

uint32_t channel = 0;
pmc_enable_periph_clk (TC_INTERFACE_ID + 0 * 3 + channel);

To configure the timer, you can use the Atmel API functions defined in hardware / arduino / sam / system / libsam / source / tc.c , or you can directly access the registers. For our application, the functions of the Atmel API are sufficient, but it is still necessary to consult the documentation of the SAM3X microcontroller to know the configurations of the registers.

We start by choosing the clock frequency of the stopwatch:

uint8_t clock = TC_CMR_TCCLKS_TIMER_CLOCK1; // clock 84MHz / 2 = 42 MHz

For this choice, the frequency is half that of the main clock, ie 84/2 = 42 MHz. The other possible choices are TC_CMR_TCCLKS_TIMER_CLOCK2 (division by 8), TC_CMR_TCCLKS_TIMER_CLOCK3 (division by 32) and TC_CMR_TCCLKS_TIMER_CLOCK4 (division by 128). Knowing that the number of clock ticks for a sampling period is coded on 32 bits, a clock frequency of 42 MHz will make it possible to go down to one hundredth of a Hertz.

The stopwatch can run in catpure mode or in waveform mode . This last mode is used to generate a signal to trigger the ADC ( capture mode is used to make time or frequency measurements).

The actual counter of a channel consists of a 32-bit register called CV (Counter Value), which is incremented by one at each clock tick. For our choice of clock, there is a clock tick every 1/42 microsecond. There are also 3 32-bit registers named RA, RB and RC. The CV register can be compared with the values ​​stored in these registers to trigger different events, according to the threshold trigger principle. Here we choose the UP_RC mode(automatic trigger on RC compare), which resets CV to zero when the value of RC is reached. Each channel of a timer has two TIOA and TIOB inputs / outputs (with two states 0 and 1). We will be using TIOA as the output. The following figure shows the evolution of CV over time and the TIOA output:

RC-compare svg

The RC register contains the sampling period, expressed in number of ticks of the clock (which is at 42 MHz). The counter (CV) therefore returns to zero at this period. The RA register contains half of RC. To obtain the TIOA output given in the figure, it is necessary to choose TC_CMR_ACPC_SET (RC compare effect on TIOA = set) so that TIOA switches to the high level when CV reaches RC, and TC_CMR_ACPA_CLEAR (RA compares effect on TIOA = clear) so that TIOA switches to low level when CV reaches RA.

The periodic TIOA output thus generated can be used to drive a TTL digital output or to trigger an interrupt. In this case, the TIOA output triggers the analog-to-digital conversion (because the ADC has been configured to be driven by TIOA, with the trigger option ADC_MR_TRIG1).

Here is how the stopwatch is configured:

TC_Configure (TC0, channel, TC_CMR_WAVE | TC_CMR_WAVSEL_UP_RC | TC_CMR_ACPA_CLEAR | TC_CMR_ACPC_SET | clock); 

Suppose ticks contains the sample period in units of clock ticks , as a 32-bit unsigned integer. The values ​​of RC and RA are assigned by:

TC_SetRC (TC0, channel, ticks); 
TC_SetRA (TC0, channel, ticks >> 1);

The stopwatch is started by:

TC_Start (TC0, channel);

As soon as the stopwatch is running, the TIOA signal it generates triggers the analog-to-digital conversions. After each conversion, the ADC_Handler function is called by interrupt.

To stop the stopwatch:

TC_Stop (TC0, channel);
pmc_disable_periph_clk (TC_INTERFACE_ID + 0 * 3 + channel);

4. Storage of samples

The numbers obtained by analog-to-digital conversion must be stored in order to be transmitted through the USB port to the computer. To obtain a continuous flow of data to the computer, it is necessary to use a circular buffer . More precisely, for each channel to be acquired, NBUF buffers are used, each comprising BUF_SIZE samples. Here is the declaration of the array containing these buffers:

#define MAX_NCHAN 2
#define NBUF 0x10
#define NBUF_MASK 0xF // (NBUF-1)
#define BUF_SIZE 0x100
#define BUF_MASK 0xFF // (BUF_SIZE-1)

uint16_t buf [MAX_NCHAN] [NBUF] [BUF_SIZE];

The maximum number of channels is chosen here equal to 2. The number of buffers and the size of the buffers must be a power of 2. The samples are stored in a buffer in the ADC_Handler function. When the buffer is full, we write over the next one. When the last buffer is full, we write again on the first (this is why we speak of circular buffer). The program responsible for transferring the data via the serial port will be defined in the loop function. It will transfer a tampon as soon as it is full. The fact of having several buffers (here 16) makes it possible to cope with the variations in operating time which may arise during the transfer of data via the serial port. If the transfer is slightly slowed down (for example due to a slowdown in playback by the computer), sample storage can continue. In the example above, the table contains 2 * 16 * 256 = 8192 16-bit words, or 16 KB (out of an available total of 96 KB).

Access to an element of a buffer is made by a 16-bit global variable index_buf . Access to a buffer is via the 8-bit global variable compt_buf . Here is the content of the ADC_Handler function called by interrupt after each scan:

void ADC_Handler () {
  int k;
  for (k = 0; k <nchannels; k ++) {
    buf [k] [counter_buf] [index_buf] = * (ADC-> ADC_CDR + channels [k]);
  }
  buf_index ++;
  if (index_buf == BUF_SIZE) {// uint16_t index_buffer
    index_buf = 0;
    buf_counter = (buf_counter + 1) & NBUF_MASK;
  }
}

5. Data transmission

For the transfer of the samples, a global variable 16 bits counter_buf_transmis is used which indicates the next buffer to be transmitted. When there are buffers filled by ADC_Handler but not yet transmitted, this counter is different from counter_buf . Here is the data transfer:

int k;
if (buf_counter_transmitted! = buf_counter) {
    for (k = 0; k <nchannels; k ++) SerialUSB.write ((uint8_t *) buf [k] [counter_buf_transmis], BUF_SIZE * 2);
    transmission_buf_counter = (transmission_buf_counter + 1) & NBUF_MASK;
}

This block will be executed in the loop function .

6. Arduino program

We present here a complete program for the Arduino Due, which communicates with a computer through the native USB port (USB port managed by the microcontroller).

First of all, here is the header with the declaration of constants and global variables:

arduinoDueAcquisitionSignal.ino

#include "Arduino.h"

#define ACQUISITION 104
#define STOP_ACQUISITION 105
#define ADC_MR_TRIG1 (1 << 1)
uint8_t nchannels;
#define MAX_NCHAN 2
uint8_t channels [MAX_NCHAN];
uint8_t gain [MAX_NCHAN];
uint8_t offset [MAX_NCHAN];
uint8_t channels_pins [8] = {7,6,5,4,3,2,1,0};
#define NBUF 0x10
#define NBUF_MASK 0xF // (NBUF-1)
#define BUF_SIZE 0x100
#define BUF_MASK 0xFF // (BUF_SIZE-1)
uint16_t buf [MAX_NCHAN] [NBUF] [BUF_SIZE];
volatile uint8_t counter_buf, counter_buf_transmis;
volatile uint16_t index_buf;
uint32_t nblocks;
uint8_t sans_fin;
            
MAX_NCHAN is the maximum number of channels that can be used. This number can be increased provided that the buffer fits in the available memory. The channels_pins table contains the correspondence between the number of the terminal on the board (0 = A0, 1 = A1, etc.) and the number of the channel used by the ADC. For example, terminal A0 corresponds to channel 7.

The following function configures the ADC:

void config_adc () {
    int k;
    pmc_enable_periph_clk (ID_ADC);
    adc_init (ADC, VARIANT_MCK, ADC_FREQ_MAX, 0);
    adc_configure_timing (ADC, 0, ADC_SETTLING_TIME_3, 1);
    adc_set_resolution (ADC, ADC_12_BITS);
    NVIC_EnableIRQ (ADC_IRQn);   
    ADC-> ADC_IDR = 0xFFFFFFFF;  
    ADC-> ADC_CHDR = 0xFFFF; 
    ADC-> ADC_CHSR = 0x0000; 
    ADC-> ADC_WPMR & = ~ 0x1;
    ADC-> ADC_CHER = 0x0000;
    ADC-> ADC_SEQR1 = 0x00000000;
    ADC-> ADC_CGR = 0x00000000;
    ADC-> ADC_COR = 0x00000000;
    for (k = 0; k <nchannels; k ++) {
      ADC-> ADC_SEQR1 | = ((channels [k] & 0xF) << (4 * k));
      ADC-> ADC_CHER | = (1 << k);
      if (gain [k] == 2) ADC-> ADC_CGR | = 2 << (2 * k);
      if (gain [k] == 4) ADC-> ADC_CGR | = 3 << (2 * k);
      if (offset [k]) ADC-> ADC_COR | = 1 << k;
    }
    ADC-> ADC_MR = (ADC-> ADC_MR & 0xFFFFFFF0) | ADC_MR_TRIG1 | ADC_MR_TRGEN | ADC_MR_USEQ;
    ADC-> ADC_IDR = ~ (1 << channels [0]); 
    ADC-> ADC_IER = 1 << channels [0];
}
            
The following function configures and starts the stopwatch:

void config_timer (uint32_t ticks) {
  uint32_t channel = 0;
  uint8_t clock = TC_CMR_TCCLKS_TIMER_CLOCK1; // clock 84MHz / 2 = 42 MHz
  pmc_enable_periph_clk (TC_INTERFACE_ID + 0 * 3 + channel);
  TC_Configure (TC0, channel, TC_CMR_WAVE | TC_CMR_WAVSEL_UP_RC | TC_CMR_ACPA_CLEAR | TC_CMR_ACPC_SET | clock); 
  TC_SetRC (TC0, channel, ticks); 
  TC_SetRA (TC0, channel, ticks >> 1);
  TC_Start (TC0, channel);
}

The following function stops the stopwatch:

void stop_timer () {
    uint32_t channel = 0;
    TC_Stop (TC0, channel);
    pmc_disable_periph_clk (TC_INTERFACE_ID + 0 * 3 + channel);
}

Here is the function called during interruptions, that is, just after conversions:

void ADC_Handler () {
  int k;
  for (k = 0; k <nchannels; k ++) {
    buf [k] [counter_buf] [index_buf] = * (ADC-> ADC_CDR + channels [k]);
  }
  buf_index ++;
  if (index_buf == BUF_SIZE) {// uint16_t index_buffer
    index_buf = 0;
    buf_counter = (buf_counter + 1) & NBUF_MASK;
  }
}

The setup function opens the USB port and initializes the buffers:

void setup () {
   SerialUSB.begin (115200);
   int i, j, k;
   for (i = 0; i <NBUF; i ++)
     for (j = 0; j <BUF_SIZE; j ++)
        for (k = 0; k <MAX_NCHAN; k ++) 
            buf [k] [i] [j] = 2048;
   counter_buf = counter_buf_transmis = 0;
}

The ACQUISITION command sent by the computer must be followed by the following information:

  • The number of channels (8-bit integer).
  • The terminal numbers of these channels (8-bit integers).
  • The gains for these channels (8-bit integers).
  • The number of clock ticks for the sampling period (32-bit integer).
  • The number of sample blocks to be transmitted (32-bit integer). Each block has BUF_SIZE samples for each channel. If this number is zero, the acquisition is endless.

The following function reads this data and triggers the acquisition:

void read_acquisition () {
    uint32_t c1, c2, c3, c4;
    uint32_t ticks;
    int k;
    while (SerialUSB.available () <1) {};
    nchannels = SerialUSB.read ();
    while (SerialUSB.available () <nchannels) {};
    for (k = 0; k <nchannels; k ++) {
      if (k <MAX_NCHAN) channels [k] = channels_pins [SerialUSB.read ()]; 
    }
    while (SerialUSB.available () <nchannels) {};
    for (k = 0; k <nchannels; k ++) {
      if (k <MAX_NCHAN) gain [k] = SerialUSB.read (); 
    }
    while (SerialUSB.available () <nchannels) {};
    for (k = 0; k <nchannels; k ++) {
      if (k <MAX_NCHAN) offset [k] = SerialUSB.read (); 
    }
    if (nchannels> MAX_NCHAN) nchannels = MAX_NCHAN;
    while (SerialUSB.available () <4) {};
    c1 = SerialUSB.read ();
    c2 = SerialUSB.read ();
    c3 = SerialUSB.read ();
    c4 = SerialUSB.read ();
    ticks = ((c1 << 24) | (c2 << 16) | (c3 << 8) | c4);
    while (SerialUSB.available () <4) {};
    c1 = SerialUSB.read ();
    c2 = SerialUSB.read ();
    c3 = SerialUSB.read ();
    c4 = SerialUSB.read ();
    nblocks = ((c1 << 24) | (c2 << 16) | (c3 << 8) | c4);
    if (nblocks == 0) {
        without_fin = 1;
        nblocks = 1;
    }
    else without_end = 0;
    counter_buf = counter_buf_transmis = 0;
    index_buf = 0;
    config_adc ();
    config_timer (ticks);
}

The following function stops the acquisition:

void stop_acquisition () {
    ADC-> ADC_MR & = ~ (0x1);
    stop_timer ();
    counter_buf = counter_buf_transmis = 0;
}

The following function reads the USB port and acts if a command is present:

void lecture_series () {
   char com;
   if (SerialUSB.available ()> 0) {
        com = SerialUSB.read ();
        if (com == ACQUISITION) lecture_acquisition ();
        if (com == STOP_ACQUISITION) stop_acquisition ();
   }
}

Here is the loop function . It detects the presence of a full buffer that has not yet been transmitted. If necessary, it transmits this buffer then decrements the block counter in the case of an acquisition with a finite number of blocks (a block is the content of a buffer, ie BUF_SIZE samples).

void loop () {
    int k;
    if ((buf_counter_transmitted! = buf_counter) && (nblocks> 0)) {
        for (k = 0; k <nchannels; k ++) SerialUSB.write ((uint8_t *) buf [k] [counter_buf_transmis], BUF_SIZE * 2);
        transmission_buf_counter = (transmission_buf_counter + 1) & NBUF_MASK;
        if (sans_end == 0) {
            nblocks--;
            if (nblocks == 0) stop_acquisition ();
        }
    }
    else read_series ();
}

7. Python program

arduinoDueAcquisitionSignal.py

import serial
import numpy
import time
import threading

The python program (for python 3.x) includes a class whose functions allow you to send configuration data to the arduino.

The manufacturer opens communication with the arduino and defines constants, which must of course be identical to those defined in the arduino program.

Arduino class:
    def __init __ (self, port):
        self.ser = serial.Serial (port, 115200)
        time.sleep (2)
        self.ACQUISITION = 104
        self.STOP_ACQUISITION = 105
        self.clockFreq = 42.0e6 # clock frequency
        self.BLOCK_SIZE = 256
        self.BLOCK_SIZE_INT8 = self.BLOCK_SIZE * 2
        
    def close (self):
        self.ser.close ()

The following functions send an 8-bit, 16-bit, and 32-bit integer. The most significant bytes are sent first, according to the convention used in the arduino code presented above ( big endian convention ).

    def write_int8 (self, v):
        self.ser.write ((v & 0xFF) .to_bytes (1, byteorder = 'big'))

    def write_int16 (self, v):
        v = numpy.int16 (v)
        char1 = int ((v & 0xFF00) >> 8)
        char2 = int ((v & 0x00FF))
        self.ser.write ((char1) .to_bytes (1, byteorder = 'big'))
        self.ser.write ((char2) .to_bytes (1, byteorder = 'big'))
        
    def write_int32 (self, v):
        v = numpy.int32 (v)
        char1 = int ((v & 0xFF000000) >> 24)
        char2 = int ((v & 0x00FF0000) >> 16)
        char3 = int ((v & 0x0000FF00) >> 8)
        char4 = int ((v & 0x000000FF))
        self.ser.write ((char1) .to_bytes (1, byteorder = 'big'))
        self.ser.write ((char2) .to_bytes (1, byteorder = 'big'))
        self.ser.write ((char3) .to_bytes (1, byteorder = 'big'))
        self.ser.write ((char4) .to_bytes (1, byteorder = 'big'))  

The following function starts an acquisition. channel is the list of channels used. winnings is the list of corresponding winnings (1,2 or 4). offset is an array configuring the offset for each channel: 0 to not apply an offset before conversion, 1 to subtract VREF / 2 before conversion. nb is the number of blocks to acquire, which must be zero for endless acquisition.

    def lancer_acquisition (self, channels, gains, offsets, fechant, nb):
        ticks = int (self.clockFreq / fechant)
        self.write_int8 (self.ACQUISITION)
        self.nvoies = nv = len (channels)
        self.write_int8 (nv)
        for k in range (nv):
            self.write_int8 (channels [k])
        for k in range (nv):
            self.write_int8 (gains [k])
        for k in range (nv):
            self.write_int8 (offsets [k])
        self.write_int32 (ticks)
        self.write_int32 (nb)

The following function stops the acquisition in progress:

    def stopper_acquisition (self):
        self.write_int8 (self.STOP_ACQUISITION)

The following function reads a block (for one channel). It returns an array of dtype = numpy.float32 , which actually contains integers between 0 and 4095 (12-bit converion).

    def read (self):
        buf = self.ser.read (self.BLOCK_INT8_SIZE)
        data = numpy.zeros (self.BLOCK_SIZE, dtype = numpy.float32)
        j = 0
        for i in range (self.BLOCK_SIZE):
        	data [i] = buf [j] + 0x100 * buf [j + 1]
        	j + = 2
        return data

In some cases, we do not try to convert integers into voltage values ​​because the voltage is not the information sought. For example, for a microphone, it will suffice to subtract the average value corresponding to the zero level (in principle 2048). If you want to calculate the voltage, you have to take into account the existence of an offset and the gain (1,2 or 4). If an offset has been applied to the input voltage (for example with the circuit presented above), this offset must be equal to VREF / 2, which corresponds to the value 2048. If the gain is different from 1, the circuit which apply offset must be set so that this offset is effectively VREF / 2. To set this circuit, it is necessary to apply a zero voltage at the input of the circuit and then adjust the offset so that the value after conversion is equal to 2048 (to the nearest bit); the offset voltage (read on the voltmeter) gives the value of VREF / 2. If the gain is 1, you can use the offset you want and it is then easier to read the value obtained when the input voltage is zero. Yesx is the integer between 0 and 4095 and G is the gain of the converter (1,2 or 4) then the voltage is given by:

After the acquisition has been started, you must do an asynchronous reading of the data sent by the arduino. The objective is to process or display the signals in packets, each packet consisting of a certain number of blocks. The blocks are the units transmitted by the arduino, which here contain 256 samples. We are going to read the data sent by the arduino on an execution thread. This requires defining a class that inherits from threading.Thread . Here is the constructor of this class:

class AcquisitionThread (threading.Thread): 
    def __init __ (self, arduino, lanes, gains, offsets, fechant, nblocs, npaquets):
        threading.Thread .__ init __ (self)
        self.arduino = arduino
        self.nvoies = len (channels)
        self.ways = ways
        self.gains = earnings
        self.offsets = offsets
        self.fechant = fechant
        self.running = False
        self.nblocs = nblocs # number of blocks in a packet
        self.npaquets = npaquets
        self.block_size = arduino.BLOCK_SIZE
        self.package_size = self.block_size * self.nblocks
        self.data = numpy.zeros ((self.nvoies, self.nblocs * arduino.BLOCK_SIZE * self.npaquets))
        self.packet_counter = 0
        self.read_packages_counter = 0
        self.nechant = 0

nblocks is the number of blocks in a packet (each block contains BLOCK_SIZE = 256 samples) and npaquets and the number of packets to acquire. The samples will be stored in the self.data array , each row of which corresponds to a channel.

The run function is executed when launching the Thread. It consists of a loop that reads data from the arduino and stores it in self.data . This loop increments self.packet_counter each time a packet has been transmitted by the arduino.

    def run (self):
        self.arduino.lancer_acquisition (self.voies, self.gains, self.offsets, self.fechant, 0) # endless acquisition
        self.running = True
        block_index = 0
        while self.running:
            i = self.pack_counter * self.pack_size + block_index * self.block_size
            j = i + self.block_size
            for v in range (self.nvoies):
                self.data [v, i: j] = self.arduino.lecture ()
            self.nechant = j
            index_block + = 1
            if index_block == self.nblocs:
                block_index = 0
                self.packet_counter + = 1
                if self.paquets_counter == self.npaquets:
                    self.stop ()

The following function is used to stop the acquisition:

    def stop (self):
        self.running = False
        time.sleep (1)
        # self.join ()
        self.arduino.stopper_acquisition ()

The self.join () call waits for the exit of the run function .

Here is the function which allows to recover the last packet. If there is no package available yet, it returns -1 . Two consecutive calls to this function return two consecutive packets (or -1 ).

    def package (self): # get the last package
        if self.counter_paquets == self.counter_paquets_read:
            return -1
        i = self.read_package_meter * self.package_size
        j = i + self.package_size                    
        self.counter_read_packs + = 1
        return self.data [:, i: j]

In some cases, you don't want to wait until a package is available to get the last samples. The following function is used to retrieve the last number of samples. The function returns the instants, the samples and the corresponding duration. If the number of samples requested is not yet available, it returns the values ​​-1.

    def samples (self, number):
        j = self.nechant
        i = j-number
        if i <0: i = 0
        if ji <= 0: return (-1, -1, -1)
        time = numpy.arange (ji) /self.fechant
        tmax = number / self.fechant
        return (time, self.data [:, i: j], tmax)

8. Test with signal trace

The following test program performs an acquisition on two channels with a plot of the two signals in a matplotlib window managed by an animation. With the animate1 function , data is retrieved in packets at an interval of time equal to the time it takes to digitize a packet. With the animate2 function , we get the last number_echant samples every 50 ms  . For fast signals, it is better to use animate1 . For slow signals (whose packets are longer than one second), it is better to use animate2 because this allows the trace to be refreshed as the samples arrive.

testAcquisitionAnimate.py

import numpy
from matplotlib.pyplot import *
import matplotlib.animation as animation
from arduinoDueAcquisitionSignal import *

ard = Arduino ("COM9") 
fechant = 40,000
nblocks = 20 # number of blocks in a packet (one block = 256 samples)
package_time = nblocks * ard.BLOCK_SIZE / fechant
print ("duration of a packet =% f"% (duration of packet))
npackets = 200 # number of packages
print ("total duration =% f"% (duration_package * npackets))
nechant = ard.BLOCK_TAILLE * nblocks # number of samples in a package
delay = duration_package
number_echant = 1000 # number of samples read at each refresh (animate2)
t = numpy.arange (nechant) * 1.0 / fechant
fig, ax = subplots ()
line0, = ax.plot (t, numpy.zeros (nechant))
line1, = ax.plot (t, numpy.zeros (nechant))
ax.axis ([0, t.max (), - 100,5000]) # window for a packet
ax.grid ()

channels = [0.1]
G = 1
gains = [G, G]
offsets = [1,1]
acquisition = AcquisitionThread (ard, channels, gains, offsets, fechant, nblocs, npaquets)
acquisition.start ()

setting = False
if setting:
    ax.axis ([0, t.max (), 2000,2096])
    ax.plot ([0, t.max ()], [2048,2048])
conversion = True
if conversion:
    VREF = 1.646 * 2 # to be read on the voltmeter
    conv = VREF / (2 * G) / 2048
    ax.axis ([0, t.max (), - VREF / 2, VREF / 2])

def animate1 (i):
    global line0, line1, acquisition, signal, conversion, conv
    data = acquisition.package ()
    if isinstance (data, int) == False:
        if conversion:
            data [0] = (data [0] -2048) * conv
            data [1] = (data [1] -2048) * conv
        line0.set_ydata (data [0])
        line1.set_ydata (data [1])
        
            

def animate2 (i):
    global line0, line1, acquisition, conversion, VREF, conv
    (time, data, tmax) = acquisition.samples (number_sample)
    if isinstance (data, int) == False:
        if conversion:
            data [0] = (data [0] -2048) * conv
            data [1] = (data [1] -2048) * conv
            ax.axis ([0, tmax, -VREF / 2, VREF / 2])
        else:
            ax.axis ([0, tmax, -100,5000])
        line0.set_ydata (data [0])
        line1.set_ydata (data [1])
        line0.set_xdata (time)
        line1.set_xdata (time)
ani = animation.FuncAnimation (fig, animate1,100, interval = delai * 1000)
#ani = animation.FuncAnimation (fig, animate2,100, interval = 50)
show()
acquisition.stop ()
ard.close ()
numpy.savetxt ("signals-1.txt", acquisition.data.T, delimiter = "\ t", fmt = "% 0.4e")
show()      

Let's see how to do the tuning and calibrating. We must connect a voltmeter between the mass and the A0 input of the arduino. The IN0 input of the shifter being connected to ground, turning the potentiometer so that the voltage A0 on the input or to 1.65 V  . We set adjustment = True in the script above, which allows to precisely visualize the difference between the values ​​of the samples and the value 2048 (the quantization noise is clearly visible). It is necessary to turn the potentiometer of the shifter circuit so that the average value is at 2048. The voltage read on the voltmeter on input A0 is then VREF / 2. The value of VREF should be reported in the above script.

If conversion = True the values ​​converted to voltage are plotted. Otherwise, whole numbers (between 0 and 4095) are plotted. The choice of an offset 1 for a channel makes it possible to amplify (by a gain of 2 or 4) the alternating signal applied to the input IN0 or IN1 of the shifter circuit. If one wishes to process a signal with positive values ​​(for example that coming from a photodiode), one does not use a shifter circuit and the gain is applied without offset.

To test the correct operation of the sampling, we digitize a sinusoid of amplitude 1 V  and frequency 200.0 Hz  (sampling frequency 40 kHz  , duration 12.8 s  ) then we carry out the spectral analysis as next :

import numpy
from matplotlib.pyplot import *
import scipy.signal

def spectrum (t, u):
    N = len (u)
    te = t [1] -t [0]
    zeros = numpy.zeros (6 * N)
    U = numpy.concatenate ((u * scipy.signal.blackman (N), zeros))
    NN = len (U)
    spectrum = numpy.absolute (numpy.fft.fft (U)) * 2.0 / N / 0.42
    freq = numpy.arange (NN) * 1.0 / (NN * te)
    return (freq, spectrum)

fe = 40,000
te = 1 / fe
[u0, u1] = numpy.loadtxt ("signals-1.txt", unpack = True)
N = len (u0)
t = numpy.arange (N) * te
(f, A) = spectrum (t, u0)


figure ()
plot (f, A)
xlabel ("f (Hz)")
ylabel ("A (V)")
xlim (195,205)
grid ()

show()       

test_sinus

Creative Commons License Texts and figures are made available under a Creative Commons license.

#include "Arduino.h"
#define ACQUISITION 104
#define STOP_ACQUISITION 105
#define ADC_MR_TRIG1 (1<<1)
uint8_t nchannels;
#define MAX_NCHAN 2
uint8_t channels[MAX_NCHAN];
uint8_t gain[MAX_NCHAN];
uint8_t offset[MAX_NCHAN];
uint8_t channels_pins[8] ={7,6,5,4,3,2,1,0};
#define NBUF 0x10
#define NBUF_MASK 0xF // (NBUF-1)
#define BUF_SIZE 0x100
#define BUF_MASK 0xFF // (BUF_SIZE-1)
uint16_t buf[MAX_NCHAN][NBUF][BUF_SIZE];
volatile uint8_t compteur_buf,compteur_buf_transmis;
volatile uint16_t indice_buf;
uint32_t nblocs;
uint8_t sans_fin;
void config_adc() {
int k;
pmc_enable_periph_clk(ID_ADC);
adc_init(ADC, VARIANT_MCK, ADC_FREQ_MAX, 0);
adc_configure_timing(ADC, 0, ADC_SETTLING_TIME_3, 1);
adc_set_resolution(ADC,ADC_12_BITS);
NVIC_EnableIRQ (ADC_IRQn) ;
ADC->ADC_IDR=0xFFFFFFFF ;
ADC->ADC_CHDR = 0xFFFF;
ADC->ADC_CHSR = 0x0000;
ADC->ADC_WPMR &= ~0x1;
ADC->ADC_CHER = 0x0000;
ADC->ADC_SEQR1 = 0x00000000;
ADC->ADC_CGR = 0x00000000;
ADC->ADC_COR = 0x00000000;
for (k=0; k<nchannels; k++) {
ADC->ADC_SEQR1 |= ((channels[k]&0xF)<<(4*k));
ADC->ADC_CHER |= (1<<k);
if (gain[k]==2) ADC->ADC_CGR |= 2 << (2*k);
if (gain[k]==4) ADC->ADC_CGR |= 3 << (2*k);
if (offset[k]) ADC->ADC_COR |= 1 << k;
}
ADC->ADC_MR = (ADC->ADC_MR & 0xFFFFFFF0) | ADC_MR_TRIG1 | ADC_MR_TRGEN | ADC_MR_USEQ;
ADC->ADC_IDR=~(1<<channels[0]);
ADC->ADC_IER=1<<channels[0];
}
void config_timer(uint32_t ticks) {
uint32_t channel = 0;
uint8_t clock = TC_CMR_TCCLKS_TIMER_CLOCK1; // horloge 84MHz/2=42 MHz
pmc_enable_periph_clk (TC_INTERFACE_ID + 0*3+channel) ;
TC_Configure(TC0,channel, TC_CMR_WAVE | TC_CMR_WAVSEL_UP_RC | TC_CMR_ACPA_CLEAR | TC_CMR_ACPC_SET |clock);
TC_SetRC(TC0,channel,ticks);
TC_SetRA(TC0,channel,ticks >> 1);
TC_Start(TC0, channel);
}
void stop_timer() {
uint32_t channel = 0;
TC_Stop(TC0,channel);
pmc_disable_periph_clk (TC_INTERFACE_ID + 0*3+channel);
}
void ADC_Handler() {
int k;
for (k=0; k<nchannels; k++) {
buf[k][compteur_buf][indice_buf] = *(ADC->ADC_CDR+channels[k]);
}
indice_buf++;
if (indice_buf == BUF_SIZE) { // uint16_t indice_buffer
indice_buf = 0;
compteur_buf = (compteur_buf+1)&NBUF_MASK;
}
}
void setup() {
SerialUSB.begin(115200);
int i,j,k;
for (i=0; i<NBUF; i++)
for (j=0; j<BUF_SIZE; j++)
for (k=0; k<MAX_NCHAN; k++)
buf[k][i][j] = 2048;
compteur_buf = compteur_buf_transmis = 0;
}
void lecture_acquisition() {
uint32_t c1,c2,c3,c4;
uint32_t ticks;
int k;
while (SerialUSB.available()<1) {};
nchannels = SerialUSB.read();
while (SerialUSB.available()<nchannels) {};
for (k=0; k<nchannels; k++) {
if (k < MAX_NCHAN) channels[k] = channels_pins[SerialUSB.read()];
}
while (SerialUSB.available()<nchannels) {};
for (k=0; k<nchannels; k++) {
if (k < MAX_NCHAN) gain[k] = SerialUSB.read();
}
while (SerialUSB.available()<nchannels) {};
for (k=0; k<nchannels; k++) {
if (k < MAX_NCHAN) offset[k] = SerialUSB.read();
}
if (nchannels > MAX_NCHAN) nchannels = MAX_NCHAN;
while (SerialUSB.available()<4) {};
c1 = SerialUSB.read();
c2 = SerialUSB.read();
c3 = SerialUSB.read();
c4 = SerialUSB.read();
ticks = ((c1 << 24) | (c2 << 16) | (c3 << 8) | c4);
while (SerialUSB.available()<4) {};
c1 = SerialUSB.read();
c2 = SerialUSB.read();
c3 = SerialUSB.read();
c4 = SerialUSB.read();
nblocs = ((c1 << 24) | (c2 << 16) | (c3 << 8) | c4);
if (nblocs==0) {
sans_fin = 1;
nblocs = 1;
}
else sans_fin = 0;
compteur_buf=compteur_buf_transmis=0;
indice_buf = 0;
config_adc();
config_timer(ticks);
}
void stop_acquisition() {
ADC->ADC_MR &= ~(0x1);
stop_timer();
compteur_buf=compteur_buf_transmis=0;
}
void lecture_serie() {
char com;
if (SerialUSB.available()>0) {
com = SerialUSB.read();
if (com==ACQUISITION) lecture_acquisition();
if (com==STOP_ACQUISITION) stop_acquisition();
}
}
void loop() {
int k;
if ((compteur_buf_transmis!=compteur_buf)&&(nblocs>0)) {
for (k=0; k<nchannels; k++) SerialUSB.write((uint8_t *)buf[k][compteur_buf_transmis],BUF_SIZE*2);
compteur_buf_transmis=(compteur_buf_transmis+1)&NBUF_MASK;
if (sans_fin==0) {
nblocs--;
if (nblocs==0) stop_acquisition();
}
}
else lecture_serie();
}
import serial
import numpy
import time
import threading
class Arduino:
def __init__(self,port):
self.ser = serial.Serial(port,115200)
time.sleep(2)
self.ACQUISITION = 104
self.STOP_ACQUISITION = 105
self.clockFreq = 42.0e6 # frequence d'horloge
self.TAILLE_BLOC = 256
self.TAILLE_BLOC_INT8 = self.TAILLE_BLOC*2
def close(self):
self.ser.close()
def write_int8(self,v):
self.ser.write((v&0xFF).to_bytes(1,byteorder='big'))
def write_int16(self,v):
v = numpy.int16(v)
char1 = int((v & 0xFF00) >> 8)
char2 = int((v & 0x00FF))
self.ser.write((char1).to_bytes(1,byteorder='big'))
self.ser.write((char2).to_bytes(1,byteorder='big'))
def write_int32(self,v):
v = numpy.int32(v)
char1 = int((v & 0xFF000000) >> 24)
char2 = int((v & 0x00FF0000) >> 16)
char3 = int((v & 0x0000FF00) >> 8)
char4 = int((v & 0x000000FF))
self.ser.write((char1).to_bytes(1,byteorder='big'))
self.ser.write((char2).to_bytes(1,byteorder='big'))
self.ser.write((char3).to_bytes(1,byteorder='big'))
self.ser.write((char4).to_bytes(1,byteorder='big'))
def lancer_acquisition(self,voies,gains,offsets,fechant,nb):
ticks = int(self.clockFreq/fechant)
self.write_int8(self.ACQUISITION)
self.nvoies = nv = len(voies)
self.write_int8(nv)
for k in range(nv):
self.write_int8(voies[k])
for k in range(nv):
self.write_int8(gains[k])
for k in range(nv):
self.write_int8(offsets[k])
self.write_int32(ticks)
self.write_int32(nb)
def stopper_acquisition(self):
self.write_int8(self.STOP_ACQUISITION)
def lecture(self):
buf = self.ser.read(self.TAILLE_BLOC_INT8)
data = numpy.zeros(self.TAILLE_BLOC,dtype=numpy.float32)
j = 0
for i in range(self.TAILLE_BLOC):
data[i] = buf[j]+0x100*buf[j+1]
j += 2
return data
class AcquisitionThread(threading.Thread):
def __init__(self,arduino,voies,gains,offsets,fechant,nblocs,npaquets):
threading.Thread.__init__(self)
self.arduino = arduino
self.nvoies = len(voies)
self.voies = voies
self.gains = gains
self.offsets = offsets
self.fechant = fechant
self.running = False
self.nblocs = nblocs # nombre de blocs dans un paquet
self.npaquets = npaquets
self.taille_bloc = arduino.TAILLE_BLOC
self.taille_paquet = self.taille_bloc * self.nblocs
self.data = numpy.zeros((self.nvoies,self.nblocs*arduino.TAILLE_BLOC*self.npaquets))
self.compteur_paquets = 0
self.compteur_paquets_lus = 0
self.nechant = 0
def run(self):
self.arduino.lancer_acquisition(self.voies,self.gains,self.offsets,self.fechant,0) # acquisition sans fin
self.running = True
indice_bloc = 0
while self.running:
i = self.compteur_paquets*self.taille_paquet +indice_bloc*self.taille_bloc
j = i+self.taille_bloc
for v in range(self.nvoies):
self.data[v,i:j] = self.arduino.lecture()
self.nechant = j
indice_bloc += 1
if indice_bloc==self.nblocs:
indice_bloc = 0
self.compteur_paquets += 1
if self.compteur_paquets==self.npaquets:
self.stop()
def stop(self):
self.running = False
time.sleep(1)
#self.join()
self.arduino.stopper_acquisition()
def paquet(self): # obtention du dernier paquet
if self.compteur_paquets==self.compteur_paquets_lus:
return -1
i = self.compteur_paquets_lus*self.taille_paquet
j = i+self.taille_paquet
self.compteur_paquets_lus += 1
return self.data[:,i:j]
def echantillons(self,nombre):
j = self.nechant
i = j-nombre
if i<0: i=0
if j-i<=0: return (-1,-1,-1)
temps = numpy.arange(j-i)/self.fechant
tmax = nombre/self.fechant
return (temps,self.data[:,i:j],tmax)
Display the source blob
Display the rendered blob
Raw
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
import numpy
from matplotlib.pyplot import *
import matplotlib.animation as animation
from arduinoDueAcquisitionSignal import *
ard = Arduino("COM9")
fechant = 40000
nblocs = 20 # nombre de blocs dans un paquet (un bloc = 256 échantillons)
duree_paquet = nblocs*ard.TAILLE_BLOC/fechant
print("durée d'un paquet = %f"%(duree_paquet))
npaquets = 200 # nombre de paquets
print("durée totale = %f"%(duree_paquet*npaquets))
nechant = ard.TAILLE_BLOC*nblocs # nombre d'échantillons dans un paquet
delai = duree_paquet
nombre_echant = 1000 # nombre d'échantillons lus à chaque rafraichissement (animate2)
t = numpy.arange(nechant)*1.0/fechant
fig,ax = subplots()
line0, = ax.plot(t,numpy.zeros(nechant))
line1, = ax.plot(t,numpy.zeros(nechant))
ax.axis([0,t.max(),-100,5000]) # fenêtre pour un paquet
ax.grid()
voies = [0,1]
G=1
gains = [G,G]
offsets = [1,1]
acquisition = AcquisitionThread(ard,voies,gains,offsets,fechant,nblocs,npaquets)
acquisition.start()
reglage = False
if reglage:
ax.axis([0,t.max(),2000,2096])
ax.plot([0,t.max()],[2048,2048])
conversion = True
if conversion:
VREF = 1.646*2 # à lire au voltmètre
conv = VREF/(2*G)/2048
ax.axis([0,t.max(),-VREF/2,VREF/2])
def animate1(i):
global line0,line1,acquisition,signal,conversion,conv
data = acquisition.paquet()
if isinstance(data,int)==False:
if conversion:
data[0] = (data[0]-2048)*conv
data[1] = (data[1]-2048)*conv
line0.set_ydata(data[0])
line1.set_ydata(data[1])
def animate2(i):
global line0,line1,acquisition,conversion,VREF,conv
(temps,data,tmax) = acquisition.echantillons(nombre_echant)
if isinstance(data,int)==False:
if conversion:
data[0] = (data[0]-2048)*conv
data[1] = (data[1]-2048)*conv
ax.axis([0,tmax,-VREF/2,VREF/2])
else:
ax.axis([0,tmax,-100,5000])
line0.set_ydata(data[0])
line1.set_ydata(data[1])
line0.set_xdata(temps)
line1.set_xdata(temps)
ani = animation.FuncAnimation(fig,animate1,100,interval=delai*1000)
#ani = animation.FuncAnimation(fig,animate2,100,interval=50)
show()
acquisition.stop()
ard.close()
numpy.savetxt("signaux-1.txt",acquisition.data.T,delimiter="\t",fmt="%0.4e")
show()
@Tantoh-Moussa
Copy link

I will like to first thank you for this.
Please I'm new in this domain and have never programmed before.
I want to program an earthquake detector (Seismograph as I call it), please I will like you to help guide me with the various steps on how to do it.
Thanks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment