Structured caching in an actor

During a WWDC2023 lab, caching values in an actor came up. I suggested my typical technique of caching a Task within the actor. The engineer brought up a number of pitfalls with that approach, and suggested trying to use structured concurrency instead. I really wanted to understand how to do this, but we ran out of time.

The basic idea, if I understood right, was to use withCheckedThrowingContinuation and escape its continuation. I've been trying a few different things, and I'm not getting anywhere. Does anyone have any pointers on how I might go about this?

Here's my usual unstructured approach:

actor MyActor {
	var cachingTask: Task<String, Error>?

	func getValue() async throws -> String {
		if let task = cachingTask {
			return try await task.value
		}

		let task = Task<String, Error> {
			// pretend this really expensive
			return "hello"
		}

		self.cachingTask = task

		return try await task.value
	}
}
1 Like

They are recommending the way you used in here: Protect mutable state with Swift actors - WWDC21 - Videos - Apple Developer (Code -> 11:59 - Check your assumptions after an await: A better solution)

I can't come up with a way using structured concurrency APIs (async let and TaskGroup) either.

And I'm curious about pitfalls with this approach.

1 Like

You implementation already supports priority escalation so the only potential "pitfall" I can come up with is that your implementation doesn't support any kind of cancellation, though cancellation behavior is always specific to the problem you are trying to solve.

For example here, say a task is waiting on the caching task and gets cancelled. Do you want to be able to stop that waiting task and throw a CancellationError and let the other waiting tasks keep waiting?
Do you want to forward the cancellation to the caching task as well which could lead to the other waiting tasks to get a CancellationError thrown at them if the caching task somehow responds to the cancellation? Maybe only forward the cancellation if the task that started the caching task gets cancelled? The answer to all of these questions depends on what kind of behavior you want to achieve.

Other than that I can't see a way how you could benefit from structured concurrency; rather, it would make this a lot more complicated than it needs to be.

The issue with creating Task inside actor is that you loose priority and cancellation propagation from the task/group in which getValue() would be called. More likely you want to achieve something like this:

actor Example {

    private var loadingValue = false
    private var valueContinuations: [CheckedContinuation<String, Error>] = []

    func getValue() async throws -> String {
        guard !loadingValue else {
            return try await withCheckedThrowingContinuation { continuation in
                valueContinuations.append(continuation)
            }
        }
        loadingValue = true
        var error: Error?
        do {
            let value = try await someComplexTask()
            resumeValueContinuations(with: .success(value))
        } catch let thrownError {
            error = thrownError
            resumeValueContinuations(with: .failuew(thrownError))
        }
        loadingValue = false
        if let error {
            throw error
        }
        return value
    }

    func someComplexTask() async throws -> String {
        try await Task.sleep(nanoseconds: UInt64(10 * 1e9))
        let value = UUID().uuidString
        return value
    }

    func resumeValueContinuations(with result: Result<String, Error>) {
        for continuation in valueContinuations {
            continuation.resume(returning: value)
        }
        valueContinuations = []
    }

}

func main() async throws {
    let example = Example()
    let result = try await withThrowingTaskGroup(of: String.self) { group in
        group.addTask {
            try await example.getValue()
        }
        group.addTask {
            try await example.getValue()
        }
        group.addTask {
            try await example.getValue()
        }
        var output: [String] = []
        for try await value in group {
            output.append(value)
        }
        return output
    }
    print(result)
}

final class ExampleTests: XCTestCase {

    // Expected to print 3 same UUID values after 10 seconds.
    func test_structuredActorExample() async throws {
        try await main()
    }

}
1 Like

