Testing, Task.yield(), Task priority, and order of execution in Swift 5.10

I'm trying to understand what I can rely on when it comes to the ordering of async job execution in Swift 5.10, particularly on the Main Actor. Of course I can and have experimented, but there's a difference between "I observed this test passing 50,000 times" and "I know this test will always pass," hence the question:

If I yield from a lower priority task, am I guaranteed that a waiting higher priority job will be run? If there are two jobs of differing priorities both enqueued, is the higher one always run?

For instance, suppose I'm iterating through a series of "steps" in a test. I want to make sure that the code I'm testing is always given a chance to hit its next yield point before any test code is run. So if I'm testing code like this:

func whileVisible() async {
    // Note: my test code operates under the assumption that this never terminates,
    // so we can't use purely structured concurrency to test it.
    for await response in service.subscribe() {
        title = response.title // I want to assert that title is set
    }
}

I want to have begun the for-await before the test code providing a fake response is run.

Simple code like this won't work:

await withCheckedContinuation { continuation in
    Task {
        continuation.resume()
        await whileVisible() // A
    }
}
await Task.yield()
service.sendFakeResponse()

Because while the continuation ensures that the task started, there's a yield point (A), and when Swift hits it, it defers back to the test code. And the Task.yield() at the end does not actually cause Swift to yield execution. This kind of makes sense based on the documentation; apparently it yields to lower priority tasks.

After a bit of time trying to figure out why just giving the unstructured Task a userInitiated priority didn't work, I realized that this was probably capped by the parent Task (which I don't think is documented, though perhaps it's implied by the base and current priorities).
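One way to check a hunch like this is to record `Task.currentPriority` from inside each task instead of inferring it from ordering. The sketch below is only a probe, not an answer to whether priorities are capped: `PrioritySample` and `samplePriorities()` are hypothetical names, and the samples are reported through an `AsyncStream` rather than `await task.value`, because awaiting a task's value from a higher-priority context can escalate that task's priority and skew the reading.

```swift
// Hypothetical helper type for this sketch; not part of any library.
struct PrioritySample {
    let label: String
    let rawValue: UInt8 // TaskPriority.rawValue as observed inside the task
}

// Records the priority each task observes for itself.
// Assumes Swift 5.9+ for `AsyncStream.makeStream()`.
func samplePriorities() async -> [PrioritySample] {
    let (stream, continuation) = AsyncStream<PrioritySample>.makeStream()

    // Outer task explicitly requests a low priority.
    Task.detached(priority: .low) {
        continuation.yield(PrioritySample(
            label: "outer, requested .low",
            rawValue: Task.currentPriority.rawValue))

        // Inner unstructured task requests a higher priority than its creator.
        Task(priority: .userInitiated) {
            continuation.yield(PrioritySample(
                label: "inner, requested .userInitiated",
                rawValue: Task.currentPriority.rawValue))
            continuation.finish()
        }
    }

    // Collect samples without awaiting the tasks themselves.
    var samples: [PrioritySample] = []
    for await sample in stream {
        samples.append(sample)
    }
    return samples
}
```

Printing the returned samples on the platform in question shows what each task actually sees. Note that this says nothing about the order in which an executor such as the main actor's chooses to run enqueued jobs.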

So this code seems to work:

await withCheckedContinuation { continuation in
    Task(
        priority: .userInitiated
    ) {
        Task.detached(
            priority: .low
        ) {
            continuation.resume()
        }
        await whileVisible()
    }
}
service.sendFakeResponse()

After 50,000 runs, it always starts listening to the network request before we send the fake response.

More broadly, here is pseudocode for an example "test runner" that would attempt to handle this ordering internally:

// A strawman for a way to make ViewModels testable.
// This is a terrible design, but it's super simple; a better one would leverage
// features of `Observable` or `ObservedObject` to avoid individual objects
// needing to implement this.
// `Id` is assumed to be defined elsewhere.
@MainActor
protocol TestableViewModel: AnyObject {
    // The implementor runs the first element of this array whenever something changes.
    var assertions: [(Id, (Any) -> ())] { get set }
    // Called by the test to wait for the next assertion before continuing.
    // Because we're isolated to main, no matter how many emissions
    // happen we would always handle them in sequence, so it should be
    // okay to force this ordering.
    func wait(for continuation: CheckedContinuation<(), Never>)
}

enum Step {
    case fn(() async -> ())
    case assertion(Id, (Any) -> ())
    case sendFakeData(() -> ())
}

struct Timeline {
    var steps: [Step]
    var assertions: [(Id, (Any) -> ())]
}

@MainActor
func test(vm: some TestableViewModel, steps: Timeline) async {
    vm.assertions = steps.assertions
    for step in steps.steps {
        switch step {
        case .fn(let fn):
            await withCheckedContinuation { continuation in
                Task {
                    Task.detached(priority: .low) {
                        continuation.resume()
                    }
                    await fn()
                } // In real code we'd use a task group so we could cancel at the end
            }
        case .assertion:
            await withCheckedContinuation { continuation in
                vm.wait(for: continuation)
            }
        case .sendFakeData(let fn):
            // In my real code, these yields were necessary, so I'm including them
            // as a curiosity, because I cannot figure out why.
            await Task.yield()
            fn()
            await Task.yield()
        }
    }
    // We'd cancel all the tasks here in real code, and collect the results.
    // We can't reach here until all assertions are run, so it's safe to do so.
}

I'm curious if folks think this is a good solution, or if there are problems I'm not seeing. One that I am aware of is that if the number of times vm publishes changes is less than the number of assertions, the test would hang till it timed out, and I don't think there is any way to detect that.

As an aside, I'm aware of some existing discussions and this potential solution, but I'm hoping to avoid needing it; the only code that I want to ensure the ordering of is the code in the test that sends fake data and asserts the results, not my systems under test.


if the client code you wish to allow to run to its next suspension point is isolated to a known actor, then perhaps something like this would suffice and avoid having to mess with priorities or implementation-specific executor behaviors:

await withCheckedContinuation { continuation in
  Task { @MainActor in // assuming `whileVisible()` is main-actor-isolated
    continuation.resume()
    whileVisible()
  }
}
// once here we've ensured `whileVisible()` has either run to suspension, or is still executing on the main actor.
// 'flush' a no-op through the same isolation domain to be certain `whileVisible()` has actually been suspended.
await MainActor.run { _ = () }
service.sendFakeResponse()

edit: this analysis is wrong because whileVisible() would still need to be awaited, so the logic that it is either running or suspended is erroneous.

It looks like whileVisible is async? Correct? And the service.subscribe function returns some AsyncSequence? Correct? Is it an AsyncStream? Or a custom AsyncSequence type you built yourself?

A different POV that might help here is to build a version of AsyncSequence just for testing. You can then have complete control over what kind of "backpressure" this sequence might have. You can try to hack your own "blocking" style "SyncAsyncSequence" that deadlocks when values are dispatched and waits for them to be received (by the type you are testing). Deadlocks are always going to be risky… but if it's only shipping for test code it might be worth it to you (the worst thing that could happen is your test code deadlocks and times out).
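To make the test-only sequence idea concrete, here is a minimal sketch of a gentler, non-blocking variant rather than the deadlocking version described above: a wrapper that reports each time the consumer asks for the next element, so the test can wait for that signal before sending data. `ProbedSequence` and `runProbedSequenceDemo()` are hypothetical names, and the sketch assumes Swift 5.9+ for `AsyncStream.makeStream()`. One caveat: the signal fires just before the wrapped iterator's `next()` is called, so it shows the consumer has reached the `for await`, not that the underlying sequence has registered the demand.

```swift
// Hypothetical test-only wrapper: forwards elements from `Base` and calls
// `onNext` every time the consumer requests the next element.
struct ProbedSequence<Base: AsyncSequence>: AsyncSequence {
    typealias Element = Base.Element

    let base: Base
    let onNext: @Sendable () -> Void

    struct AsyncIterator: AsyncIteratorProtocol {
        var base: Base.AsyncIterator
        let onNext: @Sendable () -> Void

        mutating func next() async throws -> Base.Element? {
            onNext() // signal demand before forwarding to the wrapped iterator
            return try await base.next()
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(base: base.makeAsyncIterator(), onNext: onNext)
    }
}

// Drives a consumer and only sends a value once the consumer has asked for one.
func runProbedSequenceDemo() async -> [String] {
    let (values, valuesContinuation) = AsyncStream<String>.makeStream()
    let (demand, demandContinuation) = AsyncStream<Void>.makeStream()
    let probed = ProbedSequence(base: values, onNext: { demandContinuation.yield() })

    // Stand-in for the code under test: consumes the sequence until it finishes.
    let consumer = Task { () -> [String] in
        var received: [String] = []
        for try await value in probed {
            received.append(value)
        }
        return received
    }

    // Wait until the consumer has requested its first element.
    var demandIterator = demand.makeAsyncIterator()
    _ = await demandIterator.next()

    valuesContinuation.yield("hello")
    valuesContinuation.finish()
    return (try? await consumer.value) ?? []
}
```

The trade-off is the one raised in the thread: every sequence handed to the code under test has to be wrapped, which may not compose well with sequences built from other operators.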

Awaiting a call to MainActor is an interesting idea, but I don't see how you can be sure that the call to MainActor.run got called after whileVisible has been run and yielded at its for await call, any more than I can be sure that the priorities are always respected.

It looks like whileVisible is async? Correct? And the service.subscribe function returns some AsyncSequence? Correct?

Yes to both.

Is it an AsyncStream? Or a custom AsyncSequence type you built yourself?

I want the code I am writing to not care what it is, whether it buffers or not, etc. The example I gave is a toy example to demonstrate the problem; I want to test objects that consume arbitrary kinds of AsyncSequence types.

A different POV that might help here is to build a version of AsyncSequence just for testing

I considered this approach and ultimately rejected it. I don't want to require that all async sequences be wrapped, especially because they might be composed of other AsyncSequences using combineLatest or other AsyncAlgorithms-style operators.

ah yes, you're right there would still be an await needed when calling into the client code, so i guess it comes down to whether or not a suspension actually occurs. so i suppose it's also implementation dependent, and not clearly ensured to work in a specific way by The System. if the body of the client code were inlined into your test runner then i think the approach should work, but i'll have to dig in to see if there's anything that can be said about when suspensions will actually occur or not.

after thinking about this problem a bit more, while both approaches seem like they will not guarantee any particular ordering behaviors in theory, in practice i suspect the ordering you want will generally be ensured with an approach like the one i suggested earlier.

if we consider the implementation you proposed that worked empirically:

await withCheckedContinuation { continuation in
  Task(
    priority: .userInitiated
  ) {
    Task.detached(
      priority: .low
    ) {
      continuation.resume()
    }
    await whileVisible()
  }
}
service.sendFakeResponse()

the continuation resumption can run concurrently with the call you want to happen first. trying to use priorities to help manage the desired ordering seems like it creates a dependency on the global state of the entire concurrency runtime, which seems quite fragile and at risk of behaving differently depending on things out of one's control.

for example, if whileVisible() were isolated to the main actor, then in this formulation it would need to 'hop' to the main actor before it could be run. if there was some existing, time-consuming work already occupying the main actor, then whileVisible() would have to wait until that was complete before it could start. however, the low-priority detached task to resume the continuation may have more available execution resources, since it is not main-actor-bound, and so could potentially run first, despite being part of a lower-priority task.

going back to my earlier suggestion:

await withCheckedContinuation { continuation in
  Task { @MainActor in
    continuation.resume()
    await whileVisible()
  }
}
await MainActor.run {}
service.sendFakeResponse()

you were right to point out that there is still an await needed on the call to whileVisible(). however, if we can assume that we can invoke the client code from a Task that has a matching 'formal executor', then i think there is good reason to believe it will be (initially) run without suspending.

the reason, i think, is that the runtime is incentivized to work this way to improve performance. i.e. if an async function is called in a context which is already appropriate for its execution, the runtime will prefer to run it without paying the overhead of suspending. you can see some evidence of this in various comments within the (current) implementation of swift_task_switch.

additionally, there is a specific reference to this optimization in the SE-0338 proposal, which states:

Dynamically, a switch will not suspend the task if the task is already on an appropriate executor.

so i suspect for the purposes you've outlined here, this approach would work. however, as a general means of establishing ordering between async code, this strategy does seem a bit round-about and limited in how granular the level of control it can offer is.
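For anyone who wants to try the main-actor approach in isolation, here is a self-contained sketch under stated assumptions. `FakeService`, `ViewModel`, and `runOrderingSketch()` are hypothetical stand-ins for the code in the question, the service deliberately drops responses sent before anyone has subscribed, and Swift 5.9+ is assumed for `AsyncStream.makeStream()`. The log records the order of events so the behavior can be observed rather than assumed.

```swift
// Hypothetical fake: records events and drops responses sent before subscription.
@MainActor
final class FakeService {
    private var continuation: AsyncStream<String>.Continuation?
    private(set) var log: [String] = []

    func subscribe() -> AsyncStream<String> {
        log.append("subscribed")
        let (stream, continuation) = AsyncStream<String>.makeStream()
        self.continuation = continuation
        return stream
    }

    func sendFakeResponse(_ title: String) {
        log.append("sent")
        continuation?.yield(title) // no-op if nobody has subscribed yet
    }
}

// Hypothetical stand-in for the code under test.
@MainActor
final class ViewModel {
    var title = ""
    let service: FakeService

    init(service: FakeService) { self.service = service }

    func whileVisible() async {
        for await response in service.subscribe() {
            title = response
        }
    }
}

@MainActor
func runOrderingSketch() async -> [String] {
    let service = FakeService()
    let viewModel = ViewModel(service: service)
    var listener: Task<Void, Never>?

    await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
        // The task shares the main actor with `whileVisible()`.
        listener = Task { @MainActor in
            continuation.resume()
            await viewModel.whileVisible()
        }
    }
    // Flush a no-op through the main actor before sending.
    await MainActor.run {}
    service.sendFakeResponse("Hello")

    listener?.cancel() // stop the otherwise endless listener
    return service.log
}
```

If the reasoning above holds, the returned log reads "subscribed" followed by "sent". A log containing only "sent" would mean the fake response was sent before the view model started listening. As discussed, this relies on observed runtime behavior rather than a documented ordering guarantee.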