Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Task_api

task api:

based on: https://gist.github.com/pervognsen/052f3062db9545b43a7e9350130cb964

type TaskFn = fn (_ : void*) -> void*;
fn create(task_data : void*, task_fn : TaskFn) : TaskId;
fn discard(task : TaskId);
fn blocked_by(task : TaskId, predecessor : TaskId);
fn start(task : TaskId);

Logical data model:

tasks(task_id, ptr, fn). % unique(task_id)
blocked_by(task_id, p_task_id). % unique(task_id, p_task_id)
started_tasks(task_id, status). % unique(task_id)
executing(task_id, core_id). % unique(task_id), @pre: started_tasks(T, status=started), all(blocked_by(T, T2), started_tasks(T2, status=done))

Logical Transformations:

create: record task and assign identity

fn create(task_data : void*, task_fn : TaskFn) : TaskId;
post: tasks(new_task_id, task_data, task_fn).
post: frame(ret0, new_task_id).

discard: cancel a task that hasn't been started yet

fn discard(task : TaskId);
pre: not(started_tasks(task_id=task)).
post: retract(blocked_by(T, task_id=task)). retract(tasks(task_id=task)).

blocked_by: register dependency

fn blocked_by(task : TaskId, predecessor : TaskId);
pre: tasks(task, P, F), \+ started_tasks(task, _).
post: blocked_by(task, predecessor).

start: start task

fn start(task : TaskId);
pre: tasks(task, Ptr, Fn), \+ started_tasks(task, started).
post: started_tasks(task, started).

also, at some later point: the execute_task transform is performed on the task with any core C that is available

dispatch_per_core:

fn dispatch_per_core();
post: execute_task on random_combination((CoreId,TaskId), idle_cores(CoreId), started_tasks(TaskId, status=started) ^ not(executing(TaskId)) ^ all(blocked_by(TaskId, T), started_tasks(T, status=done))).

Pairs each executable task with a core, in order to call execute_task fairly over the available cores.

execute_task:

fn execute_task(task : TaskId, core);
pre: started_tasks(task, started), \+ executing(task, AnyCore), all(blocked_by(task, PrevTask), started_tasks(PrevTask, done)).
post: executing(task, core).
post: tasks(task, Ptr, Fn), pc(core, value=Fn), frame(core, arg0, data=Ptr).

finish_task:

fn finish_task(task : TaskId, core);
pre: pc(core,Value). % Value returned from Fn, Ptr for task
post: retract(executing(task, core)), started_tasks(task, done).

todo: physical data model

todo: physical transformations

@url: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/

@url: https://github.com/ziglang/zig/blob/caa008505729f9511f6f0b070636013e9597b3f7/std/os/index.zig#L2758-L2816

/// Returns the number of logical CPUs, determined per target OS:
///
///  * macOS:   the value of the `hw.ncpu` sysctl.
///  * Linux:   the number of bits set in the affinity mask that
///             `sched_getaffinity` reports for pid 0.
///  * Windows: `dwNumberOfProcessors` from `GetSystemInfo`.
///
/// Any other target OS is rejected at compile time.
///
/// `fallback_allocator` is only used on Linux. The affinity mask starts out
/// in a buffer obtained through `std.heap.stackFallback`, and the fallback
/// allocator backs it when the mask has to grow beyond the initial size.
///
/// Errors: `CpuCountError.OutOfMemory`, `CpuCountError.PermissionDenied`,
/// whatever `os.unexpectedErrorPosix` yields for an unrecognised errno, and
/// any error propagated from the allocator.
pub fn cpuCount(fallback_allocator: *mem.Allocator) CpuCountError!usize {
    switch (builtin.os) {
        builtin.Os.macosx => {
            var count: c_int = undefined;
            var count_len: usize = @sizeOf(c_int);
            const rc = posix.sysctlbyname(c"hw.ncpu", @ptrCast(*c_void, &count), &count_len, null, 0);
            const err = posix.getErrno(rc);
            switch (err) {
                0 => return @intCast(usize, count),
                posix.EFAULT => unreachable,
                posix.EINVAL => unreachable,
                posix.ENOMEM => return CpuCountError.OutOfMemory,
                posix.ENOTDIR => unreachable,
                posix.EISDIR => unreachable,
                posix.ENOENT => unreachable,
                posix.EPERM => unreachable,
                else => return os.unexpectedErrorPosix(err),
            }
        },
        builtin.Os.linux => {
            // Initial mask size in machine words; doubled on demand below.
            const usize_count = 16;
            const allocator = std.heap.stackFallback(usize_count * @sizeOf(usize), fallback_allocator).get();

            var set = try allocator.alloc(usize, usize_count);
            defer allocator.free(set);

            while (true) {
                const rc = posix.sched_getaffinity(0, set);
                const err = posix.getErrno(rc);
                switch (err) {
                    0 => {
                        // On success the kernel has copied the complete mask
                        // and rc is the number of bytes it wrote, so the
                        // result is final even when rc equals the buffer size.
                        const result = set[0 .. rc / @sizeOf(usize)];
                        var sum: usize = 0;
                        for (result) |x| {
                            sum += @popCount(x);
                        }
                        return sum;
                    },
                    // The kernel reports a too-small mask buffer as EINVAL
                    // (see sched_getaffinity(2)); grow the buffer and retry
                    // instead of treating it as impossible. The buffer length
                    // is always a whole number of usize words, so the other
                    // documented EINVAL cause (misaligned size) cannot occur.
                    posix.EINVAL => {
                        set = try allocator.realloc(usize, set, set.len * 2);
                        continue;
                    },
                    posix.EFAULT => unreachable,
                    posix.EPERM => return CpuCountError.PermissionDenied,
                    posix.ESRCH => unreachable,
                    else => return os.unexpectedErrorPosix(err),
                }
            }
        },
        builtin.Os.windows => {
            var system_info: windows.SYSTEM_INFO = undefined;
            windows.GetSystemInfo(&system_info);
            return @intCast(usize, system_info.dwNumberOfProcessors);
        },
        else => @compileError("unsupported OS"),
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.