Here's a custom Publisher approach.
It's a copy-paste-modify job based on the TryPrefixWhile implementation in CombineX (GitHub: cx-org/CombineX, an open-source implementation of Apple's Combine).
Lots more code, but much of it is dependencies... the actual publisher logic isn't TOO complex. Performance is about 5x that of the composed flatMap solution. Depending on the use case it might not matter... for 1000 elements the timings were 0.001 vs. 0.005.
The flatMap solution is probably best for almost everyone... but it was fun to make a publisher!
import Combine
extension Publisher {
    /// Wraps this publisher in a `Publishers.CutAfter` driven by `predicate`.
    ///
    /// - Parameter predicate: A non-throwing closure evaluated against upstream elements.
    /// - Returns: A `Publishers.CutAfter` whose upstream is `self`.
    public func cut(after predicate: @escaping (Output) -> Bool) -> Publishers.CutAfter<Self> {
        return Publishers.CutAfter(upstream: self, predicate: predicate)
    }
}
extension Publishers {
    /// A publisher that forwards its work to `tryCut(after:)` and restores the
    /// upstream's concrete failure type afterwards.
    public struct CutAfter<Upstream: Publisher>: Publisher {
        public typealias Output = Upstream.Output
        public typealias Failure = Upstream.Failure

        /// The publisher whose elements are inspected.
        public let upstream: Upstream

        /// The non-throwing test applied to upstream elements.
        public let predicate: (Upstream.Output) -> Bool

        public init(upstream: Upstream, predicate: @escaping (Publishers.CutAfter<Upstream>.Output) -> Bool) {
            self.upstream = upstream
            self.predicate = predicate
        }

        public func receive<S: Subscriber>(subscriber: S) where Upstream.Failure == S.Failure, Upstream.Output == S.Input {
            // Reuse the throwing variant; its failure type is the existential `Error`.
            let erased = upstream.tryCut(after: predicate)

            // `predicate` cannot throw here, so the only errors flowing through are
            // upstream failures that were widened to `Error`; the forced cast narrows
            // them back to `Failure`.
            let typed = erased.mapError { anyError in
                anyError as! Failure
            }

            typed.receive(subscriber: subscriber)
        }
    }
}
extension Publisher {
    /// Wraps this publisher in a `Publishers.TryCutAfter` driven by `predicate`.
    ///
    /// - Parameter predicate: A throwing closure evaluated against upstream elements.
    /// - Returns: A `Publishers.TryCutAfter` whose upstream is `self`.
    public func tryCut(after predicate: @escaping (Output) throws -> Bool) -> Publishers.TryCutAfter<Self> {
        return Publishers.TryCutAfter(upstream: self, predicate: predicate)
    }
}
extension Publishers {
    /// A publisher that hands each subscriber an `Inner` relay sitting between
    /// the upstream and that subscriber. Its failure type is `Error` because the
    /// predicate is allowed to throw.
    public struct TryCutAfter<Upstream: Publisher>: Publisher {
        public typealias Output = Upstream.Output
        public typealias Failure = Error

        /// The publisher whose elements are inspected.
        public let upstream: Upstream

        /// The throwing test applied to upstream elements.
        public let predicate: (Upstream.Output) throws -> Bool

        public init(upstream: Upstream, predicate: @escaping (Publishers.TryCutAfter<Upstream>.Output) throws -> Bool) {
            self.upstream = upstream
            self.predicate = predicate
        }

