A UIControl.Event publisher example

Hello! I'm in the process of learning Combine and wanted to try my hand at creating a Publisher for UIControl.Events. I've come up with something, but I'm not very happy with it and I'm looking to get some advice on a hopefully more idiomatic approach.

extension Publishers {
    private static var key: UInt8 = 0

    public struct ControlForEvent<Control> : Publisher where Control: UIControl {

        public typealias Output = Control

        public typealias Failure = Never

        public let control: Output

        public let event: UIControl.Event

        public func receive<S>(subscriber: S)
            where S : Subscriber,
            Output == S.Input,
            Failure == S.Failure
        {
            let subject = PassthroughSubject<Output, Never>()
            let passthrough = ControlEventPassthroughSubject(
                subject: subject,
                control: control,
                event: event
            )
            objc_setAssociatedObject(
                subject,
                &Publishers.key,
                passthrough,
                .OBJC_ASSOCIATION_RETAIN
            )
            subject.receive(subscriber: subscriber)
        }

        private class ControlEventPassthroughSubject {

            unowned var subject: PassthroughSubject<Output, Failure>

            init(subject: PassthroughSubject<Output, Never>, control: Output, event: UIControl.Event) {
                self.subject = subject
                control.addTarget(self, action: #selector(receive(_:)), for: event)
            }

            @objc
            func receive(_ sender: UIControl) {
                subject.send(sender as! Output)
            }
        }
    }
}

I've started off by creating my ControlForEvent publisher inside the Publishers namespace. I've made it a struct, as that seems to be the approach a number of similar publishers take (like the DataTaskPublisher or the ValueForKey publisher).

I've then created a ControlEventPassthroughSubject class that listens for the given event and forwards each occurrence to the PassthroughSubject it receives.

So far so good (maybe), but things feel like they go downhill fast in the receive(subscriber:) function, specifically the objc_setAssociatedObject bit. The subscriber appears to retain its subject, which is great, but of course I need to retain my ControlEventPassthroughSubject too. I also need to avoid a retain cycle between the two, so the ControlEventPassthroughSubject has an unowned reference to its subject. This all feels a bit wonky, and I assume there's a better way, but I've not come up with anything else that doesn't involve re-implementing something like a PassthroughSubject that can also receive UIControl.Event callbacks.

Separately, I find this a bit odd:

// elsewhere...
self.cancelable = Publishers.ControlForEvent(control: self.button, event: .touchUpInside).sink {
    print("tapped button:", $0)
}

The Cancellable I get here does not cancel when it is released. I need to explicitly call cancel() on it. This means I either need to implement a deinit method on any class that holds on to Cancellables so it can cancel() them, or I need to implement something like RxSwift's DisposeBag that can cancel() everything in its own deinit. It seems a bit clunky as is, but as with the first question above I hope maybe I'm missing something!

Thanks for taking a look!

A few things:

After seeing how Foundation is exposing its various Publishers, it seems clear that Apple's pattern is for types offering specific Publishers to provide APIs vending them, instead of extending the Publishers namespace. So for something like this I think extending UIControl would be a better approach. This could also let you start simpler, where your publisher must be vended from a UIControl instance, rather than from a stream where you have to care about your Upstream type. Perhaps something like:

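extension UIControl {
    // Vend the publisher from the control itself instead of extending Publishers.
    func eventPublisher(for events: UIControl.Event) -> EventPublisher {
        EventPublisher(control: self, events: events)
    }
    struct EventPublisher {
        let control: UIControl
        let events: UIControl.Event
        // Publisher conformance left out of this sketch.
    }
}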

Internally, EventPublisher uses an inner class to attach to the proper events. I don't think you need to worry about a reference cycle here: as far as I know, addTarget(_:action:for:) doesn't retain the target, it only records the target and the selector. I don't believe the associated object is necessary at all either, since you can keep a reference to the passed control, and the control itself never needs to know that it's being published.
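To make the inner-class idea concrete, here is a minimal sketch of one way to do it with a custom Subscription instead of a subject. Everything named here (EventPublisher, EventSubscription, handleEvent) is hypothetical and not from the posts above. It assumes the subscriber keeps the subscription alive, which is what sink and assign do, and it makes no attempt at thread safety.

```swift
import Combine
import UIKit

extension UIControl {
    // Hypothetical publisher type: a plain value that only describes what to observe.
    struct EventPublisher: Publisher {
        typealias Output = UIControl
        typealias Failure = Never

        let control: UIControl
        let events: UIControl.Event

        func receive<S: Subscriber>(subscriber: S)
            where S.Input == UIControl, S.Failure == Never
        {
            // The subscription is the stateful reference type; the subscriber retains it.
            let subscription = EventSubscription(subscriber: subscriber,
                                                 control: control,
                                                 events: events)
            subscriber.receive(subscription: subscription)
        }
    }

    // Hypothetical subscription: owns the target-action hookup and the pending demand.
    private final class EventSubscription<S: Subscriber>: Subscription
        where S.Input == UIControl, S.Failure == Never
    {
        private var subscriber: S?
        private weak var control: UIControl?
        private let events: UIControl.Event
        private var demand = Subscribers.Demand.none

        init(subscriber: S, control: UIControl, events: UIControl.Event) {
            self.subscriber = subscriber
            self.control = control
            self.events = events
            // The control does not retain its target, so no cycle is created here.
            control.addTarget(self, action: #selector(handleEvent(_:)), for: events)
        }

        func request(_ demand: Subscribers.Demand) {
            self.demand += demand
        }

        func cancel() {
            control?.removeTarget(self, action: #selector(handleEvent(_:)), for: events)
            subscriber = nil // drop the downstream reference so teardown can proceed
        }

        @objc private func handleEvent(_ sender: UIControl) {
            // Events that arrive without outstanding demand are dropped.
            guard let subscriber = subscriber, demand > 0 else { return }
            demand -= 1
            demand += subscriber.receive(sender)
        }
    }

    func eventPublisher(for events: UIControl.Event) -> EventPublisher {
        EventPublisher(control: self, events: events)
    }
}
```

Dropping events when there is no demand is a design choice of this sketch; buffering them would be another option.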

As for cancellation, I'm still not sure how that works. I would think it would be as simple as removing the action connection upon cancellation, but I can't seem to get that to work in my own publishers. Plus, even TimerPublisher doesn't respond to cancellation like I would think (I can't get it to stop publishing). From what I've understood, you should be able to use cancellation tokens to manage the lifetime of a subscription, but I have no idea how that really works.

@Tony_Parker Are there any docs for building publishers yet? Will there be?

Ah, there is a bit of documentation around cancellation in the general Combine doc:

A publisher continues to emit elements until it completes normally or fails. If you no longer want to subscribe to the publisher, you can cancel the subscription. The subscriber types created by sink(receiveCompletion:receiveValue:) and assign(to:on:) both implement the Cancellable interface, which provides a cancel() method:

sub?.cancel()

If you create a custom Subscriber , the publisher sends a Subscription object when you first subscribe to it. Store this subscription, and then call its cancel() method when you want to cancel publishing. When you create a custom subscriber, you should implement the Cancellable interface, and have your cancel() implementation forward the call to the stored subscription.
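As an illustration of that paragraph, a custom subscriber that stores its subscription and forwards cancel() might look like the sketch below. RecordingSubscriber and its values property are made-up names for this example, not part of Combine.

```swift
import Combine

// Hypothetical subscriber: stores the subscription it is handed and forwards cancel() to it.
final class RecordingSubscriber<Input>: Subscriber, Cancellable {
    typealias Failure = Never

    private var subscription: Subscription?
    private(set) var values: [Input] = []

    func receive(subscription: Subscription) {
        self.subscription = subscription   // keep the upstream alive
        subscription.request(.unlimited)
    }

    func receive(_ input: Input) -> Subscribers.Demand {
        values.append(input)
        return .none                       // demand is already unlimited
    }

    func receive(completion: Subscribers.Completion<Never>) {
        subscription = nil
    }

    func cancel() {
        subscription?.cancel()             // forward to the stored subscription
        subscription = nil
    }
}

let subject = PassthroughSubject<Int, Never>()
let subscriber = RecordingSubscriber<Int>()
subject.subscribe(subscriber)
subject.send(1)
subscriber.cancel()
subject.send(2)                            // sent after cancel(), so not recorded
```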

However, even after updating my implementation to do this, I'm still not seeing cancel() called on my internal subscribers.

I remember from this WWDC session (16:40) that cancellation is best-effort. So my best guess is that it might just take a long time.

I would expect "best effort" to describe the cleanup of resources (i.e. if a URLSessionDataTask has already completed when cancel() is called, it will not be cancelled), not whether cancel() gets delivered up the subscriber chain at all.

Do you know how Combine is supposed to make a Cancellable call cancel() on deinit, as is claimed?

AnyCancellable provides that behaviour.

Edit: or at least used to. I’m pretty sure the beta 1 documentation said that it did, but it’s not mentioned anymore.

The reference cycle I'm worried about is between my PassthroughSubject and its coordinating ControlEventPassthroughSubject, not the addTarget stuff. The PassthroughSubject is what the subscriber ultimately subscribes to (and the subscriber appears to retain that and keep it alive), but the ControlEventPassthroughSubject needs to stick around too, listening for the events to be fired. I'm using associated objects here so that the PassthroughSubject can own the ControlEventPassthroughSubject and it isn't immediately deinitialized.

@Jon_Shier can you provide a bit of sample code for the TimerPublisher case not cancelling? I can take a look.

Let me try to clear up some of the Cancellable part.

Publisher is almost always a struct. When you subscribe something to it, the Publisher will create a Subscription (almost always a class) and send it to the Subscriber. This is a ref type because it's stateful - what's the pending demand, what have I sent, etc.

Since the Publisher created a reference out of thin air, something needs to retain it. Since the Subscription is sent to the Subscriber, it makes sense for the Subscriber to be responsible for it. Therefore, a Subscriber will maintain a reference to its upstream like this:

var upstream: Subscription?

Operators, being in the middle, need to also maintain a reference to their downstream, because they are sending them values:

let downstream: Downstream

(a property of the Subscription that the operator sends to its own Subscriber, of course)

It is not a weak reference for performance reasons. Sending values downstream is the "fast path" we optimize for. Having to always look up a weak reference is very expensive. So -- we have two retains: one up, one down. We need to break a cycle at some point. upstream can be set to nil upon cancel, but when to call it? Completion is one obvious point, but what about indefinite publishers like NotificationCenter? Should it be always required to call cancel or can we do it automatically?

At the termination of a stream, a Subscriber is holding on to its upstream strongly, keeping the subscriptions alive. If the stream does not finish, then just throwing away the last reference results in a leak.

To fix this, we could have weak references at every step -- but it is expensive, as I mentioned. We only need one point to break the cycle. We chose AnyCancellable as that point. It holds onto the Cancellable, but when it itself is deallocated, it calls cancel, resulting in each step along the chain throwing away any associated memory and allowing proper teardown to happen.
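A small sketch of that teardown, assuming a current SDK where sink returns an AnyCancellable and store(in:) is available. Owner and the cancelled flag are illustrative names only.

```swift
import Combine

// Hypothetical owner: its only job is to hold the AnyCancellable tokens.
final class Owner {
    private var cancellables = Set<AnyCancellable>()

    init(subject: PassthroughSubject<Int, Never>, onCancel: @escaping () -> Void) {
        subject
            .handleEvents(receiveCancel: onCancel)   // lets us observe cancellation
            .sink { print("received", $0) }
            .store(in: &cancellables)
    }
}

let subject = PassthroughSubject<Int, Never>()
var cancelled = false
var owner: Owner? = Owner(subject: subject) { cancelled = true }

subject.send(1)
owner = nil   // releasing the owner releases the set, and each AnyCancellable cancels in deinit
```

No explicit deinit is needed on Owner; letting the stored tokens go away is what triggers cancel().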

Hope this helps.


Thanks! That lines up with what I was observing. Do you have suggestions on how to better compose a subject as I've done in my example in the original post? Ultimately I'm subscribing to a PassthroughSubject but I need a coordinating object to actually update that subject. There's no simple way to retain this other object (I chose to use objc_setAssociatedObject as a work around). Or maybe there's just a totally different approach I should take?

Of course I can't reproduce it now. Cancellation seems to work just fine for TimerPublisher. Would referencing the token as a Cancellable have any effect?

I could still use some guidance on implementing cancellation in my own publishers. In my testing so far, I don't even see cancel() called at all on my publisher.

Could you post some code anyways? For the life of me, I cannot code an instance of TimerPublisher that actually sends updates to a subscriber.

Sure.

canceller = Timer.publish(every: 1, on: .main, in: .default)
    .autoconnect()
    .map { "\($0)" }
    .print()
    .subscribe(on: RunLoop.main)
    .assign(to: \UILabel.text, on: label)

The key is the autoconnect(), as TimerPublisher is a ConnectablePublisher, which wasn't talked about in the WWDC sessions. I haven't found good documentation around when you'd want to use that.
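For comparison, here is a sketch of the manual route that autoconnect() replaces: with a ConnectablePublisher you can attach subscribers first and then call connect() yourself. The variable names are illustrative.

```swift
import Combine
import Foundation

// A connectable publisher does not start emitting when it is subscribed to.
let timer = Timer.publish(every: 1, on: .main, in: .common)

let subscription = timer.sink { date in
    print("tick", date)
}

// Values only start flowing once connect() is called; it returns a Cancellable.
let connection = timer.connect()

// Later, when the timer is no longer needed:
connection.cancel()
subscription.cancel()
```

This is mostly useful when several subscribers should be attached before the first value is produced; with a single subscriber, autoconnect() is the shorter spelling.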


Yes, I was missing the autoconnect() modifier. Thank you for sharing.

This was incredibly helpful in understanding the mechanics of the stream. Thanks a ton, @Tony_Parker!

The reactive-streams.org website goes into some detail about how subscribers, publishers, and subscriptions may legally behave. I believe that is the standard that Combine is based on (maybe @Tony_Parker can confirm?). The specification page is especially useful.

I'm also playing with a Testing Framework for Combine, within which I've created a few publishers/subscribers/schedulers that seem to be holding up so far, so it might be a useful reference for others building their own (unless you see I'm making some awful error, in which case let me know!) :)

The other big thing to remember with Combine is that publishers need to obey the back pressure demands of their subscribers to be legal. That's been one of the considerations in bridging RxSwift code, which has no such requirement, to Combine.


Two colleagues and I spoke with @Tony_Parker in one of the labs at WWDC, and he told us explicitly that Combine is based on the Reactive Streams standard.

Exactly. One of the things Reactive Streams supports, which neither RxSwift nor ReactiveSwift contemplates, is back pressure. RxSwift also does not support typed errors, which both Combine and ReactiveSwift do.


Yup, that standard is very relevant. We tweaked a few things (like returning an additional demand from receive(_:), which we found to be very important for performance of filtering operations), but all of the info there about back pressure is important when you implement a publisher or subscriber.
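To show what returning additional demand looks like from the subscriber side, here is a minimal sketch. ThreeValueSubscriber is a made-up type; it asks for one value up front and then asks for one more each time until it has three.

```swift
import Combine

// Hypothetical subscriber that uses back pressure to take only three values.
final class ThreeValueSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private(set) var received: [Int] = []

    func receive(subscription: Subscription) {
        subscription.request(.max(1))            // initial demand: one value
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        // The return value is additional demand, added to whatever is outstanding.
        return received.count < 3 ? .max(1) : .none
    }

    func receive(completion: Subscribers.Completion<Never>) {}
}

let subscriber = ThreeValueSubscriber()
(1...10).publisher.subscribe(subscriber)
// The sequence publisher honors demand, so only the first three values are delivered.
```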


I believe I have solved my original question! The "trick" was stumbling across the handleEvents operator. This allows me to extend the lifetime of the object (EventObserver) that's actually listening for the UIControl.Events, and it also provides a convenient hook for beginning and cancelling the observation.

extension UIControl {

    private class EventObserver {

        let control: UIControl

        let event: UIControl.Event

        let subject: PassthroughSubject<UIControl, Never>

        init(control: UIControl, event: UIControl.Event) {
            self.control = control
            self.event = event
            self.subject = .init()
        }

        func start() {
            control.addTarget(self, action: #selector(handleEvent(from:)), for: event)
        }

        func stop() {
            control.removeTarget(self, action: nil, for: event)
        }

        @objc
        func handleEvent(from sender: UIControl) {
            subject.send(sender)
        }
    }

    struct ControlEventPublisher: Publisher {

        typealias Output = UIControl

        typealias Failure = Never

        let control: UIControl

        let event: UIControl.Event

        init(control: UIControl, event: UIControl.Event) {
            self.control = control
            self.event = event
        }

        func receive<S>(subscriber: S)
            where S : Subscriber,
            S.Failure == Failure,
            S.Input == Output {
                let observer = EventObserver(control: control, event: event)
                observer
                    .subject
                    .handleEvents(
                        receiveSubscription: { _ in observer.start() },
                        receiveCancel: observer.stop
                    )
                    .receive(subscriber: subscriber)

        }
    }

    func eventPublisher(for event: UIControl.Event) -> ControlEventPublisher {
        return ControlEventPublisher(control: self, event: event)
    }
}
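The two handleEvents hooks used above can be observed in isolation with a plain subject. This is only an illustrative sketch; the log array stands in for the start() and stop() calls.

```swift
import Combine

var log: [String] = []
let subject = PassthroughSubject<Int, Never>()

let cancellable = subject
    .handleEvents(
        receiveSubscription: { _ in log.append("start") },  // where start() would run
        receiveCancel: { log.append("stop") }                // where stop() would run
    )
    .sink { log.append("value \($0)") }

subject.send(1)
cancellable.cancel()
subject.send(2)   // sent after cancellation, so it never reaches the sink
```

Because the closures passed to handleEvents are held by the subscription chain, anything they capture (the observer, in the code above) stays alive for as long as the subscription does.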