@0x0L
Last active June 12, 2024 20:38
parallel_unique_merge.cpp
# generate data: build N sorted integer series (random ~p subsets of a common
# range) and dump them as flat binary files under series/.
import os

import numpy as np

N = 200
L = 10_000_000
p = 0.2

# Each series keeps an element of range(L / p) with probability p, so every
# series has roughly L elements and is sorted by construction.
r = np.arange(int(L / p), dtype=int)
s = [
    r[np.random.rand(r.shape[0]) > 1 - p]
    for _ in range(N)
]

lengths = np.array([len(x) for x in s])
data = np.concatenate(s)
print(lengths)
print(data)

os.makedirs("series", exist_ok=True)
data.tofile("series/data")
lengths.tofile("series/length")
// c++ -Wall -Ofast -std=c++20 parallel_unique_merge.cpp -o pum
#include <algorithm>
#include <cstdint>
#include <filesystem>
#include <fstream>
#include <future>
#include <iostream>
#include <iterator>
#include <queue>
#include <span>
#include <stdexcept>
#include <thread>
#include <vector>
// Split container c into consecutive chunks of at most k elements each.
template <
    typename T,
    typename A,
    template <typename, typename> class C>
auto chunker(const C<T, A> &c, const typename C<T, A>::size_type &k)
{
    if (k == 0) // size_type is unsigned, so only k == 0 is invalid
        throw std::domain_error("chunker() requires k > 0");

    using INPUT_CONTAINER_TYPE = C<T, A>;
    using OUTPUT_CONTAINER_TYPE = C<INPUT_CONTAINER_TYPE,
                                    std::allocator<INPUT_CONTAINER_TYPE>>;
    OUTPUT_CONTAINER_TYPE out_c;

    auto chunkBeg = std::begin(c);
    for (auto left = c.size(); left != 0;)
    {
        auto const skip = std::min(left, k);
        INPUT_CONTAINER_TYPE sub_container;
        std::back_insert_iterator<INPUT_CONTAINER_TYPE> back_v(sub_container);
        std::copy_n(chunkBeg, skip, back_v);
        out_c.push_back(sub_container);
        left -= skip;
        std::advance(chunkBeg, skip);
    }
    return out_c;
}
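// A minimal sketch (not part of the original gist) of how chunker() behaves:
// splitting 7 elements into chunks of at most 3 yields sizes 3, 3, 1.
// The function name chunker_demo is hypothetical.
void chunker_demo()
{
    std::vector<int> v{1, 2, 3, 4, 5, 6, 7};
    auto chunks = chunker(v, std::vector<int>::size_type{3});
    // chunks == {{1, 2, 3}, {4, 5, 6}, {7}}
    for (auto &c : chunks)
        std::cout << c.size() << " "; // prints: 3 3 1
    std::cout << std::endl;
}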
// Read a whole binary file into a vector<T>.
template <typename T>
auto read_file(std::string path)
{
    std::ifstream ifs(path, std::ios::binary);
    if (!ifs)
        throw std::runtime_error("cannot open " + path);
    auto sz = std::filesystem::file_size(path);
    std::vector<T> buf(sz / sizeof(T));
    ifs.read(reinterpret_cast<char *>(buf.data()), sz);
    return buf;
}
// View the flat data buffer as one span per series, using the lengths array.
template <typename T>
auto split_arrays(const std::vector<T> &data, const std::vector<int64_t> &lengths)
{
    std::vector<std::span<const T>> arr(lengths.size());
    size_t pos = 0;
    for (size_t i = 0; i < lengths.size(); i++)
    {
        arr[i] = std::span(&data[pos], lengths[i]);
        pos += lengths[i];
    }
    return arr;
}
// Heap comparator: std::priority_queue is a max-heap, so comparing with '>'
// makes it pop the span with the smallest front element first.
struct FrontCompare
{
    template <typename T>
    inline bool operator()(const std::span<const T> &x, const std::span<const T> &y) const
    {
        return x.front() > y.front();
    }
};
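// Sanity-check sketch (hypothetical, not in the original gist): with
// FrontCompare, the queue surfaces spans in ascending order of their fronts.
void front_compare_demo()
{
    std::vector<int> a{5, 9}, b{1, 4}, c{3, 7};
    std::priority_queue<std::span<const int>,
                        std::vector<std::span<const int>>,
                        FrontCompare>
        q;
    q.emplace(std::span<const int>{a});
    q.emplace(std::span<const int>{b});
    q.emplace(std::span<const int>{c});
    std::cout << q.top().front() << std::endl; // 1: smallest front on top
}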
// k-way merge of sorted spans with on-the-fly deduplication: repeatedly pop
// the span with the smallest front, emit that value once, and advance every
// span whose front equals it.
template <typename T>
auto unique_sorted(const std::vector<std::span<const T>> &inputs)
{
    std::vector<T> output;

    using U = std::span<const T>;
    std::priority_queue<U, std::vector<U>, FrontCompare> priority_queue;
    for (auto &v : inputs)
    {
        if (!v.empty())
            priority_queue.emplace(v);
    }
    if (priority_queue.empty())
        return output;

    // The result has at least as many elements as the longest input.
    auto longest = std::max_element(
        inputs.begin(), inputs.end(),
        [](const auto &x, const auto &y)
        { return x.size() < y.size(); });
    output.reserve(longest->size());

    while (!priority_queue.empty())
    {
        auto top = priority_queue.top();
        auto val = top.front();
        output.push_back(val);

        // Drop val from every span that currently starts with it.
        while (top.front() == val)
        {
            priority_queue.pop();
            if (top.size() > 1)
                priority_queue.emplace(top.subspan(1));
            if (priority_queue.empty())
                break;
            top = priority_queue.top();
        }
    }
    return output;
}
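// A minimal sketch (hypothetical, mirrors the commented-out test data in
// main below): merging two small sorted arrays deduplicates the shared 2.
void unique_sorted_demo()
{
    std::vector<int64_t> a{2, 5, 6, 8};
    std::vector<int64_t> b{1, 2, 3, 4, 7};
    std::vector<std::span<const int64_t>> inputs{
        std::span<const int64_t>{a}, std::span<const int64_t>{b}};
    for (auto v : unique_sorted(inputs))
        std::cout << v << " "; // prints: 1 2 3 4 5 6 7 8
    std::cout << std::endl;
}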
// Two-level merge: split the inputs into one chunk per thread, dedup-merge
// each chunk with std::async, then dedup-merge the per-chunk results.
template <typename T>
auto parallel_unique_sorted(const std::vector<std::span<const T>> &inputs)
{
    size_t sz = inputs.size();
    size_t n_threads = std::min((size_t)std::thread::hardware_concurrency(), sz);
    size_t chunksize = sz / n_threads;
    // std::cout << "n_threads " << n_threads << " chunksize " << chunksize << std::endl;

    auto chunks = chunker(inputs, chunksize);

    std::vector<std::future<std::vector<T>>> futures;
    for (auto &c : chunks)
        futures.emplace_back(std::async(std::launch::async, unique_sorted<T>, std::cref(c)));

    std::vector<std::vector<T>> outputs;
    for (auto &f : futures)
        outputs.emplace_back(f.get());

    std::vector<std::span<const T>> as_span;
    for (auto &x : outputs)
        as_span.emplace_back(std::span{x});
    return unique_sorted(as_span);
}
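// Timing sketch (hypothetical helper, not part of the original gist): wraps
// a callable with std::chrono to compare the serial and parallel merges.
#include <chrono>

template <typename F>
double time_ms(F &&f)
{
    auto t0 = std::chrono::steady_clock::now();
    f();
    auto t1 = std::chrono::steady_clock::now();
    return std::chrono::duration<double, std::milli>(t1 - t0).count();
}

// Usage, with arr as built by split_arrays in main:
//   std::cout << "serial:   " << time_ms([&] { unique_sorted(arr); }) << "\n";
//   std::cout << "parallel: " << time_ms([&] { parallel_unique_sorted(arr); }) << "\n";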
int main()
{
    auto lengths = read_file<int64_t>("series/length");
    auto data = read_file<int64_t>("series/data");
    // std::vector<int64_t> lengths {4, 5};
    // std::vector<int64_t> data {2, 5, 6, 8, 1, 2, 3, 4, 7};

    auto arr = split_arrays(data, lengths);
    auto result = parallel_unique_sorted(arr);

    // Print the size and the first/last few values as a sanity check.
    std::cout << result.size() << std::endl;
    auto const edge = std::min(result.size(), size_t{3});
    for (size_t i = 0; i < edge; i++)
    {
        std::cout << result[i] << ", ";
    }
    std::cout << " ... ";
    for (size_t i = result.size() - edge; i < result.size(); i++)
    {
        std::cout << result[i] << ", ";
    }
    std::cout << std::endl;

    // for (auto z : result) {
    //     std::cout << z << ", ";
    // }
    // std::cout << std::endl;
}
# benchmark: pyarrow — unique + sort on the concatenated column
import numpy as np
import pyarrow as pa

lengths = np.fromfile("series/length", dtype=int)
data = np.fromfile("series/data", dtype=int)

tbl = pa.table({"ts": data})
arr = tbl["ts"].unique().sort()
arr = arr.to_numpy()
print(arr)
# benchmark: datafusion — DISTINCT + ORDER BY over the concatenated column
import numpy as np
import pyarrow as pa
from datafusion import SessionContext, col

lengths = np.fromfile("series/length", dtype=int)
data = np.fromfile("series/data", dtype=int)

# arr = np.split(data, np.cumsum(lengths)[:-1])
# tables = [
#     pa.table({"ts": x})
#     for x in arr
# ]
tbl = pa.table({"ts": data})

ctx = SessionContext()
df = ctx.from_arrow_table(tbl)
df = df.distinct().sort(col("ts").sort())
df = df.to_arrow_table()["ts"].to_numpy()
print(len(df))
# benchmark: plain numpy
import numpy as np

lengths = np.fromfile("series/length", dtype=int)
data = np.fromfile("series/data", dtype=int)

df = np.unique(data)  # np.unique already returns sorted values
print(len(df))
# benchmark: polars — unique + sort on a single-column frame
import numpy as np
import pyarrow as pa
import polars as pl

lengths = np.fromfile("series/length", dtype=int)
data = np.fromfile("series/data", dtype=int)

tbl = pa.table({"ts": data})
df = pl.from_arrow(tbl)
df = df.unique().sort("ts")
df = df.to_arrow()["ts"].to_numpy()
print(len(df))