Async

Async programming lets you extract the maximum concurrency from your program by avoiding the overhead of spawning a thread for each asynchronous task, while also providing a few ergonomic primitives for building such tasks. Although the language itself already has some support for it, such as syntactic sugar in the form of the async and await keywords, the standard library doesn't provide a runtime, so we need to resort to external crates like tokio for this.

In general, the runtime executes lots of so-called green threads (tokio calls them tasks) that are extremely lightweight compared to regular threads. This means that you are able to spawn thousands of them without a problem. They are scheduled cooperatively, so a task won't yield to another one unless it does so explicitly. This can only happen at a call to await.

Now we're going to take the example from the message passing chapter and adapt it to the async environment using the tokio primitives:

async fn example() {
    let (task_sender, mut task_receiver) = tokio::sync::mpsc::channel(20);
    let (result_sender, mut result_receiver) = tokio::sync::mpsc::channel(20);

    let thread = tokio::spawn(async move {
        let mut sum: usize = 0;
        // Receive until the channel is closed (all senders dropped)
        while let Some(number) = task_receiver.recv().await {
            sum += number;
            result_sender.send(sum).await.unwrap();
        }
    });

    for i in 0..20 {
        task_sender.send(i).await.unwrap();
    }

    // Signal to the other thread that there will be no more tasks
    std::mem::drop(task_sender);

    while let Some(sum) = result_receiver.recv().await {
        println!("Result: {}", sum);
    }

    // Thread should be finished
    thread.await.unwrap();
}

fn main() {
    // I personally prefer creating the runtime explicitly (instead of
    // using the #[tokio::main] attribute), since it's easier to
    // understand what the code does
    let runtime = tokio::runtime::Runtime::new().unwrap();
    runtime.block_on(example());
}

That example makes use of the runtime scheduler, but tokio has a few more interesting features:

Non-parallel primitives

All of these run their async expressions concurrently, but within the same task and therefore on the same thread, so they don't run in parallel:

  • join! takes multiple async expressions and waits until all of them complete.
  • try_join! takes multiple async expressions that each return a Result and waits for all of them to complete, but short-circuits on the first error and returns it. When this happens, all expressions still running are abandoned.
  • select! takes multiple async expressions, matches each return value against its own pattern, and abandons all other expressions as soon as the first match is encountered. For the case that no return value matches, an optional else arm can be defined.

Async IO

While the async IO API is mostly equivalent to its std counterparts (just async), it provides better utilization of CPU resources. Doing IO with regular threads blocks them while the OS is busy serving the request, decreasing throughput. You can mitigate this by spawning more threads, so that one of them can run in the meantime, but that means more threads than CPU cores, potentially competing for the same resources and therefore decreasing throughput from that side. Since async tasks are scheduled cooperatively by the runtime, switching between them is much cheaper, which improves throughput in both cases.

Timers

The runtime also provides timing functionality. Combined with the scheduler, time-based functionality can be implemented quite efficiently.

  • Sleep blocks until a specified amount of time has passed. Compared to the variant in the standard library, only the task is blocked, so the runtime can switch to another one without involving the operating system, incurring much lower overhead.
  • Timeout can be used to attach a timeout to a task. If the task has not completed by the time the specified duration has passed, it is abandoned and an error is returned.
  • Interval blocks on a call to its tick method until at least the specified duration has passed since the previous tick. The first tick completes immediately.

To finish this chapter, we're going to look at one last example:

struct WorkState(usize);

fn save(state: tokio::sync::MutexGuard<'_, WorkState>) {
    // actual saving here
    println!("{}", state.0);
}

async fn example() {
    let state = std::sync::Arc::new(tokio::sync::Mutex::new(WorkState(42)));

    // auto save
    let auto_save_state = state.clone();
    tokio::spawn(async move {
        let mut timer = tokio::time::interval(std::time::Duration::from_secs(300));
        loop {
            timer.tick().await;
            save(auto_save_state.lock().await);
        }
    });

    // the main loop goes here
    println!("{}", state.lock().await.0);
}

fn main() {
    // I personally prefer creating the runtime explicitly (instead of
    // using the #[tokio::main] attribute), since it's easier to
    // understand what the code does
    let runtime = tokio::runtime::Runtime::new().unwrap();
    runtime.block_on(example());
}