Asynchronous Swift serial port wrapper best approach?

I’m finding I may need to implement my own MODBUS communications library in Swift, but even if I do, I still run into the need to support cross-platform (macOS & Linux) serial port access, and the only way I can see to do that is to call the POSIX APIs for this, which end up being blocking C calls.

What I did before was create a DispatchQueue that I do async dispatches on within withCheckedThrowingContinuation blocks. This forces all reads & writes on the serial port to be serialized.

Is this the right approach (using the DispatchQueue)? It also seems that the serial port wrapper class should be an Actor, would you agree?

Assuming that much is correct, then the higher-level abstraction of implementing the MODBUS protocol involves writing a packet of bytes on the serial port, and then reading a response packet, verifying the checksum, and deserializing the data and handing it back to the caller. This MODBUS object should probably also be an Actor, and also a work queue to serialize these pairs of operations (write followed by read). The client of the MODBUS library, in this case, is a webserver that may make a number of MODBUS requests that aren't otherwise serialized. Does this seem correct?

I'm second-guessing myself in this design and I'm hoping I'm not missing some obvious other way to do this. TIA!

I think your original approach is totally fine. If you need concurrent reading and writing then you could use two queues instead of one.

1 Like

I generally use Dispatch I/O for this sort of thing. For the details, see the dispatch_io_create man page.

One trick there is to dup the descriptor so you can have separate input channel and output channels.

To be clear, I’m doing this on Apple platforms. AFAIK these APIs are present on other platforms but I’ve no direct experience with them.

Share and Enjoy

Quinn “The Eskimo!” @ DTS @ Apple

1 Like

I’m finally getting around to trying DispatchIO, but I have questions. My current need is this: I want to open a serial port and get called back whenever data arrives. I want to send data arbitrarily.

I'm trying various combinations of opening with O_NONBLOCK, opening the tty or cu, and VMIN and VTIME. Looking at the termios man page and experimenting, it seems what I want is:

  • Noncanonical mode
  • Non-blocking
  • VMIN = 0, VTIME = some timeout (I've been trying 25 * 10)

Unfortunately, my reads aren’t blocking as I would expect (I would expect them to block until the requested data is received, or 25 seconds have elapsed). The read closure is called immediately (with no data, unless some was sent before the call).

If I don’t pass O_NONBLOCK to open(), and try to open the tty, it blocks forever, I think waiting for some additional serial lines to be set.

If I try VMIN=1, then it will (theoretically) block forever until the first byte is received. I need this read to time out.

My intent is to create a queue to hold a task that loops over blocking read calls. If the calls don’t block, that will result in a very tight loop, wasting resources.

Questions

  1. How do I get my reads to block with a timeout? Even if I open without O_NONBLOCK (on a cu port), or use fcntl() to clear O_NONBLOCK, read never blocks for VMIN==0 and VTIME==50. If VMIN==1, the call never returns, even after a byte is received (because my reads are for 1024 bytes. It does return if I send 1024 bytes).
  2. What is the purpose of the queue passed to DispatchIO()? What work is being done on that queue?
  3. Does the queue passed to DispatchIO() have to be different from the queue I create to read?
  4. A problem I ran into with regular Posix read() calls is that if I try to close the port while a read is pending, the whole thing just seems to hang (quite hard, I can't break in the debugger, although I can kill the app). How do I stop a pending read?

Note that by "returns,” I mean the receive callback isn’t called.

Why might I want separate input and output channels? Can I also use DispatchIO(type:io:queue:cleanupHandler:)

My code is available here. The relevant files are Serial.swift and GroundStationApp.swift:29, 55, 61. I have two FTDI USB-serial adapters connected to each other. On one I run screen in the terminal, and this app connects to the other (the path is hard-coded here: GroundStationApp.swift:33). Note that the link to the repo is to the specific commit as it stands right now.

2. What is the purpose of the queue passed to DispatchIO(…)? What
work is being done on that queue?

It’s used to run the cleanup handler. Quoting the dispatch_io_create man page:

The provided cleanup_handler block will be submitted to the specified queue when all I/O operations on the channel have completed and it is closed or reaches the end of its lifecycle.

  1. Does the queue passed to DispatchIO(…) have to be different from the
    queue I create to read?

No.

  1. A problem I ran into with regular Posix read(…) calls is that if I
    try to close the port while a read is pending, the whole thing just
    seems to hang …. How do I stop a pending read?

Dispatch I/O uses something [1] to monitor for file descriptor readability, which means it won’t suffer from this problem because no thread is stuck within a synchronous blocking read(…). The above-mentioned man page has this to say:

The dispatch_io_close(…) function closes a dispatch I/O channel to new submissions of I/O operations. If DISPATCH_IO_STOP is passed in the flags parameter, the system will in addition not perform the I/O operations already submitted to the channel that are still pending and will make a best effort to interrupt any ongoing operations. Handlers for operations so affected will be passed the ECANCELED error code, along with any partial results.

Share and Enjoy

Quinn “The Eskimo!” @ DTS @ Apple

[1] Historically it was a kqueue, and it might well still be that. I’ve not looked at this in depth in a very long time.

1 Like

Thanks for all that. The thing I still can’t make work is a blocking read with timeout. Not even blocking, but waiting for the timeout to call my read closure (or until a byte comes in).

The thing I still can’t make work is a blocking read with timeout.

I’m not sure how blocking reads come into this. Dispatch I/O is async, always.

Not even blocking, but waiting for the timeout to call my read closure
(or until a byte comes in).

Once you’re layered on top of an async I/O primitive, timeouts are the combination of a timer (in this case a Dispatch timer source is the obvious candidate) and cancellation.

Share and Enjoy

Quinn “The Eskimo!” @ DTS @ Apple

Blocking in this sense: I need my code to be run when a) bytes come in or b) a timeout expires.

