I'm trying to bridge Combine and Concurrency.
Everything works except that a Task always starts lazily, so my implementation doesn't work until the Task actually starts running.
import Combine

// `mapTask` is my own operator (see MapTask below)
let subject = PassthroughSubject<Int, Never>()
let cancellable = subject.mapTask { value in
    // async work
    await Task.yield()
    return value
}.sink { value in
    // Not called, because the Task has not started yet
    print(value)
}
subject.send(0)

let cancellable2 = (0..<100).publisher.mapTask { value in
    // async work
    await Task.yield()
    return value
}.sink { value in
    // This works as expected
    print(value)
}
This puzzles me a bit, though: the work won't start until it is scheduled for execution, whether it's a Task or any other asynchronous work (say, a dispatch queue). That is just how it should work, isn't it?
Sorry, but this is suspending the Task.
What I want is to run the Task in an undispatched way: create a new Task and run it on the current thread until an actual suspension occurs.
It seems to me (correct me if I'm wrong) that what you want is sequential execution: a "task" starts and suspends execution of the caller's thread until the task ends. Why do you need a task here? Isn't your "task" just a block of code that doesn't need to be executed asynchronously?
Everything in concurrency runs as part of some executor. What you need to achieve the desired behavior is to avoid a hop to another executor, which all of the proposals allow you to do in different ways. Custom task executors are probably the closest match for your needs.
The suggestion by @hassila seems reasonable before Swift 6. It's not exactly the behavior you're trying to achieve, but it should be close enough.
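To make the "Task always starts late" behavior concrete: a Task body is enqueued on an executor rather than run inline, even when that executor is the one the caller is already on. A minimal sketch, assuming a recent toolchain (top-level `await` needs Swift 5.7 or later); `EventLog` and `orderingDemo` are illustrative names, not from the thread. Because the Task is created from main-actor code, it inherits the main actor, so its body cannot start until the creating code suspends.

```swift
// Illustrative names: `EventLog` and `orderingDemo` are not from the thread.
@MainActor
final class EventLog {
    var entries: [String] = []
}

@MainActor
func orderingDemo() async -> [String] {
    let log = EventLog()
    // Created from main-actor code, so the Task inherits the main actor.
    // Its body is enqueued on the main actor's executor, not run inline.
    let task = Task {
        log.entries.append("task body")
    }
    // Still running synchronously on the main actor: the task cannot start yet.
    log.entries.append("after Task init")
    // Suspending here frees the main actor, so the enqueued body finally runs.
    await task.value
    return log.entries
}

let entries = await orderingDemo()
print(entries)  // ["after Task init", "task body"]
```

The synchronous code after `Task { }` always runs first here; the body only gets a chance to run at `await task.value`, when the main actor becomes free.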
// This is inside a synchronous context.
// `startTaskUndispatched` is a hypothetical API.
let task = startTaskUndispatched {
    // Runs on the same thread as the caller
    print("1")
    // Suspension point: control returns to the caller's thread
    await doAsyncJob()
    // Runs after `startTaskUndispatched` has returned, possibly on a different thread than the caller's
}
print("2")
// prints 1, 2 (not 2, 1)
So the Task runs partially in place.
This would be helpful in a case like the one below:
import Combine

let subject = PassthroughSubject<Int, Never>()
let (stream, continuation) = AsyncStream<Int>.makeStream()
let (demandSource, demandContinuation) = AsyncStream<Subscribers.Demand>.makeStream()
// `demandContinuation` is used to forward demand requested in the Combine world

subject.subscribe(
    AnySubscriber(
        receiveSubscription: { subscription in
            // store `subscription` for later use
        },
        receiveValue: {
            continuation.yield($0)
            return .none
        },
        receiveCompletion: { _ in
            continuation.finish()
        }
    )
)

Task {
    let subscription = /* fetch the subscription stored above */
    // `subscriber` is the downstream subscriber of the MapTask publisher
    var iterator = stream.makeAsyncIterator()
    for await demand in demandSource {
        var pending = demand
        while pending > .none {
            pending -= 1
            subscription.request(.max(1))
            if let value = await iterator.next() {
                // do async work
                pending += subscriber.receive(value)
            } else {
                subscriber.receive(completion: .finished)
                return
            }
        }
    }
}

// Without a `Thread.sleep` here to wait until the Task has started, these events are lost
subject.send(0)
subject.send(1)
subject.send(completion: .finished)
This is simplified code I wrote for illustration; the actual implementation is MapTask.
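In this sketch the events are lost because the subscriber never requests demand up front (demand is only requested from inside the Task), and `PassthroughSubject` drops any value sent while downstream demand is zero. One possible workaround, at the cost of Combine backpressure, is to request `.unlimited` in `receiveSubscription` and let the `AsyncStream` hold values until the Task starts iterating. The sketch below (plain Swift concurrency, no Combine, assuming Swift 5.9+ for `makeStream()`; `collectLate` is an illustrative name) shows only the buffering half: with the default `.unbounded` policy, values yielded before any consumer exists are kept, so a Task that starts late still sees them.

```swift
// Plain Swift concurrency (no Combine); `collectLate` is an illustrative name.
func collectLate() async -> [Int] {
    // Default buffering policy is .unbounded: nothing is dropped while no one iterates.
    let (stream, continuation) = AsyncStream<Int>.makeStream()

    // Producer side runs synchronously, before the consuming Task exists.
    continuation.yield(0)
    continuation.yield(1)
    continuation.finish()

    // The consumer starts "late" but still receives every buffered element.
    let consumer = Task { () -> [Int] in
        var received: [Int] = []
        for await value in stream {
            received.append(value)
        }
        return received
    }
    return await consumer.value
}

let received = await collectLate()
print(received)  // [0, 1]
```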
I'm not very familiar with tasks, but my understanding is as follows:
This is a task thread, running in parallel with the thread where you created the task.
Control does not return to the caller thread; it's just your parallel task thread that is suspended until doAsyncJob() finishes. The caller thread continues execution, as it did immediately after you created the task.
First you say execution is done in the caller thread:
/// This is the same thread as the caller thread
Then you say execution returns to the caller thread (from where, if it was already on the caller thread?):
/// suspension point, return the control to the caller thread
That is, it is always the caller thread that executes your code. Just remove the `Task` from your code, and it will indeed always be the caller thread, printing 1, 2 rather than 2, 1.
Giving it more thought, I'd avoid using tasks in such a hidden way. Tasks are the unstructured part of concurrency, and you want a more structured way to deal with this. Combine isn't well suited to Swift concurrency in general: it was designed with different ideas in mind and has had no interop with it for a long time, which I suppose is due to a conflict between its ideas and those of the new concurrency model. So I think an even more preferable path is to avoid concurrency until `sink` and then wrap the work in a Task explicitly.
Consider this: Combine has subscribe(on:) and receive(on:) methods, which set the preferred execution queue for the upstream and downstream parts of the event stream. Using a Task along the way breaks this flow unless you have a way to pass a queue as a task executor. So unless Combine provides such capabilities, your solution is likely to violate this contract implicitly.
Generally, your thinking is more reasonable.
But that way, it's hard to cancel the Tasks when cancellation occurs, because each event creates its own unstructured Task.
I wanted to create a single unstructured Task to pipeline the async operations.
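On the cancellation concern: one way to keep a single unstructured Task is to push every event into an `AsyncStream` from the synchronous side and have one long-lived Task consume it in order, so there is only one Task to cancel. A minimal sketch without Combine, assuming Swift 5.9+; `transform` and `runPipeline` are hypothetical names, and `transform` merely stands in for the real async work.

```swift
// One long-lived Task consumes every event in order, instead of one Task per event.
// `transform` is a hypothetical stand-in for the real async work.
func transform(_ value: Int) async -> Int {
    await Task.yield()  // placeholder async work
    return value * 2
}

func runPipeline() async -> [Int] {
    let (events, input) = AsyncStream<Int>.makeStream()

    // A single unstructured Task: cancelling it tears down the whole pipeline.
    let worker = Task { () -> [Int] in
        var outputs: [Int] = []
        for await event in events {
            if Task.isCancelled { break }
            outputs.append(await transform(event))
        }
        return outputs
    }

    // Events arrive from synchronous code (e.g. a Combine `sink`).
    for value in 1...3 {
        input.yield(value)
    }
    input.finish()

    return await worker.value
}

let outputs = await runPipeline()
print(outputs)  // [2, 4, 6]
```

Calling `worker.cancel()` is then the only cancellation needed: the `Task.isCancelled` check stops processing between events, and iteration over an `AsyncStream` also ends once its consuming task is cancelled.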
Thanks for the reminder, but Combine already has FlatMap, which can break the executor preference.
You're asking for async code to be executed inline, blocking the calling thread. We've deliberately not provided such a feature because blocking on asynchronous work from a synchronous context causes all kinds of problems: you have no way of knowing whether that synchronous context is already running on the concurrent global pool, so you may end up burning a thread from the pool (which does NOT grow), and if all available threads block like this, the whole pool deadlocks.
It is unlikely we'd accept such an API for those reasons.
At the same time, there are legitimate reasons for such APIs when you definitely know you are NOT on the shared pool; this came up a lot when integrating with other concurrency runtimes. But using it arbitrarily like this, from any code or thread, is probably not something we'd want.
What I want is not to run the whole task synchronously, but to run it partially on the calling thread: only the part that can run immediately.
In that case, I don't think it would cause resource problems that badly.
The only difference is that it runs synchronously until it reaches the first actual suspension point, then suspends the task and switches to the Concurrency default thread pool.
For example, startTaskUndispatched is an imaginary method that takes an escaping async closure. The closure must be Sendable and nonisolated, and cannot have an executor preference or restriction. It starts undispatched and runs until it hits the first suspension point.
The code above prints 1 3 2: print("1") and print("3") are called on the caller thread, while print("2") is called from the Concurrency default thread pool. When the task body reaches Task.sleep, the task suspends and startTaskUndispatched returns the task object.
The code below would print 1 2 3, because withUnsafeContinuation is already resolved.
(Correct me if I'm wrong, but at least 1 2 is already printed in order with current Concurrency, so I hope this won't be a big problem.)
I get your intention. But in this case, what you're asking for could probably be done by performing the work up to the suspension point within the existing synchronous context rather than in the unstructured task.
let task: Task<Void, Error> = Task.startTaskUndispatched {
    try await Task.sleep(nanoseconds: 1_000)
    print("2")
}
// Continue the work here, not in the `Task`, as you don't need it yet (no asynchronous work is needed at this point)
print("1")
print("3")
But that can't handle cases where print("2") depends on print("1").
Code like the following cannot work with the current design:
// Hypothetical `startTaskUndispatched`
let subject = PassthroughSubject<String, Never>()
let task: Task<Void, Error> = Task.startTaskUndispatched {
    for await message in subject.values {
    }
}
// OK: the subscription happens before the event is sent
subject.send("FOO")

// Current behavior
let subject = PassthroughSubject<String, Never>()
let task: Task<Void, Error> = Task {
    for await message in subject.values {
    }
}
// Not OK: the subscription hasn't happened yet, so the event is dropped
subject.send("FOO")
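The difference is *when* the subscription happens: `subject.values` only subscribes once the Task starts running the `for await` loop, and `PassthroughSubject` drops values sent before that. One way to get the ordering without blocking and without new API is to subscribe synchronously, forwarding each value into an `AsyncStream` continuation, and let the Task only consume the stream. A minimal sketch under assumptions: `DropSubject` is a hypothetical stand-in for `PassthroughSubject` (so the example runs without Combine) and `subscribeThenIterate` is an illustrative name; with Combine, the synchronous subscription would be a `sink` whose closure yields into the continuation.

```swift
// `DropSubject` is a hypothetical stand-in for PassthroughSubject:
// values sent while no handler is registered are simply dropped.
@MainActor
final class DropSubject {
    private var handlers: [(String) -> Void] = []
    func subscribe(_ handler: @escaping (String) -> Void) { handlers.append(handler) }
    func send(_ value: String) { handlers.forEach { $0(value) } }
}

@MainActor
func subscribeThenIterate() async -> [String] {
    let subject = DropSubject()
    let (messages, continuation) = AsyncStream<String>.makeStream()

    // Subscribe synchronously, before the Task exists, so nothing can be dropped.
    subject.subscribe { _ = continuation.yield($0) }

    // The Task only consumes; it no longer has to subscribe before `send`.
    let consumer = Task { () -> [String] in
        var received: [String] = []
        for await message in messages {
            received.append(message)
        }
        return received
    }

    subject.send("FOO")    // goes straight into the stream's buffer
    continuation.finish()  // end the stream so the consumer loop terminates
    return await consumer.value
}

let received = await subscribeThenIterate()
print(received)  // ["FOO"]
```

The trade-off is that buffering now lives in the `AsyncStream` rather than being driven by Combine demand.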
Yeah, something like the following code probably works, except that it needs blocking:
import Combine
import Foundation

let subject = PassthroughSubject<String, Never>()
let semaphore = DispatchSemaphore(value: 0)
let task: Task<Void, Error> = Task {
    let publisher = subject.handleEvents(receiveSubscription: { _ in
        // Signal after the current event handling has finished, to ensure the task is suspended
        DispatchQueue.global().async { semaphore.signal() }
    })
    for await message in publisher.values {
    }
}
semaphore.wait()
subject.send("FOO")
Which is why I'm asking for such a feature. Kotlin coroutines do support this behavior (CoroutineStart.UNDISPATCHED).
By the way, if this is marked as not planned, I'd like to know the reason.