Use swift-atomics to track a “largest seen value”?

Maybe you can use something like this (not sure about what mutex implementation/syntax to use though, so consider it pseudo-code):

var storage1: BigValue // first copy of the value
var storage2: BigValue // second copy of the value
var mutex1: ReadWriteMutexOfSomeSort
var mutex2: ReadWriteMutexOfSomeSort
var setterMutex: Mutex

var value: BigValue {
   get {
      for _ in 0..<4 {
         if mutex1.tryLockForReading() {
            defer { mutex1.unlock() }
            return storage1
         }
         if mutex2.tryLockForReading() {
            defer { mutex2.unlock() }
            return storage2
         }
      }
      mutex1.lockForReading()
      defer { mutex1.unlock() }
      return storage1
   }
   set {
      setterMutex.lock() // needed in case of multiple writer threads
      defer { setterMutex.unlock() }

      mutex2.lockForWriting()
      storage2 = newValue
      mutex2.unlock()

      mutex1.lockForWriting()
      storage1 = newValue
      mutex1.unlock()
   }
}

Here the setter guarantees that at any point in time, only one of mutex1 and mutex2 is locked for writing. Therefore, at any moment in time, the getter can read either storage1 or storage2. The loop in the getter is there in case the OS scheduler interrupts the getter thread to run the setter, and the setter is called again during that time, locking the other mutex. I added an upper bound on the loop iterations to avoid spinlocking the getter thread, deferring to the OS scheduler if/when the fast path doesn't work, but I don't expect it'll reach the end of the loop in practice.

Of course, the setter thread blocks as long as there are readers here, which might be undesirable.
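If it helps, here is one concrete way "ReadWriteMutexOfSomeSort" could be filled in, sketched with pthread_rwlock (the wrapper below is just an illustration, not a vetted primitive):

import Foundation // pulls in the pthread API

final class ReadWriteMutex {
   private let rwlock: UnsafeMutablePointer<pthread_rwlock_t>

   init() {
      rwlock = .allocate(capacity: 1)
      pthread_rwlock_init(rwlock, nil)
   }

   deinit {
      pthread_rwlock_destroy(rwlock)
      rwlock.deallocate()
   }

   func tryLockForReading() -> Bool {
      pthread_rwlock_tryrdlock(rwlock) == 0
   }

   func lockForReading() {
      _ = pthread_rwlock_rdlock(rwlock)
   }

   func lockForWriting() {
      _ = pthread_rwlock_wrlock(rwlock)
   }

   func unlock() {
      _ = pthread_rwlock_unlock(rwlock)
   }
}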


Have a look, this might work for you, assuming your getters can tolerate occasionally being without data (and that they won't busy-wait chasing that data at all costs). Data validity is checked via an additional 64-bit checksum field.

import Atomics

let atomicTimeStamp = ManagedAtomic<UInt64>(0)
let atomicHashHi = ManagedAtomic<UInt64>(0)
let atomicHashLo = ManagedAtomic<UInt64>(0)
let atomicServerKey = ManagedAtomic<UInt64>(0)
let atomicCheckSum = ManagedAtomic<UInt64>(0)

struct Values {
    let timeStamp: UInt64
    let hashLo: UInt64
    let hashHi: UInt64
    let serverKey: UInt64
    
    var checkSum: UInt64 {
        fatalError("TODO") // perhaps hash, if this isn't used across app relaunch
    }
}
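// One possible way to fill in the TODO above, assuming the checksum only
// needs to be stable within a single process run (Hasher is randomly seeded
// per launch):
//
//     var checkSum: UInt64 {
//         var hasher = Hasher()
//         hasher.combine(timeStamp)
//         hasher.combine(hashLo)
//         hasher.combine(hashHi)
//         hasher.combine(serverKey)
//         return UInt64(bitPattern: Int64(hasher.finalize()))
//     }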

private func readCurrentValues() -> Values {
    Values(
        timeStamp: atomicTimeStamp.load(ordering: .relaxed),
        hashLo: atomicHashLo.load(ordering: .relaxed),
        hashHi: atomicHashHi.load(ordering: .relaxed),
        serverKey: atomicServerKey.load(ordering: .relaxed)
    )
}

func getValues() -> (values: Values, checkSum: UInt64, valid: Bool) {
    let values = readCurrentValues()
    let checkSum = atomicCheckSum.load(ordering: .relaxed)
    return (values: values, checkSum: checkSum, valid: values.checkSum == checkSum)
}

func setValues(calcNewValues: (Values?) -> Values?) -> Bool {
    while true {
        let (values, checkSum, valid) = getValues()
        
        guard let newValues = calcNewValues(valid ? values : nil) else {
            return false
        }
        guard atomicTimeStamp.compareExchange(expected: values.timeStamp, desired: newValues.timeStamp, ordering: .acquiringAndReleasing).exchanged else {
            continue
        }
        guard atomicHashHi.compareExchange(expected: values.hashHi, desired: newValues.hashHi, ordering: .acquiringAndReleasing).exchanged else {
            continue
        }
        guard atomicHashLo.compareExchange(expected: values.hashLo, desired: newValues.hashLo, ordering: .acquiringAndReleasing).exchanged else {
            continue
        }
        guard atomicServerKey.compareExchange(expected: values.serverKey, desired: newValues.serverKey, ordering: .acquiringAndReleasing).exchanged else {
            continue
        }
        guard atomicCheckSum.compareExchange(expected: checkSum, desired: newValues.checkSum, ordering: .acquiringAndReleasing).exchanged else {
            continue
        }
        return true
    }
}
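For the "largest seen value" case a call site could look roughly like this (storeIfNewer and the timestamp comparison are just an illustration):

func storeIfNewer(_ candidate: Values) -> Bool {
    setValues { current in
        // only replace the stored values if the candidate is strictly newer
        if let current = current, candidate.timeStamp <= current.timeStamp {
            return nil // nothing to do, keep what we have
        }
        return candidate
    }
}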

And in case getters can't tolerate being without data at all – this will likely get data reasonably quickly:

func getValuesAsync(execute: @escaping (Values) -> Void) {
    let v = getValues()
    if v.valid {
        execute(v.values)
    } else {
        DispatchQueue.main.asyncAfter(deadline: .now() + 0.01) {
            getValuesAsync(execute: execute)
        }
    }
}

i think this looks a lot like what i need, though i think i will simplify to one writer + many readers by serializing the cluster time updates into an AsyncStream, since there is no “read your writes” requirement for the cluster times — it is okay to gossip a slightly stale cluster time.
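roughly what i have in mind (UInt64 stands in for ClusterTime here, and the names are placeholders rather than real driver API):

var gossipContinuation: AsyncStream<UInt64>.Continuation!
let gossipedTimes = AsyncStream<UInt64> { gossipContinuation = $0 }

// the single writer: folds every gossiped time into the largest seen so far
let consumer = Task {
    var largest: UInt64 = 0
    for await candidate in gossipedTimes {
        if candidate > largest {
            largest = candidate
            // publish largest wherever the readers pick it up
        }
    }
}

// any number of fire-and-forget writers, e.g. after a network response:
gossipContinuation.yield(42)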

one thing i am curious about: will a native actor await always put the thread to sleep the first time it fails to acquire the actor lock, or does it also spin for a few iterations before giving up?

how expensive is a plain

let clusterTime:ClusterTime = await deploymentState.clusterTime

under contention? is there a way to hint to the concurrency runtime how long deploymentState’s critical sections take to execute?

usually, the reads will be paired with writes. the procedure looks like:

func request() async throws
{
    let ourTime:ClusterTime = getOurClusterTime()
    let response:Response = try await networkRequest(gossiping: ourTime)
    setOurClusterTimeIfGreater(response.clusterTime)
}

so unless there are a lot of network errors, there should be ~51% reads and ~49% updates.

i think under a naïve implementation there would be three synchronization points:

  1. reading our cluster time
  2. awaiting the server’s response
  3. writing our cluster time

but 3 shouldn’t be a synchronization point, because we want to fire-and-forget the cluster time update, which can be done by yielding to a stream. 2 is inherent to the operation. so as i think about this more, i think synchronization point 1 is the main issue.
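if the ordering key fit in a single 64-bit word (the full cluster time here doesn't, which is the whole problem), point 1 could be a plain relaxed load and point 3 a fire-and-forget compare-exchange loop, something like this sketch with swift-atomics (placeholder names):

import Atomics

let sharedClusterTime = ManagedAtomic<UInt64>(0)

// synchronization point 1: a single relaxed load, never blocks
func getOurClusterTime() -> UInt64 {
    sharedClusterTime.load(ordering: .relaxed)
}

// synchronization point 3: "store if greater" CAS loop, never blocks
func setOurClusterTimeIfGreater(_ candidate: UInt64) {
    var current = sharedClusterTime.load(ordering: .relaxed)
    while candidate > current {
        let (exchanged, original) = sharedClusterTime.compareExchange(
            expected: current, desired: candidate, ordering: .relaxed)
        if exchanged { return }
        current = original
    }
}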

i’m not sure why we need five atomics here; my understanding has always been that atomics should be used to regulate concurrent access, not to carry the data themselves. would it be more performant to have some sort of non-atomic scratch buffer whose access is governed by one or two atomics?

The cache coherency topic has always been voodoo magic to me. The fear is:

It might be the case that "rawPointer.pointer = 123" is not OK while "memmove(rawPointer, &data, 8)" is OK with regard to flushing the cache - I'm not sure. (It reminds me of the prehistoric precedent of BlockMove vs BlockMoveCode vs BlockMoveData.)

how do actor locks flush the cache? i think the thing i am trying to implement is just an actor that knows it has short critical sections and knows to spin for a few iterations before going to sleep.

I don't think that's how actors work (but I don't really know), and even if it were the case, that seems like an implementation detail that could change.

Note that await isn't supposed to suspend the thread; it is expected to pick another partial task to run. I suppose it'll put the thread to sleep only if there are no other tasks to run.

It's also bad to mix Swift Concurrency with mutexes that can block waiting on other tasks, because all the tasks share the same limited thread pool. If I wanted to make my example compatible with Swift Concurrency, I'd avoid any "lock" operation in the functions called from a Swift Concurrency task ("tryLock" is fine though). So the setter in my example should be called on its own dispatch queue, and I'd probably change the fallback in the getter to use something other than "lock" – perhaps just calling await Task.yield() to let other tasks progress in their work and then attempting tryLock again.

So it'd be more like this:

var value: BigValue {
   get async {
      while true {
         for _ in 0..<4 {
            if mutex1.tryLockForReading() {
               defer { mutex1.unlock() }
               return storage1
            }
            if mutex2.tryLockForReading() {
               defer { mutex2.unlock() }
               return storage2
            }
         }
         await Task.yield()
      }
   }
}

let updateSerialQueue = DispatchQueue(label: "bigvalue.updates") // serial by default

func bump(to newValue: BigValue) {
   // ignore newValue if not higher (quick pre-check; if we can't get a read
   // lock immediately, skip the check and let the serial queue decide)
   if mutex1.tryLockForReading() {
      let current = storage1
      mutex1.unlock()
      guard newValue > current else { return }
   }

   updateSerialQueue.async {
      // test again because this might no longer be the case
      // - can avoid locking for reading here because we know nobody is writing 
      //   outside of this serial queue
      guard newValue > storage1 else { return }

      mutex2.lockForWriting() // stop readers from reading storage2
      storage2 = newValue
      mutex2.unlock()

      mutex1.lockForWriting() // stop readers from reading storage1
      storage1 = newValue
      mutex1.unlock()
   }
}

There are CPU instructions which ensure exclusive access to particular memory regions, but that's lower level than the OS scheduler. Just use one of the OS-provided locks, such as a mutex. Unfair locks can also help improve throughput, but at the cost of latency.
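For instance, a minimal sketch with an OS-provided lock (assuming a recent Apple platform; NSLock or a pthread mutex plays the same role elsewhere, and the names here are made up):

import os

let protectedClusterTime = OSAllocatedUnfairLock(initialState: UInt64(0))

func bumpClusterTime(to candidate: UInt64) {
    protectedClusterTime.withLock { current in
        current = max(current, candidate)
    }
}

func currentClusterTime() -> UInt64 {
    protectedClusterTime.withLock { $0 }
}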

Linus Torvalds on Spinlocks, Unfair locks, etc

So what's the fix for this?

Use a lock where you tell the system that you're waiting for the lock, and where the unlocking thread will let you know when it's done, so that the scheduler can actually work with you, instead of (randomly) working against you.

Notice, how when the author uses an actual std::mutex, things just work fairly well, and regardless of scheduler. Because now you're doing what you're supposed to do. Yeah, the timing values might still be off - bad luck is bad luck - but at least now the scheduler is aware that you're "spinning" on a lock.

Or, if you really want to use spinlocks (hint: you don't), make sure that while you hold the lock, you're not getting scheduled away. You need to use a realtime scheduler for that (or be the kernel: inside the kernel spinlocks are fine, because the kernel itself can say "hey, I'm doing a spinlock, you can't schedule me right now").

But if you use a realtime scheduler, you need to be aware of the other implications of that. There are many, and some of them are deadly. I would suggest strongly against trying. [...]

Because otherwise you're going to at some time be scheduled away while you're holding the lock (perhaps after you've done all the work, and you're just about to release it), and everybody else will be blocking on your incorrect locking while you're scheduled away and not making any progress. All spinning on CPU's.

Really, it's that simple.

This has absolutely nothing to do with cache coherence latencies or anything like that. It has everything to do with badly implemented locking.

I repeat: do not use spinlocks in user space, unless you actually know what you're doing . And be aware that the likelihood that you know what you are doing is basically nil.

There's a very real reason why you need to use sleeping locks (like pthread_mutex etc).
[...]

Because you should never ever think that you're clever enough to write your own locking routines.. Because the likelihood is that you aren't (and by that "you" I very much include myself - we've tweaked all the in-kernel locking over decades , and gone through the simple test-and-set to ticket locks to cacheline-efficient queuing locks, and even people who know what they are doing tend to get it wrong several times).

There's a reason why you can find decades of academic papers on locking. Really. It's hard.


Very colourful blog post, thanks for sharing.

Overall this kind of "premature" optimisation doesn't seem worth doing at all. In this code:

let ourTime:ClusterTime = getOurClusterTime()
let response:Response = try await networkRequest(gossiping: ourTime)
setOurClusterTimeIfGreater(response.clusterTime)

the network request will take many orders of magnitude longer than any potential win from using atomics or short critical sections protected by a lock.

Try to measure it? (min / max / avg time).
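E.g. with a rough harness like this (names are made up; real numbers will vary a lot with contention and hardware):

import Dispatch

actor DeploymentState {
    var clusterTime: UInt64 = 0
    func bump(to candidate: UInt64) { clusterTime = max(clusterTime, candidate) }
}

// average cost of one contended `await state.clusterTime`, in nanoseconds
func averageReadNanoseconds(tasks: Int, iterationsPerTask: Int) async -> Double {
    let state = DeploymentState()
    let start = DispatchTime.now().uptimeNanoseconds
    await withTaskGroup(of: Void.self) { group in
        for _ in 0 ..< tasks {
            group.addTask {
                for _ in 0 ..< iterationsPerTask {
                    _ = await state.clusterTime
                }
            }
        }
    }
    let elapsed = DispatchTime.now().uptimeNanoseconds - start
    return Double(elapsed) / Double(tasks * iterationsPerTask)
}

Adding a few writer tasks calling bump(to:) would approximate the roughly even read/write mix described above.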

in many domains, such as centralized arbitrage, latency is relative, especially when co-location becomes dominant in a particular space. when everyone (yes, everyone) is co-locating in the same AWS region as the master recordkeeper, it sometimes comes down to factors like scheduler latency that break ties and determine who is on the winning or losing side of a transaction. and if one party is on the losing side just 50.01% of the time, for silly optional reasons such as a scheduling delay, they will eventually go bankrupt through the process of adverse selection.

in some spaces, like cryptocurrency arbitrage, the selection environment is quite forgiving (maybe not so any more in 2023), because intra-AWS-region latencies are still on the order of 10–20 ms. (for comparison: a typical scheduler time quantum on a linux system can be 1–6 ms long!)

but i know folks who work with more traditional securities where people are measuring fiber optic cables and sinking thousands of SWE-hours into writing custom schedulers to stay ahead of the selection horizon. and on the extreme end of the spectrum, i know people doing forex arbitrage where even writing custom schedulers is not enough; these firms are investing in ASICs.


Another potential approach with non-blocking readers and writers is to use a ring buffer. Here writers always write into a newly claimed write position, established atomically (hopefully there are no readers left reading from that location of the ring buffer: if the buffer is big enough and that position was last active, say, several seconds ago, then by that time, fingers crossed, all readers of that location are gone (†)). After writing the new state, the writer does a compare-exchange on the read position; one of the competing writers wins, and from then on, until the next writer changes it, that's the current read position for new readers.

(†) Potentially, after a read operation, readers can check whether their read position has drifted too far from the current read position; if so, they retry.

Pseudocode:

ringBuffer (the quicker the writers, the bigger it should be)

var writePos: atomic Int // writers atomically increment this
var readPos: atomic Int  // current read position published to readers

initial setup:
    write(bytes, size) // write initial state
    // after that readers can read
    
// among two or more concurrent writers one will win:

func write(bytes, size) {
	let newPos = writePos.increment(size) // increments writePos atomically, gets new pos
	let pos = newPos - size
	ringBufferWrite(pos, bytes, size)
	// memory fence
	readPos.compareExchange(pos) // in a loop. this will compete against all concurrent writers
	// eventually the final competing writer writes its value - "last word" wins
}

func read(bytes, size) {
    let pos = readPos // read atomically
    // memory fence
    ringBufferRead(pos, bytes, size)
    // † see above
}

The ring buffer should be big enough that the writePos - readPos distance always stays smaller than the ring buffer size (i.e. writers never lap active readers).

Note that in this form it doesn't compare timestamps.

IIRC x86’s AVX has atomic loads up to 512 bits. I think NEON only goes up to 128 bits, though. Dunno how much overhead there is in packing and unpacking the data, but it might be worth trying.

Edit: Nope, I’m wrong.

Going back to this... if you don't plan to run it on a multi-core system, couldn't you just do everything on the main actor (or another global actor) and thus side-step the synchronization issue? There's little point in performing short-running, non-blocking tasks on multiple threads on a single-core system, and it adds some overhead.
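Something like this, with placeholder names:

// everything that touches the cluster time is confined to the main actor,
// so no locks or atomics are needed at all
@MainActor var largestClusterTime: UInt64 = 0

@MainActor
func noteClusterTime(_ candidate: UInt64) {
    largestClusterTime = max(largestClusterTime, candidate)
}

// from any task:
//     let ourTime = await largestClusterTime
//     await noteClusterTime(response.clusterTime)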

Alas no. The relevant section of the Intel architecture reference is vol 3A, §9.1.1 Guaranteed Atomic Operations, which guarantees that:

  • 2 and 4 byte aligned accesses are guaranteed atomic since 486.
  • 2 byte unaligned accesses to uncached memory that fits within a 32b data bus are atomic since Pentium.
  • 8 byte aligned accesses are atomic since Pentium.
  • 2, 4, and 8 byte unaligned accesses to cached memory that do not cross a cache line are atomic since the P6.
  • 16 byte aligned SSE and AVX operations are atomic when AVX is supported.

There is no guarantee about accesses wider than 16 bytes.

As for ARM NEON, §B2.2.1 Requirements for single-copy atomicity:

  • Reads to SIMD and floating-point registers of a 128-bit value that is 64-bit aligned in memory are treated as a pair of single-copy atomic 64-bit reads.
  • Writes from SIMD and floating-point registers of a 128-bit value that is 64-bit aligned in memory are treated as a pair of single-copy atomic 64-bit writes.

Well, that’ll teach me to trust Google over reading the manual. Thanks!

It's entirely possible that specific implementations have made them single-copy atomic; it's just not guaranteed by the architecture specification.

By the way, on this point:

I think double-wide atomics have been widely supported by hardware since 2008, but Linux doesn't require them (Windows 8.1, released in 2013, does), so theoretically they might be implemented using a mutex on ancient hardware. But practically speaking, most Linux systems should indeed have the ability.

Maybe we should consider enabling the feature by default and having it be opt-out, rather than opt-in?


this comes back to re-usability. running everything on a global actor means it will be very difficult to port the implementation to run efficiently on a multi-core system later on.

It shouldn’t affect porting to a multi-core host one way or another, since global actors use the default executor. If you want to implement a multicore guest, you could do so by implementing each vCPU as an actor.

The Swift runtime now requires double-wide atomics anyway, so the underlying issue here has been resolved in recent toolchains. The next minor version of swift-atomics will bump the required toolchain version to enable them on all platforms, making AtomicReference universally available.


what i meant is that designing the code to run on a single thread necessarily means cutting off opportunities for parallelism if we ever wanted to allocate slightly more resources (like an extra CPU core) to that application. for example, the single-threaded version of the MongoDB server monitoring algorithm is much less responsive than its multithreaded variant.

how recent? is swift 5.5 sufficient?