Parallelizing work with back pressure

Hi there, I'm trying to use Swift Concurrency to optimize some code that reads binary encoded protobuff messages from a file, and then stores them into the database.

I need to do this in a stream fashion because the files are potentially very large, also, the bottleneck of the process is the part where we store them into the database (this is an order of magnitude or two slower than parsing the element form the file). I can't know the number of elements I will need to parse beforehand.

The database is thread safe and I would like to parallelize that part to improve performance, as the majority of the CPU time is spent compressing the added entries.

This is my current (non-parallelized) simplified version of my code:

let fileHandle = try FileHandle(forReadingFrom: url)
let asyncSequence = fileHandle.bytes.binaryProtobufDelimitedMessages(of: Int.self)

var results: [Result] = []

for try await element in asyncSequence {
    let result = db.store(element) // This is very slow
    results.append(result)
}

To parallelize the DB process, I thought of using Swift Async Algorithm's AsyncChannel together with task groups but I'm not sure if there is a simpler way.
This is kind of where I am right now, I haven't been able to test it yet:

let fileHandle = try FileHandle(forReadingFrom: url)

let asyncSequence = fileHandle.bytes.binaryProtobufDelimitedMessages(of: Int.self)

let groupChannel = AsyncChannel<Void>()

Task {
    for _ in 0 ..< ProcessInfo.processInfo.activeProcessorCount {
        await groupChannel.send(())
    }
}

try await withThrowingTaskGroup(of: Result.self) { group in
    for try await (element, _) in zip(asyncSequence, groupChannel) { // I probably need a way to terminate groupChannel when asyncSequence has no more elements...
        group.addTask {
            let result = db.store(element)
            groupChannel.send(())
            return result
        }
    }

    try await group.waitForAll() // Probably not needed?
}

Is this reasonable or is there a simpler way of doing this that I'm missing?

Thanks in advance

2 Likes

I have a similar topic coming up (ie: importing a large file to a production system while limiting parallel requests) and had it spinning in my head.

I have not done anything yet, but here is what I would have started with (untested pseodo code based off your snipped):

let fileHandle = try FileHandle(forReadingFrom: url)
let asyncSequence = fileHandle.bytes.binaryProtobufDelimitedMessages(of: Int.self)

try await withThrowingTaskGroup(of: Result.self) { group in
    let maxParallelTasks = ProcessInfo.processInfo.activeProcessorCount
    var taskCount = 0

    for try await element in asyncSequence {
        group.addTask {
            let result = db.store(element)
            groupChannel.send(())
            return result
        }

        if taskCount < maxParallelTasks {
            taskCount += 1 //the first few tasks start immediately, until maxParallelTasks
        } else {
            _ = try await group.next() //start awaiting any task before adding the next
        }
    }

    try await group.waitForAll()
}

Not sure if there is anything more to consider or to rearrange (just typed this out in the forum editor), but I would love for async-algorithms to provide an idiomatic "process off this sequence using this async closure with a max parallel setting" sink.

... or something like a flatMap with a "go-a-bit-parallel" setting....

one more thought:

if you just want to parallelize based on local core count, you should not need to limit this at all - just use a TaskGroup (or DiscardingTaskGroup) and go wild. also, I noticed your db.store is not async -> thou shalt not block on a concurrency runtime thread ; )

In my case I want to limit the "bandwidth" I unleash towards the production system - so it is not really bound by CPU.

Thanks for the thoughts :slight_smile:

Although the db process is synchronous, as long as there are no dependencies and each task is able to make forward progress I think it should be okay. I think of it as an expensive calculation.

I would like to let the concurrency runtime to scale things for me automatically but I haven't found a better way to do that other than sending those initial values to the group stream to get things started.

Don't go wild. Task groups place no limits on how many tasks may be created and exist concurrently, only how many can be executing concurrently. You still need to use e.g. a counting semaphore, if you want back-pressure and to avoid task explosion (or avoid Structured Concurrency entirely).

You might be thinking of GCD's concurrentPerform which does limit the number of live tasks to the machine's logical core count.

yeah, you are right - going wild would potentially suck all off the file into memory.

in terms of "task explosion": as I understand it you'd have a ton of tasks lined up, but the concurrency runtime's thread pool will still stay narrow, right?