Using Posix read(), this would be a blocking call. I couldn't get that to time out. So I decided to try DispatchIO. But with it, my read callback is always called immediately, even if there is no data. If data later comes in, it is not called again. So I have to issue the DispatchIO read call again. If I do that in my read callback, then I'll essentially be in a tight loop calling read. That's not efficient.

Consider this test program:

import Foundation

func main() {
    let channel = DispatchIO(type: .stream, fileDescriptor: STDIN_FILENO, queue: .main) { _ in
        print("clean up")
    }
    channel.setLimit(lowWater: 1)
    channel.read(offset: 0, length: .max, queue: .main) { done, data, error in
        print("--")
        print("done:", done)
        if let data {
            print("data:", (Data(data) as NSData).debugDescription)
        }
        print("error:", error)
    }
    dispatchMain()
}

main()

When you run it, it ‘blocks’ waiting for data on stdin. When you type in some data, the handler runs and echoes that data. For example:

> % ./Test58994 
> Hello
< --
< done: false
< data: <48656c6c 6f0a>
< error: 0
> Cruel 
< --
< done: false
< data: <43727565 6c0a>
< error: 0
> World!
< --
< done: false
< data: <576f726c 64210a>
< error: 0
> ^D
< --
< done: true
< data: <>
< error: 0
> ^C

In this transcript, lines marked with > are my input and lines marked with < are the program’s output.

I put ‘blocks’ in sneer quotes above because the program doesn’t really block. Rather, the main thread parks (or terminates!) in dispatchMain() and Dispatch runs callbacks based on I/O activity. Going from this to a Swift async primitive — especially one that supports timeouts and cancellation — is feasible, but it’s certainly not a trivial task.

Share and Enjoy

Quinn “The Eskimo!” @ DTS @ Apple

2 Likes

Could the select function help you?

man select

SELECT(2) System Calls Manual SELECT(2)

NAME
FD_CLR, FD_COPY, FD_ISSET, FD_SET, FD_ZERO, select – synchronous I/O multiplexing

SYNOPSIS
#include <sys/select.h>

 void
 FD_CLR(fd, fd_set *fdset);

 void
 FD_COPY(fd_set *fdset_orig, fd_set *fdset_copy);

 int
 FD_ISSET(fd, fd_set *fdset);

 void
 FD_SET(fd, fd_set *fdset);

 void
 FD_ZERO(fd_set *fdset);

 int
 select(int nfds, fd_set *restrict readfds, fd_set *restrict writefds,
     fd_set *restrict errorfds, struct timeval *restrict timeout);

