In attempting to adapt Alamofire's DataStreamRequest for Swift concurrency, I've run into issues with AsyncStream's cancellation handling. Namely, I'm trying to get a handle on the proper sequence of operations so I can properly cancel the underlying request when the task is cancelled.
Current Code
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
@dynamicMemberLookup
public struct DataStreamTask {
public typealias Stream<Success, Failure: Error> = AsyncStream<DataStreamRequest.Stream<Success, Failure>>
private let request: DataStreamRequest
fileprivate init(request: DataStreamRequest) {
self.request = request
}
public func streamData(bufferingPolicy: Stream<Data, Never>.Continuation.BufferingPolicy = .unbounded) -> Stream<Data, Never> {
createStream(bufferingPolicy: bufferingPolicy) { onStream in
self.request.responseStream(on: .streamCompletionQueue, stream: onStream)
}
}
public func streamStrings(bufferingPolicy: Stream<String, Never>.Continuation.BufferingPolicy = .unbounded) -> Stream<String, Never> {
createStream(bufferingPolicy: bufferingPolicy) { onStream in
self.request.responseStreamString(on: .streamCompletionQueue, stream: onStream)
}
}
public func stream<Serializer: DataStreamSerializer>(serializedUsing serializer: Serializer,
bufferingPolicy: Stream<Serializer.SerializedObject, AFError>.Continuation.BufferingPolicy = .unbounded)
-> Stream<Serializer.SerializedObject, AFError> {
createStream(bufferingPolicy: bufferingPolicy) { onStream in
self.request.responseStream(using: serializer, on: .streamCompletionQueue, stream: onStream)
}
}
private func createStream<Success, Failure: Error>(
bufferingPolicy: Stream<Success, Failure>.Continuation.BufferingPolicy = .unbounded,
forResponse onResponse: @escaping (@escaping (DataStreamRequest.Stream<Success, Failure>) -> Void) -> Void)
-> Stream<Success, Failure> {
Stream(bufferingPolicy: bufferingPolicy) { continuation in
continuation.onTermination = { @Sendable termination in
if case .cancelled = termination {
request.cancel()
}
}
onResponse { stream in
debugPrint(stream)
continuation.yield(stream)
if case .complete = stream.event {
continuation.finish()
}
}
}
}
/// Cancel the underlying `DataStreamRequest`.
public func cancel() {
request.cancel()
}
/// Resume the underlying `DataStreamRequest`.
public func resume() {
request.resume()
}
/// Suspend the underlying `DataStreamRequest`.
public func suspend() {
request.suspend()
}
public subscript<T>(dynamicMember keyPath: KeyPath<DataStreamRequest, T>) -> T {
request[keyPath: keyPath]
}
}
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
extension DataStreamRequest {
public func task() -> DataStreamTask {
DataStreamTask(request: self)
}
}
I understand that Task cancellation only applies when a parent Task is cancelled, so in my test I've created a local Task and then attempt to await its value, thinking that would be enough to guarantee the completion of the AsyncStream's cancellation handler. But that doesn't seem to be the case, nor do other methods of waiting in tests seem to work.
func testThatDataStreamTaskCanBeImplicitlyCancelled() async {
// Given
let session = Session(); defer { withExtendedLifetime(session) {} }
// When
let expectedPayloads = 100
let request = session.streamRequest(.payloads(expectedPayloads))
let task = Task {
let task = request.task()
var datas: [Data] = []
for await data in task.streamData().compactMap(\.value) {
datas.append(data)
}
XCTAssertTrue(datas.count == 0)
}
task.cancel()
let void: Void = await task.value
// Then
XCTAssertTrue(request.isCancelled)
XCTAssertNotNil(void)
}
In this test, await task.value completes before the AsyncStream's termination handler is called, so the test method completes and the local Session is deinitialized, leading to a failure that is sent through the AsyncStream, eventually calling finish.
However, updating the test to use the global AF singleton, the test itself just dies, as the method completes. Occasionally it does print a full set of output values (when I add logging), presumably because the local server responds fast enough to return data before the local test run is killed. Only in that case do I see the termination handler called, with the finished case.
In both cases, the closure for the AsyncStream doesn't inherit the calling Task's context, leading Task.isCancelled to never be true, even while checking that value within the test Task's closure properly returns true.
So this leaves me with a bunch of questions:
- Under what conditions will the termination handler be called with the
cancelledcase? - Are child tasks supposed to complete or at least be terminated before the parent
Taskcompletes? - How can I set up a test to ensure this cancellation logic is actually hit?
- In general, how far does
Taskcancellation propagate? I expected that state to be visible to all substate, but that's apparently not the case. That, or I misunderstand the relationship between theTaskand the code called in its initialization closure.