In attempting to adapt Alamofire's DataStreamRequest
for Swift concurrency, I've run into issues with AsyncStream'
s cancellation handling. Namely, I'm trying to get a handle on the proper sequence of operations so I can properly cancel the underlying request when the task is cancelled.
Current Code
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
@dynamicMemberLookup
public struct DataStreamTask {
public typealias Stream<Success, Failure: Error> = AsyncStream<DataStreamRequest.Stream<Success, Failure>>
private let request: DataStreamRequest
fileprivate init(request: DataStreamRequest) {
self.request = request
}
public func streamData(bufferingPolicy: Stream<Data, Never>.Continuation.BufferingPolicy = .unbounded) -> Stream<Data, Never> {
createStream(bufferingPolicy: bufferingPolicy) { onStream in
self.request.responseStream(on: .streamCompletionQueue, stream: onStream)
}
}
public func streamStrings(bufferingPolicy: Stream<String, Never>.Continuation.BufferingPolicy = .unbounded) -> Stream<String, Never> {
createStream(bufferingPolicy: bufferingPolicy) { onStream in
self.request.responseStreamString(on: .streamCompletionQueue, stream: onStream)
}
}
public func stream<Serializer: DataStreamSerializer>(serializedUsing serializer: Serializer,
bufferingPolicy: Stream<Serializer.SerializedObject, AFError>.Continuation.BufferingPolicy = .unbounded)
-> Stream<Serializer.SerializedObject, AFError> {
createStream(bufferingPolicy: bufferingPolicy) { onStream in
self.request.responseStream(using: serializer, on: .streamCompletionQueue, stream: onStream)
}
}
private func createStream<Success, Failure: Error>(
bufferingPolicy: Stream<Success, Failure>.Continuation.BufferingPolicy = .unbounded,
forResponse onResponse: @escaping (@escaping (DataStreamRequest.Stream<Success, Failure>) -> Void) -> Void)
-> Stream<Success, Failure> {
Stream(bufferingPolicy: bufferingPolicy) { continuation in
continuation.onTermination = { @Sendable termination in
if case .cancelled = termination {
request.cancel()
}
}
onResponse { stream in
debugPrint(stream)
continuation.yield(stream)
if case .complete = stream.event {
continuation.finish()
}
}
}
}
/// Cancel the underlying `DataStreamRequest`.
public func cancel() {
request.cancel()
}
/// Resume the underlying `DataStreamRequest`.
public func resume() {
request.resume()
}
/// Suspend the underlying `DataStreamRequest`.
public func suspend() {
request.suspend()
}
public subscript<T>(dynamicMember keyPath: KeyPath<DataStreamRequest, T>) -> T {
request[keyPath: keyPath]
}
}
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
extension DataStreamRequest {
public func task() -> DataStreamTask {
DataStreamTask(request: self)
}
}
I understand that Task
cancellation only applies when a parent Task
is cancelled, so in my test I've created a local Task
and then attempt to await
its value
, thinking that would be enough to guarantee the completion of the AsyncStream
's cancellation handler. But that doesn't seem to be the case, nor do other methods of waiting in tests seem to work.
func testThatDataStreamTaskCanBeImplicitlyCancelled() async {
// Given
let session = Session(); defer { withExtendedLifetime(session) {} }
// When
let expectedPayloads = 100
let request = session.streamRequest(.payloads(expectedPayloads))
let task = Task {
let task = request.task()
var datas: [Data] = []
for await data in task.streamData().compactMap(\.value) {
datas.append(data)
}
XCTAssertTrue(datas.count == 0)
}
task.cancel()
let void: Void = await task.value
// Then
XCTAssertTrue(request.isCancelled)
XCTAssertNotNil(void)
}
In this test, await task.value
completes before the AsyncStream
's termination handler is called, so the test method completes and the local Session
is deinitialized, leading to a failure that is sent through the AsyncStream
, eventually calling finish
.
However, updating the test to use the global AF
singleton, the test itself just dies, as the method completes. Occasionally it does print a full set of output values (when I add logging), presumably because the local server responds fast enough to return data before the local test run is killed. Only in that case do I see the termination handler called, with the finished
case.
In both cases, the closure for the AsyncStream
doesn't inherit the calling Task
's context, leading Task.isCancelled
to never be true
, even while checking that value within the test Task
's closure properly returns true
.
So this leaves me with a bunch of questions:
- Under what conditions will the termination handler be called with the
cancelled
case? - Are child tasks supposed to complete or at least be terminated before the parent
Task
completes? - How can I set up a test to ensure this cancellation logic is actually hit?
- In general, how far does
Task
cancellation propagate? I expected that state to be visible to all substate, but that's apparently not the case. That, or I misunderstand the relationship between theTask
and the code called in its initialization closure.