[Discussion] NIORedis: NIO-based Redis Driver

Package Description

Non-blocking Swift driver for Redis built on SwiftNIO.

Package name nio-redis
Proposed Maturity Level Incubating
License Apache 2
Dependencies SwiftNIO 2.x, Server Logging API 0.1.x

Introduction

NIORedis is a module providing general implementations for connecting to Redis and executing
commands against an instance using its proprietary Redis Serialization Protocol (RESP).

These types are designed to work in a request / response loop, representing individual connections to Redis.

The goal of this library is to provide a semi-low level API to work with Redis, while still feeling like a normal Swift package that can be used as-is.

Motivation

Implementations of Swift Redis clients have been around for as long as Swift has, but most have been abandoned, rely on Objective-C runtimes, or use C libraries.

All of the currently maintained libraries either have framework specific dependencies, are not built with NIO, or do not provide enough extensibility while providing "out of the box" capabilities.

Existing Solutions

Proposed Solution

NIORedis provides the essential types for interacting with RESP and building NIO Channel pipelines for communicating with Redis, with default implementations designed to cover most use cases.

RESPValue

RESPValue represents the different types outlined in Redis' protocol as an enum.

public enum RESPValue {
    case null
    case simpleString(String)
    case bulkString([UInt8])
    case error(RedisError)
    case integer(Int)
    case array([RESPValue])
}
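A caller would typically pattern-match over this enum when inspecting a response. A small illustrative sketch, assuming the enum exactly as declared above:

```swift
// Illustrative only: inspecting a RESPValue by switching over its cases.
let response: RESPValue = .array([.simpleString("OK"), .integer(42)])

switch response {
case .simpleString(let text):
    print("status: \(text)")
case .integer(let number):
    print("count: \(number)")
case .array(let elements):
    print("array of \(elements.count) elements")
case .error(let error):
    print("Redis error: \(error)")
default:
    break
}
```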

RESPValueConvertible

RESPValue needs to be translated to and from Swift values easily and generically - which is provided by the RESPValueConvertible protocol.

public protocol RESPValueConvertible {
    init?(_ value: RESPValue)

    func convertedToRESPValue() -> RESPValue
}

Default conformance is provided for:

  • Optional where Wrapped: RESPValueConvertible
  • Array where Element: RESPValueConvertible
  • RedisError
  • RESPValue
  • String
  • FixedWidthInteger (Int, Int8, Int16, ...)
  • Double
  • Float
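Conforming a custom type is straightforward. A sketch, assuming the protocol as declared above (`ExpirationDate` is a hypothetical user-defined type, not part of NIORedis):

```swift
// Hypothetical example: a user type that round-trips through RESP integers.
struct ExpirationDate: RESPValueConvertible {
    let secondsSinceEpoch: Int

    init?(_ value: RESPValue) {
        // Fail the conversion if the RESP value isn't an integer.
        guard case let .integer(seconds) = value else { return nil }
        self.secondsSinceEpoch = seconds
    }

    func convertedToRESPValue() -> RESPValue {
        return .integer(secondsSinceEpoch)
    }
}
```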

RedisClient

As a protocol, RedisClient just defines a type that is capable of sending commands and receiving responses.

public protocol RedisClient {
    var eventLoop: EventLoop { get }

    func send(command: String, with arguments: [RESPValueConvertible]) -> EventLoopFuture<RESPValue>
}

In general, this is the type to rely on for executing commands - all convenience command extensions are applied to this protocol, so that conformers gain those implementations for free.

An example of this is RedisPipeline, which conforms to RedisClient in order to implement its internal pipelining logic.

RedisConnection

As the primary connection type (and RedisClient implementation), RedisConnection works with NIO's ClientBootstrap to build a pipeline for executing commands in request / response cycles.

It is designed to be long-lived; being re-used between commands as needed.

public final class RedisConnection: RedisClient {
    public var eventLoop: EventLoop { get }
    public var isConnected: Bool { get }

    public init(channel: Channel, logger: Logger = default)

    @discardableResult
    public func close() -> EventLoopFuture<Void>

    public func makePipeline() -> RedisPipeline

    public func send(command: String, with arguments: [RESPValueConvertible]) -> EventLoopFuture<RESPValue>
}

While RedisConnection can be created on the fly with just a Channel instance, it's more likely you won't have one, and can instead instantiate a connection with the following static method:

extension RedisConnection {
    public static func connect(
        to socket: SocketAddress,
        with password: String? = nil,
        on eventLoopGroup: EventLoopGroup,
        logger: Logger = default) -> EventLoopFuture<RedisConnection>
}

Example usage of RedisConnection:

import NIORedis

// create a new event loop group
let elg = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { try! elg.syncShutdownGracefully() }

// create a new connection
let address = try SocketAddress(ipAddress: "127.0.0.1", port: 6379)
let connection = try RedisConnection.connect(to: address, with: "password", on: elg.next()).wait()
defer { try! connection.close().wait() }

// `get` is one of the convenience command extensions; `.wait()` blocks for this demo
let value = try connection.get("my_key").wait()

RedisPipeline

Redis itself provides a decent overview of the benefits of pipelining, and NIORedis supports it by
providing the RedisPipeline object, created from RedisConnection.makePipeline().

Roughly stated - a regular command through RedisConnection will be a "write and flush" operation, while RedisPipeline will write all of the commands, but not flush until execute() is called.

public final class RedisPipeline {
    /// Number of queued commands
    public var count: Int

    public init(channel: Channel, logger: Logger = default)

    /// Returns `self` to allow chaining
    @discardableResult
    public func enqueue<T>(operation: (RedisClient) -> EventLoopFuture<T>) -> RedisPipeline

    /// Drains the queue
    public func execute() -> EventLoopFuture<[RESPValue]>
}

An example of usage:

let connection = RedisConnection.connect(...)
let results = connection.makePipeline()
    .enqueue { $0.set("my_key", to: 1) }
    .enqueue { $0.get("my_key") }
    .enqueue { $0.increment("my_key") }
    .enqueue { $0.increment("my_key", by: 30) }
    .execute()
    .wait()
// results = [RESPValue]
// results[0].string == "OK"
// results[1].int == 1
// results[2].int == 2
// results[3].int == 32

RESPEncoder

As part of the pipeline, RESPValue needs to be translated to RESP byte streams, which is handled by the publicly available RESPEncoder.

This class conforms to NIO's MessageToByteEncoder protocol.

public final class RESPEncoder {
    public init(logger: Logger = default)

    public func encode(_ value: RESPValue, into buffer: inout ByteBuffer)
}
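For reference, RESP's wire format is line-oriented with `\r\n` terminators. A sketch of what the encoder produces (the byte layouts shown are defined by the Redis protocol specification; the ByteBuffer calls assume NIO's standard API):

```swift
// Illustrative only: encoding two RESPValues into their RESP byte forms.
var buffer = ByteBufferAllocator().buffer(capacity: 64)
let encoder = RESPEncoder()

encoder.encode(.simpleString("OK"), into: &buffer)
// buffer now contains the bytes of "+OK\r\n"

buffer.clear()
encoder.encode(.integer(100), into: &buffer)
// buffer now contains the bytes of ":100\r\n"
```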

RESPDecoder

Inversely, RESPDecoder will translate RESP byte streams into RESPValues.

This also conforms to NIO's ByteToMessageDecoder protocol.

public final class RESPDecoder {
    public enum ParsingState {
        case notYetParsed
        case parsed(RESPValue)
    }

    public func parse(at position: inout Int, from buffer: inout ByteBuffer) throws -> ParsingState
}
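Because a TCP read may deliver a partial message, the decoder reports its state rather than throwing on incomplete input. A usage sketch, where `inboundBuffer` and `handleResponse` are stand-ins for bytes accumulated from the channel and the caller's own handling:

```swift
// Illustrative only: feeding buffered bytes through the decoder.
let decoder = RESPDecoder()
var position = 0

switch try decoder.parse(at: &position, from: &inboundBuffer) {
case .notYetParsed:
    // A complete RESP message hasn't arrived yet - keep the bytes and wait for more.
    break
case .parsed(let value):
    // `position` has advanced past the consumed bytes.
    handleResponse(value)
}
```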

RedisCommandHandler

Redis' request / response cycle is a 1:1 mapping, and at the end of the Redis channel pipeline is RedisCommandHandler to marshal the queue of incoming and outgoing messages.

As RedisCommandContext instances are written to the channel, RedisCommandHandler pulls out the information to store callbacks in a queue.

When responses from Redis return, callbacks are popped off the queue and given the response as RESPValues.

public struct RedisCommandContext {
    public let command: RESPValue
    public let responsePromise: EventLoopPromise<RESPValue>
}

open class RedisCommandHandler {
    public init(logger: Logger = default)
}
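The queueing behaviour described above can be sketched roughly like this (a simplified illustration of the idea, not the actual implementation - the real handler participates in the NIO channel pipeline):

```swift
// Simplified sketch of the 1:1 request / response queueing described above.
final class SketchedCommandHandler {
    private var pendingResponses: [EventLoopPromise<RESPValue>] = []

    // Outbound: a RedisCommandContext is written to the channel.
    func handleWrite(_ context: RedisCommandContext) {
        pendingResponses.append(context.responsePromise)
        // ...then context.command continues down the pipeline to be encoded.
    }

    // Inbound: a decoded RESPValue arrives from Redis.
    func handleRead(_ response: RESPValue) {
        // Because Redis responds in order, the first queued promise
        // always belongs to this response.
        pendingResponses.removeFirst().succeed(response)
    }
}
```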

Maturity Justification

Until now, packages through the SSWG process have been accepted as Sandbox maturity - so it's appropriate to justify why NIORedis should be considered mature enough for Incubating.

This package supports:

  • ~90 of Redis' commands as convenience extension methods
  • 130+ unit tests, including RESP encoding / decoding and all command extensions

In addition, it meets the following criteria according to the SSWG Incubation Process:

In addition, Vapor and I are in the process of providing a (framework-agnostic) higher-level library, RedisKit, which Vapor has said they will use as their implementation in version 4.

Seeking Feedback

  • API-wise, what are things you do or don't like?
  • Is there anything we could remove and still be happy?
  • Is there anything you think is missing before it can be accepted?

CC @Helge_Hess1/@tanner0101/@IanPartridge for the existing Redis drivers.

Thank you for your proposal!

I have a question about this function signature. This suggests that operation can return a value of any type but that surprises me because we can't transform any type into a RESPValue. Am I missing something?

Totally, everybody can suggest libraries. To move forward to the later SSWG maturity stages, there is a requirement of having a few committers, and probably also a 'sponsor' from the SSWG, just in case you'd walk away from your project or in case there's a bad security vulnerability whilst you're on a long holiday.


The function signature does allow that possibility, but is documented as "undefined behavior" in a sense - as the purpose of this callback is to have access to the RedisClient for invoking commands.

RedisPipeline implements RedisClient to handle the enqueuing of commands through send(command:with:).

I wrote an issue to at least improve the ergonomics of execute(): nio-redis#28

I have a feeling that while working on that, the enqueue(operation:) signature will become more restrictive to prevent inadvertent API misuse.

Yes, I think we should restrict the API to not permit undefined behaviour if at all possible. And in this case it seems like it's totally possible.

I'll update the proposal to include the different parts of the implementation, rather than a single repo.

I think we discussed that before, IMO it is nice if the protocol encoders/decoders are at least kept in separate modules.

I still haven't been convinced that splitting those out into a separate module or package is entirely necessary - especially as it adds an additional step to remember "I have to declare a dependency on two different packages, or two different modules rather than just one" without using @_exported

Also I'd very much prefer to see (and strongly recommend to build) a common NIO connection handling and pooling library before clients are added. Otherwise we end up with N different implementations. (If high level clients are to be added to NIO in the first place, which IMO is debatable, I'd prefer to include just protocol implementations in the spirit of NIOHTTP1/2/WebSocket).

This is true, but unless it's handled by NIO, or we agree to use something like NIOKit or SwiftNIO Extras I think it's "out of scope" for the purpose of any base driver library aside from building a connection.

I think this is incorrect. Redis strings should always be treated as arbitrary Data objects, even a simple string doesn't have to be UTF-8, could be Latin-1 as well. They call them "strings", but they are Data objects really.

Is this feedback to change this to [UInt8] storage rather than String? I don't disagree with this, and can easily make the change.

I think it is desirable to keep this as a ByteBuffer/ByteBuffer slice. Again, just for reference I have this thing:

I remember this feedback during the pitch, and that's why I worked to switch .bulkString to [UInt8] from Data like it was. However, I disagree with going further to ByteBuffer.

My goal for RESPValue & RESPValueConvertible was to provide bridges and representations of RESP with Swift native values. Users who want a ByteBuffer can easily create one however they want using the [UInt8] while it doesn't push users who don't want to, to have to.

In addition, the use of a shared allocator floating as a global (even fileprivate) in order to create RESPValue on the fly just rubs me the wrong way that I can't quite articulate.

In regards to ContiguousArray<RESPValue>, I don't think that's necessary over [RESPValue], as the Standard Library says this:

If the array’s Element type is a struct or enumeration, Array and ContiguousArray should have similar efficiency.

All of the RESPValue cases use struct types for their storage.

Well, so in the implementation itself, you will already have it as a ByteBuffer, free of charge. Converting it to [UInt8] (or String for that matter) will allocate a new storage and copy it. If a user would want to go back to a ByteBuffer, they would need to allocate & copy again.

So I think ByteBuffer would actually be a very good type here. We can offer convenience APIs on top which do the conversion to [UInt8], but then it'll be the user who opts in to allocate and copy. If however (as it's done right now) the library chooses to allocate and copy, the user can't opt out of that, and peak performance might suffer.

Alright, I've seen the reasoning for changing from [UInt8] for storage to ByteBuffer. I've written issue #31 to track this


As chatted about in the Vapor Discord, I'd like to add some feedback for RedisPipeline. So if I understand correctly, the main task of RedisPipeline is batching up the commands so you can get back an array of responses. So whilst the implementation of this feature might make use of pipelining, that's not what the user is after when using the API. A user really wants to send a bunch of commands and get the responses back as an array. Therefore, if we leave everything else as is, I think we should rename this RedisCommandBatcher or something.

RedisPipeline is also confusing in other ways: First, the type doesn't create a pipeline but may use pipelining. The more important thing is that a user might assume that the library doesn't make use of pipelining if the user doesn't use the RedisPipeline type but that's not the case, if the user enqueues two commands with a single flush, it will still be pipelined, the only difference is that their results don't come back as an array.

Quick question regarding this: I really like the above way of having a declarative way for a value. Is there a reason we shouldn't also have a declarative way for the commands themselves? Something like

public enum RESPCommand {
    case get(RESPValueConvertible)
    case increment(RESPValueConvertible)
    case set(RESPValueConvertible, RESPValueConvertible)
    // ...
}

That way, batching of requests can become more natural and we don't need

public func enqueue<T>(operation: (RedisClient) -> EventLoopFuture<T>) -> RedisPipeline

where we have this weird operation: (RedisClient) -> EventLoopFuture<T> which I commented about before. Correct me if I'm wrong, but I believe the T is just there because we need to support all of the singular operations like $0.get(...), which already return an EventLoopFuture but of different types... If we created an enum RESPCommand we could have

connection.sendBatch([.get("foo"), .set("bar", 1), .increment("buz")])

and sendBatch would just take [RESPCommand].
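If such an enum were adopted, one conceivable way to back it onto the existing send(command:with:) primitive could look like this (purely a sketch; `keywordAndArguments` is a made-up helper, not part of the proposal):

```swift
extension RESPCommand {
    // Hypothetical helper: flatten each case into the command keyword and
    // argument list that `send(command:with:)` already accepts.
    var keywordAndArguments: (String, [RESPValueConvertible]) {
        switch self {
        case let .get(key): return ("GET", [key])
        case let .increment(key): return ("INCR", [key])
        case let .set(key, value): return ("SET", [key, value])
        }
    }
}
```

A `sendBatch([RESPCommand])` could then map each case through this helper and pipeline the underlying sends.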


I've thought about this too - but held off until now because I didn't see a huge use case for it.

This discussion has shown that use case. I'll write an issue for this and update this comment with a link to it for tracking.

EDIT: NIORedis#33


No, pipelining doesn't map to write/flush. Even if you do (for brevity written as a synchronous program)

write(request1); flush(); write(request2); flush(); write(request3); flush()

from the perspective of Redis, you still pipelined the requests. The exact TCP packets may or may not (depending on the TCP_NODELAY setting) differ.

If you truly don't want pipelining the program would look like

write(request1); flush(); waitForResponse(); write(request2); flush(); waitForResponse(); write(request3); flush();

Which means in an asynchronous world, you would need to implement some extra stuff to hold onto the N+1st command until we have the Nth command's response.

So whether we use pipelining is not determined by when we flush but rather if we wait for a response before sending the next request.
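In future terms, the strictly non-pipelined version above would chain each command on the previous response. A sketch, using the proposal's send(command:with:) API:

```swift
// Each send only begins after the previous response resolves,
// so no two requests are ever in flight at once (i.e. no pipelining).
let done = connection.send(command: "SET", with: ["key1", "a"])
    .flatMap { _ in connection.send(command: "SET", with: ["key2", "b"]) }
    .flatMap { _ in connection.send(command: "SET", with: ["key3", "c"]) }
```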

Said that,

write(request1); write(request2); write(request3); flush()

is clearly more efficient than

write(request1); flush(); write(request2); flush(); write(request3); flush()

But that's a separate issue. The only thing I was saying is that the main purpose of RedisPipeline is not whether to pipeline or not but rather whether to batch. And if we have a batching mode we can also make sure we don't flush more often than necessary.

Why not both APIs? I would absolutely recommend users to use the latter one (connection.get("key")).

Indeed, in NIO's case I think that's fine because NIO is a low-level tool built for people who write networking libraries which then offer the syntactical sugar on top. For the Redis library I think we should offer the nicest possible API, ie. enums internally but not in the simple user-facing APIs. If however the enums solve a particular issue that users might face too, we might just expose them too in more advanced APIs. So maybe

// nice convenience API
connection.get("key") -> EventLoopFuture<RespValue>
// more complicated API, used to implement the simple API (maybe `internal` only?)
connection.singular(.get("key")) ->  EventLoopFuture<RespValue>
// the underlying API that implements all others, may be public or not
connection.batched([.get("key"), .get("other-key"), .set("foo", "bar")])

For the Redis batching API case however, the enum could work. Just because the user might want to batch multiple thing that don't have the same return type and with the enum we can return them in one array without losing any type-safety.

You are right. Using RESPValue as a key for GET is wrong because Redis won't accept all RESP values as keys. We should solve this with an explicit RESPKey type, maybe something like below.

The enum RESPCommand itself, I would keep as closely aligned to the wire-protocol as possible, as type-safe as possible. Therefore, below I suggest a RESPKey which can represent all RESP keys. It could be backed by either a String or a ByteBuffer, whatever's more correct in the protocol. For convenience (users will want String literals), we should implement the various ExpressibleBy* protocols. So RESPKey would feel pretty much like a String, but we could do some extra validation (ie. no \r\ns in there)

public struct RESPKey: ExpressibleByStringLiteral, CustomStringConvertible, ExpressibleByStringInterpolation {
    // similar to https://github.com/apple/swift-log/blob/master/Sources/Logging/Logging.swift#L433-L448
    [...]
}

public enum RESPCommand {
    case get(RESPKey)
    case set(RESPKey, RESPValue)
}

// because some users might just want to query with a String
extension RESPCommand {
    public static func get(_ string: String) -> RESPCommand {
        // warning: needlessly inefficient implementation because RESPKey will probably offer
        // a constructor directly from String. Just to show-case 
        return .get("\(string)" as RESPKey)
    }
}

let connection = ...

// convenience API
let result1: EventLoopFuture<RESPValue> = connection.get("foo")

 // directly construct a RESPKey using ExpressibleByStringLiteral/ExpressibleByStringInterpolation
let result2: EventLoopFuture<RESPValue> = connection.singular(.get("foo"))

// via "public static func get(_ string: String) -> RESPCommand"
let explicitlyStringKey: String = "foo"
let result3: EventLoopFuture<RESPValue> = connection.singular(.get(explicitlyStringKey)) 

EDIT: Sorry everybody, whilst I totally stand by the points made in this message I don't think it adds much to the API discussion at all, so you might just skip reading it :slight_smile:. The core message here is: First, we should design the best API, then we should look how to implement this in the most efficient way. I believe that NIO is flexible enough that any reasonable API can be implemented in the most flexible way, of course using things like Redis pipelining without surfacing the word/concept of 'pipelining' to the user.


Is it? If you don't set TCP_NODELAY, write; flush; write; flush if followed very quickly after each other will still end up in one packet (if it fits). If you don't disable Nagle's Algorithm by setting TCP_NODELAY the kernel will try to fit as much as possible into one packet. With NIO, you can disable TCP_NODELAY and more-or-less take control of the packets that get sent out. There's never a guarantee, TCP is a stream and not a message/datagram based protocol.

What you're missing is that even if you send it out in 3 packets very quickly followed by each other, Redis might still see this in one read. Redis (or any other TCP software) cannot reliably distinguish the number of write/send/... calls or the number of flush calls made with NIO, TCP is not framed. Making one write doesn't mean it comes out in one packet, making two writes doesn't mean it comes out in two packets (may be 1, may be 2, may be more). And on the receiving end, receiving something in one packet doesn't necessarily mean it comes out in one read, and receiving something as two packets doesn't necessarily mean it doesn't come out in just one read.

Doesn't my example have a flush in between the write and the waitForResponse? I apologise if I missed it, yes, in NIO you need to flush for any data to be sent.

I understand what Redis is. And I understand why it's beneficial to make use of pipelining because it can reduce the amount of work Redis needs to do. But even write; flush; write; flush; write; flush in very quick succession is very likely to make use of Redis' pipelining support. Without TCP_NODELAY (which is the default) it's pretty much guaranteed, with TCP_NODELAY I'd still think it's likely. And again, I'm not arguing to not do the more efficient write; write; write; flush, I'm just saying this is not about pipelining being used or not. As I understand the main point of @Mordil's 'pipeline' API is to batch the results up in an array. But @Mordil will be able to tell us more.

Regarding pipelining more generally, check this image, pipelining means sending a subsequent request before receiving the previous response as outlined before.

I'm 100% happy if we don't need it.

Again, I don't think withPipelining is a great name because it implies that not using withPipelining does never pipeline (which isn't true) but otherwise, sure we can have such an API. But I still believe that the main point of the original RedisPipeline API is the returned array and not some potential efficiency gains. Sure, you can manually create that array (eg. using whenAllComplete) but it's less convenient for users than just getting the array in the first place. @Mordil will be able to tell us if it's about the returned array or some potential efficiency gains.

If it was just about when to flush you could also just add an optional autoFlush: Bool = true parameter to each command and write:

client.get("foo1", autoFlush: false) -> future
client.get("foo2", autoFlush: false) -> future
client.get("foo3", autoFlush: false) -> future
client.get("foo4") -> future

but I really don't think it's about the flushes :slight_smile:.

Good point, connection pooling is interesting for all sorts of client APIs: Redis, MySQL, Postgres, HTTP, ...

Awesome, then RESPKey should just be backed by a ByteBuffer but for convenience still be ExpressibleByString*.

The problems RedisPipeline (the type) was intended to solve were twofold:

  1. Sending a series of commands to Redis
  2. Reducing the overhead of an intended batch of commands as much as possible
    a. This solution ideally would be easily extensible to later have a public facing API for Redis' "transactions" by inserting the necessary commands before and after the user's.

I am leaning more towards the following:

class RedisConnection {
    func send(command: String, with: [RESPValueConvertible]) -> EventLoopFuture<RESPValue>
    func send(batch: (RedisClient) -> Void) -> EventLoopFuture<[RESPValue]>
}

// `RedisPipeline` would be deleted

This would remove the need for 3 types (RedisPipeline and the proposed RESPKey and RESPCommand), solve some of the concerns raised about the enqueuing syntax, and still supports the 2 1/2 points I listed above.

All that, while still not exposing much of the NIO implementation details such as "auto flush", ByteBuffer to execute commands, or other conversions.

fwiw, one thing we did when implementing redis clients in the past (not in swift) was to create a basic api that is "command" based, then generate command specific wrappers (eg get, set, inc, etc) since they can be generated using metadata. we arrived at this design because there are 300+ commands


Okay, this sounds great. I would even go further and say: Let's design the best API first, only introducing the concepts a user knows they want to use. I believe that NIO is flexible enough so that we can turn something that the user knows into the most efficient way of sending the bytes (for example leveraging pipelining, without the user needing to specify this).

These "transactions" and batching are great examples of something that should be part of the API: The user knows if they want transactions (if they want 'isolation'), and a user knows when they want to batch requests (if they'd like to execute a few things and get the results back in one array).

Cool! I do however have a question about how to use the batched API. If I understand correctly, the idea is that I do

let connection = ...
// singular API
let getResult = connection.send(command: "GET", with: ["foo"])
// batched API
connection.send { client in
    client.send(command: "GET", with: ["bar"]) /* what happens with the result ? */
    client.send(command: "SET", with: ["foo", "bar"])
    client.send(command: "GET", with: ["foo"])
}

Whilst that looks great API-wise I don't quite understand how the API works. In your first post, you show that the type of RedisClient.send is:

public protocol RedisClient {
    func send(command: String, with arguments: [RESPValueConvertible]) -> EventLoopFuture<RESPValue>
}

ie. each send operation returns an EventLoopFuture<RESPValue> which makes a lot of sense. What I don't get is how that is fitting within your batched API. How are you collecting all the results of the individual send calls that you make in the closure? They all return a future of their values and you need to 'collect' them. Did you maybe mean to write

class RedisConnection {
    /* [...] */
    func send(batch: (RedisClient) -> [EventLoopFuture<RESPValue>]) -> EventLoopFuture<[RESPValue]>
    //                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    //                                make the user return the RESPValues
}

? If yes, then do you want this API to be used this way?

let connection = ...
// batched API
let arrayOfResults: EventLoopFuture<[RESPValue]> = connection.send { client -> [EventLoopFuture<RESPValue>] in
    return [client.send(command: "GET", with: ["bar"]),
            client.send(command: "SET", with: ["foo", "bar"]),
            client.send(command: "GET", with: ["foo"])]
}

Please note, I'm not suggesting to change anything, I'm just trying to understand how this was intended.

And quick secondary question: What's the difference between RedisClient and RedisConnection, are they the same?

That sounds like a very reasonable plan to design to me given that it's 300+ commands :flushed:

Shoot, you caught a great mistake I made.

Below is an example that I'd be OK with:

class RedisConnection {
    func sendBatch(_ commands: (RedisClient) -> [EventLoopFuture<RESPValueConvertible>]) -> EventLoopFuture<[RESPValueConvertible]>
    //                          ^^^^^^^^^^^
    //                       It could be argued that this should be `RedisConnection`,
    //                       since `RedisClient` has a weaker use case now,
    //                       and this isn't a protocol requirement
}

let connection = ...
// batched API
let results = connection.sendBatch { client in
    let first = client.sadd([1, 2, 3, 4, 5], to: "first")
    let second = client.sadd([3, 4, 5, 6], to: "second")
    return [first, second, client.sdiffstore(as: "third", sources: ["first", "second"])]
}
...
let first = results[0] as? Int
let second = results[1] as? Int
let third = results[2] as? Int

This will technically make it possible to chain futures to do their own transformations on the data, returning anything that is RESPValueConvertible - actually giving more freedom to the user without hidden implementation details, while also making it easier to work with the results.

RedisClient is a base protocol for sending commands through Redis, that should be used for generics and other function signatures as a form of type erasure in case you might be passed some other delegate object or implementation.

RedisConnection is just an "out of the box" implementation of RedisClient with other capabilities.

Originally, it was to allow RedisPipeline to be implemented as it has been, but now that it is slated for removal, there's less of a use case for the RedisClient defined this low level. If I was to split the current module into two, as NozeIO does, these types would be in the very base package:

  • RESPEncoder
  • RESPDecoder
  • RedisCommandHandler
    • RedisCommandContext
  • RESPValue
  • RESPValueConvertible
  • RedisError
  • RedisClient

then the "out of the box" implementation module would contain:

  • RedisConnection

Hi All, it's been a few days and I thought I should give a status update:

  • Migrating RESPValue .bulkString and .simpleString to use ByteBuffers has been completed: #34
    • Also with this, I took the time to re-evaluate implementations of the RESPEncoder and RESPDecoder
  • I've started work on re-implementing the feature of sending batch commands
  • These next two weeks are going to be busy for me personally, as for one week I'll be on vacation.

Barring further feedback, which is 100% welcomed, my goal is to have NIORedis ready for formal review by May - with the SSWG voting on it May 16th.
