Alexito's World

A world of coding 💻, by Alejandro Martinez

Building a Channel with Swift Concurrency Continuations

Following Structured Concurrency was one of the best decisions Swift could have made when introducing Concurrency into the language. The impact of that decision on all the code written with concurrency in mind can't be overstated.

But the other day I needed a tool that would let me stay in a structured concurrency system while internally leveraging unstructured techniques. The exact situation is not really relevant; what matters is that I have a system that needs to read a value from some storage, with the quirk that the value may not be there yet, in which case the read should wait for it.

let value = await storage.value(for: key)

I want to follow the structured concurrency principles on the reading side. But we can’t implement this without escaping the confines of this structure. That's because reading the value is not what starts the work of generating it. Instead, it’s another independent subsystem, at another time, that will end up saving the value into the storage.

To accomplish this, we need a way to pause the execution and resume it when another system tells us the value is available.

Consider this a proof of concept and an excuse for me to dig deeper into these topics. You can read the source, but if you really need a solution like this for production, I recommend you review it carefully and maybe look for other tools.

Discarded solutions

  1. Future/Promise: I want to stay as much as possible in Swift Structured Concurrency, so this is a no go, although the Channel we're implementing looks a lot like a Future.
  2. Combine PassthroughSubject: this is more akin to what we're looking for. A way to pass values from one place to another. But I want to use the tools in the language, not an Apple-only framework. Also, I just want to pass one value.
  3. Task: Nope! People still treat this as a Future and I have a lot to say against that (I should have written about this already 😢 ). Suffice it to say that it won't work because you need to feed data from the outside and Task doesn't have an API for that, because it shouldn't!
  4. AsyncStream: we could use this since it uses just the language tools. But you would still need to write something to keep its continuations (which are slightly different than the ones we're using). Also, I just want to pass one value, so this is overkill.

Building the AsyncChannel

Now that we understand our problem, let's build our own solution! I call it AsyncChannel, although it's probably not the best name for it 😂

The following code shows a simplified implementation for illustration purposes. Handling cancellation adds a bunch of verbosity so I left it out of the article. You can read the full source for more details.

Let's start by looking at what we eventually need:

// On one side...
let value = try await channel.value

// On the other side...
channel.send(42)

From this usage, we can already see a couple of important details:

  1. We need to be able to await the value (the whole point of this!)
  2. We need to be able to send a value without awaiting (we don't want to force other systems to be async)

With this in mind, let's create a class with the methods we need:

final class AsyncChannel<T> {
    var value: T {
        get async {
            ...
        }
    }

    func send(_ v: T) {
        ...
    }
}

To make sure we avoid concurrency problems, we will put the bulk of the implementation in an internal type, called Buffer, that will be an actor. This way, we can keep a simple class interface to the outside while ensuring we don't have concurrency issues.

To understand what the buffer will do, first let's look at a simplified implementation for the value computed property.

var value: T {
    get async {
        await withCheckedContinuation { continuation in
            Task {
                await buffer.addContinuationIfNeeded(continuation)
            }
        }
    }
}

Here is the main point of the solution. The Continuation APIs! You can think of them as a hook into the concurrency runtime, a sort of low-level tool that lets you control the execution of a function. With them, you can pause the execution of the function while keeping control of when it resumes. The common usage of these APIs involves calling a callback-based function and resuming on its completion, but here we keep hold of the continuation, passing it to our buffer, which will resume it at a later point.
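For comparison, the common usage mentioned above can be sketched like this. It's a minimal example, and both `legacyFetchNumber(completion:)` and `fetchNumber()` are hypothetical names I made up for illustration; they are not part of the channel's source.

```swift
import Foundation

// Hypothetical callback-based API, standing in for any legacy function.
func legacyFetchNumber(completion: @escaping @Sendable (Int) -> Void) {
    DispatchQueue.global().async {
        completion(42)
    }
}

// Async wrapper: suspend, call the callback API, and resume on completion.
func fetchNumber() async -> Int {
    await withCheckedContinuation { continuation in
        legacyFetchNumber { number in
            // The continuation is resumed right here, exactly once.
            continuation.resume(returning: number)
        }
    }
}
```

In this pattern the continuation never leaves the closure. The channel differs in that it stores the continuation and lets another part of the system resume it.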

A proper implementation should handle cancellation of the current Task to make sure the continuations are released properly. You can see how the full source uses a UUID to identify each value call and makes sure to cancel it appropriately.

The send method is very simple since it just forwards the call to the internal buffer:

func send(_ v: T) {
    Task {
        await buffer.send(v)
    }
}

Since it’s calling an actor-isolated method, we need to perform the call asynchronously. It's important to note that the method itself pretends to be synchronous so we don't pollute systems that don't need to be async.

The buffer implementation is not that much more complicated. It keeps the current value and a list of continuations and provides a safe interface for managing them.

The addContinuationIfNeeded method checks if the value is already there and, if so, resumes the continuation immediately. If it's not there yet, it keeps the continuation around to resume it later.

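func addContinuationIfNeeded(_ continuation: CheckedContinuation<T, Never>) {
    // The value already arrived: resume right away.
    if let value = value {
        continuation.resume(returning: value)
        return
    }

    // Otherwise keep the continuation so send(_:) can resume it later.
    continuations.append(continuation)
}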

Finally, the send method saves the value and resumes and releases all the continuations:

func send(_ v: T) {
    value = v
    // Wake up every reader that was waiting, then drop the continuations.
    continuations.forEach { $0.resume(returning: v) }
    continuations.removeAll()
}
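Putting the simplified pieces together, the whole thing could look roughly like the sketch below. This is my condensed version for illustration, not the full source: it assumes `T` is `Sendable`, nests the buffer as a private actor, and leaves out cancellation and the extra state checks. The `demo()` function is a hypothetical helper that only shows the call sites.

```swift
final class AsyncChannel<T: Sendable>: Sendable {
    // Actor that owns the mutable state, so access is serialized.
    private actor Buffer {
        private var value: T?
        private var continuations: [CheckedContinuation<T, Never>] = []

        func addContinuationIfNeeded(_ continuation: CheckedContinuation<T, Never>) {
            if let value = value {
                continuation.resume(returning: value)
                return
            }
            continuations.append(continuation)
        }

        func send(_ v: T) {
            value = v
            continuations.forEach { $0.resume(returning: v) }
            continuations.removeAll()
        }
    }

    private let buffer = Buffer()

    var value: T {
        get async {
            await withCheckedContinuation { continuation in
                Task { await buffer.addContinuationIfNeeded(continuation) }
            }
        }
    }

    // Synchronous on the outside; hops to the actor in an unstructured Task.
    func send(_ v: T) {
        Task { await buffer.send(v) }
    }
}

// Hypothetical usage: one side waits, the other side sends.
func demo() async -> Int {
    let channel = AsyncChannel<Int>()
    let reader = Task { await channel.value } // suspends until a value arrives
    channel.send(42)                          // no await needed here
    return await reader.value
}
```

Note that it doesn't matter whether the reader or the sender reaches the buffer first: the actor either resumes the continuation immediately or stores it until the value shows up.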

And that’s most of it. The full implementation performs extra checks for the cases where the channel already has a value or has been cancelled.

Conclusions

As you can see, the trick relies on keeping the continuations around and resuming them later. The complicated part is managing the continuations themselves, since they are delicate objects to manage manually.

You need to resume each continuation exactly once: resuming it twice is a programming error, and if you forget to resume it at all, the piece of code that is waiting for it will hang. You also need to make sure continuations are released when no longer needed, especially when Tasks are cancelled, otherwise you may leak resources from other Tasks. That's why you always need to handle cancellation properly.
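To make those rules a bit more concrete, here is one way the cancellation side could be sketched. Treat it as an assumption-heavy illustration rather than the code from the full source: `CancellableBuffer`, `awaitValue(from:)` and the `cancelledEarly` set are names and choices of mine. The idea is to key each waiting continuation by a UUID so a cancelled reader can be resumed with a `CancellationError` and removed.

```swift
import Foundation

// Hypothetical buffer that tracks each waiting reader by id.
actor CancellableBuffer<T: Sendable> {
    private var value: T?
    private var continuations: [UUID: CheckedContinuation<T, Error>] = [:]
    // Ids whose cancellation arrived before their continuation was registered.
    private var cancelledEarly: Set<UUID> = []

    func add(_ continuation: CheckedContinuation<T, Error>, id: UUID) {
        if cancelledEarly.remove(id) != nil {
            continuation.resume(throwing: CancellationError())
        } else if let value = value {
            continuation.resume(returning: value)
        } else {
            continuations[id] = continuation
        }
    }

    func cancel(_ id: UUID) {
        if let continuation = continuations.removeValue(forKey: id) {
            // Removed before resuming, so it can't be resumed twice.
            continuation.resume(throwing: CancellationError())
        } else if value == nil {
            // The reader hasn't registered yet; remember the cancellation.
            cancelledEarly.insert(id)
        }
    }

    func send(_ v: T) {
        value = v
        for continuation in continuations.values {
            continuation.resume(returning: v)
        }
        continuations.removeAll()
    }
}

// Hypothetical reading side: waits for the value, throws if the Task is cancelled.
func awaitValue<T: Sendable>(from buffer: CancellableBuffer<T>) async throws -> T {
    let id = UUID()
    return try await withTaskCancellationHandler {
        try await withCheckedThrowingContinuation { continuation in
            Task { await buffer.add(continuation, id: id) }
        }
    } onCancel: {
        Task { await buffer.cancel(id) }
    }
}
```

Because registering and cancelling both hop to the actor through unstructured Tasks, they can arrive in either order. That's why the sketch remembers early cancellations instead of assuming the continuation is already stored.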

Think of them as a low level implementation of concurrency. They give you great power and you know what happens with great power ;)

You can read the full source code to see how the cancellation is handled and how the continuations are managed. I would advise against running this code in production since it's just a proof of concept for a very specific need I had.

I hope you found this useful and learned something new!
