Add Protocols for ReactiveStreams and Future in standard library

Since now we have a few implementations of Reactive Streams specs Combine, OpenCombine, and RxSwift. I believe we should consider adding standard/generic Rx and Future protocols to stdlib.

Motivation:

Unavailability of standard protocol/type in swift Stdlib for RxStreams and Future. It makes it impossible to create an API that is implementation/library independent. Example Future from Combine is not compatible with EventLoopFuture from SwiftNIO which is expected because they are different types and implementations.
The problem arises when we want to create an API that abstracts Future implementation and can interoperate between Implementations.


// Combine implementation
import Combine

public protocol MyLib {
    associatedtype Output
    associatedtype Failure
    
    func execute() -> Future<Output, Failure>
}

// NIO implementation
import NIO

public protocol MyLib {
    associatedtype Output
    associatedtype Failure
    
    func execute() -> EventLoopFuture<Output, Failure>
}

This leaks my internal implementation. And doesn't allow me to create something like

public protocol MyLib {
    associatedtype Output
    associatedtype Failure
    
    func execute() -> GenericFuture<Output, Failure>
}

and behind the seen swap implementation between.

public class MyLibCombine: MyLib {
    public func execute() -> GenericFuture<String, Error> {
        return Future { promise in
            return promise(.success("Hello World!"))
        }
    }   
}
// or implement it based of NIO
public class MyLibNIO: MyLib {
    public func execute() -> GenericFuture<String, Error>  {
        return MultiThreadedEventLoopGroup.currentEventLoop!.makeSucceededFuture("Hello World!")
    }
}

Proposed Solution:

Introduce protocols Publisher, Subscriber, Subscription, Processor, and Future in the standard lib which implementations can conform. Similar to how JDK9 added interfaces as part of java.util.concurrent.Flow package. I propose the following protocols.

// Rx Protocols
public protocol Publisher {
    associatedtype Output
    associatedtype Failure: Error
    
    func subscribe<Sub: Subscriber>(sub: Sub) -> Void where Failure == Sub.Failure, Output == Sub.Input
    
}

public protocol Subscriber {
    associatedtype Input
    associatedtype Failure: Error
    
    func onSubscribe(_ sub: Subscription) -> Void
    func onNext(_ next: Input) -> Void
    func onError(_ failure: Failure) -> Void
    func onComplete() -> Void
}

public protocol Subscription {
    func request(n: Int) -> Void
    func cancel() -> Void
}

public protocol Processor: Subscriber, Publisher {
    associatedtype Intput
    associatedtype Output
}

// Future Protocol

public protocol Future {
    associatedtype Output
    associatedtype Failure: Error
    
    func cancel(mayInterruptIfRunning: Bool) -> Bool   
    func isCancelled() -> Bool   
    func isDone() -> Bool  
    func get() -> Result<Output, Failure>  
    func get(wait timeout: Int) -> Result<Output, Failure>
    
}

These protocols are not set in stone... I attempted to create 1:1 mapping for RxStreams Specs and Java's Future interface, in swift to get things started.

8 Likes

This is the same problem that Promises/A+ solves in Javascript. Although not type checked, and not in the standard library, A+ is a set of informal interface declarations, that any Promise library is expected to follow. This means you can work with both ES6 Promise, Blackbird or any other third party implementation and use pass one as input to another.

Unfortunately it's not currently possible (I think?) to declare eg. map like this. We can't return -> Self<Input, Output> and also not return -> some Future where ...