Hoping to get some comments & feedback on an actor I wrote. I've been wanting to build a "AuthTokenProvider" using Swift Concurrency. There are a few goals I had:
- Using an
actor for thread-safety generic over a Value
- Has a single getter method/property to get an up to date
Value
- The value is fetched by a caller-defined async function
- Lazily fetch the first value upon first request
- While fetching, all subsequent requests must also await the fetch value
- Once fetched, cache it in-memory for faster performance in subsequent requests
- Include a way to invalidate the locally cached value, requiring the value to be re-fetched if invalid
- No concurrency warnings!
Here is my first attempt, the a version hosted in GitHub: GitHub - rjchatfield/SerialUpdatingValue: Thread -safe access to a lazily retrieved value, with optional validity checking.
It involves a queue of unsafe continuations. Pretty gnarly.
/// Thread-safe access to a lazily retrieved value, with optional validity checking
public actor SerialUpdatingValue<Value> where Value: Sendable {
// MARK: - Properties
private let isValid: @Sendable (Value) -> Bool
private let getUpdatedValue: @Sendable () async throws -> Value
private var latestValue: Result<Value, Error>?
private var callbackQueue: [@Sendable (Result<Value, Error>) -> Void] = []
private var taskHandle: Task<(), Never>?
// MARK: - Life cycle
/// - Parameters:
/// - isValid: Run against the locally stored `latestValue`, if `false` then value will be updated.
/// - getUpdatedValue: Long-running task to get updated value. Will be called lazily, initially, and if stored `latestValue` is no longer valid
public init(
isValid: @escaping @Sendable (Value) -> Bool = { _ in true },
getUpdatedValue: @escaping @Sendable () async throws -> Value
) {
self.isValid = isValid
self.getUpdatedValue = getUpdatedValue
}
deinit {
/// Not if there is a valid case when an Actor could
taskHandle?.cancel() /// cancel long-running update task
update(.failure(SerialUpdatingValueError.actorDeallocated)) /// flush callbacks
}
// MARK: - Public API
/// Will get up-to-date value
public var value: Value {
get async throws {
/// Using "unsafe" to capture `continuation` outside of scope
try await withUnsafeThrowingContinuation { continuation in
append(callback: { [continuation] result in
continuation.resume(with: result)
})
}
}
}
// MARK: - Private methods
private func append(
callback: @escaping @Sendable (Result<Value, Error>) -> Void
) {
if case .success(let value) = latestValue, isValid(value) {
return callback(.success(value))
} else {
/// There is no valid value, so must get a new value and
latestValue = nil /// clear out invalid value
callbackQueue.append(callback) /// enqueue callback
guard taskHandle == nil else { return } /// task is already running, will be called back from other callback
taskHandle = Task {
let newValue: Result<Value, Error>
do {
newValue = .success(try await getUpdatedValue())
} catch is CancellationError {
return /// Task may be cancelled during dealloc and callbacks will be handled differently
} catch {
newValue = .failure(error)
}
guard !Task.isCancelled else {
return /// Task may be cancelled during dealloc and callbacks will be handled differently
}
update(newValue)
taskHandle = nil
}
}
}
private func update(
_ updatedValue: Result<Value, Error>
) {
latestValue = updatedValue
/// Call all callbacks
let _callbacks = self.callbackQueue
self.callbackQueue = [] /// empty out queue before calling out to avoid possible reentrancy behaviour
for callback in _callbacks {
callback(updatedValue)
}
}
}
Does anyone here see anything wrong with this, or have any suggestions? Perhaps there is some prior art somewhere? All feedback or bikeshedding welcome.
I was inspired by a Tweet reply from @Douglas_Gregor to @layoutSubviews https://twitter.com/dgregor79/status/1486933532005961731
[...] Swift Concurrency has the building blocks for concurrency, but doesn’t have many higher-level APIs to make things like this easy. As a community, we should experiment with what works best, and then codify the best practices in new APIs.
Thanks,
Rob
1 Like
I've done something similar with my CoalescingTask actor here: playgrounds/CoalescingTask.swift at main · gshahbazian/playgrounds · GitHub
I think the main difference between our two versions is that CoalescingTask avoids the need for a callbackQueue by simply awaiting that the underlying Task has completed and returning the value, which you might be able to try as well.
I'm also interested in feedback on the best ways to achieve these types of structures.
1 Like
That seems sneakily similar to the next() async throws -> Element? shape of AsyncIteratorProtocol. Would this perhaps be served well as an AsyncSequence?
That's great. Thank you so much for sharing. I've updated my implementation and deleted heaps of possibly buggy logic. Boiled down to just:
public var value: Value {
get async throws {
if let value = try? await taskHandle?.value, isValid(value) {
return value
} else {
let taskHandle = Task { try await getUpdatedValue() }
self.taskHandle = taskHandle
return try await taskHandle.value
}
}
}
What do you think?
I've pushed up the changes to https://github.com/rjchatfield/SerialUpdatingValue/blob/main/Sources/SerialUpdatingValue/SerialUpdatingValue.swift. All the tests work fine. Thanks again.
That's an interesting observation. I think I'll avoid the conformance for now. Imagine if someone called this in a loop like for try async value in valueProvider { ... }, then it would keep pulling the value in memory and would get into an infinite loop I think.
Indefinite AsyncSequence types are fine; they either have functions that reduce the scope like prefix(_:) or cooperate in cancellation of the tasks iterating them. Other systems have behavioral similarities to what you are describing that work like that; see ReactiveX - Subject. Some of those patterns could perhaps help simplify even further.
1 Like
Okay I found a bug
I wasn't handling cancellation or propagating the thrown error correctly. Here's another attempt...
public var value: Value {
get async throws {
/// Check for in-flight task, return value only if still valid, rethrow error and reset state
if let taskHandle = taskHandle {
do {
let value = try await taskHandle.value
try Task.checkCancellation()
if isValid(value) {
return value
}
} catch let cancellationError as CancellationError {
/// If this child-task is cancelled, don't nil out `taskHandle` so next caller can still get value
throw cancellationError
} catch {
/// If in-flight task returns an error, rethrow error and nil out `taskHandle` so next caller will get updated value
self.taskHandle = nil
throw error
}
}
/// Else, there is no valid value & now requires updated value
let taskHandle = Task { try await getUpdatedValue() }
self.taskHandle = taskHandle
return try await taskHandle.value /// Assumes new values are valid
}
}