Streaming multiple payloads through a response

Yeah could be. I can't think of anything else. Maybe you need the Content-Type header?

Content-Type didn't do anything. My only guess is that my chunked encoding isn't correct in some way and so URLSession resets the header rather than producing an error. I don't think there are any knobs to set on the URLSession side to control this behavior.

Please don't do this: it won't work, and you don't have to do the chunked encoding yourself. NIO does chunked encoding for you automatically. The only thing you need to do is NOT set the content-length header.

The way you get NIO to automatically do chunked encoding is just to make sure you don't set either a content-length header or a transfer-encoding: chunked header. If you want to, you can set transfer-encoding: chunked but you don't have to, NIO defaults to that.

This looks very wrong in many ways:

  • if you want chunked, you must NOT set a content-length. Whatever piece of software sets content-length: 0 needs to stop doing this and needs to be fixed.
  • chunked is not a content-encoding but a transfer-encoding!

The correct response headers for streaming are

transfer-encoding: chunked

(and no content-length). NIO will automatically insert transfer-encoding: chunked if you do not specify either a transfer-encoding or a content-length. So the safest bet is to just set no headers related to content-encoding, transfer-encoding, or content-length. They aren't needed.
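The header rule above can be modelled as a small pure function. This is only a sketch of the rule as described in this thread, not NIO's actual implementation; the names BodyFraming and bodyFraming(forUserHeaders:) are made up for illustration, and it assumes HTTP/1.1.

```swift
// Hypothetical model of the rule described above; NOT NIO's real code.
enum BodyFraming: Equatable {
    case fixedLength(Int) // the user set a content-length
    case chunked          // transfer-encoding: chunked (explicit or defaulted)
}

/// Decides how an HTTP/1.1 response body would be framed, given the headers the user set.
/// Header names are compared case-insensitively.
func bodyFraming(forUserHeaders headers: [String: String]) -> BodyFraming {
    for (name, value) in headers where name.lowercased() == "content-length" {
        if let length = Int(value) {
            return .fixedLength(length)
        }
    }
    // No content-length: chunked, whether or not transfer-encoding was set explicitly.
    return .chunked
}

// Setting no framing headers at all is enough to get chunked.
print(bodyFraming(forUserHeaders: [:]))
// A stray "content-length: 0" turns chunking off and promises an empty body.
print(bodyFraming(forUserHeaders: ["Content-Length": "0"]))
```

In this model a leftover content-length: 0 is exactly what prevents streaming, which matches the advice to make sure nothing sets that header.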

The only real use case for content-length is if you're streaming a file for the user to download. You know the size ahead of time and it's nice for the user to get a progress bar, so then setting a content-length and not doing any chunked encoding stuff is good.

There seems to be a bit of confusion here as well: Whether you use chunked encoding or a predefined content length does not determine if you're streaming or not. In fact, SwiftNIO can only stream. "Not streaming" to SwiftNIO is still streaming with just one HTTPServerResponsePart.body(...).
That said, in many use cases for streaming, you do not know the actual content length ahead of time because you're producing it whilst sending the data. If that is the case, you obviously have to use transfer-encoding: chunked because you can't know the content-length ahead of time.

I haven't looked at it recently, but this is an issue with Vapor, not NIO directly, so a lot of my issues here may just be Vapor instead of NIO. But none of my experimentation got any sort of chunking working.

This didn't seem to be the case, but again, Vapor may have been interfering here.

That would be Vapor, which has fixed the issue.

Yes, I realized that later, it was just a typo.

My discussion around these APIs was related to Vapor's handling, not NIO's, and I'm not sure how the two interact. Vapor didn't seem to expose a way to properly stream chunked representations using their higher level APIs (e.g. Content-conforming values) but I haven't tried lately.

To be more specific to my use case: I need to send a stream of chunked payloads in such a way that it triggers URLSession's automatic handling (meaning multiple calls to my didReceiveData delegate, once for each payload). I can see this behavior from real servers like the Clash proxy's logging endpoint, I just haven't been able to replicate it using Vapor. Attempting to just write multiple payload buffers through the connection doesn't seem to trigger any sort of chunking, the payloads are all just concatenated together.

@tanner0101 can probably help here.

Ok, I do see what you mean and what you're asking for I think is impossible. But let me first of all rephrase what I assume you want, if that's not true, feel free to skip the rest.

I assume that you want one didReceiveData callback call per chunk. I.e. if somebody sends 6\r\nfoobar\r\n you expect one didReceiveData call (foobar), but if somebody were to send 3\r\nfoo\r\n3\r\nbar\r\n, you would expect two (foo, then bar). If that is the case, then that won't be possible. The number of chunks in which the data was sent over the wire is not a semantic property of the data (nor, in fact, is whether it was sent as transfer-encoding: chunked at all). Every intermediary (be it a library or a proxy or whatever) can totally change the transfer-encoding. The RFC actually specifies this clearly (thanks @lukasa):

Unlike Content-Encoding (Section 3.1.2.1 of [RFC7231]),
Transfer-Encoding is a property of the message, not of the
representation, and any recipient along the request/response chain
MAY decode the received transfer coding(s) or apply additional
transfer coding(s) to the message body, assuming that corresponding
changes are made to the Transfer-Encoding field-value.

So a proxy/library may for example change 3\r\nfoo\r\n3\r\nbar\r\n to 6\r\nfoobar\r\n or even 1\r\nf\r\n5\r\noobar\r\n. They could even send it as content-length: 6.
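To make this concrete, here is a minimal sketch of a chunked encoder and decoder working on raw bytes. The function names encodeChunked and decodeChunked are made up for illustration; the sketch ignores chunk extensions and trailers, and it appends the terminating zero-length chunk that the shortened examples above leave out.

```swift
/// Encodes each piece as one HTTP/1.1 chunk: hex size, CRLF, data, CRLF.
/// The terminating zero-length chunk is appended at the end.
func encodeChunked(_ pieces: [[UInt8]]) -> [UInt8] {
    let crlf: [UInt8] = [13, 10]
    var out: [UInt8] = []
    for piece in pieces where !piece.isEmpty {
        out += Array(String(piece.count, radix: 16).utf8)
        out += crlf
        out += piece
        out += crlf
    }
    out += Array("0".utf8)
    out += crlf
    out += crlf
    return out
}

/// Decodes a complete chunked body; returns nil if the input is malformed or truncated.
func decodeChunked(_ wire: [UInt8]) -> [UInt8]? {
    var body: [UInt8] = []
    var index = 0
    while index < wire.count {
        // Read the hexadecimal size line up to its CRLF.
        guard let lineEnd = wire[index...].firstIndex(of: 13),
              lineEnd + 1 < wire.count, wire[lineEnd + 1] == 10,
              let size = Int(String(decoding: wire[index..<lineEnd], as: UTF8.self), radix: 16)
        else { return nil }
        index = lineEnd + 2
        if size == 0 { return body } // last chunk; trailers are ignored in this sketch
        guard index + size + 2 <= wire.count,
              wire[index + size] == 13, wire[index + size + 1] == 10
        else { return nil }
        body += wire[index..<(index + size)]
        index += size + 2
    }
    return nil // ran out of input before the terminating chunk
}

let wireA = encodeChunked([Array("foo".utf8), Array("bar".utf8)])
let wireB = encodeChunked([Array("foobar".utf8)])
print(wireA == wireB)                               // false: the bytes on the wire differ
print(decodeChunked(wireA) == decodeChunked(wireB)) // true: the decoded body is identical
```

Because every chunking decodes to the same body, a receiver has no reliable way to recover the sender's original chunk boundaries.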

Having the "right" of changing T-E is actually quite important. Imagine you're a proxy or URLSession or some other piece of software and an attacker sends you 40000000\r\n<20 MB of data>. Chunk sizes are written in hexadecimal, so this means they announce a chunk that is 1 GB large but they only send 20 MB of data, then they wait. To not trigger any of your timeouts, they'll send an extra byte per second. For this library/proxy it's now crucially important to "re-stream" this into 1400000\r\n<20 MB of data>\r\n (1400000 is 20 MB in hexadecimal) and then 1\r\nX\r\n every time they receive another byte (X) in this example. If they were not allowed to "re-stream" they would need to sit on the memory (at least 20 MB but growing) until they have received the full 1 GB chunk. If the chunk size were semantic, you could very easily eat up a proxy's memory without even violating the protocol. And the same is true for URLSession. Should it sit on a "partial chunk" that has been received before handing it over to the user? No, it should just hand you what it got (and I'm pretty sure it will).
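A rough sketch of such "re-streaming", scaled down to a few bytes: the forwarder below keeps only a counter and immediately re-frames whatever has arrived as a complete chunk of its own. The type Restreamer is hypothetical and heavily simplified; a real intermediary would also have to parse the size line and handle the CRLF that ends the sender's chunk.

```swift
/// Hypothetical forwarder that never buffers: whatever part of the sender's declared chunk
/// has arrived is re-framed as its own complete chunk and passed on immediately.
struct Restreamer {
    private(set) var remainingInDeclaredChunk: Int

    init(declaredChunkSize: Int) {
        self.remainingInDeclaredChunk = declaredChunkSize
    }

    /// Returns the bytes to forward right away (one complete chunk), or nothing if no data arrived.
    mutating func receive(_ bytes: [UInt8]) -> [UInt8] {
        let usable = bytes.prefix(self.remainingInDeclaredChunk)
        guard !usable.isEmpty else { return [] }
        self.remainingInDeclaredChunk -= usable.count
        var out = Array(String(usable.count, radix: 16).utf8) // chunk size in hexadecimal
        out += [13, 10]
        out += usable
        out += [13, 10]
        return out
    }
}

// The sender announced a 0x40 (64) byte chunk but only 20 bytes have arrived so far.
var forwarder = Restreamer(declaredChunkSize: 0x40)
let forwardedFirst = forwarder.receive(Array(repeating: UInt8(ascii: "a"), count: 20))
// One more byte trickles in a second later.
let forwardedSecond = forwarder.receive([UInt8(ascii: "X")])
print(forwardedFirst.count, forwardedSecond.count, forwarder.remainingInDeclaredChunk)
```

The only state the forwarder holds is one integer, regardless of how large a chunk the sender announced.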

With some software you may still often see that chunks appear to be preserved. That is quite likely a side-effect. Most software (including NIO/Vapor) will send out the chunks as they receive them from a user (so they don't need to hold on to data longer than necessary). So if a user does write("foo") followed by write("bar"), then it's quite likely that it's being sent out as 3\r\nfoo\r\n followed by 3\r\nbar\r\n. Depending on how efficient the software is, this may be sent over 1 or 2 TCP/IP packets. If the software is implemented not as efficiently as possible, it's likely that it's sent out as 2 TCP/IP packets which also makes it likely (but not guaranteed) that it's received in two separate read calls by the receiver. And usually, the receiver will try to hand over data it has received as fast as possible so it doesn't need to hold on to the memory longer than necessary. That means it's actually not unlikely (but far from guaranteed) that your library will call you twice (foo, then bar). But anything from 1 (foobar) to 6 times (f, o, o, b, a, r) is totally legal and can happen (even on localhost).

Long story short: If you receive the data (in however many didReceiveData calls), then the software is working correctly.

If you want to test that streaming is working properly (as opposed to some piece buffering the data somewhere), here's a recipe for such a test case:

  1. Create an HTTP echo server. That's an HTTP server that streams the received request body bytes straight into the response body.
  2. Start your HTTP client and send 1 byte only. Now you are guaranteed that you will get exactly one callback (because a byte can't be split), so didReceiveData should be called with exactly that one byte after it was echo'd back by the server. Once received, you send a second byte, and repeat.

See here for AsyncHTTPClient's test case that does this very recipe. Apologies, the test is quite long because it actually implements a full HTTP server with NIO in the test case.

@tanner0101 do you actually have a Vapor example of a basic streaming HTTP echo server? Pretty much exactly like NIOHTTP1Server's /dynamic/echo.

Thankfully this server is only meant to be a local testing server for Alamofire's unit tests, so I'm hoping that I can reliably control chunking in order to simulate these real world scenarios. I'll take a closer look at what I'm actually producing using WireShark to see if the chunking is actually in effect.

Is there some way to detect this event, rather than just calling write repeatedly?

I'll give single bytes a try.

Libraries are also intermediaries. So Vapor, SwiftNIO, and URLSession can (and will) change chunking.

That's a good idea but bear in mind that that'll just be one instance.

Yes. The idea is this

  • set up your HTTP echo server
  • set up URLSession for a streaming POST upload
  • send 1 request body byte and wait (do not send another byte)
  • wait until you received that 1 byte back from the HTTP echo server. You know there's only ever 1 byte in flight because you only sent one.
  • once you have received the byte (that you've previously sent in the request body) in the response body, send another request body byte
  • do this a couple of times.

(The unit test I sent implements exactly that)

This server is intended to replace our reliance on httpbin.org to give us greater performance and flexibility in testing realistic scenarios. In this case I'd like to test our DataStreamRequest's ability to receive multiple payloads in sequence. I can do this manually using the aforementioned Clash proxy's logging endpoint (a JSON payload is sent every second) and it works perfectly, I'd just like to create a replicable scenario in our test server. I'd prefer JSON payloads, but if single bytes would be the most reliable, I'll give that a shot.

So far I'm still having trouble streaming through Vapor's Response type, mainly enqueuing all of the write calls in a way that doesn't leak promises and crash.

Single bytes are the only way you can do that without implementing some form of framing. You can totally use JSON payloads too but you'd need to implement framing (for example each "JSON message" ends with a newline or so). So it's possible that URLSession would call you with only parts of a JSON message (you'd need to buffer until you see \n) or the opposite, URLSession calls you with two JSON messages in one callback.
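A minimal sketch of newline-based framing on the receiving side, assuming each JSON message ends with \n and contains no raw newline itself. The type NewlineFramer is hypothetical; you would feed it the Data from each didReceiveData callback.

```swift
import Foundation

/// Hypothetical helper: buffers raw bytes and hands back only complete, newline-terminated messages.
struct NewlineFramer {
    private var buffer = Data()

    /// Appends newly received bytes and returns every message completed so far (without the "\n").
    mutating func append(_ data: Data) -> [Data] {
        self.buffer.append(data)
        var messages: [Data] = []
        while let newline = self.buffer.firstIndex(of: UInt8(ascii: "\n")) {
            messages.append(Data(self.buffer[self.buffer.startIndex..<newline]))
            // Re-create the buffer so its indices start at 0 again.
            self.buffer = Data(self.buffer[(newline + 1)...])
        }
        return messages
    }
}

var framer = NewlineFramer()
// First callback: one complete message plus the beginning of the next one.
let firstBatch = framer.append(Data("{\"a\":1}\n{\"b\"".utf8))
// Second callback: the rest of the second message.
let secondBatch = framer.append(Data(":2}\n".utf8))
print(firstBatch.count, secondBatch.count)
```

With this in place it no longer matters whether a callback delivers half a message or several messages at once.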

Okay, this works with the Clash proxy just by chance. Because of the enormous 1s interval between messages, you'll just never see the situation where 2 messages get concatenated. And because it's actually unlikely (not impossible though) that things get fragmented on localhost you probably have only seen the case where you got exactly one JSON message per callback. But this isn't guaranteed. Also, I'm sure you don't actually want a unit test to take a whole second for that.

If you did want to replicate this with Vapor, you should absolutely be able to: Write your JSON, and a second later, schedule another write. But again, with single bytes you're not relying on chance (no concatenation/fragmentation) and you don't actually need any waits at all. You just send a byte from your client, have the server echo it back, and then you send another byte from your client.

[I'm assuming you're not allocating any promises yourself because in most cases a user shouldn't have to.] This sounds like a severe bug in Vapor, I urge you to file it even if your repro isn't 100% stable. Any crash because of a promise leak is a bug in the library that allocated the promise. If the library chooses to allocate promises itself, it needs to make sure that it fulfils them in all cases. Allocating a promise and then not fulfilling it is the same as having a function that takes a completion block that in certain cases just never gets called.

I'm really unsure how I'm supposed to handle a chain of promises (write calls) inside a Response body stream. I can reduce it down to a single promise, but then what do I do with that?

app.on(.GET, "chunked", ":count") { request -> Response in
    guard let count = request.parameters["count", as: Int.self], count > 0, count <= 100 else {
        return Response(status: .badRequest)
    }

    let queue = EventLoopFutureQueue(eventLoop: request.eventLoop)
    let response = Response(body: .init(stream: { writer in
        (0..<count).reduce(request.eventLoop.future()) { _, i in
            queue.append(writer.write(.buffer(.init(integer: i))))
        }
    }, count: 0))

    return response
}

I don't know what an EventLoopFutureQueue does or is but your code is fine (*). What you have are just futures, there are no promises. The code you pasted is not responsible for an unfulfilled promise crash. Promise allocation looks like this: eventLoop.makePromise(of: SomeType.self).

(*) your code is an expensive version of just sending the byte representations of count integers in one go.

I've filed an issue with Vapor: NIO Promise Leak During Response Body Stream · Issue #2389 · vapor/vapor · GitHub

EventLoopFutureQueue is just a way of enqueuing dependent futures. I was trying it just in case I needed the dependency chain, but it doesn't seem to make a difference. Replacing it with a simple for loop has the same behavior.

If I'm trying to simulate chunked behavior with single bytes, I don't want to send them "in one go"? I know I could build a ByteBuffer containing the entire result, but I'm trying to send a single byte at a time. Intuitively that seems like I should write each byte separately, but I could be wrong.

I understand but you're not waiting for any event before firing out the respective "next" integer which means they'll all be sent in very quick succession. So quick in fact that even if Vapor sends them out in multiple chunks, they might arrive close enough that URLSession sees them all in one read.

That said, I wrote my first Vapor program (an HTTP echo server) and I can confirm that there's something not right with Vapor's streaming. It seems to wait for the full request body before it sends out response bytes.

But if you wanted to achieve what your Clash proxy does, that works just fine. You'll probably want to lower the delay.

import Vapor

var env = try Environment.detect()
try LoggingSystem.bootstrap(from: &env)

let app = Application(env)
defer { app.shutdown() }

app.on(.GET, "count-to-10")  { request -> Response in
    let r = Response(body: .init(stream: { writer in
        var counter = 0
        request.eventLoop.scheduleRepeatedTask(initialDelay: .seconds(1), delay: .seconds(1)) { repeatedTask in
            counter += 1
            guard counter <= 10 else {
                repeatedTask.cancel()
                writer.write(.end, promise: nil)
                return
            }
            writer.write(.buffer(.init(string: "\(counter)")), promise: nil)
        }
    }, count: 0))
    r.headers.remove(name: "content-length") // workaround for https://github.com/vapor/vapor/issues/2392
    return r
}

try app.run()

Thanks for that example. I can send chunked encoding using that method, according to WireShark, but I can't get separate data callbacks from URLSession no matter the delay. Perhaps it's a combination of payload size and time? I'll take another look later.

Thanks for the help!

@Jon_Shier So again, whether it's transfer-encoding: chunked or content-length: X should make no real difference to URLSession's callback behaviour, it is semantically the same.
I wouldn't expect it, but URLSession may have some side effects that allow you to tell whether the response was sent with content-length or chunked. That wouldn't be deliberate, though, and may change in any version.

Now back to your actual question: is it possible that Foundation holds a certain amount of bytes in memory before calling out to you? Yes. I don't know what URLSession's heuristic is but it won't buffer too many bytes before handing them to you. You may want to experiment with the number of bytes and the characters you send. For example sending a newline may help.

After you've found out what URLSession's heuristic is you can just send the right bytes. Here's also a strategy you can use that's as reliable as sending a single byte back and forth and you still don't have to implement complicated streaming. Let's assume URLSession's heuristic is to buffer up to 16 bytes. Then you could just send {"34567":"BCD"}\n and in your didReceiveData you'd do

if receivedData.contains(UInt8(ascii: "\n")) { // receivedData is a Data, so compare against the newline byte
    self.uploadNextChunk()
}

Then sure, your 16 bytes may be arbitrarily "re-chunked" but we only take an action on the \n character so that doesn't matter at all.
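A small sketch to check that claim: it splits the 16-byte message at every possible position and counts how often the newline check would fire. The function triggerCount is hypothetical and only stands in for a series of didReceiveData callbacks; it assumes that only one message is in flight at a time.

```swift
import Foundation

/// Counts how many callbacks in the given sequence would trigger the "upload next chunk" action.
func triggerCount(forCallbacks callbacks: [Data]) -> Int {
    var count = 0
    for receivedData in callbacks {
        if receivedData.contains(UInt8(ascii: "\n")) {
            count += 1
        }
    }
    return count
}

let message = Data("{\"34567\":\"BCD\"}\n".utf8) // the 16-byte message from above
for splitPoint in 0...message.count {
    // Simulate the message arriving in up to two callbacks, split at an arbitrary position.
    let callbacks = [message.prefix(splitPoint), message.suffix(from: splitPoint)].filter { !$0.isEmpty }
    print("split at \(splitPoint): \(triggerCount(forCallbacks: callbacks)) trigger(s)")
}
```

Since the message contains exactly one \n, exactly one callback can contain it no matter how the bytes are split.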

Given the bug it doesn't seem straightforward to implement an HTTP echo server in Vapor today but I'm sure it'll be fixed soon. Until Vapor fixes that bug, you could just temporarily copy in a simple NIO HTTP echo server.

I wrote some example code for you that demonstrates the whole thing with URLSession and a NIO echo server. Just like you, I could see that URLSession doesn't immediately want to go into streaming mode. It seems that it caches up to 1000 bytes and then it starts streaming. To not hard-code that I made the echo server a little weird: It'll send 1024 bytes to start with (to poke URLSession into streaming mode). To not hard-code those 1024 bytes, it'll send another 1024 every 10 ms until it receives the first bit of the upload. Then it knows that URLSession is now ready to stream and we can communicate in arbitrarily small bits (1 byte either way is fine). The example sends 10 ping/pongs back and forth and then exits. [My URLSession code is probably super terrible, my first time using it for something more complicated than downloading a few bytes into a Data].

import Foundation
import NIO
import NIOHTTP1

// MARK: - URLSession client

class PingPongy: NSObject, URLSessionTaskDelegate, URLSessionDataDelegate, URLSessionStreamDelegate, StreamDelegate {
    private let sessionQueue: DispatchQueue
    private let calloutQueue: DispatchQueue
    private let completionHandler: (Error?) -> Void
    private let url: URL
    private var meToBuffer: OutputStream? = nil // thread safety: on self.sessionQueue
    private var state = State.waitingForFirstBitsOfResponseStream

    private enum State {
        case waitingForFirstBitsOfResponseStream
        case waitingForDollarSign
        case pingPonging(counter: Int)
    }

    init(url: URL, calloutQueue: DispatchQueue, completionHandler: @escaping (Error?) -> Void) {
        self.url = url
        self.completionHandler = completionHandler
        self.sessionQueue = DispatchQueue(label: "URLSession-q", target: calloutQueue)
        self.calloutQueue = DispatchQueue(label: "callout-q", target: calloutQueue)
    }

    private func checkQueue() {
        dispatchPrecondition(condition: .onQueue(self.sessionQueue))
    }

    func start() {
        self.sessionQueue.async {
            self.doIt()
        }
    }

    private func doIt() {
        self.checkQueue()

        let sOQ = OperationQueue()
        sOQ.underlyingQueue = self.sessionQueue
        let session: URLSession = URLSession(configuration: .default,
                                             delegate: self,
                                             delegateQueue: sOQ)

        // https://developer.apple.com/documentation/foundation/url_loading_system/uploading_streams_of_data
        var bufferToURLSession: InputStream? = nil
        Stream.getBoundStreams(withBufferSize: 4096,
                               inputStream: &bufferToURLSession,
                               outputStream: &self.meToBuffer)

        var request = URLRequest(url: self.url,
                                 cachePolicy: .reloadIgnoringLocalCacheData,
                                 timeoutInterval: 10)
        request.httpBodyStream = bufferToURLSession
        request.httpMethod = "POST"

        self.meToBuffer!.delegate = self
        self.meToBuffer!.schedule(in: .current, forMode: .default)
        self.meToBuffer!.open()

        let uploadTask = session.dataTask(with: request)
        uploadTask.resume()
    }

    private func sendOne(_ string: String = "!") {
        self.checkQueue()

        var string = string
        let bytesSent = string.withUTF8 { ptr in
            self.meToBuffer!.write(ptr.baseAddress!, maxLength: ptr.count)
        }
        print("CLIENT: sent '\(string)', bytes transmitted: \(bytesSent)")
    }

    func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) {
        self.checkQueue()

        print("CLIENT: received response body: ",
              data.count > 20 ? "<too long>" : String(decoding: data, as: UTF8.self))

        switch self.state {
        case .waitingForFirstBitsOfResponseStream:
            if data.allSatisfy({ $0 == UInt8(ascii: "X") }) {
                self.state = .waitingForDollarSign
                self.sendOne("initial chunk")
            }
        case .waitingForDollarSign:
            // We're waiting for the dollar sign from the server which is when it switches into echo mode.
            if data.contains(UInt8(ascii: "$")) {
                print("CLIENT: starting ping/pong now")
                self.state = .pingPonging(counter: 1)
                self.sendOne("0!")
            }
        case .pingPonging(counter: let counter):
            self.state = .pingPonging(counter: counter + 1)
            if counter > 10 {
                print("CLIENT: === We did 10 rounds of back and forth, streaming is working, let's close ===")
                self.meToBuffer?.close()
                return
            }
            self.sendOne("\(counter)!")
        }
    }

    func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) {
        self.checkQueue()

        self.calloutQueue.async {
            self.completionHandler(error)
        }
    }
}

// MARK: - NIO Echo Server (with URLSession poking)

final class HTTPEchoServer: ChannelInboundHandler {
    typealias InboundIn = HTTPServerRequestPart
    typealias OutboundOut = HTTPServerResponsePart

    let urlSessionPokingBytes = ByteBuffer(repeating: UInt8(ascii: "X"), count: 1024)
    var pokingTask: Optional<RepeatedTask> = nil

    func startSendingRandomDataToMakeURLSessionStream(context: ChannelHandlerContext) {
        print("SERVER: starting to poke URLSession by sending \(self.urlSessionPokingBytes.readableBytes) bytes")
        self.pokingTask = context.eventLoop.scheduleRepeatedTask(initialDelay: .nanoseconds(0),
                                                                 delay: .milliseconds(10)) { repeatedTask in
            context.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(self.urlSessionPokingBytes)))).whenFailure { _ in
                repeatedTask.cancel()
            }
        }
    }

    @discardableResult
    func stopPokingURLSession() -> Bool {
        if let pokingTask = self.pokingTask {
            print("SERVER: stopping to poke URLSession")
            pokingTask.cancel()
            self.pokingTask = nil
            return true
        } else {
            return false
        }
    }

    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        switch self.unwrapInboundIn(data) {
        case .head(let reqHead):
            print("SERVER: request head:", reqHead)
            let responseHead = HTTPServerResponsePart.head(.init(version: .init(major: 1, minor: 1),
                                                                 status: .ok,
                                                                 headers: ["request-uri": reqHead.uri]))
            context.writeAndFlush(self.wrapOutboundOut(responseHead), promise: nil)

            self.startSendingRandomDataToMakeURLSessionStream(context: context)
        case .body(let buffer):
            print("SERVER: request body:", String(buffer: buffer))
            context.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: nil)

            // Oh cool, we received something from URLSession so it must have switched to uploading.
            if self.stopPokingURLSession() {
                // Let's make it easy for the client and send a quick and recognisable message that we're now switching
                // to be a regular echo server...
                let dollarSign = context.channel.allocator.buffer(string: "$")
                context.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(dollarSign))), promise: nil)
            }
        case .end:
            self.stopPokingURLSession()

            print("SERVER: request end")
            context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
        }
    }

    func errorCaught(context: ChannelHandlerContext, error: Error) {
        print("SERVER: closing connection, unexpected error: \(error)")
        context.close(promise: nil)
    }

    func handlerRemoved(context: ChannelHandlerContext) {
        self.stopPokingURLSession()
    }
}

// MARK: - Driver

func runThatThing() throws {
    let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
    defer {
        try! group.syncShutdownGracefully()
    }
    let server = try ServerBootstrap(group: group)
        .childChannelInitializer { channel in
            channel.pipeline.configureHTTPServerPipeline().flatMap {
                channel.pipeline.addHandler(HTTPEchoServer())
            }
        }
        .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
        .bind(host: "127.0.0.1", port: 0).wait()
    defer {
        try! server.close().wait()
    }

    print("SERVER: up and running at \(server.localAddress!)")
    let dg = DispatchGroup()
    dg.enter()
    let pingPonger = PingPongy(url: URL(string: "http://127.0.0.1:\(server.localAddress!.port!)/")!,
                               calloutQueue: DispatchQueue(label: "foo")) { error in
        if let error = error {
            print("==> ERROR:", error)
        } else {
            print("==> OK, done")
        }
        dg.leave()
    }
    pingPonger.start()

    dg.wait()

}

try runThatThing()

An example output is:

SERVER: up and running at [IPv4]127.0.0.1/127.0.0.1:59099
SERVER: request head: HTTPRequestHead { method: POST, uri: "/", version: HTTP/1.1, headers: [("Host", "127.0.0.1:59099"), ("Content-Type", "application/x-www-form-urlencoded"), ("Transfer-Encoding", "Chunked"), ("Accept", "*/*"), ("User-Agent", "foobar (unknown version) CFNetwork/1126 Darwin/19.5.0 (x86_64)"), ("Accept-Language", "en-gb"), ("Accept-Encoding", "gzip, deflate"), ("Connection", "keep-alive")] }
SERVER: starting to poke URLSession by sending 1024 bytes
CLIENT: received response body:  <too long>
CLIENT: sent 'initial chunk', bytes transmitted: 13
SERVER: request body: initial chunk
SERVER: stopping to poke URLSession
CLIENT: received response body:  initial chunk
CLIENT: received response body:  $
CLIENT: starting ping/pong now
CLIENT: sent '0!', bytes transmitted: 2
SERVER: request body: 0!
CLIENT: received response body:  0!
CLIENT: sent '1!', bytes transmitted: 2
SERVER: request body: 1!
CLIENT: received response body:  1!
CLIENT: sent '2!', bytes transmitted: 2
SERVER: request body: 2!
CLIENT: received response body:  2!
CLIENT: sent '3!', bytes transmitted: 2
SERVER: request body: 3!
CLIENT: received response body:  3!
CLIENT: sent '4!', bytes transmitted: 2
SERVER: request body: 4!
CLIENT: received response body:  4!
CLIENT: sent '5!', bytes transmitted: 2
SERVER: request body: 5!
CLIENT: received response body:  5!
CLIENT: sent '6!', bytes transmitted: 2
SERVER: request body: 6!
CLIENT: received response body:  6!
CLIENT: sent '7!', bytes transmitted: 2
SERVER: request body: 7!
CLIENT: received response body:  7!
CLIENT: sent '8!', bytes transmitted: 2
SERVER: request body: 8!
CLIENT: received response body:  8!
CLIENT: sent '9!', bytes transmitted: 2
SERVER: request body: 9!
CLIENT: received response body:  9!
CLIENT: sent '10!', bytes transmitted: 3
SERVER: request body: 10!
CLIENT: received response body:  10!
CLIENT: === We did 10 rounds of back and forth, streaming is working, let's close ===
SERVER: request end
==> OK, done

@Jon_Shier haha, so it turns out, the server just needs to set Content-Type: application/octet-stream for URLSession to stream directly. No more buffering, and one byte will work. The example can now be vastly simplified to:

import Foundation
import NIO
import NIOHTTP1
#if canImport(FoundationNetworking)
import FoundationNetworking
#endif

// MARK: - URLSession client

class PingPongy: NSObject, URLSessionTaskDelegate, URLSessionDataDelegate, URLSessionStreamDelegate {
    private let sessionQueue: DispatchQueue
    private let calloutQueue: DispatchQueue
    private let completionHandler: (Error?) -> Void
    private let url: URL
    private var meToBuffer: OutputStream? = nil // thread safety: on self.sessionQueue
    private var pingPongCounter = 1

    init(url: URL, calloutQueue: DispatchQueue, completionHandler: @escaping (Error?) -> Void) {
        self.url = url
        self.completionHandler = completionHandler
        self.sessionQueue = DispatchQueue(label: "URLSession-q", target: calloutQueue)
        self.calloutQueue = DispatchQueue(label: "callout-q", target: calloutQueue)
    }

    private func checkQueue() {
        dispatchPrecondition(condition: .onQueue(self.sessionQueue))
    }

    func start() {
        self.sessionQueue.async {
            self.doIt()
        }
    }

    private func doIt() {
        self.checkQueue()

        let sOQ = OperationQueue()
        sOQ.underlyingQueue = self.sessionQueue
        let session: URLSession = URLSession(configuration: .default,
                                             delegate: self,
                                             delegateQueue: sOQ)

        // https://developer.apple.com/documentation/foundation/url_loading_system/uploading_streams_of_data
        var bufferToURLSession: InputStream? = nil
        Stream.getBoundStreams(withBufferSize: 4096,
                               inputStream: &bufferToURLSession,
                               outputStream: &self.meToBuffer)

        var request = URLRequest(url: self.url,
                                 cachePolicy: .reloadIgnoringLocalCacheData,
                                 timeoutInterval: 10)
        request.httpBodyStream = bufferToURLSession
        request.httpMethod = "POST"

        self.meToBuffer!.schedule(in: .current, forMode: .default)
        self.meToBuffer!.open()

        let uploadTask = session.dataTask(with: request)
        uploadTask.resume()
        self.sendOne("0!") // kick-start this
    }

    private func sendOne(_ string: String) {
        self.checkQueue()

        var string = string
        let bytesSent = string.withUTF8 { ptr in
            self.meToBuffer!.write(ptr.baseAddress!, maxLength: ptr.count)
        }
        print("CLIENT: sent '\(string)', bytes transmitted: \(bytesSent)")
    }

    func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) {
        self.checkQueue()

        print("CLIENT: received response body: ",
              data.count > 20 ? "<too long>" : String(decoding: data, as: UTF8.self))

        if self.pingPongCounter > 10 {
            print("CLIENT: === We did 10 rounds of back and forth, streaming is working, let's close ===")
            self.meToBuffer?.close()
            return
        }
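        // Send this round's message first, then advance the counter, so the rounds run 1...10 after the initial "0!".
        self.sendOne("\(self.pingPongCounter)!")
        self.pingPongCounter += 1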
    }

    func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) {
        self.checkQueue()

        self.calloutQueue.async {
            self.completionHandler(error)
        }
    }
}

// MARK: - NIO HTTP Echo Server

final class HTTPEchoServer: ChannelInboundHandler {
    typealias InboundIn = HTTPServerRequestPart
    typealias OutboundOut = HTTPServerResponsePart

    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        switch self.unwrapInboundIn(data) {
        case .head(let reqHead):
            print("SERVER: request head:", reqHead)
            let responseHead = HTTPServerResponsePart.head(.init(version: .init(major: 1, minor: 1),
                                                                 status: .ok,
                                                                 headers: ["request-uri": reqHead.uri,
                                                                           "Content-Type": "application/octet-stream"]))
            context.writeAndFlush(self.wrapOutboundOut(responseHead), promise: nil)
        case .body(let buffer):
            print("SERVER: request body:", String(buffer: buffer))
            context.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: nil)
        case .end:
            print("SERVER: request end")
            context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
        }
    }

    func errorCaught(context: ChannelHandlerContext, error: Error) {
        print("SERVER: closing connection, unexpected error: \(error)")
        context.close(promise: nil)
    }
}

// MARK: - Driver

func runThatThing() throws {
    let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
    defer {
        try! group.syncShutdownGracefully()
    }
    let server = try ServerBootstrap(group: group)
        .childChannelInitializer { channel in
            channel.pipeline.configureHTTPServerPipeline().flatMap {
                channel.pipeline.addHandler(HTTPEchoServer())
            }
        }
        .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
        .bind(host: "127.0.0.1", port: 0).wait()
    defer {
        try! server.close().wait()
    }

    print("SERVER: up and running at \(server.localAddress!)")
    let dg = DispatchGroup()
    dg.enter()
    let pingPonger = PingPongy(url: URL(string: "http://127.0.0.1:\(server.localAddress!.port!)/")!,
                               calloutQueue: DispatchQueue(label: "foo")) { error in
        if let error = error {
            print("==> ERROR:", error)
        } else {
            print("==> OK, done")
        }
        dg.leave()
    }
    pingPonger.start()

    dg.wait()
}

try runThatThing()

[@Jon_Shier whoops, I pasted the old version again, now updated without any hacks/workarounds.]

Yes, that seems to do it! I can do quick, bit by bit, stream tests, even with just a 1 millisecond delay. I'll investigate the behavior of larger payloads. Thanks for the investigation.

If you implement the ping/pong like in my example, you'll need 0 delay and you're guaranteed that two subsequent messages aren't in the same chunk (because the next chunk is only sent after the previous one has been received).

HTTP streaming improvements by tanner0101 · Pull Request #2404 · vapor/vapor · GitHub and Immediate body streaming by tanner0101 · Pull Request #2413 · vapor/vapor · GitHub have improved Vapor's HTTP streaming.

See the PRs for more details, but most importantly:

  • Response body stream's count parameter is now optional. Omit the parameter or pass -1 for streaming responses.
  • Request's body stream no longer delays the first chunk.

A simple echo server example has been added to Vapor's tests: vapor/PipelineTests.swift at main · vapor/vapor · GitHub

Pasting here for posterity:

app.on(.POST, "echo", body: .stream) { request -> Response in
    Response(body: .init(stream: { writer in
        request.body.drain { body in
            switch body {
            case .buffer(let buffer):
                return writer.write(.buffer(buffer))
            case .error(let error):
                return writer.write(.error(error))
            case .end:
                return writer.write(.end)
            }
        }
    }))
}