Correct. Swift's Structured Concurrency system never attempts to execute more items in parallel than the logical core count of the machine (and sometimes less, in odd contexts, such as the iOS simulator on a Mac).

In contrast, GCD will oversubscribe the cores with threads (by about two orders of magnitude) - allowing you a degree of liberty to block inside a GCD task, unlike Structured Concurrency tasks which expressly forbid blocking and behave very poorly if you do so. The cost of that liberty is that GCD can incur some pre-emptive context switching and therefore extra overhead (iff you actually overload it with concurrent tasks - which you can largely avoid through use of mechanisms like concurrentPerform).

Fundamentally, threads - whether created via GCD or any other mechanism - will contend more aggressively with each other in a way that incurs more overhead, because of their pre-emption behaviour. In fact, you usually get some of that overhead even if you don't have more threads than cores, because the kernel's thread scheduler on Apple's platforms is not great ay spreading threads evenly and stably across available cores. But the problem does get notably worse when the runnable thread count exceeds actual core count.

That all said, having lots of Tasks is still undesirable in the same way as having lots of enqueued blocks (in GCD) or lots of threads. The inherent overhead of creating & maintaining a Task is less than a full thread (but maybe more expensive than creating & enqueuing a block on a dispatch queue, from what I've seen). More concerning is the application-specific overhead, like memory associated with each task, as well as suboptimal ordering of work (e.g. reading in the contents of all files way ahead of being able to actually process them, causing excessive peak memory use and unnecessarily bottlenecking I/O on the system).

4 Likes

And to add: also prefers low power cores over high performance ones even on desktop machines on power - which is just a bit crazy as a default. I fully understand that bias on battery powered devices, but now I get suboptimal performance on my 20-core Mac Studio as it just wants to use the low power cores first. I wish the default for plugged in machines was the reverse - use high performance cores as far as possible and only scale out to the high efficiency ones if needed. Should probably create a FB on this one day, as it’s really not what you want from your high performance desktop machine….

1 Like

Worse, I've seen it refuse to use the P cores on a battery-powered laptop, even with plenty of waiting Tasks. I don't know if that's intentional or not (there are a lot of weird behaviours re. job scheduling on Apple Silicon, particularly around P vs E core use, and it's not all specific to use of Swift Tasks).

Still, I don't think that should - in principle - be a factor in whether to use Structured Concurrency or not. Hopefully at worst those sorts of things are merely bugs that Apple will fix.

Well, that said… from what I've observed, one of the dubious design choices is that tasks with 'background' priority are never executed on P cores. Whether they come from Structured Concurrency or GCD. That seems obviously intentional and Apple might mistakenly believe it's a feature.

1 Like

Yeah, that’s definitely an anti feature too.

I understand the desire to be frugal with battery by default, but doing “pro” machines with performance ambitions should imply the users to make use of that power.

Personally I’d be happy to have a performance vs battery saving setting (which when plugged in should default to performance) - just as was possibly for the GPU on older intel based pro laptops.

3 Likes

I filed FB13223271 for the few things we discussed here with E/P cores and scheduling, don't have too high hopes it will lead to something, but then it's at least reported.

2 Likes

Please do that and publish @ OpenRadar. I'd love to duplicate.

1 Like

https://openradar.appspot.com/radar?id=5615292709339136

4 Likes

I think that the initial post is missing some information:

  • what database are you using? - maybe it is something specific to a given db. SQLite works differently than MySQL.
  • how many rows? - if it is below 100k then I would not expect any major problems. 500k? This is the range where things start to matter. Though, it also depends on the time budget. If you insert 10 million rows and your budget is 1 second, then 'it will not work'.
  • do you insert a single row at the same time? - your code suggest so: element (singular) as a variable name. This is definitely NOT what you want to do. I think that if you were doing things in batches then you would mention it in the post, so I assume: 'no'.
  • do files/rows need to be inserted in a specific order? - you mention parallel tasks, so I assume 'no'. This is very important for design/performance of the whole operation.
  • multiple files with hundreds of rows OR single file with million of rows - having multiple files allows you to process multiple files at the same time.

In general SQL is designed to process multiple data at the same time. If your db supports BULK INSERT (and similar operations) then go for it. Most of the databases already have some solution for 'inserting millions of very fast'. If your driver support is then just use it.

