22 Rustlings Threads Solution

In most current operating systems, an executed program’s code is run in a process, and the operating system manages multiple processes at once. Within your program, you can also have independent parts that run simultaneously. The features that run these independent parts are called threads.

Further information

Threads1.rs

// threads1.rs
// Execute `rustlings hint threads1` or use the `hint` watch subcommand for a hint.

// This program spawns multiple threads that each run for at least 250ms,
// and each thread returns how much time they took to complete.
// The program should wait until all the spawned threads have finished and
// should collect their return values into a vector.

// I AM NOT DONE

use std::thread;
use std::time::{Duration, Instant};

fn main() {
    let mut handles = vec![];
    for i in 0..10 {
        handles.push(thread::spawn(move || {
            let start = Instant::now();
            thread::sleep(Duration::from_millis(250));
            println!("thread {} is complete", i);
            start.elapsed().as_millis()
        }));
    }

    let mut results: Vec<u128> = vec![];

    for handle in handles {
        // TODO: a struct is returned from thread::spawn, can you use it?

    }

    if results.len() != 10 {
        panic!("Oh no! All the spawned threads did not finish!");
    }

    println!();
    for (i, result) in results.into_iter().enumerate() {
        println!("thread {} took {}ms", i, result);
    }
}

We have one TODO on this exercise and we have a question about thread::spawn' s returned struct.

Threads1.rs Errors

⚠️  Ran exercises/threads/threads1.rs with errors

thread 'main' panicked at exercises/threads/threads1.rs:31:9:
Oh no! All the spawned threads did not finish!
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Looking at the errors we see that the if result is triggering a panic with the Oh no! All the spawned threads did not finish! message.

Threads1.rs Solution

If you look at the Rust documentation you will see that you can use the JoinHandle when using the std::thread::spawn which provides a join method that can be used to "join" the spawned thread, this is because the join method waits for the thread to finish and returns the result of the thread's operation, which is why it's necessary to call join on each handle. So let's try that in our for handle loop. It would look something like this:

for handle in handles {
	handle.join().unwrap();
}

Let's run this is and see what happens

⚠️  Ran exercises/threads/threads1.rs with errors
thread 2 is complete
thread 0 is complete
thread 3 is complete
thread 9 is complete
thread 1 is complete
thread 8 is complete
thread 6 is complete
thread 7 is complete
thread 4 is complete
thread 5 is complete

thread 'main' panicked at exercises/threads/threads1.rs:32:9:
Oh no! All the spawned threads did not finish!
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Looks like we're getting closer but we are still panicking with the same message, notice how the threads are all out of order, this will vary every time we run the program and this is part the challenge with multi-threaded applications that the main thread can finish before the spawned threads are complete. So going back to our for loop, if we look back at the code we see that we are using the results variable in the if portion of the code but we never actually pushed the threads into the results variable so the code is panicking regardless of the fact that we're printing out the threads, so let's fix that.

for handle in handles {
	// creating a new variable `result` with the joined threads
	let result = handle.join().unwrap();
	// pushing these results to the `results` Vec<u128> defined before the for loop.
	results.push(result);
}

This will properly populate the results vector and allow the if part of the code not panic because results will indeed be equal to 10

with this we are complete, here's our output:

✅ Successfully ran exercises/threads/threads1.rs!

🎉 🎉  The code is compiling! 🎉 🎉

Output:
====================
thread 1 is complete
thread 4 is complete
thread 9 is complete
thread 0 is complete
thread 2 is complete
thread 3 is complete
thread 5 is complete
thread 7 is complete
thread 8 is complete
thread 6 is complete

thread 0 took 255ms
thread 1 took 255ms
thread 2 took 255ms
thread 3 took 255ms
thread 4 took 254ms
thread 5 took 255ms
thread 6 took 255ms
thread 7 took 255ms
thread 8 took 255ms
thread 9 took 255ms

====================

Threads2.rs

// threads2.rs
// Execute `rustlings hint threads2` or use the `hint` watch subcommand for a hint.
// Building on the last exercise, we want all of the threads to complete their work but this time
// the spawned threads need to be in charge of updating a shared value: JobStatus.jobs_completed