I think there's a bug since you don't resume the continuations when your task fails. Otherwise the continuations will wait forever since they never resume with an error. I ran into this comparing how the two implementations behave when the group is cancelled (indeed, the task implementation doesn't react to cancellation).

1 Like

Here's code that converts the ImageDownloader from the WWDC talk from the task-based cache to a continuation-based one:

actor ImageDownloader {
    private enum CacheEntry {
        case inProgress([CheckedContinuation<Image, any Error>])
        case ready(Image)

        mutating func add(_ continuation: CheckedContinuation<Image, any Error>) {
            switch self {
            case .inProgress(var continuations):
                continuations.append(continuation)
                self = .inProgress(continuations)
            case .ready:
                break
            }
        }
    }

    private var cache: [URL: CacheEntry] = [:]

    func image(fromURL url: String) async throws -> Image {
        try await image(from: URL(string: url)!)
    }

    private func addWaitingContinuation(_ continuation: CheckedContinuation<Image, any Error>, for url: URL) {
        cache[url, default: .inProgress([])].add(continuation)
    }

    func image(from url: URL) async throws -> Image {
        if let cached = cache[url] {
            switch cached {
            case .ready(let image):
                return image
            case .inProgress:
                return try await withCheckedThrowingContinuation { continuation in
                    addWaitingContinuation(continuation, for: url)
                }
            }
        }

        cache[url] = .inProgress([])

        let image: Image
        do {
            image = try await downloadImage(from: url)
        } catch {
            resumeInProgressContinuations(with: .failure(error), for: url)
            throw error
        }

        resumeInProgressContinuations(with: .success(image), for: url)

        cache[url] = .ready(image)

        return image
    }

    private func resumeInProgressContinuations(with result: Result<Image, any Error>, for url: URL) {
        guard case .inProgress(let continuations) = cache[url] else { return }
        for con in continuations {
            con.resume(with: result)
        }
    }
}

While it has all the benefits of structured concurrency it's pretty easy to get wrong, especially given how often one would want something like this (I have an actor that makes async requests). Is there a better way to do this? If not, is an API that solves this something that should be added to the standard library?

3 Likes

You're right, thank you for noticing this issue, pretty painful one. I actually constantly have issues with errors being thrown in the middle of awaited tasks and forget to handle it. Some cases I have covered with helpers, but still they arise.

BTW, your implementation needs cache.removeValue(forKey: url) in catch block, since in case of failure it would remain in .inProgress case, preventing from another try for this URL.

So this error handling is really hard topic for such cases. I suppose defer could help here...

1 Like

An option to handle errors here might be to extend CacheEntry here with third case error(any Error). In that way we can update cache[url] = .error(error) in catch block and handle this case next time method has been accessed.

Hey @mattie,

I think we were talking in the lab about this. There has already been some great suggestions here in the comments.

@adamkuipers latest code sample is quite close to what I would have suggested. The only thing that is missing is a withTaskCancellationHandler around the withCheckedContinuation. In the event, of cancellation you might have to re-create the underlying request if you have a continuation that is also interested in the result.

Overall, all of this would be easier with non-reentrant actor methods where the runtime is doing all of the magic around queueing and cancellation. cc @ktoso

4 Likes

Thank you all very much. @FranzBusch it was indeed you I was speaking with!

While I'm still wrestling with cancellation, I think I now understand the underlying ideas here. But, I have to say, I'm not very tempted by the approach. I definitely do want both priority and cancellation propagation. But, the opportunity for bugs seems extremely high.

I experimented a little with abstracting out the caching behavior into, say, a property wrapper. But it seems to me that the @_unsafeInheritExecutor behavior of both withCheckedContinuation and withTaskCancellationHandler is critical to this. So, perhaps a macro is the only option.

That is my follow-up question: am I understanding correctly, that @_unsafeInheritExecutor makes the guarantee that these functions will never suspend, despite being behind an await, until they invoke their closures? Their documentation more or less says this, but it's such a counter-intuitive behavior given how await is supposed to work that I really would love stronger wording.

@_unsafeInheritExecutor is an interesting and really unsafe annotation so it should only be used when you know what you are doing. All methods that are annotated with this have been checked to not break any invariants.
To understand what it does, we first have to talk about a few Concurrency concepts. When are suspensions actually happening? There are two places right now where async code can suspend

  1. When calling with[Checked/Unsafe]Continuation
  2. When hopping to a different executor

The best way to see where this is actually happening is looking at the emitted silgen of your code. There are three important sil functions to look out for:

  1. get_async_continuation
  2. await_async_continuation
  3. hop_to_executor

The important bit is that only the second and third of those are actually suspending. The first one get_async_continuation is merely creating a continuation that will be awaited via await_async_continuation. This translated in code like this:

func foo() async {
    await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
        // Nothing has suspended yet. We just created the continuation via get_async_continuation
        
       // We can do some work here but nothing suspended yet
    } // When reaching the end of the scope here we are going to actually suspend via await_async_continuation
}