DESCRIPTION
select() examines the I/O descriptor sets whose addresses are passed in readfds,
writefds, and errorfds to see if some of their descriptors are ready for reading, are
ready for writing, or have an exceptional condition pending, respectively. The first
nfds descriptors are checked in each set; i.e., the descriptors from 0 through nfds-1 in
the descriptor sets are examined. (Example: If you have set two file descriptors "4"
and "17", nfds should not be "2", but rather "17 + 1" or "18".) On return, select()
replaces the given descriptor sets with subsets consisting of those descriptors that are
ready for the requested operation. select() returns the total number of ready
descriptors in all the sets.

 The descriptor sets are stored as bit fields in arrays of integers.  The following
 macros are provided for manipulating such descriptor sets: FD_ZERO(&fdset) initializes a
 descriptor set fdset to the null set.  FD_SET(fd, &fdset) includes a particular
 descriptor fd in fdset.  FD_CLR(fd, &fdset) removes fd from fdset.  FD_ISSET(fd, &fdset)
 is non-zero if fd is a member of fdset, zero otherwise.  FD_COPY(&fdset_orig,
 &fdset_copy) replaces an already allocated &fdset_copy file descriptor set with a copy
 of &fdset_orig.  The behavior of these macros is undefined if a descriptor value is less
 than zero or greater than or equal to FD_SETSIZE, which is normally at least equal to
 the maximum number of descriptors supported by the system.

 If timeout is not a null pointer, it specifies a maximum interval to wait for the
 selection to complete.

 If timeout is a null pointer, the select blocks indefinitely.
 ...

Nice. Expanding on @eskimo answer:

class DispatchReader {
    private let io: DispatchIO
    private var isReading = false
    private var continuation: CheckedContinuation<String, Error>!
    
    init(_ io: DispatchIO) {
        self.io = io
    }
    
    func getch() async throws -> String {
        if !isReading {
            isReading = true
            io.read(offset: 0, length: .max, queue: .main) { done, data, error in
                if let data {
                    let s = String(data: Data(data), encoding: .utf8)! // TODO: fix unsafe unwrap
                    self.continuation.resume(returning: s)
                } else {
                    self.continuation.resume(throwing: NSError())
                }
                self.continuation = nil
            }
        }
        return try await withCheckedThrowingContinuation { (c: CheckedContinuation<String, Error>) in
            continuation = c
        }
    }
}
func main() {
    let channel = DispatchIO(type: .stream, fileDescriptor: STDIN_FILENO, queue: .main) { (code: Int32) in
        print("clean up \(code)")
    }
    channel.setLimit(lowWater: 1)

The following bit would put stdin in non buffering / non echo mode:

    let file = channel.fileDescriptor // same as STDIN_FILENO
    var attributes = termios()
    tcgetattr(file, &attributes)
    attributes.c_lflag &= ~UInt(ICANON | ECHO)
    tcsetattr(file, TCSANOW, &attributes)

Finally:

    let reader = DispatchReader(channel)
    Task {
        while true {
            let ch = try await reader.getch()
            print("\(ch.uppercased())", terminator: "")
            fflush(stdout) // so we can see
        }
    }
    dispatchMain()
}

Cancellation / timeouts and proper error handling is to be added.

2 Likes

Could the select function help you?

If you were using pthreads as your concurrency abstraction then select is a fine choice for implementing a ‘read with cancellation and timeout’ primitive. However, it’s not a great match for Swift concurrency because it blocks the calling thread, and you have to avoid that [1] if you’re running on a thread from the Swift concurrency cooperative thread pool. Hence the focus on Dispatch I/O, which supports completion handlers, which you can connect to Swift concurrency using a CheckedContinuation.

Share and Enjoy

Quinn “The Eskimo!” @ DTS @ Apple

[1] At least in this context, where things can block for arbitrary amounts of time waiting for user input. Blocking Swift concurrency threads for short periods of time is kinda unavoidable, for example, when you take a page fault.

2 Likes

Cancellation / timeouts and proper error handling is to be added.