// I AM NOT DONE

use std::sync::Arc;
use std::thread;
use std::time::Duration;

struct JobStatus {
    jobs_completed: u32,
}

fn main() {
    let status = Arc::new(JobStatus { jobs_completed: 0 });
    let mut handles = vec![];
    for _ in 0..10 {
        let status_shared = Arc::clone(&status);
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(250));
            // TODO: You must take an action before you update a shared value
            status_shared.jobs_completed += 1;
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
        // TODO: Print the value of the JobStatus.jobs_completed. Did you notice anything
        // interesting in the output? Do you have to 'join' on all the handles?
        println!("jobs completed {}", ???);
    }
}

We have a couple of TODO's on this exercise that is building on the previous

  1. In our first for loop we must take an action before we can update a shared value
  2. In our handle for loop we must print out the value of the JobStatus.jobs_completed

Threads2.rs Errors

⚠️  Compiling of exercises/threads/threads2.rs failed! Please try again. Here's the output:
error: expected expression, found `?`
  --> exercises/threads/threads2.rs:32:39
   |
32 |         println!("jobs completed {}", ???);
   |                                       ^ expected expression

error: aborting due to previous error

The errors show that we're missing the last part of our print statement so that's not much help, but let's proceed. We actually get more of a hint in the comments of the TODO with : Did you notice anything interesting in the output? Do you have to 'join' on all the handles? So let's keep that in mind.

Threads2.rs Solution

let's first take a look at our fn main where we have the TODO right before our incrementation.

fn main() {
    let status = Arc::new(JobStatus { jobs_completed: 0 });
    let mut handles = vec![];
    for _ in 0..10 {
        let status_shared = Arc::clone(&status);
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(250));
            // TODO: You must take an action before you update a shared value
            status_shared.jobs_completed += 1;
        });
        handles.push(handle);
    }

Implementing Mutex

So, what is the action that we must do before we updated a shared value? This hint must be referring to the use of a Mutex which if you're not familiar with them, is to short for mutual exclusion -- meaning a mutex allows only one thread to access on piece of data at any given time. To use the data with a mutex a thread must request access by asking to acquire a mutex lock.

So let's first implement the Mutex syntax into our code starting at the very top with:

let status = Arc::new(Mutex::new(JobStatus { jobs_completed }));

Also since we're using a new type, Mutex we must import it with use std::sync::Mutex;

Alright now we have to work on the locking portion of the code our TODO:

    // TODO: You must take an action before you update a shared value
    status_shared.jobs_completed += 1;

So, let's create update the status_shared variable so we can store the status_shared.lock().unwrap(), let's make it mut as well as we have to update the value. The lines would look like this:

let mut status_shared = status_shared.lock().unwrap();
status_shared.jobs_completed += 1;

Fixing Printing

Now, let's look at our 2nd TODO:

// TODO: Print the value of the JobStatus.jobs_completed. Did you notice anything
// interesting in the output? Do you have to 'join' on all the handles?
println!("jobs completed {}", ???);

First let's figure out what we should be printing here, clearly it's the jobs_completed and this is being stored inside of our status variable, so how do we get to this data? By dereferencing with * and because it's a Mutex we should also use the lock syntax. Let's try:

println!("jobs completed {:?}", *status.lock().unwrap());

Note that we also have to add the :? syntax inside of the curly braces as well as add #[derive(Debug)] to our JobStatus struct to be able to print to our terminal. Here's our updated full code block

use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::sync::Mutex;

#[derive(Debug)]
struct JobStatus {
    jobs_completed: u32,
}

fn main() {
    let status = Arc::new(Mutex::new(JobStatus { jobs_completed: 0 }));
    let mut handles = vec![];
    for _ in 0..10 {
        let status_shared = Arc::clone(&status);
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(250));
            let mut status_shared = status_shared.lock().unwrap();
            status_shared.jobs_completed += 1;
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
        println!("jobs completed {:?}", *status.lock().unwrap());
    }

}

And with this we are compiling, BUT maybe we're not getting the expected output:

🎉 🎉  The code is compiling! 🎉 🎉

