Use swift-atomics to track a “largest seen value”?

this is sort of a continuation from How to use swift-atomics to expose non-isolated state of an actor?

is there a good algorithm for updating an atomic value only if the new value is larger than the old value?

for example, i would like

let currentTime:UnsafeAtomic<UInt64>

to only update the time if it is larger than the current time, so that the time increases monotonically.

one idea i had was to use exchange(_:ordering:) and roll back the change if the old value ended up being greater than the new value. but i am unsure if this risks livelock when updating the time concurrently because the rollback could get rolled back again. and the exposed time would no longer be monotonic.
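
To make the concern concrete, here is a single-threaded replay of one possible interleaving. This is only a sketch: the values and the name replayRollbackRace are made up for illustration, and ManagedAtomic stands in for the UnsafeAtomic above.

```swift
import Atomics

/// Replays, on one thread, an interleaving in which writer A uses
/// exchange-then-rollback while writer B publishes a newer time.
func replayRollbackRace() -> (seenByReader: UInt64, displaced: UInt64, finalValue: UInt64) {
    let currentTime = ManagedAtomic<UInt64>(10)

    // Writer A blindly publishes a stale time (5) and gets back the old value (10).
    let previousA = currentTime.exchange(5, ordering: .relaxed)

    // A reader that loads inside this window sees the time go backwards.
    let seenByReader = currentTime.load(ordering: .relaxed)

    // Writer B publishes 12; it displaced 5, so B believes it is done.
    _ = currentTime.exchange(12, ordering: .relaxed)

    // Writer A rolls back to 10 because 10 > 5, which overwrites B's 12.
    let displaced = currentTime.exchange(previousA, ordering: .relaxed)

    // A would now have to notice that it displaced 12 and roll back again.
    return (seenByReader, displaced, currentTime.load(ordering: .relaxed))
}

print(replayRollbackRace())
```

In this replay the reader observes 5 after 10 had already been visible, and the final value is 10 even though 12 was published in between.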

I'm not an expert, but I'm pretty sure what you need is compareExchange:

  1. load the current value
  2. if the loaded value is already equal to or higher than your new value, you're done
  3. call compareExchange, passing the value you loaded as expected and your new value as desired
  4. if it succeeds, you're done
  5. if it fails, take the current value returned by compareExchange and retry from step 2

Edit: updated step 5 to direct you to step 2 instead of step 1 (and thus save an extra load when retrying).
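
The steps above can be sketched roughly as follows. The function names storeIfLarger and demoStoreIfLarger are hypothetical, ManagedAtomic is used for convenience, and the relaxed ordering is an assumption that only holds if no other data is published through this value.

```swift
import Atomics
import Dispatch

/// Raises `atomic` to `candidate` if `candidate` is larger.
/// Returns true if this call stored the value.
@discardableResult
func storeIfLarger(_ candidate: UInt64, in atomic: ManagedAtomic<UInt64>) -> Bool {
    // Step 1: load the current value.
    var current = atomic.load(ordering: .relaxed)
    // Step 2: stop as soon as the stored value is already equal or higher.
    while current < candidate {
        // Step 3: try to swap in the candidate, expecting the value we saw.
        let (exchanged, original) = atomic.compareExchange(
            expected: current,
            desired: candidate,
            ordering: .relaxed)
        // Step 4: success means we are done.
        if exchanged { return true }
        // Step 5: reuse the value returned by the failed exchange and retry.
        current = original
    }
    return false
}

/// Offers every value in 0..<1000 from concurrent workers.
func demoStoreIfLarger() -> UInt64 {
    let largestSeen = ManagedAtomic<UInt64>(0)
    DispatchQueue.concurrentPerform(iterations: 1_000) { i in
        storeIfLarger(UInt64(i), in: largestSeen)
    }
    return largestSeen.load(ordering: .relaxed)
}

print(demoStoreIfLarger()) // 999, the largest value offered
```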

This is the algorithm to use. Note that under excessive contention this may perform worse than a locked check-and-update, but under low contention this performs well.
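
For comparison, a locked check-and-update might look something like this. It is a minimal sketch using NSLock; the type name LockedMaximum is hypothetical, and any mutex would do.

```swift
import Foundation

/// A "largest seen value" guarded by a plain lock instead of an atomic.
final class LockedMaximum {
    private let lock = NSLock()
    private var value: UInt64

    init(_ initial: UInt64 = 0) {
        value = initial
    }

    /// Stores `candidate` only if it is larger than the current value.
    /// Returns true if the value changed.
    @discardableResult
    func update(with candidate: UInt64) -> Bool {
        lock.lock()
        defer { lock.unlock() }
        guard candidate > value else { return false }
        value = candidate
        return true
    }

    /// Reads the current value under the same lock.
    var current: UInt64 {
        lock.lock()
        defer { lock.unlock() }
        return value
    }
}

let maximum = LockedMaximum()
maximum.update(with: 7)
maximum.update(with: 3) // ignored, 3 is not larger than 7
print(maximum.current)  // 7
```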

Great article on this topic: A Concurrency Cost Hierarchy | Performance Matters

It has a good overview of the different kinds of approaches that are available (atomics, locks, TLS, etc).

thanks for the read!

there are only two things i am still having a hard time understanding:

  1. what is the point of using compare_exchange_weak (weakCompareExchange(expected:desired:successOrdering:failureOrdering:)) instead of the strong variant (compareExchange(expected:desired:ordering:))?

    as far as i could tell the strong form just implements the loop part for you.

  2. what memory ordering should be used in this type of “max-accumulator” atomic? is relaxed safe here?


here is what i have so far (where self.timestamp is the atomic reference):

get
{
    switch self.timestamp.load(ordering: .relaxed)
    {
    case 0:
        return nil
    case let timestamp:
        return timestamp
    }
}
set(value)
{
    var expected:UInt64 = self.timestamp.load(ordering: .relaxed)
    while expected < value
    {
        switch self.timestamp.compareExchange(expected: expected,
            desired: value,
            ordering: .relaxed)
        {
        case (exchanged: false, let current):
            expected = current
        case (exchanged: true, _):
            return
        }
    }
}

Yep, that’s the point. Since you’re already implementing the loop yourself with a >= condition, you don’t need to use the library’s loop with a == condition.

I believe you can use relaxed for the initial load, since that won’t have any other atomic dependencies. You’ll want to use acquiringAndReleasing for the compare-exchange.
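
Putting both answers together, a version with the weak exchange and the suggested orderings could look roughly like this. The wrapper type MonotonicTimestamp and its member names are hypothetical, and 0 is assumed to mean "no timestamp yet", as in the getter above.

```swift
import Atomics

/// A timestamp that only ever moves forward; 0 means "not set yet".
final class MonotonicTimestamp {
    private let storage = ManagedAtomic<UInt64>(0)

    /// The largest timestamp seen so far, or nil if none was stored.
    var value: UInt64? {
        let raw = storage.load(ordering: .relaxed)
        return raw == 0 ? nil : raw
    }

    /// Raises the stored timestamp to `candidate` if it is larger.
    func advance(to candidate: UInt64) {
        // A relaxed initial load is enough; the exchange below revalidates it.
        var expected = storage.load(ordering: .relaxed)
        while expected < candidate {
            // The weak form may fail spuriously, which is harmless here
            // because we are looping on our own condition anyway.
            let (exchanged, original) = storage.weakCompareExchange(
                expected: expected,
                desired: candidate,
                successOrdering: .acquiringAndReleasing,
                failureOrdering: .acquiring)
            if exchanged { return }
            expected = original
        }
    }
}

let timestamp = MonotonicTimestamp()
timestamp.advance(to: 7)
timestamp.advance(to: 3) // ignored, 3 is not larger than 7
print(timestamp.value as Any)
```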

ah that makes sense, thank you!

okay, i had to get a little fancier because it turns out i need to store more than one word’s worth of atomic data. so i upgraded to a final class ClusterTime, and now i have:

    /// The current largest-seen cluster time.
    private nonisolated
    let _clusterTime:UnsafeAtomic<Unmanaged<ClusterTime>?>

now my computed property looks like:

get
{
    self._clusterTime.load(ordering: .relaxed)?.takeUnretainedValue()
}
set(value)
{
    guard let value:ClusterTime
    else
    {
        return
    }

    let owned:Unmanaged<ClusterTime> = .passRetained(value)
    var shared:Unmanaged<ClusterTime>? = self._clusterTime.load(
        ordering: .relaxed)
    
    while true
    {
        if  let old:ClusterTime = shared?.takeUnretainedValue(),
                old.max.timestamp >= value.max.timestamp
        {
            owned.release()
            return
        }

        switch self._clusterTime.weakCompareExchange(expected: shared,
            desired: owned,
            successOrdering: .acquiringAndReleasing,
            failureOrdering: .acquiring)
        {
        case (exchanged: false, let current):
            shared = current
        
        case (exchanged: true, let owned?):
            owned.release()
            return
        
        case (exchanged: true, nil):
            return
        }
    }
}

and my deinit looks like:

deinit
{
    let _:ClusterTime? = self._clusterTime.destroy()?.takeRetainedValue()
}

i assume the atomic “owns” the object, so i pass it retained inside the setter (which should cancel out the setter’s implicit __owned release), and i exit either by releasing the new value (if the update lost the race), or releasing the old value (if the update won the race).

is this correct?

You can't do that. Just consider the getter:

self._clusterTime.load(ordering: .relaxed)?.takeUnretainedValue()

This is equivalent to:

let value = self._clusterTime.load(ordering: .relaxed)
return value?.takeUnretainedValue()

And here it's more obvious that something can happen between the time you load the object reference and the time you call takeUnretainedValue. Something like the object getting released by another thread.

For it to work correctly, you'd need the load and the takeUnretainedValue (a retain) to be done as a single atomic operation. But I don't think there's a way to do that.


Ok, there's a way, sort of. You could use compareExchange to read the value while at the same time writing a sentinel value (nil I suppose) that will tell other threads to wait while you're doing your thing. And we just reinvented the spin lock.


Even if this was correct, considering each takeUnretainedValue and release operation is an atomic increment/decrement and also that you might need to allocate a new instance of that class pretty often, I doubt you could make it faster than a spin lock or a mutex protecting a small struct.

great catch.

in my case, it is not important that the clusterTime read retrieves the freshest available value, because i am not doing anything with the value except broadcasting it over a network to gossip the current time to other nodes.

so if a thread tries to read the current time and discovers someone else is currently updating it, i don’t want the thread to go to sleep and wait for the update to complete; i just want it to read the value (if any) of the current time from before the update began.

which is getting me thinking now if i could use a pointer to an unsafe (ClusterTime, ClusterTime) and toggle between the elements with an atomic Bool.

Then you have a race between one thread reading the bool and another thread swapping it.

About the object reference approach, note how it's the release operation that is causing issues when trying to read the value. If you were willing to leak memory (never release the object) it would work because there would be no possibility of the object being released at the wrong time (between the two operations in the getter).
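
A minimal sketch of that leaking variant, assuming a simplified ClusterTime with only a timestamp field (the real class has more) and a hypothetical wrapper named LeakyClusterTimeBox:

```swift
import Atomics

/// Simplified stand-in for the real class; only the timestamp is modelled.
final class ClusterTime {
    let timestamp: UInt64
    init(timestamp: UInt64) { self.timestamp = timestamp }
}

/// Keeps the largest-seen ClusterTime and never releases a published object.
final class LeakyClusterTimeBox {
    private let slot = ManagedAtomic<Unmanaged<ClusterTime>?>(nil)

    /// Safe only because a published object is never released, so it
    /// cannot be freed between the load and takeUnretainedValue().
    var latest: ClusterTime? {
        slot.load(ordering: .acquiring)?.takeUnretainedValue()
    }

    func offer(_ candidate: ClusterTime) {
        let owned = Unmanaged.passRetained(candidate)
        var seen = slot.load(ordering: .acquiring)
        while true {
            if let old = seen?.takeUnretainedValue(),
               old.timestamp >= candidate.timestamp {
                // The candidate was never published, so releasing it is safe.
                owned.release()
                return
            }
            let (exchanged, original) = slot.compareExchange(
                expected: seen,
                desired: owned,
                ordering: .acquiringAndReleasing)
            // On success the displaced object is deliberately leaked.
            if exchanged { return }
            seen = original
        }
    }
}

let box = LeakyClusterTimeBox()
box.offer(ClusterTime(timestamp: 5))
box.offer(ClusterTime(timestamp: 3)) // ignored, older than 5
print(box.latest?.timestamp as Any)
```

Every successful update leaks one object, so this only makes sense if updates are rare enough for the leak to be tolerable.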

Knowing this, you could set a sentinel value while inside your getter (an atomic bool) that would tell the setter to wait before releasing anything. This way the getter never has to wait on a lock, but the setter might have to wait for all the getters to be done before finishing.

Edit: not an atomic bool, an atomic counter counting the number of simultaneous calls to the getter, and the setter would be able to release the object when that counter falls to zero (assuming there are times no getters are active).

Would that work and if not why? Pseudocode:

func set(parameters: ...) {
	while true {
		let a = loadA()
		let b = loadB()
		let (needToChange, newA, newB) = calculateValues(a, b, parameters)
		if !needToChange { break }
		let (changedA, origA) = compareExchange(expected: a, desired: newA)
		let (changedB, origB) = compareExchange(expected: b, desired: newB)
		if changedA && changedB {
			break
		}
	}
}

Edit: generalised real code, untested:

import Atomics

func changeValuesAtomically<T>(_ atomicValues: [ManagedAtomic<T>], calculateValues: ([T]) -> [T]?) {
    while true {
        let curValues = atomicValues.map {
            $0.load(ordering: AtomicLoadOrdering.relaxed) // order?
        }
        guard let newValues = calculateValues(curValues) else {
            break
        }
        precondition(newValues.count == atomicValues.count)
        
        var done = true
        for i in 0 ..< atomicValues.count { // three-way zip ?
            let atomic = atomicValues[i]
            let curValue = curValues[i]
            let newValue = newValues[i]
            let (changed, oldValue) = atomic.compareExchange(expected: curValue, desired: newValue, ordering: .acquiringAndReleasing) // order?
            // any good use for oldValue here?
            if !changed {
                done = false
                break
            }
        }
        if done { break }
    }
}

// Usage:

var timeStamp = ManagedAtomic(0)
var somethingElse = ManagedAtomic(0)

var newTimeStamp = 123
var newSomethingElse = 456

changeValuesAtomically([timeStamp, somethingElse]) { values in
    precondition(values.count == 2)
    if newTimeStamp <= values[0] {
        return nil
    }
    return [newTimeStamp, newSomethingElse]
}

However, for time critical use I'd not use dynamically allocated arrays. De-generalised version:

import Atomics

func changeValuesAtomically<T1, T2>(_ atomic0: ManagedAtomic<T1>, _ atomic1: ManagedAtomic<T2>, calculateValues: (T1, T2) -> (T1, T2)?) {
    while true {
        let curValue0 = atomic0.load(ordering: .relaxed) // order?
        let curValue1 = atomic1.load(ordering: .relaxed) // order?
        
        guard let (newValue0, newValue1) = calculateValues(curValue0, curValue1) else {
            break
        }
        
        let (changed0, oldValue0) = atomic0.compareExchange(expected: curValue0, desired: newValue0, ordering: .acquiringAndReleasing) // order?
        // any good use for oldValue here?
        if !changed0 { continue }
        let (changed1, oldValue1) = atomic1.compareExchange(expected: curValue1, desired: newValue1, ordering: .acquiringAndReleasing) // order?
        // any good use for oldValue here?
        if !changed1 { continue }
        break
    }
}

// Usage:
var timeStamp = ManagedAtomic(0)
var somethingElse = ManagedAtomic(0)

var newTimeStamp = 123
var newSomethingElse = 456

changeValuesAtomically(timeStamp, somethingElse) { timeStamp, somethingElse in
    if newTimeStamp <= timeStamp { return nil }
    return (newTimeStamp, newSomethingElse)
}

Note on atomicity of loads. This sequence of loads:

    let curValue0 = atomic0.load(ordering: .relaxed)
    let curValue1 = atomic1.load(ordering: .relaxed)
    // checkValues here, e.g. precondition((curValue1 + curValue0) % 3 == 0)

doesn't give an "overall atomic result". For example, if you were to maintain the invariant that "somethingElse + timeStamp must be divisible by 3", that invariant could be broken at the "checkValues here" point above, because another thread may update one of the values between the two loads. If that kind of atomicity is required, one (perhaps non-ideal) way would be to loop those two lines until the wanted invariant is satisfied, or in the code above just "continue":

    while true {
        let curValue0 = atomic0.load(ordering: .relaxed)
        let curValue1 = atomic1.load(ordering: .relaxed)
        if !invariantSatisfied(curValue0, curValue1) { continue }
        ....
    }

swift-atomics has a tool for solving this problem, using AtomicReference. Importantly, this is hard to use correctly because it requires double-wide atomic operations, not widely available on most Linux systems at this time.

As @michelf and @ksluder have rightly pointed out, you cannot safely achieve this pattern any other way without exposing yourself to mismanaged memory. I encourage you not to try to rewrite this pattern, and instead to either use a lock or deliberately leak your storage such that the downside risk is avoided.
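
For reference, the AtomicReference route mentioned above might look roughly like this. It is a sketch under assumptions: ClusterTime is reduced to a timestamp, the function name offer is hypothetical, and it presumes a platform and build configuration where swift-atomics has its double-wide atomics enabled.

```swift
import Atomics

/// Simplified stand-in for the real class; conforming to AtomicReference
/// lets swift-atomics manage strong references to it atomically.
final class ClusterTime: AtomicReference {
    let timestamp: UInt64
    init(timestamp: UInt64) { self.timestamp = timestamp }
}

/// Replaces the stored reference if `candidate` carries a larger timestamp.
func offer(_ candidate: ClusterTime, to slot: ManagedAtomic<ClusterTime?>) {
    // The load hands back a properly retained strong reference.
    var seen = slot.load(ordering: .acquiring)
    while true {
        if let old = seen, old.timestamp >= candidate.timestamp {
            return
        }
        // References are compared by identity; retain and release of the
        // old and new objects are handled by the library.
        let (exchanged, original) = slot.compareExchange(
            expected: seen,
            desired: candidate,
            ordering: .acquiringAndReleasing)
        if exchanged { return }
        seen = original
    }
}

let slot = ManagedAtomic<ClusterTime?>(nil)
offer(ClusterTime(timestamp: 5), to: slot)
offer(ClusterTime(timestamp: 3), to: slot) // ignored, older than 5
print(slot.load(ordering: .acquiring)?.timestamp as Any)
```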

The following multi-word (in this example 3-word) atomic load / store "lock free" algorithm uses a separate atomic lock variable to avoid the issue of load atomicity (observing a mixture of old and new partially written data). Edit: as pointed out below this is doing a spin lock, so not the best idea.

Multi-word atomic load / store lock free algorithm - as shown below this is similar to spin lock
import Atomics

func acquireLock(_ v: ManagedAtomic<Int>) {
    var done = false
    repeat {
        done = v.compareExchange(expected: 0, desired: 1, ordering: .acquiringAndReleasing).exchanged
        // busy loop
    } while !done
}

func releaseLock(_ v: ManagedAtomic<Int>) {
    let done = v.compareExchange(expected: 1, desired: 0, ordering: .acquiringAndReleasing).exchanged
    precondition(done, "should always succeed, right?")
}

func getABC<A, B, C>(lock: ManagedAtomic<Int>, a: ManagedAtomic<A>, b: ManagedAtomic<B>, c: ManagedAtomic<C>) -> (A, B, C) {
    acquireLock(lock)
    let curA = a.load(ordering: .relaxed)
    let curB = b.load(ordering: .relaxed)
    let curC = c.load(ordering: .relaxed)
    releaseLock(lock)
    return (curA, curB, curC)
}

func setABC<A, B, C>(lock: ManagedAtomic<Int>, a: ManagedAtomic<A>, b: ManagedAtomic<B>, c: ManagedAtomic<C>, newValues: (A, B, C) -> (A, B, C)?) {
    acquireLock(lock)
    let curA = a.load(ordering: .relaxed)
    let curB = b.load(ordering: .relaxed)
    let curC = c.load(ordering: .relaxed)
    if let (newA, newB, newC) = newValues(curA, curB, curC) {
        var done = a.compareExchange(expected: curA, desired: newA, ordering: .acquiringAndReleasing).exchanged
        precondition(done, "should always succeed, right?")
        done = b.compareExchange(expected: curB, desired: newB, ordering: .acquiringAndReleasing).exchanged
        precondition(done, "should always succeed, right?")
        done = c.compareExchange(expected: curC, desired: newC, ordering: .acquiringAndReleasing).exchanged
        precondition(done, "should always succeed, right?")
    }
    releaseLock(lock)
}

// Usage:

var lockVar = ManagedAtomic(0)
var a = ManagedAtomic(0)
var b = ManagedAtomic(0)
var c = ManagedAtomic(0)

// atomically gets three values
let (av, bv, cv) = getABC(lock: lockVar, a: a, b: b, c: c)

// atomically sets three values
setABC(lock: lockVar, a: a, b: b, c: c) { curA, curB, curC in
    // return nil to not change
    // use current values if needed
    (1, 2, 3)
}

Do I understand correctly that it would be incorrect to load / store data without using "load" / "compareExchange" (even under the "lock", when there is no contention), as another thread on a different CPU core might not read correct data because it hasn't yet propagated to the relevant cache line / main memory? And by using load / compareExchange (still under lock) there's no such issue? Is this algorithm doing anything unnecessary, or is anything missing? It looks alright to me, but if you spot anything suspicious please share.

That looks like a spinlock. acquireLock must wait for other threads to release the lock before it can exit its loop.

I'd be wary of priority inversion problems. If a low priority thread acquires the lock and then gets suspended, and a couple of high priority threads start actively spinning on it, when is the low priority thread going to wake up and release the lock? It might take a long time, or never happen if the scheduler waits for the high priority threads to have no work to do before running the low priority one (as I think would happen on iOS).

Today's mutexes are generally hybrid: running for a few cycles as a spinlock before handing things to the OS scheduler. So a mutex is not necessarily more costly than a spinlock. Apple also has "unfair" locks which could be used here.

yes, im deploying to a single-vCPU target so spin-locking is a no-go. so if the cluster time cannot be tracked atomically it must suspend the thread.

which is really bad because the request API absolutely cannot go to sleep just to read something as unimportant as the current cluster time.

the gossiping protocol i'm implementing is the MongoDB replication protocol, which says that drivers MUST gossip the cluster time, but truth be told MongoDB specs overuse the word “MUST” to the point of being meaningless, and mongod servers don’t seem to care if you omit the $clusterTime.

so i think the only real “solution” is to just not track the cluster time to begin with.


EDIT: on rereading the linked answer i don't really understand why you cannot spin lock on a single-core system. if there is only one thread won't there never be contention in the first place?

On a single core and with preemptive multithreading, the OS can suspend your thread while the lock is acquired. A second thread will then spin until it can acquire the lock, but the lock can never be relinquished by the first thread until there is a context switch and the first thread runs again. So the spinning part in the second thread is just wasted time: you might as well use a mutex to let the OS scheduler switch to the first thread immediately.

If instead you have two cores, it's likely both threads will be running simultaneously, in which case spinning a few cycles will likely let the other thread relinquish the lock.

So it's not that you cannot spin on a single core, it's just that spinning is wasting time because no progress can happen during that time.

Indeed :thinking:.

So other than using mutexes there is no way to ask OS:

  • "please write these N (e.g. 100) bytes to a specified memory location but once you start writing make sure you (1) finish it "quick" enough and (2) nobody is reading any bytes within that whole N byte region until you fully finish writing",
  • "please read N bytes from a specified memory location but once you start reading make sure you (3) finish reading "quick" enough and (4) nobody changes bytes anywhere within that whole N byte region until you fully finish reading".

where N is a number like 50 or 100? IIUC a small N like 8 is not a problem.

How much more? Can you squeeze everything into 64 bits? (e.g. by using a timer with a smaller resolution, etc.)

256 bits:

  • 64 bit timestamp
  • 128 bit "cryptographic hash"
  • 64 bit "server key"

it is a gossip payload, so it must be gossiped in its entirety, or not at all.

retransmitting it isn't mandatory because the other nodes will just assume the current node is new to the cluster. but having too many "antisocial" nodes is probably not good for the cluster.