Best way to access mysql/mariadb database in ChannelInboundHandler? (worse than a newbie)

Sorry. I really don't know what I'm doing, and I would appreciate any and all help.

I've gotten swift-nio 2 running a server under Debian Linux, and it's really cool all this works so nice. Until...

I want to run a query on a database and return the results in a reply to the message received in ChannelInboundHandler. Using mysql-kit I get a hang and crash at .simpleQuery(query).wait(). I've hacked a workaround with a .get() instead and a loop with a sleep. Not a good thing at all.

Is it appropriate to hold up a ChannelInboundHandler for a long time?

Basically, can someone point me in the right direction (before I post the nasty mess I've created and try to fix that)? Please.

Thanks in advance!

No, Channel handlers must not block.

Instead of using .wait, chain off the future using .whenComplete and put your logic in that closure. Alternatively, use NIOAsyncChannel and move completely into async-await land.
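
As a rough sketch of the first suggestion (not the original poster's code): the handler below starts the work, returns from channelRead immediately, and writes the reply once the future completes. The `runQuery` closure and the name `NonBlockingQueryHandler` are hypothetical stand-ins for a real database call such as `mysql.query(_:)`. The usage part assumes the NIOEmbedded module from swift-nio is available.

```swift
import NIOCore
import NIOEmbedded

/// Replies when a future completes instead of blocking on `.wait()`.
/// `runQuery` is a hypothetical stand-in for a real database call.
final class NonBlockingQueryHandler: ChannelInboundHandler {
    typealias InboundIn = ByteBuffer
    typealias OutboundOut = ByteBuffer

    private let runQuery: (String, EventLoop) -> EventLoopFuture<String>

    init(runQuery: @escaping (String, EventLoop) -> EventLoopFuture<String>) {
        self.runQuery = runQuery
    }

    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        let request = String(buffer: self.unwrapInboundIn(data))
        let channel = context.channel

        // This call returns right away; the closure runs later,
        // on the channel's own event loop, once the result is in.
        self.runQuery(request, context.eventLoop)
            .hop(to: context.eventLoop)
            .whenComplete { result in
                let text: String
                switch result {
                case .success(let rows): text = rows
                case .failure(let error): text = "error: \(error)"
                }
                channel.writeAndFlush(ByteBuffer(string: text), promise: nil)
            }
    }
}

// Usage with an in-memory EmbeddedChannel, so no network or database is needed.
let channel = EmbeddedChannel(handler: NonBlockingQueryHandler { sql, loop in
    loop.makeSucceededFuture("rows for \(sql)")
})
try channel.writeInbound(ByteBuffer(string: "SELECT 1"))
let reply = try channel.readOutbound(as: ByteBuffer.self).map { String(buffer: $0) }
print(reply ?? "no reply")
```

The `hop(to:)` call is there so the completion closure runs on the channel's event loop even if the query future belongs to a different one.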

Thank you very much. This is the information I was needing to go further.

I'm still trying to learn, and not doing so well. So I continued to hack, and as a test wrote the following.

My question now (to anyone actually) is does this block the channel handler? (And if not how?)
And which EventLoopGroup should the pool use? I've put it on the same group as the rest, via .next(). Is the .next() unnecessary, or does the pool need to be in its own new EventLoopGroup?

NIOAsyncChannel sounds interesting but so far I've found basically no information/examples.

final class DBaseQueryHandler: ChannelInboundHandler {
    
    typealias InboundIn = ByteBuffer
    typealias OutboundOut = ByteBuffer
    
    func channelRead (context: ChannelHandlerContext, data: NIOAny) {
        
        let buffer = unwrapInboundIn (data)
        
        let readableBytes = buffer.readableBytes
        
        let readInString = buffer.getString (at: 0, length: readableBytes)
        
        if readInString != nil {
            
            let configuration = MySQLConfiguration (
                hostname: "10.10.10.10", port: 3306,
                username: "X", password: "nevaUmind",
                database: ""
            )
            
            let pools = EventLoopGroupConnectionPool (
                source: MySQLConnectionSource (configuration: configuration),
                on: group.next ()
            )
            
            let mysql = pools.database (logger: Logger (label: "JustAHacker"))
            
            mysql.query (readInString!).whenComplete { result in
                
                defer { pools.shutdown () }
                
                let rows = try? result.get ()
                let outputString = rows!.description
                
                var bufferOut = context.channel.allocator.buffer (capacity: outputString.utf8.count)
                
                bufferOut.writeString (outputString)
                
                context.writeAndFlush (self.wrapOutboundOut (bufferOut)).map {
                    _ = context.close (mode: .output)
                }.whenSuccess {
                    context.close (promise: nil)
                }
                
            } // .whenComplete
        } // if readInString != nil
    } // func channelRead
} // final class DBaseQueryHandler

This does not seem to block the handler, but it seems to have plenty of issues.

First of all, you retrieve a network packet, convert it to a string, and send that directly to MySQL as a query. That's not going to work reliably: the incoming data may be split across packets, and a single packet may not even be a parsable UTF-8 string. So you need some framing/parsing there (others will have advice on how best to do this currently). E.g. if the user sends "SELECT * FROM blue", it may arrive as multiple packets, like "SELE" and "CT * FROM blue", i.e. in separate channelRead calls.
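
To illustrate the UTF-8 point, here is a small sketch that uses only Foundation (no NIO types). The query text and the split position are made up for the example: the first "packet" ends in the middle of a two-byte character, so it cannot be decoded on its own, while the accumulated bytes can.

```swift
import Foundation

// The full request as bytes; "é" (written as \u{E9}) takes two bytes in UTF-8.
let request = Array("SELECT 'caf\u{E9}'".utf8)

// Hypothetical split: the first packet ends halfway through "é".
let firstPacket = Array(request[..<12])
let secondPacket = Array(request[12...])

// Decoding the first packet alone fails, because it ends with half a character.
let partial = String(bytes: firstPacket, encoding: .utf8)

// Accumulating both packets before decoding recovers the original text.
let complete = String(bytes: firstPacket + secondPacket, encoding: .utf8)

print(partial ?? "<not valid UTF-8>", complete ?? "<not valid UTF-8>")
```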

Then I think that you wouldn't want to recreate that EventLoopGroupConnectionPool on every single incoming packet? Presumably that should be an instance variable of your query handler class. There won't be any pooling if you recreate the pool on every read.

But maybe it is OK (I don't know the MySQL library). If so, you should probably stick to the event loop the handler is currently running on, which you can grab from the ChannelHandlerContext: context.eventLoop. This avoids potentially unnecessary thread hopping (but more below).

For a database that may be running concurrent transactions, non-blocking I/O usually isn't the best fit in the first place (you may lock up, even deadlock, the database while waiting for data), but I suppose that, to get started, you could just use the same EventLoopGroup for the MySQL connections that you use for running your own handlers.

Also I think you don't have to allocate the buffer that way anymore, you can just do something like let bufferOut = ByteBuffer(string: outputString).
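
A minimal comparison of the two approaches, assuming only NIOCore. The sample `outputString` is made up; both buffers end up holding the same readable bytes.

```swift
import NIOCore

let outputString = "[[\"@@version\": \"10.5\"]]"

// The longer form: allocate with a capacity, then write the string.
var manual = ByteBufferAllocator().buffer(capacity: outputString.utf8.count)
manual.writeString(outputString)

// The shorter form: build the buffer directly from the string.
let direct = ByteBuffer(string: outputString)

print(manual == direct, direct.readableBytes)
```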

I'm not sure you'll ever want to ignore errors (try?) or force unwrap (rows!.description). In particular while playing with things.
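
One way to avoid both, sketched with plain Swift and no NIO or MySQL types: switch over the Result that whenComplete hands you, so the failure case produces a message instead of a crash. `QueryError` and `describe` are hypothetical names used only for this illustration.

```swift
// Hypothetical error type standing in for whatever the database layer throws.
enum QueryError: Error {
    case connectionLost
}

// Turns a query result into reply text without `try?` or force unwrapping.
func describe(_ result: Result<[String], Error>) -> String {
    switch result {
    case .success(let rows):
        return rows.description
    case .failure(let error):
        return "query failed: \(error)"
    }
}

print(describe(.success(["10.5.21-MariaDB"])))
print(describe(.failure(QueryError.connectionLost)))
```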

P.S. for Swift concurrency there is also NIO documentation, not sure it'll actually help with any of the problems, it's a layer on top of a layer: swift-nio Documentation – Swift Package Index

Thanks very much!

I wasn't sure about the data in chunks. I've seen code to gather chunks, etc. but didn't seem to need it, so hadn't started using that, and so didn't in this quick test thing. So thanks, I will be building that in.

Okay, I hadn't thought much about pools, the point of them, and thus effectively defeating them or something, so I will deal with that too.

But I'm going to have to really restudy a lot of things carefully.

Thanks for the other info. as well, and especially the link.

And now I can go on further.

This really doesn't fit the topic title any more (not in ChannelInboundHandler), but this is what I have now, and it seems to work.

My questions now are : do I need to deal with multiple packets coming in, or is this system gathering all the data first? If I need to, how?

Is there something wrong with this? Other than errors are not handled, optionals not well handled. (But of course any pointers are welcome!)

Is there a better or more proper way?

let group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)

@main
public struct TestServerTest
{
    public static func main () async
    {
        
        let server = ServerManager ( host: "127.0.0.1", port: 3010 )
        defer { pools.shutdown () }

        do {
            
            try await server.start()
            
        } catch let error {
            print("Error: \(error)")
            server.stop()
        }
        
    }  // public static func main ()
}  //  public struct TestServerTest



class ServerManager {
    
    var host: String?
    var port: Int?
    
    init(host: String, port: Int) {
        self.host = host
        self.port = port
    }
    
    func start () async throws {
        
        do {
            
            let serverChannel = try await ServerBootstrap(group: group)
                .bind ( host: host!, port: port! ) { childChannel in
                    childChannel.eventLoop.makeCompletedFuture {
                        return try NIOAsyncChannel<ByteBuffer, ByteBuffer>(
                            wrappingChannelSynchronously: childChannel
                        )
                    }
                }
            
            print ("Listening on \(String(describing: serverChannel.channel.localAddress))...")
            
            try await withThrowingDiscardingTaskGroup { group in
                try await serverChannel.executeThenClose { serverChannelInbound in
                    for try await connectionChannel in serverChannelInbound {
                        group.addTask {
                            do {
                                try await connectionChannel.executeThenClose { connectionChannelInbound, connectionChannelOutbound in
                                    for try await inboundData in connectionChannelInbound {
                                        
                                        try await connectionChannelOutbound.write (
                                            ByteBuffer (string: ServerChore (requestToHandle: String (buffer: inboundData))))
                                        try await connectionChannel.channel.close ()
                                        
                                    }
                                }
                            } catch {
                                // TODO: Handle errors
                            }
                        }
                    }
                }
            }
        } catch {
            // TODO: Handle errors
        }
    }
    
    func stop () {
        do {
            try group.syncShutdownGracefully  ()
        } catch let error {
            // TODO: Handle errors
            print ("Error shutting down \(error.localizedDescription)")
            exit (0)
        }
        print("Server has shut down.")
    }
}

let configuration = MySQLConfiguration (
    hostname: "127.0.0.1", port: 3306,
    username: "ABoyNamedSue", password: "wdntUlike2no",
    database: ""
)

let pools = EventLoopGroupConnectionPool (
    source: MySQLConnectionSource (configuration: configuration),
    on: group
)

let mysql = pools.database (logger: Logger (label: "JustAHacker"))

func ServerChore (requestToHandle: String) async -> String {
    
    do {
        
        // this will unpack JSON data from the string here
        // then create one or more queries based on the request
        
        // this is to test multiple connections open,
        //      simulating multiple simultaneous requests
        //let fewSeconds:UInt64 = 60 * 3 * 1000000000
        //try await Task.sleep (nanoseconds: fewSeconds)
        
        let rows = try await mysql.query (requestToHandle).get()
        
        // this will pack the results in JSON here
        // then convert to a string to send back
        
        return rows.debugDescription
        
    } catch {
        // TODO: Handle errors
        return "DANGER WILL ROBINSON!"
    }
    
}

You have to treat incoming data as chunked: a request might be split across multiple ByteBuffers, or a single ByteBuffer might contain more than one request. I would recommend implementing a NIOSingleStepByteToMessageDecoder, which takes care of buffering and lets you focus on the actual protocol message decoding. You can find a good example of how this is implemented for the memcache protocol here. Additionally, you might want to implement a MessageToByteEncoder as well; this is done in memcache here.

Once you have both implemented, you can just add them to the channel pipeline like this:

try channel.pipeline.syncOperations.addHandler(MessageToByteHandler(MariaDBRequestEncoder()))
try channel.pipeline.syncOperations.addHandler(ByteToMessageHandler(MariaDBResponseDecoder()))
return try NIOAsyncChannel<MariaDBRequest, MariaDBResponse>(synchronouslyWrapping: channel)
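
For a feel of what such a decoder can look like, here is a minimal sketch that assumes a newline-terminated request format. That format and the name `LineDecoder` are assumptions for the example, not something from the memcache code. It is exercised with an EmbeddedChannel from NIOEmbedded so it runs without a network.

```swift
import NIOCore
import NIOEmbedded

/// Emits one message per newline-terminated line (assumed framing).
struct LineDecoder: NIOSingleStepByteToMessageDecoder {
    typealias InboundOut = ByteBuffer

    mutating func decode(buffer: inout ByteBuffer) throws -> ByteBuffer? {
        // No newline yet: return nil so more bytes get buffered first.
        guard let newline = buffer.readableBytesView.firstIndex(of: UInt8(ascii: "\n")) else {
            return nil
        }
        // View indices are buffer indices, so subtract readerIndex to get a length.
        let length = newline - buffer.readerIndex
        guard let line = buffer.readSlice(length: length) else {
            return nil
        }
        buffer.moveReaderIndex(forwardBy: 1)  // skip the newline itself
        return line
    }

    mutating func decodeLast(buffer: inout ByteBuffer, seenEOF: Bool) throws -> ByteBuffer? {
        try self.decode(buffer: &buffer)
    }
}

let channel = EmbeddedChannel(handler: ByteToMessageHandler(LineDecoder()))

// First chunk: no complete request yet, so nothing reaches the next handler.
try channel.writeInbound(ByteBuffer(string: "SELE"))
let early = try channel.readInbound(as: ByteBuffer.self)

// Second chunk completes the line, and the decoder emits the whole request.
try channel.writeInbound(ByteBuffer(string: "CT * FROM blue\n"))
let request = try channel.readInbound(as: ByteBuffer.self).map { String(buffer: $0) }
print(early == nil, request ?? "<none>")
```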

Excellent. Thank you so very much!

For the line that creates the NIOAsyncChannel with synchronouslyWrapping:, I get a warning:
warning: 'init(synchronouslyWrapping:configuration:)' is deprecated: This method has been deprecated since it defaults to deinit based resource teardown

Is there a reason to use the old label specifically? I was guessing it was just copy and paste of older code, or typing from memory, and I've been using wrappingChannelSynchronously:.

I've run into that as well. I think the upstream docs are wrong; the new name is:
NIOAsyncChannel(wrappingChannelSynchronously: ...)

Yeah we did a little deprecation dance here and I used the wrong init. Thanks @Helge_Hess1 for pointing to the right one!

Thanks for the replies! I just wanted to make sure.

I'm not being very successful with hacking out a solution.

If someone can explain something to me, or point out serious flaws (other than not handling errors), that would probably help me get further.

The above code works fine until a large file needs to be de-chunked.

Tracing things out with prints, this is what I find when adding a decoder:
I can de-chunk, but I need to use nc -q1 ... when testing,
and it throws an exception when it goes to write, whether a large or small file is sent.
In the trace I can see that things are de-chunked when needed and that the proper information goes to and comes from the "business logic". The exception is thrown at try await connectionChannelOutbound.write, and it is: The operation could not be completed. (NIOCore.NIOAsyncWriterError error 1.).

(I'm not worried about the needing -q1 when testing with nc at a BASH prompt, guessing not a problem with a real world client, or that is easily done.)

try childChannel.pipeline.syncOperations.addHandler(ByteToMessageHandler(RequestDecoder7()))
return try NIOAsyncChannel<ByteBuffer, ByteBuffer>(wrappingChannelSynchronously: childChannel)

for try await inboundData in connectionChannelInbound {
  print("Service start:      inboundData in connectionChannelInbound top of loop, readableBytes is \(inboundData.readableBytes)")
  let response2Send = await ServerChore (requestToHandle: String (buffer: inboundData))
  print("Service start:      after ServerChore output is \(response2Send)")
  try await connectionChannelOutbound.write (NIOCore.ByteBuffer (string: response2Send))

I had tried this earlier with passing strings around as well, same results.

struct RequestDecoder7: NIOSingleStepByteToMessageDecoder {
    
    typealias InboundOut = NIOCore.ByteBuffer
    
    var fileSize: UInt64?
    
    mutating func decode(buffer: inout NIOCore.ByteBuffer) throws -> InboundOut? {
        print("Decoder decode:     start, buffer readable: \(buffer.readableBytes)")
        return nil
    }
    
    mutating func decodeLast(buffer: inout NIOCore.ByteBuffer, seenEOF: Bool) throws -> InboundOut? {
        print("Decoder decodeLast: start, buffer readable: \(buffer.readableBytes)")
        var endSizeText: Int? = nil
        let bytesView = buffer.readableBytesView
        endSizeText = bytesView.firstIndex(of: UInt8(10))
        guard endSizeText != nil else {
            print("Decoder decodeLast: newline not found apparently, so asking for more data")
            return nil
        }
        if fileSize == nil {
            print("Decoder decodeLast: fileSize is nil, trying to calc. now")
            let bytesSubView = buffer.getString (at: 0, length: endSizeText!)
            print("Decoder decodeLast: string to convert to int : " + bytesSubView!)
            let bytes = UInt64 (bytesSubView ?? "0")
            guard bytes != nil else {
                // TODO: DEAL W/ THIS
                print("Decoder decodeLast: failed to read the integer, so asking for more data")
                return nil
            }
            print("Decoder decodeLast: setting fileSize to \(bytes!), endSizeText is \(endSizeText!)")
            fileSize = bytes
        }
        print("Decoder decodeLast: fileSize is \(fileSize!), readableBytes = \(buffer.readableBytes)")
        // if have not read all the bytes that should have been sent, ask for more
        // TODO: IMPLEMENT TIME OUT
        if fileSize! != buffer.readableBytes - (endSizeText! + 1) {
            print("Decoder decodeLast: returning nil, have not read all yet")
            return nil
        }
        print("Decoder decodeLast: at \(endSizeText! + 1) for length \(buffer.readableBytes - (endSizeText! + 1)), readableBytes = \(buffer.readableBytes)")
        let outputString = buffer.getString (at: endSizeText! + 1, length: buffer.readableBytes - (endSizeText! + 1))
        print("Decoder decodeLast: returning string of length : \(outputString?.count ?? -1)")

        // clearing the buffer apparently signals done gathering data
        buffer.clear()
        return NIOCore.ByteBuffer (string: outputString ?? "")
    }
    
}  // RequestDecoder7

Any thoughts, comments, suggestions are greatly appreciated.

I didn't read this whole thread but your usage of .get* and clear is much more complicated than it needs to be and also not quite right.

If you just replace all your getString(at: ..., length: length) with readString(length: length) then it should work much better. That "reads off" the string by moving the reader index forwards. After doing so you can also just remove the buffer.clear() calls.

Example:

var buffer = ByteBuffer(string: "helloworld")
let hello = buffer.readString(length: 5)!
let world = buffer.readString(length: 5)!
print("\(hello) \(world)") // prints "hello world"

How ByteToMessageDecoder works is the following: It offers you (in buffer) all the bytes that are available from the network. You "read off" as many as you want and if there are remaining bytes, it'll call you again. So it drives the decoding loop.

For that to work however, you need to actually read the bytes off which is done by the read* methods. The get* methods merely peek at the buffer, they don't read anything off of it. I'd stay clear of get* as much as possible anyway because you need to then maintain the read cursor (or readerIndex) yourself which is error prone. Also: The first readable byte isn't at index 0 but at index buffer.readerIndex.

So

let string = buffer.readString(length: 5)

is pretty much the same as

let string = buffer.getString(at: buffer.readerIndex, length: 5)
if string != nil { 
   buffer.moveReaderIndex(forwardBy: 5)
}

In your code example you used get*, which just peeks, and then finally clear(), which sets both readerIndex and writerIndex to 0. That leaves zero remaining bytes to read, so ByteToMessageDecoder won't call you again. So that kinda works, but it's much harder and much more error-prone than just using the natural APIs, which are read*.
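
A short sketch of the difference, assuming only NIOCore: reading moves readerIndex forward and shrinks readableBytes, while peeking with get* at buffer.readerIndex leaves both untouched.

```swift
import NIOCore

var buffer = ByteBuffer(string: "helloworld")

// read* consumes: the reader index moves past "hello".
let first = buffer.readString(length: 5)
let indexAfterRead = buffer.readerIndex

// get* only peeks, and has to start at readerIndex rather than at 0.
let peeked = buffer.getString(at: buffer.readerIndex, length: 5)
let remainingAfterPeek = buffer.readableBytes

// Reading the rest leaves nothing for the decoder to be called with again.
let second = buffer.readString(length: 5)
let remainingAfterRead = buffer.readableBytes

print(first ?? "", peeked ?? "", second ?? "", indexAfterRead, remainingAfterPeek, remainingAfterRead)
```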

Thanks! Either I copied and pasted that, or I did it because somewhere else I used read* and got an error/warning about writing to the variable. So I will change that.

I forgot to mention that the data is being sent differently.

Without the NIOSingleStepByteToMessageDecoder it is just the string data payload itself.

With the decoder the data being sent is the length as text, followed by newline. Then the payload data itself.

Oh, that sounds very similar to JSONRPCFraming.ContentLengthHeader decoder. That decodes this format

content-length: 10\r\n
\r\n
helloworld

so pretty much the same format as yours, except for the extra content-length: and it being two newlines (and \r\n not just a plain newline). So maybe you want to get inspired by the code (it has tests too: https://github.com/apple/swift-nio-extras/blob/main/Tests/NIOExtrasTests/JSONRPCFramingContentLengthHeaderDecoderTests.swift)
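
For the simpler format described above (the payload length as decimal text, a newline, then the payload), a decoder could look roughly like the sketch below. This is not the JSONRPCFraming code. `LengthPrefixedDecoder` and `FramingError` are hypothetical names, and the sketch assumes the length counts payload bytes. It is driven through an EmbeddedChannel from NIOEmbedded.

```swift
import NIOCore
import NIOEmbedded

/// Decodes frames of the form "<length>\n<payload>" (assumed framing).
struct LengthPrefixedDecoder: NIOSingleStepByteToMessageDecoder {
    typealias InboundOut = ByteBuffer

    enum FramingError: Error {
        case invalidLength
    }

    mutating func decode(buffer: inout ByteBuffer) throws -> ByteBuffer? {
        // Wait until the whole length line has arrived.
        guard let newline = buffer.readableBytesView.firstIndex(of: UInt8(ascii: "\n")) else {
            return nil
        }
        let headerLength = newline - buffer.readerIndex

        // Peek at the header; nothing is consumed until the frame is complete.
        guard let header = buffer.getString(at: buffer.readerIndex, length: headerLength),
              let payloadLength = Int(header), payloadLength >= 0 else {
            throw FramingError.invalidLength
        }

        // Not enough payload bytes yet: ask to be called again with more data.
        guard buffer.readableBytes >= headerLength + 1 + payloadLength else {
            return nil
        }

        buffer.moveReaderIndex(forwardBy: headerLength + 1)  // header plus newline
        return buffer.readSlice(length: payloadLength)
    }

    mutating func decodeLast(buffer: inout ByteBuffer, seenEOF: Bool) throws -> ByteBuffer? {
        try self.decode(buffer: &buffer)
    }
}

let channel = EmbeddedChannel(handler: ByteToMessageHandler(LengthPrefixedDecoder()))

// The first chunk carries the header and only half of the payload.
try channel.writeInbound(ByteBuffer(string: "10\nhello"))
let early = try channel.readInbound(as: ByteBuffer.self)

// The second chunk completes the first frame and carries a whole second frame.
try channel.writeInbound(ByteBuffer(string: "world3\nabc"))
let firstFrame = try channel.readInbound(as: ByteBuffer.self).map { String(buffer: $0) }
let secondFrame = try channel.readInbound(as: ByteBuffer.self).map { String(buffer: $0) }
print(early == nil, firstFrame ?? "<none>", secondFrame ?? "<none>")
```

Because the decoder returns a message from decode as soon as a full frame is buffered, the reply can be written while the connection is still open, rather than only after the sender has finished.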

That was quite a mess from a lot of strange experiments. This is simpler, and no sending the payload size. That was just a thought on part of how to handle this.

This seems like it should work :

...

try childChannel.pipeline.syncOperations.addHandler(ByteToMessageHandler(RequestDecoder9()))

return try NIOAsyncChannel<ByteBuffer, ByteBuffer>(wrappingChannelSynchronously: childChannel)

...

print("Service start:      connectionChannel.executeThenClose top of loop")
for try await inboundData in connectionChannelInbound {
                    
  print("Service start:      inboundData in connectionChannelInbound top of loop, readableBytes is \(inboundData.readableBytes)")
  
  let response2Send = await ServiceExecutor (requestToHandle: String (buffer: inboundData))
  print("Service start:      after ServiceExecutor output is \(response2Send)")
  
  try await connectionChannelOutbound.write (NIOCore.ByteBuffer (string: response2Send))

...

struct RequestDecoder9: NIOSingleStepByteToMessageDecoder {
    
    typealias InboundOut = NIOCore.ByteBuffer
    
    mutating func decode (buffer: inout NIOCore.ByteBuffer) throws -> InboundOut? {
        print("Decoder decode:     start, buffer readable: \(buffer.readableBytes)")
        return nil
    }
    
    mutating func decodeLast (buffer: inout NIOCore.ByteBuffer, seenEOF: Bool) throws -> InboundOut? {
               
        print("Decoder decodeLast: start, buffer readable: \(buffer.readableBytes)")
        let accumulatedData = buffer.readString (length: buffer.readableBytes) ?? ""
        
        print("Decoder decodeLast: returning string of length : \(accumulatedData.count)")
        return NIOCore.ByteBuffer (string: accumulatedData)
        
    }
    
}  // RequestDecoder9

The payload is deliberately converted to a String and back to a ByteBuffer, so that a reference to the incoming buffer is not passed along; passing the buffer back directly did not work in my other tests.

This is my trace results, with just a small input, nothing about chunking...

Service start:      withThrowingDiscardingTaskGroup top of loop
Service start:      serverChannel.executeThenClose top of loop
Service start:      childChannel.eventLoop.makeCompletedFuture
Decoder decode:     start, buffer readable: 17
Decoder decodeLast: start, buffer readable: 17
Decoder decodeLast: returning string of length : 17
Service start:      connectionChannel in serverChannelInbound top of loop
Service start:      group.addTask top
Service start:      connectionChannel.executeThenClose top of loop
Service start:      inboundData in connectionChannelInbound top of loop, readableBytes is 17
ServiceExecutor:    doing query : SELECT @@version;
Service start:      after ServiceExecutor output is [["@@version": "10.5.21-MariaDB-0+deb11u1"]]
Error: EXCEPTION 1 The operation could not be completed. (NIOCore.NIOAsyncWriterError error 1.)

With the exception apparently at : try await connectionChannelOutbound.write (NIOCore.ByteBuffer (string: response2Send)).

I can get it to not work in many different ways. Hopefully as I read more docs, examples, and such, I'll understand and figure it out, or stumble on the answer.

I've also tried a couple things with the output end with a MessageToByteEncoder, but that made no difference.