Async
Async programming enables you to extract the maximum concurrency out of your program by avoiding the overhead of spawning a thread for each asynchronous task, and also providing a few ergonomic primitives for building them.
But although it has some support in the language itself already, such as syntactic sugar in the form of the async and await keywords, the standard library doesn't provide a runtime for them, so we need to resort to external crates like tokio for this.
In general, the runtime will execute lots of so called green threads that are extremely lightweight compared to regular ones. This means that you are able to spawn thousands of these threads without problem. They are scheduled cooperatively, so each thread won't yield to another one unless unless done explicitly. This happens at any call to await.
Now we're going to take the example from the message passing chapter and adapt it to the async environment using the tokio primitives:
async fn example() { let (task_sender, mut task_receiver) = tokio::sync::mpsc::channel(20); let (result_sender, mut result_receiver) = tokio::sync::mpsc::channel(20); let thread = tokio::spawn(async move { let mut sum: usize = 0; // Receive until pipe disconnects while let Some(number) = task_receiver.recv().await { sum += number; result_sender.send(sum).await.unwrap(); } }); for i in 0..20 { task_sender.send(i).await.unwrap(); } // Signal to the other thread that there will be no more tasks std::mem::drop(task_sender); while let Some(sum) = result_receiver.recv().await { println!("Result: {}", sum); } // Thread should be finished thread.await.unwrap(); } fn main() { // I personally prefer it explicitly since it's easier // to understand what the code does let runtime = tokio::runtime::Runtime::new().unwrap(); runtime.block_on(example()); }
That example makes use of the runtime scheduler, but tokio has a few more interesting functions:
Non-parallel primitives
All of these run task concurrently, but on the same thread, so they don't run in parallel:
- join!takes multiple- asyncexpressions and waits until all of them complete.
- try_join!takes multiple- asyncexpressions that return a result and waits for all to complete, but short-circuits on the first error and returns it. When this happens, all expressions still running are abandoned.
- select!takes multiple- asyncexpressions, matches the return values to a pattern each and abandons all other expressions when the first match is encountered. If no return value matches, an optional- elsearm can be defined.
Async IO
While the API is mostly equivalent to its std pendants (just async), they provide better utilization of CPU resources. Doing IO with regular threads blocks them when the OS is busy serving the request, decreasing throughput.
You can mitigate this by spawning more threads, so one of them can run in the meantime, but that means more threads than CPU resources, potentially competing for the same resources and therefore decreasing throughput from that side. Since async tasks are scheduled cooperatively, their overhead is much lower and therefore enables throughput in these two cases.
Timers
The runtime also provides timing functionality. Combined with the scheduler, time-based functionality can be implemented quite efficiently.
- Sleepblocks until a specified amount of time has passed. Compared to the variant in the standard library, only the task is blocked, so the runtime can switch to another one without involving the operating system, incurring much lower overhead.
- Timeoutcan be used to attach a timeout to a task. If it blocks after the specified amount of time has passed, the task is abandoned and an error is returned.
- Intervalblocks on a call to its- tickmethod until the last successful call was at least the specified duration ago.
To finish this chapter, we're going to look at a last example:
struct WorkState(usize);
fn save(state: tokio::sync::MutexGuard<WorkState>) {
    // actual saving here
    println!("{}", state.0);
}
async fn example() {
    let state = std::sync::Arc::new(tokio::sync::Mutex::new(WorkState(42)));
    // auto save
    let auto_save_state = state.clone();
    tokio::spawn(async move {
        let mut timer = tokio::time::interval(std::time::Duration::from_secs(300));
        loop {
            timer.tick().await;
            save(auto_save_state.lock().await);
        }
    });
    // the main loop goes here
    println!("{}", state.lock().await.0);
}
fn main() {
    // I personally prefer it explicitly since it's easier
    // to understand what the code does
    let runtime = tokio::runtime::Runtime::new().unwrap();
    runtime.block_on(example());
}