Bridging SwiftNIO and Async/Await

I am running into an interesting issue. I have a WebSocket library that I am converting from NIO to async/await. Whenever I try to bridge the NIO inbound stream to async/await, my WebSocket frames arrive out of order. I see this both in channelRead() and further up in my client when I receive the string data. Here are a couple of examples where I have the problem:

    internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        let promise = context.eventLoop.makePromise(of: Void.self)
        let frame = self.unwrapInboundIn(data)
        // IF I READ FRAME HERE ALL OF THE MESSAGES ARE IN ORDER
        promise.completeWithTask {
            // IF I READ THE FRAME HERE WE ARE OUT OF ORDER
            await self.webSocket?.read(incoming: frame)
        }
    }

    internal func didReceive(events: WebSocketEvents) {
        switch events {
        case .text(let text):
            // GOOD ORDER
            print("RECEIVED___", text)
            Task {
                // BAD ORDER
                print("RECEIVED___2", text)
            }
        }
    }

Any thoughts or suggestions would be greatly appreciated.

Thanks!

Yes, this is expected. In Swift Concurrency you have no guarantee that two Tasks run in the order in which they were created.

It seems like you'll want an AsyncStream/AsyncSequence here to do your job. CC @fabianfett who has implemented this for AsyncHTTPClient.
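To make the AsyncStream suggestion concrete, here is a minimal sketch without any NIO dependency. `FrameFeeder`, `receive(_:)` and `consume(_:)` are hypothetical names: `receive(_:)` stands in for channelRead, which is called synchronously and in order, and a single loop consumes the stream. This is only an illustration under those assumptions, not how AsyncHTTPClient does it.

```swift
// Hypothetical stand-in for a NIO handler: `receive(_:)` plays the role of
// channelRead, which is called synchronously and in order.
final class FrameFeeder {
    let frames: AsyncStream<String>
    private let continuation: AsyncStream<String>.Continuation

    init() {
        var captured: AsyncStream<String>.Continuation!
        self.frames = AsyncStream<String> { captured = $0 }
        self.continuation = captured
    }

    // Synchronous: just enqueue the value, never spawn a Task per frame.
    func receive(_ text: String) { continuation.yield(text) }
    func close() { continuation.finish() }
}

// A single consumer loop. Each frame is fully handled (including any
// suspension) before the next one is pulled from the stream.
func consume(_ inputs: [String]) async -> [String] {
    let feeder = FrameFeeder()
    for text in inputs { feeder.receive(text) }
    feeder.close()

    var handled: [String] = []
    for await text in feeder.frames {
        await Task.yield() // simulate an async handler that suspends
        handled.append(text)
    }
    return handled
}
```

Because there is only one consumer, a suspension inside the loop body delays the next frame instead of letting it overtake the current one.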

Hey, thanks @johannesweiss. I think @fabianfett pointed me down the AsyncSequence route. I thought I was on the right path, but I think I was making a ton of mistakes. After calling completeWithTask and then read() on my async/await WebSocket library, I created an AsyncSequence, but it still seemed to give me this issue. Here is the gist of what I am doing.

    internal func streamResults(_ frame: WebSocketFrame) async {
        let sequences = WebSocketFrameSequence(frame: frame)
        switch frame.opcode {
        case .pong:
            break
        case .text:
            let iterator = sequences.makeAsyncIterator()
            var data = iterator.frame.unmaskedData
            guard let text = data.readString(length: data.readableBytes) else { return }
            try? await self.onTextCallback(self, OnTextSequence(text: text))
        case .binary:
            break
        case .continuation:
            break
        case .connectionClose:
            break
        case .ping:
            break
        default:
            break
        }
    }

internal struct WebSocketFrameSequence: AsyncSequence {
    typealias Element = ByteBuffer
    
    let frame: WebSocketFrame
    
    /// - Parameters:
    /// - frame: WebSocketFrame
    init(frame: WebSocketFrame) {
        self.frame = frame
    }

    func makeAsyncIterator() -> WebSocketFrameIterator {
        return WebSocketFrameIterator(frame: frame)
    }
}


internal struct WebSocketFrameIterator: AsyncIteratorProtocol {

    typealias Element = ByteBuffer

    let frame: WebSocketFrame
    var lastFrame: WebSocketFrame?

    /// - Parameters:
    /// - frame: WebSocketFrame
    init(frame: WebSocketFrame) {
        self.frame = frame
    }

    mutating func next() async throws -> ByteBuffer? {
        guard frame != lastFrame else { return nil }
        lastFrame = frame
        return frame.unmaskedData
    }
}

I know it's not perfect, but I am trying.

Any suggestions?

From the perspective of AsyncSequence what you have written is pretty darned close.

Couple of thoughts -

I wonder if it would be possible to make the readableBytes actually an async function for when bytes are available? So for example if there is some sort of callback that could be made? If it could be done with that shape of a callback to be invoked on the next packet of bytes being read then you can use withCheckedContinuation (or perhaps the throwing variant because well errors are totally a thing when speaking of networking) to convert that callback into an async function returning the bytes.

One gotcha that likely you need to consider is the lifetime of those bytes - e.g. if it is a returned byte buffer that buffer needs to have some sort of managed lifetime, which means that there needs to either a) be a borrow semantic of those bytes to a "free when done" aspect, or b) a copy semantic to proactively copy the read byte buffer.
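As a rough sketch of the callback-to-async idea, assuming a callback of that shape exists: `CallbackSource`, `readNext(_:)`, `nextChunk()` and `readAll(_:)` below are all hypothetical, and the bytes are copied into a `[UInt8]` so that lifetime is handled by copy semantics (option b above).

```swift
struct SourceClosed: Error {}

// Hypothetical callback-style API: "call me once with the next bytes".
final class CallbackSource {
    private var pending: [[UInt8]]
    init(chunks: [[UInt8]]) { self.pending = chunks }

    // Invokes `completion` exactly once per call.
    func readNext(_ completion: @escaping (Result<[UInt8], Error>) -> Void) {
        if pending.isEmpty {
            completion(.failure(SourceClosed()))
        } else {
            completion(.success(pending.removeFirst()))
        }
    }

    // Async wrapper: the continuation is resumed exactly once because
    // `readNext` calls its completion exactly once.
    func nextChunk() async throws -> [UInt8] {
        try await withCheckedThrowingContinuation { continuation in
            readNext { result in
                continuation.resume(with: result)
            }
        }
    }
}

// Usage: pull chunks one at a time until the source reports an error.
func readAll(_ chunks: [[UInt8]]) async -> [[UInt8]] {
    let source = CallbackSource(chunks: chunks)
    var out: [[UInt8]] = []
    while let chunk = try? await source.nextChunk() { out.append(chunk) }
    return out
}
```

The important invariant is one resume per continuation; a callback that can fire repeatedly needs a stream instead.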

The bonus round -

Once you get those things addressed the next consideration is cancellation. Likely it would be interesting to tie in task cancellation of the task iterating the AsyncSequence such that you can cancel the socket somehow. Should that close the socket? Not sure, maybe?
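One possible way to tie cancellation to the socket, sketched with AsyncStream's `onTermination` hook. `FakeSocket`, `frames(from:)` and `cancelConsumer()` are hypothetical stand-ins, and whether termination should really close the socket is exactly the open question above; this just shows where such a hook would go.

```swift
import Foundation

// Hypothetical socket stand-in that only records whether close() was called.
final class FakeSocket: @unchecked Sendable {
    private let lock = NSLock()
    private var closed = false

    var isClosed: Bool {
        lock.lock(); defer { lock.unlock() }
        return closed
    }

    func close() {
        lock.lock(); defer { lock.unlock() }
        closed = true
    }
}

func frames(from socket: FakeSocket) -> AsyncStream<String> {
    AsyncStream { continuation in
        // Called when the stream terminates, including when the task
        // iterating it is cancelled.
        continuation.onTermination = { _ in socket.close() }
        continuation.yield("hello")
        // Deliberately never finished: the stream stays open until cancelled.
    }
}

func cancelConsumer() async -> Bool {
    let socket = FakeSocket()
    let stream = frames(from: socket)
    let consumer = Task {
        for await _ in stream { }
    }
    consumer.cancel()
    await consumer.value
    return socket.isClosed
}
```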

The problem lies with using async/await within SwiftNIO. NIO offers the bridge, of course, for linking a promise to async/await, but as soon as the data gets handed to the AsyncSequence it loses order. Perhaps I could create a callback to pass the data in order and then feed it to an AsyncStream, but I would like to avoid the callback entirely, which seems reasonable to me. I did adjust my AsyncSequence in the above code to make it much cleaner.

Is @fabianfett keeping track of order here on the IteratorStream? Or perhaps I need to create a JobQueue of sorts?

async-http-client/HTTPClientResponse.swift at d372bdc213384aa07a0ecc276a6ccf3b4a9de7b8 · swift-server/async-http-client · GitHub

It seems the fix was to create a synchronous job queue at channelRead() in NIO. We then can pass 1 ordered frame at a time to AsyncSequence. Thanks guys for your input. Always appreciated.

So I think I am on the right path according to your suggestion. Do you mind taking a look?

    var socketFrame: (Result<WebSocketFrame, Error>) -> ()
    func asyncFrame(_ completion: @escaping (Result<WebSocketFrame, Error>) ->()) {
        socketFrame = completion
    }
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        let myFrame = self.unwrapInboundIn(data)
        Task {
            socketFrame(Result { myFrame })
            let frame = try await asyncFrameBridge()
            for try await frame in WebSocketFrameSequence(frame: frame) {
                await self.webSocket?.handleStream(frame)
            }
        }
    }
    func asyncFrameBridge() async throws -> WebSocketFrame {
        return try await withCheckedThrowingContinuation { continuation in
            asyncFrame { result in
                switch result {
                case .success(let frame):
                    continuation.resume(returning: frame)
                case .failure(let error):
                    continuation.resume(throwing: error)
                }
            }
        }
    }

So that code seems to capture the closure multiple times because it seems recursive. That leads to the continuation being resumed more than once, which CheckedContinuation trips over (as expected).

You could add a semaphore:

Probably not a very good solution - but a solution :)

I would actually claim that using a semaphore is more than just a sub-par solution. Firstly it destroys any potential of good behavior of quality of service (and consequently power), and secondly it may result in blocked queues that starve off the concurrency queues (which would result in at best a thread explosion potential, and at worst a starvation/deadlock because of blocked queues).

Semaphores are often a thing to avoid except in VERY specific/limited cases, and with Swift concurrency even more so.

If you need more than one event and order does not matter, the right Swift concurrency solution will fall into the space of TaskGroup. If you need more than one event and order does matter, then the solution will fall into the space of AsyncSequence (and in this particular case I would guess AsyncStream).
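For the "order does not matter" half of that rule, a minimal TaskGroup sketch. `squares(of:)` is a made-up example function; results are collected into a Set precisely because child tasks may complete in any order.

```swift
// Child tasks may finish in any order, so the results go into a Set.
func squares(of values: [Int]) async -> Set<Int> {
    await withTaskGroup(of: Int.self) { group in
        for value in values {
            group.addTask { value * value }
        }
        var results = Set<Int>()
        for await result in group {
            results.insert(result)
        }
        return results
    }
}
```

If the order of events matters, as it does for WebSocket frames, this shape is the wrong tool and a stream with a single consumer is the better fit.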

I agree that using a semaphore negates all the benefits that Swift's new concurrency features offer. The difficulty I see with using AsyncStream is that it only provides a handle to its continuation in the build closure of its init. That is not a problem when using AsyncStream as a callback wrapper (and I think the API is very nice in this case), but it is a problem when trying to enqueue values in the stream from outside the build closure. Correct me if I am wrong @Philippe_Hausler, but I think this is a race condition:

var continuationHandle: AsyncStream<Int>.Continuation?
let stream = AsyncStream<Int> { continuation in
    continuationHandle = continuation
}
continuationHandle?.yield(1) // is continuationHandle still nil or not?

That pattern is supported, and the implementation accounts for that style of access. It isn't always ergonomic to do that, and it perhaps implies there is some missing shape somewhere when you have to use it like that. My guess is that if you pushed the async meniscus further down the stack in NIO, that difficulty would eventually start to disappear.
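A small sketch of that style of access, to answer the "still nil or not?" question. As far as I understand the standard library implementation, the build closure is non-escaping and runs before the initializer returns, and values yielded before anyone iterates are buffered (the default buffering policy is unbounded). `yieldOutsideBuildClosure()` is just an illustrative name.

```swift
func yieldOutsideBuildClosure() async -> (wasSet: Bool, values: [Int]) {
    var handle: AsyncStream<Int>.Continuation?
    let stream = AsyncStream<Int> { continuation in
        handle = continuation
    }
    // The build closure has already run by this point.
    let wasSet = (handle != nil)

    // Yielding before iteration starts buffers the values.
    handle?.yield(1)
    handle?.yield(2)
    handle?.finish()

    var values: [Int] = []
    for await value in stream { values.append(value) }
    return (wasSet, values)
}
```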

Yeah, and this is important: we want to provide a standard async stream transformation in NIO. We just haven't gotten around to it yet.

It would probably be useful to add another specialized AsyncSequence to the stdlib, something like AsyncSubject. Or, as an alternative, the possibility to create a continuation manually and pass it to the AsyncStream on init.
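For readers on newer toolchains: Swift 5.9 added `AsyncStream.makeStream(of:)` (SE-0388), which hands back the stream and its continuation together and covers much of this wish. A minimal sketch, with `makeStreamDemo()` as a made-up wrapper function:

```swift
// Requires Swift 5.9 or later.
func makeStreamDemo() async -> [Int] {
    // The continuation is returned next to the stream, so there is no
    // need to smuggle it out of the build closure.
    let (stream, continuation) = AsyncStream.makeStream(of: Int.self)

    continuation.yield(10)
    continuation.yield(20)
    continuation.finish()

    var values: [Int] = []
    for await value in stream { values.append(value) }
    return values
}
```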

As it turns out, the framework consuming the WebSocket was what was causing the disorder. An async/await method was suspending while it tried to decode a multipart text frame, which messed everything up. Something like this works on the NIO WebSocket, for anyone looking for a simple solution for bridging NIO and async/await.


    internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        let frame = self.unwrapInboundIn(data)
        consumer.feedConsumer([frame])
        Task {
            await processFrame()
        }
    }
    
    func processFrame() async {
        func getNextFrame() async throws -> SequenceResult? {
            return try await iterator.next()
        }

        let res = try? await getNextFrame()
        switch res {
        case .success(let frame):
            await webSocket?.handleStream(frame)
            bufferState = .done
        case .retry:
            await processFrame()
        default:
            break
        }
    }

internal struct WebSocketFrameSequence: AsyncSequence {
    
    typealias Element = SequenceResult
    
    let consumer: Consumer
    
    init(consumer: Consumer) {
        self.consumer = consumer
    }

    func makeAsyncIterator() -> Iterator {
        return WebSocketFrameSequence.Iterator(consumer: consumer)
    }

}

extension WebSocketFrameSequence {
    struct Iterator: AsyncIteratorProtocol {

        typealias Element = SequenceResult
        
        let consumer: Consumer
        
        init(consumer: Consumer) {
            self.consumer = consumer
        }
        
        mutating func next() async throws -> SequenceResult? {
            let result = consumer.next()
            var res: SequenceResult?

            switch result {
            case .ready(let buffer):
                res = .success(buffer!.0)
            case .preparing:
                res = .retry
            }
           
            return res
        }
    }
}

enum SequenceResult {
    case success(WebSocketFrame), retry
}

enum BufferState {
    case done, waiting
}

enum NextResult {
    case ready((WebSocketFrame, Bool)?), preparing
}

var bufferState = BufferState.done
var nextResult = NextResult.preparing

class Consumer {
    
    var wb = WebSocketFrameBuffer(CircularBuffer<WebSocketFrame>())
    
    init() {}
    
    func feedConsumer(_ frame: [WebSocketFrame]) {
        wb.append(contentsOf: frame)
    }

    func next() -> NextResult {
        switch bufferState {
        case .done:
            bufferState = .waiting
            let frame = wb.removeFirst()
            if frame.1 == false {
                return .preparing
            } else {
                return .ready(frame)
            }
        case .waiting:
            return .preparing
        }
    }
}

So a useful note here is that I think that creating unstructured Tasks from NIO is almost always a code smell. Unstructured tasks are to be avoided at all costs. When you find yourself doing this you almost certainly wanted an AsyncSequence instead.

Can you expand on that? Task.detached and AsyncSequence aren't really generally equivalent.

Sure. I'll start at the specifics and then talk about generalities. But first, a clarifying note: when I say "unstructured Tasks" I don't just mean those created by Task.detached, I also mean those created by Task.init.

The specific pattern used above is:

    internal func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        let frame = self.unwrapInboundIn(data)
        consumer.feedConsumer([frame])
        Task {
            await processFrame()
        }
    }

This is a mistake. channelRead is what I'd call an "unstructured stream": we're going to call this function repeatedly, in order, with the elements of the stream. It's really common, in Swift, to implement what is essentially an "ad-hoc stream" by using unstructured tasks. This implementation is slightly better than the naive one, in that it retains ordering, but it still spawns one Task per stream chunk.

I think this is an anti-pattern. In Swift concurrency we have a native abstraction for streams, AsyncSequence. This abstraction behaves better than unstructured Tasks for several reasons. The most obvious is that it retains ordering. The more important ones are that unstructured Tasks do not natively exert backpressure, and that they are "unowned". Both of these are really important.

On the backpressure front, one of the great opportunities of Swift concurrency is that we can hook AsyncSequences up to our backpressure mechanisms. This gives us the opportunity to let developers support backpressure without having to actually think about it, which is a really powerful feature.

But in reality I think the most important thing about unstructured Tasks is that they (usually) have no owner, and this is also my wider objection to unstructured Tasks and why I think they are generally an anti-pattern. This lack of ownership makes them unmanageable: they are uncancellable, unmonitorable, and unordered. It is my view that it is extremely rare that this is something you actually want. Do you really want a piece of work to happen unconditionally but not care when it happens or in what order?

In my view, the overwhelming majority of Tasks in a program should be structured (i.e. created via TaskGroups or async let). This makes it easier to understand cancellation and lifecycle. If we use structured concurrency then we can use with blocks for deterministic cleanup, cancellation should be able to work as expected, and errors always propagate sensibly. If we don't, errors get lost, tasks are uncancellable, and work can pile up in your system.
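A small sketch of what "errors always propagate sensibly" looks like with a throwing task group. `DecodeError` and `decodeAll(_:)` are hypothetical; the point is only that a failing child surfaces at the `for try await` loop, and that the group cancels and awaits the remaining children once the error leaves the closure.

```swift
struct DecodeError: Error {}

func decodeAll(_ payloads: [String]) async throws -> [Int] {
    try await withThrowingTaskGroup(of: (Int, Int).self) { group in
        for (index, payload) in payloads.enumerated() {
            group.addTask {
                guard let value = Int(payload) else { throw DecodeError() }
                return (index, value)
            }
        }

        var results = [Int](repeating: 0, count: payloads.count)
        // A child's error is rethrown here. When it leaves this closure,
        // the group cancels and awaits any children still running.
        for try await (index, value) in group {
            results[index] = value
        }
        return results
    }
}
```

No child task can outlive the call to `decodeAll(_:)`, which is the ownership property unstructured Tasks lack.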

A wrinkle here is that there are only two mechanisms of communicating between tasks: either your task returns a single value, in which case you communicate using TaskGroup or async let, or it produces a stream of values, in which case you use AsyncSequence.

Importantly, sometimes you need to communicate between separate Task hierarchies. This always needs to get done by way of AsyncSequence, because it's a "shared" ownership primitive. Tasks have only one parent, but AsyncSequences have two, the producer and the consumer. This allows bidirectional communication of interest: if the producer is cancelled or errors, the consumer will find out about it, and vice versa.
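To illustrate the producer-to-consumer direction, here is a sketch using AsyncThrowingStream. `ConnectionReset` and `consumeUntilFailure()` are made-up names, and the producer is simulated inline in the build closure rather than living in its own task hierarchy. The consumer receives the buffered values first and then the producer's error.

```swift
struct ConnectionReset: Error {}

func consumeUntilFailure() async -> (received: [Int], failed: Bool) {
    // Simulated producer: two values, then a failure.
    let stream = AsyncThrowingStream<Int, Error> { continuation in
        continuation.yield(1)
        continuation.yield(2)
        continuation.finish(throwing: ConnectionReset())
    }

    var received: [Int] = []
    do {
        for try await value in stream {
            received.append(value)
        }
        return (received, false)
    } catch {
        // The producer's error arrives after the buffered values.
        return (received, true)
    }
}
```

The opposite direction, where the producer learns that the consumer went away, is what the continuation's `onTermination` handler is for.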

So my thesis is: Tasks should (almost) always be structured, and cross-task communication should happen through AsyncSequence.