RTSP Client EventLoopFuture Return Value

I am currently working my way through building a NIO-based RTSP client. In case you did not already know, RTSP is modeled after HTTP, so a lot of the same patterns used there carry over.

I am trying to write a client that does some basic things. Right now I want to be able to make a simple OPTIONS call and get the data back.

let group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
defer {
    try! group.syncShutdownGracefully()
}

let connection = try! RTSPConnection(
        to: .makeAddressResolvingHost("", port: 554),
        on: group.next()
)

let options = try connection.options().wait()

// This will never unblock as we don't close the channel
try connection.wait()

The signature of RTSPConnection.options() looks like this: func options() -> EventLoopFuture<String>. At least for now, all I am trying to do is get the raw output from the server and return it as an EventLoopFuture.

The try connection.options().wait() call is sending (with proper CRLF):

OPTIONS rtsp:// RTSP/1.0
CSeq: 0

The server is responding (with proper CRLF):

RTSP/1.0 200 OK
CSeq: 0
Date: Tue, Jul 06 2021 21:27:11 GMT

So far so good.

Now the question, what is a "best practice" way to get that server response into the EventLoopFuture<String> returning from the options() method? I've looked at async-http-client and postgres-nio for inspiration and they seem to each do it differently.

I am inclined to believe that I should be making a ChannelHandler, specifically one that conforms to RemovableChannelHandler, and registering it to look for the corresponding CSeq in the response and removing the handler once seen. Is this right or is there a better way?
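To make the "look for the corresponding CSeq" step concrete, here is a minimal sketch of pulling the CSeq value out of a raw response string. The cseq(in:) helper is a hypothetical name, not part of SwiftNIO, and it assumes header names are matched case-insensitively as in HTTP.

```swift
import Foundation

/// Returns the CSeq value found in a raw RTSP message, or nil if there is none.
/// `cseq(in:)` is a hypothetical helper written for illustration only.
func cseq(in message: String) -> Int? {
    // "\r\n" is a single Character in Swift, so split on `isNewline`
    // rather than on a literal "\n".
    for line in message.split(whereSeparator: \.isNewline) {
        // Lines without a colon (such as the status line) are skipped.
        guard let colon = line.firstIndex(of: ":") else { continue }
        let name = line[..<colon].trimmingCharacters(in: .whitespaces)
        // Assumption: header names are case-insensitive, as in HTTP.
        guard name.lowercased() == "cseq" else { continue }
        let value = line[line.index(after: colon)...].trimmingCharacters(in: .whitespaces)
        return Int(value)
    }
    return nil
}
```

A handler could compare this value with the sequence number it was created for and ignore responses that belong to other requests.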

This is an entirely reasonable design, sounds good.

Thank you. Following on, does something like this make sense?

final class Task: RemovableChannelHandler, ChannelInboundHandler {
    typealias InboundIn = ByteBuffer
    typealias InboundOut = Never

    let promise: EventLoopPromise<String>
    let sequence: Int

    init(promise: EventLoopPromise<String>, sequence: Int) {
        self.promise = promise
        self.sequence = sequence
    }

    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        // Get the ByteBuffer
        let data = self.unwrapInboundIn(data)
        // Convert it to String
        if let string = data.getString(at: data.readerIndex, length: data.readableBytes) {
            // TODO: Check to see if it matches `sequence`
            self.promise.succeed(string)
            // Remove Task from list
            context.pipeline.removeHandler(self)
        }
    }
}

Note that I am not really asking about the logic per se. I am asking whether typealias InboundOut = Never and context.pipeline.removeHandler(self) will do what I expect them to do.

Since this handler is never expected to emit anything, is declaring InboundOut = Never OK? Or did I break an implicit contract that I am unaware of?

Also, removeHandler returns an EventLoopFuture and I do nothing with it. It makes me think I've done something wrong here but I am not entirely sure if I have to wait for that to finish before leaving the channelRead.

This is absolutely fine.

In general handler removal may be asynchronous: ChannelHandler implementations can delay their removal until they have sorted out their state. As you're removing yourself, you always know when the actual removal takes place, so you don't have to worry about it and can safely disregard the future.

One note: I recommend removing yourself first and succeeding the promise second. That way you know that when the future callbacks are executing your handler is not in the pipeline anymore.
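As a rough sketch of that ordering, assuming SwiftNIO 2 and a channel whose inbound type is ByteBuffer: the handler below removes itself first and only then succeeds the promise. OneShotResponseHandler and sendOptions(uri:cseq:on:) are hypothetical names made up for this example, and the handler takes the first inbound buffer as the whole response, which a real client would not want to rely on.

```swift
import NIO

/// Hypothetical one-shot handler: completes `promise` with the first
/// inbound buffer, decoded as text. `InboundOut` is left at its default.
final class OneShotResponseHandler: ChannelInboundHandler, RemovableChannelHandler {
    typealias InboundIn = ByteBuffer

    private let promise: EventLoopPromise<String>

    init(promise: EventLoopPromise<String>) {
        self.promise = promise
    }

    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        let buffer = self.unwrapInboundIn(data)
        let text = String(buffer: buffer)
        // Remove ourselves first; the returned future is not needed here.
        context.pipeline.removeHandler(self, promise: nil)
        // Succeed second, so future callbacks run with the handler already gone.
        self.promise.succeed(text)
    }
}

/// Hypothetical helper: installs the handler, writes an OPTIONS request,
/// and returns a future for the raw response text.
func sendOptions(uri: String, cseq: Int, on channel: Channel) -> EventLoopFuture<String> {
    let promise = channel.eventLoop.makePromise(of: String.self)
    let request = "OPTIONS \(uri) RTSP/1.0\r\nCSeq: \(cseq)\r\n\r\n"
    let buffer = channel.allocator.buffer(string: request)

    channel.pipeline.addHandler(OneShotResponseHandler(promise: promise)).flatMap {
        channel.writeAndFlush(buffer)
    }.whenFailure { error in
        // If installing the handler or writing fails, fail the promise
        // so that it is never left unfulfilled.
        promise.fail(error)
    }
    return promise.futureResult
}
```

The promise is created up front and handed to the handler, so the caller only ever sees the future. This sketch does not fail the promise if the channel closes before a response arrives.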

I was going to use the HTTPDecoder to cut down on the parsing code. I never realized that the HTTP parser is actually written in C and is the Node.js one.

Have to admit I did not see that coming.

I'll probably make mine based on LineBasedFrameDecoder.
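A rough idea of what the line-based approach could look like, in plain Swift: this sketch assumes each frame arrives as one line with the CRLF already stripped, and that the blank line ending the headers arrives as an empty frame. RTSPResponseHead and RTSPHeadAssembler are hypothetical names, and message bodies (Content-Length) are not handled.

```swift
/// Hypothetical value type for a parsed response head.
struct RTSPResponseHead: Equatable {
    var statusLine: String
    var headers: [String: String]  // keys are lowercased header names
}

/// Hypothetical assembler that collects lines until the blank line
/// that terminates the header section.
struct RTSPHeadAssembler {
    private var statusLine: String?
    private var headers: [String: String] = [:]

    /// Feed one line without its line ending.
    /// Returns a complete head once the terminating blank line is seen.
    mutating func feed(line: String) -> RTSPResponseHead? {
        if line.isEmpty {
            // A blank line before any status line is ignored.
            guard let statusLine = self.statusLine else { return nil }
            let head = RTSPResponseHead(statusLine: statusLine, headers: self.headers)
            // Reset so the assembler can be reused for the next response.
            self.statusLine = nil
            self.headers = [:]
            return head
        }
        // The first non-empty line is the status line.
        guard self.statusLine != nil else {
            self.statusLine = line
            return nil
        }
        // Every following line is treated as "Name: value".
        if let colon = line.firstIndex(of: ":") {
            let name = line[..<colon].lowercased()
            let value = line[line.index(after: colon)...].drop(while: { $0 == " " })
            self.headers[name] = String(value)
        }
        return nil
    }
}
```

Feeding it "RTSP/1.0 200 OK", "CSeq: 0", and then an empty line yields a head whose headers["cseq"] is "0".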

Should I be calling fireChannelReadComplete() since this represents the last thing that will get input?

In general, yes.
