Help with Swift concurrency

I'm struggling a bit with adopting Swift concurrency in my app (a voxel-based 3D sculpting app). I have a class that manages the GPU resources for a sculpture, which looks approximately like this:

@MainActor
class GPUManager {

   /// Stream data into GPU memory.
   func load(blockStream: AsyncStream<VoxelDataChunk>) async { ... }

   /// Returns false if a sculpting operation cancelled the save.
   func save(callback: (VoxelDataChunk) -> Void) async -> Bool { ... }

   /// Render the model using Metal
   func render(queue: MTLCommandBuffer, ...) { ... }

   /// Apply a sculpting operation. Cancels any in-flight saves.
   func sculpt(sculptInfo: SculptInfo) { ... }
}

The problem I'm trying to solve is to not block the main thread while loading or saving voxel data, which takes some time. Previously I had been using internal operation queues (for the saves, so they could be cancelled) and a dispatch queue to protect the data structures from races. The old code is tricky and I was hoping async/await would simplify things.

I tried making this class into an actor, but quickly ran into the problem that MTKView expects rendering to happen synchronously. In general I've had trouble adopting async/await because the proliferation of async calls eventually hits some Apple API I don't have control over (such as MTKView or SwiftUI's FileDocument).

Any suggestions appreciated!

Hmmm. It's a little tough to tell from the example given, but based on what you've said, you'll possibly want GPUManager isolated to the @MainActor: unless you're doing Metal rendering on a background thread, that's where your GPU calls need to be made.

@MainActor
class GPUManager {
  ...
}

If your MTKView is a member of GPUManager, that should mean you're now able to access it, too.


Also, not quite sure what's going on here, but if you're loading data into memory progressively via an AsyncStream I'd expect a non-async function, but could be wrong. Something like:

func load(blockStream: AsyncStream<VoxelDataChunk>) {
  self.cancellableTask = Task { @MainActor in // If you annotate GPUManager as @MainActor, this annotation is implicit
    for await chunk in blockStream {
      self.loadChunk(chunk)
    }
  }
}

And for this method:

You might consider making it an AsyncThrowingStream:

/// Stream throws an error if a sculpting operation cancelled the save.
func save() -> AsyncThrowingStream<VoxelDataChunk, Error> { ... }

Whoops, it already is. Editing the post.

Well if load spawns its own task, then I can't await it in my tests.

Thanks for your help!!

I see. Maybe 'for await'ing on the returned AsyncStream could work in that scenario? On second thought, scrap that.

Pleasure, glad it was useful!


Ok I tried that, and the closure passed to AsyncThrowingStream is always called on the main thread, so I can't cancel it from sculpt, which is also on the main thread (due to @MainActor). I think the async version of save worked because I had an await Task.yield() in there.

Not quite sure what you mean, but your mention of the closure passed to AsyncThrowingStream made me think there might be something else useful to know here:

Quite soon after Async(Throwing)Stream was released, it became apparent that its initialiser was overly restrictive. Shortly after, a community-sanctioned pattern emerged which essentially 'breaks out' the continuation from the initialiser.

You should avoid doing this with any other concurrency continuations, i.e. with(Checked|Unsafe)Continuation, etc., but with Async(Throwing)Stream it's a perfectly reasonable thing to do. The continuation passed in to the initialiser closure is actually quite flexible: it conforms to Sendable and can be safely used outside of its closure.

In fact, there's now a pending pitch to make this pattern part of the stdlib.

You'd use it something like:

/// Stream throws an error if a sculpting operation cancelled the save.
func save() -> AsyncThrowingStream<VoxelDataChunk, Error> {
  let (stream, continuation) = AsyncThrowingStream.makeStream(of: VoxelDataChunk.self)
  self.continuation = continuation
  self.task = Task {
    ...
  }
  return stream
}

You can then finish(throwing:) the continuation or cancel the Task elsewhere, for example in your sculpt method, if that helps.
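As a rough sketch of that cancellation path (assuming `continuation` and `task` are stored as optional properties on GPUManager, as in the save method above):

```swift
/// Apply a sculpting operation. Cancels any in-flight save.
func sculpt(sculptInfo: SculptInfo) {
  // Finish the stream with a cancellation error so the consumer
  // of save()'s stream sees that the save was interrupted...
  self.continuation?.finish(throwing: CancellationError())
  // ...and stop the task that was producing chunks.
  self.task?.cancel()

  // ... apply the sculpting operation ...
}
```

Because both sculpt and the stored continuation live on the main actor, no extra synchronisation is needed here; the continuation itself is Sendable, so finishing it from anywhere else would also be safe.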

Here's the utility methods I'm using:

public extension AsyncStream {
  
  static func makeStream(
      of elementType: Element.Type = Element.self,
      bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
  ) -> (stream: Self, continuation: Continuation) {
    var continuation: Continuation!
    return (Self(bufferingPolicy: limit) { continuation = $0 }, continuation)
  }
}

public extension AsyncThrowingStream {
  
  static func makeStream(
    of elementType: Element.Type = Element.self,
    throwing failureType: Failure.Type = Failure.self,
    bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
  ) -> (stream: Self, continuation: Continuation) where Failure == Error {
    var continuation: Continuation!
    return (Self(bufferingPolicy: limit) { continuation = $0 }, continuation)
  }
}

I should also add that the Async(Throwing)Stream types do not apply back pressure: they simply fill the buffer as fast as they can. So if you wish to limit the rate of production to the rate of consumption, you'll need to use another type of AsyncSequence. I'd recommend checking out the AsyncThrowingChannel type in the Swift Async Algorithms project.
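For reference, a minimal back-pressured pipeline with AsyncThrowingChannel might look something like this (chunksToSave and write are hypothetical stand-ins for your producer and consumer):

```swift
import AsyncAlgorithms

let channel = AsyncThrowingChannel<VoxelDataChunk, Error>()

// Producer: send(_:) is async and suspends until a consumer is
// ready for the next element, so production can't outrun consumption.
Task {
  for chunk in chunksToSave {
    await channel.send(chunk)
  }
  channel.finish()
}

// Consumer: pulls one chunk at a time, resuming the producer as it goes.
for try await chunk in channel {
  write(chunk)
}
```

The channel can also be failed with fail(_:) to propagate a cancellation error to the consumer, much like finish(throwing:) on a stream continuation.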


Thanks very much for the info!! I think the unbounded buffering explains why I was seeing the GPU streaming complete so quickly when trying AsyncThrowingStream. There's a big OOM risk as data builds up in the buffer faster than the receiver can consume it. I'm going to either need the back pressure or just stick with my callback.


Yeah, I wondered if that might be an issue. AsyncThrowingChannel would probably be more suited to your use case. It's a bit more like a PassthroughSubject in shape, with the main point of difference being that its send(_:) method is async. It also doesn't multicast, but I don't think that's an issue in this example. The Swift Async Algorithms project is managed by Apple, so should be fairly reliable.

Having said that, I don't blame you for waiting it out a bit, it's early days for Swift Concurrency and there's still a lot of rough edges in my opinion.
