Stream API

In the pitch for unifying Sequence and Collection there is mention of a Stream API for the case of potentially infinite sequences/collections. Below is a suggestion for such an API (note it is deliberately minimal to keep the post to a reasonable length).

Comments/suggestions?

// Stream definitions.

// Alternative to `IteratorProtocol`, `LazySequence`, etc. that is more expressive and has defined behaviour.
// Small ABI surface area (1 enum, 2 classes, and 3 extensions).
// Requires class semantics (so that lazy methods can be chained - if ownership allowed transfer of ownership then a struct could be used).
// Potentially infinite.
// Single traversal only (unless derrived class of `Stream` adds extra functionality).
// Eager methods, `next`, `reduce`, etc., all throw (so that Swift error handling can be used).
// Could add parallel at later date to give parallel, lazy evaluation.

public enum StreamControl: Error {
    case end // Thrown by `next()`, or any other method, to mark end of traversal.
}

/* abstract */ open class Stream<T> {
    /* protected abstract */ open func next() throws -> T {
        fatalError("`Stream.next()` must be overridden.")
    }
    
    public final func map<R>(_ transform: @escaping (T) throws -> R) -> Stream<R> {
        return MapStream(self, transform: transform)
    }
    
    public final func reduce<R>(_ initialResult: R, _ nextPartialResult: (R, T) throws -> R) throws -> R {
        var result = initialResult
        do {
            while true {
                result = try nextPartialResult(result, next())
            }
        } catch StreamControl.end {
            return result
        } catch {
            throw error
        }
    }
    
    // Other `Collection` style `func`s that can be implemented using a single-pass `next()`.
}
fileprivate final class MapStream<I, O>: Stream<O> {
    private let previous: Stream<I>
    private let transform: (I) throws -> O
    init(_ previous: Stream<I>, transform: @escaping (I) throws -> O) {
        self.previous = previous
        self.transform = transform
    }
    override func next() throws -> O {
        return try transform(previous.next())
    }
}

public extension Collection {
    public var stream: Stream<Element> { // The collection *converted* to a stream.
        return CollectionStream(self)
    }
}
fileprivate final class CollectionStream<T>: Stream<T> {
    private let nextClosure: () throws -> T
    init<C>(_ collection: C) where C: Collection, C.Element == T {
        var index = collection.startIndex
        nextClosure = {
            guard index < collection.endIndex else {
                throw StreamControl.end
            }
            let result = collection[index]
            index = collection.index(after: index)
            return result
        }
    }
    override func next() throws -> T {
        return try nextClosure()
    }
}

public extension Numeric {
    public var stream: ValueStream<Self> { // The numeric *converted* to a stream.
        return ValueStream(self)
    }
}
public extension Optional {
    public var stream: ValueStream<Wrapped> { // The optional *converted* to a stream.
        return ValueStream(self)
    }
}
public final class ValueStream<T>: Stream<T> {
    private var optional: T?
    public init(_ optional: T? =  nil) {
        self.optional = optional
    }
    override public func next() throws -> T {
        guard let value = optional else {
            throw StreamControl.end
        }
        optional = nil
        return value
    }
}

// Tests

print(try 1.stream.reduce(0, +)) // 1
print(try (2 as Int?).stream.reduce(0, +)) // 2
print(try [1, 2, 3].stream.map { 2 * $0 }.reduce(0, +)) // 12
print(try "abc".unicodeScalars.stream.map { $0.value }.reduce(0, ^)) // 96
print(try (0 ... Int.max).stream.map { // Could be infinite.
    guard $0 <= 17 else {
        throw StreamControl.end // Any part of chain can signal end.
    }
    return $0
}.reduce(0, +)) // 153
var c = [1, 2, 3]
let s = c.stream
print(try s.reduce(0, +)) // 6
print(try s.reduce(0, +)) // 0, stream already treversed.
c.append(4) // COW prevents extra traversal, even though underlying array modified.
print(try s.reduce(0, +)) // 0, stream already treversed.
enum RandomError: Error {
    case example
}
do {
    _ = try 1.stream.map { _ -> Int in
        throw RandomError.example // Can throw any error (normal Swift error handling).
    }.reduce(0, +)
} catch {
    print(error) // example
}

Did you really meant to say ABI instead of API, and btw. there is a typo in the first sentence Steam instead of Stream?!

Thanks x 2. Corrected.

1 Like

Can you address the possible name collision when import foundation?
https://developer.apple.com/documentation/foundation/stream

How would people feel about Flow instead of Stream?

I think it’s a bit unfortunate that Steam is already taken, since this is a term of art. I think most people are going to be looking for something named Stream. So Flow doesn’t feel like the name.

Personally, I would rename Foundation.Stream to something like ByteStream. Really though, we shouldn’t care about the collision since it can always be disambiguated, and standard library and new Swift types should take priority over the rather terrible Foundation.Stream, which needs too be replaced anyway.

1 Like

