AsyncStream and Cancellation

While adapting Alamofire's DataStreamRequest for Swift concurrency, I've run into issues with AsyncStream's cancellation handling. Specifically, I'm trying to work out the correct sequence of operations so that I can cancel the underlying request when the task is cancelled.

Current Code
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
@dynamicMemberLookup
public struct DataStreamTask {
    public typealias Stream<Success, Failure: Error> = AsyncStream<DataStreamRequest.Stream<Success, Failure>>
    private let request: DataStreamRequest

    fileprivate init(request: DataStreamRequest) {
        self.request = request
    }
    
    public func streamData(bufferingPolicy: Stream<Data, Never>.Continuation.BufferingPolicy = .unbounded) -> Stream<Data, Never> {
        createStream(bufferingPolicy: bufferingPolicy) { onStream in
            self.request.responseStream(on: .streamCompletionQueue, stream: onStream)
        }
    }
    
    public func streamStrings(bufferingPolicy: Stream<String, Never>.Continuation.BufferingPolicy = .unbounded) -> Stream<String, Never> {
        createStream(bufferingPolicy: bufferingPolicy) { onStream in
            self.request.responseStreamString(on: .streamCompletionQueue, stream: onStream)
        }
    }

    public func stream<Serializer: DataStreamSerializer>(serializedUsing serializer: Serializer,
                                                         bufferingPolicy: Stream<Serializer.SerializedObject, AFError>.Continuation.BufferingPolicy = .unbounded)
        -> Stream<Serializer.SerializedObject, AFError> {
        createStream(bufferingPolicy: bufferingPolicy) { onStream in
            self.request.responseStream(using: serializer, on: .streamCompletionQueue, stream: onStream)
        }
    }
    
    private func createStream<Success, Failure: Error>(
        bufferingPolicy: Stream<Success, Failure>.Continuation.BufferingPolicy = .unbounded,
        forResponse onResponse: @escaping (@escaping (DataStreamRequest.Stream<Success, Failure>) -> Void) -> Void)
    -> Stream<Success, Failure> {
        Stream(bufferingPolicy: bufferingPolicy) { continuation in
            continuation.onTermination = { @Sendable termination in
                if case .cancelled = termination {
                    request.cancel()
                }
            }

            onResponse { stream in
                debugPrint(stream)
                continuation.yield(stream)
                if case .complete = stream.event {
                    continuation.finish()
                }
            }
        }
    }

    /// Cancel the underlying `DataStreamRequest`.
    public func cancel() {
        request.cancel()
    }

    /// Resume the underlying `DataStreamRequest`.
    public func resume() {
        request.resume()
    }

    /// Suspend the underlying `DataStreamRequest`.
    public func suspend() {
        request.suspend()
    }

    public subscript<T>(dynamicMember keyPath: KeyPath<DataStreamRequest, T>) -> T {
        request[keyPath: keyPath]
    }
}

@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
extension DataStreamRequest {
    public func task() -> DataStreamTask {
        DataStreamTask(request: self)
    }
}

I understand that Task cancellation only applies when a parent Task is cancelled, so in my test I create a local Task, cancel it, and then await its value, thinking that would be enough to guarantee the completion of the AsyncStream's cancellation handler. But that doesn't seem to be the case, nor do other methods of waiting in tests seem to work.

func testThatDataStreamTaskCanBeImplicitlyCancelled() async {
    // Given
    let session = Session(); defer { withExtendedLifetime(session) {} }

    // When
    let expectedPayloads = 100
    let request = session.streamRequest(.payloads(expectedPayloads))
    let task = Task {
        let task = request.task()
        var datas: [Data] = []
        for await data in task.streamData().compactMap(\.value) {
            datas.append(data)
        }
        
        XCTAssertTrue(datas.isEmpty)
    }
    task.cancel()
    let void: Void = await task.value

    // Then
    XCTAssertTrue(request.isCancelled)
    XCTAssertNotNil(void)
}

In this test, await task.value completes before the AsyncStream's termination handler is called, so the test method completes and the local Session is deinitialized, leading to a failure that is sent through the AsyncStream, eventually calling finish.

However, when I update the test to use the global AF singleton, the test itself just dies as the method completes. Occasionally it does print a full set of output values (when I add logging), presumably because the local server responds fast enough to return data before the local test run is killed. Only in that case do I see the termination handler called, with the finished case.

In both cases, the closure for the AsyncStream doesn't inherit the calling Task's context, leading Task.isCancelled to never be true, even while checking that value within the test Task's closure properly returns true.
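One possible explanation, sketched under the assumption that the stream callbacks run on a dispatch queue rather than inside the Task: code executing outside any task has no current task, so the static Task.isCancelled reads false there even after the Task itself has been cancelled. The function name below is hypothetical and only illustrates the effect.

```swift
import Foundation

/// Cancels a Task, then reads `Task.isCancelled` both inside the Task and
/// from a block running on a global dispatch queue (outside any task).
func observeCancellationFlags() async -> (insideTask: Bool, onQueue: Bool) {
    let task = Task { () -> (Bool, Bool) in
        // Spin until the cancellation becomes visible inside this task.
        while !Task.isCancelled { await Task.yield() }
        let inside = Task.isCancelled

        // The block below runs on a dispatch queue, not as part of this task,
        // so the static `Task.isCancelled` has no task to consult.
        let onQueue: Bool = await withCheckedContinuation { continuation in
            DispatchQueue.global().async {
                continuation.resume(returning: Task.isCancelled)
            }
        }
        return (inside, onQueue)
    }
    task.cancel()
    let result = await task.value
    return (insideTask: result.0, onQueue: result.1)
}
```

If this is what is happening, forwarding cancellation explicitly (for example through the stream's onTermination handler, as the code above already attempts) is needed rather than polling Task.isCancelled from the callback.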

So this leaves me with a bunch of questions:

  • Under what conditions will the termination handler be called with the cancelled case?
  • Are child tasks supposed to complete or at least be terminated before the parent Task completes?
  • How can I set up a test to ensure this cancellation logic is actually hit?
  • In general, how far does Task cancellation propagate? I expected the cancelled state to be visible to all code running under the Task, but that's apparently not the case. That, or I misunderstand the relationship between the Task and the code called in its initialization closure.

cc @ktoso @Philippe_Hausler

The expected behavior is that stream termination is triggered if the task iterating the stream is cancelled. I will take a look at this on Monday (it may be partially related to another issue w.r.t. captures that cause the stream to not hit an early cancel when break is used).
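A minimal sketch of that expected behavior, independent of Alamofire. TerminationRecorder and observeTerminationAfterTaskCancel are hypothetical names used only for illustration; the stream never yields or finishes, so the only way iteration can end is through cancellation of the consuming Task. Toolchains from the time of this thread may have behaved differently.

```swift
import Foundation

/// Hypothetical helper that records how a stream terminated. The lock keeps
/// access safe from the @Sendable termination handler.
final class TerminationRecorder: @unchecked Sendable {
    private let lock = NSLock()
    private var stored: String?

    func record(_ value: String) {
        lock.lock(); defer { lock.unlock() }
        stored = value
    }

    var value: String? {
        lock.lock(); defer { lock.unlock() }
        return stored
    }
}

/// Iterates a never-finishing stream in a Task, cancels that Task, and
/// reports which termination case the handler observed.
func observeTerminationAfterTaskCancel() async -> String? {
    let recorder = TerminationRecorder()
    let stream = AsyncStream<Int> { continuation in
        continuation.onTermination = { @Sendable termination in
            if case .cancelled = termination {
                recorder.record("cancelled")
            } else {
                recorder.record("finished")
            }
        }
    }

    let consumer = Task {
        for await _ in stream {}
    }
    consumer.cancel()
    // The loop ends once the cancelled stream returns nil from next().
    await consumer.value
    return recorder.value
}
```

In a test, the returned value could be asserted after awaiting the function, which avoids racing the termination handler against the end of the test method.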

I also can't seem to trigger task cancellation in my simpler data and download cases, so this doesn't seem specific to AsyncStream. This may still be a test setup issue or a misunderstanding, but I'll report it either way.

I misunderstood this when first experimenting with AsyncStream in the form of an AsyncSequence and thought the stream would be terminated if the iterator was cancelled. Someone pointed out my mistake and that it had to be that the task was cancelled. Perhaps this should be called out in the docs.

I thought that too when I first tried to add tests, but I was corrected and moved to trying to test Task cancellation. So I'll need additional logic for cancelling the stream when iteration finishes while the request is still in progress. But handling implicit cancellation is necessary too.

I've been experimenting with AsyncStream more, both to figure out what lifetime management is required, and when the termination handler is called. So far I think I have a few realizations.

  1. If you call continuation.finish() at all, the AsyncStream will terminate in the finished state, even when you exit iteration early with break. This may seem obvious but it was confusing to see alongside early iteration exit.
  2. If you don't call continuation.finish() but you exit iteration early, the AsyncStream will terminate in the cancelled state, but only once the continuation has been deinitialized. I'm not sure about this as I haven't been able to specifically structure a test to confirm it. Alternatively, this behavior could be due to the lifetime of the iterator since the loop was broken early. But in my testing the termination handler isn't called until my use of the continuation has ended. This means infinite streams will never actually be cancelled by early loop exit, as far as I can tell, but streams with a fixed number of elements will.
  3. If you don't call continuation.finish() and don't exit the loop early, the loop will hang awaiting more values. Again, obvious, but it means that if you can't determine when to finish the AsyncStream from the values it's being provided, you need to hook into some other method to call finish(). In Alamofire this likely means I'll need a new internal lifetime method I can hook into.
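Point 3 can be sketched as follows, assuming a producer that signals completion through a callback separate from its value callback. CallbackProducer and makeStream(from:) are hypothetical stand-ins and are not Alamofire API; the completion hook is what calls finish(), so the consuming loop can end without inspecting the values.

```swift
/// Hypothetical callback-based producer standing in for a streaming request.
final class CallbackProducer: @unchecked Sendable {
    var onValue: ((Int) -> Void)?
    var onComplete: (() -> Void)?
    private(set) var wasCancelled = false

    /// Delivers every value synchronously, then signals completion separately.
    func start(values: [Int]) {
        values.forEach { onValue?($0) }
        onComplete?()
    }

    func cancel() { wasCancelled = true }
}

/// Bridges the producer into an AsyncStream. Values are yielded as they
/// arrive, and the separate completion hook is what finishes the stream.
func makeStream(from producer: CallbackProducer) -> AsyncStream<Int> {
    AsyncStream { continuation in
        continuation.onTermination = { @Sendable termination in
            // Only a cancelled termination should cancel the producer.
            if case .cancelled = termination { producer.cancel() }
        }
        producer.onValue = { value in continuation.yield(value) }
        producer.onComplete = { continuation.finish() }
    }
}
```

Because finish() is called from the completion hook, the termination handler sees the finished case and the producer is not cancelled, which matches point 1.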

Has anyone else observed these behaviors?

I produce infinite streams for GRDB (database observation never ends), so I'm very eager to test that task cancellation and early loop exit correctly release resources as soon as possible.

Speaking of early exit specifically, I had some success based on the (experimental) fact that the async runtime notifies this termination with the deinitialization of the iterator (deinitialization of the continuation is not observable, so I ignore it). I thus make sure that resources which should be released at the end of the loop are retained only by the iterator, and tests pass.

By this you mean that, since your Iterator captures the underlying Iterator from the stream and your AnyDatabaseCancellable, when the Iterator is deinitialized you can ensure the cancellable is as well, thereby performing whatever deinit logic exists on the token?

Yes I do!
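A rough sketch of that pattern, not GRDB's actual implementation. ReleaseToken, ObservedValues, and firstValue(of:) are hypothetical names: the token is retained only by the iterator, so its deinit runs once the loop's iterator goes away, even though the underlying stream never finishes.

```swift
/// Hypothetical token whose deinit stands in for releasing an observation.
final class ReleaseToken: Sendable {
    private let onRelease: @Sendable () -> Void
    init(onRelease: @escaping @Sendable () -> Void) { self.onRelease = onRelease }
    deinit { onRelease() }
}

/// Wraps an AsyncStream and ties a token's lifetime to each iterator.
struct ObservedValues: AsyncSequence, Sendable {
    typealias Element = Int

    let base: AsyncStream<Int>
    let makeToken: @Sendable () -> ReleaseToken

    struct AsyncIterator: AsyncIteratorProtocol {
        var base: AsyncStream<Int>.Iterator
        // Retained only here, so it is released together with the iterator.
        let token: ReleaseToken

        mutating func next() async -> Int? { await base.next() }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(base: base.makeAsyncIterator(), token: makeToken())
    }
}

/// Exits the loop early; the iterator (and its token) is destroyed on return.
func firstValue(of values: ObservedValues) async -> Int? {
    for await value in values { return value }
    return nil
}
```

The design choice here is that nothing outside the iterator keeps the token alive, so early loop exit releases the resource without relying on the stream's termination handler.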