I've been looking into migrating our iOS codebase from reactive streams using the Combine framework to async/await, and stumbled upon a piece of logic I'm not entirely sure how to migrate.
I have a method that performs a REST API call to fetch a user and returns AnyPublisher<User, Error>.
I have a repository that exposes the same method, also returning AnyPublisher<User, Error>.
The repository has extra logic where if you request a user and there already is an active stream fetching a user, it reuses the stream instead of starting a new stream that would start a new network request.
I wonder how to achieve the same using the concurrency APIs. I've been wondering whether wrapping the fetch in a Task and reusing it would be the way, or whether I should use an AsyncStream instead.
In case someone brings this up: yes, this logic probably belongs in the networking library, but imho it's the same architectural discussion in the end.
func getUser(id: String) async throws -> User {
    // performs a network request and fetches a user or throws
}

class UserRepository {
    func getUser(id: String) async throws -> User {
        // this should reuse the network request if there's already one running
        try await getUser(id: id)
    }
}
let userRepository = UserRepository()
// let's assume that this is launched at roughly the same time by different places in the code base
// these 3 calls should trigger only a single request to the server
_ = try await userRepository.getUser(id: "x")
_ = try await userRepository.getUser(id: "x")
_ = try await userRepository.getUser(id: "x")
struct ShareAsyncResult<T: Sendable>: Sendable {
    enum State: Sendable {
        case empty
        case pending(Task<Void, Never>, [CheckedContinuation<T, any Error>])
        case complete(Result<T, any Error>)
    }

    actor Inner {
        var compute: @Sendable () async throws -> T
        var state = State.empty

        init(compute: @escaping @Sendable () async throws -> T) {
            self.compute = compute
        }

        func get() async throws -> T {
            try await withTaskCancellationHandler {
                try await withCheckedThrowingContinuation { continuation in
                    switch state {
                    case .empty:
                        // first caller: start the computation and wait for it
                        state = .pending(
                            Task {
                                do {
                                    succeed(try await compute())
                                } catch {
                                    fail(error)
                                }
                            },
                            [continuation]
                        )
                    case .pending(let task, var continuations):
                        // computation already running: join the waiters
                        continuations.append(continuation)
                        state = .pending(task, continuations)
                    case let .complete(result):
                        // already finished: hand back the stored result
                        continuation.resume(with: result)
                    }
                }
            } onCancel: {
                self._cancel()
            }
        }

        nonisolated
        func _cancel() {
            Task { await self.cancel() }
        }

        func cancel() {
            switch state {
            case .empty, .complete:
                break
            case let .pending(task, continuations):
                task.cancel()
                let error = CancellationError()
                for continuation in continuations {
                    continuation.resume(throwing: error)
                }
                state = .complete(.failure(error))
            }
        }

        func succeed(_ value: T) {
            switch state {
            case .empty, .complete:
                // could be possible if we were cancelled. Drop the value,
                // continuations already received CancellationError.
                break
            case let .pending(_, continuations):
                for continuation in continuations {
                    continuation.resume(returning: value)
                }
                state = .complete(.success(value))
            }
        }

        func fail(_ error: any Error) {
            switch state {
            case .empty, .complete:
                // could be possible if we were cancelled. Drop the error,
                // continuations already received CancellationError.
                break
            case let .pending(_, continuations):
                for continuation in continuations {
                    continuation.resume(throwing: error)
                }
                state = .complete(.failure(error))
            }
        }
    }

    var inner: Inner

    init(compute: @escaping @Sendable () async throws -> T) {
        self.inner = Inner(compute: compute)
    }

    func callAsFunction() async throws -> T {
        try await inner.get()
    }
}
struct User {}
func getUser(_ userID: Int) async throws -> User {
    print("actually getting user")
    return User()
}
let sharedGetUser = ShareAsyncResult { try await getUser(42) }
print(try await sharedGetUser())
print(try await sharedGetUser())
print(try await sharedGetUser())
Apple's WWDC 2021 included a talk, "Protect mutable state with Swift actors", where something very similar to this was demonstrated very elegantly using actors. In the video, the use case is downloading an image from a URL: if a download is already in progress, its result is reused instead of a new download task being created.
The video is great, so I won't attempt to rephrase it here. It aligns with your use case, so I recommend watching it.
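For readers who want something concrete before watching, here is a minimal sketch of the general idea (an actor that keeps the in-flight Task per user id and lets later callers await the same task). It is not a transcription of the code from the talk. The `fetch` closure, `RequestCounter`, and `demo()` are hypothetical names introduced only for illustration, and the 200 ms sleep stands in for the real network call.

```swift
// Minimal stand-in for the question's model type.
struct User: Sendable, Equatable {
    let id: String
}

// Coalesces overlapping requests for the same id into one in-flight Task.
actor UserRepository {
    // Assumption: the real network call is injected as a closure.
    private let fetch: @Sendable (String) async throws -> User
    private var inFlight: [String: Task<User, Error>] = [:]

    init(fetch: @escaping @Sendable (String) async throws -> User) {
        self.fetch = fetch
    }

    func getUser(id: String) async throws -> User {
        // Join the request that is already running, if any.
        if let running = inFlight[id] {
            return try await running.value
        }
        // There is no suspension point between the lookup above and the
        // store below, so two callers cannot both start a request.
        let task = Task { [fetch] in try await fetch(id) }
        inFlight[id] = task
        // Forget the task once it finishes so later calls fetch fresh data.
        defer { inFlight[id] = nil }
        return try await task.value
    }
}

// Hypothetical helper that counts how many "network" calls were made.
actor RequestCounter {
    private(set) var count = 0
    func increment() { count += 1 }
}

// Starts three overlapping calls and reports how many fetches really ran.
func demo() async throws -> (users: [User], requests: Int) {
    let counter = RequestCounter()
    let repository = UserRepository { id in
        await counter.increment()
        try await Task.sleep(nanoseconds: 200_000_000) // simulated latency
        return User(id: id)
    }
    async let a = repository.getUser(id: "x")
    async let b = repository.getUser(id: "x")
    async let c = repository.getUser(id: "x")
    let users = try await [a, b, c]
    return (users, await counter.count)
}
```

With the three calls overlapping as in `demo()`, the counter should report a single fetch. Two caveats about this sketch: the shared task is unstructured, so cancelling one caller does not cancel the underlying request, and the entry is removed as soon as the request finishes, so it deduplicates concurrent calls but does not cache results.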