RFC: Using Swift Concurrency as a serial blocking queue

Hoping to get some comments & feedback on an actor I wrote. I've been wanting to build a "AuthTokenProvider" using Swift Concurrency. There are a few goals I had:

  • Using an actor for thread-safety generic over a Value
  • Has a single getter method/property to get an up to date Value
  • The value is fetched by a caller-defined async function
  • Lazily fetch the first value upon first request
  • While fetching, all subsequent requests must also await the fetch value
  • Once fetched, cache it in-memory for faster performance in subsequent requests
  • Include a way to invalidate the locally cached value, requiring the value to be re-fetched if invalid
  • No concurrency warnings!

Here is my first attempt, the a version hosted in GitHub: https://github.com/rjchatfield/SerialUpdatingValue.
It involves a queue of unsafe continuations. Pretty gnarly.

/// Thread-safe access to a lazily retrieved value, with optional validity checking
public actor SerialUpdatingValue<Value> where Value: Sendable {
    
    // MARK: - Properties
    
    private let isValid: @Sendable (Value) -> Bool
    private let getUpdatedValue: @Sendable () async throws -> Value
    
    private var latestValue: Result<Value, Error>?
    private var callbackQueue: [@Sendable (Result<Value, Error>) -> Void] = []
    private var taskHandle: Task<(), Never>?
    
    // MARK: - Life cycle
    
    /// - Parameters:
    ///   - isValid: Run against the locally stored `latestValue`, if `false` then value will be updated.
    ///   - getUpdatedValue: Long-running task to get updated value. Will be called lazily, initially, and if stored `latestValue` is no longer valid
    public init(
        isValid: @escaping @Sendable (Value) -> Bool = { _ in true },
        getUpdatedValue: @escaping @Sendable () async throws -> Value
    ) {
        self.isValid = isValid
        self.getUpdatedValue = getUpdatedValue
    }
    
    deinit {
        /// Not if there is a valid case when an Actor could
        taskHandle?.cancel() /// cancel long-running update task
        update(.failure(SerialUpdatingValueError.actorDeallocated)) /// flush callbacks
    }
    
    // MARK: - Public API
    
    /// Will get up-to-date value
    public var value: Value {
        get async throws {
            /// Using "unsafe" to capture `continuation` outside of scope
            try await withUnsafeThrowingContinuation { continuation in
                append(callback: { [continuation] result in
                    continuation.resume(with: result)
                })
            }
        }
    }
    
    // MARK: - Private methods
    
    private func append(
        callback: @escaping @Sendable (Result<Value, Error>) -> Void
    ) {
        if case .success(let value) = latestValue, isValid(value) {
            return callback(.success(value))
        } else {
            /// There is no valid value, so must get a new value and
            latestValue = nil /// clear out invalid value
            callbackQueue.append(callback) /// enqueue callback
            guard taskHandle == nil else { return } /// task is already running, will be called back from other callback
            taskHandle = Task {
                let newValue: Result<Value, Error>
                do {
                    newValue = .success(try await getUpdatedValue())
                } catch is CancellationError {
                    return /// Task may be cancelled during dealloc and callbacks will be handled differently
                } catch {
                    newValue = .failure(error)
                }
                guard !Task.isCancelled else {
                    return /// Task may be cancelled during dealloc and callbacks will be handled differently
                }
                update(newValue)
                taskHandle = nil
            }
        }
    }
    
    private func update(
        _ updatedValue: Result<Value, Error>
    ) {
        latestValue = updatedValue
        /// Call all callbacks
        let _callbacks = self.callbackQueue
        self.callbackQueue = [] /// empty out queue before calling out to avoid possible reentrancy behaviour
        for callback in _callbacks {
            callback(updatedValue)
        }
    }
}

Does anyone here see anything wrong with this, or have any suggestions? Perhaps there is some prior art somewhere? All feedback or bikeshedding welcome.

I was inspired by a Tweet reply from @Douglas_Gregor to @layoutSubviews https://twitter.com/dgregor79/status/1486933532005961731

[...] Swift Concurrency has the building blocks for concurrency, but doesn’t have many higher-level APIs to make things like this easy. As a community, we should experiment with what works best, and then codify the best practices in new APIs.

Thanks,
Rob

1 Like

I've done something similar with my CoalescingTask actor here: https://github.com/gshahbazian/playgrounds/blob/main/AsyncAwait.playground/Sources/CoalescingTask.swift

I think the main difference between our two versions is that CoalescingTask avoids the need for a callbackQueue by simply awaiting that the underlying Task has completed and returning the value, which you might be able to try as well.

I'm also interested in feedback on the best ways to achieve these types of structures.

1 Like

That seems sneakily similar to the next() async throws -> Element? shape of AsyncIteratorProtocol. Would this perhaps be served well as an AsyncSequence?

That's great. Thank you so much for sharing. I've updated my implementation and deleted heaps of possibly buggy logic. Boiled down to just:

public var value: Value {
    get async throws {
        if let value = try? await taskHandle?.value, isValid(value) {
            return value
        } else {
            let taskHandle = Task { try await getUpdatedValue() }
            self.taskHandle = taskHandle
            return try await taskHandle.value
        }
    }
}

What do you think?

I've pushed up the changes to https://github.com/rjchatfield/SerialUpdatingValue/blob/main/Sources/SerialUpdatingValue/SerialUpdatingValue.swift. All the tests work fine. Thanks again.


That's an interesting observation. I think I'll avoid the conformance for now. Imagine if someone called this in a loop like for try async value in valueProvider { ... }, then it would keep pulling the value in memory and would get into an infinite loop I think.

Indefinite AsyncSequence types are fine; they either have functions that reduce the scope like prefix(_:) or cooperate in cancellation of the tasks iterating them. Other systems have behavioral similarities to what you are describing that work like that; see ReactiveX - Subject. Some of those patterns could perhaps help simplify even further.

1 Like

Okay I found a bug :sweat_smile: I wasn't handling cancellation or propagating the thrown error correctly. Here's another attempt...

public var value: Value {
    get async throws {
        /// Check for in-flight task, return value only if still valid, rethrow error and reset state
        if let taskHandle = taskHandle {
            do {
                let value = try await taskHandle.value
                try Task.checkCancellation()
                if isValid(value) {
                    return value
                }
            } catch let cancellationError as CancellationError {
                /// If this child-task is cancelled, don't nil out `taskHandle` so next caller can still get value
                throw cancellationError
            } catch {
                /// If in-flight task returns an error, rethrow error and nil out `taskHandle` so next caller will get updated value
                self.taskHandle = nil
                throw error
            }
        }
        /// Else, there is no valid value & now requires updated value
        let taskHandle = Task { try await getUpdatedValue() }
        self.taskHandle = taskHandle
        return try await taskHandle.value /// Assumes new values are valid
    }
}