Processing concurrently and in parallel with Rust streams

Author

Will Jones

Published

August 4, 2023

Here’s a common situation when processing data in Rust: you have N tasks to run and you want to run M of them at a time, and then collect the results. How do you do that? And if M is limited by IO or rate limiting, you might just want single-threaded concurrency; but if it’s CPU bound, you might also want your tasks to be run in parallel across multiple threads.

If you’re working in async Rust with Tokio, here’s how to do this easily:

  1. Create a Stream of futures representing your tasks. If you have an iterator of tasks, or something that can be converted into an iterator of tasks (such as a Vec), use futures::stream::iter().
  2. Call either the buffered() or buffer_unordered() method (both from futures::stream::StreamExt) to set the maximum number of concurrent tasks. Use the unordered variant only if you don’t care about the order in which the results are returned.
  3. Collect the results. If your tasks are fallible, the try_collect() method from futures::stream::TryStreamExt is useful.

Sketched out as code, this looks like:

use futures::stream::{StreamExt, TryStreamExt};

async fn some_task(input: &InputData) -> Result<OutputData, Error> {
    todo!()
}

let input_data: Vec<InputData> = todo!();

// Do concurrently, without parallelism.
// Iterating over &input_data passes each task a reference to its input.
let results: Vec<OutputData> = futures::stream::iter(&input_data)
    .map(some_task)
    .buffered(10) // Run up to 10 concurrently
    .try_collect().await?;

Adding parallelism

The previous example showed how to run the tasks concurrently, which works well for tasks that don’t need more than a single core. But if the tasks are CPU-bound, we want to distribute the work across multiple cores. If we are using Tokio’s multi-threaded runtime, we can use tokio::task::spawn() or tokio::task::spawn_blocking() to hand each task to the runtime, which can run it on another thread. (Use spawn() for futures and spawn_blocking() for synchronous closures.) However, you need to make sure your tasks are Send and have a 'static lifetime. That means you can only pass owned values, or put shared values in an Arc.

So the above example would become:

let input_data: Vec<InputData> = todo!();

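// Now we are calling into_iter() to pass owned values of InputData,
// since owned values fulfill a 'static lifetime bound.
let results: Vec<OutputData> = futures::stream::iter(input_data.into_iter())
    .map(|input_data: InputData| tokio::task::spawn(async move { some_task(input_data).await }))
    .buffered(num_cpus::get()) // Run up to one task per CPU core concurrently
    // Each JoinHandle resolves to Result<Result<OutputData, Error>, JoinError>.
    // Flatten the two layers (this assumes Error implements From<JoinError>).
    .map(|joined| joined.map_err(Error::from).and_then(|task_result| task_result))
    .try_collect().await?;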

(The num_cpus crate can be used to get the number of CPUs on the machine. Depending on your Tokio version, the multi-threaded runtime may already pull it in as a transitive dependency; std::thread::available_parallelism() from the standard library is an alternative.)

We fulfilled the 'static lifetime bound here by moving owned InputData into the async block. If we had some data shared between all the tasks, we would instead need to wrap it in an Arc and give each task its own clone of the Arc. That usually looks like:

let input_data: Vec<InputData> = todo!();
let shared_data: Arc<SharedData> = todo!();

// As before, into_iter() passes owned values of InputData to each task.
let results: Vec<OutputData> = futures::stream::iter(input_data.into_iter())
    .map(|input_data: InputData| {
        // Clone the Arc (not the data) so the clone can move to the other thread
        let shared_data_ref = Arc::clone(&shared_data);
        tokio::task::spawn(async move { some_task(input_data, shared_data_ref).await })
    })
    .buffered(num_cpus::get()) // Run up to one task per CPU core concurrently
    // Flatten the JoinError layer (this assumes Error implements From<JoinError>).
    .map(|joined| joined.map_err(Error::from).and_then(|task_result| task_result))
    .try_collect().await?;

Dealing with higher-ranked lifetime issues

The most challenging compiler errors you will get with this pattern are the “higher-ranked lifetime errors” that result when the Rust compiler can’t prove some part of your code is Send. It doesn’t mean it isn’t Send, just that the compiler doesn’t know for sure. There are two variants you’ll commonly see: one referring to an async block and one referring to a stream.

For the async block, I find the easiest solution is to rewrite it as an async function.

For the Stream, call .boxed() on the stream (from the futures::stream::StreamExt trait) and then annotate the output as futures::stream::BoxStream. This has a Send bound, and I find the compiler usually accepts this.

Further reading