Stream an abstract class which begs the question why was is renamed as part of https://github.com/apple/swift-evolution/blob/master/proposals/0086-drop-foundation-ns.md

I wonder if typealias Stream = Swift.Stream would solve our problem. It is unfortunate. I think the class Foundation.Stream should be deprecated in swift 4 and renamed in Swift 5 but this is complicated because the Foundation process is not open.

Changes to Foundation that affect Apple platform APIs aren’t open, but the Swift overlay is, AFAIK, so a renaming should be possible.

2 Likes

Could you say a bit about how this is more expressive than iterators, and what the undefined behaviour is there? I know you were trying to keep it short, but with little explanation it’s hard to deduce the reasons for the choices you’ve made with the core API:

open class Stream<T> {
    open func next() throws -> T { fatalError() }
}

This seems quite similar to implementing IteratorProtocol for classes (and providing convenience transforms like map/reduce on that protocol instead of just Sequence), except using subclassing and throws instead of the (arguably) more Swifty generics & protocols and Optional. (To be clear, Optional seems more Swifty than using throws for non-error control flow.)

3 Likes

Point taken, I didn’t really explain the reasoning!

  1. Undefined behaviour
    1a. It is defined as single pass whereas both IteratorProtocol and Lazy can be either single or multiple (you don’t know).
    1b. All methods work with a potentially infinite Stream; e.g. no buffering.
    1c. All methods work without knowing the length of the Stream in advance.
    1d. Defined behaviour after single pass ended, as demonstrated by:
       var c = [1, 2, 3]
       let s = c.stream
       print(try s.reduce(0, +)) // 6
       print(try s.reduce(0, +)) // 0, stream already treversed.
       c.append(4) // COW prevents extra traversal, even though underlying array modified.
       print(try s.reduce(0, +)) // 0, stream already traversed.
  1. Expressive
    2a. Normal Swift error propagation via throw; even for lazy methods, because eager methods all throw.
    2b. With the current return nil semantics of next() only next() can signal finished; therefore if it was required that map for example decided the calculation was finished it would be tricky to write for IteratorProtocol and also for LazySequenceProtocol, whereas for Stream it is easy:
       print(try (0 ... Int.max).stream.map { // Could be infinite.
           guard $0 <= 17 else {
               throw StreamControl.end // Any part of chain can signal end.
           }
           return $0
       }.reduce(0, +)) // 153

Iterators are always single-pass. It’s lifecycle goes like this:

  1. Create an instance (with Sequence.makeIterator() or some other way)
  2. Call it.next() until the iterator returns nil
  3. All subsequent calls to it.next() are defined as returning nil

Interestingly, IteratorProtocol.reduce seems broken for value-types. This looks like a bug to me:

var c = [1, 2, 3]
var it = c.makeIterator()

it.reduce(0, +) // 6
it.reduce(0, +) // 6... wait, what?
it.reduce(0, +) // 6... again?!

it.next() // 1
it.next() // 2
it.next() // 3
it.next() // nil
it.next() // nil
it.reduce(0, +) // 0...
it.reduce(0, +) // 0
1 Like

I don’t think that is a bug, as such. From my understanding of the semantics here, that is just undefined behaviour.

The return value of it.next() after the first nil used to be undefined. It was changed in Swift 3 so that all subsequent calls also return nil. Looking at the history and method naming, it seems that iterators were intended to be single-pass from the beginning. Others can say whether or not that is indeed the case.

1 Like

IteratorProtocol.reduce doesn’t exist.

You’re using the Sequence.reduce method of your it: IndexingIterator variable.

3 Likes

I object to the name Stream, in any context. It’s such a vague word that naming something a Stream will inevitably cause a lot of conflicts with existing code. FRP has streams, Foundation has streams, …

3 Likes

I don’t see the benefit over IteratorProtocol. As @Karl said, iterators are already single pass and returning an optional seems like a better choice than throwing for an expected non-error condition: which is exactly what reaching the end of a stream is.

Please provide alternative names. Foundation’s current name can be easily solved with a depreciation and rename. I do not think that third party libraries’ naming choice of types should deter inclusion in the standard library. The standard library needs to reserve the right to introduce types freely without having to worry about third party collisions. This would be like saying that the standard library can never introduce Either because of a popular third party library which uses the same name.

On a related note, I’m very happy with the Cursor protocol embedded in GRDB: https://github.com/groue/GRDB.swift/blob/GRDB3/GRDB/Core/Cursor.swift

Cursor is a protocol designed to handle SQLite lazy results, but can be used independently. It is not based on Iterator and Sequence, because those can’t handle errors (and a sequence of Result, which has a different meaning, was not a good fit either). Cursor is a reference type, because I did not want to fight with value types semantics, when cursors were designed to handle external resources. But community suggestions may help sorting this out.

Cursor may share some interesting traits with the topic discussed in this thread. It comes with ready-made support for map, filter, etc.

Oh, I didn’t mean the nil thing, I meant the question of whether copying an iterator (presumably what is happening inside the reduce method) and advancing it invalidates the other copies of the iterator.