AsyncStream and Cancellation

In attempting to adapt Alamofire's DataStreamRequest for Swift concurrency, I've run into issues with AsyncStream's cancellation handling. Namely, I'm trying to get a handle on the proper sequence of operations so I can properly cancel the underlying request when the task is cancelled.

Current Code
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
@dynamicMemberLookup
public struct DataStreamTask {
    public typealias Stream<Success, Failure: Error> = AsyncStream<DataStreamRequest.Stream<Success, Failure>>
    private let request: DataStreamRequest

    fileprivate init(request: DataStreamRequest) {
        self.request = request
    }
    
    public func streamData(bufferingPolicy: Stream<Data, Never>.Continuation.BufferingPolicy = .unbounded) -> Stream<Data, Never> {
        createStream(bufferingPolicy: bufferingPolicy) { onStream in
            self.request.responseStream(on: .streamCompletionQueue, stream: onStream)
        }
    }
    
    public func streamStrings(bufferingPolicy: Stream<String, Never>.Continuation.BufferingPolicy = .unbounded) -> Stream<String, Never> {
        createStream(bufferingPolicy: bufferingPolicy) { onStream in
            self.request.responseStreamString(on: .streamCompletionQueue, stream: onStream)
        }
    }

    public func stream<Serializer: DataStreamSerializer>(serializedUsing serializer: Serializer,
                                                         bufferingPolicy: Stream<Serializer.SerializedObject, AFError>.Continuation.BufferingPolicy = .unbounded)
        -> Stream<Serializer.SerializedObject, AFError> {
        createStream(bufferingPolicy: bufferingPolicy) { onStream in
            self.request.responseStream(using: serializer, on: .streamCompletionQueue, stream: onStream)
        }
    }
    
    private func createStream<Success, Failure: Error>(
        bufferingPolicy: Stream<Success, Failure>.Continuation.BufferingPolicy = .unbounded,
        forResponse onResponse: @escaping (@escaping (DataStreamRequest.Stream<Success, Failure>) -> Void) -> Void)
    -> Stream<Success, Failure> {
        Stream(bufferingPolicy: bufferingPolicy) { continuation in
            continuation.onTermination = { @Sendable termination in
                if case .cancelled = termination {
                    request.cancel()
                }
            }

            onResponse { stream in
                debugPrint(stream)
                continuation.yield(stream)
                if case .complete = stream.event {
                    continuation.finish()
                }
            }
        }
    }

    /// Cancel the underlying `DataStreamRequest`.
    public func cancel() {
        request.cancel()
    }

    /// Resume the underlying `DataStreamRequest`.
    public func resume() {
        request.resume()
    }

    /// Suspend the underlying `DataStreamRequest`.
    public func suspend() {
        request.suspend()
    }

    public subscript<T>(dynamicMember keyPath: KeyPath<DataStreamRequest, T>) -> T {
        request[keyPath: keyPath]
    }
}

@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
extension DataStreamRequest {
    public func task() -> DataStreamTask {
        DataStreamTask(request: self)
    }
}

I understand that Task cancellation only applies when a parent Task is cancelled, so in my test I've created a local Task and then attempt to await its value, thinking that would be enough to guarantee the completion of the AsyncStream's cancellation handler. But that doesn't seem to be the case, nor do other methods of waiting in tests seem to work.

func testThatDataStreamTaskCanBeImplicitlyCancelled() async {
    // Given
    let session = Session(); defer { withExtendedLifetime(session) {} }

    // When
    let expectedPayloads = 100
    let request = session.streamRequest(.payloads(expectedPayloads))
    let task = Task {
        let task = request.task()
        var datas: [Data] = []
        for await data in task.streamData().compactMap(\.value) {
            datas.append(data)
        }
        
        XCTAssertTrue(datas.count == 0)
    }
    task.cancel()
    let void: Void = await task.value

    // Then
    XCTAssertTrue(request.isCancelled)
    XCTAssertNotNil(void)
}

In this test, await task.value completes before the AsyncStream's termination handler is called, so the test method completes and the local Session is deinitialized, leading to a failure that is sent through the AsyncStream, eventually calling finish.

However, updating the test to use the global AF singleton, the test itself just dies, as the method completes. Occasionally it does print a full set of output values (when I add logging), presumably because the local server responds fast enough to return data before the local test run is killed. Only in that case do I see the termination handler called, with the finished case.

In both cases, the closure for the AsyncStream doesn't inherit the calling Task's context, leading Task.isCancelled to never be true, even while checking that value within the test Task's closure properly returns true.

So this leaves me with a bunch of questions:

  • Under what conditions will the termination handler be called with the cancelled case?
  • Are child tasks supposed to complete or at least be terminated before the parent Task completes?
  • How can I set up a test to ensure this cancellation logic is actually hit?
  • In general, how far does Task cancellation propagate? I expected that state to be visible to all substate, but that's apparently not the case. That, or I misunderstand the relationship between the Task and the code called in its initialization closure.

cc @ktoso @Philippe_Hausler

2 Likes

The expected behavior is that the stream termination should be triggered if the task iterating it is cancelled. I will take a look at this on Monday (it may be partially related to another issue w.r.t. captures that cause the stream to not hit an early cancel when break is used)

1 Like

I also can't seem to trigger task cancellation in my simpler data and download cases, so this doesn't seem specific to AsyncStream. This may still be a test setup issue or a misunderstanding but I'll report it either way.

I misunderstood this when first experimenting with AsyncStream in the form of an AsyncSequence and thought the stream would be terminated if the iterator was cancelled. Someone pointed out my mistake and that it had to be that the task was cancelled. Perhaps this should be called out in the docs.

1 Like

I initially thought that too when I initially tried to add tests, but I was corrected and moved to trying to test Task cancellation. So I’ll need additional logic for canceling the steam when the stream finishes while it’s still in progress. But handling implicit cancellation is necessary too.

2 Likes
Terms of Service

Privacy Policy

Cookie Policy