The important bit now and this is where @_unsafeInheritExecutor comes into play is how hop_to_executor works or rather when it is used. Let's try to build our own withCustomCheckedContinuation that mimics the API of withCheckedContinuation:

func withCustomCheckedContinuation(_ body: () async -> Void) async {
    await body()
}

Looking at the SILGen of this:

sil hidden [ossa] @$s6output29withCustomCheckedContinuationyyyyYaXEYaF : $@convention(thin) @async (@noescape @async @callee_guaranteed () -> ()) -> () {
bb0(%0 : $@noescape @async @callee_guaranteed () -> ()):
  debug_value %0 : $@noescape @async @callee_guaranteed () -> (), let, name "body", argno 1, loc "/app/example.swift":8:38, scope 8 // id: %1
  %2 = enum $Optional<Builtin.Executor>, #Optional.none!enumelt, loc "/app/example.swift":8:6, scope 8 // users: %5, %3
  hop_to_executor %2 : $Optional<Builtin.Executor>, loc "/app/example.swift":8:6, scope 8 // id: %3
  %4 = apply %0() : $@noescape @async @callee_guaranteed () -> (), loc "/app/example.swift":9:11, scope 9
  hop_to_executor %2 : $Optional<Builtin.Executor>, loc "/app/example.swift":9:11, scope 9 // id: %5
  %6 = tuple (), loc "/app/example.swift":10:1, scope 9 // user: %7
  return %6 : $(), loc "/app/example.swift":10:1, scope 9 // id: %7
} // end sil function '$s6output29withCustomCheckedContinuationyyyyYaXEYaF'

You can see that there are two hop_to_executor in this SIL. One right at the start and another right after calling the body closure. In Swift Concurrency, the callee is responsible to hop to the correct executor and the caller of an async function has to hop back to its executor after the async function returned. This is to ensure that at any point in an async method we are on the correct executor.

Now let's apply @_unsafeInheritExecutor to our withCustomCheckedContinuation and look at the SIL

sil hidden [ossa] @$s6output29withCustomCheckedContinuationyyyyYaXEYaF : $@convention(thin) @async (@noescape @async @callee_guaranteed () -> ()) -> () {
bb0(%0 : $@noescape @async @callee_guaranteed () -> ()):
  debug_value %0 : $@noescape @async @callee_guaranteed () -> (), let, name "body", argno 1, loc "/app/example.swift":9:38, scope 8 // id: %1
  %2 = apply %0() : $@noescape @async @callee_guaranteed () -> (), loc "/app/example.swift":10:11, scope 9
  %3 = tuple (), loc "/app/example.swift":11:1, scope 9 // user: %4
  return %3 : $(), loc "/app/example.swift":11:1, scope 9 // id: %4
} // end sil function '$s6output29withCustomCheckedContinuationyyyyYaXEYaF'

As you can see there is no hop_to_executor here at all. Since the body closure can do an arbitrary amount of hops we are on an arbitrary executor after calling it. This is insanely scary and the reason why this annotation should only be used with extreme care.

For withCheckedContinuation however this exactly what we want. Since we must avoid any potential suspension points before creating the continuation otherwise invariants inside an actor are almost impossible to maintain. It is safe to use withCheckedContinuation because the caller of withCheckedContinuation is going to hop back to its executor and anything that is done in the actual implementation of withCheckedContinuation is fully thread safe and agnostic to what executor it is running on.