Otherwise you should try to reduce round trips to the server - insert multiple rows at the same time. This also means that you are 'compressing' (or more precisely: putting rows into a SQL protocol data frame) multiple rows at the same time. From my experience decreasing the number of operations is more important than the size of a single operation. This means that it is better to do 1000 operations with 1000 rows each than 1_000_000 operations with 1 row each (by a lot). You mention that the compression is a problem, but are you already batchig stuff, or just sending a single row at the same time?

Also, you mention that 'database is thread safe', but is your database driver thread safe? Can the driver properly 'do many things at the same time'? I have seen a case when driver opened multiple connections to the same db which promoted the transaction to distributed transaction (!). Though this would not be an 'compression' problem.

Tbh. Whatever you do on the box in terms of CPU, should not matter because in just a second wou will be doing networking IO. Usually insertion speeds should be in tens of thousands of rows per second (at least, this number is for a VERY weak box). Have you tried to import similar data using some different tool? What were the times? Maybe it is a bug in the SQL driver?

IMPORTANT: You may need to do the whole operation in transaction! I don't see you mentioning it.

IMPORTANT-2: In a certain kind of business you are not allowed to do things in parallel, because this may reorder operations.

sliemeobn Simon Leeb wrote:
I noticed your db.store is not async -> thou shalt not block on a concurrency runtime thread ; )

How can you make a sync operation exposed by a database driver async? At some point it has to block:

  1. Driver send data
  2. Driver waits for ACK

You driver needs to support async for the whole operation to be async. In local terms: if read is sync then you can't make it async, as soon as you call the function it will block the thread.

Pseudo-code

When batching SQL writes the database should not be that big of a problem. Even a single thread should be able to do 'sufficiently' well below 500k (though it also depends on the data, indexes (indices?) and triggers also have cost). But you have not mentioned the database you are using/estimated number of rows/time budget, so it is hard to tell.

func main(files: [File]) async {
  var timer = Timer()
  timer.start()

  // If you need everything in 1 transaction.
  // Though remember that long transactions are BAD!
  // (like really, really BAD)
  let db = Database(…)
  await db.beginTransaction()

  let results = await withTaskGroup(of: FileReadResult.self) { group in
    // Assuming that your business rules allow you to process files in parallel.
    // In some scenarios (banks, stocks, ERP, finance) this may not be allowed.
    //
    // I think that it is perfectly fine to start 20 tasks at the same time.
    // Maybe if you have more files then you should do something (counting semaphore?).
    // You are doing IO operations (file reads, db writes), those are a few
    // orders of magnitude heavier than `Task`.
    //
    // Also, do not use `processorCount`, the whole point of Swift structured
    // concurrency is to not care about such stuff.
    for file in files {
      group.addTask {
        let reader = FileReader(file: file, db: db)
        await reader.run()
      }
    }

    // FileReadResult = statistics for a single file (row count, duration etc…).
    var acc = [FileReadResult]()

    for await result in group {
      acc.append(result)
    }

    return acc
  }

  await db.endTransaction()

  timer.end()
  let duration = timer.duration.secondsString

  // Log 'results/duration' somewhere.
  // How many rows in total? How long did it take? etc…
}

struct FileReadResult {
  /// How many rows were processed?
  let rowCount: Int
  /// How long did the read take?
  let duration: Duration
}

actor FileReader {

  private static let partitionCount = 5000

  private let file: File
  private let db: Database

  /// Read file and send to database.
  func run() -> FileReadResult {
    var partition = [Row]()
    partition.reserveCapacity(Self.partitionCount)

    // Your file reading logic.
    for row in self.file {
      partition.append(row)

      if partition.count = Self.partitionCount {
        await self.db.insert(rows: partition)
        partition.clear(preserveCapacity: true)
      }
    }

    return …
  }
}

actor Database {

  /// Insert multiple rows at the same time.
  func insert(rows: [Row]) async {
    // Depends on the database.
    // Nothing that we can help with, if we do not know the db.
  }

  /// Remember to keep your transaction as small as possible!
  func beginTransaction() async {}
  /// In real life this may throw!
  func endTransaction() async {}
}
1 Like