Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
A simple program using StarPU's partitioning feature.
// Simple program using StarPU's partitioning feature.
#include <sys/time.h>

#include <climits>
#include <cmath>
#include <iostream>
#include <vector>

#include <starpu.h>
// File-level StarPU objects shared between main() and the submitted tasks.
// They have static storage duration, so they start out zero-initialised.

// Performance model that main() attaches to the codelet (cl.model).
starpu_perfmodel perf_model;
// Handle of the vector registered with StarPU; main() registers and
// unregisters it once per benchmark run.
starpu_data_handle_t handle;
// Codelet describing the CPU kernel; configured in main().
starpu_codelet cl;
// StarPU configuration filled in by main() and passed to starpu_init().
starpu_conf conf;
// Returns the current wall-clock time in seconds, as reported by
// gettimeofday(); the microsecond part ends up in the fractional digits.
double get_wtime() {
    struct timeval now;
    gettimeofday(&now, NULL);
    const double fractional_seconds = now.tv_usec * 1e-6;
    return now.tv_sec + fractional_seconds;
}
// CPU implementation of the codelet: writes r^power into every element of
// the vector it is given.
//
//   buffers[0]  StarPU vector interface of the (sub-)vector to fill.
//   arg         points to two doubles {r, power}; power is truncated to int.
//
// The power is recomputed by repeated multiplication for each element on
// purpose: the kernel only exists to provide a heavy, easily split workload.
void kernel(void *buffers[], void *arg) {
    const double *params = static_cast<const double *>(arg);
    const double base = params[0];
    const int exponent = static_cast<int>(params[1]);

    double *out = reinterpret_cast<double *>(STARPU_VECTOR_GET_PTR(buffers[0]));
    const int count = static_cast<int>(STARPU_VECTOR_GET_NX(buffers[0]));

    for (int idx = 0; idx < count; ++idx) {
        // Artificial load: `exponent` multiplications per element.
        double product = 1.0;
        for (int remaining = exponent; remaining > 0; --remaining) {
            product *= base;
        }
        out[idx] = product;
    }
}
int main() {
const int LENGTH = 8192*4; // buffer length (a start line)
const int POWER = 15000; // dummy computing load
const double R = 1.0001;
double size_mb = LENGTH / 1024. * sizeof(double) / 1024;
std::cout << "Data size : " << size_mb << " MB" << std::endl;
starpu_conf_init(&conf);
conf.single_combined_worker = 1;
conf.sched_policy_name = "heft";
int ret = starpu_init(&conf);
STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");
perf_model.type = STARPU_REGRESSION_BASED;
perf_model.symbol = "simple";
bzero(&cl, sizeof(cl));
cl.where = STARPU_CPU;
cl.cpu_funcs[0] = kernel;
cl.nbuffers = 1;
cl.modes[0] = STARPU_W;
cl.model = &perf_model;
cl.max_parallelism = INT_MAX;
// First, we calculate the correct result.
double ans = pow(R, POWER);
double serial_time = 0;
// Launch many tasks changing the buffers size.
for (int nchildren = 1; nchildren < 10; nchildren++) {
int length = (int)(LENGTH);
double *ary = new double[length];
double args[] = {R, POWER};
double beg = get_wtime();
// the content of ary can be whatever, we use the initial vandom value
starpu_vector_data_register(&handle, 0, (uintptr_t) ary, length, sizeof(ary[0]));
starpu_data_filter f = {starpu_block_filter_func_vector, nchildren};
starpu_data_partition(handle, &f);
for (int j = 0; j < nchildren; j++) {
starpu_data_handle_t sub_handle = starpu_data_get_sub_data(handle, 1, j);
starpu_task *task = starpu_task_create();
task->cl = &cl;
task->handles[0] = sub_handle;
task->cl_arg = (void*)args;
task->cl_arg_size = sizeof(double) * 2;
task->synchronous = 0;
ret = starpu_task_submit(task);
STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
}
starpu_task_wait_for_all();
starpu_data_unpartition(handle, 0);
starpu_data_unregister(handle);
double sec = (get_wtime() - beg);
if (nchildren == 1) {
std::cout << nchildren << "-way Task : " << sec << " [s]" << std::endl;
serial_time = sec;
} else {
std::cout << nchildren << "-way Task : " << sec << " [s]"
<< ", efficiency = " << serial_time/sec/nchildren*100 << "%"
<< std::endl;
}
for (int i = 0; i < length; i++) {
if (abs(ary[i] - ans) > 0.00001) {
std::cout << "ERROR " << ary[i] << ", " << ans << std::endl;
}
}
delete[] ary;
}
starpu_shutdown();
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment