Combine: Unexpected backpressure behaviour with zip

Hi,

I have a question about how Combine's zip operator interacts with backpressure.

Take the following code snippet:

import Combine

let sequencePublisher = Publishers.Sequence<Range<Int>, Never>(sequence: 0..<Int.max)
let subject = PassthroughSubject<String, Never>()

let handle = subject
    .zip(sequencePublisher.print())
    .print()
    .sink { letters, digits in
        print(letters, digits)
    }

subject.send("a")

When executing this in the playground, the following is the output:

receive subscription: (0..<9223372036854775807)
receive subscription: (Zip)
request unlimited
request unlimited
receive value: (0)
receive value: (1)
receive value: (2)
receive value: (3)
receive value: (4)
receive value: (5)
receive value: (6)
receive value: (7)
...

When executing it on an iOS device, the code crashes after a few seconds because of memory issues.

The underlying reason is visible in the fourth line of the output, where zip requests an unlimited number of values from sequencePublisher. Since sequencePublisher provides every Int from 0 up to Int.max, and zip presumably buffers every value that has not been paired yet, memory runs out.

What I know:

  • zip waits for one value of each publisher before combining them and pushing them on
  • backpressure is used to control the demand from a subscriber to a publisher

My expectation would be that zip requests only one value from each publisher, waits for both to arrive, and requests the next values only once it has received one from each.

In this particular case I tried to build a behaviour where a sequence number is assigned to every value that is produced by the subject. However, I could imagine that this is always a problem when zip combines values from publishers that publish with very different frequencies.

Utilising backpressure in the zip operator seems to be the perfect tool to solve that issue. Do you know why this isn't the case? Is this a bug or intentional? If intentional, why?

Thanks guys

You might have better luck asking over at the Apple Developer forums or other online sources. Combine is a closed-source Apple framework, and not all Apple developers hang out on this forum.


Why do you think it is unexpected? Most likely, what you would like to have is:

import Combine

let sequencePublisher = Publishers.Sequence<Range<Int>, Never>(sequence: 0..<Int.max)
let subject = PassthroughSubject<String, Never>()

let handle = subject
    .zip(sequencePublisher.print())
    //.publish
    .flatMap(maxPublishers: .max(1), { (pair)  in
        Just(pair)
    })
    .print()
    .sink { letters, digits in
        print(letters, digits)
    }

"Hello World!".map(String.init).forEach { (s) in
    subject.send(s)
}
subject.send(completion: .finished)

Look at how .flatMap is defined:

func flatMap<T, P>(maxPublishers: Subscribers.Demand = .unlimited, _ transform: @escaping (Self.Output) -> P) -> Publishers.FlatMap<P, Self> where T == P.Output, P : Publisher, Self.Failure == P.Failure

Passing maxPublishers: .max(1) is what helps here.

It prints:
receive subscription: (FlatMap)
request unlimited
receive subscription: (0..<9223372036854775807)
request max: (1)
receive value: (0)
receive value: (("H", 0))
H 0
request max: (1)
receive value: (1)
receive value: (("e", 1))
e 1
request max: (1)
receive value: (2)
receive value: (("l", 2))
l 2
request max: (1)
receive value: (3)
receive value: (("l", 3))
l 3
request max: (1)
receive value: (4)
receive value: (("o", 4))
o 4
request max: (1)
receive value: (5)
receive value: ((" ", 5))
  5
request max: (1)
receive value: (6)
receive value: (("W", 6))
W 6
request max: (1)
receive value: (7)
receive value: (("o", 7))
o 7
request max: (1)
receive value: (8)
receive value: (("r", 8))
r 8
request max: (1)
receive value: (9)
receive value: (("l", 9))
l 9
request max: (1)
receive value: (10)
receive value: (("d", 10))
d 10
request max: (1)
receive value: (11)
receive value: (("!", 11))
! 11
request max: (1)
receive value: (12)
receive cancel
receive finished

Thanks for your reply and the elegant solution @jabb!

Since the doc string of maxPublishers is quite sparse and my knowledge of Combine is still limited, I don't quite get how maxPublishers: .max(1) works here.

maxPublishers
The maximum number of publishers produced by this method

Reading this, I would expect that flatMap produces no new values/publishers after the first tuple. However, it somehow seems to request just one value at a time, which is exactly what I was looking for :) Would you mind explaining to me why and how that works?

Thanks in advance

Try .max(10)! It will be clear immediately :-) For further reading, see https://stackoverflow.com/questions/59743938/combine-framework-serialize-async-operations/59889174#59889174

Publishers.Zip

A publisher created by applying the zip function to two upstream publishers.

Because sink requests Subscribers.Demand.unlimited by default, it consumes every value until each of the two zipped publishers finishes. That is only one value from subject, but Int.max values from sequencePublisher.
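
To make the role of the subscriber's demand concrete, here is a minimal sketch of a subscriber that asks for one value at a time instead of .unlimited. OneAtATimeSubscriber is a hypothetical helper, not part of Combine, and the sketch assumes that zip forwards downstream demand to both upstream publishers, as the logs in this thread suggest.

```swift
import Combine

// Hypothetical subscriber (not part of Combine) that requests one value
// up front and one more each time it receives a value.
final class OneAtATimeSubscriber<Input>: Subscriber {
    typealias Failure = Never

    private let receiveValue: (Input) -> Void

    init(receiveValue: @escaping (Input) -> Void) {
        self.receiveValue = receiveValue
    }

    func receive(subscription: Subscription) {
        // Initial demand: a single value instead of .unlimited.
        subscription.request(.max(1))
    }

    func receive(_ input: Input) -> Subscribers.Demand {
        receiveValue(input)
        // Additional demand: one more value after each delivery.
        return .max(1)
    }

    func receive(completion: Subscribers.Completion<Never>) {}
}

let letters = PassthroughSubject<String, Never>()
let numbers = (0..<Int.max).publisher

// Collected here so the result can be inspected after sending.
var received: [String] = []

letters
    .zip(numbers)
    .subscribe(OneAtATimeSubscriber<(String, Int)> { pair in
        received.append("\(pair.0) \(pair.1)")
        print(pair.0, pair.1)
    })

letters.send("a")
```

Unlike sink, this sketch does not keep the Subscription around, so it cannot cancel. A real implementation would probably store it and expose a cancel method.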

Solution is clear now!

There is another question: why did Apple name the parameter of the .flatMap operator maxPublishers, when .zip works with exactly two publishers? It is a source of confusion and misunderstanding about how it works.

Finally, try:

let handle = subject
    .zip(sequencePublisher.print()).collect(5)
    //.publish
    .flatMap(maxPublishers: .max(1), { (pair)  in
        Just(pair)
    })
    .print()

// Keep the returned AnyCancellable alive; the subscription is cancelled when it is released.
let cancellable = handle.sink { pairs in
    print(pairs)
}

It gives you:

receive subscription: (FlatMap)
request unlimited
receive subscription: (0..<9223372036854775807)
request max: (5)
receive value: (0)
receive value: (1)
receive value: (2)
receive value: (3)
receive value: (4)
receive value: ([("H", 0), ("e", 1), ("l", 2), ("l", 3), ("o", 4)])
[("H", 0), ("e", 1), ("l", 2), ("l", 3), ("o", 4)]
request max: (5)
receive value: (5)
receive value: (6)
receive value: (7)
receive value: (8)
receive value: (9)
receive value: ([(" ", 5), ("W", 6), ("o", 7), ("r", 8), ("l", 9)])
[(" ", 5), ("W", 6), ("o", 7), ("r", 8), ("l", 9)]
request max: (5)
receive value: (10)
receive value: (11)
receive value: (12)
receive value: (13)
receive value: (14)
receive cancel
receive value: ([("d", 10), ("!", 11)])
[("d", 10), ("!", 11)]
receive finished

Try another example, with slightly "better" print formatting:

let subject = PassthroughSubject<Int,Never>()
let sequence = Publishers.Sequence<Range<Int>, Never>(sequence: 0 ..< 10)

subject.zip(sequence.print("sequence")).print("zip").sink { (pair) in
      print("zipped", pair)
}

subject.zip(sequence.print("     sequence")).print("     zip").flatMap(maxPublishers: .max(1)) { Just($0) }.sink { (v) in
      print("     flatMapped", v)
}

subject.send(1)

It produces the following:

sequence: receive subscription: (0..<10)
zip: receive subscription: (Zip)
zip: request unlimited
sequence: request unlimited
sequence: receive value: (0)
sequence: receive value: (1)
sequence: receive value: (2)
sequence: receive value: (3)
sequence: receive value: (4)
sequence: receive value: (5)
sequence: receive value: (6)
sequence: receive value: (7)
sequence: receive value: (8)
sequence: receive value: (9)
sequence: receive finished
     sequence: receive subscription: (0..<10)
     zip: receive subscription: (Zip)
     zip: request max: (1)
     sequence: request max: (1)
     sequence: receive value: (0)
zip: receive value: ((1, 0))
zipped (1, 0)
     zip: receive value: ((1, 0))
     flatMapped (1, 0)
     zip: request max: (1)
     sequence: request max: (1)
     sequence: receive value: (1)
     zip: receive cancel
     sequence: receive cancel
zip: receive cancel
sequence: receive cancel
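
For the original goal of attaching a sequence number to every value from the subject, a sketch that avoids zip entirely may be simpler. It uses Combine's scan operator to carry a running index. The tuple labels index and value and the numbered array are illustrative choices, not something from the posts above.

```swift
import Combine

let subject = PassthroughSubject<String, Never>()

// Collected here so the result can be inspected after sending.
var numbered: [String] = []

let cancellable = subject
    // Start at -1 so the first value gets index 0. No second publisher is
    // involved, so there is no unbounded upstream to request from.
    .scan((index: -1, value: "")) { previous, value in
        (index: previous.index + 1, value: value)
    }
    .sink { pair in
        numbered.append("\(pair.value) \(pair.index)")
        print(pair.value, pair.index)
    }

subject.send("a")   // prints "a 0"
subject.send("b")   // prints "b 1"
```

This sidesteps the demand question rather than answering it: the counter advances only when the subject emits.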