Dynamic exclusivity violations and async/isolated methods

i've been wondering about the interaction between (dynamic) exclusive access enforcement and actor isolation. in particular, if you have an async method that takes an inout value that evaluates to a storage reference on an actor or class instance, should the runtime detect when multiple such accesses overlap? i thought the answer was 'yes', but i haven't managed to trigger a violation with a construct i expected to produce one.

the general setup is like this:

// custom executor that uses a single thread
nonisolated let threadExec = ThreadExecutor()

actor A {
  nonisolated(unsafe) var prop = 0

  func breakTheLaw(_ id: Int) async {
    func doit(_ value: inout Int) async {
      print("[exclusive] begin access (\(id)): \(value)")
      print("on thread: ", pthread_self())
      value += 1
      try? await Task.sleep(for: .seconds(1))
      value += 1
      print("[exclusive] end access (\(id)): \(value)")
      print("on thread: ", pthread_self())
    }

    await doit(&prop)
  }

  func increment() {
    print("increment from: \(prop)")
    print("on thread: ", pthread_self())
    prop += 1
  }

  nonisolated var unownedExecutor: UnownedSerialExecutor {
    threadExec.asUnownedSerialExecutor()
  }
}

@Test
func actorExclusivity() async {
  await withTaskExecutorPreference(threadExec) {
    let a = A()
    async let t1: Void = a.breakTheLaw(1)
    var count = 0
    while count < 15 {
      count += 1
      await a.increment()
      try? await Task.sleep(for: .milliseconds(100))
    }
    _ = await t1
  }
}

where a custom executor is used to force everything onto the same thread so that any thread-local exclusivity book-keeping would be shared (assuming that's relevant), and to reduce nondeterminism. the example takes exclusive access to a mutable property of the actor (which must be made nonisolated(unsafe) to compile) and mutates it before and after a suspension point, while it is concurrently being incremented from another loop.

i had expected a dynamic exclusivity violation to occur if one of the increment() calls happened during the suspension of the breakTheLaw() method, but this does not appear to be the case. is that behavior expected, or is this sort of thing supposed to be identified at runtime?

i also tried a variation using a class rather than an actor, but the behavior seemed the same in the sense that there were no runtime issues reported. i did find this section of the ownership manifesto, which maybe explains things:

The bookkeeping is intended to be best-effort. It should reliably detect deterministic violations. It is not required to detect race conditions; it often will, and that's good, but it's not required to.

but, i'm still curious to better understand when one should or should not expect the runtime detection to kick in.
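
for reference, here is a minimal sketch of the kind of deterministic, single-threaded violation i'd expect the runtime to catch. `Box`, `modify`, and the two helper functions are hypothetical names for illustration only, not part of the code in this post. the overlapping call is defined but deliberately not invoked, since as far as i understand it traps with a "Simultaneous accesses" diagnostic when dynamic enforcement is enabled.

```swift
// hypothetical type for illustration only
final class Box {
  var value = 0
}

// holds a modify access to `value` for the whole call, including while `body` runs
func modify(_ value: inout Int, _ body: () -> Void) {
  value += 1
  body()
  value += 1
}

// fine: the closure touches different storage than the inout argument
func nonOverlapping() -> Int {
  let a = Box()
  let b = Box()
  modify(&a.value) { b.value += 1 }
  return a.value + b.value
}

// expected to trap at runtime: the closure touches `box.value` while
// `modify` still holds its inout access to the same class property
func overlapping() {
  let box = Box()
  modify(&box.value) { box.value += 1 }
}

print(nonOverlapping()) // 3
// overlapping() // uncomment to see the dynamic check fire
```

the difference from the actor example is that here both accesses happen in one synchronous call chain, with no suspension in between.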

and one more question – even if there is no dynamic reporting of this class of issue, is it still actually a violation of The Law of Exclusivity? my read of the docs suggests it is, but @John_McCall @Andrew_Trick i'd be interested in any insights you may have on this matter.

full code & output
import Testing
import Dispatch
import Foundation

final class ThreadExecutor: SerialExecutor, @unchecked Sendable {
  private let thread: Thread
  private let runLoop: RunLoop

  init() {
    let threadName = "executor-thread"
    let condition = NSCondition()
    nonisolated(unsafe) var runLoop: RunLoop!

    self.thread = Thread {
      condition.lock()
      runLoop = RunLoop.current
      condition.signal()
      condition.unlock()

      let port = Port()
      port.schedule(in: runLoop, forMode: .default)

      runLoop.run()
    }
    thread.name = threadName
    thread.start()

    condition.lock()
    while runLoop == nil {
      condition.wait()
    }
    condition.unlock()
    self.runLoop = runLoop
  }

  func enqueue(_ job: consuming ExecutorJob) {
    let unownedJob = UnownedJob(job)
    runLoop.perform {
      unownedJob.runSynchronously(on: self.asUnownedSerialExecutor())
    }
  }

  func asUnownedSerialExecutor() -> UnownedSerialExecutor {
    UnownedSerialExecutor(ordinary: self)
  }
}

extension ThreadExecutor: TaskExecutor {}

nonisolated let threadExec = ThreadExecutor()

actor A {
  nonisolated(unsafe) var prop = 0

  func breakTheLaw(_ id: Int) async {
    func doit(_ value: inout Int) async {
      print("[exclusive] begin access (\(id)): \(value)")
      print("on thread: ", pthread_self())
      value += 1
      try? await Task.sleep(for: .seconds(1))
      value += 1
      print("[exclusive] end access (\(id)): \(value)")
      print("on thread: ", pthread_self())
    }

    await doit(&prop)
  }

  func increment() {
    print("increment from: \(prop)")
    print("on thread: ", pthread_self())
    prop += 1
  }

  nonisolated var unownedExecutor: UnownedSerialExecutor {
    threadExec.asUnownedSerialExecutor()
  }
}

@available(iOS 18.4, *)
@Test
func actorExclusivity() async {
  await withTaskExecutorPreference(threadExec) {
    let a = A()
    async let t1: Void = a.breakTheLaw(1)
    var count = 0
    while count < 15 {
      count += 1
      await a.increment()
      try? await Task.sleep(for: .milliseconds(100))
    }
    _ = await t1
  }
}
// prints like:
increment from: 0
on thread:  0x000000016ff03000
[exclusive] begin access (1): 1
on thread:  0x000000016ff03000
increment from: 2
on thread:  0x000000016ff03000
increment from: 3
on thread:  0x000000016ff03000
increment from: 4
on thread:  0x000000016ff03000
increment from: 5
on thread:  0x000000016ff03000
increment from: 6
on thread:  0x000000016ff03000
increment from: 7
on thread:  0x000000016ff03000
increment from: 8
on thread:  0x000000016ff03000
increment from: 9
on thread:  0x000000016ff03000
increment from: 10
on thread:  0x000000016ff03000
[exclusive] end access (1): 12
on thread:  0x000000016ff03000
increment from: 12
on thread:  0x000000016ff03000
increment from: 13
on thread:  0x000000016ff03000
increment from: 14
on thread:  0x000000016ff03000
increment from: 15
on thread:  0x000000016ff03000
increment from: 16
on thread:  0x000000016ff03000

This does seem like a bug — we should probably not be allowing inout accesses to actor-isolated storage to cross suspension points.

to be clear, i did have to try and subvert the language somewhat in the example – the storage being accessed is marked nonisolated(unsafe). i just still thought the runtime checking would flag it if the overlapping access occurred on the same thread.

Oh, I overlooked that. Yeah, the dynamic exclusivity checking does not work across either tasks or threads. The problem with your attempt to get the tasks to share a thread with a custom executor is that, while the tracking is indeed thread-local, it all gets saved and restored when changing tasks; suspended tasks don’t just leave their state on the thread.
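
One way to picture this is the toy model below. It is only a sketch under stated assumptions: `ToyThread`, `ToyTask`, and the integer address are hypothetical stand-ins, not the runtime's actual data structures or API. It shows why a second access on the same thread goes unnoticed once the suspended task has taken its access records with it.

```swift
// Toy model only: hypothetical types, not the Swift runtime's real bookkeeping.
struct ToyThread {
  // Addresses that currently have a modify access in progress on this thread.
  var activeAccesses: Set<Int> = []

  // Returns false when the address is already being accessed (a detected conflict).
  mutating func beginAccess(_ address: Int) -> Bool {
    return activeAccesses.insert(address).inserted
  }

  mutating func endAccess(_ address: Int) {
    _ = activeAccesses.remove(address)
  }
}

struct ToyTask {
  var savedAccesses: Set<Int> = []

  // On suspension the task takes its access records with it, leaving the thread clean.
  mutating func suspend(from thread: inout ToyThread) {
    savedAccesses = thread.activeAccesses
    thread.activeAccesses.removeAll()
  }

  // On resumption the saved records are put back on whichever thread runs the task.
  mutating func resume(on thread: inout ToyThread) {
    thread.activeAccesses.formUnion(savedAccesses)
    savedAccesses.removeAll()
  }
}

var thread = ToyThread()
var longTask = ToyTask()
let propAddress = 0x1000  // stand-in for the address of `prop`

let first = thread.beginAccess(propAddress)   // doit(&prop) begins its access
longTask.suspend(from: &thread)               // Task.sleep suspends that task
let second = thread.beginAccess(propAddress)  // increment() runs on the same thread
thread.endAccess(propAddress)
longTask.resume(on: &thread)                  // the first access is tracked again

print(first, second) // true true: neither access sees a conflict
```

In this model the conflict would only be visible if the suspended task's records stayed on the thread, which is exactly what does not happen.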

thanks, that explains it i think. and to try and confirm – is it correct to characterize this as an exclusivity violation, despite its not being enforced?

theoretically is the exclusive access set (or some derivation thereof) something that could be preserved across these boundaries?

Yes, it’s an exclusivity violation, just not one we can detect currently at runtime.

As for preserving the access set across those boundaries: not easily, no. We don’t know at runtime that the task is going to be resumed on the same thread.
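
For contrast, here is a small sketch of the same before-and-after mutation written so that no access spans the suspension point. `SafeCounter` and `demo` are hypothetical names, the property is left actor-isolated rather than `nonisolated(unsafe)`, and `Task.yield()` stands in for the sleep in the original example.

```swift
// Hypothetical actor for illustration; not the `A` from the original post.
actor SafeCounter {
  private(set) var prop = 0

  func bumpTwice() async {
    prop += 1            // this access begins and ends before the suspension
    await Task.yield()   // stand-in for the sleep in the original example
    prop += 1            // a separate access after resuming
  }

  func increment() {
    prop += 1
  }
}

func demo() async -> Int {
  let counter = SafeCounter()
  async let bump: Void = counter.bumpTwice()
  await counter.increment()
  await bump
  return await counter.prop
}

let total = await demo()
print(total) // 3
```

Other work on the actor can still interleave at the suspension, but each `prop += 1` is its own short access, so no exclusive access is held open while the task is suspended.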
