Task as AsyncSequence

Hello, this was just a small idea to be able to do the following:

import Combine // provides `publisher` and `values` on Array

func testing() async {
    let task = Task {
        [0,1,2,3,4].publisher.values // contrived example to show the for await below
    }
    for await item in task {
        print(item)
    }
}

versus this:

func testing() async {
    let task = Task {
        [0,1,2,3,4].publisher.values // contrived example to show the for await below
    }
    for await item in await task.value {
        print(item)
    }
}

My implementation:

extension Task: @retroactive AsyncSequence where Success: AsyncSequence, Failure == Success.Failure {
    public struct AsyncIterator: AsyncIteratorProtocol {
        public typealias Element = Success.Element
        public typealias Failure = Success.Failure

        private enum State {
            case task(Task<Success, Success.Failure>)
            case iterator(Success.AsyncIterator)
        }

        private var state: State

        fileprivate init(task: Task<Success, Success.Failure>) {
            state = .task(task)
        }

        public mutating func next() async throws -> Success.Element? {
            try await next(isolation: #isolation)
        }

        public mutating func next(isolation actor: isolated (any Actor)? = #isolation) async throws(Success.Failure) -> Success.Element? {
            // On the first call, await the task and create the underlying iterator;
            // on later calls, reuse the stored iterator.
            var iterator = switch state {
            case .task(let task): try await task.result.get().makeAsyncIterator()
            case .iterator(let iterator): iterator
            }
            let element = try await iterator.next(isolation: `actor`)
            // Store the advanced iterator so the next call resumes from here.
            state = .iterator(iterator)
            return element
        }
    }

    public func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(task: self)
    }
}

What this allows is simpler handling of APIs that can easily end up like this:

final actor MySequenceProvider {
    func mySequence() async -> AsyncStream<Int> {
        return /*...*/
    }
}

Such APIs then need an await to get the sequence. This seems OK until you account for the many APIs that are still synchronous and expect to be able to map async sequences without awaiting. So doing this should allow use of all the AsyncSequence operator methods on any Task that returns an AsyncSequence.
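To make the "map without awaiting" point concrete, here is a minimal sketch of the same deferral idea. It deliberately swaps in a different technique from the extension above: a wrapper type instead of a retroactive conformance on Task, and plain throws instead of typed throws. The names TaskSequence and doubledValues(from:) are hypothetical and exist only for this illustration, and the body of mySequence() is an assumed stand-in for the elided one.

```swift
// Hypothetical wrapper (not part of the standard library): it defers
// awaiting the task until the first call to next().
struct TaskSequence<Base: AsyncSequence & Sendable>: AsyncSequence {
    typealias Element = Base.Element

    let task: Task<Base, Never>

    struct AsyncIterator: AsyncIteratorProtocol {
        let task: Task<Base, Never>
        var base: Base.AsyncIterator?

        mutating func next() async throws -> Base.Element? {
            if base == nil {
                // First call: wait for the task, then start iterating its result.
                base = await task.value.makeAsyncIterator()
            }
            return try await base?.next()
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(task: task, base: nil)
    }
}

actor MySequenceProvider {
    // Assumed body for illustration: yields 0 through 4, then finishes.
    func mySequence() -> AsyncStream<Int> {
        AsyncStream { continuation in
            for value in 0..<5 { continuation.yield(value) }
            continuation.finish()
        }
    }
}

// Synchronous: no await is needed to build the mapped sequence.
func doubledValues(
    from provider: MySequenceProvider
) -> AsyncMapSequence<TaskSequence<AsyncStream<Int>>, Int> {
    TaskSequence(task: Task { await provider.mySequence() }).map { $0 * 2 }
}
```

The only suspension points are inside next(), so a synchronous caller can build and pass along the mapped sequence, and the actor hop happens when something finally iterates it with for try await.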

Now, my implementation is questionable, as I am assuming that Task.Failure is (or should be) the same as the returned sequence's Failure. It is entirely possible that this is wrong, but it was the only way I could find to make the Task's AsyncIterator use typed throws without wrapping each possible error in a new error type.

This honestly feels less like a pitch and more like an "am I barking up the wrong tree?" kind of prompt.

Thanks!

May I ask why you need to wrap [0, 1, 2, 3, 4] in a Task in the first place?

If what you want is to turn a Sequence into an AsyncSequence, you can just use swift-async-algorithms, and call [0, 1, 2, 3, 4].async

It was a contrived example, just to show the difference in the for await. The point is not to convert a Sequence into an AsyncSequence. The point is to turn a Task into an AsyncSequence.

I think Task semantically just doesn't make a whole lot of sense as an AsyncSequence. That would seem to imply to me that we should base its Equatable/Hashable conformances on its Success as well, which also doesn't make much sense. Is it irking you that much to just do for await value in await task.value { /*...*/ }?

This point:

... is the real reason. So you can map, flatMap, etc. a task without having to muddle with async/await.

What you're really looking for are Task-mapping methods. They are missing, and should be added to the standard library once we can create tasks whose Failures aren't only Never or any Error.

let task = Task { [0,1,2,3,4].async }
let filtered = task.mapValue { $0.filter { $0.isMultiple(of: 2) } }
#expect(await .init(filtered.value) == [0, 2, 4])

(Success being an AsyncSequence or not doesn't matter.)

let mapped = Task { 1 }.mapValue { $0 + 1 }
#expect(await mapped.value == 2)

You can almost do a complete copy/paste at this point, using the code that will work with typed errors. Using try in the Never overload just produces a warning.

public extension Task where Failure == Never {
  @inlinable func mapValue<NewSuccess>(
    _ transform: sending @escaping @isolated(any) (Success) throws(Failure) -> NewSuccess
  ) -> Task<NewSuccess, Failure> {
    .init { () throws(_) in await transform(value) }
  }
}

public extension Task where Failure == any Error {
  @inlinable func mapValue<NewSuccess>(
    _ transform: sending @escaping @isolated(any) (Success) throws(Failure) -> NewSuccess
  ) -> Task<NewSuccess, Failure> {
    .init { () throws(_) in try await transform(value) }
  }
}