Communicating between two concurrent tasks

The full code is here, ready to be pasted into an Xcode playground.

Setup

Suppose I have an actor that protects access to a buffer of values (simplified example):

actor Buffer {
  var elements: [Int] = []

  func enqueue(_ value: Int) {
    elements.append(value)
  }

  func dequeue() async -> Int {
    // If the buffer is empty, wait for the next enqueued value
    // ???
  }
}

Now I want to have two concurrent tasks, one that fills the buffer and one that empties it:

let buffer = Buffer()

// Producer task
Task {
  await buffer.enqueue(1)
  try! await Task.sleep(nanoseconds: 400_000_000)
  await buffer.enqueue(2)
  await buffer.enqueue(3)
}

// Consumer task
Task {
  while true {
    let next = await buffer.dequeue()
    print(next)
    try! await Task.sleep(nanoseconds: 100_000_000)
  }
}

(For simplicity, I omitted API to end the stream. As is, the consumer task will never complete.)

Question

My question is how to implement the dequeue method with my desired semantics:

  • If there is at least one element in the buffer, remove and return it.
  • If the buffer is currently empty, suspend until another value has been enqueued.

Approach 1: spin in a loop

We could do this by spinning in an infinite loop (taking care to yield to give the other task a chance to call enqueue):

  func dequeue() async -> Int {
    while true {
      if !elements.isEmpty {
        return elements.removeFirst()
      } else {
        await Task.yield()
      }
    }
  }

That's obviously not ideal.

Approach 2: use a CheckedContinuation for signaling

What we need is a way for the enqueue method to signal a suspended dequeue method that it can resume. My idea was to use a CheckedContinuation for this:

  • When dequeue needs to suspend, it creates a continuation and stores it in a property.
  • When enqueue runs, it resumes the continuation, thus resuming dequeue.
actor Buffer {
  var elements: [Int] = []
  private var isNotEmpty: CheckedContinuation<Void, Never>? = nil

  deinit {
    // TODO: If the continuation is not nil,
    // resume it by throwing cancellation error?
  }

  func enqueue(_ value: Int) {
    elements.append(value)
    // Signal dequeue if needed.
    isNotEmpty?.resume()
    isNotEmpty = nil
  }

  func dequeue() async -> Int {
    while true {
      if !elements.isEmpty {
        return elements.removeFirst()
      } else {
        // Suspend until another element is enqueued.
        print("dequeue waiting")
        await withCheckedContinuation { cont in
          self.isNotEmpty = cont
        }
      }
    }
  }
}

In a more complex example, you could add a maximum buffer size to the Buffer actor and use another continuation to have enqueue suspend when the buffer is full.

Is this a valid use case for continuations?

Is this kind of intra-task communication — like a condition variable — a valid use case for continuations? Or are continuations supposed to only be used for interop with legacy code?

The full code is here, ready to be pasted into an Xcode playground.

4 Likes

I actually had a very similar use case to this. A "Storage" type that I wanted to query via a key and get a value back, but the values were filled by another producer. Funny enough I also started with a spin loop but quickly dismissed it when I realised that task.yield doesn't stop the loop from going at maximum speed ^^

What I ended up is making a AsyncFuture type that basically wraps a continuation. With that the dequeue can await to get a value from the "future" while the enqueue can add the value to it. It's kind of a Combine PasstroughSubject but just using Swift concurrency.

I see that the difference is that you just used the continuation to signal while keeping your own storage while I actually made my wrapper "future" hold and pass the value trough.

1 Like

Have you considered using AsyncStream, or at least checking out the implementation? It does exactly this kind of continuation management.

2 Likes

I personally did and based my implementation on that (<3 open source!).

I wondered how safe it is to make the AsyncStream.Continuation escape the init. (to be honest same for the normal continuation API).

How about something like this:

final class Buffer2 {
    private var stream: AsyncStream<Int>!
    private var continuation: AsyncStream<Int>.Continuation!
    private var iterator: AsyncStream<Int>.Iterator!
    
    public init(initialValues: [Int] = []) {
        self.stream = .init { self.continuation = $0 }
        self.iterator = self.stream.makeAsyncIterator()
        for value in initialValues { self.continuation.yield(value) }
    }
    
    public func enqueue(_ value: Int) async {
        self.continuation.yield(value)
    }
    
    public func dequeue() async throws -> Int {
        guard let value = await self.iterator.next() else { throw CancellationError() }
        return value
    }
    
    public func cancel() {
        continuation.finish()
    }
}

You don't see when you are waiting though, and you can't make it an actor, as the iterator is being mutated in the public method.

1 Like

AsyncStream<T>.Continuation is supposed to escape the init. the typical use case is to kick off the (concurrent) producer task from inside the init, while the consumer operates from outside the init. i suspect that the intention of this API was to prevent a potential deadlock condition when you start iterating the stream before starting up the producer task. however, (after having written over 100k lines of async/await this year) i am firmly convinced this was a mistake and does not actually provide much value.

moreover, something seems to have been broken in the typechecker for the past month or so where AsyncStream instances cannot be constructed in the first place. AsyncStream used to work well though.

1 Like

I did consider it briefly, but from reading the docs I didn't get the impression it was the right tool for this job. In particular, I deemed it "too big" for my use case when a plain continuation also does the job. Thank you, I'll check out how it's implemented.

1 Like

"Normal" continuations are also allowed/expected to escape, confirmed here by John.

1 Like

anybody know how much overhead the AsyncStream implementation has? personally, i have found great use of AsyncStream<Void> with a buffering capacity of 1 as a signaling mechanism and i’m wondering what the performance implications of that are…

2 Likes

For completeness I'm leaving a link here to what I was mentioning the other day

which you can find on github alexito4/AsyncChannel. There is also a blog post linked.

But I'm still curious about

because is what I will probably use in production systems.

Terms of Service

Privacy Policy

Cookie Policy