My experience with Concurrency

or "code I had to write on my own but I'd hoped for smarter people to already have implemented".

I have been using Concurrency religiously since its very beginning in several iOS projects with often very niche system frameworks or third party frameworks and I wanted to share my learnings and my wishes to further progression in Concurrency.

While I appreciate the (I'd call them) advanced Concurrency features recently proposed (region based isolation, custom actor executors, inheritance of actor isolation, ...) I'd also like to see some movement for more basic functionalities, which I will elaborate.

I'd also be interested in how other teams or developers solve these kind of problems. Maybe I am just missing some points.

Deadline (or timeouts)

I have seen many implementations in this forum. Some good, some which are missing a couple of points. Some relying on cooperative cancellation, some relying on multiple Task { }.

I think this is a necessary tool when working with APIs that work with different kinds of communication channels. Fortunately a lot of frameworks already support this out of the box, like URLSession or even NetService. One framework which does not have such behavior is CoreBluetooth. One could argue that this is a missing feature of this framework, but I am pretty sure there are a lot of other use cases (based on the number of posts about this in the forums), which would make it a good candidate to include a maintained implementation somewhere in the Swift ecosystem. Another reason would be that I think it is not easy to get right, and a lot of people don't get it quite right.

There has been some drafting on this in AsyncAlgorithms but there was no real movement for almost 2 years. One of the more prominent thread with implementations is this one.

My temporary solution was to copy the implementation of AsyncAlgorithms.

Auto cancelling continuations

For a long time I felt like I am doing something wrong for wanting this, but this post gave me confidence that this is actually something missing.

A lot of APIs do not support cancellation on their own, so there is nothing to put in the onCancel block. However in a some cases you actually still want the continuation to resume (with eg. CancellationError) in order to make other concepts like TaskGroups work like you (or maybe just me, naively) expect because it will wait for every child to be cancelled and not just resume work.

As pointed out in the other thread it again is not easy to implement. You need locks or an actor, and locking in onCancel as pointed out in the documentation could be problematic.

My temporary solution was to write my own (potentially deadlocking) auto cancelling continuation.

Broadcasting async sequences

I think this is also wished for quite frequently on here. Either I am misusing some APIs or this is a very useful tool when bridging frameworks into either reactive, or in general the concurrent world. I am thinking of a manager type class or actor that conforms to some delegate which you want to subscribe to in multiple points in your app.

This is currently not really possible without either building your own array of Continuations, like creating one AsyncSequence at each call point and storing the continuations in an array and yield on all elements once you receive mentioned delegate events, or relying on Combine features like PassthroughSubject or CurrentValueSubject and using their .values. Both of these solutions feel kind of wrong. The first solution probably even wronger, because I am probably missing some conditions where this will not work.

This has also already been proposed in AsyncAlgorithms back in 2022. No movement here. (Why?)

Queueing

I've had to work with APIs or hardware which require queueing but don't implement it themselves. This is rather easily implemented in non-Concurrency code with GCD. However there is no official way on how to implement this with Concurrency.

There are many open source projects out there which implement this (also in posts in this forum). Some of them miss edge cases or are just not ergonomic. The best solution (and also my go-to for now) is using a small wrapper around AsyncSemaphore.

I do get that this problem might not be something stdlib or AsyncAlgorithms should solve. But this kind of relates to the problem that there are no non-reentrant types in the current Concurrency system. Also recently mentioned in this post. I think with non-reentrant actors you could implement a queue rather easily yourself.

Final words

Please don't get me wrong, I don't want to complain about missing progress or missing features, or hijack the other forum posts. I just wanted to collect some pain points which I've had the last couple of years and maybe get some more insights or ultimately progress on these topics.
Thanks.

23 Likes

Thanks for collecting this list! I think it is a spot-on summary, and I too frequently wish there were more high-quality, out-of-the-box plumbing utilities for dealing with streams and async sequences.

One of the main ones for me that is not directly referenced above is the functionality outlined in AsyncBackPressuredStream by @FranzBusch.

Maybe now that the dust is settling from Swift 6 sending, the improved iterator, and the effects of typed throws on protocols in general, we could crank up the temperature again and cook these utilities to completion.

2 Likes

Nice Post!
I would like to add item to the wish list

It could be nice to have Consumer aware Task awaiting api.
await for result from Task, but it the consumer is cancelled, return as soon as possible.

extension Task {
      func reactiveValue() async throws(Cancellation) -> Result<Success,Failure>
}
1 Like

I have now renamed this thread to "My experience with Concurrency" as in retrospect this might have given off a wrong impression.

It is definitely nice to have list of what people would like to see but in this thread I'd like to stay on topic of the points I mentioned and for people to open their own threads.

As for

I think there also already has been a post about this.

Maybe I'm missing something, but if you have an actor whose methods never await within them, wouldn't that be what you need? If there's no await then nothing is reentrant and you already have a fairly efficient built-in queue.

(Or if you work with a device with asynchronous I/O, then use lower level synchronization with it, which should happen on some level anyway.)

@crontab
Unfortunately I did have to await.

Concretely, the Bluetooth peripheral required a serial communication. Which means if I send something to it, I had to either wait for an acknowledgement or a response. After that I could send more data.

So either with or without Concurrency there has to be some waiting and blocking to be done.

I am not arguing that there is no use case for synchronization with actors but in my experience (2 out of 2 use cases) it needed to be asynchronous.

The functions in those APIs. Are they synchronous functions?

If they are, then, can you not use an actor with nonreentrant methods to implement the queuing required as @crontab pointed out?

Could you provide a concrete example to demonstrate what can be improved?

actor DeviceProxy  {
   ...
   func enqueue (request: Request) {
      // no awaits in this method
      ...
      // synchronous operation
      devicePort.write (request)
      ...
   }
}

There is always, always asynchronicity when you work with hardware. You can hide it like for example the system file I/O does (read() / write()) but hiding it requires some multithreading wizardry.

@philipp.gabriel mentioned Bluetooth, so take a look at the Bluetooth API: it's all based on callbacks. I actually began replying to Philipp to show how you can create an illusion of synchronicity using semaphores, but that's where it got a bit too complicated: you can't use DispatchSemaphore within your actor's async method, so you need a different implementation (like the one already mentioned here, AsyncSemaphore) and then you need to use the same trick in your every actor method, it gets cluttered and a bit ugly too, and I ended up deleting my reply.

After all the OP's wish was to have a standard queue implementation that doesn't conflict with structured concurrency, that would solve this problem.

2 Likes

@crontab That was spot on. Thanks.
If it helps @ibex10, I can provide more concrete examples.

Case 1

Assume that I am writing library that encapsulates the communication to a Bluetooth peripheral. As I've outlined before, the requirements are that before I send subsequent requests I need to wait for an acknowledgement from the peripheral.

func enqueue(_ data: Data) async throws {
  // This is synchronous but theoretically I could even use .withResponse and wait for a callback from peripheral(_:didWriteValueFor:error:)
  peripheral.writeValue(data, for: characteristic, type: .withoutResponse)
  // However in this scenario I am just waiting for peripheral(_:didUpdateValueFor:error:) which I converted to an AsyncStream
  for try await response in responseStream {
    if /* is correct acknowledgement for request */ {
      break
    }
  }
}

I now need to ensure, that if enqueue is called multiple times from multiple tasks, it still waits for the previous acknowledgement before the next peripheral.writeValue(_:for:type:).

I solved this with AsyncSemaphore by @gwendal.roue by doing this:

let bluetoothSemaphore = AsyncSemaphore(value: 1)

func enqueue(_ data: Data) async throws {

  await bluetoothSemaphore.wait()
  defer { bluetoothSemaphore.signal() }

  peripheral.writeValue(data, for: characteristic, type: .withoutResponse)
  for try await response in responseStream {
    if /* is correct acknowledgement for request */ {
      break
    }
  }
}

Case 2

Kind of a similar use case now that I am thinking about it. However this time, it was a device I was communicating with over local network. This time the requirement was to have a cooldown between sending commands.

func enqueue(_ data: Data) async throws {
  // This is synchronous but theoretically I could even use .contentProcessed and wait for its callback
  connection.send(content: data, completion: .idempotent)
  // Let the remote device cooldown by waiting here
  try await Task.sleep(...)
}

Same solution here with AsyncSemaphore.

Conclusion

Again, I am not saying that this kind of API requires primitives like semaphores. I do see and agree with the point of @ktoso being made here Using semaphores in actor code - #40 by ktoso. And I do think this can be solved with actors, if they would support some kind of non-reentrancy.

At the time of implementing those features the best solution was (and I still think it is) AsyncSemaphore.

1 Like

I liked AsyncStream-based solution from here, especially in the context of Bluetooth interactions it make sense to use it as there is a stream of events from device.

As a general solution to that, I'm in favour of solution with some sort of AsyncLimiter suggested here, as it will effectively do the desired thing (serializing method execution) without introducing more to actors.


I'm still not sure if that's what is needed to address the issue with continuation cancellation. The patterns as alternative it forces to use seems more appealing to me. If we'd have documented pattern on how to address this, that might be less intrusive and beneficial approach. Yet I might be wrong here, as cancellation not easy topic.

If the API that is used does not support cancellation, what will change having continuation automatically cancelled, as the work is not being cancelled anyway?


In overall, as was stated later in the latest thread on actors, we clearly miss proper understanding and patterns guide for new concurrency world, as well as rich stdlib tools to handle various cases that arise more frequently in the real world, so that we don't re-implement them on a constant basis. Latter especially is probably is inevitable transition period, that would've been anyway, since there is a need of a time to collect use cases.

Thank's for the thread, and collection of common cases (and solutions).

This might be a poor example, but I think it is a valid use case:


func someUncancellableClosureApi(_ completion: @escaping () -> Void) {
    // This is just for demonstration purpose
    DispatchQueue.global().asyncAfter(deadline: .now() + 3) {
        completion()
    }
}

let task = Task {
    
    let date = Date()
    
    defer {
        print(Date().timeIntervalSince(date))
        // We are here after 3 seconds, instead of 1
    }
    
    try await withThrowingTaskGroup(of: Void.self) { taskGroup in
        
        taskGroup.addTask {
          // some cancellable api, maybe URLSession.data(for:)
        }
        
        taskGroup.addTask {
            await withCheckedContinuation { continuation in
                someUncancellableClosureApi {
                    continuation.resume()
                }
            }
        }
        
        try await taskGroup.waitForAll()
    }
}

Task {
    // This wait is also just for demonstration purpose. You could imagine a user leaving a view after one second.
    try await Task.sleep(nanoseconds: 1 * NSEC_PER_SEC)
    task.cancel()
}

I know that this is to be expected, and I think also documented somewhere. However I think in some cases this is unwanted. Sometimes I'd prefer to have a continuation which cancels (and consumes) itself, if it detects that the task is cancelled, and just does not care about when or how someUncancellableClosureApi will resume the continuation.

1 Like

I think that this is somewhat reasonable that to achieve this behaviour you need to do some extra work. As you state,

which perfectly makes sense, IMO this some cases still should be rare and thoroughly evaluated. Exposing automatic cancellation as part of Swift might bring unwanted usages all over the place. And if you are have such uncancellable API, wrapping it in a separate actor (which does not seem to be hard), for example, might better communicate what is happening and explicitly state that you aware of the behaviour and did this intentionally. I just don't think that this is something that should be easy reachable as it might be not well understood.

As I don't expect any movement in regards of the deadline/timeout algorithm soon, I open sourced my implementation here: GitHub - ph1ps/swift-concurrency-deadline: A deadline algorithm for Swift Concurrency.

I will be adding unit tests soon. However feel free to PR if you miss anything or you have better ideas/implementations. :slight_smile:

2 Likes