        public func receive<S: Subscriber>(subscriber: S) where Upstream.Output == S.Input, S.Failure == Publishers.TryCutAfter<Upstream>.Failure {
            // The relay subscribes to the upstream on behalf of `subscriber`.
            upstream.subscribe(Inner(pub: self, sub: subscriber))
        }
    }
}
extension Publishers.TryCutAfter {
/// Relay placed between the upstream publisher and the downstream subscriber.
///
/// It subscribes to the upstream as a `Subscriber` and hands itself to the
/// downstream as the `Subscription`. The mutable `state` is only read or
/// written while `lock` is held; calls into the upstream subscription and the
/// downstream subscriber are made after the lock has been released.
private final class Inner<S>:
Subscription,
Subscriber,
CustomStringConvertible,
CustomDebugStringConvertible
where
S: Subscriber,
S.Input == Output,
S.Failure == Failure {
typealias Input = Upstream.Output
typealias Failure = Upstream.Failure
typealias Pub = Publishers.TryCutAfter<Upstream>
typealias Sub = S
typealias Predicate = (Upstream.Output) throws -> Bool
// Guards `state`.
let lock = Lock()
let predicate: Predicate
// The downstream subscriber.
let sub: Sub
var state = RelayState.waiting
init(pub: Pub, sub: Sub) {
predicate = pub.predicate
self.sub = sub
}
deinit {
// `Lock` owns manually allocated storage that must be released explicitly.
lock.cleanupLock()
}
/// Forwards downstream demand to the upstream subscription, if one is
/// currently being relayed; otherwise the demand is dropped.
func request(_ demand: Subscribers.Demand) {
lock.withLockGet(state.subscription)?.request(demand)
}
/// Moves to the completed state and cancels the upstream subscription
/// that was being relayed, if any.
func cancel() {
lock.withLockGet(state.complete())?.cancel()
}
/// Accepts the upstream subscription only while still waiting; any other
/// subscription is cancelled immediately.
func receive(subscription: Subscription) {
guard lock.withLockGet(state.relay(subscription)) else {
subscription.cancel()
return
}
sub.receive(subscription: self)
}
/// Tests `input` with the predicate and either relays it, or relays it and
/// then finishes, or fails the downstream if the predicate throws.
func receive(_ input: Input) -> Subscribers.Demand {
lock.lock()
// Values arriving when not relaying (waiting or already completed) are ignored.
guard state.isRelaying else {
lock.unlock()
return .none
}
do {
// NOTE: the predicate runs while the lock is still held.
if try predicate(input) {
// Matching element: mark completed first so later events are ignored,
// then release the lock before touching upstream/downstream.
let subscription = state.complete()
lock.unlock()
subscription?.cancel()
// The matching element itself is still delivered; its returned demand
// is discarded because the stream ends right after.
_ = sub.receive(input)
sub.receive(completion: .finished)
return .none
} else {
lock.unlock()
return sub.receive(input)
}
} catch {
// The predicate threw: stop the upstream and fail the downstream with that error.
let subscription = state.complete()
lock.unlock()
subscription?.cancel()
sub.receive(completion: .failure(error))
return .none
}
}
/// Receives the upstream's terminal event and widens its failure type to
/// `Error` before passing it on.
func receive(completion: Subscribers.Completion<Failure>) {
complete(completion.mapError { $0 })
}
/// Forwards a terminal event downstream, but only if a subscription was
/// being relayed at that moment; otherwise the event is dropped (the state
/// still becomes completed).
private func complete(_ completion: Subscribers.Completion<Error>) {
guard let subscription = lock.withLockGet(state.complete()) else {
return
}
subscription.cancel()
sub.receive(completion: completion.mapError { $0 })
}
var description: String {
"TryCutAfter"
}
var debugDescription: String {
"TryCutAfter"
}
}
}
/// Lifecycle of a relay that sits between an upstream subscription and a
/// downstream subscriber.
enum RelayState {
    /// No upstream subscription has been stored yet.
    case waiting
    /// An upstream subscription is stored and events are being passed along.
    case relaying(Subscription)
    /// The relay is finished; no subscription is stored.
    case completed
}

extension RelayState {
    /// `true` only for `.waiting`.
    var isWaiting: Bool {
        if case .waiting = self { return true }
        return false
    }

    /// `true` only for `.relaying`.
    var isRelaying: Bool {
        if case .relaying = self { return true }
        return false
    }

    /// `true` only for `.completed`.
    var isCompleted: Bool {
        if case .completed = self { return true }
        return false
    }

    /// The stored subscription when relaying, otherwise `nil`.
    var subscription: Subscription? {
        guard case .relaying(let stored) = self else { return nil }
        return stored
    }
}
extension RelayState {
    /// Traps if a value shows up while the state is still `.waiting`.
    ///
    /// - Parameters:
    ///   - file: Source file reported in the crash message.
    ///   - line: Source line reported in the crash message.
    func preconditionValue(file: StaticString = #file, line: UInt = #line) {
        guard !isWaiting else {
            fatalError("Received value before receiving subscription", file: file, line: line)
        }
    }

