waking wakers & futuring futures — or — Building a future executor

11 minutes read (3130 words)

In the previous installment of the long-winded, rambly postings by your local internet weird girl talking about two of her favourite subjects (async and Rust) we laid out the foundations of Futures and Tasks. This time around we'll get our hands dirty and actually start writing code to run futures ourselves. (And I'll switch back to less Ciceronian sentence building.)

The question we left off with was: what is this "Context", and how do we use it or, for that matter, construct our own?

Wakers, Context and Readiness, or how the runtime knows when to do anything at all.

At this point I think an example is the best way to explain a few topics. So, let's start with designing a future, one we can use to explain a few topics and then segue into how the runtime side of things works. Let's make a channel, somewhat similar to the one in std::sync::mpsc but async. To make the code less complex we only make the receiving end async and only allow sending one message at a time.

This will require us to define three types: a shared struct being the contact point between the receiving and transmitting endpoints, and of course the transmitting and receiving endpoints themselves. I'll explain the code interspersed as I go. If you want to look at the whole code as a block or run my version yourself, all code can be found on GitHub. Also, finally, the code is going to have issues, be slow, and have potential for nasty bugs. It is meant as a teaching tool, not production code. We will also only depend on the Rust stdlib, so you don't have to know your way around other crates. You don't even need to use cargo to run the code.

I'll start with the shared struct and transmitting endpoint since those are both pretty unsurprising:

/// The shared contact point between tx and rx.
/// 
/// Both the TX and RX have a reference to an instance of this and "sending" a
/// value is simply putting the value inside the slot.
struct Shared<T> {
    /// Storage for a value that has been sent and not yet received
    pub slot: Option<T>,
    /// The mythical Waker, the part making everything smartly async (◕▿◕✿)
    pub waker: Option<Waker>,
}

(I'm also going to leave out trivial implementations of new and friends, since they don't add value to this post.)

/// Transmitting end
///
/// For simplicity this is a very simple and entirely synchronous implementation.
/// A Weak pointer is used so that shared is deallocated as soon as the RX is
/// dropped.
struct TX<T> {
    shared: Weak<Mutex<Shared<T>>>,
}
impl<T> TX<T> {
    /// Filling the shared slot, in essence "sending" it to the rx.
    pub fn try_send(&self, value: T) -> Result<(), T> {
        if let Some(shared) = self.shared.upgrade() {
            let mut guard = shared.lock().unwrap();

            if guard.slot.is_some() {
                return Err(value);
            }

            guard.slot = Some(value);
            if let Some(waker) = guard.waker.take() {
                waker.wake()
            }

            Ok(())
        } else {
            // This case is hit when the corresponding receiving end has been
            // dropped. To not make things complicated we don't return a
            // specific type for this kind of error.
            Err(value)
        }
    }

    /// Returns true if the RX end has not yet been dropped.
    pub fn is_rx_alive(&self) -> bool {
        self.shared.strong_count() > 0
    }
}

If you've built your own channel this code should feel familiar. The only thing that is different between this and a naive implementation of a channel is this waker variable in the shared struct and this call to waker.wake() in try_send. So, what is that doing? Well, it is waking up the receiving endpoint of course. And for that we have to take a paragraph (or three) and talk about readiness.

Readiness

A runtime has to at some point on some level be told what futures it needs to execute, to run to completion. With most runtimes there is some method usually called something similar to "spawn" that does just that. But there's more to it; "spawning" a future only tells the runtime that you want this future to be executed. That's just half of the picture though. The runtime also has to know when to resume a future that has yielded without result previously, in other words one that has returned Poll::Pending the last time it was polled.

This kind of information is "readiness": the fact that a certain future is ready to be polled again, that it has a reasonable expectation that it can now actually continue and make at least some progress. And that is where the Waker type comes in. With an in-memory channel like the one we are currently building, the only way the receiving end can receive something is when the transmitting end has sent something. So, in the try_send method we call wake if a waker is installed, informing the runtime that whatever task that waker belongs to can now make at least some progress, at least if it was blocked on receiving on this channel.

That last sentence is very important. One of the most critical assumptions that is made in the Rust async ecosystem is this: Whatever code can provide information a future is currently waiting on will have access to the Waker associated with that future's task, and will call wake on it when it has provided this information. This is the core magic behind running async futures. A runtime is responsible for providing Wakers that can be used to tell the runtime to poll certain futures once again.
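To make that handshake a bit more tangible, here is a minimal sketch of it in isolation. It leans on std::task::Wake, a standard library trait that lets an Arc of your own type act as a Waker. FlagWaker, notify_data_ready and readiness_demo are hypothetical names made up for this illustration, and the "runtime" is reduced to a single flag meaning "poll this task again".

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Hypothetical stand-in for a runtime's bookkeeping: one flag per task,
/// meaning "this task is ready to be polled again".
struct FlagWaker {
    ready: AtomicBool,
}

impl Wake for FlagWaker {
    fn wake(self: Arc<Self>) {
        // A real runtime would queue the task here; we only record readiness.
        self.ready.store(true, Ordering::SeqCst);
    }
}

/// Hypothetical stand-in for the code that produces what a future waits on.
/// It never touches the future itself, only the Waker it was handed.
fn notify_data_ready(stored: &mut Option<Waker>) {
    if let Some(waker) = stored.take() {
        waker.wake();
    }
}

/// Walks through the handshake and reports the flag before and after.
fn readiness_demo() -> (bool, bool) {
    let task = Arc::new(FlagWaker { ready: AtomicBool::new(false) });
    // What a pending future would have stored away during `poll`.
    let mut stored = Some(Waker::from(task.clone()));

    let before = task.ready.load(Ordering::SeqCst);
    notify_data_ready(&mut stored);
    let after = task.ready.load(Ordering::SeqCst);
    (before, after)
}
```

The point of the sketch is the direction of information: the producer side knows nothing about tasks or runtimes, it only calls wake on whatever it was given.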

Now, armed with this new knowledge, let's look at the receiving end and most importantly how it handles Waker.

struct RX<T> {
    shared: Arc<Mutex<Shared<T>>>,
}

impl<T> Future for RX<T> {
    type Output = T;

    /// Extract the internal value, if there is one.
    ///
    /// If there is no value currently, **schedules the current task to be woken
    /// up** when one is sent.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut guard = self.shared.lock().unwrap();

        if let Some(value) = guard.slot.take() {
            // Happy path, there was something sent. We just return that and are
            // happy.
            return Poll::Ready(value);
        } else {
            let waker = cx.waker();

            // There was nothing sent yet. We need to install a waker, ensuring
            // that this task will be polled again when a value was sent, since
            // `TX::try_send` will call waker.wake().
            if let Some(ref oldwaker) = guard.waker {
                if !waker.will_wake(oldwaker) {
                    guard.waker.replace(waker.clone());
                }
            } else {
                guard.waker = Some(waker.clone());
            }

            return Poll::Pending;
        }
    }
}

Combined with the code of TX we can see the magic now. Behind the scenes, this is what the incantation "schedules the current task to be woken up", which one can find in the doc comments of many future types, actually means. At its most basic, what we are doing here is providing the TX endpoint with a handle that allows it to tell our runtime to poll our future again. This is what the "scheduling" part really means, which is somewhat different from the usual meaning of scheduling.
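The whole round trip can be sketched in a few lines without any runtime at all. The following is a condensed stand-in for the channel above, not the post's actual code: poll_recv and try_send are free functions instead of methods on RX and TX, and CountingWaker and handshake_demo are hypothetical helpers that only count how often wake was called.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Condensed version of the shared slot from the channel above.
struct Shared<T> {
    slot: Option<T>,
    waker: Option<Waker>,
}

/// Receiving side: take the value, or leave a waker behind and report Pending.
fn poll_recv<T>(shared: &Mutex<Shared<T>>, cx: &mut Context<'_>) -> Poll<T> {
    let mut guard = shared.lock().unwrap();
    match guard.slot.take() {
        Some(value) => Poll::Ready(value),
        None => {
            guard.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Sending side: fill the slot, then wake whoever is waiting on it.
fn try_send<T>(shared: &Mutex<Shared<T>>, value: T) -> Result<(), T> {
    let mut guard = shared.lock().unwrap();
    if guard.slot.is_some() {
        return Err(value);
    }
    guard.slot = Some(value);
    if let Some(waker) = guard.waker.take() {
        waker.wake();
    }
    Ok(())
}

/// Hypothetical waker that only counts how often it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Poll (nothing there yet), send, poll again.
fn handshake_demo() -> (bool, usize, Poll<u32>) {
    let shared = Mutex::new(Shared { slot: None, waker: None });
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);

    let first_was_pending = poll_recv(&shared, &mut cx).is_pending();
    try_send(&shared, 42u32).unwrap();
    let wakes = counter.0.load(Ordering::SeqCst);
    let second = poll_recv(&shared, &mut cx);
    (first_was_pending, wakes, second)
}
```

The first poll finds nothing and parks a waker, the send triggers exactly one wake, and the second poll finds the value.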

One thing that may have surprised you is this spiel with waker.will_wake(), and that the waker is taken out of the Option in TX by value. This is deliberate and by design. Yes, just overwriting guard.waker with a new value would be just as good, but the underlying issue is something I want to highlight: a future may be moved to a different context, a different task even, and it may be moved after it has already been polled once. The context that was provided with the last call to poll may be different from the one provided with the next call, and thus so may the Waker. A future should not assume the waker stays the same, and since the whole system relies on, in this case, TX having the correct Waker value, a future should take extra care to keep stored Wakers current.
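As a sketch of what "keeping stored Wakers current" looks like when pulled out into a helper: store_waker below is a hypothetical function, not part of the post's code, that does the same will_wake dance as RX::poll. The demo pretends a future is polled twice on task A and then once on task B, and checks that only B gets woken afterwards. Counter and migration_demo are likewise made up for the illustration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Hypothetical helper: make sure `stored` wakes the task that polled last.
fn store_waker(stored: &mut Option<Waker>, current: &Waker) {
    // `will_wake` is a cheap hint; when it says "same task" we skip the clone.
    let up_to_date = stored
        .as_ref()
        .map_or(false, |old| old.will_wake(current));
    if !up_to_date {
        *stored = Some(current.clone());
    }
}

/// Hypothetical task that counts its wake-ups.
struct Counter(AtomicUsize);

impl Wake for Counter {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Returns how often task A and task B were woken.
fn migration_demo() -> (usize, usize) {
    let task_a = Arc::new(Counter(AtomicUsize::new(0)));
    let task_b = Arc::new(Counter(AtomicUsize::new(0)));
    let waker_a = Waker::from(task_a.clone());
    let waker_b = Waker::from(task_b.clone());

    let mut stored = None;
    store_waker(&mut stored, &waker_a); // first poll, on task A
    store_waker(&mut stored, &waker_a); // polled again on task A
    store_waker(&mut stored, &waker_b); // the future moved to task B

    // The "sender" fires whatever waker is stored at this point.
    if let Some(waker) = stored.take() {
        waker.wake();
    }
    (
        task_a.0.load(Ordering::SeqCst),
        task_b.0.load(Ordering::SeqCst),
    )
}
```

Had the helper kept the first waker around, task A would have been woken for a future it no longer owns, and task B would have waited forever.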

Futuring futures — Making our own Context

Now we have a future, and we have peeked behind the scenes. But we still haven't actually run a future, least of all to completion. We know what a Waker has to do, but we still don't know how to actually make one ourselves, and even less how to make a full runtime. So, no time like the present. Let's start building a runtime.

In the end we need to be able to construct a Waker, since we need one to create a Context. I said above that providing Wakers is the responsibility of the runtime. But it goes a bit further: Wakers need to tell the runtime that their task should be polled again, and how that is done differs completely from runtime to runtime. In other words: Wakers are entirely specific to their runtime. We don't just have to provide them, we also have to construct them. And implement them in the first place. Wakers being so specific to the runtime is in fact one of the main reasons why there is so little documentation around Waker and why its API is so … weird. There is no code to look at, no documentation to be found, at least not in the stdlib. That's why, at the lowest level, you construct a Waker from a RawWaker, a struct that manages to be even more obscure than Waker in its documentation.

RawWaker then is in the end really just a trait object but written out by hand. If that trait existed, it would be pretty simple, looking something like this:

/// Depending on Clone so we have to implement `Clone::clone`. And of course
/// a Waker may also have custom Drop code. It will most likely have custom Drop
/// code.
trait Waker: Clone {

    /// Wake the associated task up so its future will be polled again. Consumes
    /// the Waker.
    fn wake(self);

    /// Wake the associated task, but doesn't consume the waker. Intended to
    /// implement a more efficient approach than `self.clone().wake()`, if
    /// available.
    fn wake_by_ref(&self) {
        let cloned = self.clone();
        cloned.wake()
    }
}

Now in reality the above methods and the implementations of clone and drop are marked unsafe and don't get self by value or even by reference but instead as *const (), the closest thing to a void pointer Rust has. However, the real implementations should behave the same way with regard to lifetimes and ownership: wake_by_ref must not result in Drop code being called, clone must ensure that any shared resources are kept alive for the new Waker, and so on. But we have to put all of that aside for now. Before we can start writing any code regarding our Wakers, we have to first figure out what wake is supposed to do in the first place. In other words we need to start with the runtime itself.
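To see the hand-written trait object in its smallest possible form, here is a sketch of a Waker that does nothing at all: four functions, one vtable, and a null data pointer. All names (noop_clone, NOOP_VTABLE, poll_once and so on) are made up for this illustration, and a waker that ignores wake is only good for polling futures that don't need to be woken, like the already finished one in the demo.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// The four vtable entries. Each receives the data pointer of the RawWaker;
// this waker has no state, so the pointer is ignored everywhere.
unsafe fn noop_clone(_data: *const ()) -> RawWaker {
    noop_raw_waker()
}
unsafe fn noop_wake(_data: *const ()) {}
unsafe fn noop_wake_by_ref(_data: *const ()) {}
unsafe fn noop_drop(_data: *const ()) {}

/// The "trait object, written out by hand": a table of function pointers.
static NOOP_VTABLE: RawWakerVTable =
    RawWakerVTable::new(noop_clone, noop_wake, noop_wake_by_ref, noop_drop);

/// Data pointer plus vtable make up a RawWaker.
fn noop_raw_waker() -> RawWaker {
    RawWaker::new(std::ptr::null(), &NOOP_VTABLE)
}

/// Poll a future exactly once with the do-nothing waker.
fn poll_once<F: Future>(fut: Pin<&mut F>) -> Poll<F::Output> {
    // SAFETY: none of the vtable functions dereference the data pointer and
    // all of them are trivially thread-safe, so the RawWaker contract holds.
    let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
    let mut cx = Context::from_waker(&waker);
    fut.poll(&mut cx)
}

/// A future that is ready immediately never needs its waker.
fn noop_demo() -> Poll<u8> {
    let mut fut = std::future::ready(7u8);
    poll_once(Pin::new(&mut fut))
}
```

Everything a real runtime adds on top of this lives behind the data pointer, which is where our own Waker will keep its state.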

Favouring once again simplicity over any other goal, let's start designing a very naive runtime. Once again, only stdlib is used.

/// A struct representing an instance of our runtime doing things
struct OurRuntime {
    /// There's lots of smart fancy queuing systems around. But mpsc::Receiver is
    /// simple and it allows us to block.
    queue: Receiver<Pin<Box<dyn Future<Output=()>>>>,
    spawn: Sender<Pin<Box<dyn Future<Output=()>>>>,
}
impl OurRuntime {
    /// So, to spawn a future, just send it on the sending end.
    pub fn spawn(&self, f: Pin<Box<dyn Future<Output=()>>>) {
        self.spawn.send(f).unwrap()
    }

    /// Execute a scheduled future until it yields once. Blocks until there is a
    /// future to be ran.
    pub fn tick(&mut self) {
        // We construct our fancy Waker here. 
        let waker: Arc<OurWaker> = OurWaker::new(self.spawn.clone());

        // This is where using mpsc channels is nice. recv() will block until
        // there is more work available, stopping us from having to worry about
        // that (at least until the next post! ;p )
        let mut f = self.queue.recv().unwrap();

        if {
            // Prepare all the things required to poll a future:
            // Make our custom Waker into a `std::task::Waker`.
            let waker = unsafe { 
                Waker::from_raw(OurWaker::into_raw_waker(waker.clone())) 
            };
            // Build a Context from said waker
            let mut cx = Context::from_waker(&waker);

            // poll() and return true if we need to run this fut again
            f.as_mut().poll(&mut cx).is_pending()
        } {
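            // If we do need to run the future again, we store it in our Waker
            // so the Waker can send the future to the runtime when `wake()` is
            // called. Caveat: a `wake()` that already happened during the poll
            // above found an empty slot and is lost, one of the promised bugs.
            waker.replace(f);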
        }
    }
}

As you can see, there really isn't much to a runtime. In fact using std::sync::mpsc hides almost all of the complexity from us, and leaves only the very core functionality to implement: Building a Context, and calling poll() with it.

As we have established a runtime is going to have custom Wakers. There's a myriad of ways to go about that, but the core idea is that a Waker has to make sure its future is run again. One of the simplest ways to think about that is by the Waker taking ownership of the future:

/// Custom waker that takes (temporary) ownership of the future when it's
/// blocked on something and spawns it again when `wake()` is called.
///
/// This is not efficient, but I think it's one of the easiest approaches to
/// understand.
struct OurWaker {
    spawn: Sender<Pin<Box<dyn Future<Output=()>>>>,
    slot: Mutex<Option<Pin<Box<dyn Future<Output=()>>>>>,
}

impl OurWaker {
    /// When we are created initially we don't have a future yet since that
    /// needs to be polled first.
    pub fn new(spawn: Sender<Pin<Box<dyn Future<Output=()>>>>) -> Arc<Self> {
        Arc::new(Self {
            spawn,
            slot: Mutex::new(None),
        })
    }

    /// Actually set the future to be maybe rescheduled
    pub fn replace(&self, f: Pin<Box<dyn Future<Output=()>>>) {
        let mut guard = self.slot.lock().unwrap();
        guard.replace(f);
    }

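    /// Scheduling the future by just spawning it like a new one. It works.
    pub fn reschedule(&self) {
        let mut guard = self.slot.lock().unwrap();
        if let Some(fut) = guard.take() {
            // A send error means the runtime (and its Receiver) is gone, so
            // there is nobody left to run the future; we just drop it.
            let _ = self.spawn.send(fut);
        }
    }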

    //
    // RawWaker implementation code
    //
    // We create the RawWaker from an **Arc<Self>**, not a Self directly. So the
    // methods below make heavy use of `Arc::from_raw` and `Arc::into_raw`
    //

    /// Convert an **Arc<Self>** into a RawWaker
    pub fn into_raw_waker(this: Arc<Self>) -> RawWaker {
        let ptr = Arc::into_raw(this);
        RawWaker::new(ptr.cast(), &OUR_VTABLE)
    }

    /// Convert the data pointer of a RawWaker back into an **Arc<Self>**
    unsafe fn from_raw_waker(ptr: *const ()) -> Arc<Self> {
        Arc::from_raw(ptr.cast())
    }

To be able to build a RawWaker we need to be able to construct a RawWakerVTable. And that needs four methods to be available, specifically these four:

    pub unsafe fn clone(ptr: *const ()) -> RawWaker {
        let ptr: *const Self = ptr.cast();
        Arc::increment_strong_count(ptr);
        RawWaker::new(ptr.cast(), &OUR_VTABLE)
    }

    pub unsafe fn wake(ptr: *const ()) {
        let this = Self::from_raw_waker(ptr);
        this.reschedule()
    }

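    pub unsafe fn wake_by_ref(ptr: *const ()) {
        // `clone` bumps the strong count by one and `wake` consumes one
        // reference again, so the caller's Waker stays alive and untouched.
        Self::clone(ptr);
        Self::wake(ptr);
    }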

    pub unsafe fn drop(ptr: *const ()) {
        let ptr: *const Self = ptr.cast();
        Arc::decrement_strong_count(ptr)
    }
}

static OUR_VTABLE: RawWakerVTable = RawWakerVTable::new(
    OurWaker::clone,
    OurWaker::wake,
    OurWaker::wake_by_ref,
    OurWaker::drop,
);

So, at this point we already have a working runtime. Seriously! That is basically all there is to it, at least for the most bare-bones of implementations. There are a lot of things that this runtime doesn't do, most importantly things like proper scheduling and working multithreaded. I'll talk about those topics, especially the latter of the two, in the next installment, but for now this post is already 2500 words long and there is still enough to talk about even with just this tiniest of runtimes. If you want to play with it yourself, maybe even break it (should be easy), have fun. If you just want to see it work (maybe because you can't quite believe it will), the code on GitHub comes with a main function showing it working.

Wait, what's going on? How does this actually function?

Async runtimes aren't the most intuitive of things, so if you've now seen the code and are left a bit stumped about how this works and where the … things and the … magics are happening, don't worry, that's to be expected. Let's try to go through this a bit slower, a bit less packed and dense if you will.

It all begins with a call to spawn. At that point the future becomes the responsibility of the runtime. The runtime has its main functionality in the method tick. This method tries to do a bit of work and then returns. So, first off, to keep going the only thing we have to do is call tick in a loop. Now tick makes a blocking call to the synchronous Receiver::recv, waiting to receive a future it should run. Since we spawned a future before, this won't be a long wait.

Now, future in hand, the runtime polls it once. As we've established, this means the future is executed until it hits a point where it has to yield. Once that happens, there are two possible outcomes: either the future has completed and can be discarded, or it still has work to do. In the latter case, remember "Readiness"? The future will have sent the Waker we gave it to a piece of code that will call wake on it when the future can continue. At this point the runtime can't do anything further with this particular future. It's either to be discarded or blocked on something. If it's blocked, however, the future has to be kept alive; it can't be dropped. And not only does it have to be kept around, it has to be kept around in a way that lets the Waker that was sent to whatever piece of code have the future re-scheduled and run again.

Handing ownership over to the waker as we have done isn't the most efficient approach, but it has the advantage of making the runtime much simpler. The runtime never has to care about futures that aren't runnable. It only ever stores futures that it should poll. If a future is blocked it's instead stored by the Waker and thus, by extension, by the code that can unblock that future again.
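If you want the whole cycle in one self-contained piece, here is a compact sketch of the same idea: the thing behind the Waker owns the future, and waking means sending it back to the queue. It is a variant, not the code from this post. It uses the std::task::Wake trait instead of a hand-written RawWaker, creates one task per future instead of one waker per tick, and stops once the queue is empty instead of blocking. Task, YieldOnce, run and executor_demo are hypothetical names.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

/// Hypothetical task: holds the future whenever it is not being polled.
struct Task {
    future: Mutex<Option<BoxFuture>>,
    queue: Mutex<Sender<Arc<Task>>>,
}

impl Wake for Task {
    /// Waking puts the task, and with it the future it owns, back in the queue.
    fn wake(self: Arc<Self>) {
        let queue = self.queue.lock().unwrap().clone();
        // A send error only means the run loop is gone; nothing left to wake.
        let _ = queue.send(self);
    }
}

/// A future that yields exactly once, waking itself before it does.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Spawns one future and ticks until the queue is empty.
/// Returns how often the future was polled.
fn run(fut: BoxFuture) -> usize {
    let (tx, rx) = channel::<Arc<Task>>();
    let task = Arc::new(Task {
        future: Mutex::new(Some(fut)),
        queue: Mutex::new(tx.clone()),
    });
    tx.send(task).unwrap(); // "spawn"

    let mut polls = 0;
    while let Ok(task) = rx.try_recv() {
        // Take the future out in its own statement so the lock is released
        // before we poll.
        let taken = task.future.lock().unwrap().take();
        if let Some(mut fut) = taken {
            let waker = Waker::from(task.clone());
            let mut cx = Context::from_waker(&waker);
            polls += 1;
            if fut.as_mut().poll(&mut cx).is_pending() {
                // Still blocked: hand the future back to its task.
                *task.future.lock().unwrap() = Some(fut);
            }
        }
    }
    polls
}

fn executor_demo() -> (usize, bool) {
    let done = Arc::new(AtomicBool::new(false));
    let flag = done.clone();
    let polls = run(Box::pin(async move {
        YieldOnce(false).await;
        flag.store(true, Ordering::SeqCst);
    }));
    (polls, done.load(Ordering::SeqCst))
}
```

Because a wake here only queues the task, and the single-threaded loop puts the future back before it looks at the queue again, a wake that arrives during poll is not lost in this sketch. With several threads involved that would need more care.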

The end?

There are still a lot of things that I want to talk about when it comes to runtimes. There are a lot of fun (read: heinously difficult and painful to debug) issues that a runtime has to care about that we didn't even start to consider. The entire subject of how to actually get async I/O is yet another fun-filled adventure into the depths of systems programming.

But, my hands hurt. And so does my head. There will be another time, another space (but same blog!) to talk about those things. But not today.

Next up: Multithreading in runtimes, being smarter about scheduling, the joys of Send + Sync, and Atomics galore.