Async
Async programming enables you to extract the maximum concurrency out of your program by avoiding the overhead of spawning a thread for each asynchronous task, and also providing a few ergonomic primitives for building them.
But although it has some support in the language itself already, such as syntactic sugar in the form of the async
and await
keywords, the standard library doesn't provide a runtime for them, so we need to resort to external crates like tokio
for this.
In general, the runtime will execute lots of so called green threads that are extremely lightweight compared to regular ones. This means that you are able to spawn thousands of these threads without problem. They are scheduled cooperatively, so each thread won't yield to another one unless unless done explicitly. This happens at any call to await
.
Now we're going to take the example from the message passing chapter and adapt it to the async environment using the tokio
primitives:
async fn example() { let (task_sender, mut task_receiver) = tokio::sync::mpsc::channel(20); let (result_sender, mut result_receiver) = tokio::sync::mpsc::channel(20); let thread = tokio::spawn(async move { let mut sum: usize = 0; // Receive until pipe disconnects while let Some(number) = task_receiver.recv().await { sum += number; result_sender.send(sum).await.unwrap(); } }); for i in 0..20 { task_sender.send(i).await.unwrap(); } // Signal to the other thread that there will be no more tasks std::mem::drop(task_sender); while let Some(sum) = result_receiver.recv().await { println!("Result: {}", sum); } // Thread should be finished thread.await.unwrap(); } fn main() { // I personally prefer it explicitly since it's easier // to understand what the code does let runtime = tokio::runtime::Runtime::new().unwrap(); runtime.block_on(example()); }
That example makes use of the runtime scheduler, but tokio
has a few more interesting functions:
Non-parallel primitives
All of these run task concurrently, but on the same thread, so they don't run in parallel:
join!
takes multipleasync
expressions and waits until all of them complete.try_join!
takes multipleasync
expressions that return a result and waits for all to complete, but short-circuits on the first error and returns it. When this happens, all expressions still running are abandoned.select!
takes multipleasync
expressions, matches the return values to a pattern each and abandons all other expressions when the first match is encountered. If no return value matches, an optionalelse
arm can be defined.
Async IO
While the API is mostly equivalent to its std
pendants (just async
), they provide better utilization of CPU resources. Doing IO with regular threads blocks them when the OS is busy serving the request, decreasing throughput.
You can mitigate this by spawning more threads, so one of them can run in the meantime, but that means more threads than CPU resources, potentially competing for the same resources and therefore decreasing throughput from that side. Since async tasks are scheduled cooperatively, their overhead is much lower and therefore enables throughput in these two cases.
Timers
The runtime also provides timing functionality. Combined with the scheduler, time-based functionality can be implemented quite efficiently.
Sleep
blocks until a specified amount of time has passed. Compared to the variant in the standard library, only the task is blocked, so the runtime can switch to another one without involving the operating system, incurring much lower overhead.Timeout
can be used to attach a timeout to a task. If it blocks after the specified amount of time has passed, the task is abandoned and an error is returned.Interval
blocks on a call to itstick
method until the last successful call was at least the specified duration ago.
To finish this chapter, we're going to look at a last example:
struct WorkState(usize);
fn save(state: tokio::sync::MutexGuard<WorkState>) {
// actual saving here
println!("{}", state.0);
}
async fn example() {
let state = std::sync::Arc::new(tokio::sync::Mutex::new(WorkState(42)));
// auto save
let auto_save_state = state.clone();
tokio::spawn(async move {
let mut timer = tokio::time::interval(std::time::Duration::from_secs(300));
loop {
timer.tick().await;
save(auto_save_state.lock().await);
}
});
// the main loop goes here
println!("{}", state.lock().await.0);
}
fn main() {
// I personally prefer it explicitly since it's easier
// to understand what the code does
let runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on(example());
}