The Reactor-Executor Pattern
This pattern is most often referred to as just the Reactor Pattern, and it's especially relevant in Rust because of how well it aligns with the Futures API. It's a commonly used technique for "demultiplexing" asynchronous events in the order they arrive. Don't worry, I'll explain this in plain English below.
In Rust we often refer to both a Reactor and an Executor when we talk about its asynchronous model. The reason for this is that Futures in Rust fit nicely in between as the glue that allows these two pieces to work together.
One huge advantage of this is that it allows us to pick the Reactor and the Executor which best suit the problem at hand. In practice, you'll most often use a runtime which provides both for you.
Before we talk more about how this relates to Futures, let's first take a look at what we need to implement. To create a minimal example of the Reactor Pattern we need:
  1. Reactor
    1. An event queue
    2. A way to notify the executor of an event
  2. Executor
    1. A Scheduler
    2. A set of suspended tasks
  3. Tasks
    1. The user code representing a task we have to complete
    2. Needs to be interruptible so it can be suspended and yield control to the Executor instead of waiting for I/O

The Reactor

The cross platform Epoll/Kqueue/IOCP library which we implement in this book can be viewed as the main building block of the reactor - the event queue part. The only missing piece is a way to tell the Executor that an event is ready: we need to receive the events which are ready and wake up the corresponding task so it can finish. The simplest way to do this is to use a Channel, which is exactly what we'll do.
A very simple Reactor can look like this:
struct Reactor {
    handle: std::thread::JoinHandle<()>,
    registrator: Registrator,
}

impl Reactor {
    fn new(evt_sender: Sender<usize>) -> Reactor {
        let mut poll = Poll::new().unwrap();
        let registrator = poll.registrator();

        // Set up the epoll/IOCP event loop in a separate thread
        let handle = thread::spawn(move || {
            let mut events = Events::with_capacity(1024);
            loop {
                // This call blocks until an event is ready or the timeout
                // passed as the second argument expires
                match poll.poll(&mut events, Some(200)) {
                    Ok(..) => (),
                    // Closing the loop surfaces as an `Interrupted` error, which ends the thread
                    Err(ref e) if e.kind() == io::ErrorKind::Interrupted => break,
                    Err(e) => panic!("Poll error: {:?}, {}", e.kind(), e),
                };
                for event in &events {
                    // Forward the token of every ready event to the executor
                    let event_token = event.id().value();
                    evt_sender.send(event_token).expect("send event_token err.");
                }
            }
        });

        Reactor { handle, registrator }
    }

    fn register_stream_read_interest(&self, stream: &mut TcpStream, token: usize) {
        self.registrator.register(stream, token, Interests::readable()).expect("registration err.");
    }

    fn stop_loop(&self) {
        self.registrator.close_loop().expect("close loop err.");
    }
}

How does this relate to async in Rust?

In Rust, a library like mio will be what drives the Reactor part. Instead of a channel, we have Futures which pass on a Waker to the Reactor. Rather than communicating directly with the executor, the Reactor will call Waker.wake() on the relevant Waker once an event is ready.
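To make the Waker idea a bit more concrete, here is a minimal sketch that only uses the standard library. It is not how mio or any particular runtime is implemented. The names `WakerReactor`, `FlagWaker`, `register` and `event_ready` are made up for this illustration, and the "event" is simulated by calling `event_ready` by hand instead of asking the OS.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Hypothetical stand-in for whatever an executor does when a task is woken.
/// Here we only record that `wake` was called.
struct FlagWaker {
    woken: AtomicBool,
}

impl Wake for FlagWaker {
    fn wake(self: Arc<Self>) {
        self.woken.store(true, Ordering::SeqCst);
    }
}

/// Hypothetical reactor that stores one `Waker` per event token
/// instead of holding the sending half of a channel.
struct WakerReactor {
    wakers: HashMap<usize, Waker>,
}

impl WakerReactor {
    fn new() -> Self {
        WakerReactor { wakers: HashMap::new() }
    }

    /// Called by a task that has to wait for the event identified by `token`.
    fn register(&mut self, token: usize, waker: Waker) {
        self.wakers.insert(token, waker);
    }

    /// Called by the event loop when `token` is reported as ready.
    /// Returns `true` if a waker was registered for that token.
    fn event_ready(&mut self, token: usize) -> bool {
        match self.wakers.remove(&token) {
            Some(waker) => {
                waker.wake();
                true
            }
            None => false,
        }
    }
}

/// Registers a waker for token 10, simulates the event and reports
/// whether the waker was invoked.
fn demo() -> bool {
    let flag = Arc::new(FlagWaker { woken: AtomicBool::new(false) });
    let mut reactor = WakerReactor::new();
    reactor.register(10, Waker::from(flag.clone()));
    reactor.event_ready(10);
    flag.woken.load(Ordering::SeqCst)
}

fn main() {
    println!("task woken: {}", demo());
}
```

The point of the sketch is that the reactor never needs to know who is on the other side of the `Waker`. It only calls `wake()`, and what that means is decided by whoever created the waker.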

Executor

The executor needs to schedule the execution of events which are ready and provide a way for us to set handlers for events. In our example we'll specifically set a handler function which will resume our task when the corresponding event is ready.
We run the handlers for each task as they arrive, and we run them synchronously. This is what we mean by "demultiplexing" asynchronous events.
A simple executor can look like this:
struct Excutor {
    // Suspended tasks: (event token, handler to run when that event is ready)
    events: Vec<(usize, Box<dyn FnMut()>)>,
    evt_receiver: Receiver<usize>,
}

impl Excutor {
    fn new(evt_receiver: Receiver<usize>) -> Self {
        Excutor { events: vec![], evt_receiver }
    }

    // Store a handler to be run when the event with token `id` is ready
    fn suspend(&mut self, id: usize, f: impl FnMut() + 'static) {
        self.events.push((id, Box::new(f)));
    }

    // Look up the handler registered for `event` and run it
    fn resume(&mut self, event: usize) {
        let (_, f) = self.events
            .iter_mut()
            .find(|(e, _)| *e == event)
            .expect("Couldn't find event.");
        f();
    }

    // Block on the channel and resume tasks until the sending side is dropped
    fn block_on_all(&mut self) {
        while let Ok(received_token) = self.evt_receiver.recv() {
            assert_eq!(TEST_TOKEN, received_token, "Non matching tokens.");
            self.resume(received_token);
        }
    }
}
It's not very sophisticated but it will do the work for us. As you can see, a handler, which actually represents a suspended task in our example, doesn't need to be anything fancier than a closure which we'll invoke when the time is right. The means of communication between our Reactor and Executor is just a regular std::sync::mpsc channel, and all we do in block_on_all is wait for events which we can respond to.
There are of course many ways we could choose to handle this, so feel free to play around and try for yourself.
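As one example of such a variation, the sketch below keeps the suspended tasks in a `HashMap` and stores each handler as an `FnOnce` which is removed when it runs, so the executor can return by itself once no tasks are left. This is only one possible design under my own assumptions. `MapExecutor` and `demo` are hypothetical names, and the reactor is replaced by a few tokens sent on the channel by hand so the code runs without any other dependencies.

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
use std::sync::mpsc::{channel, Receiver};

/// Hypothetical variation of the executor: tasks live in a map and each
/// handler is an `FnOnce` that is removed from the map when it runs.
struct MapExecutor {
    tasks: HashMap<usize, Box<dyn FnOnce()>>,
    evt_receiver: Receiver<usize>,
}

impl MapExecutor {
    fn new(evt_receiver: Receiver<usize>) -> Self {
        MapExecutor { tasks: HashMap::new(), evt_receiver }
    }

    fn suspend(&mut self, id: usize, f: impl FnOnce() + 'static) {
        self.tasks.insert(id, Box::new(f));
    }

    /// Runs handlers as tokens arrive. Returns the number of handlers that
    /// ran, either when no suspended tasks are left or when every sender
    /// has been dropped.
    fn block_on_all(&mut self) -> usize {
        let mut completed = 0;
        while !self.tasks.is_empty() {
            let token = match self.evt_receiver.recv() {
                Ok(token) => token,
                Err(_) => break,
            };
            // Tokens without a registered task are ignored instead of panicking
            if let Some(task) = self.tasks.remove(&token) {
                task();
                completed += 1;
            }
        }
        completed
    }
}

/// Suspends two tasks, feeds the executor three tokens (one of them unknown)
/// and returns the order in which the handlers ran.
fn demo() -> Vec<usize> {
    let (evt_sender, evt_receiver) = channel();
    let mut executor = MapExecutor::new(evt_receiver);
    let log = Rc::new(RefCell::new(Vec::new()));

    for id in [1, 2] {
        let log = Rc::clone(&log);
        executor.suspend(id, move || log.borrow_mut().push(id));
    }

    // Stand-in for the reactor: the tokens are queued up front
    for token in [2, 99, 1] {
        evt_sender.send(token).unwrap();
    }

    executor.block_on_all();
    let order = log.borrow().clone();
    order
}

fn main() {
    println!("handlers ran in this order: {:?}", demo());
}
```

Compared to the `Vec` based version, a handler can't accidentally be run twice here, and there is no need to stop the loop from inside a handler just to get `block_on_all` to return.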

How does this relate to async in Rust?

In Rust, libraries like Tokio or async_std take on the role of Executors. Generally a Reactor and an Executor will be provided together in a runtime, so you won't have to call different methods on the Executor and Reactor like we do here. You leave that to the runtime you use.
However, there is nothing in Rust's standard library or language which prevents you from choosing a Reactor and an Executor based on your needs.

Task

To actually use our Reactor and Executor we need to provide some code which glues everything together. It looks like this:
fn main() {
    let (evt_sender, evt_receiver) = channel();
    let reactor = Reactor::new(evt_sender);
    let mut executor = Excutor::new(evt_receiver);

    // ===== TASK =====
    let mut stream = TcpStream::connect("slowwly.robertomurray.co.uk:80").unwrap();
    let request = b"GET /delay/1000/url/http://www.google.com HTTP/1.1\r\nHost: slowwly.robertomurray.co.uk\r\nConnection: close\r\n\r\n";

    stream.write_all(request).expect("Stream write err.");
    reactor.register_stream_read_interest(&mut stream, TEST_TOKEN);

    // ===== SUSPEND TASK =====
    executor.suspend(TEST_TOKEN, move || {
        let mut buffer = String::new();
        stream.read_to_string(&mut buffer).unwrap();
        assert!(!buffer.is_empty(), "Got an empty buffer");
        reactor.stop_loop();
    });

    // ===== TASK END =====

    executor.block_on_all();
    // NB! Best practice is to make sure to join our child thread. We skip it here for brevity.
}
There are a few things to note here. First of all, the TcpStream is provided to us by our Reactor and is not the one in the standard library. Secondly, you'll see that we have only implemented methods for Read and not for Write. If we had to write all our async code like this it wouldn't be very ergonomic. That's why we use runtimes which do a lot of this for us, like closing down our Poll loop, releasing resources and joining threads.
Let's go through the steps we take here:
  1. We provide a means of communication between our Reactor and Executor
  2. We instantiate a Reactor and an Executor
  3. We open a socket and write a request (to a slow endpoint which will wait 1000ms before responding)
  4. We register a Read interest on that socket with our Reactor
  5. We register a handler function with our Executor to be run once data is ready
  6. We stop the Poll loop explicitly in our handler function (we only have one task)
  7. We run our Executor and block until all tasks are finished
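The main function above skips joining the Reactor's thread. As a rough idea of how a wrapper could take care of that, here is a sketch using only the standard library. It is not part of minimio, and the names `Worker`, `start` and `shutdown` are made up for this illustration. The loop that sleeps for 10 ms is just a stand-in for a `poll` call with a timeout, and an `AtomicBool` plays the role that `close_loop` plays in our Reactor.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical stand-in for a reactor that owns an event loop thread.
struct Worker {
    stop: Arc<AtomicBool>,
    // An `Option` so that `shutdown` can take ownership of the handle and join it
    handle: Option<JoinHandle<u32>>,
}

impl Worker {
    fn start() -> Self {
        let stop = Arc::new(AtomicBool::new(false));
        let stop_flag = Arc::clone(&stop);
        let handle = thread::spawn(move || {
            let mut iterations = 0;
            // Stand-in for polling the event queue with a timeout
            while !stop_flag.load(Ordering::SeqCst) {
                thread::sleep(Duration::from_millis(10));
                iterations += 1;
            }
            iterations
        });
        Worker { stop, handle: Some(handle) }
    }

    /// Signals the loop to stop and waits for the thread to finish.
    /// Returns the number of loop iterations, or `None` if the thread
    /// has already been joined.
    fn shutdown(&mut self) -> Option<u32> {
        self.stop.store(true, Ordering::SeqCst);
        self.handle
            .take()
            .map(|handle| handle.join().expect("worker thread panicked"))
    }
}

impl Drop for Worker {
    fn drop(&mut self) {
        // Does nothing if `shutdown` was already called
        let _ = self.shutdown();
    }
}

/// Returns (first shutdown joined the thread, second shutdown was a no-op).
fn demo() -> (bool, bool) {
    let mut worker = Worker::start();
    thread::sleep(Duration::from_millis(30));
    let first = worker.shutdown().is_some();
    let second = worker.shutdown().is_none();
    (first, second)
}

fn main() {
    println!("{:?}", demo());
}
```

Putting the join in `Drop` means the thread is cleaned up even if the caller forgets to shut down explicitly, which is the kind of bookkeeping a runtime would normally hide from you.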

How does this relate to async in Rust?

We have a task here which we manually stop in the middle and then resume once data is ready for us. The task is "Get data from the socket and assert that we actually received some data".
To accomplish this we use a callback based approach. In Rust, we'd normally use Futures to do this. We'll cover this in more detail in a later book, but it's important to remember that a Future is just a different way of creating an interruptible task.
In addition, Rust's Futures provide a Waker which should be used to let the Executor know that a task can be woken up and resumed. In our example we have a tight coupling between the executor and reactor through the use of a regular Channel as a way to communicate. With Futures the executor and reactor can be totally decoupled.
As a final note: we run part of our task on the main thread. Normally, a runtime will let you just spawn your task and take care of starting, suspending and resuming it for you.
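To give a taste of what the same task looks like as a Future, here is a small sketch built on the standard library alone. It makes several assumptions: `SlowResponse`, `ThreadWaker`, `block_on` and `demo` are names invented for this example, the "reactor" is just a thread that sleeps for 50 ms before calling `wake()`, and the returned text is a placeholder instead of a real HTTP response. Note that `Poll` here is `std::task::Poll`, not the `Poll` type from our event queue library.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::Duration;

/// Waker that resumes the executor by unparking the thread it runs on.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Hypothetical interruptible task: "wait for data, then return it".
struct SlowResponse {
    ready: Arc<AtomicBool>,
    started: bool,
}

impl Future for SlowResponse {
    type Output = String;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<String> {
        if self.ready.load(Ordering::SeqCst) {
            // Resumed after the event: finish the task
            return Poll::Ready(String::from("simulated response"));
        }
        if !self.started {
            self.started = true;
            let ready = Arc::clone(&self.ready);
            // Only the waker from the first poll is kept, which is enough
            // for this single-executor sketch
            let waker = cx.waker().clone();
            // Stand-in for the reactor: signals readiness after a delay
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(50));
                ready.store(true, Ordering::SeqCst);
                waker.wake();
            });
        }
        // Suspend: hand control back to the executor
        Poll::Pending
    }
}

/// Minimal executor: polls one future and parks the thread while it is pending.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}

fn demo() -> String {
    block_on(SlowResponse { ready: Arc::new(AtomicBool::new(false)), started: false })
}

fn main() {
    println!("{}", demo());
}
```

Returning `Poll::Pending` is the "suspend" step and the second call to `poll` is the "resume" step. The executor and the thread playing the reactor share nothing except the `Waker`.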

Full code example

To run this example we'll have to rely on the code we're actually going to write in this book.
Create a new folder and start a new Rust project
cargo init
In Cargo.toml add this dependency:
[dependencies]
minimio = { git = "https://github.com/cfsamson/examples-minimio" }
In main.rs replace what's there with the code from this chapter. I added a few more print statements to see some output. Feel free to add more yourself and play around with the code:
use minimio::{Events, Interests, Poll, Registrator, TcpStream};
use std::{io, io::Read, io::Write, thread};
use std::sync::mpsc::{channel, Receiver, Sender};

const TEST_TOKEN: usize = 10;

fn main() {
    let (evt_sender, evt_receiver) = channel();
    let reactor = Reactor::new(evt_sender);
    let mut executor = Excutor::new(evt_receiver);

    let mut stream = TcpStream::connect("slowwly.robertomurray.co.uk:80").unwrap();
    let request = b"GET /delay/1000/url/http://www.google.com HTTP/1.1\r\nHost: slowwly.robertomurray.co.uk\r\nConnection: close\r\n\r\n";

    stream.write_all(request).expect("Stream write err.");
    reactor.register_stream_read_interest(&mut stream, TEST_TOKEN);

    executor.suspend(TEST_TOKEN, move || {
        let mut buffer = String::new();
        stream.read_to_string(&mut buffer).unwrap();
        println!("{}", buffer);
        assert!(!buffer.is_empty(), "Got an empty buffer");
        reactor.stop_loop();
    });

    executor.block_on_all();
    // NB! Best practice is to make sure to join our child thread. We skip it here for brevity.
    println!("EXITING");
}

struct Reactor {
    handle: std::thread::JoinHandle<()>,
    registrator: Registrator,
}

impl Reactor {
    fn new(evt_sender: Sender<usize>) -> Reactor {
        let mut poll = Poll::new().unwrap();
        let registrator = poll.registrator();

        // Set up the epoll/IOCP event loop in a separate thread
        let handle = thread::spawn(move || {
            let mut events = Events::with_capacity(1024);
            loop {
                println!("Waiting! {:?}", poll);
                match poll.poll(&mut events, Some(200)) {
                    Ok(..) => (),
                    Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {
                        println!("INTERRUPTED: {}", e);
                        break;
                    }
                    Err(e) => panic!("Poll error: {:?}, {}", e.kind(), e),
                };
                for event in &events {
                    let event_token = event.id().value();
                    evt_sender.send(event_token).expect("send event_token err.");
                }
            }
        });

        Reactor { handle, registrator }
    }

    fn register_stream_read_interest(&self, stream: &mut TcpStream, token: usize) {
        self.registrator.register(stream, token, Interests::readable()).expect("registration err.");
    }

    fn stop_loop(&self) {
        self.registrator.close_loop().expect("close loop err.");
    }
}

struct Excutor {
    events: Vec<(usize, Box<dyn FnMut()>)>,
    evt_receiver: Receiver<usize>,
}

impl Excutor {
    fn new(evt_receiver: Receiver<usize>) -> Self {
        Excutor { events: vec![], evt_receiver }
    }

    fn suspend(&mut self, id: usize, f: impl FnMut() + 'static) {
        self.events.push((id, Box::new(f)));
    }

    fn resume(&mut self, event: usize) {
        println!("RESUMING TASK: {}", event);
        let (_, f) = self.events
            .iter_mut()
            .find(|(e, _)| *e == event)
            .expect("Couldn't find event.");
        f();
    }

    fn block_on_all(&mut self) {
        while let Ok(received_token) = self.evt_receiver.recv() {
            assert_eq!(TEST_TOKEN, received_token, "Non matching tokens.");
            println!("EVENT: {} is ready", received_token);
            self.resume(received_token);
        }
    }
}
Output (on my Windows machine):
Waiting! Poll { registry: Registry { selector: Selector { completion_port: 164 } }, is_poll_dead: false }
Waiting! Poll { registry: Registry { selector: Selector { completion_port: 164 } }, is_poll_dead: false }
Waiting! Poll { registry: Registry { selector: Selector { completion_port: 164 } }, is_poll_dead: false }
Waiting! Poll { registry: Registry { selector: Selector { completion_port: 164 } }, is_poll_dead: false }
Waiting! Poll { registry: Registry { selector: Selector { completion_port: 164 } }, is_poll_dead: false }
Waiting! Poll { registry: Registry { selector: Selector { completion_port: 164 } }, is_poll_dead: false }
Waiting! Poll { registry: Registry { selector: Selector { completion_port: 164 } }, is_poll_dead: false }
Waiting! Poll { registry: Registry { selector: Selector { completion_port: 164 } }, is_poll_dead: false }
EVENT: 10 is ready
RESUMING TASK: 10
HTTP/1.1 302 Found
Server: CowbHTTP/1.1 302 Found
Server: CowbHTTP/1.1 302 Found
Server: Cowboy
[...rest of response data...]


INTERRUPTED: Poll closed.
EXITING