Using async/await, how to prevent triggering a new API call when there's one already running?

I've been looking into migrating our iOS codebase from reactive streams using the Combine framework to async/await, and stumbled upon a piece of logic I'm not entirely sure how to migrate.

  • Have a method that performs a REST API call to fetch a user and returns AnyPublisher<User, Error>
  • Have a repository that has the same method AnyPublisher<User, Error>
  • The repository has extra logic where if you request a user and there already is an active stream fetching a user, it reuses the stream instead of starting a new stream that would start a new network request.

I wonder how to achieve the same using the concurrency APIs. I've been wondering whether wrapping the fetch in a Task and reusing it would be the way, or whether I should somehow use an AsyncStream.

In case someone starts this discussion: yes, this logic should probably live at the level of the networking library, but IMHO the same architectural question applies there in the end.


func fetchUser(id: String) async throws -> User {
	// performs a network request and fetches a user or throws
}

class UserRepository {
	func getUser(id: String) async throws -> User {
		// this should reuse the network request if there's already one running
		// (calls the free function above, not this method)
		try await fetchUser(id: id)
	}
}

let userRepository = UserRepository()

// let's assume that these are launched at roughly the same time from different places in the code base
// these 3 calls should trigger only a single request to the server
_ = try await userRepository.getUser(id: "x")
_ = try await userRepository.getUser(id: "x")
_ = try await userRepository.getUser(id: "x")

Something like this?

struct ShareAsyncResult<T: Sendable>: Sendable {

    enum State: Sendable {
        case empty
        case pending(Task<Void, Never>, [CheckedContinuation<T, any Error>])
        case complete(Result<T, any Error>)
    }

    actor Inner {
        var compute: @Sendable () async throws -> T
        var state = State.empty

        init(compute: @escaping @Sendable () async throws -> T) {
            self.compute = compute
        }

        func get() async throws -> T {
            try await withTaskCancellationHandler {
                try await withCheckedThrowingContinuation { continuation in
                    switch state {
                    case .empty:
                        state = .pending(
                            Task {
                                do {
                                    succeed(try await compute())
                                } catch {
                                    fail(error)
                                }
                            },
                            [continuation]
                        )
                    case .pending(let task, var continuations):
                        continuations.append(continuation)
                        state = .pending(task, continuations)
                    case let .complete(result):
                        continuation.resume(with: result)
                    }
                }
            } onCancel: {
                self._cancel()
            }
        }

        nonisolated
        func _cancel() {
            Task { await self.cancel() }
        }

        func cancel() {
            switch state {
            case .empty, .complete:
                break
            case let .pending(task, continuations):
                task.cancel()
                let error = CancellationError()
                for continuation in continuations {
                    continuation.resume(throwing: error)
                }
                state = .complete(.failure(error))
            }
        }

        func succeed(_ value: T) {
            switch state {
            case .empty, .complete:
                // could be possible if we were cancelled. Drop the value,
                // continuations already received CancellationError.
                break
            case let .pending(_, continuations):
                for continuation in continuations {
                    continuation.resume(returning: value)
                }
                state = .complete(.success(value))
            }
        }

        func fail(_ error: any Error) {
            switch state {
            case .empty, .complete:
                // could be possible if we were cancelled. Drop the error,
                // continuations already received CancellationError.
                break
            case let .pending(_, continuations):
                for continuation in continuations {
                    continuation.resume(throwing: error)
                }
                state = .complete(.failure(error))
            }
        }
    }

    var inner: Inner

    init(compute: @escaping @Sendable () async throws -> T) {
        self.inner = Inner(compute: compute)
    }

    func callAsFunction() async throws -> T {
        try await inner.get()
    }

}

struct User {}
func getUser(_ userID: Int) async throws -> User {
    print("actually getting user")
    return User()
}

let sharedGetUser = ShareAsyncResult { try await getUser(42) }
print(try await sharedGetUser())
print(try await sharedGetUser())
print(try await sharedGetUser())

WWDC21 included a talk (Protect mutable state with Swift actors) where something very similar to this was demonstrated very elegantly using actors. In the video, the use case is downloading an image from a URL: if a download for that URL is already in progress, the result of the ongoing download task is reused instead of creating a new download task.

The video is great, so I won't attempt to rephrase it here. It aligns with your use case, so I recommend watching it.
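
For anyone who wants the gist before watching, here is a rough sketch of that idea applied to the user-fetching case. It is not the code from the talk: `fetchUser`, the `fetch` closure parameter, the `inFlight` dictionary, and `demo()` are all names made up for illustration, and `fetchUser` merely simulates a network call. The repository is an actor that stores the in-flight `Task` per user id; because the task is stored before the first suspension point, a second caller that enters while the first one is suspended should find the existing task and await it.

```swift
import Foundation

struct User: Sendable {
    let id: String
}

// Hypothetical stand-in for the real network call.
@Sendable func fetchUser(id: String) async throws -> User {
    print("actually getting user \(id)")
    try await Task.sleep(nanoseconds: 100_000_000) // simulate latency
    return User(id: id)
}

actor UserRepository {
    // Requests that are currently running, keyed by user id.
    private var inFlight: [String: Task<User, Error>] = [:]
    private let fetch: @Sendable (String) async throws -> User

    init(fetch: @escaping @Sendable (String) async throws -> User) {
        self.fetch = fetch
    }

    func getUser(id: String) async throws -> User {
        // Reuse the running request if there is one.
        if let task = inFlight[id] {
            return try await task.value
        }
        // No await between the lookup above and the store below,
        // so reentrant callers cannot slip in and start a second task.
        let task = Task { [fetch] in try await fetch(id) }
        inFlight[id] = task
        // Only the caller that created the task removes it once it finishes.
        defer { inFlight[id] = nil }
        return try await task.value
    }
}

// Three overlapping calls; the message in fetchUser is expected to appear once.
func demo() async throws {
    let repository = UserRepository(fetch: fetchUser)
    async let a = repository.getUser(id: "x")
    async let b = repository.getUser(id: "x")
    async let c = repository.getUser(id: "x")
    _ = try await (a, b, c)
}
```

Note that the stored task is unstructured, so in this sketch cancelling one of the waiting callers does not cancel the shared request. Whether that is desirable depends on the use case.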


This works for caching, though, and not for simultaneous requests, as actors are reentrant.

I've come up with a sort of Waiter helper for such cases:

actor Waiter<ID, V> where ID: Hashable & Sendable, V: Sendable {
  
  private var continuations: [ID: [UnsafeContinuation<V, any Error>]] = [:]

  func process(
    by id: ID,
    work: () async throws -> V
  ) async throws -> V {
    if self.continuations[id] != nil {
      return try await withUnsafeThrowingContinuation { continuation in
        self.continuations[id, default: []].append(continuation)
      }
    }
    self.continuations[id] = []
    do {
      let value = try await work()
      self.serve(id: id, with: .success(value))
      return value
    } catch {
      self.serve(id: id, with: .failure(error))
      throw error
    }
  }
  
  private func serve(
    id: ID,
    with result: Result<V, any Error>
  ) {
    for continuation in (self.continuations[id] ?? []) {
      continuation.resume(with: result)
    }
    self.continuations
      .removeValue(forKey: id)
  }
  
  init() {}
}

Then you can call try await waiter.process(by: "someId") {}. Maybe not the best code, but it works.


Thanks! I'll watch the video.

I see, so the idea is to use a continuation to do the waiting. Thanks!