Output:
====================
jobs completed JobStatus { jobs_completed: 10 }
jobs completed JobStatus { jobs_completed: 10 }
jobs completed JobStatus { jobs_completed: 10 }
jobs completed JobStatus { jobs_completed: 10 }
jobs completed JobStatus { jobs_completed: 10 }
====================

This is strange so how do we fix it? I propose we add 2 different print statements to get a better understanding of what is happening with our threads.

The first one inside of our for loop

for _ in 0..10 {
    let status_shared = Arc::clone(&status);
    let handle = thread::spawn(move || {
        thread::sleep(Duration::from_millis(250));
        let mut status_shared = status_shared.lock().unwrap();
        status_shared.jobs_completed += 1;
        // Print inside the thread to observe the incrementation
        println!("Thread incremented jobs_completed to {}", status_shared.jobs_completed);
    });
    handles.push(handle);
}

and final printout that is not inside of the second for loop but outside to confirm our total jobs completed:

for handle in handles {
    handle.join().unwrap();
}

// Print the final value of the JobStatus.jobs_completed after all threads have been joined
println!("Final jobs completed {}", status.lock().unwrap().jobs_completed);

With these tweaks we have our code compiling and behaving in a more predictable or rather informative manner:

Output:
====================
Thread incremented jobs_completed to 1
Thread incremented jobs_completed to 2
Thread incremented jobs_completed to 3
Thread incremented jobs_completed to 4
Thread incremented jobs_completed to 5
Thread incremented jobs_completed to 6
Thread incremented jobs_completed to 7
Thread incremented jobs_completed to 8
Thread incremented jobs_completed to 9
Thread incremented jobs_completed to 10
jobs completed JobStatus { jobs_completed: 10 }

====================

Threads3.rs

// threads3.rs
// Execute `rustlings hint threads3` or use the `hint` watch subcommand for a hint.

// I AM NOT DONE

use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

struct Queue {
    length: u32,
    first_half: Vec<u32>,
    second_half: Vec<u32>,
}

impl Queue {
    fn new() -> Self {
        Queue {
            length: 10,
            first_half: vec![1, 2, 3, 4, 5],
            second_half: vec![6, 7, 8, 9, 10],
        }
    }
}