    /// Traps if a completion shows up while the state is still `.waiting`.
    ///
    /// - Parameters:
    ///   - file: Source file reported in the crash message.
    ///   - line: Source line reported in the crash message.
    func preconditionCompletion(file: StaticString = #file, line: UInt = #line) {
        guard !isWaiting else {
            fatalError("Received completion before receiving subscription", file: file, line: line)
        }
    }
}
extension RelayState {
    /// Stores `subscription` and switches to `.relaying`, but only when the
    /// current state is `.waiting`.
    ///
    /// - Returns: `true` if the transition happened, `false` if the state was left untouched.
    mutating func relay(_ subscription: Subscription) -> Bool {
        if case .waiting = self {
            self = .relaying(subscription)
            return true
        }
        return false
    }

    /// Switches to `.completed` unconditionally.
    ///
    /// - Returns: The subscription that was stored before the switch, or `nil`
    ///   if the state was not `.relaying`.
    mutating func complete() -> Subscription? {
        let previous = subscription
        self = .completed
        return previous
    }
}
extension Subscribers.Completion {
    /// Converts the failure carried by this completion with `transform`;
    /// `.finished` passes through unchanged.
    ///
    /// - Parameter transform: Maps the current failure to the new failure type.
    /// - Returns: A completion with the same outcome expressed in `NewFailure`.
    func mapError<NewFailure: Error>(_ transform: (Failure) -> NewFailure) -> Subscribers.Completion<NewFailure> {
        guard case let .failure(underlying) = self else {
            return .finished
        }
        return .failure(transform(underlying))
    }
}
/// Minimal interface for a mutual-exclusion primitive.
public protocol Locking {
    /// Acquires the lock.
    func lock()
    /// Attempts to acquire the lock and reports whether that succeeded.
    func tryLock() -> Bool
    /// Releases the lock.
    func unlock()
}

extension Locking {
    /// Runs `body` between `lock()` and `unlock()`; the lock is released even
    /// when `body` throws.
    ///
    /// - Parameter body: Work to perform while the lock is held.
    /// - Returns: Whatever `body` returns.
    public func withLock<T>(_ body: () throws -> T) rethrows -> T {
        lock()
        defer { unlock() }
        return try body()
    }

    /// Autoclosure flavour of `withLock(_:)`: evaluates the expression `body`
    /// between `lock()` and `unlock()`.
    ///
    /// - Parameter body: Expression to evaluate while the lock is held.
    /// - Returns: The value of the expression.
    public func withLockGet<T>(_ body: @autoclosure () throws -> T) rethrows -> T {
        return try withLock { try body() }
    }
}
// MARK: - Lock
/// Non-recursive lock stored behind a raw pointer.
///
/// On Darwin, when the listed OS versions are available, the storage is an
/// `OSUnfairLock`; in every other case it is a non-recursive `PThreadMutex`.
/// Each method repeats the same check used by `init()` so the pointer is always
/// reinterpreted as the type it was created with. The storage is manually
/// allocated, so `cleanupLock()` has to be called to release it.
public struct Lock: Locking {
    private let _lock: UnsafeMutableRawPointer

    /// Allocates the underlying lock.
    public init() {
        #if canImport(Darwin)
        if #available(macOS 10.12, iOS 10.0, tvOS 10.0, watchOS 3.0, *) {
            _lock = OSUnfairLock().raw
        } else {
            _lock = PThreadMutex(recursive: false).raw
        }
        #else
        _lock = PThreadMutex(recursive: false).raw
        #endif
    }

    /// Tears down and frees the underlying lock.
    public func cleanupLock() {
        #if canImport(Darwin)
        if #available(macOS 10.12, iOS 10.0, tvOS 10.0, watchOS 3.0, *) {
            _lock.as(OSUnfairLock.self).cleanupLock()
        } else {
            _lock.as(PThreadMutex.self).cleanupLock()
        }
        #else
        _lock.as(PThreadMutex.self).cleanupLock()
        #endif
    }

    /// Acquires the lock.
    public func lock() {
        #if canImport(Darwin)
        if #available(macOS 10.12, iOS 10.0, tvOS 10.0, watchOS 3.0, *) {
            _lock.as(OSUnfairLock.self).lock()
        } else {
            _lock.as(PThreadMutex.self).lock()
        }
        #else
        _lock.as(PThreadMutex.self).lock()
        #endif
    }

    /// Attempts to acquire the lock; returns whether it was acquired.
    public func tryLock() -> Bool {
        #if canImport(Darwin)
        if #available(macOS 10.12, iOS 10.0, tvOS 10.0, watchOS 3.0, *) {
            return _lock.as(OSUnfairLock.self).tryLock()
        } else {
            return _lock.as(PThreadMutex.self).tryLock()
        }
        #else
        return _lock.as(PThreadMutex.self).tryLock()
        #endif
    }

    /// Releases the lock.
    public func unlock() {
        #if canImport(Darwin)
        if #available(macOS 10.12, iOS 10.0, tvOS 10.0, watchOS 3.0, *) {
            _lock.as(OSUnfairLock.self).unlock()
        } else {
            _lock.as(PThreadMutex.self).unlock()
        }
        #else
        _lock.as(PThreadMutex.self).unlock()
        #endif
    }
}
// MARK: - RecursiveLock
/// Recursive lock stored behind a raw pointer.
///
/// When the `DarwinPrivate` module can be imported and the listed OS versions
/// are available, the storage is an `OSUnfairRecursiveLock`; in every other
/// case it is a recursive `PThreadMutex`. Each method repeats the same check
/// used by `init()` so the pointer is always reinterpreted as the type it was
/// created with. The storage is manually allocated, so `cleanupLock()` has to
/// be called to release it.
public struct RecursiveLock: Locking {
    private let _lock: UnsafeMutableRawPointer

