A case study for `reasync`

Before we get to today's work, I want to amend the earlier proclamation that rethrows was the only change to the explorer functions: I also modified them to remove any reliance on defer. That change was not strictly necessary, but these defers ran counter to my conviction that only the strict minimum amount of work should be done as part of a "non-local jump", and the code under the defers did not rise to that bar: it was merely a convenient way for me to put internal accounting and deaccounting next to each other, without any of it managing actual resources. That was acceptable in code that could not throw anyway, but it no longer is now that these functions can.
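
Purely as an illustration (the explorer code itself is not shown here, so every name below is made up), the kind of pattern that got removed looked something like this:

struct Explorer {
    var activeExplorations = 0

    // Before: the defer pairs the increment and the decrement, but it also runs
    // while an error is propagating (a "non-local jump"), even though no real
    // resource is being released.
    mutating func exploreWithDefer() throws {
        activeExplorations += 1
        defer { activeExplorations -= 1 }
        try step()
    }

    // After: the decrement is written on the normal path only, so nothing runs
    // as part of error propagation.
    mutating func explore() throws {
        activeExplorations += 1
        try step()
        activeExplorations -= 1
    }

    mutating func step() throws {
        // stand-in for the exploration work, which can now throw
    }
}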

Today we will see how to take advantage of our new support for interruption. Indeed, so far our behaviour was almost indistinguishable from just allowing the process to be terminated upon SIGINT; by handling the signal ourselves, we can keep the process around for further tasks (e.g. another computation) and/or extract some data as part of handling the signal. Today we will do the latter: on that event, we will show the state the processing had reached at the time it was interrupted.
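
The signal plumbing itself is not shown in this post; as a rough, hypothetical sketch of the idea, intercepting SIGINT ourselves instead of letting it terminate the process could look like this:

import Dispatch
import Foundation

// Minimal sketch, not the project's actual plumbing: intercept SIGINT so the
// process stays alive and we get a chance to report state instead of dying.
signal(SIGINT, SIG_IGN)  // replace the default, terminating disposition

let sigintSource = DispatchSource.makeSignalSource(signal: SIGINT, queue: .main)
sigintSource.setEventHandler {
    // Hypothetical hook: notify our own machinery (e.g. finish a continuation
    // by throwing) so that in-flight work winds down and the state gets dumped.
    print("Interrupted; reporting the state reached so far…")
}
sigintSource.resume()

dispatchMain()  // keep a command-line tool alive to service the main queue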

Given that processing happens in parallel, such a state is not trivial to define. In fact, we are not going to report the full state, but a characteristic part of it: the state up to which all existing results are guaranteed to have been reported; beyond that point, results may have been reported (owing to ongoing tasks), probably weren't, but there is no guarantee either way. In order to materialise that state, we will dedicate a task:

Task {
    let sequentialBatches = AsyncThrowingStream(Task<String, any Error>.self) { batchesSequencer in
        /* Omitted the bulk of the code */
    }
            
    do {
        for try await batch in sequentialBatches {
            try await progressRecorder.record(batch.value);
        }
        await resultMediator.finish()
    } catch {
        /* Note that in case of an interrupt, the continuation was already finished by throwing,
         but that's OK, because it's the job of resultMediator to debounce this kind of situation.
         */
        await resultMediator.raise(error: error);
    }
}
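
The omitted bulk of the code is not reproduced here; purely as a sketch of the shape it could take (the batch parameters and the explore function below are made-up stand-ins), it would create the exploration subtasks and yield each handle in strict creation order:

// Hypothetical stand-ins; the real batch parameters and explorer are not shown.
let hypotheticalBatchParameters = Array(0..<8)
func explore(_ parameters: Int) async throws -> String {
    "state dump for batch \(parameters)"  // stand-in for the real state dump
}

// Sketch of the omitted build closure: create the exploration subtasks and yield
// each handle in creation order, so the consumer awaits them sequentially even
// though they run in parallel.
let sequentialBatches = AsyncThrowingStream(Task<String, any Error>.self) { batchesSequencer in
    for parameters in hypotheticalBatchParameters {
        let subtask: Task<String, any Error> = Task {
            try await explore(parameters)
        }
        batchesSequencer.yield(subtask)
    }
    batchesSequencer.finish()  // on interruption, finish(throwing:) would be used instead
}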

This outer task pulls double duty: it takes over the role of making sure all results have been accounted for before finishing the stream, but it does so by waiting for the subtasks to complete sequentially, in strict creation order, recording each one's output as it completes. The subtasks were modified to produce an output which is a dump of the state at their completion. That way, we know we have explored every earlier possibility, earlier in the sense of the sequential algorithm shown in the first post, since not only that subtask but also all those that preceded it in that algorithm are known to have reported any potential result. The progressRecorder actor is completely trivial:

@available(macOS 12.0, iOS 15.0, *) actor Recorder<Inner> {
    var stored: Inner
    
    init(_ val: Inner) {
        stored = val;
    }
    
    func record(_ val: Inner) async {
        stored = val;
    }
}
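
For completeness, here is a sketch of how such a recorder might be instantiated and sampled; the names and values are hypothetical, as the post only shows the actor itself:

@available(macOS 12.0, iOS 15.0, *)
func demoRecorder() async {
    // Hypothetical initial value; in the real code this would be a dump of the initial state.
    let progressRecorder = Recorder("<nothing recorded yet>")

    // The consumer loop shown earlier overwrites the stored value after each batch:
    await progressRecorder.record("state dump after some batch")

    // Whatever handles the interruption can then sample the last recorded state:
    print(await progressRecorder.stored)
}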

For better logic hygiene, I modified the RaiseMediator to throw instead of just dropping results yielded after interruption. Given how RaiseMediator currently works (it samples the progressRecorder first, and only then creates the error), I don't believe this makes a difference in practice, but I don't want to rely on an assumption that could too easily be invalidated by accident: there could be more sources of interruption in the future. By throwing, I make sure that any Task or TaskGroup in charge of accounting for results gets pierced by a thrown error if even a single result went unreported because of interruption, so that it does not itself report success (to that end, I made sure to add try await innerResultsScopingGroup.waitForAll() at the end of the TaskGroup).

func yield(_ result: String) async throws {
    guard let c = continuation else {
        throw error ?? LateYieldError();
    }
    c.yield(result);
}
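
The post only shows yield; for context, here is a sketch of what the rest of such a mediator could look like, with the finish() and raise(error:) entry points used by the task above. This is only an assumed shape: the real RaiseMediator is richer, in particular it samples the progressRecorder before building its error.

// Sketch only: keeps just the shape needed to show the debouncing of calls made
// after the stream was already finished.
struct LateYieldError: Error {}

actor RaiseMediator {
    private var continuation: AsyncThrowingStream<String, any Error>.Continuation?
    private var error: (any Error)?

    init(continuation: AsyncThrowingStream<String, any Error>.Continuation) {
        self.continuation = continuation
    }

    // As shown above: results yielded after interruption now throw instead of
    // being silently dropped.
    func yield(_ result: String) async throws {
        guard let c = continuation else {
            throw error ?? LateYieldError()
        }
        c.yield(result)
    }

    // Normal completion: finish the stream exactly once, then forget it.
    func finish() {
        continuation?.finish()
        continuation = nil
    }

    // Interruption (or any other failure): finish the stream by throwing, and
    // remember the error so that later yields are turned into a throw.
    func raise(error: any Error) {
        continuation?.finish(throwing: error)
        continuation = nil
        self.error = error
    }
}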

But on the whole, there is not much else to it (except of course the code to dump the state, omitted for brevity). In particular, I still do not try to make the error be thrown out of a task that was waiting for something else: as you can see, the new task does not react directly to the interruption. Instead, it ends up terminating as a side effect of the stream finishing soon afterwards, much in the same way that, in the previous version of the code, the now superseded resultsScopingGroup would simply no longer get new tasks added to it.

Here is what an interrupted run looks like:

((1 + (75 * (6 + 1))) * (10 + 7) / 2)
(1 + (10 * ((6 * (75 + 1)) - (2 + 7))))
^C|_a: -1)+7)_b: +2)+75)_c: +
Available primitives: []
Skipped operands for the current operation: [10]
Skipped primitives for the current floor: [6, 1]