fn send_tx(q: Queue, tx: mpsc::Sender<u32>) -> () {
    let qc = Arc::new(q);
    let qc1 = Arc::clone(&qc);
    let qc2 = Arc::clone(&qc);

    thread::spawn(move || {
        for val in &qc1.first_half {
            println!("sending {:?}", val);
            tx.send(*val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        for val in &qc2.second_half {
            println!("sending {:?}", val);
            tx.send(*val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let queue = Queue::new();
    let queue_length = queue.length;

    send_tx(queue, tx);

    let mut total_received: u32 = 0;
    for received in rx {
        println!("Got: {}", received);
        total_received += 1;
    }

    println!("total numbers received: {}", total_received);
    assert_eq!(total_received, queue_length)
}

On this final threads exercise we don't get any information other than what the Rust errors tell us.

Threads3.rs Errors

error[E0382]: use of moved value: `tx`
  --> exercises/threads/threads3.rs:40:19
   |
27 | fn send_tx(q: Queue, tx: mpsc::Sender<u32>) -> () {
   |                      -- move occurs because `tx` has type `Sender<u32>`, which does not implement the `Copy` trait
...
32 |     thread::spawn(move || {
   |                   ------- value moved into closure here
...
35 |             tx.send(*val).unwrap();
   |             -- variable moved due to use in closure
...
40 |     thread::spawn(move || {
   |                   ^^^^^^^ value used here after move
...
43 |             tx.send(*val).unwrap();
   |             -- use occurs due to use in closure

error: aborting due to previous error

For more information about this error, try `rustc --explain E0382`.

Alright we have some hints from the error's -- let's implement a solution that fixes this Copy trait issue.

Threads3.rs Solution

Let's recap, the error we're encountering is because the tx (transmitter) is moved into the first thread we spawn. Once a value has been moved into a closure, it cannot be used again because the ownership has been transferred to the closure. In Rust, most values have 'move semantics', which means that when you pass a value to a function or a thread, the ownership is transferred, and the original variable can no longer be used.

To fix this, you need to create a transmitter for each thread because the mpsc::Sender type does not implement the Copy trait, (as the error tells us) which means it cannot be copied, only moved. However, mpsc::Sender does implement the Clone trait, so you can clone it before moving it into the thread.

Here's how we can modify the send_tx function:

fn send_tx(q: Queue, tx: mpsc::Sender<u32>) -> () {
    let qc = Arc::new(q);
    let qc1 = Arc::clone(&qc);
    let qc2 = Arc::clone(&qc);

    let tx1 = tx.clone(); // Clone the transmitter for the first thread
    thread::spawn(move || {
        for val in &qc1.first_half {
            println!("sending {:?}", val);
            tx1.send(*val).unwrap(); // Use the cloned transmitter
            thread::sleep(Duration::from_secs(1));
        }
    });

    // No need to clone here, as we can use the original transmitter
    thread::spawn(move || {
        for val in &qc2.second_half {
            println!("sending {:?}", val);
            tx.send(*val).unwrap(); // Use the original transmitter
            thread::sleep(Duration::from_secs(1));
        }
    });
}

In this modified function, tx1 is a clone of the original transmitter tx and is moved into the first thread. The second thread can use the original tx because it's no longer needed in the main thread after send_tx is called.

And with that we're compiling!

✅ Successfully ran exercises/threads/threads3.rs!

🎉 🎉  The code is compiling! 🎉 🎉

Output:
====================
sending 1
sending 6
Got: 1
Got: 6
sending 2
sending 7
Got: 2
Got: 7
sending 3
sending 8
Got: 3
Got: 8
sending 9
Got: 9
sending 4
Got: 4
sending 10
Got: 10
sending 5
Got: 5
total numbers received: 10

====================

By cloning the tx before moving it into the threads, we've ensured that each thread has its own Sender to communicate with the Receiver. This allows both threads to send messages concurrently without taking ownership of the original Sender, which is why we're seeing the interleaved "sending" and "Got:" outputs.

The reason the Receiver stops blocking and allows the program to finish, even without explicitly joining the threads, is because the Sender endpoints are dropped when each thread finishes execution. When all Sender instances are dropped, the Receiver knows no more messages will come through, and it stops blocking, allowing the for received in rx loop to complete.

This is a key aspect of Rust's channel implementation: the Receiver will only return None (and thus stop iterating in a for loop) when all Sender instances have been dropped, signaling that no more messages will be sent on the channel.

Here's a summary of how it happens:

  1. Each thread sends its messages, sleeping for a second between each one.
  2. The main thread concurrently receives messages. As soon as a message is sent, it's printed out by the main thread.
  3. When the threads finish their execution, the Sender objects they own are dropped.
  4. Once all Sender objects are dropped, the Receiver stops blocking and the for loop in the main thread exits.
  5. The program prints the total number of messages received and exits.

The interleaved output of "sending" and "Got:" lines is due to the threads and the main thread running concurrently, with the threads sleeping for a second between sends, giving the main thread time to print out the received messages.

Conclusion

Throughout this exploration of Rust's threading model, we've seen how the language's design choices around ownership and concurrency enable us to write safe and efficient multithreaded code. From the basics of spawning threads and joining them, to sharing state via Arc and Mutex, and coordinating between threads with message passing, Rust provides a rich set of tools for concurrent programming.

The exercises we've worked through demonstrate the importance of understanding ownership and the type system when working with threads in Rust. By leveraging these concepts, Rust ensures that our concurrent programs are free from data races and other common concurrency pitfalls.

As we've seen, Rust's channels and synchronization primitives are powerful, but they also require us to think carefully about how we structure our code and manage shared state. The compiler's strict checks might seem restrictive at first, but they guide us towards writing code that is not only correct but also clear in its intent.

In conclusion, Rust's approach to concurrency is not about taking away the power from the programmer but about providing the tools to harness that power responsibly. It encourages us to think in terms of safe abstractions, clear ownership, and explicit synchronization. As you continue to explore Rust's concurrency features, keep in mind the patterns and principles we've discussed here. They will serve as a solid foundation for building reliable, high-performance concurrent applications.

Happy coding, and may your Rustacean journey be concurrency-error-free!