Now that we have several implementations of the Reactive Streams specification (Combine, OpenCombine, and RxSwift), I believe we should consider adding standard, generic Rx and Future protocols to the standard library.
Motivation:
The Swift standard library has no standard protocol or type for reactive streams or futures, which makes it impossible to create an API that is independent of a particular implementation or library. For example, Future from Combine is not compatible with EventLoopFuture from SwiftNIO, which is expected because they are different types with different implementations. The problem arises when we want to create an API that abstracts over the Future implementation and can interoperate between implementations.
// Combine implementation
import Combine
public protocol MyLib {
    associatedtype Output
    // Combine's Future requires its failure type to conform to Error.
    associatedtype Failure: Error
    func execute() -> Future<Output, Failure>
}
// NIO implementation
import NIO
public protocol MyLib {
    associatedtype Output
    // EventLoopFuture has a single generic parameter; failures are reported as a plain Error.
    func execute() -> EventLoopFuture<Output>
}
This leaks my internal implementation, and it doesn't allow me to write something like
public protocol MyLib {
    associatedtype Output
    associatedtype Failure
    func execute() -> GenericFuture<Output, Failure>
}
and swap implementations behind the scenes:
public class MyLibCombine: MyLib {
    public func execute() -> GenericFuture<String, Error> {
        return Future { promise in
            promise(.success("Hello World!"))
        }
    }
}
// or implement it based on NIO
public class MyLibNIO: MyLib {
    public func execute() -> GenericFuture<String, Error> {
        return MultiThreadedEventLoopGroup.currentEventLoop!.makeSucceededFuture("Hello World!")
    }
}
Proposed Solution:
Introduce the protocols Publisher, Subscriber, Subscription, Processor, and Future in the standard library, to which implementations can conform. This is similar to how JDK 9 added the corresponding interfaces as nested types of the java.util.concurrent.Flow class. I propose the following protocols.
// Rx Protocols
public protocol Publisher {
    associatedtype Output
    associatedtype Failure: Error
    func subscribe<Sub: Subscriber>(sub: Sub) where Failure == Sub.Failure, Output == Sub.Input
}
public protocol Subscriber {
    associatedtype Input
    associatedtype Failure: Error
    func onSubscribe(_ sub: Subscription)
    func onNext(_ next: Input)
    func onError(_ failure: Failure)
    func onComplete()
}
public protocol Subscription {
    func request(n: Int)
    func cancel()
}
public protocol Processor: Subscriber, Publisher {
    associatedtype Input
    associatedtype Output
}
// Future Protocol
public protocol Future {
    associatedtype Output
    associatedtype Failure: Error
    func cancel(mayInterruptIfRunning: Bool) -> Bool
    func isCancelled() -> Bool
    func isDone() -> Bool
    func get() -> Result<Output, Failure>
    func get(wait timeout: Int) -> Result<Output, Failure>
}
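To illustrate how the Future protocol above could address the motivation, here is a minimal sketch. It repeats the proposed protocol so the snippet is self-contained. ResolvedFuture, MyLibResolved, the Response associated type, and greeting(from:) are hypothetical names invented for this example; a real Combine or SwiftNIO conformance would look different and is not shown.

```swift
// The proposed protocol, copied here so the sketch compiles on its own.
public protocol Future {
    associatedtype Output
    associatedtype Failure: Error
    func cancel(mayInterruptIfRunning: Bool) -> Bool
    func isCancelled() -> Bool
    func isDone() -> Bool
    func get() -> Result<Output, Failure>
    func get(wait timeout: Int) -> Result<Output, Failure>
}

// Hypothetical conforming type: a future that is already resolved.
public struct ResolvedFuture<Output, Failure: Error>: Future {
    private let result: Result<Output, Failure>
    public init(_ result: Result<Output, Failure>) { self.result = result }

    // Already finished, so there is nothing left to cancel.
    public func cancel(mayInterruptIfRunning: Bool) -> Bool { false }
    public func isCancelled() -> Bool { false }
    public func isDone() -> Bool { true }
    public func get() -> Result<Output, Failure> { result }
    public func get(wait timeout: Int) -> Result<Output, Failure> { result }
}

// The library API names only the protocol, not a concrete future type.
public protocol MyLib {
    associatedtype Response: Future
    func execute() -> Response
}

// One possible backend; another could return a different Future-conforming type.
public struct MyLibResolved: MyLib {
    public init() {}
    public func execute() -> ResolvedFuture<String, Never> {
        ResolvedFuture(.success("Hello World!"))
    }
}

// Client code written purely against the protocols.
func greeting<L: MyLib>(from lib: L) -> String? where L.Response.Output == String {
    try? lib.execute().get().get()
}

let message = greeting(from: MyLibResolved())
```

Because greeting(from:) depends only on MyLib and Future, swapping MyLibResolved for a Combine-backed or NIO-backed implementation would not change the calling code, assuming those libraries adopted the protocol.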
These protocols are not set in stone. I attempted a 1:1 mapping of the Reactive Streams specification and Java's Future interface into Swift to get things started.
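As a starting point for discussion, the following sketch shows what a conformance to the proposed Rx protocols might look like, including demand-based backpressure through Subscription.request(n:). The protocols are repeated so the snippet stands alone. ArrayPublisher, ArraySubscription, Collector, and requestMore(_:) are hypothetical names for this example only. The sketch is single-threaded and simplified: for instance, an empty publisher completes only after the first request, which the full Reactive Streams rules would not require.

```swift
public protocol Subscription {
    func request(n: Int)
    func cancel()
}
public protocol Subscriber {
    associatedtype Input
    associatedtype Failure: Error
    func onSubscribe(_ sub: Subscription)
    func onNext(_ next: Input)
    func onError(_ failure: Failure)
    func onComplete()
}
public protocol Publisher {
    associatedtype Output
    associatedtype Failure: Error
    func subscribe<Sub: Subscriber>(sub: Sub) where Failure == Sub.Failure, Output == Sub.Input
}

// Hypothetical subscription that hands out array elements only when requested.
final class ArraySubscription<Sub: Subscriber>: Subscription {
    private var remaining: ArraySlice<Sub.Input>
    private var subscriber: Sub?

    init(elements: [Sub.Input], subscriber: Sub) {
        self.remaining = elements[...]
        self.subscriber = subscriber
    }

    func request(n: Int) {
        var demand = n
        // Deliver at most n elements, stopping early if cancelled or exhausted.
        while demand > 0, let sub = subscriber, let next = remaining.popFirst() {
            sub.onNext(next)
            demand -= 1
        }
        // Signal completion once, then drop the subscriber reference.
        if remaining.isEmpty, let sub = subscriber {
            subscriber = nil
            sub.onComplete()
        }
    }

    func cancel() { subscriber = nil }
}

// Hypothetical publisher that never fails and emits a fixed array.
struct ArrayPublisher<Element>: Publisher {
    typealias Output = Element
    typealias Failure = Never
    let elements: [Element]

    func subscribe<Sub: Subscriber>(sub: Sub) where Failure == Sub.Failure, Output == Sub.Input {
        sub.onSubscribe(ArraySubscription(elements: elements, subscriber: sub))
    }
}

// Hypothetical subscriber that records what it receives.
final class Collector<Input>: Subscriber {
    typealias Failure = Never
    private(set) var received: [Input] = []
    private(set) var completed = false
    private var subscription: Subscription?

    func onSubscribe(_ sub: Subscription) {
        subscription = sub
        sub.request(n: 2) // initial demand of two elements
    }
    func onNext(_ next: Input) { received.append(next) }
    func onError(_ failure: Never) {}
    func onComplete() { completed = true }
    func requestMore(_ n: Int) { subscription?.request(n: n) }
}

let collector = Collector<Int>()
ArrayPublisher(elements: [1, 2, 3]).subscribe(sub: collector)
let afterInitialDemand = collector.received // only the first two elements so far
collector.requestMore(5)                    // delivers the rest, then completes
```

The subscriber controls the pace here: nothing is delivered until it calls request(n:), which is the backpressure behavior the Subscription protocol is meant to express.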