@fishmingyu
Last active November 26, 2023
OpenMP segreduce

A simple segmented reduction (segreduce) in OpenMP. Equal values in a sorted index array define segments, and each segment's values are summed into res[index]. Three implementations are compared: a parallel version that keeps a per-thread accumulator and flushes segment boundaries with atomic adds, a sequential baseline, and a fully atomic version.

Compile

g++ -fopenmp segreduce.cpp -o segreduce
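
Run:

./segreduce

The program first verifies the parallel result against the sequential baseline, then times each variant.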

Test Results

Total time for 10 repetitions: 27 microseconds
Average time per function call (parallel): 2.7 microseconds
Total time for 10 repetitions: 78 microseconds
Average time per function call (sequential): 7.8 microseconds
Total time for 10 repetitions: 8483 microseconds
Average time per function call (atomic): 848.3 microseconds

On this machine the parallel segmented version is roughly 3x faster than the sequential baseline, while the fully atomic version is two orders of magnitude slower due to contention on the output array.

Test machine (lscpu):

Architecture:            x86_64
  CPU op-mode(s):        32-bit, 64-bit
  Address sizes:         48 bits physical, 48 bits virtual
  Byte Order:            Little Endian
CPU(s):                  16
  On-line CPU(s) list:   0-15
Vendor ID:               AuthenticAMD
  Model name:            AMD Ryzen 7 5800X 8-Core Processor
    CPU family:          25
    Model:               33
    Thread(s) per core:  2
    Core(s) per socket:  8
aux.h

#pragma once
#include <algorithm>
#include <atomic>
#include <cmath>
#include <cstdint>
#include <cstdlib>
#include <limits>
#include <omp.h>
#include <type_traits>

// Welford's online algorithm state: running mean, m2 (the sum of squared
// deviations from the mean), and the total weight (the count, for scalars).
template <typename T> struct Welford {
  T mean = T(0);
  T m2 = T(0);
  T weight = T(0);
};
template <typename T> struct IsVecType : std::false_type {};

// Merge two Welford accumulators (combine two partial results)
template <typename T>
Welford<T> welford_combine(const Welford<T> &a, const Welford<T> &b) {
  if constexpr (!IsVecType<T>::value) {
    if (a.weight == 0) {
      return b;
    }
    if (b.weight == 0) {
      return a;
    }
  }
  auto delta = b.mean - a.mean;
  auto new_weight = a.weight + b.weight;
  auto wb_over_w = b.weight / new_weight;
  if constexpr (IsVecType<T>::value) {
    // Guard against division by zero
    wb_over_w = T::blendv(wb_over_w, T(0), new_weight == T(0));
  }
  auto result = Welford<T>{a.mean + delta * wb_over_w,
                           a.m2 + b.m2 + delta * delta * a.weight * wb_over_w,
                           new_weight};
  return result;
}
template <typename T>
Welford<T> welford_combine(const Welford<T> &acc, T data) {
  // Add a single data point
  auto delta = data - acc.mean;
  auto new_weight = acc.weight + T(1);
  auto new_mean = acc.mean + delta / new_weight;
  auto new_delta = data - new_mean;
  auto result = Welford<T>{new_mean, acc.m2 + delta * new_delta, new_weight};
  return result;
}
template <typename T> inline T mod(T a, T b) { return a % b; }
template <> inline float mod(float a, float b) { return std::fmod(a, b); }
template <> inline double mod(double a, double b) { return std::fmod(a, b); }

constexpr float uint32_to_uniform_float(uint32_t value) {
  // maximum value such that `MAX_INT * scale < 1.0` (with float rounding)
  constexpr float scale = 4.6566127342e-10;
  return static_cast<float>(value & 0x7FFFFFFF) * scale;
}
template <typename T> struct AsIntegerType { typedef T type; };
template <> struct AsIntegerType<float> { typedef uint32_t type; };
template <> struct AsIntegerType<double> { typedef uint64_t type; };

template <typename T>
inline typename std::enable_if<std::is_floating_point<T>::value, T>::type
fetch_value(volatile T *addr) {
  return *addr;
}

// Atomic add for floating-point types: reinterpret the value as an integer of
// the same width and retry with compare_exchange_weak until the add succeeds.
template <typename T>
typename std::enable_if<!std::is_integral<T>::value>::type
atomic_add(volatile T *addr, T offset) {
  typedef typename AsIntegerType<T>::type alt_type;
  static_assert(sizeof(std::atomic<alt_type>) == sizeof(T),
                "std::atomic issue");
  alt_type expected;
  alt_type desired;
  std::atomic<alt_type> *atomic_addr = (std::atomic<alt_type> *)addr;
  do {
    T val = fetch_value(addr);
    reinterpret_cast<T *>(&expected)[0] = val;
    reinterpret_cast<T *>(&desired)[0] = val + offset;
  } while (!atomic_addr->compare_exchange_weak(expected, desired,
                                               std::memory_order_relaxed));
}
// Since C++20, float is supported by fetch_add, but it may not be faster than
// the compare_exchange_weak loop above; this can be checked with the
// microbenchmark inductor_cpu_atomic.py.
template <typename T>
typename std::enable_if<std::is_integral<T>::value>::type
atomic_add(volatile T *addr, T offset) {
  static_assert(sizeof(std::atomic<T>) == sizeof(T), "std::atomic issue");
  std::atomic<T> *atomic_addr = (std::atomic<T> *)addr;
  atomic_addr->fetch_add(offset, std::memory_order_relaxed);
}
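
// As the comment above notes, C++20 adds fetch_add for atomic floating-point
// types. A sketch of that alternative (not in the original gist; whether it
// beats the CAS loop above is hardware- and contention-dependent):
#if __cplusplus >= 202002L
inline void atomic_add_cpp20(std::atomic<float> *addr, float offset) {
  addr->fetch_add(offset, std::memory_order_relaxed);
}
#endif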
// This function is used to convert bool or uint8 to a float mask for
// vectorization. The caller needs to make sure src represents TRUE/FALSE
// correctly.
template <typename T> inline float flag_to_float_scalar(T src) {
  float ret;
  *(uint32_t *)(&ret) = src ? 0xFFFFFFFF : 0;
  return ret;
}
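
For reference, a minimal standalone sketch of the scalar Welford path (illustrative only, not part of the gist). Folding four points through welford_combine gives mean 2.5 and population variance m2 / weight = 1.25:

#include "aux.h"
#include <cstdio>
#include <initializer_list>

int main() {
  Welford<double> acc; // starts empty: mean = 0, m2 = 0, weight = 0
  for (double x : {1.0, 2.0, 3.0, 4.0})
    acc = welford_combine(acc, x);
  // Population variance is m2 divided by the total weight (the count here)
  std::printf("mean = %f, variance = %f\n", acc.mean, acc.m2 / acc.weight);
  return 0;
}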
#include "aux.h"
#include <algorithm>
#include <atomic>
#include <chrono>
#include <iostream>
#include <omp.h>
#include <random>
#include <vector>
void process_arrays(std::vector<float> &res, const std::vector<int> &indices,
                    const std::vector<float> &values) {
  int n = indices.size();
  // Nothing to do for empty inputs
  if (n == 0)
    return;
#pragma omp parallel
  {
    float acc = 0.0;     // Accumulator for the current segment
    int last_index = -1; // Sentinel: no segment seen yet by this thread
#pragma omp for nowait
    for (int i = 0; i < n; ++i) {
      if (i == 0 || indices[i] != last_index) {
        if (last_index != -1) {
          // Flush the finished segment with an atomic add, since a segment
          // may straddle two threads' chunks
          atomic_add(&res[last_index], acc);
        }
        acc = values[i];         // Reset accumulator for the new segment
        last_index = indices[i]; // Remember which segment we are in
      } else {
        // Same index, accumulate the values
        acc += values[i];
      }
    }
    // Flush this thread's final segment after the loop (guard against
    // threads that received no iterations)
    if (last_index != -1) {
      atomic_add(&res[last_index], acc);
    }
  }
}
void process_array_atomic(std::vector<float> &res,
                          const std::vector<int> &indices,
                          const std::vector<float> &values) {
  int n = indices.size();
  // Nothing to do for empty inputs
  if (n == 0)
    return;
#pragma omp parallel
  {
#pragma omp for nowait
    for (int i = 0; i < n; ++i) {
      // Every element does an atomic update: simple but heavily contended
      atomic_add(&res[indices[i]], values[i]);
    }
  }
}
void process_array_sequential(std::vector<float> &res,
                              const std::vector<int> &indices,
                              const std::vector<float> &values) {
  int n = indices.size();
  // Nothing to do for empty inputs
  if (n == 0)
    return;
  float acc = values[0];       // Accumulator, seeded with the first value
  int last_index = indices[0]; // Initialize last_index with the first index
  for (int i = 1; i < n; ++i) {
    if (indices[i] != last_index) {
      // Different index encountered: flush the accumulated value
      res[last_index] += acc;
      acc = values[i];         // Reset accumulator for the new segment
      last_index = indices[i]; // Update the last index
    } else {
      // Same index, accumulate the values
      acc += values[i];
    }
  }
  // Flush the final segment after the loop
  res[last_index] += acc;
}
int main() {
  // Example usage
  const int num_elements = 1000; // Number of elements in the vector
  const int num_reduce = 10;     // Largest segment index (inclusive)
  // Initialize outputs with appropriate size and default values
  std::vector<float> res(num_elements, 0.0f);
  std::vector<float> golden_res(num_elements, 0.0f);
  std::random_device rd;  // Obtain a random seed from hardware
  std::mt19937 gen(rd()); // Seed the generator
  // Define the range; note both bounds are inclusive: [0, num_reduce]
  std::uniform_int_distribution<> distr(0, num_reduce);
  std::vector<int> indices;
  // Generate random segment indices and add them to the vector
  for (int i = 0; i < num_elements; ++i) {
    indices.push_back(distr(gen));
  }
  // Sort the indices so equal indices form contiguous segments
  std::sort(indices.begin(), indices.end());
  std::vector<float> values;
  // Generate random values and add them to the vector
  for (int i = 0; i < num_elements; ++i) {
    values.push_back(rand() % 100 / 100.0f);
  }
  // Warm up and verify the parallel result against the sequential one
  process_arrays(res, indices, values);
  process_array_sequential(golden_res, indices, values);
  // Segments run from 0 through num_reduce inclusive
  for (int i = 0; i <= num_reduce; ++i) {
    if (std::fabs(res[i] - golden_res[i]) > 0.0001) {
      std::cout << "Error at index " << i << " res[i] = " << res[i]
                << " golden_res[i] = " << golden_res[i] << std::endl;
      return 1;
    }
  }
  const int num_repeats = 10; // Number of times to repeat each function
  auto start = std::chrono::high_resolution_clock::now();
  for (int i = 0; i < num_repeats; ++i) {
    process_arrays(res, indices, values);
  }
  auto stop = std::chrono::high_resolution_clock::now();
  auto total_duration =
      std::chrono::duration_cast<std::chrono::microseconds>(stop - start);
  std::cout << "Total time for " << num_repeats
            << " repetitions: " << total_duration.count() << " microseconds"
            << std::endl;
  auto avg_duration =
      total_duration.count() / static_cast<double>(num_repeats);
  std::cout << "Average time per function call (parallel): " << avg_duration
            << " microseconds" << std::endl;
  start = std::chrono::high_resolution_clock::now();
  for (int i = 0; i < num_repeats; ++i) {
    process_array_sequential(res, indices, values);
  }
  stop = std::chrono::high_resolution_clock::now();
  total_duration =
      std::chrono::duration_cast<std::chrono::microseconds>(stop - start);
  std::cout << "Total time for " << num_repeats
            << " repetitions: " << total_duration.count() << " microseconds"
            << std::endl;
  avg_duration = total_duration.count() / static_cast<double>(num_repeats);
  std::cout << "Average time per function call (sequential): " << avg_duration
            << " microseconds" << std::endl;
  start = std::chrono::high_resolution_clock::now();
  for (int i = 0; i < num_repeats; ++i) {
    process_array_atomic(res, indices, values);
  }
  stop = std::chrono::high_resolution_clock::now();
  total_duration =
      std::chrono::duration_cast<std::chrono::microseconds>(stop - start);
  std::cout << "Total time for " << num_repeats
            << " repetitions: " << total_duration.count() << " microseconds"
            << std::endl;
  avg_duration = total_duration.count() / static_cast<double>(num_repeats);
  std::cout << "Average time per function call (atomic): " << avg_duration
            << " microseconds" << std::endl;
  return 0;
}