Swift Async Algorithms Proposal: AsyncBufferedByteIterator



Sources of bytes are a common point of asynchrony: reading from files, reading from the network, and other such tasks. Having an easy-to-use, uniform, and performant utility to make this approachable is key to unlocking highly scalable byte handling. This has proven useful for FileHandle, URL, and a number of other types in Foundation.

This type provides infrastructure for creating AsyncSequence types with an Element of UInt8 backed by file descriptors or similar read sources.

struct AsyncBytes: AsyncSequence {
  public typealias Element = UInt8
  var handle: ReadableThing

  internal init(_ readable: ReadableThing) {
    handle = readable
  }

  public func makeAsyncIterator() -> AsyncBufferedByteIterator {
    return AsyncBufferedByteIterator(capacity: 16384) { buffer in
      // This runs once every 16384 invocations of next()
      return try await handle.read(into: buffer)
    }
  }
}

Detailed Design

public struct AsyncBufferedByteIterator: AsyncIteratorProtocol, Sendable {
  public typealias Element = UInt8

  public init(
    capacity: Int,
    readFunction: @Sendable @escaping (UnsafeMutableRawBufferPointer) async throws -> Int
  )

  public mutating func next() async throws -> UInt8?
}

For each invocation of next(), the iterator checks whether a buffer has been filled. If the buffer contains bytes, a fast path is taken to return a byte directly out of that buffer. If the buffer is empty, the read function is invoked to acquire the next filled buffer, at which point a byte is taken out of that buffer.

If the read function returns 0, indicating that it did not read any more bytes, the iteration is considered finished and no additional invocations of the read function are made.

If the read function throws, the error is thrown by the iteration. Subsequent invocations of the iterator then return nil without invoking the read function.

If the task is cancelled during iteration, cancellation is checked only on passes where the read function is invoked, and a CancellationError is thrown.
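The semantics above can be sketched in simplified form. This is not the actual implementation; the SketchBufferedIterator type and its [UInt8]-returning read closure are invented for illustration, but the logic follows the documented fast-path, end-of-stream, error, and cancellation behavior:

```swift
struct SketchBufferedIterator {
    private var buffer: [UInt8] = []
    private var index = 0
    private var finished = false
    private let read: () async throws -> [UInt8]

    init(read: @escaping () async throws -> [UInt8]) {
        self.read = read
    }

    mutating func next() async throws -> UInt8? {
        // Fast path: serve the next byte straight from the buffer.
        if index < buffer.count {
            defer { index += 1 }
            return buffer[index]
        }
        if finished { return nil }
        do {
            // Cancellation is only checked when a refill is needed.
            try Task.checkCancellation()
            buffer = try await read()
        } catch {
            // A thrown error (including CancellationError) ends the
            // iteration; later calls return nil without reading again.
            finished = true
            throw error
        }
        index = 0
        if buffer.isEmpty {
            // A zero-length read signals end of stream.
            finished = true
            return nil
        }
        defer { index += 1 }
        return buffer[index]
    }
}
```

The real type reads into a reusable UnsafeMutableRawBufferPointer rather than returning fresh arrays; the array-returning closure here is purely to keep the sketch short.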


This type was named precisely for what it does: it is an asynchronous iterator that buffers bytes.


Is there anything special about bytes? Or could this be generalised as an async iterator which buffers elements of any type?

Also, processing streams one byte at a time (and await-ing between each byte) can be quite inefficient. I wonder if this type should expose additional interfaces for consuming larger quantities of data from the buffer.

Of course, AsyncIteratorProtocol only works in terms of individual elements, so generic users and for await loops would use that interface; but if you know the concrete type, you would also be able to use those additional interfaces which go beyond AIP.
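As a purely hypothetical illustration of what such an interface could look like (nothing here is proposed API; ChunkingIterator and nextChunk(maxLength:) are invented for this example), a concrete type could offer a bulk read alongside the element-at-a-time requirement:

```swift
struct ChunkingIterator {
    private var data: [UInt8]
    private var index = 0

    init(_ data: [UInt8]) { self.data = data }

    // AsyncIteratorProtocol-style interface: one element per call.
    mutating func next() -> UInt8? {
        guard index < data.count else { return nil }
        defer { index += 1 }
        return data[index]
    }

    // Hypothetical bulk interface: hand back up to `maxLength`
    // buffered bytes at once, avoiding a suspension per byte.
    mutating func nextChunk(maxLength: Int) -> ArraySlice<UInt8>? {
        guard index < data.count else { return nil }
        let end = min(index + maxLength, data.count)
        defer { index = end }
        return data[index..<end]
    }
}
```

Generic code and for await loops would still go through next(), but a consumer that knows the concrete type could drain the buffer in chunks.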


Not really... this is something that has come up for other things, but the one key portion here is that the byte variant can optimize down to some really fast code and is worth its own specialization.

Normally this would be true; however, the inline optimizer can do some pretty fancy-pants stuff. @David_Smith and I have been plotting some other optimizations here to accommodate this even further. I am not sure we are ready yet to really put forth a proposal for that portion, but it rests firmly on the implementation we have for this particular iteration style.


Yeah, we'd like to provide a bulk iteration mechanism, but the design I have in mind requires move-only types to do safely and efficiently.

As far as perf goes, we're not quite managing to lower this to memcpy yet, but we are managing to beat DispatchIO and FileHandle.readInBackground* on both time and memory footprint by pretty significant margins. My benchmark shows 5.28x and 8.4x faster, and 1.5x and 2.6x smaller, respectively. YMMV depending on the nature of the stream consumer, the hardware, and various other factors.


It looks great to me! My main concern is how this might interface with existing low-level byte types like Data, its NIO equivalent, and others.

What I'm trying to say is: this would be vastly more powerful as a type / proposal if we had some officially-sanctioned stdlib type to represent bytes. In another thread I saw some Collection<UInt8>, which seemed reasonable, but it'd be great to formalise that (or a suitable alternative) in parallel with this proposal.

In general, I understand the desire to have this Iterator so that it doesn't need to be re-implemented everywhere, but I have two questions around the implementation:

Sendability of the AsyncBufferedByteIterator

In the implemented code, the AsyncBufferedByteIterator adopts @unchecked Sendable, with a comment that states the following:

  // If two tasks have access to this iterator then the references on
  // the storage will be non uniquely owned. This means that any reload
  // must happen into its own fresh buffer. The consumption of those
  // bytes between two tasks are inherently defined as potential
  // duplication by the nature of sending that buffer across the two
  // tasks - this means that the brief period in which they may be
  // sharing non reloaded bytes is to be expected; basically in that
  // edge case of making the iterator and sending that across to two
  // places to iterate is asking for something bizarre and the answer
  // should not be crash, but it definitely cannot be consistent.

In general, I agree that sharing this Iterator across tasks is bad, but why should we even allow it in the first place? Isn't the saner thing to do to make the AsyncBufferedByteIterator not Sendable? This avoids a whole range of problems, and as stated in the comment, we don't expect it to happen at all.


If the task is cancelled during the iteration, the iteration will check the cancellation only in passes where the read function is invoked, and will throw a CancellationError.

I think the reason you want to do this is that checking cancellation is expensive and would significantly slow down this Iterator. However, if you have buffered a large amount of data and the consumer of that data is doing longer-running work on each received byte, then cancellation can take a very long time. Have you measured how bad it is to check cancellation in each next() call? For correctness' sake, and to adhere to cooperative cancellation in Swift, I think it is quite important that we cancel as fast as possible. You also alluded to a potential bulk iteration mechanism, which would fix the overhead of checking cancellation on each next() call while still cancelling earlier.


Yes, it's several orders of magnitude slower. For context here, next is allowed to do roughly a single CPU clock cycle of work in order for this to run at full speed on a modern SSD.


Thanks for confirming this and god damn this is fast :smiley:

Since we are calling out the cancellation behaviour in the docs, that ought to be fine. However, I am wondering if we should also call this out in the parameter doc of capacity, since it influences not only the size of each read but also when the iterator will next yield and check for cancellation.

I still believe that we really should drop the Sendable conformance, since it makes everything more complicated. If consumers want to share across Tasks, we should provide a share/multicast algorithm.


This is a good point. I'm used to the predecessor to this in FileHandle, where it's not (currently) configurable, so had forgotten about that aspect.


I'm very interested in this performance you are referring to. I looked at the source, and I don't see where you are reading from a file or socket. I must be missing something, because all I see is it copying from an array literal, yet you referenced FileHandle and DispatchIO.

I made an async streams package, IOStreams, that produces Data items. At this point it has a lot of other features, but one of the reasons I started down that path was that the byte-oriented async sequences from FileHandle and URLSession are so ridiculously slow.

For the FileSink and FileSource I used DispatchIO because it's asynchronous and really fast.

This is a building block that you can use to make things like FileHandle.bytes. That is, it's the same setup as that, but factored out so people can use it with whatever way of getting the bytes they want. In FileHandle's case, it (currently) fills the buffer with read in an actor.

The performance improvement over DispatchIO is not because of any particularly clever way of reading; it's because DispatchIO has to allocate memory for each chunk it reads, whereas this can reuse a single buffer repeatedly.
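A minimal sketch of that reuse pattern (the fill function below is an invented stand-in for a real read(2)-style source; none of this is the actual implementation):

```swift
// The key point: `buffer` is allocated exactly once and refilled in
// place on every read, instead of allocating a fresh chunk per read.
let capacity = 8
let buffer = UnsafeMutableRawBufferPointer.allocate(
  byteCount: capacity,
  alignment: MemoryLayout<UInt8>.alignment
)

func fill(_ buf: UnsafeMutableRawBufferPointer, with byte: UInt8) -> Int {
  // Pretend to "read" by writing `byte` across the whole buffer.
  for i in 0..<buf.count { buf[i] = byte }
  return buf.count
}

// Two successive "reads" land in the same storage, so there is no
// per-chunk allocation; that is where the footprint savings come from.
var totalRead = 0
totalRead += fill(buffer, with: 0xAA)
totalRead += fill(buffer, with: 0xBB)
let lastByte = buffer[0]  // the second read overwrote the first
buffer.deallocate()
```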