Serializing nonisolated actor access (Is `Task` expensive)?

I feel like I'm missing something obvious here, but I’m not sure how to best implement what I’m trying to do. Maybe an Actor is not what’s needed here.

I'm trying to coordinate logging messages among a bunch of different Loggers (this is being shoehorned into swift-log, but that implementation detail shouldn't be relevant). I have an arbitrary number of LogHandlers that share a singleton actor LogCoordinator. I need to be able to call a nonisolated log method on the LogCoordinator from any context, and the calls can be executed asynchronously, but should be serialized. To that end, I’m trying to do something like this:

actor LogCoordinator
{
	nonisolated func log(message: String)
	{
		self.workQ.async
		{
			await self.logInternal(message: message)
		}
	}
	
	func logInternal(message: String)
	{
		//	do stuff with internal state
	}
}

But I can't actually await within a DispatchQueue.async call. I have to wrap it in a Task, but that strikes me as expensive. Is it expensive?

Am I going about this entirely the wrong way? If this were just a simple class, I could manage the queues myself, but I thought the point of actors was to relieve me of some of that burden.

Two things here:

Yeah the way to do this is in general:

nonisolated func thing() {
  Task { await self._thing() }

... but, you'll lose ordering guarantees (!), which is very very problematic for a logging case like you have, since you could:

log("A")
log("B")

and in theory there's no guarantee that you'll actually handle A and then B... :frowning_face:

So, you have to make a far more advanced dance to achieve what you need here and basically reimplement a mailbox/queue for all those messages, and only consume it from a single task, like so:

actor LogHandler { 
  let logs: AsyncStream<String>
  var logsCC: AsyncStream<String>.Continuation!
  var processingTask: Task<Void, Never>? = nil
  
  static func make() {
    let cc = AsyncStream<String>.Continuation
    let logs: AsyncStream<String> = AsyncStream {
      cc = $0 // yes, escape it like this... it's actually fine to do so
    }
    return Self(logs: logs, cc: cc)
  }

  private init(logs: AsyncStream<String>, cc: AsyncStream<String>.Continuation) async {
    self.logs = logs
    self.logsCC = cc
    self.processingTask = Task { await self.processing() }
  }

   // also offer ways to shutdown the processing Task

  func processing() {
    for await log in logs {
      // handle
    }
  }

  nonisolated func log(message: String) {
    self.logsCC.yield(message)
  }
}

Something like that... You may have to dance around isolation a bit more; but that'll get you the events in send-order into the processing() task. Anything else, that creates a new Task{} for new calls will have undefined ordering...


Relatedly, we also built a similar type to AsyncStream over at swift-nio that is a bit nicer to use (and can handle different backpressure requirements, though here that doesn't matter); See: Implement a back-pressure aware `AsyncSequence` source by FranzBusch · Pull Request #2230 · apple/swift-nio · GitHub


I personally think we are missing an "enqueue and don't wait" operation for void returning actor methods which would do exactly what you'd want here (like other actor runtimes "tell" or "!").

Though we wanted to spend more time designing it but haven't yet had the time to deep dive into it.

FYI @Douglas_Gregor @Joe_Groff

6 Likes

Just to give a copy-pastable version of Konrad's solution

actor LogHandler { 
    nonisolated let logger: AsyncStream<String>.Continuation
  
    public init() async {
        var cc: AsyncStream<String>.Continuation!
        let logs: AsyncStream<String> = AsyncStream { cc = $0 }
        self.logger = cc
        Task {
            await self.processing(logs: logs)
        }
    }

    private func processing(logs: AsyncStream<String>) async {
        for await log in logs {
            // handle
        }
    }

    nonisolated func shutdown() {
        // Or whatever you want to do on shutdown
        self.logger.finish()
    }

    nonisolated func log(message: String) {
        self.logger.yield(message)
    }
}
3 Likes

just out of curiousity. why does this need to be an actor? and note this is (yet another) example of the need for oneway.

1 Like

In particular, looking at the processing example above which is the only isolated func, since it is private it can only be called the one time it is started in the init and is thread safe by virtue of the for await. These examples can just be a class.

Yeah, I'm not sure it helps to be an actor in this case, but you lose the compiler warnings of access violations. Fortunately, it's simple enough that I think I can be reasonably sure I got it right.

not sure what you mean by that. if you violate thread safety you will get a compiler error. if you don't get the error then you haven't tried something unsafe.