    /// Allocates the underlying lock.
    public init() {
        #if canImport(DarwinPrivate)
        if #available(macOS 10.14, iOS 12.0, tvOS 12.0, watchOS 5.0, *) {
            _lock = OSUnfairRecursiveLock().raw
        } else {
            _lock = PThreadMutex(recursive: true).raw
        }
        #else
        _lock = PThreadMutex(recursive: true).raw
        #endif
    }

    /// Tears down and frees the underlying lock.
    public func cleanupLock() {
        #if canImport(DarwinPrivate)
        if #available(macOS 10.14, iOS 12.0, tvOS 12.0, watchOS 5.0, *) {
            _lock.as(OSUnfairRecursiveLock.self).cleanupLock()
        } else {
            _lock.as(PThreadMutex.self).cleanupLock()
        }
        #else
        _lock.as(PThreadMutex.self).cleanupLock()
        #endif
    }

    /// Acquires the lock.
    public func lock() {
        #if canImport(DarwinPrivate)
        if #available(macOS 10.14, iOS 12.0, tvOS 12.0, watchOS 5.0, *) {
            _lock.as(OSUnfairRecursiveLock.self).lock()
        } else {
            _lock.as(PThreadMutex.self).lock()
        }
        #else
        _lock.as(PThreadMutex.self).lock()
        #endif
    }

    /// Attempts to acquire the lock; returns whether it was acquired.
    public func tryLock() -> Bool {
        #if canImport(DarwinPrivate)
        if #available(macOS 10.14, iOS 12.0, tvOS 12.0, watchOS 5.0, *) {
            return _lock.as(OSUnfairRecursiveLock.self).tryLock()
        } else {
            return _lock.as(PThreadMutex.self).tryLock()
        }
        #else
        return _lock.as(PThreadMutex.self).tryLock()
        #endif
    }

    /// Releases the lock.
    public func unlock() {
        #if canImport(DarwinPrivate)
        if #available(macOS 10.14, iOS 12.0, tvOS 12.0, watchOS 5.0, *) {
            _lock.as(OSUnfairRecursiveLock.self).unlock()
        } else {
            _lock.as(PThreadMutex.self).unlock()
        }
        #else
        _lock.as(PThreadMutex.self).unlock()
        #endif
    }
}
#if canImport(Darwin)
// MARK: - OSUnfairLock
/// Heap-allocated `os_unfair_lock_s`, addressed through a typed pointer.
private typealias OSUnfairLock = UnsafeMutablePointer<os_unfair_lock_s>

@available(macOS 10.12, iOS 10.0, tvOS 10.0, watchOS 3.0, *)
private extension UnsafeMutablePointer where Pointee == os_unfair_lock_s {
    /// Allocates room for one lock and initializes it to a fresh `os_unfair_lock_s`.
    init() {
        let storage = UnsafeMutablePointer<os_unfair_lock_s>.allocate(capacity: 1)
        storage.initialize(to: os_unfair_lock_s())
        self = storage
    }

    /// Deinitializes and frees the storage created by `init()`.
    func cleanupLock() {
        deinitialize(count: 1)
        deallocate()
    }

    /// Acquires the lock via `os_unfair_lock_lock`.
    func lock() {
        os_unfair_lock_lock(self)
    }

    /// Returns the result of `os_unfair_lock_trylock`.
    func tryLock() -> Bool {
        return os_unfair_lock_trylock(self)
    }

