It's not that easy
Well if any of this classify as "easy" at all...
The funny thing about the exercise in this book is that a lot of the work is setting up the infrastructure to get a running event loop in the first place. Actually adding new functionality is much easier, but to keep this example as short as I can we skip many important things to actually implement a fully working event queue. I'll point some of the ones I consider the most important here.

Everything in networking is asynchronous

In our example we only treat Readevents on a socket. In reality, both the opening of a socket, resolving the DNS and writing data to a socket are all blocking tasks.
This actually means that we'll run into trouble using the standard library std::net::TcpStreamas the real workhorse behind our own TcpStreamsince there is no way to open a socket without resolving the DNS using a blocking call. In reality, we need to build our own TcpStreamfrom scratch if we want it to be truly asynchronous.

Communication could be interrupted

We're not accounting for the fact that communication could be interrupted. In such a case the right thing to do would be to suspend the task again and register interest for further events on that resource with our event queue.

Buffers could be full while there is more data to read

This is particularly a problem we have with our IOCPimplementation. We only submit one fixed buffer and does not even consider the possibility that we might receive more data than would fit in it. Neither do we consider how we would optimally arrange these buffers to avoid allocating more memory than we need and make sure performance does not suffer.
The right thing to do here is to decide on a buffering strategy, and this can be a complicated topic itself. However, getting a working and correct solution is not that much work, but if you want to win be the fastest I/Olibrary in the world, there is a lot to consider.
We should check if we have read all the data or not and resubmit interest to our queue for more data if needed.

Resolving DNS

As mentioned in the first paragraph of this chapter, almost everything is blocking at some level. DNS lookup is one of those things where we have to decide on a strategy. The optimal strategy for this might depend on our exact use case.
DNS entries are often cached by the OS. This means that if most of our DNS lookups are cached, the most efficient thing would probably be to actually resolve this in a blocking manner since there will be no real I/O. Deferring this to a thread pool, or adding such an interest to the event queue will most likely be slower than to just block if it's cached.
If most of our DNS lookups are not cached locally, this will be a bad strategy. It's very common to delegate the DNS lookup to a thread pool in these instances. First of all, some entries are cached and will return immediately, and if we need to wait for it a thread pool will make sure we only block there and it's most likely for a relatively short amount of time.
There is also the case of what APIs and methods are available for evented DNS lookup. Most experiences I've read seems to suggest that these APIs are pretty unergonomic to use and that it's difficult to justify that added complexity with the gains it would have (if any at all).

Level triggered vs Edge triggered events

So this is actually hides some pretty important complexity we skipped for the readinessbased models. The default mode for epollis level triggered. This means that if we don't immediately start reading from the socket, it will keep "nagging" us and trigger notifications that it's ready to be read.
Now this behavior is most often unwanted. Our executoror schedulermight want to defer reading from that socket for a while for some reason if there is a lot of work to do. In this case our event queue will be woken up repeatedly to get notified of something we already know.
In edge triggeredmode, we only get this notification once. We only get notifications on state transitions. An example of this is when the socketchanges state from NotReadyto Ready.
Another advantage of this is when we have multiple threads waiting on the same event queue. Let's pretend that we designed our Pollinstance to actually be cloned and shared to multiple threads.
edge triggeredmode guarantees that only one thread will be woken up and notified about the event. level triggeredmode does not make this guarantee. This means you need additional logic (and checks) to see if an event is already being handled by another thread.
There are similar flags we can pass to kqueueeven though it's not named the same there, but the same considerations is valid there as well.
We circumvented this by using EPOLLONESHOT. Using this actually removes the resource from the event queue alltoghether, and since we made our socketblocking on receiving the first event we knew that we would only block if for some reason we received an EAGAIN error on the socket.
The proper thing to do here is to read until we get a "loss of readiness" (if we get that at all) and suspend the task and wait for another Ready event. When using edge triggeredmode the socket will notify us when it changes state to Readyagain and we can resume.

Thread Safety and Race Conditions

Our implementation is pretty brittle in some aspects. Our main focus has been how Kqueue, Epoll and IOCP work, and not how to make a perfect event loop library.
In our Registrator::register()method we first check if the Pollinstance is dead using an atomic flag. But if we were to create multiple Registratorsand send them to different threads we're in trouble.
What happens if another thread closes the loop before we hit line 22 in the code below?
We'll nothing too dramatic, but our event will never return to us and if we're counting on that we might end up blocking for ever somewhere.
How can we solve this?
There are probably several possible solutions to this. The first that came to my mind was:
  1. 1.
    Use another flag to indicate that a "registration is in progress". In practice a "registration lock".
  2. 2.
    We use an atomic swap_and_compare in a loop to try setting that flag, exiting the loop when successful. By successfully setting that flag we're letting other threads know that we're about to register an event and that we hold the "registration lock"
  3. 3.
    Then we check if the poll is closed and if not we register our event
  4. 4.
    We then swap the flag back which "releases" the lock and lets other threads acquire the "lock".
This is of course assuming that the registration process itself is so fast that spinning on contention is preferred to other methods of synchronizing access like a Mutexor CondVar.
Maybe you have a better idea? Feel free to discuss and suggest it in the Issue Tracker for this book.
In our example library we treat this as an limitation and do not consider registrations from different threads as something we support. Take a look at the Appendix chapter Atomics in Rust to get some ideas on how this synchonization could be done in a non-blocking way.
pub fn register(
&self,
soc: &mut TcpStream,
token: usize,
interests: Interests,
) -> io::Result<()> {
// SPIN WHILE TRYING TO ACQUIRE A "REGISTRATION LOCK"
​
// THEN check if poll is dead
if self.is_poll_dead.load(Ordering::SeqCst) {
return Err(io::Error::new(
io::ErrorKind::Interrupted,
"Poll instance is dead.",
));
}
​
ffi::create_io_completion_port(
soc.as_raw_socket(),
self.completion_port, token
)?;
​
if interests.is_readable() {
// What happens if the poll has been closed since we checked it?
ffi::wsa_recv(soc.as_raw_socket(), &mut soc.wsabuf)?;
} else {
unimplemented!();
}
​
// RELEASE THE "REGISTRATION LOCK"
Ok(())
}
Copy link
On this page
Everything in networking is asynchronous
Communication could be interrupted
Buffers could be full while there is more data to read
Resolving DNS
Level triggered vs Edge triggered events
Thread Safety and Race Conditions