22 Rustlings Threads Solution
In most current operating systems, an executed program’s code is run in a process, and the operating system manages multiple processes at once. Within your program, you can also have independent parts that run simultaneously. The features that run these independent parts are called threads.
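Before diving into the exercises, here's a minimal sketch (not part of the exercise code) of spawning a thread with std::thread and waiting for it with join:
use std::thread;

fn main() {
    // Spawn a thread that does some work and returns a value.
    let handle = thread::spawn(|| {
        println!("hello from the spawned thread");
        40 + 2
    });
    // `join` blocks until the spawned thread finishes and yields its return value.
    let answer = handle.join().unwrap();
    println!("spawned thread returned {}", answer);
}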
Threads1.rs
// threads1.rs
// Execute `rustlings hint threads1` or use the `hint` watch subcommand for a hint.
// This program spawns multiple threads that each run for at least 250ms,
// and each thread returns how much time they took to complete.
// The program should wait until all the spawned threads have finished and
// should collect their return values into a vector.

// I AM NOT DONE

use std::thread;
use std::time::{Duration, Instant};

fn main() {
    let mut handles = vec![];
    for i in 0..10 {
        handles.push(thread::spawn(move || {
            let start = Instant::now();
            thread::sleep(Duration::from_millis(250));
            println!("thread {} is complete", i);
            start.elapsed().as_millis()
        }));
    }

    let mut results: Vec<u128> = vec![];
    for handle in handles {
        // TODO: a struct is returned from thread::spawn, can you use it?
    }

    if results.len() != 10 {
        panic!("Oh no! All the spawned threads did not finish!");
    }

    println!();
    for (i, result) in results.into_iter().enumerate() {
        println!("thread {} took {}ms", i, result);
    }
}
We have one TODO in this exercise, and a question about the struct returned by thread::spawn.
Threads1.rs Errors
⚠️ Ran exercises/threads/threads1.rs with errors
thread 'main' panicked at exercises/threads/threads1.rs:31:9:
Oh no! All the spawned threads did not finish!
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Looking at the errors, we see that the if check is triggering a panic with the Oh no! All the spawned threads did not finish! message.
Threads1.rs Solution
If you look at the Rust documentation, you'll see that std::thread::spawn returns a JoinHandle, which provides a join method. Calling join waits for the spawned thread to finish and returns the result of that thread's operation, which is why we need to call join on each handle. So let's try that in our for handle loop. It would look something like this:
for handle in handles {
    handle.join().unwrap();
}
Let's run this and see what happens:
⚠️ Ran exercises/threads/threads1.rs with errors
thread 2 is complete
thread 0 is complete
thread 3 is complete
thread 9 is complete
thread 1 is complete
thread 8 is complete
thread 6 is complete
thread 7 is complete
thread 4 is complete
thread 5 is complete
thread 'main' panicked at exercises/threads/threads1.rs:32:9:
Oh no! All the spawned threads did not finish!
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Looks like we're getting closer, but we're still panicking with the same message. Notice how the threads complete out of order; this will vary every time we run the program, and that unpredictability is part of the challenge of multi-threaded applications. Going back to our for loop, the if check uses the results variable, but we never actually pushed the threads' return values into results, so the code panics even though every thread prints its completion message. Let's fix that.
for handle in handles {
    // create a new variable `result` holding the joined thread's return value
    let result = handle.join().unwrap();
    // push it into the `results` Vec<u128> defined before the for loop
    results.push(result);
}
This will properly populate the results vector, so the if check no longer panics: results.len() will indeed equal 10. With this we are complete; here's our output:
✅ Successfully ran exercises/threads/threads1.rs!
🎉 🎉 The code is compiling! 🎉 🎉
Output:
====================
thread 1 is complete
thread 4 is complete
thread 9 is complete
thread 0 is complete
thread 2 is complete
thread 3 is complete
thread 5 is complete
thread 7 is complete
thread 8 is complete
thread 6 is complete
thread 0 took 255ms
thread 1 took 255ms
thread 2 took 255ms
thread 3 took 255ms
thread 4 took 254ms
thread 5 took 255ms
thread 6 took 255ms
thread 7 took 255ms
thread 8 took 255ms
thread 9 took 255ms
====================
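For reference, here is the complete threads1.rs with the fix applied, assembled from the snippets above:
use std::thread;
use std::time::{Duration, Instant};

fn main() {
    let mut handles = vec![];
    for i in 0..10 {
        handles.push(thread::spawn(move || {
            let start = Instant::now();
            thread::sleep(Duration::from_millis(250));
            println!("thread {} is complete", i);
            start.elapsed().as_millis()
        }));
    }

    let mut results: Vec<u128> = vec![];
    for handle in handles {
        // `join` waits for the thread and hands back its elapsed time.
        let result = handle.join().unwrap();
        results.push(result);
    }

    if results.len() != 10 {
        panic!("Oh no! All the spawned threads did not finish!");
    }

    println!();
    for (i, result) in results.into_iter().enumerate() {
        println!("thread {} took {}ms", i, result);
    }
}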
Threads2.rs
// threads2.rs
// Execute `rustlings hint threads2` or use the `hint` watch subcommand for a hint.
// Building on the last exercise, we want all of the threads to complete their work but this time
// the spawned threads need to be in charge of updating a shared value: JobStatus.jobs_completed

// I AM NOT DONE

use std::sync::Arc;
use std::thread;
use std::time::Duration;

struct JobStatus {
    jobs_completed: u32,
}

fn main() {
    let status = Arc::new(JobStatus { jobs_completed: 0 });
    let mut handles = vec![];
    for _ in 0..10 {
        let status_shared = Arc::clone(&status);
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(250));
            // TODO: You must take an action before you update a shared value
            status_shared.jobs_completed += 1;
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
        // TODO: Print the value of the JobStatus.jobs_completed. Did you notice anything
        // interesting in the output? Do you have to 'join' on all the handles?
        println!("jobs completed {}", ???);
    }
}
We have a couple of TODOs in this exercise, which builds on the previous one:
- In our first for loop, we must take an action before we can update a shared value.
- In our handle for loop, we must print out the value of JobStatus.jobs_completed.
Threads2.rs Errors
⚠️ Compiling of exercises/threads/threads2.rs failed! Please try again. Here's the output:
error: expected expression, found `?`
--> exercises/threads/threads2.rs:32:39
|
32 | println!("jobs completed {}", ???);
| ^ expected expression
error: aborting due to previous error
The error only tells us that we're missing the last part of our print statement, which isn't much help on its own. We actually get more of a hint from the TODO comments: Did you notice anything interesting in the output? Do you have to 'join' on all the handles? Let's keep that in mind.
Threads2.rs Solution
Let's first take a look at our fn main, where we have the TODO right before our increment.
fn main() {
    let status = Arc::new(JobStatus { jobs_completed: 0 });
    let mut handles = vec![];
    for _ in 0..10 {
        let status_shared = Arc::clone(&status);
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(250));
            // TODO: You must take an action before you update a shared value
            status_shared.jobs_completed += 1;
        });
        handles.push(handle);
    }
Implementing Mutex
So, what is the action we must take before we update a shared value? The hint is referring to the use of a Mutex, which, if you're not familiar with it, is short for mutual exclusion -- meaning a mutex allows only one thread to access a piece of data at any given time. To use the data behind a mutex, a thread must first request access by asking to acquire the mutex's lock.
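As a standalone illustration (separate from the exercise), here's a minimal sketch of a counter shared across threads behind an Arc<Mutex<...>>:
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0u32));
    let mut handles = vec![];
    for _ in 0..5 {
        let counter = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            // `lock` blocks until this thread holds the mutex, then returns a guard.
            let mut value = counter.lock().unwrap();
            *value += 1;
            // The lock is released when `value` (the guard) goes out of scope.
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
    println!("final count: {}", *counter.lock().unwrap());
}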
So let's first add the Mutex to our code, starting at the very top with:
let status = Arc::new(Mutex::new(JobStatus { jobs_completed: 0 }));
Also, since we're using a new type, Mutex, we must import it with use std::sync::Mutex;
Alright, now we have to work on the locking portion of the code, our first TODO:
// TODO: You must take an action before you update a shared value
status_shared.jobs_completed += 1;
So, let's update the status_shared variable so that it stores status_shared.lock().unwrap(), and let's make it mut as well, since we have to update the value. The lines would look like this:
let mut status_shared = status_shared.lock().unwrap();
status_shared.jobs_completed += 1;
Fixing Printing
Now, let's look at our 2nd TODO:
// TODO: Print the value of the JobStatus.jobs_completed. Did you notice anything
// interesting in the output? Do you have to 'join' on all the handles?
println!("jobs completed {}", ???);
First, let's figure out what we should be printing here. Clearly it's jobs_completed, which is stored inside our status variable. So how do we get to this data? By dereferencing with *, and because it's wrapped in a Mutex we also need to call lock. Let's try:
println!("jobs completed {:?}", *status.lock().unwrap());
Note that we also have to use the {:?} syntax inside the curly braces and add #[derive(Debug)] to our JobStatus struct so it can be printed to the terminal. Here's our updated full code block:
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use std::time::Duration;

#[derive(Debug)]
struct JobStatus {
    jobs_completed: u32,
}

fn main() {
    let status = Arc::new(Mutex::new(JobStatus { jobs_completed: 0 }));
    let mut handles = vec![];
    for _ in 0..10 {
        let status_shared = Arc::clone(&status);
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(250));
            let mut status_shared = status_shared.lock().unwrap();
            status_shared.jobs_completed += 1;
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
        println!("jobs completed {:?}", *status.lock().unwrap());
    }
}
And with this we are compiling, BUT maybe we're not getting the expected output:
🎉 🎉 The code is compiling! 🎉 🎉
Output:
====================
jobs completed JobStatus { jobs_completed: 10 }
jobs completed JobStatus { jobs_completed: 10 }
jobs completed JobStatus { jobs_completed: 10 }
jobs completed JobStatus { jobs_completed: 10 }
jobs completed JobStatus { jobs_completed: 10 }
====================
This looks strange: every line reports jobs_completed: 10. That's because by the time the first join returns, all ten threads have typically already slept their 250ms and incremented the counter, so each print sees the final value. To get a better understanding of what is happening with our threads, I propose we add two different print statements. The first one goes inside our for loop:
for _ in 0..10 {
    let status_shared = Arc::clone(&status);
    let handle = thread::spawn(move || {
        thread::sleep(Duration::from_millis(250));
        let mut status_shared = status_shared.lock().unwrap();
        status_shared.jobs_completed += 1;
        // Print inside the thread to observe the increment
        println!("Thread incremented jobs_completed to {}", status_shared.jobs_completed);
    });
    handles.push(handle);
}
and a final printout that goes outside the second for loop, to confirm our total jobs completed:
for handle in handles {
    handle.join().unwrap();
}
// Print the final value of JobStatus.jobs_completed after all threads have been joined
println!("Final jobs completed {}", status.lock().unwrap().jobs_completed);
With these tweaks our code compiles and behaves in a more informative manner:
Output:
====================
Thread incremented jobs_completed to 1
Thread incremented jobs_completed to 2
Thread incremented jobs_completed to 3
Thread incremented jobs_completed to 4
Thread incremented jobs_completed to 5
Thread incremented jobs_completed to 6
Thread incremented jobs_completed to 7
Thread incremented jobs_completed to 8
Thread incremented jobs_completed to 9
Thread incremented jobs_completed to 10
jobs completed JobStatus { jobs_completed: 10 }
====================
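For reference, here is the full threads2.rs with these tweaks applied, assembled from the snippets above:
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use std::time::Duration;

#[derive(Debug)]
struct JobStatus {
    jobs_completed: u32,
}

fn main() {
    let status = Arc::new(Mutex::new(JobStatus { jobs_completed: 0 }));
    let mut handles = vec![];
    for _ in 0..10 {
        let status_shared = Arc::clone(&status);
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(250));
            // Acquire the lock before touching the shared value.
            let mut status_shared = status_shared.lock().unwrap();
            status_shared.jobs_completed += 1;
            println!("Thread incremented jobs_completed to {}", status_shared.jobs_completed);
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
    // Print the final value after all threads have been joined.
    println!("Final jobs completed {}", status.lock().unwrap().jobs_completed);
}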
Threads3.rs
// threads3.rs
// Execute `rustlings hint threads3` or use the `hint` watch subcommand for a hint.

// I AM NOT DONE

use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

struct Queue {
    length: u32,
    first_half: Vec<u32>,
    second_half: Vec<u32>,
}

impl Queue {
    fn new() -> Self {
        Queue {
            length: 10,
            first_half: vec![1, 2, 3, 4, 5],
            second_half: vec![6, 7, 8, 9, 10],
        }
    }
}

fn send_tx(q: Queue, tx: mpsc::Sender<u32>) -> () {
    let qc = Arc::new(q);
    let qc1 = Arc::clone(&qc);
    let qc2 = Arc::clone(&qc);

    thread::spawn(move || {
        for val in &qc1.first_half {
            println!("sending {:?}", val);
            tx.send(*val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        for val in &qc2.second_half {
            println!("sending {:?}", val);
            tx.send(*val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let queue = Queue::new();
    let queue_length = queue.length;

    send_tx(queue, tx);

    let mut total_received: u32 = 0;
    for received in rx {
        println!("Got: {}", received);
        total_received += 1;
    }

    println!("total numbers received: {}", total_received);
    assert_eq!(total_received, queue_length)
}
On this final threads exercise we don't get any information other than what the Rust errors tell us.
Threads3.rs Errors
error[E0382]: use of moved value: `tx`
--> exercises/threads/threads3.rs:40:19
|
27 | fn send_tx(q: Queue, tx: mpsc::Sender<u32>) -> () {
| -- move occurs because `tx` has type `Sender<u32>`, which does not implement the `Copy` trait
...
32 | thread::spawn(move || {
| ------- value moved into closure here
...
35 | tx.send(*val).unwrap();
| -- variable moved due to use in closure
...
40 | thread::spawn(move || {
| ^^^^^^^ value used here after move
...
43 | tx.send(*val).unwrap();
| -- use occurs due to use in closure
error: aborting due to previous error
For more information about this error, try `rustc --explain E0382`.
Alright, we have some hints from the errors -- let's implement a solution that fixes this Copy trait issue.
Threads3.rs Solution
Let's recap: the error we're encountering is because tx (the transmitter) is moved into the first thread we spawn. Once a value has been moved into a closure, it cannot be used again, because ownership has been transferred to the closure. In Rust, most values have move semantics, which means that when you pass a value to a function or a thread, ownership is transferred and the original variable can no longer be used.
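As a tiny illustration of those move semantics (not part of the exercise), a value moved into a thread's closure can't be used afterwards:
use std::thread;

fn main() {
    let message = String::from("hello");
    // `move` transfers ownership of `message` into the closure.
    let handle = thread::spawn(move || {
        println!("{}", message);
    });
    // println!("{}", message); // won't compile: `message` was moved into the thread
    handle.join().unwrap();
}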
To fix this, we need a transmitter for each thread. The mpsc::Sender type does not implement the Copy trait (as the error tells us), which means it cannot be copied, only moved. However, mpsc::Sender does implement the Clone trait, so you can clone it before moving it into a thread. Here's how we can modify the send_tx function:
fn send_tx(q: Queue, tx: mpsc::Sender<u32>) -> () {
    let qc = Arc::new(q);
    let qc1 = Arc::clone(&qc);
    let qc2 = Arc::clone(&qc);

    let tx1 = tx.clone(); // Clone the transmitter for the first thread
    thread::spawn(move || {
        for val in &qc1.first_half {
            println!("sending {:?}", val);
            tx1.send(*val).unwrap(); // Use the cloned transmitter
            thread::sleep(Duration::from_secs(1));
        }
    });

    // No need to clone here, as we can use the original transmitter
    thread::spawn(move || {
        for val in &qc2.second_half {
            println!("sending {:?}", val);
            tx.send(*val).unwrap(); // Use the original transmitter
            thread::sleep(Duration::from_secs(1));
        }
    });
}
In this modified function, tx1 is a clone of the original transmitter tx and is moved into the first thread. The second thread can use the original tx because it's no longer needed in the main thread after send_tx is called.
And with that we're compiling!
✅ Successfully ran exercises/threads/threads3.rs!
🎉 🎉 The code is compiling! 🎉 🎉
Output:
====================
sending 1
sending 6
Got: 1
Got: 6
sending 2
sending 7
Got: 2
Got: 7
sending 3
sending 8
Got: 3
Got: 8
sending 9
Got: 9
sending 4
Got: 4
sending 10
Got: 10
sending 5
Got: 5
total numbers received: 10
====================
By cloning tx before moving it into the first thread, we've ensured that each thread has its own Sender to communicate with the Receiver. This allows both threads to send messages concurrently, each owning its own Sender, which is why we're seeing the interleaved "sending" and "Got:" outputs.
The reason the Receiver stops blocking and allows the program to finish, even without explicitly joining the threads, is that each Sender endpoint is dropped when its thread finishes execution. When all Sender instances are dropped, the Receiver knows no more messages will come through, and it stops blocking, allowing the for received in rx loop to complete.
This is a key aspect of Rust's channel implementation: the Receiver will only return None (and thus stop iterating in a for loop) when all Sender instances have been dropped, signaling that no more messages will be sent on the channel.
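Here's a small sketch of that behavior (separate from the exercise): once every Sender has been dropped, the receiving loop ends on its own, no join required:
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    for i in 0..3 {
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(i).unwrap();
            // This clone of the Sender is dropped when the thread ends.
        });
    }
    // Drop the original Sender so only the threads' clones remain alive.
    drop(tx);
    // The loop exits once all Senders are gone and the channel has been drained.
    for value in rx {
        println!("Got: {}", value);
    }
    println!("channel closed, no join needed");
}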
Here's a summary of how it happens:
- Each thread sends its messages, sleeping for a second between each one.
- The main thread concurrently receives messages. As soon as a message is sent, it's printed out by the main thread.
- When the threads finish their execution, the Sender objects they own are dropped.
- Once all Sender objects are dropped, the Receiver stops blocking and the for loop in the main thread exits.
- The program prints the total number of messages received and exits.
The interleaved output of "sending" and "Got:" lines is due to the threads and the main thread running concurrently, with the threads sleeping for a second between sends, giving the main thread time to print out the received messages.
Conclusion
Throughout this exploration of Rust's threading model, we've seen how the language's design choices around ownership and concurrency enable us to write safe and efficient multithreaded code. From the basics of spawning threads and joining them, to sharing state via Arc and Mutex, and coordinating between threads with message passing, Rust provides a rich set of tools for concurrent programming.
The exercises we've worked through demonstrate the importance of understanding ownership and the type system when working with threads in Rust. By leveraging these concepts, Rust ensures that our concurrent programs are free from data races and other common concurrency pitfalls.
As we've seen, Rust's channels and synchronization primitives are powerful, but they also require us to think carefully about how we structure our code and manage shared state. The compiler's strict checks might seem restrictive at first, but they guide us towards writing code that is not only correct but also clear in its intent.
In conclusion, Rust's approach to concurrency is not about taking away the power from the programmer but about providing the tools to harness that power responsibly. It encourages us to think in terms of safe abstractions, clear ownership, and explicit synchronization. As you continue to explore Rust's concurrency features, keep in mind the patterns and principles we've discussed here. They will serve as a solid foundation for building reliable, high-performance concurrent applications.
Happy coding, and may your Rustacean journey be concurrency-error-free!