Generalizing over Sequence and AsyncSequence

I'm building a JSON tokenizer and would like to emit tokens as data comes in to allow SAX-style parsing, and other features like partial parsing to minimize memory use. It seems natural to make the tokenizer an AsyncSequence and accept URLSession AsyncSequences as input.

As a simplified example, consider a LineSplitter that accepts an AsyncSequence of Characters and is itself an AsyncSequence of Strings.

public struct LineSplitter<DataSource>: AsyncSequence
where DataSource: AsyncSequence, DataSource.Element == Character {
    public typealias Element = String

    public struct AsyncIterator : AsyncIteratorProtocol {
        var sourceIterator: DataSource.AsyncIterator
        private var finished = false

        init(source: DataSource.AsyncIterator) {
            self.sourceIterator = source
        }

        public mutating func next() async throws -> String? {
            guard !finished else { return nil }

            var line = ""

            while let character = try await sourceIterator.next() {
                if character == "\n" {
                    return line
                } else {
                    line.append(character)
                }
            }

            finished = true
            return line
        }
    }

    let dataSource: DataSource

    public init(dataSource: DataSource) {
        self.dataSource = dataSource
    }

    public func makeAsyncIterator() -> AsyncIterator {
        return AsyncIterator(source: dataSource.makeAsyncIterator())
    }
}

This supports the following usage, printing lines as the data comes from URLSession.

Task {
    let request = URLRequest(url: URL(string: "https://swift.org")!)
    let (chars, _) = try await URLSession.shared.bytes(for: request)

    for try await line in LineSplitter(dataSource: chars.characters) {
        print("LINE:", line)
    }
}

The first question is whether this is even a good way to build this. It does require quite a lot of boilerplate to be a AsyncSequence. And maybe it doesn't make sense for the splitter itself to be an AsyncSequence? Maybe there's a better starting point.

The second question is how to adapt this to accept a Sequence and be a Sequence without rewriting the entire tokenizer. If you have [Character] as an input, there's no reason to require callers to try await. In a JSON parser, next() is a little more complex, and may advance the sourceIterator multiple times, so this creates a lot of duplicate code. I haven't figured out a way to write next() so it generalizes over both AsyncSequence and Sequence.

Adapting Sequence into an AsyncSequence is simple. But then the caller must always use await even when nothing is async. And I still want the results to be lazy.

3 Likes

Unfortunately I think making the Sequences into AsyncSequences is the best you’ll be able to do here. The calling conventions are incompatible. We should make sure that generates code that doesn’t impose unnecessary costs on the synchronous case though.

(And I’m sure you saw but there’s a line splitting one included already)

1 Like

Thanks. Yes, line splitting is just a really easy example. My goal is a JSON tokenizer. I've gotten it down to the following by extracting a synchronous "engine" component that does the tokenizing (line splitting). But the Sequence/AsyncSequence boilerplate is more than I'd really like. But not terrible, and doesn't have to duplicate the tokenizing logic (which is generally more complex than the line splitting example).

My main concern about requiring await is that it spreads "async-ness" to places it isn't meant (which then tends to take over everything). It's not really a performance concern.

// Core "engine" that receives characters and emits tokens eventually.
// (Lines in this case, but generally tokens.)
struct LineSplitterEngine {
    var line = ""
    mutating func receive(character: Character) -> String? {
        if character == "\n" {
            defer { line = "" }
            return line
        } else {
            line.append(character)
            return nil
        }
    }
    mutating func finish() -> String? {
        defer { line = "" }
        return line.isEmpty ? nil : line
    }
}

// Async wrapper
public struct AsyncLineSplitter<DataSource> : AsyncSequence
where DataSource: AsyncSequence, DataSource.Element == Character {
    public typealias Element = String

    public struct Iterator : AsyncIteratorProtocol {
        var sourceIterator: DataSource.AsyncIterator
        private var finished = false

        init(source: DataSource.AsyncIterator) {
            self.sourceIterator = source
        }

        var engine = LineSplitterEngine()

        public mutating func next() async throws -> String? {
            guard !finished else { return nil }

            while let character = try await sourceIterator.next() {
                if let line = engine.receive(character: character) {
                    return line
                }
            }

            finished = true
            return engine.finish()
        }
    }

    let dataSource: DataSource

    public init(dataSource: DataSource) {
        self.dataSource = dataSource
    }

    public func makeAsyncIterator() -> Iterator {
        return Iterator(source: dataSource.makeAsyncIterator())
    }
}


// Synchronous, which is almost identical.
public struct LineSplitter<DataSource> : Sequence
where DataSource: Sequence, DataSource.Element == Character {
    public typealias Element = String

    public struct Iterator : IteratorProtocol {
        var sourceIterator: DataSource.Iterator
        private var finished = false

        init(source: DataSource.Iterator) {
            self.sourceIterator = source
        }

        var engine = LineSplitterEngine()

        public mutating func next() -> String? {
            guard !finished else { return nil }

            while let character = sourceIterator.next() {
                if let line = engine.receive(character: character) {
                    return line
                }
            }

            finished = true
            return engine.finish()
        }
    }

    let dataSource: DataSource

    public init(dataSource: DataSource) {
        self.dataSource = dataSource
    }

    public func makeIterator() -> Iterator {
        return Iterator(source: dataSource.makeIterator())
    }
}

let request = URLRequest(url: URL(string: "https://swift.org")!)

// Async Version passes an AsyncBytes.characters
Task {
    let (chars, _) = try await URLSession.shared.bytes(for: request)

    for try await line in AsyncLineSplitter(dataSource: chars.characters) {
        print("LINE:", line)
    }
}

// Synchronous Version passes a String.
Task {
    let (data, _) = try await URLSession.shared.data(for: request)
    let chars = String(data: data, encoding: .utf8)!

    for line in LineSplitter(dataSource: chars) {
        print("LINE:", line)
    }
}

For me this seems like the case for a library function “block on this task and assert if it ever awaits something that isn’t ready”.

runToCompletionWithoutBlocking {
  try await parse(input)
}

Then you can make a synchronous wrapper around your async logic. It falls down a bit if you want to expose each part of your pipeline, though.

(This is now_or_never in Rust, although that produces an Optional rather than asserting.)

EDIT: I suppose this could be “the simplest custom executor”.

5 Likes

Obviously, given our history, the correct spelling for this is await!

:smiley:

4 Likes

I just want to say that now_or_never is an absolutely adorable spelling of that. I'm delighted :smiley:

4 Likes