Swift concurrency "first" function

I'm trying to write a Swift function similar to the Rx race function, with the following signature:

/// Returns the first value that is returned by one of the supplied closures
/// or throws the first error that is encountered
func first<Success: Sendable>(
  _ t1: @escaping () async throws -> Success,
  _ t2: @escaping () async throws -> Success
) async throws -> Success

A usage example would be loading identical data from several servers and cancelling the remaining requests once the first one has finished, or adding timeouts to arbitrary tasks:

try await first(
  { try await timeout(after: .seconds(5)).value }, // Throws a `TimeoutError` after 5 seconds
  { try await loadDataFromSomeServer() }
)

One implementation I had in mind uses task groups:

func first<Success: Sendable>(
  _ t1: @escaping () async throws -> Success,
  _ t2: @escaping () async throws -> Success
) async throws -> Success {

  try await withThrowingTaskGroup(of: Success.self) { group in
    group.addTask { try await t1() }
    group.addTask { try await t2() }

    guard let value = try await group.next() else { fatalError("unreachable") }
    group.cancelAll()
    return value
  }
}

This, however, doesn't return immediately after the first task has finished, but waits until all tasks have finished.
Are there any alternative implementations that would fulfil my needs?


This could be happening because the child tasks you add to the group do not support cancellation themselves. group.cancelAll() essentially just sets some atomic boolean flag on the child tasks; it doesn't abort or otherwise discard them. The tasks themselves have to decide internally when it's appropriate to short-circuit (otherwise this forced halting could break lots of invariants). The group, in turn, waits for all of its child tasks to finish before withThrowingTaskGroup returns.

You might want to write either if Task.isCancelled { return } or try Task.checkCancellation() in your long-running functions.
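As a minimal sketch of what that could look like, here is a long-running function that checks for cancellation on every loop iteration. The function name sumOfSquares and its workload are hypothetical stand-ins for your own code, not anything from the original post:

```swift
// Hypothetical long-running work that cooperates with cancellation.
func sumOfSquares(upTo limit: Int) async throws -> Int {
    guard limit >= 1 else { return 0 }
    var total = 0
    for i in 1...limit {
        // Throws CancellationError once the surrounding task has been cancelled,
        // for example after group.cancelAll().
        try Task.checkCancellation()
        total += i * i
    }
    return total
}
```

In a non-throwing function, testing Task.isCancelled and returning early plays the same role. In real code you would probably check less often than once per iteration.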


I quite like the idea and the function signature itself.

I wish there were a built-in language construct for this kind of thing.

In case you haven't already seen it, you may want to visit this old discussion, which is to some extent related to what you are trying to do.

This seems to work.

// async_first

import Foundation

@main
enum Driver {
    static func main () async throws {
        let u = try await first(
          { try await task (load: 8, value: 3) },
          { try await task (load: 5, value: 5) },
          { try await timeout (after: 3, value: 2) }
        )
        
        print (u)
    }
}

func first <Success: Sendable>(
  _ t1: @escaping () async throws -> Success,
  _ t2: @escaping () async throws -> Success,
  _ t3: @escaping () async throws -> Success

) async throws -> Success {

  try await withThrowingTaskGroup(of: Success.self) { group in
    group.addTask { try await t1() }
    group.addTask { try await t2() }
    group.addTask { try await t3() }

    guard let value = try await group.next() else { fatalError("unreachable") }
    group.cancelAll()
    return value
  }
}

func task (load n: Int, value: Int) async throws -> Int {
    var n = n
    while n > 0 {
        try await Task.sleep (nanoseconds: 1_000_000_000)
        n -= 1
    }
    return value
}

func timeout (after seconds: TimeInterval, value: Int) async throws -> Int {
    try await Task.sleep (nanoseconds: 1_000_000_000 * UInt64(seconds))
    try Task.checkCancellation()
    return value
}

That looks pretty good, yeah.

I'd suggest using Task.sleep(for: .seconds...), but that looks okay.
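For illustration, the timeout helper from the post above might look roughly like this with the Duration-based sleep. This is only a sketch: it assumes Swift 5.7 or later, and changing the parameter type from TimeInterval to Duration is my own choice here:

```swift
// Sketch of the timeout helper using Duration instead of raw nanoseconds.
// Assumes Swift 5.7+; on Apple platforms this overload needs macOS 13 / iOS 16.
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
func timeout(after duration: Duration, value: Int) async throws -> Int {
    // Task.sleep throws CancellationError if the task is cancelled while
    // sleeping, so no separate checkCancellation() call is needed here.
    try await Task.sleep(for: duration)
    return value
}
```

A call site would then read timeout(after: .seconds(3), value: 2).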

In general, we're interested in both deadlines (rather than timeouts) for tasks and operations like this one, I think. Deadlines were previously blocked by the lack of time units in the stdlib, but now we could revisit the topic.

And a first() could be implemented with ... so that it works for an arbitrary number of child tasks as well.
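One way to read this is as a variadic parameter. A rough sketch under that assumption follows. The NoOperationsError type is hypothetical and only covers the case where first() is called with no closures. Marking the closures @Sendable is also my addition, so that they can be captured by the child tasks:

```swift
// Hypothetical error for the case where no operations are passed in.
struct NoOperationsError: Error {}

// Sketch: races any number of operations and returns the first result
// (or rethrows the first error), cancelling the remaining child tasks.
func first<Success: Sendable>(
    _ operations: (@Sendable () async throws -> Success)...
) async throws -> Success {
    try await withThrowingTaskGroup(of: Success.self) { group in
        for operation in operations {
            group.addTask { try await operation() }
        }
        // next() returns nil only when the group has no child tasks.
        guard let value = try await group.next() else {
            throw NoOperationsError()
        }
        // Cancellation is cooperative: the remaining tasks still have to
        // notice it themselves before the group can finish.
        group.cancelAll()
        return value
    }
}
```

The three-closure call in the example above would work unchanged with this version.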
