After reading through some posts in this thread, I noticed that there seems to be interest in an operator that lets you perform side effects as elements are generated while still allowing further transformations afterwards (e.g. the linked post mentions being able to insert forEach in the middle of a function chain).
Analogously, Combine provides the handleEvents operator and RxSwift provides the do operator. I personally like these because they explicitly mark the intent to mutate outside state, so you can avoid performing such mutations in maps and whatnot.
A common practical example of this is for side effects in business logic view models - often, these objects own some sort of AsyncSequence (a QuakeMonitor wrapper, perhaps) and when a new element is generated, they need to perform a side effect using the element's raw data (e.g. firing an analytic) before transforming the sequence into one of simpler value-types (Strings, Bools, etc.) that they expose to drive the state of different UI elements.
I noticed there was also pushback to this idea, though, and I feel like this kind of functionality touches on the overall philosophy of what this package is supposed to be (I'm still trying to figure out how the whole "streams of values over time" paradigm fits here), so I thought I'd raise the issue.
Certainly seems like most of the pushback was due to it being called forEach. Finding another name would probably be useful. I like onElement, but there are a lot of colors to paint that shed.
The Combine function handleEvents is quite honestly a tool that is fraught with issues. For debugging it is quite valuable, but from a safety standpoint it is an easy way to get yourself into deadlocks or other unsafe conditions.
In Combine it handles events on an unknown scheduler, which poses a couple of problems: accessing any external state needs additional synchronization, and because it can also handle cancel, it can easily cause cancellation on itself given the common use of AnyCancellable.
AsyncSequence mostly does not suffer from this problem, in that it is cancelled by the task (which is handled more cooperatively than Combine's immediate signal, and there is no concept of AnyCancellable). Additionally, the closure should be @Sendable, which should prevent any non-synchronized access. However, the concept still allows for the possibility that the value is self-interacting.
I would say that if we had a way to indicate a closure should NEVER have captured state then it would be safe. However that seems antithetical to the ask being posed.
In short it is a very gnarly escape hatch that folks might cut themselves on.
Don't get me wrong, it can be done - but my question is more so: is there a better tool for the job, can we solve the problem of needing the side effects to be encapsulated better than what Rx or Combine did?
Yes, in RxSwift such pitfalls are possible. They are solved using:
- different overloads of the do() operator: do(onNext:) & do(afterNext:)
- the share() operator
- an explicit scheduler
Rx code is synchronous by default, which leads to some non-intuitive effects. I think making the do(onNext:) & do(afterNext:) closures async and sendable is reasonable, though I'm not sure whether a synchronous analog is reasonable too.
The rub of course is the capturing of the terminal events, e.g. any thrown error. The problem with that is we don't have a way to determine what the failure type is (either Never or Error are beyond the linguistic grasp due to the lack of generic effects).
I know functional purity is a hard goal to achieve, but I have a feeling that doing something more than just observing values as an inline side effect would require something a bit more in-depth around that concept. The key issue here is making sure there is no way to back flow execution into the iterator. Whatever is done, we need to make sure that we can't accidentally mutate the iterator below this thing out from under the consumer of that iterator. Combine suffered from that because you could technically cancel things inside the event handler (which leads to some really gnarly bugs).
I'm trying to understand the "back flow" issues you're describing - what would that look like? Something like calling cancel on a Task handle within the side effect closure? Or would it be more like issues around calling makeAsyncIterator on a sequence and using that within the side effect?
I'm wondering if adding more constraints - e.g. requiring the side effect to be synchronous, or requiring that Element is also Sendable, would help here.
In the meantime, I've stolen Jon's naming scheme and naively implemented onElement, essentially doing what your code snippet above does, just in case anyone wants to play around with it and break things.
Usage:
let seq = [1, 2, 3].async
    .onElement { print($0) }
    .map { $0 * $0 }
for await num in seq {
    print(num)
}
// Prints 1, 1, 2, 4, 3, 9
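For anyone who wants something concrete to poke at, here is a minimal sketch of what a pass-through operator like this could look like. It is not the implementation mentioned above, and none of these names come from the package: AsyncOnElementSequence, Log, and demo are hypothetical and only there for illustration. It assumes the side effect is an async @Sendable closure, and it marks next() as throws instead of rethrows to keep the sketch simple, so the consumer has to write for try await even when the base sequence never throws. It uses AsyncStream as the source so it only needs the standard library.

```swift
// Hypothetical wrapper: forwards every element unchanged and runs a side effect first.
struct AsyncOnElementSequence<Base: AsyncSequence>: AsyncSequence {
    typealias Element = Base.Element

    let base: Base
    let sideEffect: @Sendable (Base.Element) async -> Void

    struct AsyncIterator: AsyncIteratorProtocol {
        var baseIterator: Base.AsyncIterator
        let sideEffect: @Sendable (Base.Element) async -> Void

        // Simplification: always `throws`, so errors from the base pass through untouched.
        mutating func next() async throws -> Base.Element? {
            guard let element = try await baseIterator.next() else { return nil }
            await sideEffect(element)   // observe only; the element is returned as-is
            return element
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(baseIterator: base.makeAsyncIterator(), sideEffect: sideEffect)
    }
}

extension AsyncSequence {
    // Hypothetical operator name, borrowed from the discussion above.
    func onElement(
        _ sideEffect: @escaping @Sendable (Element) async -> Void
    ) -> AsyncOnElementSequence<Self> {
        AsyncOnElementSequence(base: self, sideEffect: sideEffect)
    }
}

// An actor gives the @Sendable closure a safe place to record what it saw.
actor Log {
    private(set) var entries: [String] = []
    func append(_ entry: String) { entries.append(entry) }
}

func demo() async throws -> [String] {
    let numbers = AsyncStream<Int> { continuation in
        for n in 1...3 { continuation.yield(n) }
        continuation.finish()
    }
    let log = Log()
    let squares = numbers
        .onElement { await log.append("saw \($0)") }
        .map { $0 * $0 }
    for try await square in squares {
        await log.append("got \(square)")
    }
    // ["saw 1", "got 1", "saw 2", "got 4", "saw 3", "got 9"]
    return await log.entries
}
```

The interleaving in the result mirrors the 1, 1, 2, 4, 3, 9 output above: each element is pulled through the whole chain before the next one is requested. Note that this sketch only observes elements; it does not see the end of iteration or a thrown error.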
So the things I would worry about are:
Can you, from onElement, cancel the task out from under the iteration below it? In your example, is there a way, from inside the onElement closure, to cancel the iteration that map is doing? If so, is that safe? I am not sure...
The other case is tracking the end of iteration (either nil or throw)... that is considerably more difficult than my example as listed. If that can be done (which I think it technically could be), what are the risks to the state of the other iterations? Does it open more holes for cancellation issues?
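On the first question, a small sketch suggests the answer is yes, at least mechanically: a synchronous @Sendable side effect can reach the task it happens to be running on and cancel it. The function names here are hypothetical; withUnsafeCurrentTask and Task.isCancelled are standard library APIs. This only shows that the cancellation flag gets set; whether every iterator below reacts safely to that is exactly the open question.

```swift
// Hypothetical side effect that cancels whatever task it is called on.
@Sendable func cancellingSideEffect(_ element: Int) {
    withUnsafeCurrentTask { task in
        if let task = task {
            task.cancel()
        }
    }
}

// Runs the side effect inside a fresh task and reports whether that task
// now observes itself as cancelled.
func sideEffectCanCancelItsOwnTask() async -> Bool {
    let task = Task { () -> Bool in
        cancellingSideEffect(1)
        return Task.isCancelled
    }
    return await task.value
}
```

Because cancellation is cooperative, nothing is torn down at the moment cancel() is called; the iterators further down only notice the next time they check for cancellation.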
Making the closure require @Sendable is a good start, because anything that happens there will be on an unknown task. The @Sendable requirement ensures the closure adheres to proper isolation w.r.t. tasks, which may make the case that it is thread-safe (unlike Combine's handleEvents callouts, which need some sort of lock). That would also require the Element itself to be Sendable.
I am just being cautious with this to make sure we cover our bases on it.