Subscriptions

@opsb I've a little bit extended your example, and implemented few operations over Loop type I've shown above. Try and play around in playground:

import Foundation
import Combine
import PlaygroundSupport

PlaygroundPage.current.needsIndefiniteExecution = true

// MARK: - Library

struct Loop<State, Action, Env> {
    let feedback: (AnyPublisher<State, Never>, Env) -> AnyPublisher<Action, Never>
}

extension Loop {
    func pullback<ParentState>(state stateTransform: @escaping (ParentState) -> State) -> Loop<ParentState, Action, Env> {
        return .init { state, env -> AnyPublisher<Action, Never> in
            self.feedback(state.map(stateTransform).eraseToAnyPublisher(), env)
        }
    }

    func toOptional() -> Loop<State?, Action, Env> {
        return .init { state, env in

            let filtered = state.flatMap { item -> AnyPublisher<State, Never> in
                if let item = item {
                    return Just(item).eraseToAnyPublisher()
                } else {
                    return Empty(completeImmediately: true).eraseToAnyPublisher()
                }
            }.eraseToAnyPublisher()

            return state
                .map { $0 != nil }
                .removeDuplicates()
                .map { isSome in isSome ? self.feedback(filtered, env) : Empty().eraseToAnyPublisher() }
                .switchToLatest()
                .eraseToAnyPublisher()
        }
    }
}

extension Loop where State: Identifiable {
    func toArray() -> Loop<[State], Action, Env> {
        return .init { statePublisher, env in
            let wrapped = self.toOptional()
            let output = PassthroughSubject<Action, Never>()

            var cancellables: [State.ID: AnyCancellable] = [:]

            let stateCancellable = statePublisher.sink { items in
                var oldIds = Set(cancellables.keys)
                items.forEach { item in
                    oldIds.remove(item.id)
                    if cancellables[item.id] == nil {
                        cancellables[item.id] = wrapped
                            .feedback(statePublisher.map { $0.first { $0.id == item.id } }.eraseToAnyPublisher(), env)
                            .subscribe(output)
                    }
                }
                oldIds.forEach { id in
                    cancellables.removeValue(forKey: id)?.cancel()
                }
            }

            return output
                .handleEvents(receiveCancel: stateCancellable.cancel)
                .eraseToAnyPublisher()
        }
    }
}

extension Loop {
    static func combine(_ loops: [Loop]) -> Loop {
        return .init { state, env in
            Publishers
                .MergeMany(loops.map { $0.feedback(state, env) })
                .eraseToAnyPublisher()
        }
    }
}

extension Loop where State: Equatable {
    /// Starts observer for each new unique state and switches to latest publisher
    /// - Parameter observer: subscription func
    static func observe(with observer: @escaping (State, Env) -> AnyPublisher<Action, Never>) -> Loop {
        return .init { state, env in
            state
                .removeDuplicates()
                .map { observer($0, env) }
                .switchToLatest()
                .eraseToAnyPublisher()
        }
    }
}

// MARK: - Example usage

// MARK: State

struct BluetoothDevice: Identifiable, Equatable {
    var id: UUID
    var name: String
}

struct State {
    var bluetoothDevices : [BluetoothDevice]? = []
    var currentDevice: BluetoothDevice?
}

// MARK: Actual observation

func observeBluetoothDevice() -> Loop<BluetoothDevice, String, Void> {
    return .observe { device, env in
        return Timer
            .publish(every: 1, on: .main, in: .common)
            .autoconnect()
            .scan(0, { (agg : Int, time : Date) in agg + 1 })
            .map { n in "\(device.name) \(n)"}
            .eraseToAnyPublisher()
    }
}

// MARK: Construction of loop for state

let observeAllDevices = observeBluetoothDevice()
    .toArray()
    .toOptional()
    .pullback(state: \State.bluetoothDevices)

let observeCurrentDevice = observeBluetoothDevice()
    .toOptional()
    .pullback(state: \State.currentDevice)

let mainLoop = Loop.combine([
    observeAllDevices,
    observeCurrentDevice,
])

// MARK: Emulation of Store


var initialState = State(bluetoothDevices: [
    .init(id: UUID(), name: "Contour Next")
])

var statePublisher = CurrentValueSubject<State, Never>(initialState)
var effectsPublisher = mainLoop.feedback(statePublisher.eraseToAnyPublisher(), ())
effectsPublisher.sink(receiveValue: { value in print(value) })

func it(_ message: String, mutate: (inout State) -> Void = { _ in }) {
    print(
        """
        ========================================================================
        \(message)
        ========================================================================
        """
    )
    mutate(&statePublisher.value)
    RunLoop.main.run(until: Date(timeIntervalSinceNow: 3))
}

it("Should subscribe to initial devices")

it("Should subscribe to new device (array)") {
    $0.bluetoothDevices?.append(.init(id: UUID(), name: "Freestyle Libre"))
}

it("Should unsubscribe from first device (array)") {
    $0.bluetoothDevices?.remove(at: 0)
}

it("Should unsubscribe from all (array)") {
    $0.bluetoothDevices = nil
}

it("Should subscribe to Fresh device (array)") {
    $0.bluetoothDevices = [.init(id: UUID(), name: "Fresh")]
}

it("Should subscribe to TEST device (current)") {
    $0.currentDevice = .init(id: UUID(), name: "TEST")
}

Output:

========================================================================
Should subscribe to initial devices
========================================================================
Contour Next 1
Contour Next 2
Contour Next 3
========================================================================
Should subscribe to new device (array)
========================================================================
Contour Next 4
Freestyle Libre 1
Contour Next 5
Freestyle Libre 2
Contour Next 6
Freestyle Libre 3
========================================================================
Should unsubscribe from first device (array)
========================================================================
Freestyle Libre 4
Freestyle Libre 5
Freestyle Libre 6
========================================================================
Should unsubscribe from all (array)
========================================================================
========================================================================
Should subscribe to Fresh device (array)
========================================================================
Fresh 1
Fresh 2
Fresh 3
========================================================================
Should subscribe to TEST device (current)
========================================================================
Fresh 4
TEST 1
Fresh 5
TEST 2
Fresh 6
TEST 3
2 Likes