I hope this helps to clarify this a bit. If you have more questions, let me know!

8 Likes

This does indeed help to clarify. Thank you so much. However I do have follow up!

Suppose I wanted to make a combination of withTaskCancellationHandler and withCheckedThrowingContinuation. It might look like this:

func withCancellingContinuation<T>(
	operation: (CheckedContinuation<T, Error>) -> Void,
	onCancel handler: @Sendable () -> Void
) async throws -> T {
	try await withTaskCancellationHandler {
		try await withCheckedThrowingContinuation { continuation in
			operation(continuation)
		}
	} onCancel: {
		handler()
	}
}

To maintain the semantics of these underlying functions, I think it is both safe and necessary to mark such a function with @_unsafeInheritExecutor. And, I think more generally, any function that does nothing but call a function marked as @_unsafeInheritExecutor can safely itself be marked @_unsafeInheritExecutor. Am I correct here?

I'm focusing in so much on this because every effort I have made to generalize these caching patterns is foiled unless I can use @_unsafeInheritExecutor (or write a macro, which I have not investigated at all).

You can but I don't think this is the solution we want in the end. We should probably explore either making a safe variant of @_unsafeInheritExecutor which just elides the initial hop or custom task executors which gives the caller more control where the callees are going to hop to.

I am a bit confused by this. I have rarely come across a place where we need to apply this attribute. Could you write an example where you think this is necessary to make your code functionally correct which is not the convenience method you just showed?

It is likely you aren't the one confused here :wink:

Instead, let me post a first-pass of an AsyncValueCache. It feels slightly strange to me that the cache itself is an actor, since it would be used only from within another actor. I also feel pretty unconfident in my ability to handle cancellation correctly. But, everyone has been so helpful and I really do feel like I'm making a lot of progress in my understanding.

actor AsyncValueCache<Value: Sendable> {
	private enum Entry {
		typealias Continuation = CheckedContinuation<Value, any Error>

		case none
		case inProgress([UUID: Continuation])
		case ready(Result<Value, Error>)

		mutating func add(_ continuation: Continuation, id: UUID) {
			guard case .inProgress(var continuations) = self else {
				fatalError()
			}

			continuations[id] = continuation

			self = .inProgress(continuations)
		}

		mutating func cancel(id: UUID) {
			// the cancel may happen too late...
			guard case .inProgress(var continuations) = self else {
				return
			}

			// ... but when in progress the continuation must be present
			guard let continuation = continuations[id] else {
				fatalError()
			}

			continuation.resume(throwing: CancellationError())

			continuations[id] = nil

			self = .inProgress(continuations)
		}

		mutating func begin() {
			self = .inProgress([:])
		}

		mutating func complete(with result: Result<Value, Error>) {
			guard case .inProgress(let continuations) = self else {
				fatalError()
			}

			for con in continuations.values {
				con.resume(with: result)
			}

			self = .ready(result)
		}
	}

	private var cache = Entry.none

	private func cancelContinuation(with id: UUID) {
		cache.cancel(id: id)
	}

	public func getValue(_ provider: @Sendable () async throws -> Value) async throws -> Value {
		switch cache {
		case .ready(let result):
			return try result.get()
		case .inProgress:
			let id = UUID()

			return try await withTaskCancellationHandler {
				try await withCheckedThrowingContinuation { continuation in
					cache.add(continuation, id: id)
				}
			} onCancel: {
				Task {
					await cancelContinuation(with: id)
				}
			}
		case .none:
			break
		}

		self.cache.begin()

		do {
			let value = try await provider()

			self.cache.complete(with: .success(value))

			return value
		} catch {
			self.cache.complete(with: .failure(error))

			throw error
		}
	}
}
1 Like

What I struggle with is the WWDC example of an ImageDownloader seems like such a common pattern for apps. An app commonly wants something that caches a network resource and provides an interface to it that's globally accessible, and, in swift concurrency, an actor is what first comes to mind. The CheckedContinuation implementation seems complicated and easy to mess up, and while the task-based implementation is much simpler you lose structured concurrency.

