MQTT NIO - First alpha of v3 is out!

@adam-fowler and I are happy to announce that the first alpha version of MQTT NIO v3 is out!

It's a complete rewrite of the library, built around structured concurrency and following the best practices of modern Swift NIO libraries.

What is MQTT?

MQTT (Message Queuing Telemetry Transport) is a lightweight messaging protocol that was developed by IBM and first released in 1999.
It uses the publish/subscribe (pub/sub) pattern to carry messages between devices, servers, and applications through a central broker.
It is commonly used in Internet of Things (IoT) technologies.

MQTT NIO v3

MQTT NIO is a Swift NIO-based implementation of an MQTT v3.1.1 and v5.0 client.
Here are some of the key changes in MQTT NIO v3:

Connection

To connect to an MQTT server you now use MQTTConnection.withConnection, a "with-style" function that manages the lifecycle of the connection.
You provide a closure that has access to an MQTTConnection object that you can use to publish and subscribe to topics.

try await MQTTConnection.withConnection(
    address: .hostname("mqtt.eclipse.org"),
    identifier: "My Client",
    logger: Logger(...)
) { connection in
    // You are now connected to the MQTT broker
    // The connection will be active only inside this closure
    try await connection.publish(
        to: "topic/example",
        payload: ByteBuffer(string: "Hello, World!"),
        qos: .atMostOnce
    )
}

The withConnection function sends the CONNECT packet before your closure runs and the DISCONNECT packet after it finishes, so you never have to send them yourself.
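
If the "with-style" pattern is new to you, here is a minimal sketch of the idea. It is not MQTT NIO's implementation: DemoConnection and withDemoConnection are hypothetical names made up for illustration, and the sketch is synchronous to keep it short, whereas the real withConnection is async. It only shows how setup and teardown can be wrapped around a caller-supplied closure.

```swift
// Hypothetical stand-in for a connection; NOT an MQTT NIO type.
final class DemoConnection {
    private(set) var sentPackets: [String] = []

    func send(_ packet: String) {
        sentPackets.append(packet)
    }
}

// Runs `body` between a simulated CONNECT and DISCONNECT.
// `defer` makes the teardown run whether `body` returns or throws.
func withDemoConnection<T>(
    _ connection: DemoConnection,
    _ body: (DemoConnection) throws -> T
) rethrows -> T {
    connection.send("CONNECT")
    defer { connection.send("DISCONNECT") }
    return try body(connection)
}

let demo = DemoConnection()
let count = withDemoConnection(demo) { connection -> Int in
    connection.send("PUBLISH")
    // Only CONNECT and PUBLISH have been recorded at this point.
    return connection.sentPackets.count
}
// demo.sentPackets is now ["CONNECT", "PUBLISH", "DISCONNECT"]
```

The point of the pattern is that the caller cannot forget the teardown step, and the connection cannot be used outside the closure's scope.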

Subscription

You can subscribe to topics with the MQTTConnection.subscribe method, another "with-style" function that manages the lifecycle of the subscription: it sends a SUBSCRIBE packet before your closure runs and an UNSUBSCRIBE packet after it finishes.
The closure has access to an AsyncSequence of incoming messages, automatically filtered according to the topic filter provided to the subscribe function.

try await connection.subscribe(
    to: [.init(topicFilter: "topic/#", qos: .atMostOnce)]
) { subscription in
    for try await message in subscription {
        print(String(buffer: message.payload))
    }
}
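
For readers unfamiliar with MQTT topic filters such as "topic/#", here is a rough sketch of how wildcard matching works according to the MQTT specification: "+" matches exactly one topic level and "#" matches the parent level plus everything below it. The topicMatches function is a hypothetical helper written for this post, not part of MQTT NIO, and it is simplified: it does not validate the filter and it ignores the special rules for topics that start with "$".

```swift
/// Returns true if `topic` matches the MQTT topic filter `filter`.
/// Hypothetical helper for illustration; not an MQTT NIO API.
func topicMatches(filter: String, topic: String) -> Bool {
    // Keep empty levels: "a//b" has three levels in MQTT.
    let filterLevels = filter.split(separator: "/", omittingEmptySubsequences: false)
    let topicLevels = topic.split(separator: "/", omittingEmptySubsequences: false)

    for (index, level) in filterLevels.enumerated() {
        if level == "#" {
            // Multi-level wildcard: matches the parent level and all levels below it.
            return true
        }
        guard index < topicLevels.count else {
            // The topic has fewer levels than the filter requires.
            return false
        }
        if level != "+" && level != topicLevels[index] {
            // Neither a single-level wildcard nor a literal match.
            return false
        }
    }
    // No "#" was encountered, so the level counts must be equal.
    return filterLevels.count == topicLevels.count
}

let matchesChild = topicMatches(filter: "topic/#", topic: "topic/example")
let matchesSingleLevel = topicMatches(filter: "topic/+/temp", topic: "topic/kitchen/temp")
let matchesOther = topicMatches(filter: "topic/#", topic: "other/example")
```

With the filter from the example above, a message published to "topic/example" would be delivered to the subscription, while one published to "other/example" would not.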

Session

MQTT sessions allow clients to maintain state across multiple connections to the broker.
Sessions in MQTT NIO v3 are represented by the MQTTSession type.
It is an object that is shared across multiple connections and can be used to maintain state, such as pending QoS 1 and 2 messages and subscriptions.

Create an MQTTSession with a unique client ID and pass it to the MQTTConnection.withConnection function.

let session = MQTTSession(clientID: "My Client", logger: logger)

try await MQTTConnection.withConnection(
    address: .hostname("broker.hivemq.com"),
    session: session,
    logger: logger
) { connection, sessionPresent in
    if sessionPresent {
        print("The server has an existing session for this client ID")
    } else {
        print("The server does not have an existing session for this client ID")
    }
}

Upon connection, if there are pending QoS 2 acknowledgements that need to be sent to the broker, they will be sent automatically by MQTT NIO.

You can also open subscriptions via MQTTSession.subscribe, which will keep the subscription active across multiple connections as long as the session is not expired on the server.
When there's an active connection that uses the session, the subscription will receive messages as normal.

let session = MQTTSession(clientID: "My Client", logger: logger)

await withThrowingTaskGroup { group in
    group.addTask {
        try await session.subscribe(
            to: [.init(topicFilter: "topic/#", qos: .atLeastOnce)]
        ) { subscription in
            for try await message in subscription {
                print(String(buffer: message.payload))
            }
        }
    }

    group.addTask {
        try await MQTTConnection.withConnection(
            address: .hostname("broker.hivemq.com"),
            session: session,
            logger: logger
        ) { connection, _ in
            // The subscription will receive messages as normal while this connection is active.
            // When this connection is closed, the subscription will remain active
            // and will receive messages when a new connection is opened with the same session.
        }
    }
}

Other Features

There are many more features offered by MQTT NIO v3, to name a few:

  • MQTT v5.0 support
  • Encrypted connections via TLS
  • WebSocket connections
  • Connections using Apple's Network.framework for iOS via NIO Transport Services
  • Unix domain sockets

and much more!

Check it out

Take a look at the first alpha release and the documentation to learn more.

MQTT NIO v3 is still in development. Try the first alpha and please send us feedback via GitHub issues. All contributions are welcome!