I’m reminded of the “draw the rest of the owl” meme (-:

Good luck!

Share and Enjoy

Quinn “The Eskimo!” @ DTS @ Apple

2 Likes

@eskimo’s example does not work for me.

If I call read() with length: .max and the serial port’s VMIN and VTIME = 0, then the callback is called immediately with 0 bytes (unless some had been buffered prior to the call, in which case it returns those bytes). It does not get called on subsequent arrival of data (but if I call it again, I get those bytes).

In other words, the system is not waiting for data to arrive and then calling my handler. It either gets called immediately with no data, or immediately with whatever was in the port buffer.

Various permutations of VMIN and VTIME don't improve thing—

CORRECTION: It seems the key is to set VMIN = 1, VTIME = 0, and self.channel?.setLimit(lowWater: 1). I was unaware of the lowWater limit until I saw @eskimo’s code.

@tera’s is a bit different, in that it requires looping over the getch() call, but my needs are better suited by @eskimo’s approach.

I still don’t know how to get Posix read() to block. but assuming DispatchIO works on Linux, it shouldn’t matter.

Caveat

One major open question remains: What happens when .max bytes are received? Is that a special value that ensures it will read forever, or will it eventually return done == true? (I don't think it matters, if it really can read 64 bits worth of bytes, since that will take millennia at the speeds I'm using, but at 32 bits it might be an issue. Sure, I can re-issue the read, but I’d like to know the expected behavior.)

In any case, thank you for helping me get this far.

The doc says "specify SIZE_MAX to continue reading data until an EOF is reached" but that can't be true as SIZE_MAX is UInt (0xFFFFFFFFFFFFFFFF) and the argument type is Int... the all bit pattern of SIZE_MAX should actually correspond to -1 ... not sure what is the actual check inside dispatch, but looks like -1 is the right constant to use (i.e. it's a bug in the documentation that should be changed to "specify -1 to continue reading data until an EOF is reached").

2 Likes

I spent far longer than I should have poring over the libdispatch sources. I didn’t try to build it, despite it having an Xcode project file, because it has complex extra steps.

But I’m not sure how it can build. I can't find a declaration for dispatch_operation_t, for example. In one part of the code, it references flags, which doesn’t seem to be declared in scope (nearby code references op->flags).

The only part I can find that compares the provided read length to anything significant is this:

			if (op->length < SIZE_MAX) {
				op->buf_siz = op->length - op->total;
				if (op->buf_siz > max_buf_siz) {
					op->buf_siz = max_buf_siz;
				}
			} else {
				op->buf_siz = max_buf_siz;
			}

Since I can’t find the declaration of dispatch_operation_t (the type of op above), I can only assume dispatch_operation_t.length is of type size_t, which is what the length parameter supplied to dispatch_io_read is.

The Swift wrapper for read() takes an Int for the length and calls dispatch_read. But the only definition of dispatch_read I can find is this:

@available(*, unavailable, renamed:"DispatchIO.read(fileDescriptor:length:queue:handler:)")
public func dispatch_read(_ fd: Int32, _ length: Int, _ queue: DispatchQueue, _ handler: @escaping (dispatch_data_t, Int32) -> Void)
{
	fatalError()
}

The .apinotes file has this:

- Name: dispatch_read
  SwiftPrivate: true

I don't know if there’s another definition somewhere that fixes up the Int.max.

In any case, I agree with you that there’s a bug. Not sure if in the docs or the code. I've filed FB13452731.

FWIW, I have had good results with the (now deprecated, but who cares) NSRunLoop abstraction for serial ports ­– on both Linux and Apple platforms. I've been using GitHub - Cornucopia-Swift/CornucopiaStreams: Cornucopia Streams Library to talk to various FTDI serial adapters.

Naturally I had my share of Foundation bugs and bogus behavior (not only on Linux, but also on Apple platforms), but with the occasional drop down to Posix, I got very stable communication running.

Perhaps digging through the sources will give you one or the other hint.

1 Like

The C version uses "unsigned long" for the length parameter. For example to read between 2GB and 4GB worth of data on a 32 platform you'd have to use

let size: UInt = a number between 2GB and 4GB
let length = Int(bitPattern: size)

which gets to to some negative number in the form of Int, but it will work "proper". So long as the number is not "-1" which would be treated as SIZE_MAX and act specially ("read until EOF").

Good luck with building dispatch.