Subscriptions

Continued from https://github.com/pointfreeco/swift-composable-architecture/issues/16

Several people have expressed the desire to add something like Elm's subscriptions to TCA. Please read the original GitHub issue for context, but the tl;dr version is "Effects depend on the whole history of Actions, while Subscriptions depend only on the latest State (useful not just in a time-travel debugger, but that's the extreme example we should be thinking of)".

https://github.com/pteasima/swift-composable-architecture/pull/1 contains an incomplete and naive prototype implementation, mostly meant to serve as a talking point. It shows that composability can work. The next step is to figure out whether this could be maintained as a separate add-on to TCA. One potential option is to have a ReducerWithSubscriptions (better name please :pray:) type that we can convert to a regular Reducer before passing it to a Store.

Another thing that hasn't been discussed much is that when subscriptions returns the same Subscription as last time, we must distinguish two cases:

  • if that subscription is still running, do nothing
  • if that subscription isn't running (presumably failed, since subscriptions usually don't finish), subscribe to the new one (restart it)

I originally thought we would need an API to inspect an effect's isRunning state, but now I have some ideas for how to do it without an Effect-inspection API. It's a little heavy-handed: we would need to augment the actions with a didFinish(effectID: AnyHashable) case (without letting this leak into the public API of ReducerWithSubscriptions). So far it only compiles in my head :sweat_smile:.
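To make the didFinish idea a bit more concrete, here is a minimal sketch of the bookkeeping it implies. It is not taken from the PR: SubscriptionTracker, Changes, and reconcile(desired:) are hypothetical names, and the sketch only tracks keys, it does not start or cancel real effects.

```swift
/// Hypothetical bookkeeping for Elm-style subscriptions keyed by AnyHashable.
/// It only decides which keys to start or cancel; running the effects is left out.
struct SubscriptionTracker {
  /// What the store would need to do after a state change.
  struct Changes: Equatable {
    var start: Set<AnyHashable>
    var cancel: Set<AnyHashable>
  }

  /// Keys whose effects are believed to be running.
  private(set) var running: Set<AnyHashable> = []

  /// Compare the keys the latest state asks for with the ones still running.
  mutating func reconcile(desired: Set<AnyHashable>) -> Changes {
    let changes = Changes(
      start: desired.subtracting(running),
      cancel: running.subtracting(desired)
    )
    running = desired
    return changes
  }

  /// Would be called when the internal didFinish(effectID:) action arrives.
  mutating func didFinish(effectID: AnyHashable) {
    running.remove(effectID)
  }
}

let timer = AnyHashable("timer")
var tracker = SubscriptionTracker()

let first = tracker.reconcile(desired: [timer])   // asks to start "timer"
let second = tracker.reconcile(desired: [timer])  // still running: nothing to do
tracker.didFinish(effectID: timer)                // the effect completed or failed
let third = tracker.reconcile(desired: [timer])   // asks to start "timer" again
```

Because a finished key is removed from the running set, the next reconcile treats it as new, which gives the "restart it" behaviour without inspecting the effect itself.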

I'm taking a little break from implementing this (but @opsb is lurking somewhere :grinning:). I hope the discussion can continue in the meantime.


Hey! Hadn’t realised there was an SCA section over here now.

I’m spending a fair amount of time playing around with the architecture (particularly binding to the view) and I’m learning Swift as I go so it’s taking a little time. The app I have in mind will really benefit from Subscriptions though so I’ll definitely do an exploration soon, hopefully next week. I’ll review the ideas that were posted in the original discussion once I get started.

There was also some discussion in issue #51 on the episode-code-samples repo. I gave a description of my Elm-subscription-based implementation there, but I have refined it a bit since then. Anyway, here are some thoughts.

I don't like the name “subscription” for this concept, specifically because Combine already has a Subscription protocol, and you need to create a Combine Subscription for each Elm-like subscription. Witness this statement from @pteasima's post: “if that subscription isn't running (presumably failed, since subscriptions usually don't finish), subscribe to the new one (restart it)”. A subscription is apparently a thing you subscribe to, but (at least in the Combine world) it's also what we call the thing you get back when you subscribe to a publisher. This is confusing.

There are really three kinds of objects in play here:

  • There is a key identifying the Elm-subscription. In @pteasima's PR, the key is an AnyHashable.
  • There is the Effect corresponding to the key.
  • There is the Combine Subscription to the Effect. This is managed by the Store and doesn't show up in the Reducer.

So in my implementation, I'm currently calling the Elm-subscription a “condition”. Yes, “condition” also has other meanings, and I'd like to find an even better name.

I think it's incorrect to claim that “subscriptions usually don't finish”. I use Elm-style subscriptions that finish.

I store the current subscriptions, and their corresponding AnyCancellables, in a container held (privately) by the Store. It seems like the natural place to store subscriptions.

The implementations by @pteasima (here) and PF (here) use a single function that returns a dictionary [AnyHashable: Effect<Action, Never>]. I think it is useful to separate that into two functions. In my implementation, I pass the two functions as arguments to the root store's initializer. Something like this:

  public convenience init<SubId: Hashable, Environment>(
    initialState: State,
    reducer: Reducer<State, Action, Environment>,
    environment: Environment,
    subscriptionsForState: @escaping (State) -> Set<SubId>,
    effectOfSubscription: @escaping (SubId) -> Effect<Action, Never>    
  ) { ... }

I still have the initializer that doesn't take the subscription-related arguments. A Store constructed that way doesn't track any subscriptions. In Elm terms, this is like using Browser.sandbox when you don't need subscriptions, and Browser.element when you do.

Here are some things I like about this separation of concerns:

  • I get strong typing instead of AnyHashable.
  • I can write separate, simpler tests for the two functions.
  • I can change the effectOfSubscription function while keeping subscriptionsForState exactly the same. This lets me easily return different effects for previews, tests, and production.

Thanks mayoff, good thoughts.

I don't like the name “subscription”

Agreed, the name clash is nasty. I'm not a huge fan of Condition. Observation seems to fit the Apple ecosystem, but I think it's been getting less popular as we're moving away from OOP. SubscriptionEffect is long, but I like that it still includes Effect.

I use Elm-style subscriptions that finish.

Can you give an example use case?
I have thought about using subscriptions e.g. for an HTTP.GET that loads data for some screen (the usual way is to trigger an Effect in onAppear, but really I think we should be attempting to load data anytime the data isn't already loaded). But afaik the Elm community doesn't do this (they use Cmd for HTTP requests) and I don't know why.

I get strong typing instead of AnyHashable .

I don't see how having a single strongly typed SubId: Hashable across the whole store is helpful. The different submodules will rarely share a common SubId type, unless that type is String or even AnyHashable :sweat_smile: And with AnyHashable, we can use PF's empty Hashable struct trick to avoid coming up with actual data.

On this topic, I should note that one could have a helper that takes a (KeyPath<Environment, (Input) -> Effect<Output, Never>>, Input: Equatable, KeyPath<Output, Action>) and creates an AnyHashable from those. So basically the subscription is automatically Equatable, being equal as long as the path to the Effect is equal, the inputs are equal, and the action transform is equal. I have experimented with this in the past, although I wasn't familiar with AnyHashable and the implementation suffered a lot (read at your own risk: https://github.com/pteasima/Uniflo/blob/master/Sources/Uniflo/SubscriptionEffect.swift).
Having to define the KeyPath from Output to Action in an extension was the least ergonomic part of using this, otherwise it was fine. This is the first time I'm talking about this in the context of CA (since I thought the [AnyHashable: Effect] solution has tolerable ergonomics). But imho this is further proof that AnyHashable is the correct abstraction.
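A rough sketch of what such a helper might look like follows. It is not the Uniflo code: SubscriptionKey, subscriptionKey, FakeEffect, and asTick are hypothetical names, FakeEffect stands in for TCA's Effect so the snippet compiles on its own, and Input is assumed to be Hashable (not just Equatable) so that it can be wrapped in AnyHashable.

```swift
// Stand-in for TCA's Effect so the sketch compiles without the library (assumption).
struct FakeEffect<Output> {}

struct Environment {
  // A client endpoint that takes an input (an interval) and returns an effect.
  var timer: (Double) -> FakeEffect<Int> = { _ in FakeEffect() }
}

enum AppAction { case tick(Int) }

// The Output-to-Action transform has to be reachable by key path,
// which is the unergonomic extension mentioned above.
extension Int {
  var asTick: AppAction { .tick(self) }
}

/// Hypothetical key: equal when the effect path, the input, and the transform are equal.
struct SubscriptionKey: Hashable {
  let effectPath: AnyKeyPath
  let input: AnyHashable
  let actionPath: AnyKeyPath
}

/// Builds an AnyHashable identity from the three ingredients.
func subscriptionKey<Env, Input: Hashable, Output, Act>(
  _ effectPath: KeyPath<Env, (Input) -> FakeEffect<Output>>,
  input: Input,
  action actionPath: KeyPath<Output, Act>
) -> AnyHashable {
  AnyHashable(
    SubscriptionKey(
      effectPath: effectPath,
      input: AnyHashable(input),
      actionPath: actionPath
    )
  )
}

let a = subscriptionKey(\Environment.timer, input: 1.0, action: \Int.asTick)
let b = subscriptionKey(\Environment.timer, input: 1.0, action: \Int.asTick)
let c = subscriptionKey(\Environment.timer, input: 2.0, action: \Int.asTick)
// a and b describe the same subscription; c differs only in its input.
```

Key paths are Hashable, so no hand-written identity is needed; the price is that both the effect and the action transform must be expressible as key paths.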

I can change the effectOfSubscription function while keeping subscriptionsForState exactly the same. This lets me easily return different effects for previews, tests, and production.

Imho this should be handled at the Environment level.

I pass the two functions as arguments to the root store's initializer.

Have you thought much about composability? I think it's crucial to slap subscriptions onto Reducer (or a new ReducerWithSubscriptions type) so that the user only has to supply composition code once, not separately for Reducer and Subscriptions.


Regarding Elm-subscriptions that finish: for both of my use cases, I want to cancel and replace those effects based on state. It's not particularly important that the effects send completion, but the effects produce a finite sequence of outputs and I would have to do more work (appending Empty(completeImmediately: false)) to make them not complete.

Your loading idea is exactly one of my use cases. The user specifies which large dataset she wants to view, and I have a subscription to the loading effect for that dataset.

Another use case is lazy saving. I have a condition whose effect is to wait two seconds, then send a save action. The condition embeds the model's change count (which I track as part of the model). So every time the model changes, the condition changes. The user-visible effect is that the model is saved automatically after two seconds of idle time.

Regarding strong typing, I benefit because I have separate methods conditionsForState and effectOfCondition, and the Condition type is how these methods communicate with each other. If you only have one method returning [AnyHashable: Effect] then I don't know that there's a good reason to have a Condition type.

Having separate methods also means my system doesn't construct another Effect for an existing condition. That is, I only call effectOfCondition for the conditions returned by conditionsForState that were not returned by the prior call to conditionsForState. That's probably not significant most of the time, since most of the cost of an Effect isn't incurred until you actually subscribe to it.
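As an illustration of the two-function split and the lazy-saving condition, here is a small sketch. It is not the actual implementation described above: Model, its fields, the rule that a save is pending whenever changeCount > 0, and the String returned by effectOfCondition (a stand-in for a real Effect) are all assumptions.

```swift
struct Model {
  var text = ""
  var changeCount = 0           // bumped on every edit
  var selectedDataset: String?  // dataset the user wants to view
}

/// Strongly typed identity of each Elm-style subscription.
enum Condition: Hashable {
  case load(dataset: String)
  case save(changeCount: Int)
}

/// Pure function from state to the conditions that should currently hold.
func conditionsForState(_ model: Model) -> Set<Condition> {
  var conditions: Set<Condition> = []
  if let dataset = model.selectedDataset {
    conditions.insert(.load(dataset: dataset))
  }
  if model.changeCount > 0 {
    // Embedding the change count makes every edit produce a new condition.
    conditions.insert(.save(changeCount: model.changeCount))
  }
  return conditions
}

/// Stand-in for the effect factory; a real one would return an Effect.
func effectOfCondition(_ condition: Condition) -> String {
  switch condition {
  case .load(let dataset): return "load \(dataset)"
  case .save: return "wait 2s, then send save"
  }
}

var model = Model()
model.selectedDataset = "big"
model.changeCount = 1
let before = conditionsForState(model)

model.text = "hello"
model.changeCount = 2
let after = conditionsForState(model)

// Only conditions that were not present before need a new effect.
let toStart = after.subtracting(before)
let toCancel = before.subtracting(after)
let started = toStart.map(effectOfCondition)
```

Each edit replaces .save(changeCount:) with a new value, so the old two-second wait is cancelled and a fresh one starts, while the unchanged .load condition is left alone and effectOfCondition is never called for it again.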


Have you thought about using the same approach as ReactiveFeedback? Something similar to this:

struct Loop<State, Action, Environment> {
  // Better to replace AnyPublisher by StatePublisher
  var feedback: (AnyPublisher<State, Never>, Environment) -> Effect<Action, Never>
}

This way it's clear that these "subscriptions" just transform a stream of states into a stream of actions. Plus, inside the loop we have all of Combine at our disposal.
It's highly composable, as you can see: pullbacks, combines, higher-order loops, just like reducers.

Plus, it doesn't require kickoff actions like viewDidLoad or similar.

It does require modifying Store, of course: just one added parameter.

  public convenience init<Environment>(
    initialState: State,
    reducer: Reducer<State, Action, Environment>,
    loop: Loop<State, Action, Environment>,
    environment: Environment
  ) {
    ....
  }

That's an interesting thought @mpsnp, at first blush I like it a lot. It does push more work into the environment, such as managing the start/end of streams, but it feels like a nice abstraction and you can always add helpers into the stream to support that.

Yeah, I'm using it in a real project (though it's not TCA, because the project started even before SwiftUI, and it uses RxSwift instead of Combine). With convenience wrappers, the final loop construction looks like this:

extension AircraftFlightState {
    static let loop: StateLoop = .combine([
        .statePullbacks,
        .observeTelemetry(.position, embedTo: .telemetry .. .setLocation),
        .observeTelemetry(.velocity, embedTo: .telemetry .. .setVelocity),
        .observeTelemetry(.attitude, embedTo: .telemetry .. .setAttitude),
        .observeTelemetry(.altitude, embedTo: .telemetry .. .setAltitude),
    ])
}

Or a more complex but flexible builder style:

private extension Loop where State == AircraftConnectState, Mutation == AircraftConnectState.Mutation {
    static let observeAircraft = Loop
        .waitUntil(\.isSDKActivated)
        .pullback(mutation: .aircraft .. .receivedAircraft)
        .loop(using: .observe { env in env.observeCurrentAircraft() })
}

How do you actually start/stop the streams?

Right at the moment of store init. And then there are several tricks for handling nested states.
In general, if you pull back optional state, you expect the nested Loop to start when its corresponding state flips from .none to .some. When it flips back, the Loop is cancelled/disposed, so the app doesn't waste resources.

In that case this approach has a big advantage over plain long-running effects: you'll never forget to fire a cancellation effect, since this is done automatically.

This is useful when you are dealing with an "observable" data source, such as CoreData, Realm, Firebase, etc.

I'll try to port this to TCA next week and see whether it's worth it on examples.

I've got a bit of a disconnect here, I don't see how you can represent the problem using Combine.

The code below attempts to map a list state.bluetoothDevices to effects over time. When a new item is added, a subscription should be started for it. When an item is removed, its subscription should be cancelled. If an item remains in the list, its subscription should continue uninterrupted.

As you'll see from the output, the code fails to achieve this. I wouldn't expect this code to work, I'm just not sure how to achieve what I want and thought it would serve as a good reference for discussion (you can paste it into a playground and it will run).

import Foundation
import Combine
import PlaygroundSupport

PlaygroundPage.current.needsIndefiniteExecution = true

struct State {
  var bluetoothDevices : [String] = []
}

var initialState = State(bluetoothDevices: ["Contour Next"])


var statePublisher = CurrentValueSubject<State, Never>(initialState)
var effectsPublisher = statePublisher.flatMap(stateToPublishers)
effectsPublisher.sink(receiveValue: { value in print(value) })


func stateToPublishers(_ state : State) -> Publishers.MergeMany<Publishers.Map<Publishers.Scan<Publishers.Autoconnect<Timer.TimerPublisher>, Int>, String>> {
  return Publishers.MergeMany(state.bluetoothDevices.map(bluetoothDeviceToPublisher))
}

func bluetoothDeviceToPublisher(_ bluetoothDevice : String) -> Publishers.Map<Publishers.Scan<Publishers.Autoconnect<Timer.TimerPublisher>, Int>, String> {
  return Timer
  .publish(every: 1, on: .main, in: .common)
  .autoconnect()
  .scan(0, { (agg : Int, time : Date) in agg + 1 })
  .map { n in "\(bluetoothDevice) \(n)"}
}


RunLoop.main.run(until: Date(timeIntervalSinceNow: 5))


var working = initialState
working.bluetoothDevices.append("Freestyle Libre")

statePublisher.send(working)

RunLoop.main.run(until: Date(timeIntervalSinceNow: 5))

Output

Contour Next 1
Contour Next 2
Contour Next 3
Contour Next 4
Contour Next 5
Contour Next 6
Contour Next 1
Freestyle Libre 1
Contour Next 7
Contour Next 2
Freestyle Libre 2
Contour Next 8
Contour Next 3
Freestyle Libre 3
Contour Next 9
Contour Next 4
Freestyle Libre 4
Contour Next 10
Contour Next 5
Freestyle Libre 5

So how would you do this with Combine? (seems like you'd have to track the subscriptions yourself using a Subject)

@opsb I've extended your example a little and implemented a few operations on the Loop type I showed above. Try playing around with it in a playground:

import Foundation
import Combine
import PlaygroundSupport

PlaygroundPage.current.needsIndefiniteExecution = true

// MARK: - Library

struct Loop<State, Action, Env> {
    let feedback: (AnyPublisher<State, Never>, Env) -> AnyPublisher<Action, Never>
}

extension Loop {
    func pullback<ParentState>(state stateTransform: @escaping (ParentState) -> State) -> Loop<ParentState, Action, Env> {
        return .init { state, env -> AnyPublisher<Action, Never> in
            self.feedback(state.map(stateTransform).eraseToAnyPublisher(), env)
        }
    }

    func toOptional() -> Loop<State?, Action, Env> {
        return .init { state, env in

            let filtered = state.flatMap { item -> AnyPublisher<State, Never> in
                if let item = item {
                    return Just(item).eraseToAnyPublisher()
                } else {
                    return Empty(completeImmediately: true).eraseToAnyPublisher()
                }
            }.eraseToAnyPublisher()

            return state
                .map { $0 != nil }
                .removeDuplicates()
                .map { isSome in isSome ? self.feedback(filtered, env) : Empty().eraseToAnyPublisher() }
                .switchToLatest()
                .eraseToAnyPublisher()
        }
    }
}

extension Loop where State: Identifiable {
    func toArray() -> Loop<[State], Action, Env> {
        return .init { statePublisher, env in
            let wrapped = self.toOptional()
            let output = PassthroughSubject<Action, Never>()

            var cancellables: [State.ID: AnyCancellable] = [:]

            let stateCancellable = statePublisher.sink { items in
                var oldIds = Set(cancellables.keys)
                items.forEach { item in
                    oldIds.remove(item.id)
                    if cancellables[item.id] == nil {
                        cancellables[item.id] = wrapped
                            .feedback(statePublisher.map { $0.first { $0.id == item.id } }.eraseToAnyPublisher(), env)
                            .subscribe(output)
                    }
                }
                oldIds.forEach { id in
                    cancellables.removeValue(forKey: id)?.cancel()
                }
            }

            return output
                .handleEvents(receiveCancel: stateCancellable.cancel)
                .eraseToAnyPublisher()
        }
    }
}

extension Loop {
    static func combine(_ loops: [Loop]) -> Loop {
        return .init { state, env in
            Publishers
                .MergeMany(loops.map { $0.feedback(state, env) })
                .eraseToAnyPublisher()
        }
    }
}

extension Loop where State: Equatable {
    /// Starts observer for each new unique state and switches to latest publisher
    /// - Parameter observer: subscription func
    static func observe(with observer: @escaping (State, Env) -> AnyPublisher<Action, Never>) -> Loop {
        return .init { state, env in
            state
                .removeDuplicates()
                .map { observer($0, env) }
                .switchToLatest()
                .eraseToAnyPublisher()
        }
    }
}

// MARK: - Example usage

// MARK: State

struct BluetoothDevice: Identifiable, Equatable {
    var id: UUID
    var name: String
}

struct State {
    var bluetoothDevices : [BluetoothDevice]? = []
    var currentDevice: BluetoothDevice?
}

// MARK: Actual observation

func observeBluetoothDevice() -> Loop<BluetoothDevice, String, Void> {
    return .observe { device, env in
        return Timer
            .publish(every: 1, on: .main, in: .common)
            .autoconnect()
            .scan(0, { (agg : Int, time : Date) in agg + 1 })
            .map { n in "\(device.name) \(n)"}
            .eraseToAnyPublisher()
    }
}

// MARK: Construction of loop for state

let observeAllDevices = observeBluetoothDevice()
    .toArray()
    .toOptional()
    .pullback(state: \State.bluetoothDevices)

let observeCurrentDevice = observeBluetoothDevice()
    .toOptional()
    .pullback(state: \State.currentDevice)

let mainLoop = Loop.combine([
    observeAllDevices,
    observeCurrentDevice,
])

// MARK: Emulation of Store


var initialState = State(bluetoothDevices: [
    .init(id: UUID(), name: "Contour Next")
])

var statePublisher = CurrentValueSubject<State, Never>(initialState)
var effectsPublisher = mainLoop.feedback(statePublisher.eraseToAnyPublisher(), ())
effectsPublisher.sink(receiveValue: { value in print(value) })

func it(_ message: String, mutate: (inout State) -> Void = { _ in }) {
    print(
        """
        ========================================================================
        \(message)
        ========================================================================
        """
    )
    mutate(&statePublisher.value)
    RunLoop.main.run(until: Date(timeIntervalSinceNow: 3))
}

it("Should subscribe to initial devices")

it("Should subscribe to new device (array)") {
    $0.bluetoothDevices?.append(.init(id: UUID(), name: "Freestyle Libre"))
}

it("Should unsubscribe from first device (array)") {
    $0.bluetoothDevices?.remove(at: 0)
}

it("Should unsubscribe from all (array)") {
    $0.bluetoothDevices = nil
}

it("Should subscribe to Fresh device (array)") {
    $0.bluetoothDevices = [.init(id: UUID(), name: "Fresh")]
}

it("Should subscribe to TEST device (current)") {
    $0.currentDevice = .init(id: UUID(), name: "TEST")
}

Output:

========================================================================
Should subscribe to initial devices
========================================================================
Contour Next 1
Contour Next 2
Contour Next 3
========================================================================
Should subscribe to new device (array)
========================================================================
Contour Next 4
Freestyle Libre 1
Contour Next 5
Freestyle Libre 2
Contour Next 6
Freestyle Libre 3
========================================================================
Should unsubscribe from first device (array)
========================================================================
Freestyle Libre 4
Freestyle Libre 5
Freestyle Libre 6
========================================================================
Should unsubscribe from all (array)
========================================================================
========================================================================
Should subscribe to Fresh device (array)
========================================================================
Fresh 1
Fresh 2
Fresh 3
========================================================================
Should subscribe to TEST device (current)
========================================================================
Fresh 4
TEST 1
Fresh 5
TEST 2
Fresh 6
TEST 3

Thanks @mpsnp that's really helpful. You've answered a few of my Combine related questions and I see several refinements in there which are helpful to see as I'm getting up to speed with Swift.

As I suspected you used a subject to track when elements are added/removed from an array and it seems to work well in practice. It was good to see how you handled optionals and I can now see how this approach would extend over the entire State tree. I'll have a play and see where I get to but at the moment it seems like a very reasonable approach for Subscriptions.

Yeah, I initially tried to implement it without a Subject, but in the end the subject-based implementation was easier.

This implementation is not super optimised, but in the end this kind of subscription is rare. Usually you want to subscribe either to single-item updates or to the whole collection. So even with this approach it will work OK on relatively small datasets.

And yes, it actually scales well, even with recursive states. The downside is that you now need to pull back in two places, reducer and loop, but this can be simplified.

Yeah, I'm curious if it is possible to avoid using a Subject, but the approach you've taken seems fine. I think pulling back in 2 places is also fine to be honest (coming from Elm where you do this separately for view/update/subscriptions) but as you say there's no reason you couldn't introduce a primitive that allows both to be pulled back in a single call (though you're still going to need to specify 2 mappings so I'm not sure you gain that much).

OK, in any case some feedback from @mbrandonw & @stephencelis would be appreciated on whether it's worth taking this further than these sketches and making a full pull request.
The nice thing is that this is a completely additive change, which even keeps the public API backwards compatible (if the store is initialised with Loop.none by default).

With this Loop system, how do I print all current subscriptions in lldb?

@opsb I think it is possible to avoid using subjects, though be warned that I haven't tested this beyond comparing the printed output:

extension Loop where State: Identifiable {
  func toSequence<S>() -> Loop<S, Action, Env> where S: Sequence, S.Element == State {
    typealias ScanState = (all: Set<State.ID>, new: Set<State.ID>)
    
    return .init { state, env in
      state.scan(([], [])) { accum, next -> ScanState in
        let currentIdentifiers = Set(next.map(\.id))
        let newIdentifiers = currentIdentifiers.subtracting(accum.all)
        return (all: accum.all.union(newIdentifiers), new: newIdentifiers)
      }
      .flatMap(\.new.publisher)
      .flatMap { id in
        self.toOptional().feedback(
          state.map { s in s.first(where: { $0.id == id }) }.eraseToAnyPublisher(),
          env
        )
      }
      .eraseToAnyPublisher()
    }
  }
}

It's kind of a terrible implementation for sequences that have many unique IDs over time. I do think it should be possible to manage the lifecycle of individual element subscriptions without accumulating all identifiers and without leaving the world of publishers, and I have some vague ideas of how to go about this.

A low-hanging optimisation would be to use a dictionary or identified array rather than a standard array/sequence.

At a high level, I’d say you want something like this:

  1. Diff new array with previous based on identifiers, using scan.
  2. For each new element in the array, transform the state publisher into one that publishes the specific element from the array (this is where you find it based on id using first(where:), or maybe you could avoid doing the initial extraction again using prepend?) and completes when that element is no longer present in the array i.e. when first(where:) returns nil — completion can be done using prefix(while:).
  3. flatMap publishers into self.feedback which merges them.
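Step 2 of the list above could be sketched roughly as follows. This is an untested-in-TCA illustration, not part of anyone's implementation: Device and elementPublisher are hypothetical names, and only the per-element publisher is shown, not the diffing in step 1 or the flatMap into self.feedback in step 3.

```swift
import Combine

struct Device: Identifiable, Equatable {
  let id: Int
  var name: String
}

/// Publishes the element with the given id for as long as it stays in the array,
/// and completes as soon as first(where:) no longer finds it.
func elementPublisher(
  id: Int,
  in state: AnyPublisher<[Device], Never>
) -> AnyPublisher<Device, Never> {
  state
    .map { devices in devices.first(where: { $0.id == id }) }
    .prefix(while: { $0 != nil })  // complete when the element disappears
    .compactMap { $0 }             // unwrap the remaining non-nil values
    .removeDuplicates()            // ignore state changes that don't touch this element
    .eraseToAnyPublisher()
}

let state = PassthroughSubject<[Device], Never>()
var received: [String] = []
var completed = false

let cancellable = elementPublisher(id: 1, in: state.eraseToAnyPublisher())
  .sink(
    receiveCompletion: { _ in completed = true },
    receiveValue: { received.append($0.name) }
  )

state.send([Device(id: 1, name: "Contour Next")])
state.send([Device(id: 1, name: "Contour Next"), Device(id: 2, name: "Freestyle Libre")])
state.send([Device(id: 2, name: "Freestyle Libre")])  // id 1 removed: publisher completes
```

Since the per-element publisher completes on its own, a flatMap over such publishers would release each inner subscription when its element leaves the array, so removed items no longer keep a subscription alive.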

@mayoff Why don't you ask how to print all the reducers that are reducing at the moment?

This is a similar kind of system: subscriptions depend on the current state, just as reducers do (for example, those further down the tree after .optional).

For debugging purposes you can always use these simple higher-order Loops:

    func debugOutput(
        _ prefix: String = ""
    ) -> Loop {
        return .init { state, env in
            self.feedback(state, env).print(prefix).eraseToAnyPublisher()
        }
    }

    func debugInput(
        _ prefix: String = ""
    ) -> Loop {
        return .init { state, env in
            self.feedback(state.print(prefix).eraseToAnyPublisher(), env)
        }
    }

@klop I like your implementation, and agree that it's better to use a dict or identified array. But the downside is that, with many unique ids, this subscription hangs around forever, even for items that were already removed from the sequence:

self.toOptional().feedback(
    state.map { s in s.first(where: { $0.id == id }) }.eraseToAnyPublisher(),
    env
)