    /// Releases the lock via `os_unfair_lock_unlock`.
    func unlock() {
        os_unfair_lock_unlock(self)
    }
}
// MARK: - OSUnfairRecursiveLock
// TODO: Use os_unfair_recursive_lock_s
#if canImport(DarwinPrivate)
/// Heap-allocated `os_unfair_recursive_lock_s`, addressed through a typed pointer.
private typealias OSUnfairRecursiveLock = UnsafeMutablePointer<os_unfair_recursive_lock_s>

@available(macOS 10.14, iOS 12.0, tvOS 12.0, watchOS 5.0, *)
private extension UnsafeMutablePointer where Pointee == os_unfair_recursive_lock_s {
    /// Allocates room for one lock and initializes it to a fresh `os_unfair_recursive_lock_s`.
    init() {
        let storage = UnsafeMutablePointer<os_unfair_recursive_lock_s>.allocate(capacity: 1)
        storage.initialize(to: os_unfair_recursive_lock_s())
        self = storage
    }

    /// Deinitializes and frees the storage created by `init()`.
    func cleanupLock() {
        deinitialize(count: 1)
        deallocate()
    }

    /// Acquires the lock via `os_unfair_recursive_lock_lock`.
    func lock() {
        os_unfair_recursive_lock_lock(self)
    }

    /// Returns the result of `os_unfair_recursive_lock_trylock`.
    func tryLock() -> Bool {
        return os_unfair_recursive_lock_trylock(self)
    }

    /// Releases the lock via `os_unfair_recursive_lock_unlock`.
    func unlock() {
        os_unfair_recursive_lock_unlock(self)
    }
}
#endif // canImport(DarwinPrivate)
#endif // canImport(Darwin)
// MARK: - PThreadMutex
/// Heap-allocated `pthread_mutex_t`, addressed through a typed pointer.
private typealias PThreadMutex = UnsafeMutablePointer<pthread_mutex_t>

private extension UnsafeMutablePointer where Pointee == pthread_mutex_t {
    /// Allocates room for one mutex and initializes it with `pthread_mutex_init`.
    ///
    /// - Parameter recursive: When `true`, the mutex is created with the
    ///   `PTHREAD_MUTEX_RECURSIVE` type; otherwise default attributes are used.
    init(recursive: Bool) {
        let l = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)
        if recursive {
            var attr = pthread_mutexattr_t()
            pthread_mutexattr_init(&attr).assertZero()
            // The attribute object is only needed while the mutex is being
            // created; destroy it on the way out so it is not leaked.
            defer { pthread_mutexattr_destroy(&attr).assertZero() }
            pthread_mutexattr_settype(&attr, Int32(PTHREAD_MUTEX_RECURSIVE)).assertZero()
            pthread_mutex_init(l, &attr).assertZero()
        } else {
            pthread_mutex_init(l, nil).assertZero()
        }
        self = l
    }

    /// Destroys the mutex, then deinitializes and frees its storage.
    func cleanupLock() {
        pthread_mutex_destroy(self).assertZero()
        deinitialize(count: 1)
        deallocate()
    }

    /// Acquires the mutex via `pthread_mutex_lock`.
    func lock() {
        pthread_mutex_lock(self).assertZero()
    }

    /// Returns `true` when `pthread_mutex_trylock` reports success (a zero result).
    func tryLock() -> Bool {
        pthread_mutex_trylock(self) == 0
    }

    /// Releases the mutex via `pthread_mutex_unlock`.
    func unlock() {
        pthread_mutex_unlock(self).assertZero()
    }
}
// MARK: Helpers
private extension UnsafeMutablePointer {
    /// This pointer with its element type erased.
    @inline(__always)
    var raw: UnsafeMutableRawPointer {
        return .init(self)
    }
}
private extension UnsafeMutableRawPointer {
    /// Reinterprets this raw pointer as a typed pointer to `T` using
    /// `assumingMemoryBound(to:)`. The metatype argument only selects `T`.
    @inline(__always)
    func `as`<T>(_: UnsafeMutablePointer<T>.Type) -> UnsafeMutablePointer<T> {
        return self.assumingMemoryBound(to: T.self)
    }
}
private extension Int32 {
    /// Checks, via `assert`, that this value is zero.
    @inline(__always)
    func assertZero() {
        // Open question carried over from the original: assert or precondition?
        assert(self == .zero)
    }
}