@tomaka
Last active October 12, 2017 14:23
Creating a tasks system based on Rust coroutines

Let's take the first example from the Rust generators RFC:

#[async]
fn print_lines() -> io::Result<()> {
    let addr = "127.0.0.1:8080".parse().unwrap();
    let tcp = await!(TcpStream::connect(&addr))?;
    ...
}

The way this works is that calling TcpStream::connect immediately starts the process of connecting to the address and returns an object (a future) that represents the moment when the connection succeeds. The call to await! then suspends the current function and resumes it only once the connection has succeeded.

What the RFC doesn't show is that one of the interesting things with generators is that we can do this:

#[async]
fn print_lines() -> io::Result<()> {
    let addr = "127.0.0.1:8080".parse().unwrap();
    let connection1 = TcpStream::connect(&addr);
    let connection2 = TcpStream::connect(&addr);
    let connection3 = TcpStream::connect(&addr);
    
    let actual_c1 = await!(connection1)?;
    let actual_c2 = await!(connection2)?;
    let actual_c3 = await!(connection3)?;
    ...
}

In other words we immediately start three connections, and only then do we suspend the current function three times (of course, as an optimization, a call to await! would not suspend the function if the connection has already succeeded). The advantage of splitting TcpStream::connect from await! is that the three connections execute concurrently.

Game development

In the context of a game, what you want is to process each frame as fast as possible. You have a list of CPU actions to perform, and they should all be finished in under 16 milliseconds after the first one has started. Since we are in the era of multi-core processors, you usually want to dispatch these actions between multiple threads.

Most modern game engines handle this by using tasks. All the CPU actions are split into small tasks that are processed by a thread pool. The tasks should be small enough that they can be dispatched evenly between all the CPU cores, without some cores sitting idle waiting for others. The fact that the tasks must be small means that we want some way to make it easy to create lots of tasks.

"But that's easy!" you might say. After all, it is possible to use the futures-cpupool crate or an equivalent and write something like this:

#[async]
fn create_task() -> u32 {
    let t1 = spawn_task(|| 5);
    let t2 = spawn_task(|| 6);
    let t3 = spawn_task(|| 7);

    await!(t1) + await!(t2) + await!(t3)
}

But this approach has several problems.

Leak safety

The biggest problem with this approach is that the task's closure must be 'static. If it weren't, you could write something like this, which is undefined behavior:

#[async]
fn create_task() {
    let mut my_string = "test".to_owned();

    {
        let task = spawn_task(|| {
            println!("{}", my_string);
        });
        mem::forget(task);
    }

    drop(my_string);
}

In this code, calling spawn_task immediately starts executing the closure, but since the rest of the create_task function continues to execute concurrently, chances are that my_string will be dropped before the println! executes.

Requiring 'static everywhere is really impractical, especially since we want to split operations into lots of tasks. You would basically need Arcs wrapped around structs that themselves contain Arcs, which themselves wrap structs that contain more Arcs. And consequently you would also need mutexes everywhere to go with the Arcs. We really want to avoid that.

The fix

Let me know if I'm missing something, but for me the only way to solve this leak safety problem is to ensure that the function that spawns the task doesn't return until that task is finished.

In other words you create a task then pass it to a function that immediately executes it, like this:

#[task]
fn create_task(some_borrow: &Foo) {
    some_borrow.bar();
}

let local_foo: Foo = ...;
let task = create_task(&local_foo);       // Doesn't start the task yet
task.start_the_task_in_a_thread_and_wait_until_it_is_finished();

This way you can't destroy local_foo while it is in use by the task.

A consequence of this is that calling create_task doesn't actually do anything except return a dummy object (which the compiler will probably optimize into a no-op).

Obviously you might say that this constraint kills the point of using tasks. But here's the trick: a task can immediately spawn other tasks, provided that the lifetime of the spawned task outlives the lifetime of the spawner. In other words, during its execution create_task can immediately spawn subtasks if it only passes some_borrow (or some of the fields of some_borrow) to them. But create_task cannot create a local variable and spawn a subtask that borrows this local variable, unless it uses start_the_task_and_wait_until_it_is_finished.

To sum things up, tasks can safely behave in two possible ways:

  • If we're within a task and create a subtask whose lifetime outlives the parent, then the subtask can begin its execution immediately.
  • Otherwise the task must not immediately start when it is created. Instead the helping library must provide functions (start_the_task_and_wait_until_it_is_finished or similar) that start the task and block the current thread until the task finishes.

Making it usable

Depending on the caller's context, creating a task should sometimes do nothing and sometimes start the task immediately. Obviously this can't be handled automatically, so we need to use a macro in the case where we want to start the task immediately.

In other words, it would look like this:

impl Foo {
    #[task]
    fn foo1(&self) -> u32 {
        let foo2_task = task!(self.foo2());   // `self.foo2()` doesn't do anything, and `task!` starts the task
        let foo3_task = task!(self.foo3());

        await!(foo2_task) + await!(foo3_task)   // `await!` waits for the task to be finished
    }

    #[task]
    fn foo2(&self) -> u32 { .. }
    #[task]
    fn foo3(&self) -> u32 { .. }
}

The task! macro would have to enforce that the lifetime of its content doesn't outlive the lifetime of the parent. Fortunately this can be done with the procedural macro that handles #[task]. The task! macro wouldn't come from an external crate, but instead be redeclared locally at the start of each function with the #[task] attribute.

If, however, you want to spawn a subtask that uses a local variable, then you have to block:

impl Foo {
    #[task]
    fn foo1(&self) -> u32 {
        let local_variable = Bar::new();

        let bar_task = local_variable.bar();    // Doesn't start any execution immediately
        let foo2_task = task!(self.foo2());     // Does start executing immediately

        task_await!(bar_task) + await!(foo2_task)   // Notice `task_await!`, a combination of `task!` + `await!`
    }
}

The task_await! macro would also be able to take multiple parameters, so that we can block on multiple tasks at once:

impl Foo {
    #[task]
    fn foo1(&self) -> u32 {
        let local_variable = Bar::new();

        let bar_task1 = local_variable.bar();
        let bar_task2 = local_variable.bar();

        let (r1, r2) = task_await!(bar_task1, bar_task2);
        r1 + r2
    }
}

The task_await! macro works by yielding. If the user advances the coroutine generated by calling foo.foo1() until task_await! yields, then mem::forgets that coroutine, we have undefined behavior. This means that it is important that only the helper library be able to resume coroutines.

Outside of the context of a task, you would have to call start_the_task_and_wait_until_it_is_finished() as explained.

let my_foo: Foo = ..;
let task = my_foo.foo1();
let ret_val = task.start_the_task_and_wait_until_it_is_finished();

Conclusions

A task system is still not simple either to create or to use. The leakpocalypse problem still makes multithreaded code difficult to write.

tomaka commented Jul 9, 2017

I just realized that the "subtask lifetime must outlive parent" thing still isn't safe, because the parent can return before the child if it mem::forgets the child.

tomaka commented Jul 10, 2017

However, the problem in the previous comment could be fixed by tracking the uses of task! and await! at runtime. A bit of a crappy solution.

@bestouff
Typo: in "The fix", s/foo.bar();/some_borrow.bar();/.