Non-reentrant actor methods sound nice, but I wonder when or if those will be added to the language. But maybe this pattern is more niche than I'm thinking and there's a better way to globally interface data initialized from a server.

4 Likes

I was just wondering today if we can extract this logic into separate type? Can be reused then everywhere.
Something like this:

actor ImageDownloader {
  
  private var cache: [URL: Image] = [:]
  private var waiter = Waiter<URL, Image>()
  
  func image(from url: URL) async throws -> Image {
    if let cached = cache[url] {
      return cached
    }
    let image = try await self
      .waiter
      .process(id: url) {
        try await downloadImage(from: url)
      }
    self.cache[url] = image
    return image
  }
}

actor Waiter<ID, V> where ID: Hashable {
  
  private var continuations: [ID: [CheckedContinuation<V, any Error>]] = [:]

  func process(
    id: ID,
    _ work: () async throws -> V
  ) async throws -> V {
    if self.continuations[id] != nil {
      return try await withCheckedThrowingContinuation { continuation in
        self.continuations[id, default: []].append(continuation)
      }
    }
    self.continuations[id] = []
    do {
      let value = try await work()
      self.serve(id: id, with: .success(value))
      return value
    } catch {
      self.serve(id: id, with: .failure(error))
      throw error
    }
  }
  
  private func serve(
    id: ID,
    with result: Result<V, any Error>
  ) {
    for continuation in (self.continuations[id] ?? []) {
      continuation.resume(with: result)
    }
    self.continuations
      .removeValue(forKey: id)
  }
}

Note that Waiter is just name that came to my mind, could be anything. :upside_down_face:

1 Like

Having the cache be an actor is totally fine IMO. Actors can hold other actors and should be used aggressively when you want to make something thread safe. You could implement the same thing with locks but that just requires more work due to handling the locking.

Please file GitHub issues and Feedback requests for any feature that you think should be better handled by the language. There is still room for improvement here and getting more real world usage helps decide what feature are important.

Thanks, I will. I wanted to be sure I wasn’t holding the language wrong in this instance or if there was a better way to think about the problem

I've been thinking around this implementation with cancellation aspects in mind. First, I'm assuming there should be try Task.checkCancellation() call before calling provider, so that cancelled tasks will not do unwanted work. Then, in that case all continuations will fail with the CancellationError.

However, I feel it is not the desired behaviour, since we could have several unrelated parent tasks, and only one of these has requested cancellation. We can check for continuations and cancel only if they empty:

public func getValue(
    _ provider: @Sendable () async throws -> Value
) async throws -> Value {
    if let value = try await checkCache() {
        return value
    }
    cache.begin()
    do {
        if 
            case let .inProgress(continuations) = cache, 
            continuations.isEmpty 
        {
            try Task.checkCancellation()
        }
        let value = try await provider()
        cache.complete(with: .success(value))
        return value
    } catch {
        cache.complete(with: .failure(error))
        throw error
    }
}

But in that case the task which was cancelled would receive the result too. So I've stuck here.

There also is a question on how to test cancellation in different scenarios. I haven't succeeded in writing reliable tests, which would fail instantly, but only over iterations. And for the case I've described above I haven't managed to write any valid test at all.

FWIW with SE-0420, we can now implement this using the latest toolchain:

func withCancellingContinuation<T>(
  isolation: isolated (any Actor)? = #isolation,
  operation: (CheckedContinuation<T, Error>) -> Void,
  onCancel handler: @Sendable () -> Void
) async throws -> T {
  try await withTaskCancellationHandler {
    try await withCheckedThrowingContinuation { continuation in
      _ = isolation
      operation(continuation)
    }
  } onCancel: {
    handler()
  }
}

Note: current version requires caller to explicitly pass #isolation:

try await withCancellingContinuation(isolation: #isolation) {
  self.assertIsolated()
  $0.resume()
} onCancel: {
  print("🐟")
}
2 Likes