@Sumalyo
Last active February 29, 2024 10:30
GSoC 2023 Project Report


Real-Time Lossless Data Compression for the FASER Experiment

Author : Sumalyo Datta
Project : FASER
Mentors : Claire Antel and Brian Petersen
Date : 28.08.2023
Project Proposal : Link to Proposal

Introduction

FASER is an LHC experiment located in a tunnel parallel to the LHC ring. It is considerably smaller and lower-budget than the large LHC experiments such as ATLAS and CMS. The experiment seeks to detect new long-lived particles that have travelled half a kilometre from the LHC proton-proton collision site at the centre of the ATLAS experiment, escaping the ATLAS detector undetected. During proton collisions at the LHC, FASER records up to 1500 events per second using an open-source data acquisition (DAQ) software framework developed at CERN. The DAQ software receives dedicated data fragments from subcomponents of the detector, packs them into a single event and writes completed events to a file. The total storage space assigned to the experiment was, however, quickly used up, which has already led to a doubling of the requested storage space. The challenge was to develop a compression engine that compresses data in real time while introducing minimal latency and performance overhead. The engine also has to let events be decompressed transparently when they are read back for reconstruction and physics analysis.

FASER has a trigger-based data acquisition model, and the DAQ software is developed using the DAQling framework.

DAQling is an open-source lightweight C++ software framework that can be used as the core of data acquisition systems of small and medium-sized experiments. It provides a modular system in which custom applications can be plugged-in. It also provides the communication layer based on the widespread ZeroMQ messaging library, error logging using ERS, configuration management based on the JSON format, control of distributed applications and extendable operational monitoring with web-based visualization.

You can read more about the strategy in the paper published by the FASER collaboration.

The first part of the project involved exploring various open-source lossless compression libraries and developing a standalone compressor for raw files. The standalone compressor records performance metrics (such as compression ratio and compression speed) that help determine which compressor is best suited for the engine. You can read more about the exploration of compression algorithms in this Medium blog post.

After exploring the various algorithms, the next task was to develop a compression engine module that could be integrated into the full FASER DAQ system and configured to write compressed physics events directly from the acquisition system. The challenge was to design a module that could sustain high throughput at high event rates (up to 4 kHz). The module would also publish relevant metrics, such as the compression ratio, on the Monitoring dashboard.

Fig 1: A Glimpse of The Metrics Dashboard

The Community Bonding Period

Before the coding period, the plan for contributions was roughly sketched out during the community bonding phase, and the development setup was finalized. A Docker-based setup was chosen because it replicates the production environment and makes managing packages and dependencies much simpler. I found some issues with the existing Docker image and worked with the mentors to add services like supervisord and redis to the container configuration. To support data compression, additional dev packages were added for building the prototype implementation. Code for the new Dockerfile can be found here: master branch on faser-docker
Merge Request: MR #2

Raw File Compressor

In the first phase of the project, a standalone utility application, eventCompress (Link to Code), was developed. It compresses existing raw data files and records performance metrics during the process. It can also run decompression tests and record metrics such as decompression speed.

Usage: eventCompress [-n nEventsMax -l -d -s -o <output_file> -c <config_file> ] <input_raw_filename> 
   -n <no. events>:   Run Compression for n events only (optional)
   -l --enableLog:    Enable logging for metrics and benchmarks (optional) 
   -d --decomress:    Run Decompression tests (optional)
   -s --silent:       suppress logs (optional)
   -o --write:        specify file to write out to
   -c --config:       specify file to read compressor config

The idea is to read a raw file and compress it event by event, altering the header information of each event to indicate that it is compressed and to specify the compression algorithm used. Because each event is compressed individually, the time taken for compression and the ratio of the uncompressed size to the compressed size are recorded per event. These values are logged in a JSON file, which can be analysed with a Python notebook to visualise the data as graphs and to understand performance tradeoffs. You can view a sample report here.
Merge Request: MR #50
The logs and the Python-based analysis notebooks can be found at this repo. Please refer to the README for additional information.
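
The per-event measurement described above can be sketched roughly as follows. This is a minimal illustration and not the eventCompress code: the `EventMetrics` struct, `measureEvent` and the toy run-length encoder are hypothetical names introduced here, and the run-length encoder only stands in for a real compression library so that the example needs nothing beyond the standard library.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

using Bytes = std::vector<std::uint8_t>;

// Hypothetical per-event record; field names are illustrative only.
struct EventMetrics {
  std::size_t uncompressedBytes;
  std::size_t compressedBytes;
  double ratio;         // uncompressed size / compressed size
  double microseconds;  // wall-clock time spent inside the compressor
};

// Toy stand-in for a real compressor: byte-wise run-length encoding,
// emitting {run length, value} pairs with runs capped at 255.
Bytes toyRunLengthEncode(const Bytes &in) {
  Bytes out;
  for (std::size_t i = 0; i < in.size();) {
    std::size_t run = 1;
    while (i + run < in.size() && in[i + run] == in[i] && run < 255) ++run;
    out.push_back(static_cast<std::uint8_t>(run));
    out.push_back(in[i]);
    i += run;
  }
  return out;
}

// Compress one event payload and record its size ratio and timing.
EventMetrics measureEvent(const Bytes &payload,
                          const std::function<Bytes(const Bytes &)> &compress) {
  assert(compress && "a compressor callable is required");
  const auto start = std::chrono::steady_clock::now();
  const Bytes compressed = compress(payload);
  const auto stop = std::chrono::steady_clock::now();
  const double us =
      std::chrono::duration<double, std::micro>(stop - start).count();
  const double ratio =
      compressed.empty() ? 0.0
                         : static_cast<double>(payload.size()) /
                               static_cast<double>(compressed.size());
  return {payload.size(), compressed.size(), ratio, us};
}
```

Calling `measureEvent(payload, toyRunLengthEncode)` once per event yields one record per event, which is the kind of data that could then be written to a log and analysed offline.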

The Best Candidate

Fig 2: Average Compression Ratio vs Average Compression Speed

The approach used to find the best compressor was as follows:

  • Events were divided into a set of 10 classes based on event size (Class 0 having events of smallest size)
  • For each event class, the average compression speed and compression ratio were calculated (for each compressor)
  • The resulting points were plotted on a graph, and this helped to visualize the tradeoff. The compressor configuration offering the highest average compression ratio at the highest compression speed was considered optimal.
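
The binning and averaging steps above could look roughly like the sketch below. It assumes equal-width size classes between the smallest and largest event, which is a guess: the report does not say how the class boundaries were chosen. The names `EventSample`, `ClassAverage` and `averageBySizeClass` are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// One measured event for one compressor configuration (illustrative fields).
struct EventSample {
  double sizeBytes;
  double ratio;      // uncompressed size / compressed size
  double speedMBps;  // compression speed
};

struct ClassAverage {
  std::size_t count = 0;
  double meanRatio = 0.0;
  double meanSpeedMBps = 0.0;
};

// Assign each event to one of nClasses equal-width size classes
// (class 0 holds the smallest events) and average ratio and speed per class.
std::vector<ClassAverage> averageBySizeClass(const std::vector<EventSample> &events,
                                             std::size_t nClasses = 10) {
  assert(nClasses > 0);
  std::vector<ClassAverage> classes(nClasses);
  if (events.empty()) return classes;

  const auto bySize = [](const EventSample &a, const EventSample &b) {
    return a.sizeBytes < b.sizeBytes;
  };
  const auto range = std::minmax_element(events.begin(), events.end(), bySize);
  const double lo = range.first->sizeBytes;
  const double width =
      (range.second->sizeBytes - lo) / static_cast<double>(nClasses);

  for (const EventSample &e : events) {
    std::size_t k =
        width > 0.0 ? static_cast<std::size_t>((e.sizeBytes - lo) / width) : 0;
    k = std::min(k, nClasses - 1);  // the largest event lands in the last class
    classes[k].count += 1;
    classes[k].meanRatio += e.ratio;  // sums for now, divided below
    classes[k].meanSpeedMBps += e.speedMBps;
  }
  for (ClassAverage &c : classes) {
    if (c.count > 0) {
      c.meanRatio /= static_cast<double>(c.count);
      c.meanSpeedMBps /= static_cast<double>(c.count);
    }
  }
  return classes;
}
```

Running this once per compressor configuration gives one (speed, ratio) point per size class, which is the kind of data a ratio-versus-speed plot is built from.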

After running several experiments with recorded physics data, it was determined that ZSTD (https://github.com/facebook/zstd) was the best compressor. Compression levels 3 and 5 were observed to give the best tradeoff and were selected as the configurations for implementation.

The Compression Engine

Fig 3: Schematic Diagram of the Implementation of the Compression Module

The final phase of the project was to develop a compression engine module within the DAQling framework that could be added to an existing configuration to compress physics events before they are written out. The EventBuilder module is responsible for collecting the data fragments from event sources (like the tracking detectors) and building events that are sent to the FileWriter module. The Compression Engine sits between these two modules: it receives events from the EventBuilder, compresses them and sends them on to the FileWriter.

The module is designed to be non-blocking. Incoming events are stored in an internal buffer, from which they are read, compressed and sent out. This is achieved with multithreading, using the producer-consumer queues provided by the Folly library. Essential parts of the design are laid out in the following sections.

std::array<unsigned int, 2> tids = {threadid++, threadid++};
const auto & [ it, success ] =
    m_compressionContexts.emplace(0, std::forward_as_tuple(queue_size, std::move(tids)));
assert(success);

// Start the context's consumer thread.
std::get<ThreadContext>(it->second)
    .consumer.set_work(&CompressionEngineModule::flusher, this,       // threaded function
                       std::ref(std::get<PayloadQueue>(it->second)),  // primary argument: the payload queue
                       Compressor, m_buffer_size);                    // auxiliary arguments
assert(m_compressionContexts.size() == 1); // Only one context may be created

Here the compression context is created, and its consumer thread is started and bound to the queue that holds the events waiting to be compressed.

for (auto & [ chid, ctx ] : m_compressionContexts) {
  std::get<ThreadContext>(ctx).producer.set_work([&]() {
    auto &pq = std::get<PayloadQueue>(ctx);
    auto receiverChannel = m_config.getConnections(m_name)["receivers"][0]["chid"];
    while (m_run) {
      DataFragment<daqling::utilities::Binary> blob;
      auto chid = receiverChannel; // set to 0 for Physics data
      // Poll the receiver until an event arrives or the run stops.
      while (!m_connections.receive(chid, blob) && m_run) {
        if (m_statistics) {
          m_payload_queue_size = pq.sizeGuess();
        }
        std::this_thread::sleep_for(1ms);
      }
      DEBUG(" Received " << blob.size() << "B payload on channel: " << chid);
      if (pq.sizeGuess() > 990) {
        WARNING("Payload Queue is filling up !!");
      }
      // Retry until the queue accepts the payload (write() fails while the queue is full).
      while (m_run && !pq.write(blob));
      if (m_statistics && blob.size()) {
        m_events_received++;
      }
    }
  });
}

This excerpt shows the producer side: events are received from the EventBuilder and their payloads are written to the producer-consumer queue.
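
To make the queue behaviour more concrete, here is a minimal sketch of a bounded single-producer, single-consumer queue whose `write` call fails instead of blocking when the queue is full. It is not the Folly implementation used in the module; the class name `BoundedSpscQueue` is hypothetical, and only the method names `write`, `read` and `sizeGuess` are chosen to echo the calls seen in the excerpts. It is safe only with exactly one writer thread and one reader thread.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Minimal bounded ring buffer for one producer thread and one consumer thread.
// T must be default-constructible and move-assignable.
template <typename T>
class BoundedSpscQueue {
public:
  explicit BoundedSpscQueue(std::size_t capacity)
      : m_slots(capacity + 1), m_read(0), m_write(0) {
    assert(capacity >= 1);
  }

  // Producer side: returns false instead of blocking when the queue is full.
  bool write(T value) {
    const std::size_t w = m_write.load(std::memory_order_relaxed);
    const std::size_t next = (w + 1) % m_slots.size();
    if (next == m_read.load(std::memory_order_acquire)) return false;  // full
    m_slots[w] = std::move(value);
    m_write.store(next, std::memory_order_release);
    return true;
  }

  // Consumer side: returns false when there is nothing to read.
  bool read(T &out) {
    const std::size_t r = m_read.load(std::memory_order_relaxed);
    if (r == m_write.load(std::memory_order_acquire)) return false;  // empty
    out = std::move(m_slots[r]);
    m_read.store((r + 1) % m_slots.size(), std::memory_order_release);
    return true;
  }

  // Approximate occupancy, e.g. for publishing a monitoring metric.
  std::size_t sizeGuess() const {
    const std::size_t w = m_write.load(std::memory_order_acquire);
    const std::size_t r = m_read.load(std::memory_order_acquire);
    return (w + m_slots.size() - r) % m_slots.size();
  }

private:
  std::vector<T> m_slots;  // one slot stays empty to tell "full" from "empty"
  std::atomic<std::size_t> m_read;
  std::atomic<std::size_t> m_write;
};
```

Because `write` reports failure rather than waiting, the producer decides what to do when the consumer falls behind. In the excerpt above it simply retries while the run is active.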

compressorUsedFlush->setCompressionLevel(compressionLevel);
auto senderChannel = m_config.getConnections(m_name)["senders"][0]["chid"];

// Compress every buffered event, send it on, and publish the batch compression ratio.
const auto flushVector = [&](std::vector<DataFragment<daqutils::Binary>> &eventBufferVector) {
  float uncompressed_size = 0;
  float compressed_size = 0;
  for (auto & blob : eventBufferVector) {
    eventGot = std::make_unique<DAQFormats::EventFull>(blob.data<uint8_t *>(), blob.size());
    uncompressed_size += eventGot->payload_size();
    if (!compressorUsedFlush->compress(eventGot)) {
      m_status = STATUS_ERROR;
    }
    compressed_size += eventGot->payload_size();
    int channel = senderChannel; // Channel to send out compressed data
    auto *bytestream = eventGot->raw();
    DataFragment<daqling::utilities::Binary> binData(bytestream->data(), bytestream->size());
    DEBUG("Sending event " << eventGot->event_id() << " - " << eventGot->size() << " bytes on channel " << channel);
    m_events_sent++;
    m_connections.send(channel, binData); // to file writer
    delete bytestream;
  }
  eventBufferVector.clear();
  if (compressed_size != 0) {
    m_compression_ratio.store(uncompressed_size / compressed_size);
  }
};

while (!m_stopWriters) {
  // While the queue is empty, flush whatever is buffered and wait for new events.
  while (pq.isEmpty() && !m_stopWriters) {
    flushVector(eventBuffer);
    std::this_thread::sleep_for(1ms);
  }

  if (m_stopWriters) {
    flushVector(eventBuffer);
    return;
  }

  // Copy the event at the front of the queue into the batch buffer.
  auto payload = pq.frontPtr();
  DataFragment<daqutils::Binary> eventProcess(payload->data(), payload->size());
  eventBuffer.push_back(eventProcess);
  eventProcess = DataFragment<daqutils::Binary>();
  if (eventBuffer.size() > maxBufferedEvents) {
    flushVector(eventBuffer);
  }
  pq.popFront();
}

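This excerpt shows the consumer side: events are taken from the queue into a buffer, and each time the buffer is flushed its events are compressed and sent to the FileWriter. The compression ratio of each flushed batch is published as a metric.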
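
The batching pattern in the excerpt (collect events, flush once the buffer grows past a threshold, and flush whatever remains when the input goes idle or the run stops) can be isolated in a small helper. The sketch below is illustrative only: `BatchBuffer` and its members are hypothetical names, and the flush callback stands in for the compress-and-send step.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Collects events and hands them to a callback in batches.
template <typename Event>
class BatchBuffer {
public:
  using FlushFn = std::function<void(std::vector<Event> &)>;

  BatchBuffer(std::size_t maxBuffered, FlushFn onFlush)
      : m_maxBuffered(maxBuffered), m_onFlush(std::move(onFlush)) {
    assert(m_onFlush && "a flush callback is required");
  }

  // Add one event; flush automatically once the threshold is exceeded.
  void push(Event e) {
    m_events.push_back(std::move(e));
    if (m_events.size() > m_maxBuffered) flush();
  }

  // Flush on demand, e.g. when the input queue is empty or the run stops.
  void flush() {
    if (m_events.empty()) return;
    m_onFlush(m_events);
    m_events.clear();
  }

  std::size_t pending() const { return m_events.size(); }

private:
  std::size_t m_maxBuffered;
  FlushFn m_onFlush;
  std::vector<Event> m_events;
};
```

Flushing when the input is idle keeps latency low at low event rates, while the size threshold bounds memory use at high rates.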

Merge Request: MR #203

Decompression On-the-Fly

Another essential requirement of the project was to support decompression in a way that is hidden from the user. The user should not need to care whether a file contains compressed or raw events; all the software used for analysis and reconstruction should work with compressed files. To achieve this, the loadPayload() method was altered to identify compressed events and decompress them on the fly.

void loadPayload(const uint8_t *data, size_t datasize) {
  std::vector<uint8_t> decompressed_data_vector;
  if (header.status & Compressed) { // Compressed event detected
    // DEBUG("A Compressed Event is being read");

    // Detect the compression algorithm and decompress
    if (decompressPayload(header.compression_code, data, datasize, decompressed_data_vector)) {
      data = decompressed_data_vector.data();
      datasize = decompressed_data_vector.size();
      processCompressedData(datasize);
    } else {
      THROW(CompressionDataException, "DECOMPRESSION FAILED SKIPPING EVENT READ");
    }
  }
  if (datasize != header.payload_size) {
    THROW(EFormatException, "Payload size does not match header information");
  }
  // Split the (decompressed) payload into its fragments, keyed by source ID
  for (int fragNum = 0; fragNum < header.fragment_count; fragNum++) {
    EventFragment *fragment = new EventFragment(data, datasize, true);
    data += fragment->size();
    datasize -= fragment->size();
    fragments[fragment->source_id()] = fragment;
  }
}

The offline reconstruction software calypso was recompiled with support for handling compressed events and tested. It ran smoothly with compressed files passed as input.
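
The idea of hiding decompression behind a header flag can be shown with a self-contained toy reader. This is a sketch under stated assumptions, not the FASER event format: `ToyHeader`, `kCompressedFlag`, `readPayload` and the run-length decoder are all hypothetical, and the decoder stands in for a real decompression library.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <vector>

using Bytes = std::vector<std::uint8_t>;

// Hypothetical status bit and header layout, for illustration only.
constexpr std::uint16_t kCompressedFlag = 1u << 0;

struct ToyHeader {
  std::uint16_t status;
  std::uint32_t payloadSize;  // payload size after decompression
};

// Toy stand-in decoder: expands {run length, value} byte pairs.
Bytes toyRunLengthDecode(const Bytes &in) {
  if (in.size() % 2 != 0) throw std::runtime_error("truncated run-length stream");
  Bytes out;
  for (std::size_t i = 0; i < in.size(); i += 2) {
    out.insert(out.end(), static_cast<std::size_t>(in[i]), in[i + 1]);
  }
  return out;
}

// Callers always get the plain payload back, whether or not the stored
// bytes were compressed; the header flag decides which path is taken.
Bytes readPayload(const ToyHeader &header, const Bytes &stored) {
  Bytes payload =
      (header.status & kCompressedFlag) ? toyRunLengthDecode(stored) : stored;
  if (payload.size() != header.payloadSize) {
    throw std::runtime_error("payload size does not match header information");
  }
  return payload;
}
```

Since the size check runs after the optional decompression step, the same consistency check covers both raw and compressed events, which mirrors the order of operations in the excerpt above.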

High Event Rate Tests

The viability of the Compression Engine could only be determined by high event rate tests on the full FASER DAQ system, which were planned for the final days of the project. The Compression Engine was subjected to a calibration LED random trigger, simulating conditions during proton-proton collisions in the LHC. The code was compiled on a spare production server and hooked up to the FASER TDAQ system. The module published metrics indicating the compression ratio, the number of events received and sent out, and the internal queue size. The event rate was also monitored. The objective was to verify that compression introduced no bottlenecks and that the module dropped no events.

Fig 4: The FASER Trigger Dashboard during High Rate Tests

Compression level 5 was tested for the ZSTD compressor.

Fig 5: The Compression Module Metrics Dashboard

The event rate was pushed up to 4 kHz, and the average compression ratio reported was around 2, meaning that file sizes were roughly halved during data acquisition. The internal queue, with a capacity of about 1000, only filled up to about 200, which indicates that compression did not create a performance bottleneck. No messages were dropped during compression. The sender and receiver queues remained mostly free, indicating that the non-blocking multi-threaded approach worked as intended.

Conclusion

Contributing to the project for the last four months was really exciting. It was a great learning experience, as I got to work on everything from DevOps to Python scripting and multi-threaded C++. The studies done to determine the best compressor were quite insightful, and I learned a lot about how the design of a compressor affects its performance and how it adapts to physics event data. Implementing the compression engine module was also very interesting, as it helped me understand how to optimize code for high-throughput systems. It was really exciting to test my code on the actual FASER experiment setup (down at the LHC!) with a full readout. I would like to thank my mentors, Brian Petersen and Claire Antel, for their guidance and support. It was a great experience implementing the real-time lossless data compression engine for the FASER experiment.

